zoukankan      html  css  js  c++  java
  • MapReduce实现两表的Join--原理及python和java代码实现

    用Hive一句话搞定的,可是有时必需要用mapreduce

    方法介绍

    1. 概述

    在传统数据库(如:MYSQL)中,JOIN操作是很常见且很耗时的。而在HADOOP中进行JOIN操作。相同常见且耗时,因为Hadoop的独特设计思想,当进行JOIN操作时,有一些特殊的技巧。
    本文首先介绍了Hadoop上通常的JOIN实现方法。然后给出了几种针对不同输入数据集的优化方法。

    2. 常见的join方法介绍

    如果要进行join的数据分别来自File1和File2.

    2.1 reduce side join

    reduce side join是一种最简单的join方式,其主要思想例如以下:
    在map阶段,map函数同一时候读取两个文件File1和File2,为了区分两种来源的key/value数据对。对每条数据打一个标签(tag),比方:tag=0表示来自文件File1,tag=2表示来自文件File2。即:map阶段的主要任务是对不同文件里的数据打标签。
    在reduce阶段。reduce函数获取key同样的来自File1和File2文件的value list。 然后对于同一个key。对File1和File2中的数据进行join(笛卡尔乘积)。

    即:reduce阶段进行实际的连接操作。


    2.2 map side join

    之所以存在reduce side join。是由于在map阶段不能获取全部须要的join字段,即:同一个key相应的字段可能位于不同map中。Reduce side join是很低效的,由于shuffle阶段要进行大量的传输数据。


    Map side join是针对下面场景进行的优化:两个待连接表中。有一个表很大。而还有一个表很小。以至于小表能够直接存放到内存中。

    这样,我们能够将小表复制多份。让每一个map task内存中存在一份(比方存放到hash table中)。然后仅仅扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有同样的key的记录,假设有,则连接后输出就可以。


    为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法例如以下:
    (1)用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的參数是文件的URI(假设是HDFS上的文件。能够这样:hdfs://namenode:9000/home/XXX/file,当中9000是自己配置的NameNodeport号)。JobTracker在作业启动之前会获取这个URI列表,并将对应的文件复制到各个TaskTracker的本地磁盘上。(2)用户使用DistributedCache.getLocalCacheFiles()方法获取文件文件夹。并使用标准的文件读写API读取对应的文件。

    2.3 SemiJoin

    SemiJoin,也叫半连接。是从分布式数据库中借鉴过来的方法。

    它的产生动机是:对于reduce side join,跨机器的传输数据量很大,这成了join操作的一个瓶颈。假设可以在map端过滤掉不会參加join操作的数据,则可以大大节省网络IO。
    实现方法非常easy:选取一个小表。如果是File1。将其參与join的key抽取出来,保存到文件File3中。File3文件一般非常小,能够放到内存中。

    在map阶段。使用DistributedCache将File3拷贝到各个TaskTracker上,然后将File2中不在File3中的key相应的记录过滤掉。剩下的reduce阶段的工作与reduce side join同样。
    很多其它关于半连接的介绍,可參考:半连接介绍:http://wenku.baidu.com/view/ae7442db7f1922791688e877.html

    2.4 reduce side join + BloomFilter

    在某些情况下,SemiJoin抽取出来的小表的key集合在内存中仍然存放不下,这时候能够使用BloomFiler以节省空间。
    BloomFilter最常见的作用是:推断某个元素是否在一个集合里面。

    它最重要的两个方法是:add() 和contains()。最大的特点是不会存在false negative,即:假设contains()返回false,则该元素一定不在集合中,但会存在一定的true negative,即:假设contains()返回true。则该元素可能在集合中。


    因而可将小表中的key保存到BloomFilter中。在map阶段过滤大表。可能有一些不在小表中的记录没有过滤掉(可是在小表中的记录一定不会过滤掉),这没关系,仅仅只是添加了少量的网络IO而已。
    很多其它关于BloomFilter的介绍,可參考:http://blog.csdn.net/jiaomeng/article/details/1495500

    3. 二次排序

    在Hadoop中,默认情况下是依照key进行排序,假设要依照value进行排序怎么办?即:对于同一个key,reduce函数接收到的value list是依照value排序的。

    这样的应用需求在join操作中非经常见,比方,希望同样的key中。小表相应的value排在前面。
    有两种方法进行二次排序,分别为:buffer and in memory sort和 value-to-key conversion。
    对于buffer and in memory sort。主要思想是:在reduce()函数中,将某个key相应的全部value保存下来。然后进行排序。

    这样的方法最大的缺点是:可能会造成out of memory。
    对于value-to-key conversion。主要思想是:将key和部分value拼接成一个组合key(实现WritableComparable接口或者调用setSortComparatorClass函数),这样reduce获取的结果便是先按key排序。后按value排序的结果。须要注意的是,用户须要自己实现Paritioner。以便仅仅依照key进行数据划分。Hadoop显式的支持二次排序,在Configuration类中有个setGroupingComparatorClass()方法。可用于设置排序group的key值,

    reduce-side-join python代码

    hadoop有个工具叫做steaming,可以支持python、shell、C++、PHP等其它不论什么支持标准输入stdin及标准输出stdout的语言。其执行原理可以通过和标准java的map-reduce程序对照来说明:

    使用原生java语言实现Map-reduce程序
    1. hadoop准备好数据后,将数据传送给java的map程序
    2. java的map程序将数据处理后,输出O1
    3. hadoop将O1打散、排序,然后传给不同的reduce机器
    4. 每一个reduce机器将传来的数据传给reduce程序
    5. reduce程序将数据处理,输出终于数据O2
    借助hadoop streaming使用python语言实现Map-reduce程序
    1. hadoop准备好数据后,将数据传送给java的map程序
    2. java的map程序将数据处理成“键/值”对,并传送给python的map程序
    3. python的map程序将数据处理后,将结果传回给java的map程序
    4. java的map程序将数据输出为O1
    5. hadoop将O1打散、排序。然后传给不同的reduce机器
    6. 每一个reduce机器将传来的数据处理成“键/值”对。并传送给python的reduce程序
    7. python的reduce程序将数据处理后,将结果返回给java的reduce程序
    8. java的reduce程序将数据处理。输出终于数据O2

    上面红色表示map的对照,蓝色表示reduce的对照,能够看出streaming程序多了一步中间处理。这样说来steaming程序的效率和性能应该低于java版的程序。然而python的开发效率、执行性能有时候会大于java。这就是streaming的优势所在。

    hadoop之实现集合join的需求

    hadoop是用来做数据分析的,大都是对集合进行操作。因此该过程中将集合join起来使得一个集合能得到还有一个集合相应的信息的需求很常见。

    比方下面这个需求。有两份数据:学生信息(学号,姓名)和学生成绩(学号、课程、成绩),特点是有个共同的主键“学号”,如今须要将两者结合起来得到数据(学号,姓名,课程,成绩),计算公式:

    学号。姓名) join (学号,课程,成绩)= (学号。姓名。课程。成绩)

    数据事例1-学生信息:

    学号sno 姓名name
    01 name1
    02 name2
    03 name3
    04 name4

    数据事例2:-学生成绩:

    学号sno 课程号courseno 成绩grade
    01 01 80
    01 02 90
    02 01 82
    02 02 95

    期待的终于输出:

    学号sno 姓名name 课程courseno 成绩grade
    01 name1 01 80
    01 name1 02 90
    02 name2 01 82
    02 name2 02 95

    实现join的注意点和易踩坑总结

    假设你想写一个完好健壮的map reduce程序。我建议你首先弄清楚输入数据的格式、输出数据的格式,然后自己手动构建输入数据并手动计算出输出数据,这个过程中你会发现一些敲代码中须要特别处理的地方:

    1. 实现join的key是哪个。是1个字段还是2个字段,本例中key是sno,1个字段
    2. 每一个集合中key能否够反复,本例中数据1不可反复,数据2的key能够反复
    3. 每一个集合中key的相应值能否够不存在,本例中有学生会没成绩,所以数据2的key能够为空

    第1条会影响到hadoop启动脚本中key.fields和partition的配置,第2条会影响到map-reduce程序中详细的代码实现方式。第3条相同影响代码编写方式。

    hadoop实现join操作的思路

    详细思路是给每一个数据源加上一个数字标记label。这样hadoop对其排序后同一个字段的数据排在一起而且依照label排好序了,于是直接将相邻同样key的数据合并在一起输出就得到了结果。

    1、 map阶段:给表1和表2加标记,事实上就是多输出一个字段,比方表一加标记为0,表2加标记为2;

    2、 partion阶段:依据学号key为第一主键,标记label为第二主键进行排序和分区

    3、 reduce阶段:因为已经依照第一主键、第二主键排好了序,将相邻同样key数据合并输出

    hadoop使用python实现join的map和reduce代码

    mapper.py的代码:

    reducer的代码:

    使用shell脚本启动hadoop程序的方法:

    能够自己手工构造输入输出数据进行測试。本程序是验证过的。

    很多其它须要注意的地方

    hadoop的join操作能够分为非常多类型,各种类型脚本的编写有所不同,其分类是依照key字段数目、value字段数目、key是否可反复来划分的。下面是一个个人总结的对比表,表示会影响的地方:

    影响类型 影响的范围
    key字段数目 1、启动脚本中num.key.fields.for.partition的配置2、启动脚本中stream.num.map.output.key.fields的配置

    3、map和reduce脚本中key的获取

    4、map和reduce脚本中每一条数据和上一条数据比較的方法key是否可反复假设数据源1可反复,标记为M。数据源2可反复标记为N,那么join能够分为:1*1、M*1、M*N类型

    1*1类型:reduce中先记录第一个value。然后在下一条直接合并输出。

    M*1类型:将类型1作为标记小的输出,然后每次遇见label=1就记录value,每遇见一次label=2就输出一次终于结果;

    M*N类型:遇见类型1,就用数组记录value值。遇见label=2就将将记录的数组值所有连同该行value输出。value字段数目影响每次label=1时记录的数据个数,须要将value都记录下来


    reduce-side-join java代码

    数据准备

    首先是准备好数据。这个倒已经是一个熟练的过程。所要做的是把演示样例数据准备好,记住路径和字段分隔符。
    准备好以下两张表:
    (1)m_ys_lab_jointest_a(下面简称表A)
    建表语句为:
    1. create table if not exists m_ys_lab_jointest_a (  
    2.      id bigint,  
    3.      name string  
    4. )  
    5. row format delimited  
    6. fields terminated by '9'  
    7. lines terminated by '10'  
    8. stored as textfile;  
    数据:
    id     name
    1     北京
    2     天津
    3     河北
    4     山西
    5     内蒙古
    6     辽宁
    7     吉林
    8     黑龙江

    (2)m_ys_lab_jointest_b(下面简称表B)
    建表语句为:
    1. create table if not exists m_ys_lab_jointest_b (  
    2.      id bigint,  
    3.      statyear bigint,  
    4.      num bigint  
    5. )  
    6. row format delimited  
    7. fields terminated by '9'  
    8. lines terminated by '10'  
    9. stored as textfile;  
    数据:
    id     statyear     num
    1     2010     1962
    1     2011     2019
    2     2010     1299
    2     2011     1355
    4     2010     3574
    4     2011     3593
    9     2010     2303
    9     2011     2347

    我们的目的是。以id为key做join操作。得到下面表:
    m_ys_lab_jointest_ab
    id     name    statyear     num
    1       北京    2011    2019
    1       北京    2010    1962
    2       天津    2011    1355
    2       天津    2010    1299
    4       山西    2011    3593
    4       山西    2010    3574

    计算模型

    整个计算过程是:
    (1)在map阶段。把全部记录标记成<key, value>的形式,当中key是id,value则依据来源不同取不同的形式:来源于表A的记录,value的值为"a#"+name。来源于表B的记录,value的值为"b#"+score。

    (2)在reduce阶段,先把每一个key下的value列表拆分为分别来自表A和表B的两部分。分别放入两个向量中。然后遍历两个向量做笛卡尔积。形成一条条终于结果。
    例如以下图所看到的:

    代码

    代码例如以下:
    [java] view plain copy
     print?
    1. import java.io.IOException;  
    2. import java.util.HashMap;  
    3. import java.util.Iterator;  
    4. import java.util.Vector;  
    5.   
    6. import org.apache.hadoop.io.LongWritable;  
    7. import org.apache.hadoop.io.Text;  
    8. import org.apache.hadoop.io.Writable;  
    9. import org.apache.hadoop.mapred.FileSplit;  
    10. import org.apache.hadoop.mapred.JobConf;  
    11. import org.apache.hadoop.mapred.MapReduceBase;  
    12. import org.apache.hadoop.mapred.Mapper;  
    13. import org.apache.hadoop.mapred.OutputCollector;  
    14. import org.apache.hadoop.mapred.RecordWriter;  
    15. import org.apache.hadoop.mapred.Reducer;  
    16. import org.apache.hadoop.mapred.Reporter;  
    17.   
    18. /** 
    19.  * MapReduce实现Join操作 
    20.  */  
    21. public class MapRedJoin {  
    22.     public static final String DELIMITER = "u0009"// 字段分隔符  
    23.       
    24.     // map过程  
    25.     public static class MapClass extends MapReduceBase implements  
    26.             Mapper<LongWritable, Text, Text, Text> {  
    27.                           
    28.         public void configure(JobConf job) {  
    29.             super.configure(job);  
    30.         }  
    31.           
    32.         public void map(LongWritable key, Text value, OutputCollector<Text, Text> output,  
    33.                 Reporter reporter) throws IOException, ClassCastException {  
    34.             // 获取输入文件的全路径和名称  
    35.             String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString();  
    36.             // 获取记录字符串  
    37.             String line = value.toString();  
    38.             // 抛弃空记录  
    39.             if (line == null || line.equals("")) return;   
    40.               
    41.             // 处理来自表A的记录  
    42.             if (filePath.contains("m_ys_lab_jointest_a")) {  
    43.                 String[] values = line.split(DELIMITER); // 按分隔符切割出字段  
    44.                 if (values.length < 2return;  
    45.                   
    46.                 String id = values[0]; // id  
    47.                 String name = values[1]; // name  
    48.                   
    49.                 output.collect(new Text(id), new Text("a#"+name));  
    50.             }  
    51.             // 处理来自表B的记录  
    52.             else if (filePath.contains("m_ys_lab_jointest_b")) {  
    53.                 String[] values = line.split(DELIMITER); // 按分隔符切割出字段  
    54.                 if (values.length < 3return;  
    55.                   
    56.                 String id = values[0]; // id  
    57.                 String statyear = values[1]; // statyear  
    58.                 String num = values[2]; //num  
    59.                   
    60.                 output.collect(new Text(id), new Text("b#"+statyear+DELIMITER+num));  
    61.             }  
    62.         }  
    63.     }  
    64.       
    65.     // reduce过程  
    66.     public static class Reduce extends MapReduceBase  
    67.             implements Reducer<Text, Text, Text, Text> {  
    68.         public void reduce(Text key, Iterator<Text> values,  
    69.                 OutputCollector<Text, Text> output, Reporter reporter)  
    70.                 throws IOException {  
    71.                       
    72.             Vector<String> vecA = new Vector<String>(); // 存放来自表A的值  
    73.             Vector<String> vecB = new Vector<String>(); // 存放来自表B的值  
    74.               
    75.             while (values.hasNext()) {  
    76.                 String value = values.next().toString();  
    77.                 if (value.startsWith("a#")) {  
    78.                     vecA.add(value.substring(2));  
    79.                 } else if (value.startsWith("b#")) {  
    80.                     vecB.add(value.substring(2));  
    81.                 }  
    82.             }  
    83.               
    84.             int sizeA = vecA.size();  
    85.             int sizeB = vecB.size();  
    86.               
    87.             // 遍历两个向量  
    88.             int i, j;  
    89.             for (i = 0; i < sizeA; i ++) {  
    90.                 for (j = 0; j < sizeB; j ++) {  
    91.                     output.collect(key, new Text(vecA.get(i) + DELIMITER +vecB.get(j)));  
    92.                 }  
    93.             }     
    94.         }  
    95.     }  
    96.       
    97.     protected void configJob(JobConf conf) {  
    98.         conf.setMapOutputKeyClass(Text.class);  
    99.         conf.setMapOutputValueClass(Text.class);  
    100.         conf.setOutputKeyClass(Text.class);  
    101.         conf.setOutputValueClass(Text.class);  
    102.         conf.setOutputFormat(ReportOutFormat.class);  
    103.     }  
    104. }  

    技术细节

    以下说一下当中的若干技术细节:
    (1)因为输入数据涉及两张表。我们须要推断当前处理的记录是来自表A还是来自表B。

    Reporter类getInputSplit()方法能够获取输入数据的路径,详细代码例如以下:

    String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString();
    (2)map的输出的结果,同id的全部记录(无论来自表A还是表B)都在同一个key下保存在同一个列表中,在reduce阶段须要将其拆开,保存为相当于笛卡尔积的m x n条记录。

    因为事先不知道m、n是多少。这里使用了两个向量(可增长数组)来分别保存来自表A和表B的记录,再用一个两层嵌套循环组织出我们须要的终于结果。

    (3)在MapReduce中能够使用System.out.println()方法输出,以方便调试。

    只是System.out.println()的内容不会在终端显示,而是输出到了stdout和stderr这两个文件里,这两个文件位于logs/userlogs/attempt_xxx文件夹下。

    能够通过web端的历史job查看中的“Analyse This Job”来查看stdout和stderr的内容。


    全部方法的java代码(巨长)


    从别人那转来

    1、在Reudce端进行连接。
    在Reudce端进行连接是MapReduce框架进行表之间join操作最为常见的模式,其详细的实现原理例如以下:
    Map端的主要工作:为来自不同表(文件)的key/value对打标签以差别不同来源的记录。

    然后用连接字段作为key。其余部分和新加的标志作为value。最后进行输出。

    reduce端的主要工作:在reduce端以连接字段作为key的分组已经完毕,我们仅仅须要在每个分组其中将那些来源于不同文件的记录(在map阶段已经打标志)分开,最后进行笛卡尔仅仅就ok了。原理很easy,以下来看一个实例:
    (1)自己定义一个value返回类型:
    1. package com.mr.reduceSizeJoin;   
    2. import java.io.DataInput;   
    3. import java.io.DataOutput;   
    4. import java.io.IOException;   
    5. import org.apache.hadoop.io.Text;   
    6. import org.apache.hadoop.io.WritableComparable;   
    7. public class CombineValues implements WritableComparable{   
    8.     //private static final Logger logger = LoggerFactory.getLogger(CombineValues.class);   
    9.     private Text joinKey;//链接keyword   
    10.     private Text flag;//文件来源标志   
    11.     private Text secondPart;//除了链接键外的其它部分   
    12.     public void setJoinKey(Text joinKey) {   
    13.         this.joinKey = joinKey;   
    14.     }   
    15.     public void setFlag(Text flag) {   
    16.         this.flag = flag;   
    17.     }   
    18.     public void setSecondPart(Text secondPart) {   
    19.         this.secondPart = secondPart;   
    20.     }   
    21.     public Text getFlag() {   
    22.         return flag;   
    23.     }   
    24.     public Text getSecondPart() {   
    25.         return secondPart;   
    26.     }   
    27.     public Text getJoinKey() {   
    28.         return joinKey;   
    29.     }   
    30.     public CombineValues() {   
    31.         this.joinKey =  new Text();   
    32.         this.flag = new Text();   
    33.         this.secondPart = new Text();   
    34.     }

    35.     @Override 
    36.     public void write(DataOutput out) throws IOException {   
    37.         this.joinKey.write(out);   
    38.         this.flag.write(out);   
    39.         this.secondPart.write(out);   
    40.     }   
    41.     @Override 
    42.     public void readFields(DataInput in) throws IOException {   
    43.         this.joinKey.readFields(in);   
    44.         this.flag.readFields(in);   
    45.         this.secondPart.readFields(in);   
    46.     }   
    47.     @Override 
    48.     public int compareTo(CombineValues o) {   
    49.         return this.joinKey.compareTo(o.getJoinKey());   
    50.     }   
    51.     @Override 
    52.     public String toString() {   
    53.         // TODO Auto-generated method stub   
    54.         return "[flag="+this.flag.toString()+",joinKey="+this.joinKey.toString()+",secondPart="+this.secondPart.toString()+"]";   
    55.     }   
    56. }


    (2)map、reduce主体代码
    1. package com.mr.reduceSizeJoin;   
    2. import java.io.IOException;   
    3. import java.util.ArrayList;   
    4. import org.apache.hadoop.conf.Configuration;   
    5. import org.apache.hadoop.conf.Configured;   
    6. import org.apache.hadoop.fs.Path;   
    7. import org.apache.hadoop.io.Text;   
    8. import org.apache.hadoop.mapreduce.Job;   
    9. import org.apache.hadoop.mapreduce.Mapper;   
    10. import org.apache.hadoop.mapreduce.Reducer;   
    11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;   
    12. import org.apache.hadoop.mapreduce.lib.input.FileSplit;   
    13. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;   
    14. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   
    15. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;   
    16. import org.apache.hadoop.util.Tool;   
    17. import org.apache.hadoop.util.ToolRunner;   
    18. import org.slf4j.Logger;   
    19. import org.slf4j.LoggerFactory;   
    20. /**   
    21. * @author zengzhaozheng   
    22. * 用途说明:   
    23. * reudce side join中的left outer join   
    24. * 左连接。两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段   
    25. * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show)   
    26. * tb_dim_city.dat文件内容,分隔符为"|":   
    27. * id     name  orderid  city_code  is_show   
    28. * 0       其它        9999     9999         0   
    29. * 1       长春        1        901          1   
    30. * 2       吉林        2        902          1   
    31. * 3       四平        3        903          1   
    32. * 4       松原        4        904          1   
    33. * 5       通化        5        905          1   
    34. * 6       辽源        6        906          1   
    35. * 7       白城        7        907          1   
    36. * 8       白山        8        908          1   
    37. * 9       延吉        9        909          1   
    38. * -------------------------风骚的切割线-------------------------------   
    39. * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)   
    40. * tb_user_profiles.dat文件内容,分隔符为"|":   
    41. * userID   network     flow    cityID   
    42. * 1           2G       123      1   
    43. * 2           3G       333      2   
    44. * 3           3G       555      1   
    45. * 4           2G       777      3   
    46. * 5           3G       666      4   
    47. *   
    48. * -------------------------风骚的切割线-------------------------------   
    49. *  结果:   
    50. *  1   长春  1   901 1   1   2G  123   
    51. *  1   长春  1   901 1   3   3G  555   
    52. *  2   吉林  2   902 1   2   3G  333   
    53. *  3   四平  3   903 1   4   2G  777   
    54. *  4   松原  4   904 1   5   3G  666   
    55. */ 
    56. public class ReduceSideJoin_LeftOuterJoin extends Configured implements Tool{   
    57.     private static final Logger logger = LoggerFactory.getLogger(ReduceSideJoin_LeftOuterJoin.class);   
    58.     public static class LeftOutJoinMapper extends Mapper {   
    59.         private CombineValues combineValues = new CombineValues();   
    60.         private Text flag = new Text();   
    61.         private Text joinKey = new Text();   
    62.         private Text secondPart = new Text();   
    63.         @Override 
    64.         protected void map(Object key, Text value, Context context)   
    65.                 throws IOException, InterruptedException {   
    66.             //获得文件输入路径   
    67.             String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();   
    68.             //数据来自tb_dim_city.dat文件,标志即为"0"   
    69.             if(pathName.endsWith("tb_dim_city.dat")){   
    70.                 String[] valueItems = value.toString().split("\|");   
    71.                 //过滤格式错误的记录   
    72.                 if(valueItems.length != 5){   
    73.                     return;   
    74.                 }   
    75.                 flag.set("0");   
    76.                 joinKey.set(valueItems[0]);   
    77.                 secondPart.set(valueItems[1]+" "+valueItems[2]+" "+valueItems[3]+" "+valueItems[4]);   
    78.                 combineValues.setFlag(flag);   
    79.                 combineValues.setJoinKey(joinKey);   
    80.                 combineValues.setSecondPart(secondPart);   
    81.                 context.write(combineValues.getJoinKey(), combineValues);

    82.                 }//数据来自于tb_user_profiles.dat。标志即为"1"   
    83.             else if(pathName.endsWith("tb_user_profiles.dat")){   
    84.                 String[] valueItems = value.toString().split("\|");   
    85.                 //过滤格式错误的记录   
    86.                 if(valueItems.length != 4){   
    87.                     return;   
    88.                 }   
    89.                 flag.set("1");   
    90.                 joinKey.set(valueItems[3]);   
    91.                 secondPart.set(valueItems[0]+" "+valueItems[1]+" "+valueItems[2]);   
    92.                 combineValues.setFlag(flag);   
    93.                 combineValues.setJoinKey(joinKey);   
    94.                 combineValues.setSecondPart(secondPart);   
    95.                 context.write(combineValues.getJoinKey(), combineValues);   
    96.             }   
    97.         }   
    98.     }   
    99.     public static class LeftOutJoinReducer extends Reducer {   
    100.         //存储一个分组中的左表信息   
    101.         private ArrayList leftTable = new ArrayList();   
    102.         //存储一个分组中的右表信息   
    103.         private ArrayList rightTable = new ArrayList();   
    104.         private Text secondPar = null;   
    105.         private Text output = new Text();   
    106.         /**   
    107.          * 一个分组调用一次reduce函数   
    108.          */ 
    109.         @Override 
    110.         protected void reduce(Text key, Iterable value, Context context)   
    111.                 throws IOException, InterruptedException {   
    112.             leftTable.clear();   
    113.             rightTable.clear();   
    114.             /**   
    115.              * 将分组中的元素依照文件分别进行存放   
    116.              * 这样的方法要注意的问题:   
    117.              * 假设一个分组内的元素太多的话,可能会导致在reduce阶段出现OOM,   
    118.              * 在处理分布式问题之前最好先了解数据的分布情况。依据不同的分布採取最   
    119.              * 适当的处理方法,这样能够有效的防止导致OOM和数据过度倾斜问题。   
    120.              */ 
    121.             for(CombineValues cv : value){   
    122.                 secondPar = new Text(cv.getSecondPart().toString());   
    123.                 //左表tb_dim_city   
    124.                 if("0".equals(cv.getFlag().toString().trim())){   
    125.                     leftTable.add(secondPar);   
    126.                 }   
    127.                 //右表tb_user_profiles   
    128.                 else if("1".equals(cv.getFlag().toString().trim())){   
    129.                     rightTable.add(secondPar);   
    130.                 }   
    131.             }   
    132.             logger.info("tb_dim_city:"+leftTable.toString());   
    133.             logger.info("tb_user_profiles:"+rightTable.toString());   
    134.             for(Text leftPart : leftTable){   
    135.                 for(Text rightPart : rightTable){   
    136.                     output.set(leftPart+ " " + rightPart);   
    137.                     context.write(key, output);   
    138.                 }   
    139.             }   
    140.         }   
    141.     }   
    142.     @Override 
    143.     public int run(String[] args) throws Exception {   
    144.           Configuration conf=getConf(); //获得配置文件对象   
    145.             Job job=new Job(conf,"LeftOutJoinMR");   
    146.             job.setJarByClass(ReduceSideJoin_LeftOuterJoin.class);
    147.             FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径   
    148.             FileOutputFormat.setOutputPath(job, new Path(args[1])); //设置reduce输出文件路径
    149.             job.setMapperClass(LeftOutJoinMapper.class);   
    150.             job.setReducerClass(LeftOutJoinReducer.class);
    151.             job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式   
    152.             job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格格式

    153.             //设置map的输出key和value类型   
    154.             job.setMapOutputKeyClass(Text.class);   
    155.             job.setMapOutputValueClass(CombineValues.class);

    156.             //设置reduce的输出key和value类型   
    157.             job.setOutputKeyClass(Text.class);   
    158.             job.setOutputValueClass(Text.class);   
    159.             job.waitForCompletion(true);   
    160.             return job.isSuccessful()?0:1;   
    161.     }   
    162.     public static void main(String[] args) throws IOException,   
    163.             ClassNotFoundException, InterruptedException {   
    164.         try {   
    165.             int returnCode =  ToolRunner.run(new ReduceSideJoin_LeftOuterJoin(),args);   
    166.             System.exit(returnCode);   
    167.         } catch (Exception e) {   
    168.             // TODO Auto-generated catch block   
    169.             logger.error(e.getMessage());   
    170.         }   
    171.     }   
    172. }

    当中详细的分析以及数据的输出输入请看代码中的凝视已经写得比較清楚了,这里主要分析一下reduce join的一些不足。之所以会存在reduce join这样的方式,我们可以非常明显的看出原:由于总体数据被切割了。每一个map task仅仅处理一部分数据而不可以获取到全部须要的join字段,因此我们须要在讲join key作为reduce端的分组将全部join key同样的记录集中起来进行处理。所以reduce join这样的方式就出现了。这样的方式的缺点非常明显就是会造成map和reduce端也就是shuffle阶段出现大量的传输数据,效率非常低。

    2、在Map端进行连接。

    使用场景:一张表十分小、一张表非常大。
    使用方法:在提交作业的时候先将小表文件放到该作业的DistributedCache中,然后从DistributeCache中取出该小表进行join key / value解释切割放到内存中(可以放大Hash Map等等容器中)。

    然后扫描大表,看大表中的每条记录的join key /value值是否可以在内存中找到同样join key的记录。假设有则直接输出结果。

    直接上代码。比較简单:
    1. package com.mr.mapSideJoin;   
    2. import java.io.BufferedReader;   
    3. import java.io.FileReader;   
    4. import java.io.IOException;   
    5. import java.util.HashMap;   
    6. import org.apache.hadoop.conf.Configuration;   
    7. import org.apache.hadoop.conf.Configured;   
    8. import org.apache.hadoop.filecache.DistributedCache;   
    9. import org.apache.hadoop.fs.Path;   
    10. import org.apache.hadoop.io.Text;   
    11. import org.apache.hadoop.mapreduce.Job;   
    12. import org.apache.hadoop.mapreduce.Mapper;   
    13. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;   
    14. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;   
    15. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   
    16. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;   
    17. import org.apache.hadoop.util.Tool;   
    18. import org.apache.hadoop.util.ToolRunner;   
    19. import org.slf4j.Logger;   
    20. import org.slf4j.LoggerFactory;   
    21. /**   
    22. * @author zengzhaozheng   
    23. *   
    24. * 用途说明:   
    25. * Map side join中的left outer join   
    26. * 左连接。两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段   
    27. * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show),   
    28. * 如果tb_dim_city文件记录数非常少。tb_dim_city.dat文件内容,分隔符为"|":   
    29. * id     name  orderid  city_code  is_show   
    30. * 0       其它        9999     9999         0   
    31. * 1       长春        1        901          1   
    32. * 2       吉林        2        902          1   
    33. * 3       四平        3        903          1   
    34. * 4       松原        4        904          1   
    35. * 5       通化        5        905          1   
    36. * 6       辽源        6        906          1   
    37. * 7       白城        7        907          1   
    38. * 8       白山        8        908          1   
    39. * 9       延吉        9        909          1   
    40. * -------------------------风骚的切割线-------------------------------   
    41. * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)   
    42. * tb_user_profiles.dat文件内容,分隔符为"|":   
    43. * userID   network     flow    cityID   
    44. * 1           2G       123      1   
    45. * 2           3G       333      2   
    46. * 3           3G       555      1   
    47. * 4           2G       777      3   
    48. * 5           3G       666      4   
    49. * -------------------------风骚的切割线-------------------------------   
    50. *  结果:   
    51. *  1   长春  1   901 1   1   2G  123   
    52. *  1   长春  1   901 1   3   3G  555   
    53. *  2   吉林  2   902 1   2   3G  333   
    54. *  3   四平  3   903 1   4   2G  777   
    55. *  4   松原  4   904 1   5   3G  666   
    56. */ 
    57. public class MapSideJoinMain extends Configured implements Tool{   
    58.     private static final Logger logger = LoggerFactory.getLogger(MapSideJoinMain.class);   
    59.     public static class LeftOutJoinMapper extends Mapper {

    60.         private HashMap city_info = new HashMap();   
    61.         private Text outPutKey = new Text();   
    62.         private Text outPutValue = new Text();   
    63.         private String mapInputStr = null;   
    64.         private String mapInputSpit[] = null;   
    65.         private String city_secondPart = null;   
    66.         /**   
    67.          * 此方法在每一个task開始之前运行,这里主要用作从DistributedCache   
    68.          * 中取到tb_dim_city文件。并将里边记录取出放到内存中。

         

    69.          */ 
    70.         @Override 
    71.         protected void setup(Context context)   
    72.                 throws IOException, InterruptedException {   
    73.             BufferedReader br = null;   
    74.             //获得当前作业的DistributedCache相关文件   
    75.             Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());   
    76.             String cityInfo = null;   
    77.             for(Path p : distributePaths){   
    78.                 if(p.toString().endsWith("tb_dim_city.dat")){   
    79.                     //读缓存文件,并放到mem中   
    80.                     br = new BufferedReader(new FileReader(p.toString()));   
    81.                     while(null!=(cityInfo=br.readLine())){   
    82.                         String[] cityPart = cityInfo.split("\|",5);   
    83.                         if(cityPart.length ==5){   
    84.                             city_info.put(cityPart[0], cityPart[1]+" "+cityPart[2]+" "+cityPart[3]+" "+cityPart[4]);   
    85.                         }   
    86.                     }   
    87.                 }   
    88.             }   
    89.         }

    90.         /**   
    91.          * Map端的实现相当简单,直接推断tb_user_profiles.dat中的   
    92.          * cityID是否存在我的map中就ok了,这样就能够实现Map Join了   
    93.          */ 
    94.         @Override 
    95.         protected void map(Object key, Text value, Context context)   
    96.                 throws IOException, InterruptedException {   
    97.             //排掉空行   
    98.             if(value == null || value.toString().equals("")){   
    99.                 return;   
    100.             }   
    101.             mapInputStr = value.toString();   
    102.             mapInputSpit = mapInputStr.split("\|",4);   
    103.             //过滤非法记录   
    104.             if(mapInputSpit.length != 4){   
    105.                 return;   
    106.             }   
    107.             //推断链接字段是否在map中存在   
    108.             city_secondPart = city_info.get(mapInputSpit[3]);   
    109.             if(city_secondPart != null){   
    110.                 this.outPutKey.set(mapInputSpit[3]);   
    111.                 this.outPutValue.set(city_secondPart+" "+mapInputSpit[0]+" "+mapInputSpit[1]+" "+mapInputSpit[2]);   
    112.                 context.write(outPutKey, outPutValue);   
    113.             }   
    114.         }   
    115.     }   
    116.     @Override 
    117.     public int run(String[] args) throws Exception {   
    118.             Configuration conf=getConf(); //获得配置文件对象   
    119.             DistributedCache.addCacheFile(new Path(args[1]).toUri(), conf);//为该job加入缓存文件   
    120.             Job job=new Job(conf,"MapJoinMR");   
    121.             job.setNumReduceTasks(0);

    122.             FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径   
    123.             FileOutputFormat.setOutputPath(job, new Path(args[2])); //设置reduce输出文件路径

    124.             job.setJarByClass(MapSideJoinMain.class);   
    125.             job.setMapperClass(LeftOutJoinMapper.class);

    126.             job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式   
    127.             job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式

    128.             //设置map的输出key和value类型   
    129.             job.setMapOutputKeyClass(Text.class);

    130.             //设置reduce的输出key和value类型   
    131.             job.setOutputKeyClass(Text.class);   
    132.             job.setOutputValueClass(Text.class);   
    133.             job.waitForCompletion(true);   
    134.             return job.isSuccessful()?0:1;   
    135.     }   
    136.     public static void main(String[] args) throws IOException,   
    137.             ClassNotFoundException, InterruptedException {   
    138.         try {   
    139.             int returnCode =  ToolRunner.run(new MapSideJoinMain(),args);   
    140.             System.exit(returnCode);   
    141.         } catch (Exception e) {   
    142.             // TODO Auto-generated catch block   
    143.             logger.error(e.getMessage());   
    144.         }   
    145.     }   
    146. }

    这里说说DistributedCache。DistributedCache是分布式缓存的一种实现。它在整个MapReduce框架中起着相当关键的数据,他能够支撑我们写一些相当复杂高效的分布式程序。说回到这里,JobTracker在作业启动之前会获取到DistributedCache的资源uri列表,并将相应的文件分发到各个涉及到该作业的任务的TaskTracker上。另外。关于DistributedCache和作业的关系。比方权限、存储路径区分、public和private等属性。接下来实用再整理研究一下写一篇blog,这里就不具体说了。

    另外另一种比較变态的Map Join方式,就是结合HBase来做Map Join操作。

    这样的方式全然能够突破内存的控制,使你毫无忌惮的使用Map Join。并且效率也很不错。


    3、SemiJoin。
    SemiJoin就是所谓的半连接,事实上细致一看就是reduce join的一个变种。就是在map端过滤掉一些数据,在网络中仅仅传输參与连接的数据不參与连接的数据不必在网络中进行传输,从而降低了shuffle的网络传输量。使总体效率得到提高。其它思想和reduce join是一模一样的。说得更加接地气一点就是将小表中參与join的key单独抽出来通过DistributedCach分发到相关节点,然后将其取出放到内存中(能够放到HashSet中),在map阶段扫描连接表,将join key不在内存HashSet中的记录过滤掉。让那些參与join的记录通过shuffle传输到reduce端进行join操作。其它的和reduce join都是一样的。看代码:
    1. package com.mr.SemiJoin;   
    2. import java.io.BufferedReader;   
    3. import java.io.FileReader;   
    4. import java.io.IOException;   
    5. import java.util.ArrayList;   
    6. import java.util.HashSet;   
    7. import org.apache.hadoop.conf.Configuration;   
    8. import org.apache.hadoop.conf.Configured;   
    9. import org.apache.hadoop.filecache.DistributedCache;   
    10. import org.apache.hadoop.fs.Path;   
    11. import org.apache.hadoop.io.Text;   
    12. import org.apache.hadoop.mapreduce.Job;   
    13. import org.apache.hadoop.mapreduce.Mapper;   
    14. import org.apache.hadoop.mapreduce.Reducer;   
    15. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;   
    16. import org.apache.hadoop.mapreduce.lib.input.FileSplit;   
    17. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;   
    18. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   
    19. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;   
    20. import org.apache.hadoop.util.Tool;   
    21. import org.apache.hadoop.util.ToolRunner;   
    22. import org.slf4j.Logger;   
    23. import org.slf4j.LoggerFactory;   
    24. /**   
    25. * @author zengzhaozheng   
    26. *   
    27. * 用途说明:   
    28. * reudce side join中的left outer join   
    29. * 左连接。两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段   
    30. * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show)   
    31. * tb_dim_city.dat文件内容,分隔符为"|":   
    32. * id     name  orderid  city_code  is_show   
    33. * 0       其它        9999     9999         0   
    34. * 1       长春        1        901          1   
    35. * 2       吉林        2        902          1   
    36. * 3       四平        3        903          1   
    37. * 4       松原        4        904          1   
    38. * 5       通化        5        905          1   
    39. * 6       辽源        6        906          1   
    40. * 7       白城        7        907          1   
    41. * 8       白山        8        908          1   
    42. * 9       延吉        9        909          1   
    43. * -------------------------风骚的切割线-------------------------------   
    44. * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)   
    45. * tb_user_profiles.dat文件内容,分隔符为"|":   
    46. * userID   network     flow    cityID   
    47. * 1           2G       123      1   
    48. * 2           3G       333      2   
    49. * 3           3G       555      1   
    50. * 4           2G       777      3   
    51. * 5           3G       666      4   
    52. * -------------------------风骚的切割线-------------------------------   
    53. * joinKey.dat内容:   
    54. * city_code   
    55. * 1   
    56. * 2   
    57. * 3   
    58. * 4   
    59. * -------------------------风骚的切割线-------------------------------   
    60. *  结果:   
    61. *  1   长春  1   901 1   1   2G  123   
    62. *  1   长春  1   901 1   3   3G  555   
    63. *  2   吉林  2   902 1   2   3G  333   
    64. *  3   四平  3   903 1   4   2G  777   
    65. *  4   松原  4   904 1   5   3G  666   
    66. */ 
    67. public class SemiJoin extends Configured implements Tool{   
    68.     private static final Logger logger = LoggerFactory.getLogger(SemiJoin.class);   
    69.     public static class SemiJoinMapper extends Mapper {   
    70.         private CombineValues combineValues = new CombineValues();   
    71.         private HashSet joinKeySet = new HashSet();   
    72.         private Text flag = new Text();   
    73.         private Text joinKey = new Text();   
    74.         private Text secondPart = new Text();   
    75.         /**   
    76.          * 将參加join的key从DistributedCache取出放到内存中,以便在map端将要參加join的key过滤出来。b   
    77.          */ 
    78.         @Override 
    79.         protected void setup(Context context)   
    80.                 throws IOException, InterruptedException {   
    81.             BufferedReader br = null;   
    82.             //获得当前作业的DistributedCache相关文件   
    83.             Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());   
    84.             String joinKeyStr = null;   
    85.             for(Path p : distributePaths){   
    86.                 if(p.toString().endsWith("joinKey.dat")){   
    87.                     //读缓存文件,并放到mem中   
    88.                     br = new BufferedReader(new FileReader(p.toString()));   
    89.                     while(null!=(joinKeyStr=br.readLine())){   
    90.                         joinKeySet.add(joinKeyStr);   
    91.                     }   
    92.                 }   
    93.             }   
    94.         }   
    95.         @Override 
    96.         protected void map(Object key, Text value, Context context)   
    97.                 throws IOException, InterruptedException {   
    98.             //获得文件输入路径   
    99.             String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();   
    100.             //数据来自tb_dim_city.dat文件,标志即为"0"   
    101.             if(pathName.endsWith("tb_dim_city.dat")){   
    102.                 String[] valueItems = value.toString().split("\|");   
    103.                 //过滤格式错误的记录   
    104.                 if(valueItems.length != 5){   
    105.                     return;   
    106.                 }   
    107.                 //过滤掉不须要參加join的记录   
    108.                 if(joinKeySet.contains(valueItems[0])){   
    109.                     flag.set("0");   
    110.                     joinKey.set(valueItems[0]);   
    111.                     secondPart.set(valueItems[1]+" "+valueItems[2]+" "+valueItems[3]+" "+valueItems[4]);   
    112.                     combineValues.setFlag(flag);   
    113.                     combineValues.setJoinKey(joinKey);   
    114.                     combineValues.setSecondPart(secondPart);   
    115.                     context.write(combineValues.getJoinKey(), combineValues);   
    116.                 }else{   
    117.                     return ;   
    118.                 }   
    119.             }//数据来自于tb_user_profiles.dat,标志即为"1"   
    120.             else if(pathName.endsWith("tb_user_profiles.dat")){   
    121.                 String[] valueItems = value.toString().split("\|");   
    122.                 //过滤格式错误的记录   
    123.                 if(valueItems.length != 4){   
    124.                     return;   
    125.                 }   
    126.                 //过滤掉不须要參加join的记录   
    127.                 if(joinKeySet.contains(valueItems[3])){   
    128.                     flag.set("1");   
    129.                     joinKey.set(valueItems[3]);   
    130.                     secondPart.set(valueItems[0]+" "+valueItems[1]+" "+valueItems[2]);   
    131.                     combineValues.setFlag(flag);   
    132.                     combineValues.setJoinKey(joinKey);   
    133.                     combineValues.setSecondPart(secondPart);   
    134.                     context.write(combineValues.getJoinKey(), combineValues);   
    135.                 }else{   
    136.                     return ;   
    137.                 }   
    138.             }   
    139.         }   
    140.     }   
    141.     public static class SemiJoinReducer extends Reducer {   
    142.         //存储一个分组中的左表信息   
    143.         private ArrayList leftTable = new ArrayList();   
    144.         //存储一个分组中的右表信息   
    145.         private ArrayList rightTable = new ArrayList();   
    146.         private Text secondPar = null;   
    147.         private Text output = new Text();   
    148.         /**   
    149.          * 一个分组调用一次reduce函数   
    150.          */ 
    151.         @Override 
    152.         protected void reduce(Text key, Iterable value, Context context)   
    153.                 throws IOException, InterruptedException {   
    154.             leftTable.clear();   
    155.             rightTable.clear();   
    156.             /**   
    157.              * 将分组中的元素依照文件分别进行存放   
    158.              * 这样的方法要注意的问题:   
    159.              * 假设一个分组内的元素太多的话,可能会导致在reduce阶段出现OOM。   
    160.              * 在处理分布式问题之前最好先了解数据的分布情况,依据不同的分布採取最   
    161.              * 适当的处理方法,这样能够有效的防止导致OOM和数据过度倾斜问题。   
    162.              */ 
    163.             for(CombineValues cv : value){   
    164.                 secondPar = new Text(cv.getSecondPart().toString());   
    165.                 //左表tb_dim_city   
    166.                 if("0".equals(cv.getFlag().toString().trim())){   
    167.                     leftTable.add(secondPar);   
    168.                 }   
    169.                 //右表tb_user_profiles   
    170.                 else if("1".equals(cv.getFlag().toString().trim())){   
    171.                     rightTable.add(secondPar);   
    172.                 }   
    173.             }   
    174.             logger.info("tb_dim_city:"+leftTable.toString());   
    175.             logger.info("tb_user_profiles:"+rightTable.toString());   
    176.             for(Text leftPart : leftTable){   
    177.                 for(Text rightPart : rightTable){   
    178.                     output.set(leftPart+ " " + rightPart);   
    179.                     context.write(key, output);   
    180.                 }   
    181.             }   
    182.         }   
    183.     }   
    184.     @Override 
    185.     public int run(String[] args) throws Exception {   
    186.             Configuration conf=getConf(); //获得配置文件对象   
    187.             DistributedCache.addCacheFile(new Path(args[2]).toUri(), conf);
    188.             Job job=new Job(conf,"LeftOutJoinMR");   
    189.             job.setJarByClass(SemiJoin.class);

    190.             FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径   
    191.             FileOutputFormat.setOutputPath(job, new Path(args[1])); //设置reduce输出文件路径

    192.             job.setMapperClass(SemiJoinMapper.class);   
    193.             job.setReducerClass(SemiJoinReducer.class);

    194.             job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式   
    195.             job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式

    196.             //设置map的输出key和value类型   
    197.             job.setMapOutputKeyClass(Text.class);   
    198.             job.setMapOutputValueClass(CombineValues.class);

    199.             //设置reduce的输出key和value类型   
    200.             job.setOutputKeyClass(Text.class);   
    201.             job.setOutputValueClass(Text.class);   
    202.             job.waitForCompletion(true);   
    203.             return job.isSuccessful()?0:1;   
    204.     }   
    205.     public static void main(String[] args) throws IOException,   
    206.             ClassNotFoundException, InterruptedException {   
    207.         try {   
    208.             int returnCode =  ToolRunner.run(new SemiJoin(),args);   
    209.             System.exit(returnCode);   
    210.         } catch (Exception e) {   
    211.             logger.error(e.getMessage());   
    212.         }   
    213.     }   
    214. }

    这里还说说SemiJoin也是有一定的适用范围的。其抽取出来进行join的key是要放到内存中的,所以不可以太大。easy在Map端造成OOM。

    总结
    blog介绍了三种join方式。这三种join方式适用于不同的场景。其处理效率上的相差还是蛮大的。当中主要导致因素是网络传输。

    Map join效率最高。其次是SemiJoin。最低的是reduce join。另外,写分布式大数据处理程序的时最好要对总体要处理的数据分布情况作一个了解,这能够提高我们代码的效率,使数据的倾斜度降到最低,使我们的代码倾向性更好。



          

查看全文
  • 相关阅读:
    日常点滴
    Django基础之forms组件中的ModelForm组件
    你想了解的轮询、长轮询和websocket都在这里了
    python并发编程之协程
    聊聊五大IO模型
    python并发编程之线程
    网络编程
    python并发编程之进程
    python中的异常处理
    flask实现文件的上传
  • 原文地址:https://www.cnblogs.com/yutingliuyl/p/7298187.html
  • Copyright © 2011-2022 走看看