  • MapReduce Data Join

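    Data stored in different files is sometimes related and has to be joined into a new file before it can be analyzed. For example, take two input files, a.txt and b.txt, whose records look like this (a.txt first, then b.txt):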

    1 a
    2 b
    3 c
    4 d

    1 good
    2 bad
    3 ok
    4 hello

    They need to be joined on the leading id into a new file like this:

    a good
    b bad
    c ok
    d hello

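    The processing can be split into two steps:

    1. Map phase: break the records of both input files apart into (id, value) pairs. Once the framework has grouped them by id, they look like this: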

    1 a
    1 good
    2 b
    2 bad
    3 c
    3 ok
    4 d
    4 hello

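    2. Reduce phase: perform the join. The data here is simple, so checking whether a value from the map output has length 1 is enough to decide whether it becomes the new key or the new value.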
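
    The two steps above can be tried out without a cluster. The following is a minimal plain-Java sketch, not the Hadoop job itself: the class JoinStepsSketch and its methods mapAndGroup and reduce are hypothetical names used only for illustration, and a TreeMap stands in for the shuffle. Skipping ids that appear in only one file is an assumption made here (inner-join behaviour); the text above does not say what should happen in that case.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class JoinStepsSketch {
    // Step 1 (map + shuffle): split each "id value" line and group the values by id.
    public static Map<String, List<String>> mapAndGroup(List<String> lines) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String line : lines) {
            String[] parts = line.trim().split("\\s+", 2);
            if (parts.length < 2) {
                continue; // skip blank or malformed lines
            }
            grouped.computeIfAbsent(parts[0], k -> new ArrayList<>()).add(parts[1]);
        }
        return grouped;
    }

    // Step 2 (reduce): a one-character value becomes the output key,
    // any longer value becomes the output value.
    public static List<String> reduce(Map<String, List<String>> grouped) {
        List<String> out = new ArrayList<>();
        for (List<String> values : grouped.values()) {
            String newKey = null;
            String newValue = null;
            for (String s : values) {
                if (s.length() == 1) {
                    newKey = s;
                } else {
                    newValue = s;
                }
            }
            // Assumption: ids present in only one file are dropped (inner join).
            if (newKey != null && newValue != null) {
                out.add(newKey + " " + newValue);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "1 a", "2 b", "3 c", "4 d",           // a.txt
                "1 good", "2 bad", "3 ok", "4 hello"); // b.txt
        reduce(mapAndGroup(lines)).forEach(System.out::println);
    }
}
```

    Running main prints the four joined lines from the example (a good, b bad, c ok, d hello). In a real job the order in which the two values for an id reach the reducer is not guaranteed, which is why the sketch classifies each value instead of relying on position.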

    package cn.zhf.hadoop;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class SingleJoin extends Configured implements Tool{
    	public static void main(String[] args) throws Exception {
    		Tool tool = new SingleJoin();
    		int exitCode = ToolRunner.run(tool, args);
    		// Only dump the result if the job succeeded.
    		if(exitCode == 0)
    			print(tool);
    		System.exit(exitCode);
    	}
    
    	@Override
    	public int run(String[] arg0) throws Exception {
    		Configuration conf = getConf();
    		// Build the job from conf so options parsed by ToolRunner take effect.
    		Job job = Job.getInstance(conf);
    		job.setJarByClass(getClass());
    		FileSystem fs = FileSystem.get(conf);
    		// Remove the output of a previous run; the job fails if "out" already exists.
    		fs.delete(new Path("out"),true);
    		FileInputFormat.addInputPath(job, new Path("a.txt"));
    		FileInputFormat.addInputPath(job, new Path("b.txt"));
    		FileOutputFormat.setOutputPath(job,new Path("out"));
    		
    		job.setMapperClass(JoinMapper.class);
    		job.setReducerClass(JoinReducer.class);
    		
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(Text.class);
    		// Report failure to the caller instead of always returning 0.
    		return job.waitForCompletion(true) ? 0 : 1;
    	}
    	public static class JoinMapper extends Mapper<LongWritable,Text,Text,Text>{
    		@Override
    		public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
    			// Each line is "id value"; emit the id as key so both files meet in one reducer call.
    			String[] str = value.toString().trim().split("\\s+", 2);
    			if(str.length < 2)
    				return; // skip blank or malformed lines
    			context.write(new Text(str[0]), new Text(str[1]));
    		}
    	}
    
    	public static class JoinReducer extends Reducer<Text,Text,Text,Text>{
    		@Override
    		public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
    			Text keyy = new Text();
    			Text valuee = new Text();
    			boolean hasKey = false;
    			boolean hasValue = false;
    			// Works for this sample data only: a one-character value comes from a.txt
    			// and becomes the output key; anything longer comes from b.txt.
    			for(Text temp : values){
    				if(temp.toString().length() == 1){
    					keyy.set(temp);
    					hasKey = true;
    				}else{
    					valuee.set(temp);
    					hasValue = true;
    				}
    			}
    			// Ids that appear in only one file are skipped instead of throwing NoSuchElementException.
    			if(hasKey && hasValue)
    				context.write(keyy, valuee);
    		}
    	}
    	public static void print(Tool tool) throws IOException{
    		FileSystem fs = FileSystem.get(tool.getConf());
    		Path path = new Path("out/part-r-00000");
    		// Copy the reducer output to stdout; try-with-resources closes the stream.
    		try(FSDataInputStream fsin = fs.open(path)){
    			IOUtils.copyBytes(fsin, System.out, 4096, false);
    		}
    	}
    }
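
    The length check in JoinReducer only works because every value in a.txt happens to be a single character. A more general variant, which is not what the code above does, is to have the mapper prefix each value with a tag naming its source file and let the reducer split on that tag. The sketch below shows only the reduce-side logic in plain Java; the class TaggedJoinSketch and the tags "A:" and "B:" are hypothetical. In a real mapper the file name could presumably be read with ((FileSplit) context.getInputSplit()).getPath().getName(), but that wiring is left out here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TaggedJoinSketch {
    // Hypothetical tags a mapper would prepend, depending on the input file.
    static final String TAG_A = "A:"; // value came from a.txt
    static final String TAG_B = "B:"; // value came from b.txt

    // Reduce-side join for one id: pair every a.txt value with every b.txt value.
    public static List<String> reduce(List<String> taggedValues) {
        List<String> left = new ArrayList<>();
        List<String> right = new ArrayList<>();
        for (String v : taggedValues) {
            if (v.startsWith(TAG_A)) {
                left.add(v.substring(TAG_A.length()));
            } else if (v.startsWith(TAG_B)) {
                right.add(v.substring(TAG_B.length()));
            }
        }
        List<String> joined = new ArrayList<>();
        for (String l : left) {
            for (String r : right) {
                joined.add(l + " " + r);
            }
        }
        return joined; // empty when the id is missing from either file
    }

    public static void main(String[] args) {
        // The values for id 1 may arrive in either order.
        System.out.println(reduce(Arrays.asList("B:good", "A:a")));
    }
}
```

    With tags, a value such as "hello" in a.txt or "x" in b.txt is still routed to the correct side, and an id that occurs several times in either file yields every combination.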
    

    Reference: 《MapReduce2.0源代码分析及编程实践》 (MapReduce 2.0 Source Code Analysis and Programming Practice)

  • Original article: https://www.cnblogs.com/lcchuguo/p/4006430.html