zoukankan      html  css  js  c++  java
  • 【原创】MapReduce编程系列之表连接

    • 问题描述

            需要连接的表如下:其中左边是child,右边是parent,我们要做的是找出grandchild和grandparent的对应关系,为此需要进行表的连接。


    Tom Lucy
    Tom Jim
    Lucy David
    Lucy Lili
    Jim Lilei
    Jim SuSan
    Lily Green
    Lily Bians
    Green Well
    Green MillShell
    Havid James
    James LiT
    Richard Cheng
    Cheng LiHua
     
    • 思路分析
      诚然,在写MR程序的时候要结合MR数据处理的一些特性。例如如果我们用默认的TextInputFormat来处理传入的文件数据,传入的格式是key为行号,value为这一行的值(如上例中的第一行,key为0,value为[Tom,Lucy]),在shuffle过程中,我们的值如果有相同的key,会merge到一起(这一点很重要!)。我们利用shuffle阶段的特性,merge到一组的数据够成一组关系,然后我们在这组关系中想办法区分晚辈和长辈,最后对merge里的value一一作处理,分离出grandchild和grandparent的关系。
           例如,Tom Lucy传入处理后我们将其反转,成为Lucy Tom输出。当然,输出的时候,为了达到join的效果,我们要输出两份,因为join要两个表,一个表为L1:child parent,一个表为L2:child parent,为了达到关联的目的和利用shuffle阶段的特性,我们需要将L1反转,把parent放在前面,这样L1表中的parent和L2表中的child如果字段是相同的那么在shuffle阶段就能merge到一起。还有,为了区分merge到一起后如何区分child和parent,我们把L1表中反转后的child(未来的 grandchild)字段后面加一个1,L2表中parent(未来的grandparent)字段后加2。
     1 package com.test.join;
     2 
     3 import java.io.IOException;
     4 import java.util.ArrayList;
     5 import java.util.Iterator;
     6 
     7 import org.apache.hadoop.conf.Configuration;
     8 import org.apache.hadoop.fs.Path;
     9 import org.apache.hadoop.io.Text;
    10 import org.apache.hadoop.mapreduce.Job;
    11 import org.apache.hadoop.mapreduce.Mapper;
    12 import org.apache.hadoop.mapreduce.Reducer;
    13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    15 
    16 public class STJoin {
    17 
    18     public static class STJoinMapper extends Mapper<Object, Text, Text, Text>{
    19 
    20         @Override
    21         protected void map(Object key, Text value, Context context)
    22                 throws IOException, InterruptedException {
    23             // TODO Auto-generated method stub
    24             String[] rela = value.toString().trim().split(" ",2);
    25             if(rela.length!=2)
    26                 return;
    27             String child = rela[0];
    28             String parent = rela[1];
    29             context.write(new Text(parent), new Text((child+"1")));
    30             context.write(new Text(child), new Text((parent+"2")));
    31             
    32         }
    33         
    34     }
    35     public static class STJoinReducer extends Reducer<Text, Text, Text, Text>{
    36 
    37         @Override
    38         protected void reduce(Text arg0, Iterable<Text> arg1,Context context)
    39                 throws IOException, InterruptedException {
    40             // TODO Auto-generated method stub
    41             ArrayList<String> grandParent = new ArrayList<>();
    42             ArrayList<String> grandChild = new ArrayList<>();
    43             Iterator<Text> iterator = arg1.iterator();
    44             while(iterator.hasNext()){
    45                 String text = iterator.next().toString();
    46                 if(text.endsWith("1"))
    47                     grandChild.add(text.substring(0, text.length()-1));
    48                 if(text.endsWith("2"))
    49                     grandParent.add(text.substring(0, text.length()-1));
    50             }
    51             
    52             for(String grandparent:grandParent){
    53                 for(String grandchild:grandChild){
    54                     context.write(new Text(grandchild), new Text(grandparent));
    55                 }
    56             }
    57         }
    58     }
    59     
    60     
    61     public static void main(String args[]) throws IOException, ClassNotFoundException, InterruptedException {
    62         Configuration conf = new Configuration();
    63         Job job = new Job(conf,"STJoin");
    64         job.setMapperClass(STJoinMapper.class);
    65         job.setReducerClass(STJoinReducer.class);
    66         job.setOutputKeyClass(Text.class);
    67         job.setOutputValueClass(Text.class);
    68         FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/user/hadoop/STJoin/joinFile"));
    69         FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/user/hadoop/STJoin/joinResult"));
    70         
    71         System.exit(job.waitForCompletion(true)?0:1);
    72     }
    73 }
    • 结果显示

     

    Richard    LiHua
    Lily    Well
    Lily    MillShell
    Havid    LiT
    Tom    Lilei
    Tom    SuSan
    Tom    Lili
    Tom    David

     以上代码在hadoop1.0.3平台实现

    转载请注明:http://www.cnblogs.com/gslyyq/
  • 相关阅读:
    System.Data.SQLite数据库介绍
    php 链接mssql问题 ntext不能读取
    使用NeatUpload上传文件
    js 和后台交互
    oracle学习第五天【RMAN】
    oracle学习第三天【sqlplus常用命令】
    js操作url(window.location)
    jquery资料收集【转】
    php学习3字符串
    linux read命令 小记
  • 原文地址:https://www.cnblogs.com/gslyyq/p/mapreduce.html
Copyright © 2011-2022 走看看