zoukankan      html  css  js  c++  java
  • mapreduce join操作

    上次和朋友讨论到mapreduce,join应该发生在map端,理由太想当然到sql里面的执行过程了 wheremap端 join在map之前(笛卡尔积),但实际上网上看了,mapreduce的笛卡尔积发生在reduce端,下面哥们有个实现过程可以参考(http://blog.csdn.net/xyilu/article/details/8996204)。有空再看看 实际上实现过程是不是和他写的代码一样。
     
     
     
     
     
     
    前阵子把MapReduce实现join操作的算法设想清楚了,但一直没有在代码层面落地。今天终于费了些功夫把整个流程走了一遭,期间经历了诸多麻烦并最终得以将其一一搞定,再次深切体会到,什么叫从计算模型到算法实现还有很多路要走。
     

    数据准备

    首先是准备好数据。这个倒已经是一个熟练的过程,所要做的是把示例数据准备好,记住路径和字段分隔符。
    准备好下面两张表:
    (1)m_ys_lab_jointest_a(以下简称表A)
    建表语句为:
    [sql] view plain copy
     
     print?
    1. create table if not exists m_ys_lab_jointest_a (  
    2.      id bigint,  
    3.      name string  
    4. )  
    5. row format delimited  
    6. fields terminated by '9'  
    7. lines terminated by '10'  
    8. stored as textfile;  
    数据:
    id     name
    1     北京
    2     天津
    3     河北
    4     山西
    5     内蒙古
    6     辽宁
    7     吉林
    8     黑龙江
    (2)m_ys_lab_jointest_b(以下简称表B)
    建表语句为:
    [sql] view plain copy
     
     print?
    1. create table if not exists m_ys_lab_jointest_b (  
    2.      id bigint,  
    3.      statyear bigint,  
    4.      num bigint  
    5. )  
    6. row format delimited  
    7. fields terminated by '9'  
    8. lines terminated by '10'  
    9. stored as textfile;  
    数据:
    id     statyear     num
    1     2010     1962
    1     2011     2019
    2     2010     1299
    2     2011     1355
    4     2010     3574
    4     2011     3593
    9     2010     2303
    9     2011     2347

    我们的目的是,以id为key做join操作,得到以下表:
    m_ys_lab_jointest_ab
    id     name    statyear     num
    1       北京    2011    2019
    1       北京    2010    1962
    2       天津    2011    1355
    2       天津    2010    1299
    4       山西    2011    3593
    4       山西    2010    3574

    计算模型

    整个计算过程是:
    (1)在map阶段,把所有记录标记成<key, value>的形式,其中key是id,value则根据来源不同取不同的形式:来源于表A的记录,value的值为"a#"+name;来源于表B的记录,value的值为"b#"+score。
    (2)在reduce阶段,先把每个key下的value列表拆分为分别来自表A和表B的两部分,分别放入两个向量中。然后遍历两个向量做笛卡尔积,形成一条条最终结果。
    如下图所示:

    代码

    代码如下:
    [java] view plain copy
     
     print?
    1. import java.io.IOException;  
    2. import java.util.HashMap;  
    3. import java.util.Iterator;  
    4. import java.util.Vector;  
    5.   
    6. import org.apache.hadoop.io.LongWritable;  
    7. import org.apache.hadoop.io.Text;  
    8. import org.apache.hadoop.io.Writable;  
    9. import org.apache.hadoop.mapred.FileSplit;  
    10. import org.apache.hadoop.mapred.JobConf;  
    11. import org.apache.hadoop.mapred.MapReduceBase;  
    12. import org.apache.hadoop.mapred.Mapper;  
    13. import org.apache.hadoop.mapred.OutputCollector;  
    14. import org.apache.hadoop.mapred.RecordWriter;  
    15. import org.apache.hadoop.mapred.Reducer;  
    16. import org.apache.hadoop.mapred.Reporter;  
    17.   
    18. /** 
    19.  * MapReduce实现Join操作 
    20.  */  
    21. public class MapRedJoin {  
    22.     public static final String DELIMITER = "u0009"; // 字段分隔符  
    23.       
    24.     // map过程  
    25.     public static class MapClass extends MapReduceBase implements  
    26.             Mapper<LongWritable, Text, Text, Text> {  
    27.                           
    28.         public void configure(JobConf job) {  
    29.             super.configure(job);  
    30.         }  
    31.           
    32.         public void map(LongWritable key, Text value, OutputCollector<Text, Text> output,  
    33.                 Reporter reporter) throws IOException, ClassCastException {  
    34.             // 获取输入文件的全路径和名称  
    35.             String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString();  
    36.             // 获取记录字符串  
    37.             String line = value.toString();  
    38.             // 抛弃空记录  
    39.             if (line == null || line.equals("")) return;   
    40.               
    41.             // 处理来自表A的记录  
    42.             if (filePath.contains("m_ys_lab_jointest_a")) {  
    43.                 String[] values = line.split(DELIMITER); // 按分隔符分割出字段  
    44.                 if (values.length < 2) return;  
    45.                   
    46.                 String id = values[0]; // id  
    47.                 String name = values[1]; // name  
    48.                   
    49.                 output.collect(new Text(id), new Text("a#"+name));  
    50.             }  
    51.             // 处理来自表B的记录  
    52.             else if (filePath.contains("m_ys_lab_jointest_b")) {  
    53.                 String[] values = line.split(DELIMITER); // 按分隔符分割出字段  
    54.                 if (values.length < 3) return;  
    55.                   
    56.                 String id = values[0]; // id  
    57.                 String statyear = values[1]; // statyear  
    58.                 String num = values[2]; //num  
    59.                   
    60.                 output.collect(new Text(id), new Text("b#"+statyear+DELIMITER+num));  
    61.             }  
    62.         }  
    63.     }  
    64.       
    65.     // reduce过程  
    66.     public static class Reduce extends MapReduceBase  
    67.             implements Reducer<Text, Text, Text, Text> {  
    68.         public void reduce(Text key, Iterator<Text> values,  
    69.                 OutputCollector<Text, Text> output, Reporter reporter)  
    70.                 throws IOException {  
    71.                       
    72.             Vector<String> vecA = new Vector<String>(); // 存放来自表A的值  
    73.             Vector<String> vecB = new Vector<String>(); // 存放来自表B的值  
    74.               
    75.             while (values.hasNext()) {  
    76.                 String value = values.next().toString();  
    77.                 if (value.startsWith("a#")) {  
    78.                     vecA.add(value.substring(2));  
    79.                 } else if (value.startsWith("b#")) {  
    80.                     vecB.add(value.substring(2));  
    81.                 }  
    82.             }  
    83.               
    84.             int sizeA = vecA.size();  
    85.             int sizeB = vecB.size();  
    86.               
    87.             // 遍历两个向量  
    88.             int i, j;  
    89.             for (i = 0; i < sizeA; i ++) {  
    90.                 for (j = 0; j < sizeB; j ++) {  
    91.                     output.collect(key, new Text(vecA.get(i) + DELIMITER +vecB.get(j)));  
    92.                 }  
    93.             }     
    94.         }  
    95.     }  
    96.       
    97.     protected void configJob(JobConf conf) {  
    98.         conf.setMapOutputKeyClass(Text.class);  
    99.         conf.setMapOutputValueClass(Text.class);  
    100.         conf.setOutputKeyClass(Text.class);  
    101.         conf.setOutputValueClass(Text.class);  
    102.         conf.setOutputFormat(ReportOutFormat.class);  
    103.     }  
    104. }  

    技术细节

    下面说一下其中的若干技术细节:
    (1)由于输入数据涉及两张表,我们需要判断当前处理的记录是来自表A还是来自表B。Reporter类getInputSplit()方法可以获取输入数据的路径,具体代码如下:
    String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString();
    (2)map的输出的结果,同id的所有记录(不管来自表A还是表B)都在同一个key下保存在同一个列表中,在reduce阶段需要将其拆开,保存为相当于笛卡尔积的m x n条记录。由于事先不知道m、n是多少,这里使用了两个向量(可增长数组)来分别保存来自表A和表B的记录,再用一个两层嵌套循环组织出我们需要的最终结果。
    (3)在MapReduce中可以使用System.out.println()方法输出,以方便调试。不过System.out.println()的内容不会在终端显示,而是输出到了stdout和stderr这两个文件中,这两个文件位于logs/userlogs/attempt_xxx目录下。可以通过web端的历史job查看中的“Analyse This Job”来查看stdout和stderr的内容。
  • 相关阅读:
    java 各个文件夹的含义
    对称加密 & 非对称加密
    leetcode 155 最小栈
    leetcode 53 最大自序列和
    leetcode 146 LRU 缓存机制
    notebook 开启 有限元学习
    leetcode 232 用栈实现队列
    LINUX装机问题:无法使用“Ctrl+Alt+[F1~F6]”快捷键切换到终端
    JAVA笔记4-static关键字
    C++构造函数、析构函数、虚析构函数
  • 原文地址:https://www.cnblogs.com/yaohaitao/p/6696921.html
Copyright © 2011-2022 走看看