  • Using Eclipse on Win7 to Connect to Hadoop 2.4 in an Ubuntu VM <3>

    • After a few days of study, I'm just about ready to try my hand at writing some small programs. Before that, a few things to sort out:
    1. Be clear about what I want to use Hadoop for
    2. Get a rough understanding of MapReduce
    3. After Ubuntu is rebooted, starting Hadoop again throws a connection exception
    • Answers:
    1. Refining data, exploring data, mining data
    2. map = chop apart, reduce = merge
    3. A reboot clears the /tmp directory, which is where the NameNode data lives by default. Add the following to core-site.xml (don't forget to create the directory; if you run into permission problems, create it as root and change its permissions to 777):
      <property>
           <name>hadoop.tmp.dir</name>
           <value>/usr/local/hadoop/tmp</value>
      </property>
    • For "big data", my first reaction was: how do I use the data already sitting in a relational database together with Hadoop? After searching around, the answer is DBInputFormat, so let's write a small example that reads data from a database, processes it, and writes the result out to a file.
    • Keep the database simple: id is an integer, test is a string, and the requirement is trivial: count how many times each value of the TEST field appears.
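    • For instance (sample data of my own, purely to illustrate the expected result): if table t1 held the rows (1, 'aaa'), (2, 'bbb'), (3, 'aaa'), the output file on HDFS would contain one line per distinct TEST value with its count:

      aaa	2
      bbb	1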



    • The record class that reads rows from the database:
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.lib.db.DBWritable;
    
    // Represents one row read from the database. DBInputFormat requires the value
    // class to implement both Writable (Hadoop serialization) and DBWritable
    // (mapping to/from a JDBC ResultSet and PreparedStatement).
    public class DBRecoder implements Writable, DBWritable {
    	String test;
    	int id;
    
    	// Hadoop serialization: write the fields to the output stream
    	@Override
    	public void write(DataOutput out) throws IOException {
    		out.writeUTF(test);
    		out.writeInt(id);
    	}
    
    	// Hadoop deserialization: read the fields back in the same order
    	@Override
    	public void readFields(DataInput in) throws IOException {
    		test = in.readUTF();
    		id = in.readInt();
    	}
    
    	// Populate the object from one JDBC ResultSet row
    	@Override
    	public void readFields(ResultSet rs) throws SQLException {
    		test = rs.getString("test");
    		id = rs.getInt("id");
    	}
    
    	// Write the object back to a database (not used by this read-only job)
    	@Override
    	public void write(PreparedStatement stmt) throws SQLException {
    		stmt.setString(1, test);
    		stmt.setInt(2, id);
    	}
    }
    • The MapReduce job class:
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
    import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class DataCountTest {
    	// Mapper: for each database row, emit (test value, 1)
    	public static class TokenizerMapper extends Mapper<LongWritable, DBRecoder, Text, IntWritable> {
    		public void map(LongWritable key, DBRecoder value, Context context) throws IOException, InterruptedException {
    			context.write(new Text(value.test), new IntWritable(1));
    		}
    	}
    
    	// Reducer: sum the counts for each distinct test value
    	public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    		private IntWritable result = new IntWritable();
    
    		public void reduce(Text key, Iterable<IntWritable> values,
    				Context context) throws IOException, InterruptedException {
    			int sum = 0;
    			for (IntWritable val : values) {
    				sum += val.get();
    			}
    			result.set(sum);
    			context.write(key, result);
    		}
    	}
    
    	public static void main(String[] args) throws Exception {
    		// Output path is hard-coded for testing; the directory must not already exist on HDFS
    		args = new String[1];
    		args[0] = "hdfs://192.168.203.137:9000/user/chenph/output1111221";
    
    		Configuration conf = new Configuration();
    
    		// JDBC connection settings for the source Oracle database
    		DBConfiguration.configureDB(conf, "oracle.jdbc.driver.OracleDriver",
    				"jdbc:oracle:thin:@192.168.101.179:1521:orcl", "chenph", "chenph");
    
    		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    
    		Job job = new Job(conf, "DB count");
    
    		job.setJarByClass(DataCountTest.class);
    		job.setMapperClass(TokenizerMapper.class);
    		job.setReducerClass(IntSumReducer.class);
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(IntWritable.class);
    
    		// Read columns id and test from table t1, ordered/split on the id column
    		String[] fields1 = { "id", "test" };
    		DBInputFormat.setInput(job, DBRecoder.class, "t1", null, "id", fields1);
    
    		FileOutputFormat.setOutputPath(job, new Path(otherArgs[0]));
    
    		System.exit(job.waitForCompletion(true) ? 0 : 1);
    	}
    }
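    • A note on the DBInputFormat.setInput call above: the arguments are, in order, the value class, the table name ("t1"), an optional WHERE condition (null here), the column used to order and split the input ("id"), and the list of columns to read. DBInputFormat also has an overload that takes the SQL directly; a minimal sketch, where the two statements are my own illustration for the same t1 table:
    
    		// Alternative: supply the row query and the count query explicitly
    		DBInputFormat.setInput(job, DBRecoder.class,
    				"SELECT id, test FROM t1",
    				"SELECT COUNT(*) FROM t1");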
    
    --------------------------------------------------------------------------------------------------
    Problems encountered during development:
    1. The Job constructor is flagged as deprecated; I haven't yet found what should be used instead (see the sketch after this list)
    2. Garbled characters: Hadoop assumes UTF-8 by default, so GBK input needs extra handling (also sketched below)
    3. Examples like this are scarce online, and the ones that exist target old versions; there is nothing for the new version. I pieced this together entirely on my own, many points are still unclear to me, and I need to study the official documentation further
    4. Some of what I found advises against this approach for real big-data workloads: the concurrency is so high that it would instantly crush the database, so the usual practice is to export the data to text files first
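    • Two follow-up sketches for problems 1 and 2 (my own additions, not from the original code, and assuming the same imports as the job class above). In Hadoop 2.x the deprecated Job constructor is replaced by the static factory Job.getInstance, and GBK-encoded text input is usually decoded by hand from the raw bytes, because Text.toString() always assumes UTF-8:
    
    	// Problem 1: the non-deprecated way to create the job in Hadoop 2.x
    	//     Job job = Job.getInstance(conf, "DB count");
    
    	// Problem 2: sketch of a mapper for GBK-encoded lines read from a plain text file
    	public static class GbkLineMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    		private static final IntWritable ONE = new IntWritable(1);
    
    		public void map(LongWritable key, Text value, Context context)
    				throws IOException, InterruptedException {
    			// Text holds raw bytes; decode them as GBK instead of the default UTF-8
    			String line = new String(value.getBytes(), 0, value.getLength(), "GBK");
    			context.write(new Text(line), ONE);
    		}
    	}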


  • Original post: https://www.cnblogs.com/mengfanrong/p/3763413.html