  • Connecting Eclipse on Win7 to Hadoop 2.4 on Ubuntu in a Virtual Machine (3)

    • After the past few days of study I'm just about ready to try writing some small programs, but before that there are a few preparatory items:
    1. Be clear about what I want to use Hadoop for
    2. Get a rough understanding of MapReduce
    3. After Ubuntu is rebooted, starting Hadoop again throws connection exceptions
    • Answers:
    1. Data refinement, data exploration, data mining
    2. map = chop up, reduce = merge
    3. A reboot clears the tmp directory, which is where the NameNode data lives by default, so the following needs to be added to core-site.xml (don't forget to create the directory; if you lack permission, create it as root and change its mode to 777):
      <property>
           <name>hadoop.tmp.dir</name>
           <value>/usr/local/hadoop/tmp</value>
      </property>
    • "Big data" — my first thought was how to use the data already sitting in a relational database together with Hadoop. Searching online pointed me to DBInputFormat, so here is a small example that reads rows from a database, processes them, and writes the result to a file
    • Keep the database simple: id is an integer and test is a string. The requirement is equally simple: count how many times each value of the TEST field appears (a sketch for creating such a sample table follows below)
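    • A minimal sketch (my own addition, not from the original post) for creating and populating such a sample table with plain JDBC. The table and column names (t1, id, test) and the connection settings are the ones used by the driver code further down; the sample rows are invented for illustration:
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.Statement;
    
    public class CreateSampleTable {
    	public static void main(String[] args) throws Exception {
    		Class.forName("oracle.jdbc.driver.OracleDriver");
    		try (Connection conn = DriverManager.getConnection(
    				"jdbc:oracle:thin:@192.168.101.179:1521:orcl", "chenph", "chenph")) {
    			conn.setAutoCommit(false);
    			try (Statement st = conn.createStatement()) {
    				st.execute("CREATE TABLE t1 (id NUMBER(10), test VARCHAR2(100))");
    			}
    			// Insert a few rows with repeating test values so the counts are non-trivial
    			try (PreparedStatement ps = conn.prepareStatement(
    					"INSERT INTO t1 (id, test) VALUES (?, ?)")) {
    				String[] samples = { "a", "b", "a", "c", "b", "a" };
    				for (int i = 0; i < samples.length; i++) {
    					ps.setInt(1, i + 1);
    					ps.setString(2, samples[i]);
    					ps.addBatch();
    				}
    				ps.executeBatch();
    			}
    			conn.commit();
    		}
    	}
    }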



    • The record class that reads rows from the database:
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.lib.db.DBWritable;
    
    // One row of table t1: Writable is used by Hadoop's serialization, DBWritable by the DB input/output formats
    public class DBRecoder implements Writable, DBWritable{
    	String test;
    	int id;
    	// Serialize the record for Hadoop (map output / spill files)
    	@Override
    	public void write(DataOutput out) throws IOException {
    		out.writeUTF(test);
    		out.writeInt(id);
    	}
    	// Deserialize the record from Hadoop's binary form
    	@Override
    	public void readFields(DataInput in) throws IOException {
    		test = in.readUTF();
    		id = in.readInt();
    	}
    	// Populate the record from a JDBC ResultSet row (called by DBInputFormat)
    	@Override
    	public void readFields(ResultSet arg0) throws SQLException {
    		test = arg0.getString("test");
    		id = arg0.getInt("id");
    	}
    	// Bind the record's fields to a PreparedStatement (used when writing back to a database)
    	@Override
    	public void write(PreparedStatement arg0) throws SQLException {
    		arg0.setString(1, test);
    		arg0.setInt(2, id);
    	}
    }
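    • The Writable half of DBRecoder can be sanity-checked by round-tripping a record through a byte stream. This small test class is my own addition (it assumes it sits in the same package as DBRecoder, since the fields are package-private):
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    
    public class DBRecoderRoundTrip {
    	public static void main(String[] args) throws Exception {
    		DBRecoder rec = new DBRecoder();
    		rec.test = "hello";
    		rec.id = 42;
    
    		// Serialize with write(DataOutput)
    		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    		rec.write(new DataOutputStream(bytes));
    
    		// Deserialize with readFields(DataInput) and print the result
    		DBRecoder copy = new DBRecoder();
    		copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    		System.out.println(copy.test + " / " + copy.id); // expected: hello / 42
    	}
    }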
    • The MapReduce driver class:
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
    import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class DataCountTest {
    	public static class TokenizerMapper extends Mapper<LongWritable, DBRecoder, Text, IntWritable> {
    		// For every row read from the database, emit (test value, 1)
    		public void map(LongWritable key, DBRecoder value, Context context) throws IOException, InterruptedException {
    			context.write(new Text(value.test), new IntWritable(1));
    		}
    	}
    
    	public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    		// Sum the 1s emitted for each distinct test value
    		private IntWritable result = new IntWritable();
    
    		public void reduce(Text key, Iterable<IntWritable> values,
    				Context context) throws IOException, InterruptedException {
    			int sum = 0;
    			for (IntWritable val : values) {
    				sum += val.get();
    			}
    			result.set(sum);
    			context.write(key, result);
    		}
    	}
    
    	public static void main(String[] args) throws Exception {
    		// Hard-code the HDFS output path rather than taking it from the command line
    		args = new String[1];
    		args[0] = "hdfs://192.168.203.137:9000/user/chenph/output1111221";
    
    		Configuration conf = new Configuration();
    		
            // JDBC driver class, connection URL, database user and password
            DBConfiguration.configureDB(conf, "oracle.jdbc.driver.OracleDriver",  
                    "jdbc:oracle:thin:@192.168.101.179:1521:orcl", "chenph", "chenph");  
    		
    		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    
    		// new Job(conf, name) shows up as deprecated here (see problem 1 at the end)
    		Job job = new Job(conf, "DB count");
    		
    		job.setJarByClass(DataCountTest.class);
    		job.setMapperClass(TokenizerMapper.class);
    		job.setReducerClass(IntSumReducer.class); 
    		job.setOutputKeyClass(Text.class); 
    		job.setOutputValueClass(IntWritable.class);
    		job.setMapOutputKeyClass(Text.class);  
    		job.setMapOutputValueClass(IntWritable.class);  
            // Read columns id and test from table t1 (no WHERE condition); input splits are ordered by id
            String[] fields1 = { "id", "test"};  
            DBInputFormat.setInput(job, DBRecoder.class, "t1", null, "id",  fields1);  
    
    		FileOutputFormat.setOutputPath(job, new Path(otherArgs[0]));
    		
    		System.exit(job.waitForCompletion(true) ? 0 : 1);
    	}
    }
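    • Once the job finishes, the reducer output lands in part files under the output directory as "value<TAB>count" lines. A rough sketch (my own addition; the NameNode address and output path are the hard-coded ones from main, and the part file name assumes a single reducer) for printing it straight from HDFS:
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class PrintJobOutput {
    	public static void main(String[] args) throws Exception {
    		Configuration conf = new Configuration();
    		conf.set("fs.defaultFS", "hdfs://192.168.203.137:9000");
    		FileSystem fs = FileSystem.get(conf);
    		Path part = new Path("/user/chenph/output1111221/part-r-00000");
    		try (BufferedReader reader = new BufferedReader(
    				new InputStreamReader(fs.open(part), "UTF-8"))) {
    			String line;
    			while ((line = reader.readLine()) != null) {
    				System.out.println(line);
    			}
    		}
    	}
    }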
    
    --------------------------------------------------------------------------------------------------
    Problems encountered during development:
    1. The Job constructor is marked as deprecated; I haven't yet found what should be used instead
    2. Encoding: Hadoop assumes UTF-8 by default, so data stored as GBK has to be converted explicitly (see the first sketch below)
    3. Examples of this kind are scarce online, and the ones that exist target the old API; I pieced this together myself, still don't understand many of the details, and need to study the official documentation further
    4. Several sources advise against using this approach on real big-data problems: the concurrency is so high that it can instantly overwhelm the database, so the usual practice is to export the data to text files first (see the second sketch below)
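    • For point 2, one common pattern (my own sketch, not part of the original code; it assumes the raw bytes really are GBK) is to decode Text values explicitly instead of relying on toString(), which always decodes as UTF-8:
    import java.nio.charset.Charset;
    import org.apache.hadoop.io.Text;
    
    public class GbkTextDecoder {
    	// Text stores raw bytes; toString() always interprets them as UTF-8.
    	// If the underlying data is GBK, decode the valid range of bytes with the GBK charset instead.
    	public static String decodeGbk(Text value) {
    		return new String(value.getBytes(), 0, value.getLength(), Charset.forName("GBK"));
    	}
    }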


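    • For point 4, the usual alternative is to dump the table to a delimited text file with plain JDBC (or a tool such as Sqoop), upload the file to HDFS, and run the MapReduce job against the file. A rough sketch of the JDBC dump, reusing the connection settings from above (the local output path is invented):
    import java.io.PrintWriter;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;
    
    public class ExportT1ToText {
    	public static void main(String[] args) throws Exception {
    		Class.forName("oracle.jdbc.driver.OracleDriver");
    		try (Connection conn = DriverManager.getConnection(
    				"jdbc:oracle:thin:@192.168.101.179:1521:orcl", "chenph", "chenph");
    				Statement st = conn.createStatement();
    				ResultSet rs = st.executeQuery("SELECT id, test FROM t1");
    				PrintWriter out = new PrintWriter("/tmp/t1.txt", "UTF-8")) {
    			// One tab-separated line per row; the file can then be copied into HDFS with "hdfs dfs -put"
    			while (rs.next()) {
    				out.println(rs.getInt("id") + "\t" + rs.getString("test"));
    			}
    		}
    	}
    }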