- 问题描述
需要连接的表如下:其中左边是child,右边是parent,我们要做的是找出grandchild和grandparent的对应关系,为此需要进行表的连接。
Tom Lucy Tom Jim Lucy David Lucy Lili Jim Lilei Jim SuSan Lily Green Lily Bians Green Well Green MillShell Havid James James LiT Richard Cheng Cheng LiHua
- 思路分析
诚然,在写MR程序的时候要结合MR数据处理的一些特性。例如如果我们用默认的TextInputFormat来处理传入的文件数据,传入的格式是key为行号,value为这一行的值(如上例中的第一行,key为0,value为[Tom,Lucy]),在shuffle过程中,我们的值如果有相同的key,会merge到一起(这一点很重要!)。我们利用shuffle阶段的特性,merge到一组的数据够成一组关系,然后我们在这组关系中想办法区分晚辈和长辈,最后对merge里的value一一作处理,分离出grandchild和grandparent的关系。
例如,Tom Lucy传入处理后我们将其反转,成为Lucy Tom输出。当然,输出的时候,为了达到join的效果,我们要输出两份,因为join要两个表,一个表为L1:child parent,一个表为L2:child parent,为了达到关联的目的和利用shuffle阶段的特性,我们需要将L1反转,把parent放在前面,这样L1表中的parent和L2表中的child如果字段是相同的那么在shuffle阶段就能merge到一起。还有,为了区分merge到一起后如何区分child和parent,我们把L1表中反转后的child(未来的 grandchild)字段后面加一个1,L2表中parent(未来的grandparent)字段后加2。
1 package com.test.join; 2 3 import java.io.IOException; 4 import java.util.ArrayList; 5 import java.util.Iterator; 6 7 import org.apache.hadoop.conf.Configuration; 8 import org.apache.hadoop.fs.Path; 9 import org.apache.hadoop.io.Text; 10 import org.apache.hadoop.mapreduce.Job; 11 import org.apache.hadoop.mapreduce.Mapper; 12 import org.apache.hadoop.mapreduce.Reducer; 13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 15 16 public class STJoin { 17 18 public static class STJoinMapper extends Mapper<Object, Text, Text, Text>{ 19 20 @Override 21 protected void map(Object key, Text value, Context context) 22 throws IOException, InterruptedException { 23 // TODO Auto-generated method stub 24 String[] rela = value.toString().trim().split(" ",2); 25 if(rela.length!=2) 26 return; 27 String child = rela[0]; 28 String parent = rela[1]; 29 context.write(new Text(parent), new Text((child+"1"))); 30 context.write(new Text(child), new Text((parent+"2"))); 31 32 } 33 34 } 35 public static class STJoinReducer extends Reducer<Text, Text, Text, Text>{ 36 37 @Override 38 protected void reduce(Text arg0, Iterable<Text> arg1,Context context) 39 throws IOException, InterruptedException { 40 // TODO Auto-generated method stub 41 ArrayList<String> grandParent = new ArrayList<>(); 42 ArrayList<String> grandChild = new ArrayList<>(); 43 Iterator<Text> iterator = arg1.iterator(); 44 while(iterator.hasNext()){ 45 String text = iterator.next().toString(); 46 if(text.endsWith("1")) 47 grandChild.add(text.substring(0, text.length()-1)); 48 if(text.endsWith("2")) 49 grandParent.add(text.substring(0, text.length()-1)); 50 } 51 52 for(String grandparent:grandParent){ 53 for(String grandchild:grandChild){ 54 context.write(new Text(grandchild), new Text(grandparent)); 55 } 56 } 57 } 58 } 59 60 61 public static void main(String args[]) throws IOException, ClassNotFoundException, InterruptedException { 62 Configuration conf = new Configuration(); 63 Job job = new Job(conf,"STJoin"); 64 job.setMapperClass(STJoinMapper.class); 65 job.setReducerClass(STJoinReducer.class); 66 job.setOutputKeyClass(Text.class); 67 job.setOutputValueClass(Text.class); 68 FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/user/hadoop/STJoin/joinFile")); 69 FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/user/hadoop/STJoin/joinResult")); 70 71 System.exit(job.waitForCompletion(true)?0:1); 72 } 73 }
- 结果显示
Richard LiHua
Lily Well
Lily MillShell
Havid LiT
Tom Lilei
Tom SuSan
Tom Lili
Tom David
以上代码在hadoop1.0.3平台实现