MapReduce Data Join

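    Data in different files is sometimes related and needs to be joined on a common field to produce a new file for analysis. For example, suppose there are two input files, a.txt and b.txt, with the following contents:

    a.txt:

    1 a
    2 b
    3 c
    4 d

    b.txt:

    1 good
    2 bad
    3 ok
    4 hello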

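    We want to join them on the first column (the ID) into a new file like the following (Hadoop's default TextOutputFormat actually separates key and value with a tab rather than a space):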

    a good
    b bad
    c ok
    d hello
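    For reference, the result the job should produce is an inner join on the first column. The following plain-Java sketch (no Hadoop; the class JoinReference and its methods are hypothetical names for illustration) computes it in memory with a hash map, assuming every line has the form "<id> <text>" separated by a single space:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java reference for the join the MapReduce job should produce (not Hadoop code).
// Assumption: every line is "<id> <text>" separated by a single space, as in a.txt and b.txt.
class JoinReference {

    // Builds an id -> text lookup table from one file's lines.
    static Map<String, String> index(List<String> lines) {
        Map<String, String> byId = new HashMap<>();
        for (String line : lines) {
            String[] parts = line.split(" ", 2);
            if (parts.length == 2) {          // skip blank or malformed lines
                byId.put(parts[0], parts[1]);
            }
        }
        return byId;
    }

    // Inner join on the id column: for every id in left that also appears in right,
    // emit "<left text> <right text>", in the order of the left file.
    static List<String> join(List<String> left, List<String> right) {
        Map<String, String> rightById = index(right);
        List<String> result = new ArrayList<>();
        for (String line : left) {
            String[] parts = line.split(" ", 2);
            if (parts.length == 2 && rightById.containsKey(parts[0])) {
                result.add(parts[1] + " " + rightById.get(parts[0]));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> a = List.of("1 a", "2 b", "3 c", "4 d");
        List<String> b = List.of("1 good", "2 bad", "3 ok", "4 hello");
        join(a, b).forEach(System.out::println);
    }
}
```

    Running main prints the four joined lines "a good", "b bad", "c ok" and "d hello". An in-memory hash join like this only works while one side fits in memory; with MapReduce the framework groups the records by key instead.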

    The processing takes two steps:

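    1. Map phase: break the records of both input files into (ID, value) pairs keyed by the first column. After the shuffle sorts them by key, records with the same ID end up together: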

    1 a
    1 good
    2 b
    2 bad
    3 c
    3 ok
    4 d
    4 hello

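    2. Reduce phase: perform the actual join. Because the data is simple, the reducer only checks whether a value has length 1 to decide whether it becomes the new key (a letter from a.txt) or the new value (a word from b.txt). Within one key the values can arrive in either order, so the reducer must classify each value rather than rely on its position.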
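    The two steps can be simulated in plain Java to see what each phase does. This is a sketch rather than Hadoop code: TwoStepJoinDemo and its methods are hypothetical names, a TreeMap stands in for the shuffle/sort, and inputs are assumed to be "<id> <text>" lines:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation (no Hadoop) of the two steps: map plus shuffle/sort, then the
// length-based reduce. Assumption: every input line looks like "<id> <text>".
class TwoStepJoinDemo {

    // Step 1 (map + shuffle): split each line into (id, text) and group texts by id.
    // TreeMap keeps ids in sorted order, as the reducers see them.
    static Map<String, List<String>> mapAndShuffle(List<String> lines) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String line : lines) {
            String[] kv = line.split(" ", 2);
            if (kv.length < 2) continue;   // skip blank or malformed lines
            grouped.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        return grouped;
    }

    // Step 2 (reduce): the one-character value (from a.txt) becomes the new key and the
    // other value (from b.txt) the new value. Values may arrive in either order.
    // Returns null when one side is missing.
    static String reduce(List<String> values) {
        String newKey = null;
        String newValue = null;
        for (String v : values) {
            if (v.length() == 1) {
                newKey = v;
            } else {
                newValue = v;
            }
        }
        return (newKey != null && newValue != null) ? newKey + " " + newValue : null;
    }

    // Runs both steps over the combined lines of a.txt and b.txt.
    static List<String> run(List<String> lines) {
        List<String> out = new ArrayList<>();
        for (List<String> values : mapAndShuffle(lines).values()) {
            String joined = reduce(values);
            if (joined != null) out.add(joined);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = new ArrayList<>(List.of("1 a", "2 b", "3 c", "4 d"));
        input.addAll(List.of("1 good", "2 bad", "3 ok", "4 hello"));
        System.out.println(mapAndShuffle(input)); // {1=[a, good], 2=[b, bad], 3=[c, ok], 4=[d, hello]}
        run(input).forEach(System.out::println);
    }
}
```

    The length heuristic has a clear limit: if b.txt contained a one-character value such as "1 x", this sketch would treat both values for ID 1 as keys and emit nothing for that ID.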

    package cn.zhf.hadoop;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class SingleJoin extends Configured implements Tool {
    	public static void main(String[] args) throws Exception {
    		Tool tool = new SingleJoin();
    		// ToolRunner parses generic options and injects a Configuration into the tool
    		int exitCode = ToolRunner.run(tool, args);
    		if (exitCode == 0) {
    			print(tool);
    		}
    		System.exit(exitCode);
    	}
    
    	@Override
    	public int run(String[] args) throws Exception {
    		Configuration conf = getConf();
    		// Build the job from the injected configuration (new Job() is deprecated and ignores conf)
    		Job job = Job.getInstance(conf, "SingleJoin");
    		job.setJarByClass(getClass());
    
    		// Remove the output directory from a previous run; the job fails if it already exists
    		FileSystem fs = FileSystem.get(conf);
    		fs.delete(new Path("out"), true);
    		FileInputFormat.addInputPath(job, new Path("a.txt"));
    		FileInputFormat.addInputPath(job, new Path("b.txt"));
    		FileOutputFormat.setOutputPath(job, new Path("out"));
    
    		job.setMapperClass(JoinMapper.class);
    		job.setReducerClass(JoinReducer.class);
    
    		// Map and reduce both emit (Text, Text), so one pair of settings covers both
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(Text.class);
    		// Report failure to the caller instead of always returning 0
    		return job.waitForCompletion(true) ? 0 : 1;
    	}
    
    	// Emits (id, text) for every line "id text" of a.txt and b.txt
    	public static class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {
    		@Override
    		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    			String[] str = value.toString().split(" ", 2);
    			if (str.length < 2) {
    				return; // skip blank or malformed lines
    			}
    			context.write(new Text(str[0]), new Text(str[1]));
    		}
    	}
    
    	// Receives all values sharing an id; the one-character value (from a.txt) becomes the new key
    	public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
    		@Override
    		protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    			Text newKey = null;
    			Text newValue = null;
    			// Values may arrive in any order, so classify each one instead of assuming a position.
    			// Hadoop reuses the value object, so copy it with new Text(...) rather than keep a reference.
    			for (Text value : values) {
    				if (value.toString().length() == 1) {
    					newKey = new Text(value);
    				} else {
    					newValue = new Text(value);
    				}
    			}
    			// Emit only when both sides are present (an inner join); avoids NoSuchElementException
    			if (newKey != null && newValue != null) {
    				context.write(newKey, newValue);
    			}
    		}
    	}
    
    	// Prints the single reducer's output file to stdout
    	public static void print(Tool tool) throws IOException {
    		FileSystem fs = FileSystem.get(tool.getConf());
    		Path path = new Path("out/part-r-00000");
    		// try-with-resources closes the stream; copyBytes avoids splitting lines at buffer boundaries
    		try (FSDataInputStream in = fs.open(path)) {
    			IOUtils.copyBytes(in, System.out, 4096, false);
    		}
    	}
    }
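    The length test in JoinReducer only works because every value in a.txt happens to be a single character. A common, more robust alternative is to tag each value with its source file in the mapper and let the reducer split values by tag; in Hadoop the mapper can obtain the current file name with ((FileSplit) context.getInputSplit()).getPath().getName(). The sketch below shows the idea in plain Java, not Hadoop; the class name TaggedJoinDemo and the "A:"/"B:" tags are hypothetical choices:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch of a reduce-side join that tags each value with its source file instead of
// guessing by length. Plain Java, no Hadoop; the "A:"/"B:" tags are a hypothetical convention.
class TaggedJoinDemo {

    // Map: emit (id, tag + text) so the reducer knows which file each value came from.
    static void map(String tag, List<String> lines, Map<String, List<String>> shuffled) {
        for (String line : lines) {
            String[] parts = line.split(" ", 2);
            if (parts.length < 2) continue;   // skip blank or malformed lines
            shuffled.computeIfAbsent(parts[0], k -> new ArrayList<>()).add(tag + parts[1]);
        }
    }

    // Reduce: split values by tag and emit the cross product (an inner join).
    static List<String> reduce(List<String> taggedValues) {
        List<String> left = new ArrayList<>();
        List<String> right = new ArrayList<>();
        for (String v : taggedValues) {
            if (v.startsWith("A:")) left.add(v.substring(2));
            else if (v.startsWith("B:")) right.add(v.substring(2));
        }
        List<String> out = new ArrayList<>();
        for (String l : left) {
            for (String r : right) {
                out.add(l + "\t" + r);   // tab, like TextOutputFormat's default separator
            }
        }
        return out;
    }

    // Runs map for both files, groups by id in sorted order, then reduces each group.
    static List<String> join(List<String> a, List<String> b) {
        Map<String, List<String>> shuffled = new TreeMap<>();
        map("A:", a, shuffled);
        map("B:", b, shuffled);
        List<String> result = new ArrayList<>();
        for (List<String> values : shuffled.values()) {
            result.addAll(reduce(values));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> a = List.of("1 a", "2 b", "3 c", "4 d");
        List<String> b = List.of("1 good", "2 bad", "3 ok", "4 hello");
        join(a, b).forEach(System.out::println);
    }
}
```

    Because the reducer emits the cross product of the two tagged lists, an ID that appears in only one file produces no output, an ID that appears several times in either file produces every matching pair, and one-character values in b.txt no longer confuse it.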
    

    Reference: 《MapReduce2.0源代码分析及编程实践》 (MapReduce 2.0 Source Code Analysis and Programming Practice)

    Original post: https://www.cnblogs.com/lcchuguo/p/4006430.html