zoukankan      html  css  js  c++  java
  • MapReduce数据处理两表join连接 (Reduce端连接)

    http://blog.csdn.net/qq272936993/article/details/7457553


    现在这里有两个text文档, 需要把它们合并成一个文档, 并且里面的数据不能有冗余.


    user.txt文件: 

    UserId       UserName   DepNo
    10000000     Li         1001
    10000001     Wang       1001
    10000002     Zhang      1002
    10000003     Wei        1004
    10000004     He         1003
    10000005     Jin        1002



    depart.txt文件: 

    DepNo        DepName
    1001         Develop
    1002         Test
    1003         HR
    1004         Market 



    生成文件: 

    10000000     Li         1001       Develop
    10000001     Wang       1001       Develop
    10000002     Zhang      1002       Test
    10000003     Wei        1004       Market
    10000004     He         1003       HR
    10000005     Jin        1002       Test



    因为user.txt文档的第3个字段(DepNo)与depart.txt的第1个字段(DepNo)是相同的, 所以我把它们作为key值. 

     

    public class Advanced extends Configured implements Tool {
    
    	public static class AdMap extends Mapper<LongWritable, Text, Text, TextPair>{
    
    		@Override
    		protected void map(LongWritable key, Text value, Context context)
    				throws IOException, InterruptedException {
    		 	String filePath = ((FileSplit)context.getInputSplit()).getPath().toString();	
    		 	String line = value.toString();
    		 	String[] childline = line.split(" ");    //以空格截取			       
                            if(filePath.contains("user.txt") ){  //判断是哪一张表
    				TextPair pair = new TextPair();
    				pair.setFlag("0");         //这是个标识   0.表示 user.txt     1表示depart.txt
    				pair.setKey(childline[2]);
    				pair.setValue(childline[0]+" "+childline[1]);
    				pair.setContent(pair.toString());	
                               context.write(new Text(pair.getKey()), pair);
    			}else if (filePath.contains("depart.txt")){
    			 	TextPair pair = new  TextPair();
    				pair.setFlag("1");
    				pair.setKey(childline[0]);
    				pair.setValue(childline[0]+" " +childline[1]);
    				pair.setContent(pair.toString()); 
    				context.write(new Text(pair.getKey()), pair);
    		       }		
    			
    		}		
    	}
    	
    	public static class AdReduce extends Reducer<Text, TextPair, Text, Text>{
    
    		@Override
    		public void reduce(Text key, Iterable<TextPair> values,
    				Context context)
    				throws IOException, InterruptedException {
    			 
                            List<Text> listUser = new ArrayList<Text>();     
    			List<Text> listDepart = new ArrayList<Text>();
    			Iterator<TextPair> it = values.iterator();
    			TextPair pair = new TextPair();
    			while(it.hasNext()){
    				pair = it.next();
    				if("0".equals(pair.getFlag())){
    				    listUser.add(new Text(pair.getValue()));
                                }
    				else {
    				    listDepart.add(new Text(pair.getValue()));
                                }
    			}
    			
    			for(int i = 0 ; i<listUser.size(); i++){
    			    for(int j = 0 ;j<listDepart.size();j++){ i 
    				context.write(key, new Text(listUser.get(j)+" " +listDepart.get(i)));
    			    }
    			}
    			
    			
    		}
    		
    	}
    	 
    	public static void main(String[] args) {
    		try {
                       int res = ToolRunner.run(new Configuration(), new Advanced(), args);
    			System.exit(res);
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    
    	@Override
    	public int run(String[] args) throws Exception {
    		Configuration conf = new Configuration();
    		
    		FileSystem fs = FileSystem.get(conf);
    		if(fs.exists(new Path(args[2]))){
    			System.out.println("error : file is exists");
    			System.exit(-1);
    		}
    		
    		Job job = new Job(conf , "Advanced");
    		job.setJarByClass(Advanced.class);
    		job.setMapperClass(AdMap.class);
    		job.setReducerClass(AdReduce.class);
    		
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(TextPair.class);
    		
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(Text.class);
    		
    		FileInputFormat.setInputPaths(job, new Path(args[0]),new Path(args[1]));
    		FileOutputFormat.setOutputPath(job, new Path(args[2]));
    		return job.waitForCompletion(true) ? 0 : 1;
    	} 
    }
     		


    /**
     * Tagged record exchanged between the join mapper and reducer.
     *
     * <p>{@code flag} identifies the source table, {@code key} is the join key,
     * {@code value} is the payload that ends up in the joined line, and
     * {@code content} holds the textual form of the record. All four fields are
     * serialized, in that order, as modified-UTF-8 strings.
     */
    class TextPair implements WritableComparable<TextPair> {

        private String flag = "";
        private String key = "";
        private String value = "";
        private String content = "";

        /** Creates an empty pair; required so the framework can instantiate it before {@link #readFields}. */
        public TextPair() {
        }

        /**
         * Creates a fully populated pair.
         *
         * @param flag    source-table marker
         * @param key     join key
         * @param value   payload to be joined
         * @param content textual form of the record
         */
        public TextPair(String flag, String key, String value, String content) {
            this.flag = flag;
            this.key = key;
            this.value = value;
            this.content = content;
        }

        public String getFlag() {
            return flag;
        }

        public void setFlag(String flag) {
            this.flag = flag;
        }

        public String getKey() {
            return key;
        }

        public void setKey(String key) {
            this.key = key;
        }

        public String getValue() {
            return value;
        }

        public void setValue(String value) {
            this.value = value;
        }

        public String getContent() {
            return content;
        }

        public void setContent(String content) {
            this.content = content;
        }

        /** Returns {@code " " + key + " " + value}; the leading space is part of the format. */
        @Override
        public String toString() {
            return " " + key + " " + value;
        }

        /** Serializes flag, key, value and content, in that order. */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(this.flag);
            out.writeUTF(this.key);
            out.writeUTF(this.value);
            out.writeUTF(this.content);
        }

        /** Restores the fields in the same order {@link #write} emitted them. */
        @Override
        public void readFields(DataInput in) throws IOException {
            this.flag = in.readUTF();
            this.key = in.readUTF();
            this.value = in.readUTF();
            this.content = in.readUTF();
        }

        /**
         * Orders pairs by flag, then key, then value, then content.
         *
         * <p>The previous stub returned 0 for every argument, which made all
         * instances compare as equal.
         *
         * @param o the pair to compare against
         * @return a negative, zero or positive number per the {@code Comparable} contract
         */
        @Override
        public int compareTo(TextPair o) {
            int c = compareStrings(this.flag, o.flag);
            if (c != 0) {
                return c;
            }
            c = compareStrings(this.key, o.key);
            if (c != 0) {
                return c;
            }
            c = compareStrings(this.value, o.value);
            if (c != 0) {
                return c;
            }
            return compareStrings(this.content, o.content);
        }

        /** Two pairs are equal when all four fields are equal, consistent with {@link #compareTo}. */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof TextPair)) {
                return false;
            }
            return compareTo((TextPair) obj) == 0;
        }

        @Override
        public int hashCode() {
            int h = 17;
            h = 31 * h + (flag == null ? 0 : flag.hashCode());
            h = 31 * h + (key == null ? 0 : key.hashCode());
            h = 31 * h + (value == null ? 0 : value.hashCode());
            h = 31 * h + (content == null ? 0 : content.hashCode());
            return h;
        }

        /** Null-tolerant string comparison: {@code null} sorts before any non-null string. */
        private static int compareStrings(String a, String b) {
            if (a == null) {
                return b == null ? 0 : -1;
            }
            if (b == null) {
                return 1;
            }
            return a.compareTo(b);
        }
    }


  • 相关阅读:
    SharedPreferences数据、openFileOutput文件、SQLite数据库文件存储位置
    Android Activity 生命周期的透彻理解
    Android中半透明Activity效果另法
    android开发3:四大基本组件的介绍与生命周期
    Activity生命周期的学习以及Logcat的使用
    Android 之 Window、WindowManager 与窗口管理
    Android相关类关系
    android应用开发之Window,View和WindowManager .
    Android窗口管理服务WindowManagerService计算Activity窗口大小的过程分析
    Android应用程序窗口(Activity)的窗口对象(Window) 的创建过程分析
  • 原文地址:https://www.cnblogs.com/leeeee/p/7276278.html
Copyright © 2011-2022 走看看