zoukankan      html  css  js  c++  java
  • MapReduce实现ReduceSideJoin操作

    本文转载于:http://blog.csdn.net/xyilu/article/details/8996204

    一.准备两张表以及对应的数据

    (1)m_ys_lab_jointest_a(以下简称表A)

    建表语句:

    create table if not exists m_ys_lab_jointest_a (  
         id bigint,  
         name string  
    )  
    row format delimited  
    fields terminated by '9'  
    lines terminated by '10'  
    stored as textfile; 

    具体数据如下:

    id    name
    1     北京
    2     天津
    3     河北
    4     山西
    5     内蒙古
    6     辽宁
    7     吉林
    8     黑龙江
     
     
     
     
     
     
     
     
     
     
    (2)m_ys_lab_jointest_b(以下简称表B)
    建表语句为:
    create table if not exists m_ys_lab_jointest_b (  
         id bigint,  
         statyear bigint,  
         num bigint  
    )  
    row format delimited  
    fields terminated by '9'  
    lines terminated by '10'  
    stored as textfile;  

     具体数据如下:

    id   statyear  num
    1     2010     1962
    1     2011     2019
    2     2010     1299
    2     2011     1355
    4     2010     3574
    4     2011     3593
    9     2010     2303
    9     2011     2347
    我们的目的是,以id为key做join操作,得到以下表:
    m_ys_lab_jointest_ab
    id     name    statyear     num
    1       北京    2011    2019
    1       北京    2010    1962
    2       天津    2011    1355
    2       天津    2010    1299
    4       山西    2011    3593
    4       山西    2010    3574

    二.计算模型

    整个计算过程是:

    (1)在map阶段,把所有记录标记成<key, value>的形式,其中key是id,value则根据来源不同取不同的形式:来源于表A的记录,value的值为"a#"+name;来源于表B的记录,value的值为"b#"+score。
    (2)在reduce阶段,先把每个key下的value列表拆分为分别来自表A和表B的两部分,分别放入两个向量中。然后遍历两个向量做笛卡尔积,形成一条条最终结果。
    如下图所示:

    上代码:

     1 import java.io.IOException;
     2 import java.util.ArrayList;
     3 import java.util.Iterator;
     4 import java.util.List;
     5 
     6 import org.apache.hadoop.io.LongWritable;
     7 import org.apache.hadoop.io.Text;
     8 import org.apache.hadoop.mapred.FileSplit;
     9 import org.apache.hadoop.mapred.JobConf;
    10 import org.apache.hadoop.mapred.MapReduceBase;
    11 import org.apache.hadoop.mapred.Mapper;
    12 import org.apache.hadoop.mapred.OutputCollector;
    13 import org.apache.hadoop.mapred.Reducer;
    14 import org.apache.hadoop.mapred.Reporter;  
    15   
    16 /** 
    17  * MapReduce实现Join操作 
    18  */  
    19 public class MapRedJoin {  
    20     public static final String DELIMITER = "u0009"; // 字段分隔符  
    21       
    22     // map过程  
    23     public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {  
    24         public void configure(JobConf job) {  
    25             super.configure(job);  
    26         }  
    27           
    28         public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException, ClassCastException {  
    29             // 获取输入文件的全路径和名称  
    30             String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString();  
    31             // 获取记录字符串  
    32             String line = value.toString();  
    33             // 抛弃空记录  
    34             if (line == null || line.equals("")){
    35                 return;
    36             }   
    37             // 处理来自表A的记录  
    38             if (filePath.contains("m_ys_lab_jointest_a")) {  
    39                 String[] values = line.split(DELIMITER); // 按分隔符分割出字段  
    40                 if (values.length < 2){
    41                     return;
    42                 }   
    43                 String id = values[0]; // id  
    44                 String name = values[1]; // name  
    45                 output.collect(new Text(id), new Text("a#"+name));  
    46             } else if (filePath.contains("m_ys_lab_jointest_b")) {// 处理来自表B的记录
    47                 String[] values = line.split(DELIMITER); // 按分隔符分割出字段  
    48                 if (values.length < 3){
    49                     return;
    50                 }  
    51                 String id = values[0]; // id  
    52                 String statyear = values[1]; // statyear  
    53                 String num = values[2]; //num  
    54                 output.collect(new Text(id), new Text("b#"+statyear+DELIMITER+num));  
    55             }  
    56         }  
    57     }  
    58       
    59     // reduce过程  
    60     public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {  
    61         public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {  
    62             List<String> listA = new ArrayList<String>(); // 存放来自表A的值  
    63             List<String> listB = new ArrayList<String>(); // 存放来自表B的值  
    64             while (values.hasNext()) {
    65                 String value = values.next().toString();  
    66                 if (value.startsWith("a#")) {  
    67                     listA.add(value.substring(2));  
    68                 } else if (value.startsWith("b#")) {  
    69                     listB.add(value.substring(2));  
    70                 }  
    71             }
    72             int sizeA = listA.size();  
    73             int sizeB = listB.size();  
    74             // 遍历两个向量  
    75             int i, j;  
    76             for (i = 0; i < sizeA; i ++) {  
    77                 for (j = 0; j < sizeB; j ++) {  
    78                     output.collect(key, new Text(listA.get(i) + DELIMITER +listB.get(j)));  
    79                 }  
    80             }     
    81         }  
    82     }  
    83       
    84     protected void configJob(JobConf conf) {  
    85         conf.setMapOutputKeyClass(Text.class);  
    86         conf.setMapOutputValueClass(Text.class);  
    87         conf.setOutputKeyClass(Text.class);  
    88         conf.setOutputValueClass(Text.class);  
    89         conf.setOutputFormat(ReportOutFormat.class);  
    90     }  
    91 }  

     三.技术细节

    下面说一下其中的若干技术细节:
    (1)由于输入数据涉及两张表,我们需要判断当前处理的记录是来自表A还是来自表B。Reporter类getInputSplit()方法可以获取输入数据的路径,具体代码如下:
    String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString();
    (2)map的输出的结果,同id的所有记录(不管来自表A还是表B)都在同一个key下保存在同一个列表中,在reduce阶段需要将其拆开,保存为相当于笛卡尔积的m x n条记录。由于事先不知道m、n是多少,这里使用了两个向量(可增长数组)来分别保存来自表A和表B的记录,再用一个两层嵌套循环组织出我们需要的最终结果。
    (3)在MapReduce中可以使用System.out.println()方法输出,以方便调试。不过System.out.println()的内容不会在终端显示,而是输出到了stdout和stderr这两个文件中,这两个文件位于logs/userlogs/attempt_xxx目录下。可以通过web端的历史job查看中的“Analyse This Job”来查看stdout和stderr的内容。
  • 相关阅读:
    计算机图形学和OpenGL(二)坐标系和绘制点线函数
    计算机图形学和OpenGL(一)OpenGL初步
    C++ 实现链表常用功能
    Cocos2d-x环境搭建
    2014年学习计划
    2013年终总结
    AS3开发必须掌握的内容
    starling性能优化
    后补个2012年的总结吧
    原生javascript实现图片懒加载
  • 原文地址:https://www.cnblogs.com/DreamDrive/p/7692042.html
Copyright © 2011-2022 走看看