zoukankan      html  css  js  c++  java
  • MapReduce数据处理两表join连接 (Reduce端连接)

    http://blog.csdn.net/qq272936993/article/details/7457553


    现在这里有两个text文档,需要把它们合并成一个文档,并且里面的数据不能有冗余。


    user.txt文件: 

    UserId       UserName   DepNo
    10000000     Li         1001
    10000001     Wang       1001
    10000002     Zhang      1002
    10000003     Wei        1004
    10000004     He         1003
    10000005     Jin        1002



    depart.txt文件: 

    DepNo        DepName
    1001         Develop
    1002         Test
    1003         HR
    1004         Market 



    生成文件: 

    10000000     Li         1001       Develop
    10000001     Wang       1001       Develop
    10000002     Zhang      1002       Test
    10000003     Wei        1004       Market
    10000004     He         1003       HR
    10000005     Jin        1002       Test



    因为user.txt文档的第3个字段与depart.txt的第1个字段是相同的, 所以我把它们作为key值. 

     

    public class Advanced extends Configured implements Tool {
    
    	public static class AdMap extends Mapper<LongWritable, Text, Text, TextPair>{
    
    		@Override
    		protected void map(LongWritable key, Text value, Context context)
    				throws IOException, InterruptedException {
    		 	String filePath = ((FileSplit)context.getInputSplit()).getPath().toString();	
    		 	String line = value.toString();
    		 	String[] childline = line.split(" ");    //以空格截取			       
                            if(filePath.contains("user.txt") ){  //判断是哪一张表
    				TextPair pair = new TextPair();
    				pair.setFlag("0");         //这是个标识   0.表示 user.txt     1表示depart.txt
    				pair.setKey(childline[2]);
    				pair.setValue(childline[0]+" "+childline[1]);
    				pair.setContent(pair.toString());	
                               context.write(new Text(pair.getKey()), pair);
    			}else if (filePath.contains("depart.txt")){
    			 	TextPair pair = new  TextPair();
    				pair.setFlag("1");
    				pair.setKey(childline[0]);
    				pair.setValue(childline[0]+" " +childline[1]);
    				pair.setContent(pair.toString()); 
    				context.write(new Text(pair.getKey()), pair);
    		       }		
    			
    		}		
    	}
    	
    	public static class AdReduce extends Reducer<Text, TextPair, Text, Text>{
    
    		@Override
    		public void reduce(Text key, Iterable<TextPair> values,
    				Context context)
    				throws IOException, InterruptedException {
    			 
                            List<Text> listUser = new ArrayList<Text>();     
    			List<Text> listDepart = new ArrayList<Text>();
    			Iterator<TextPair> it = values.iterator();
    			TextPair pair = new TextPair();
    			while(it.hasNext()){
    				pair = it.next();
    				if("0".equals(pair.getFlag())){
    				    listUser.add(new Text(pair.getValue()));
                                }
    				else {
    				    listDepart.add(new Text(pair.getValue()));
                                }
    			}
    			
    			for(int i = 0 ; i<listUser.size(); i++){
    			    for(int j = 0 ;j<listDepart.size();j++){ i 
    				context.write(key, new Text(listUser.get(j)+" " +listDepart.get(i)));
    			    }
    			}
    			
    			
    		}
    		
    	}
    	 
    	public static void main(String[] args) {
    		try {
                       int res = ToolRunner.run(new Configuration(), new Advanced(), args);
    			System.exit(res);
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    
    	@Override
    	public int run(String[] args) throws Exception {
    		Configuration conf = new Configuration();
    		
    		FileSystem fs = FileSystem.get(conf);
    		if(fs.exists(new Path(args[2]))){
    			System.out.println("error : file is exists");
    			System.exit(-1);
    		}
    		
    		Job job = new Job(conf , "Advanced");
    		job.setJarByClass(Advanced.class);
    		job.setMapperClass(AdMap.class);
    		job.setReducerClass(AdReduce.class);
    		
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(TextPair.class);
    		
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(Text.class);
    		
    		FileInputFormat.setInputPaths(job, new Path(args[0]),new Path(args[1]));
    		FileOutputFormat.setOutputPath(job, new Path(args[2]));
    		return job.waitForCompletion(true) ? 0 : 1;
    	} 
    }
     		


    /**
     * Tagged record passed from the mapper to the reducer.
     *
     * <p>Carries four strings: a source flag, the join key, the value payload and
     * a content string. Instances are ordered by key, then flag, then value, then
     * content; {@link #equals(Object)} and {@link #hashCode()} are consistent with
     * that ordering.
     */
    class TextPair implements WritableComparable<TextPair> {

        private String flag = "";
        private String key = "";
        private String value = "";
        private String content = "";

        /** Creates an empty pair with every field set to the empty string. */
        public TextPair() {
        }

        /**
         * Creates a fully populated pair.
         *
         * @param flag    source tag of the record
         * @param key     join key
         * @param value   value payload
         * @param content content string
         */
        public TextPair(String flag, String key, String value, String content) {
            this.flag = flag;
            this.key = key;
            this.value = value;
            this.content = content;
        }

        public String getValue() {
            return value;
        }

        public void setValue(String value) {
            this.value = value;
        }

        public String getFlag() {
            return flag;
        }

        public void setFlag(String flag) {
            this.flag = flag;
        }

        public String getKey() {
            return key;
        }

        public void setKey(String key) {
            this.key = key;
        }

        public String getContent() {
            return content;
        }

        public void setContent(String content) {
            this.content = content;
        }

        /** Returns a space, the key, a space and the value. */
        @Override
        public String toString() {
            return " " + key + " " + value;
        }

        /**
         * Serializes the four fields in the order flag, key, value, content.
         * A {@code null} field is written as the empty string, because
         * {@code writeUTF} does not accept {@code null}.
         *
         * @param out the stream to write to
         * @throws IOException if writing fails
         */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(nullToEmpty(this.flag));
            out.writeUTF(nullToEmpty(this.key));
            out.writeUTF(nullToEmpty(this.value));
            out.writeUTF(nullToEmpty(this.content));
        }

        /**
         * Reads the four fields in the order they were written by
         * {@link #write(DataOutput)}.
         *
         * @param in the stream to read from
         * @throws IOException if reading fails
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            this.flag = in.readUTF();
            this.key = in.readUTF();
            this.value = in.readUTF();
            this.content = in.readUTF();
        }

        /**
         * Orders pairs by key, then flag, then value, then content. A {@code null}
         * field is treated as the empty string, matching how it is serialized.
         *
         * @param o the pair to compare with
         * @return a negative, zero or positive number as this pair sorts before,
         *         equal to or after {@code o}
         */
        @Override
        public int compareTo(TextPair o) {
            int cmp = nullToEmpty(key).compareTo(nullToEmpty(o.key));
            if (cmp != 0) {
                return cmp;
            }
            cmp = nullToEmpty(flag).compareTo(nullToEmpty(o.flag));
            if (cmp != 0) {
                return cmp;
            }
            cmp = nullToEmpty(value).compareTo(nullToEmpty(o.value));
            if (cmp != 0) {
                return cmp;
            }
            return nullToEmpty(content).compareTo(nullToEmpty(o.content));
        }

        /** Two pairs are equal when all four fields are equal. */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof TextPair)) {
                return false;
            }
            return compareTo((TextPair) obj) == 0;
        }

        /** Hash over the same four fields that {@link #equals(Object)} compares. */
        @Override
        public int hashCode() {
            int result = nullToEmpty(key).hashCode();
            result = 31 * result + nullToEmpty(flag).hashCode();
            result = 31 * result + nullToEmpty(value).hashCode();
            result = 31 * result + nullToEmpty(content).hashCode();
            return result;
        }

        /** Maps {@code null} to the empty string and returns other strings unchanged. */
        private static String nullToEmpty(String s) {
            return s == null ? "" : s;
        }
    }


  • 相关阅读:
    114. Flatten Binary Tree to Linked List 把二叉树变成链表
    426. Convert Binary Search Tree to Sorted Doubly Linked List把bst变成双向链表
    微服务之如何建模微服务
    我的博客即将同步至腾讯云+社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=3t37r4hauhq8c
    剑指offer之面试题2:实现Singleton模式
    微服务之演化式架构师(二)
    ASP.NET Core 框架本质学习
    java之maven之maven的使用
    java之maven之初识maven
    java之mybatis整合spring
  • 原文地址:https://www.cnblogs.com/leeeee/p/7276278.html
Copyright © 2011-2022 走看看