  • Hadoop's SemiJoin (map-side filtering join)

    I have already written about Hadoop's map-side join and reduce-side join; today let's look at a third approach that sits somewhere between the two.

    SemiJoin, usually called a semi-join, works by filtering out records that cannot participate in the join on the map side, which greatly reduces the shuffle work handed to the reducers. With a plain reduce-side join, a dataset may contain a large amount of data that is irrelevant to the join; because no pre-processing is done, those records are only identified as useless when the reduce function finally runs, by which point they have already been through shuffle, merge and sort. All of that network and disk I/O is wasted, so overall performance suffers, and the more irrelevant data there is, the more pronounced the effect becomes.

    The semi-join is really just a variant of the reduce-side join: by filtering out the useless records on the map side we shrink the shuffle phase, and that is where the performance gain comes from.

    Concretely, the small table is distributed to every node through the DistributedCache. In the mapper's setup method we read the cached file and store only the small table's join keys in a HashSet. Then, in the map function, every record is checked: if its join key is empty or not present in the HashSet, the record is treated as invalid, so it is never partitioned, written to disk, or shuffled and sorted on its way to the reduce phase, which improves join performance to a certain extent. Note that if the small table's key set is itself very large, the job may run out of memory (OOM); in that case a different join strategy has to be considered.
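
    Distilled to its essentials, the filter looks like the sketch below. It is only a preview of the full listing further down (same assumptions: the cached small table is a.txt, fields are comma-separated, and the join key is the first column), not a standalone program:

        // Driver: ship the small table to every node
        // DistributedCache.addCacheFile(new URI("hdfs://.../a.txt"), conf);

        // Mapper: load only the join keys, then drop non-matching records early
        private HashSet<String> joinKeySet = new HashSet<String>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            for (Path p : DistributedCache.getLocalCacheFiles(context.getConfiguration())) {
                if (p.getName().endsWith("a.txt")) {                 // the cached small table
                    BufferedReader br = new BufferedReader(new FileReader(p.toString()));
                    String line;
                    while ((line = br.readLine()) != null) {
                        joinKeySet.add(line.split(",")[0]);          // keep only the join key
                    }
                    br.close();
                }
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] cols = value.toString().split(",");
            if (cols[0].isEmpty() || !joinKeySet.contains(cols[0])) {
                return;                                              // never partitioned, shuffled or sorted
            }
            // ...tag the record with its source file and emit it keyed on cols[0]
        }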

    The test data is as follows:
    Simulated small table data:
    1,三劫散仙,13575468248 
    2,凤舞九天,18965235874 
    3,忙忙碌碌,15986854789 
    4,少林寺方丈,15698745862 


    Simulated big table data:
    3,A,99,2013-03-05 
    1,B,89,2013-02-05 
    2,C,69,2013-03-09 
    3,D,56,2013-06-07 
    5,E,100,2013-09-09 
    6,H,200,2014-01-10 
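
    With this data, keys 1, 2 and 3 appear in both tables. The big-table rows with keys 5 and 6 have no counterpart in the small table, so the map-side filter should drop them before the shuffle; key 4 of the small table has no matching big-table row, so it will produce no joined output.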

    The code is as follows:

    package com.semijoin;

    import java.io.BufferedReader;
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.FileReader;
    import java.io.IOException;
    import java.net.URI;
    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    /***
     * Tested against Hadoop 1.2
     *
     * Hadoop semi-join (SemiJoin) implementation
     *
     * @author qindongliang
     *
     * Big data QQ group: 376932160
     * Search technology QQ group: 324714439
     * **/
    public class Semjoin {

        /**
         * Custom map output value type
         * **/
        private static class CombineEntity implements WritableComparable<CombineEntity>{

            private Text joinKey;    // join key
            private Text flag;       // flag marking which file the record came from
            private Text secondPart; // everything except the join key

            public CombineEntity() {
                this.joinKey=new Text();
                this.flag=new Text();
                this.secondPart=new Text();
            }

            public Text getJoinKey() {
                return joinKey;
            }

            public void setJoinKey(Text joinKey) {
                this.joinKey = joinKey;
            }

            public Text getFlag() {
                return flag;
            }

            public void setFlag(Text flag) {
                this.flag = flag;
            }

            public Text getSecondPart() {
                return secondPart;
            }

            public void setSecondPart(Text secondPart) {
                this.secondPart = secondPart;
            }

            @Override
            public void readFields(DataInput in) throws IOException {
                this.joinKey.readFields(in);
                this.flag.readFields(in);
                this.secondPart.readFields(in);
            }

            @Override
            public void write(DataOutput out) throws IOException {
                this.joinKey.write(out);
                this.flag.write(out);
                this.secondPart.write(out);
            }

            @Override
            public int compareTo(CombineEntity o) {
                return this.joinKey.compareTo(o.joinKey);
            }

        }

        private static class JMapper extends Mapper<LongWritable, Text, Text, CombineEntity>{

            private CombineEntity combine=new CombineEntity();
            private Text flag=new Text();
            private Text joinKey=new Text();
            private Text secondPart=new Text();
            /**
             * Holds the join keys of the small table
             * */
            private HashSet<String> joinKeySet=new HashSet<String>();

            @Override
            protected void setup(Context context)throws IOException, InterruptedException {

                BufferedReader br=null;
                String temp;
                // get the files shared through the DistributedCache
                Path path[]=DistributedCache.getLocalCacheFiles(context.getConfiguration());

                for(Path p:path){
                    if(p.getName().endsWith("a.txt")){
                        br=new BufferedReader(new FileReader(p.toString()));
                        while((temp=br.readLine())!=null){
                            String ss[]=temp.split(",");
                            joinKeySet.add(ss[0]); // collect only the small table's join keys
                        }
                        br.close();
                    }
                }
            }

            @Override
            protected void map(LongWritable key, Text value,Context context)
                    throws IOException, InterruptedException {

                // find out which input file this split belongs to
                String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();

                if(pathName.endsWith("a.txt")){

                    String valueItems[]=value.toString().split(",");

                    /**
                     * filter on the join key here
                     * */
                    if(joinKeySet.contains(valueItems[0])){
                        // set the flag
                        flag.set("0");
                        // set the join key
                        joinKey.set(valueItems[0]);
                        // set the remaining columns
                        secondPart.set(valueItems[1]+" "+valueItems[2]);

                        // populate the entity
                        combine.setFlag(flag);              // flag
                        combine.setJoinKey(joinKey);        // join key
                        combine.setSecondPart(secondPart);  // remaining columns

                        // emit
                        context.write(combine.getJoinKey(), combine);
                    }else{
                        System.out.println("in a.txt");
                        System.out.println("record not found in the small table, filtered out!");
                        for(String v:valueItems){
                            System.out.print(v+"   ");
                        }
                        return ;
                    }

                }else if(pathName.endsWith("b.txt")){
                    String valueItems[]=value.toString().split(",");

                    /**
                     * check whether the join key is in the set
                     * */
                    if(joinKeySet.contains(valueItems[0])){

                        // set the flag
                        flag.set("1");

                        // set the join key
                        joinKey.set(valueItems[0]);

                        // set the remaining columns; note the two files have different column counts
                        secondPart.set(valueItems[1]+" "+valueItems[2]+" "+valueItems[3]);

                        // populate the entity
                        combine.setFlag(flag);              // flag
                        combine.setJoinKey(joinKey);        // join key
                        combine.setSecondPart(secondPart);  // remaining columns

                        // emit
                        context.write(combine.getJoinKey(), combine);

                    }else{
                        // filtered out ......
                        System.out.println("in b.txt");
                        System.out.println("record not found in the small table, filtered out!");
                        for(String v:valueItems){
                            System.out.print(v+"   ");
                        }
                        return ;
                    }
                }
            }

        }

        private static class JReduce extends Reducer<Text, CombineEntity, Text, Text>{

            // rows from the left (small) table within one group
            private List<Text> leftTable=new ArrayList<Text>();
            // rows from the right (big) table within one group
            private List<Text> rightTable=new ArrayList<Text>();

            private Text secondPart=null;

            private Text output=new Text();

            // called once per key group
            @Override
            protected void reduce(Text key, Iterable<CombineEntity> values,Context context)
                    throws IOException, InterruptedException {
                leftTable.clear();  // clear data from the previous group
                rightTable.clear(); // clear data from the previous group

                /**
                 * Put the rows from the two files into separate lists.
                 * Note: if a group is very large this can cause an OOM.
                 * **/
                for(CombineEntity ce:values){

                    this.secondPart=new Text(ce.getSecondPart().toString());

                    if(ce.getFlag().toString().trim().equals("0")){
                        // left table
                        leftTable.add(secondPart);
                    }else if(ce.getFlag().toString().trim().equals("1")){
                        // right table
                        rightTable.add(secondPart);
                    }
                }

                // cross product of the two lists
                for(Text left:leftTable){
                    for(Text right:rightTable){
                        output.set(left+" "+right); // concatenate the left and right rows
                        context.write(key, output); // emit
                    }
                }
            }

        }

        public static void main(String[] args)throws Exception {

            JobConf conf=new JobConf(Semjoin.class);
            conf.set("mapred.job.tracker","192.168.75.130:9001");
            conf.setJar("tt.jar");

            // the small table to be shared
            String bpath="hdfs://192.168.75.130:9000/root/dist/a.txt";
            // add it to the DistributedCache
            DistributedCache.addCacheFile(new URI(bpath), conf);

            Job job=new Job(conf, "aaaaa");
            job.setJarByClass(Semjoin.class);
            System.out.println("mode:  "+conf.get("mapred.job.tracker"));

            // set the custom Mapper and Reducer
            job.setMapperClass(JMapper.class);
            job.setReducerClass(JReduce.class);

            // map output types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(CombineEntity.class);

            // reduce output types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);

            FileSystem fs=FileSystem.get(conf);

            Path op=new Path("hdfs://192.168.75.130:9000/root/outputjoindbnew4");

            if(fs.exists(op)){
                fs.delete(op, true);
                System.out.println("output path exists, deleted!!!");
            }

            FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.75.130:9000/root/inputjoindb"));
            FileOutputFormat.setOutputPath(job, op);

            System.exit(job.waitForCompletion(true)?0:1);
        }

    }
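
    A side note, not part of the original code above: from Hadoop 2.x on, the org.apache.hadoop.filecache.DistributedCache class is deprecated. As a rough sketch under that assumption, the equivalent is to register the cache file on the Job in the driver and read it back through the task context in setup(), for example:

        // Driver (Hadoop 2.x+): register the small table on the Job itself
        job.addCacheFile(new URI("hdfs://192.168.75.130:9000/root/dist/a.txt"));

        // Mapper setup(): list the cached files and read the small table
        // (InputStreamReader comes from java.io)
        URI[] cacheFiles = context.getCacheFiles();
        for (URI uri : cacheFiles) {
            Path p = new Path(uri.getPath());
            if (p.getName().endsWith("a.txt")) {
                FileSystem fs = FileSystem.get(context.getConfiguration());
                BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(p)));
                String line;
                while ((line = br.readLine()) != null) {
                    joinKeySet.add(line.split(",")[0]); // same key-only HashSet as above
                }
                br.close();
            }
        }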


    The run log is as follows:

    mode:  192.168.75.130:9001
    output path exists, deleted!!!
    WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
    INFO - FileInputFormat.listStatus(237) | Total input paths to process : 2
    WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded
    INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404260312_0002
    INFO - JobClient.monitorAndPrintJob(1393) |  map 0% reduce 0%
    INFO - JobClient.monitorAndPrintJob(1393) |  map 50% reduce 0%
    INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 0%
    INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 33%
    INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 100%
    INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404260312_0002
    INFO - Counters.log(585) | Counters: 29
    INFO - Counters.log(587) |   Job Counters
    INFO - Counters.log(589) |     Launched reduce tasks=1
    INFO - Counters.log(589) |     SLOTS_MILLIS_MAPS=12445
    INFO - Counters.log(589) |     Total time spent by all reduces waiting after reserving slots (ms)=0
    INFO - Counters.log(589) |     Total time spent by all maps waiting after reserving slots (ms)=0
    INFO - Counters.log(589) |     Launched map tasks=2
    INFO - Counters.log(589) |     Data-local map tasks=2
    INFO - Counters.log(589) |     SLOTS_MILLIS_REDUCES=9801
    INFO - Counters.log(587) |   File Output Format Counters
    INFO - Counters.log(589) |     Bytes Written=172
    INFO - Counters.log(587) |   FileSystemCounters
    INFO - Counters.log(589) |     FILE_BYTES_READ=237
    INFO - Counters.log(589) |     HDFS_BYTES_READ=455
    INFO - Counters.log(589) |     FILE_BYTES_WRITTEN=169503
    INFO - Counters.log(589) |     HDFS_BYTES_WRITTEN=172
    INFO - Counters.log(587) |   File Input Format Counters
    INFO - Counters.log(589) |     Bytes Read=227
    INFO - Counters.log(587) |   Map-Reduce Framework
    INFO - Counters.log(589) |     Map output materialized bytes=243
    INFO - Counters.log(589) |     Map input records=10
    INFO - Counters.log(589) |     Reduce shuffle bytes=243
    INFO - Counters.log(589) |     Spilled Records=16
    INFO - Counters.log(589) |     Map output bytes=215
    INFO - Counters.log(589) |     Total committed heap usage (bytes)=336338944
    INFO - Counters.log(589) |     CPU time spent (ms)=1770
    INFO - Counters.log(589) |     Combine input records=0
    INFO - Counters.log(589) |     SPLIT_RAW_BYTES=228
    INFO - Counters.log(589) |     Reduce input records=8
    INFO - Counters.log(589) |     Reduce input groups=4
    INFO - Counters.log(589) |     Combine output records=0
    INFO - Counters.log(589) |     Physical memory (bytes) snapshot=442564608
    INFO - Counters.log(589) |     Reduce output records=4
    INFO - Counters.log(589) |     Virtual memory (bytes) snapshot=2184306688
    INFO - Counters.log(589) |     Map output records=8
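
    The counters confirm the map-side filtering: Map input records=10 (4 small-table rows plus 6 big-table rows) but Map output records=8, because the two big-table rows with keys 5 and 6 were dropped in the mappers and never shuffled. Reduce input groups=4 corresponds to keys 1 through 4, and Reduce output records=4 matches the four joined rows shown further below.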


    The records filtered out on the map side can also be seen in the JobTracker web UI on port 50030; the screenshot is not reproduced here.



    The run result is as follows:

    1   三劫散仙    13575468248 B   89  2013-02-05
    2   凤舞九天    18965235874 C   69  2013-03-09
    3   忙忙碌碌    15986854789 A   99  2013-03-05
    3   忙忙碌碌    15986854789 D   56  2013-06-07



    With that, the semi-join is complete and the result is correct. Among Hadoop's join strategies, only the pure map-side join avoids the shuffle entirely and is therefore the most efficient, but the right choice still depends on the actual data at hand.
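
    For comparison, here is a hedged sketch of the pure map-side (replicated) join mentioned above; it is an illustration written for this article's example data, not code from the original post. The whole small table (id -> name and phone) is loaded into a HashMap in setup(), the big table is the only MapReduce input, and the joined rows are emitted directly by the mapper, so no shuffle or reduce phase is needed (set job.setNumReduceTasks(0) in the driver):

        package com.semijoin;

        import java.io.BufferedReader;
        import java.io.FileReader;
        import java.io.IOException;
        import java.util.HashMap;
        import java.util.Map;

        import org.apache.hadoop.filecache.DistributedCache;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.io.LongWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Mapper;

        public class MapJoinSketchMapper extends Mapper<LongWritable, Text, Text, Text> {

            private final Map<String, String> small = new HashMap<String, String>();
            private final Text outKey = new Text();
            private final Text outVal = new Text();

            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
                // load the cached small table: id,name,phone
                for (Path p : DistributedCache.getLocalCacheFiles(context.getConfiguration())) {
                    if (p.getName().endsWith("a.txt")) {
                        BufferedReader br = new BufferedReader(new FileReader(p.toString()));
                        String line;
                        while ((line = br.readLine()) != null) {
                            String[] f = line.split(",");
                            small.put(f[0], f[1] + " " + f[2]); // key -> name phone
                        }
                        br.close();
                    }
                }
            }

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // big-table row: id,grade,score,date
                String[] f = value.toString().split(",");
                String left = small.get(f[0]);
                if (left == null) {
                    return; // no match in the small table: drop the record
                }
                outKey.set(f[0]);
                outVal.set(left + " " + f[1] + " " + f[2] + " " + f[3]);
                context.write(outKey, outVal); // joined row emitted straight from the map side
            }
        }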

  • Original article: https://www.cnblogs.com/sdaassjjsd/p/5593157.html