zoukankan      html  css  js  c++  java
  • (转)Hadoop MapReduce链式实践--ChainReducer

    版本:CDH5.0.0,HDFS:2.3.0,Mapreduce:2.3.0,Yarn:2.3.0。

    场景描述:求一组数据中按照不同类别的最大值,比如,如下的数据:

    data1:

    [plain] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. A,10  
    2. A,11  
    3. A,12  
    4. A,13  
    5. B,21  
    6. B,31  
    7. B,41  
    8. B,51  

    data2:

    [plain] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. A,20  
    2. A,21  
    3. A,22  
    4. A,23  
    5. B,201  
    6. B,301  
    7. B,401  
    8. B,501  

    最后输出为:

    [plain] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. A,23  
    2. B,501  

    假如这样的逻辑的mapreduce数据流如下:

    假设C组数据比较多,同时假设集群有2个节点,那么这个任务分配2个reducer,且C组数据平均分布到两个reducer中,(这样做是为了效率考虑,如果只有一个reducer,那么当一个节点在运行reducer的时候另外一个节点会处于空闲状态)那么如果在reducer之后,还可以再次做一个reducer,那么不就可以整合数据到一个文件了么,同时还可以再次比较C组数据中,以得到真正比较大的数据。

    首先说下,不用上面假设的方式进行操作,那么一般的操作方法。一般有两种方法:其一,直接读出HDFS数据,然后进行整合;其二,新建另外一个Job来进行整合。这两种方法,如果就效率来说的话,可能第一种效率会高点。

    考虑到前面提出的mapreduce数据流,以前曾对ChainReducer有点印象,好像可以做这个,所以就拿ChainReducer来试,同时为了学多点知识,也是用了多个Mapper(即使用ChainMapper)。

    主程序代码如下:

    [java] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. package chain;  
    2.   
    3. import org.apache.hadoop.conf.Configuration;  
    4. import org.apache.hadoop.conf.Configured;  
    5. import org.apache.hadoop.fs.Path;  
    6. import org.apache.hadoop.io.IntWritable;  
    7. import org.apache.hadoop.io.LongWritable;  
    8. import org.apache.hadoop.io.Text;  
    9. import org.apache.hadoop.mapred.FileInputFormat;  
    10. import org.apache.hadoop.mapred.FileOutputFormat;  
    11. import org.apache.hadoop.mapred.JobClient;  
    12. import org.apache.hadoop.mapred.JobConf;  
    13. import org.apache.hadoop.mapred.TextInputFormat;  
    14. import org.apache.hadoop.mapred.TextOutputFormat;  
    15. import org.apache.hadoop.mapred.lib.ChainMapper;  
    16. import org.apache.hadoop.mapred.lib.ChainReducer;  
    17. import org.apache.hadoop.util.Tool;  
    18. import org.apache.hadoop.util.ToolRunner;  
    19.   
    20. public class ChainDriver2 extends Configured implements Tool{  
    21.   
    22.     /** 
    23.      * ChainReducer 实战 
    24.      * 验证多个reducer的整合 
    25.      * 逻辑:寻找最大值 
    26.      * @param args 
    27.      */  
    28.       
    29.     private String input=null;  
    30.     private String output=null;  
    31.     private String delimiter=null;  
    32.     private int reducer=1;  
    33.     public static void main(String[] args) throws Exception {  
    34.         ToolRunner.run(new Configuration(), new ChainDriver2(),args);  
    35.     }  
    36.       
    37.     @Override  
    38.     public int run(String[] arg0) throws Exception {  
    39.         configureArgs(arg0);  
    40.         checkArgs();  
    41.         Configuration conf = getConf();  
    42.         conf.set("delimiter", delimiter);  
    43.         JobConf  job= new JobConf(conf,ChainDriver2.class);  
    44.           
    45.         ChainMapper.addMapper(job, MaxMapper.class, LongWritable.class,  
    46.                 Text.class, Text.class, IntWritable.class, true, new JobConf(false)) ;  
    47.           
    48.         ChainMapper.addMapper(job, MergeMaxMapper.class, Text.class,  
    49.                 IntWritable.class, Text.class, IntWritable.class, true, new JobConf(false));  
    50.           
    51.         ChainReducer.setReducer(job, MaxReducer.class, Text.class, IntWritable.class,  
    52.                 Text.class, IntWritable.class, true, new JobConf(false));  
    53.         ChainReducer.addMapper(job, MergeMaxMapper.class, Text.class,  
    54.                 IntWritable.class, Text.class, IntWritable.class, false, new JobConf(false));  
    55.         job.setJarByClass(ChainDriver2.class);  
    56.         job.setJobName("ChainReducer test job");  
    57.           
    58.         job.setMapOutputKeyClass(Text.class);  
    59.         job.setMapOutputValueClass(IntWritable.class);  
    60.         job.setOutputKeyClass(Text.class);  
    61.         job.setOutputValueClass(IntWritable.class);  
    62.           
    63.        /* job.setMapperClass(MaxMapper.class); 
    64.         job.setReducerClass(MaxReducer.class);*/  
    65.         job.setInputFormat(TextInputFormat.class);;  
    66.         job.setOutputFormat(TextOutputFormat.class);  
    67.         job.setNumReduceTasks(reducer);  
    68.           
    69.         FileInputFormat.addInputPath(job, new Path(input));  
    70.         FileOutputFormat.setOutputPath(job, new Path(output));  
    71.           
    72.         JobClient.runJob(job);  
    73.         return 0;  
    74.     }  
    75.       
    76.       
    77.     /** 
    78.      * check the args  
    79.      */  
    80.     private void checkArgs() {  
    81.         if(input==null||"".equals(input)){  
    82.             System.out.println("no input...");  
    83.             printUsage();  
    84.             System.exit(-1);  
    85.         }  
    86.         if(output==null||"".equals(output)){  
    87.             System.out.println("no output...");  
    88.             printUsage();  
    89.             System.exit(-1);  
    90.         }  
    91.         if(delimiter==null||"".equals(delimiter)){  
    92.             System.out.println("no delimiter...");  
    93.             printUsage();  
    94.             System.exit(-1);  
    95.         }  
    96.         if(reducer==0){  
    97.             System.out.println("no reducer...");  
    98.             printUsage();  
    99.             System.exit(-1);  
    100.         }  
    101.     }  
    102.   
    103.     /** 
    104.      * configuration the args 
    105.      * @param args 
    106.      */  
    107.     private void configureArgs(String[] args) {  
    108.         for(int i=0;i<args.length;i++){  
    109.             if("-i".equals(args[i])){  
    110.                 input=args[++i];  
    111.             }  
    112.             if("-o".equals(args[i])){  
    113.                 output=args[++i];  
    114.             }  
    115.               
    116.             if("-delimiter".equals(args[i])){  
    117.                 delimiter=args[++i];  
    118.             }  
    119.             if("-reducer".equals(args[i])){  
    120.                 try {  
    121.                     reducer=Integer.parseInt(args[++i]);  
    122.                 } catch (Exception e) {  
    123.                     reducer=0;  
    124.                 }  
    125.             }  
    126.         }  
    127.     }  
    128.     public static void printUsage(){  
    129.         System.err.println("Usage:");  
    130.         System.err.println("-i input   cell data path.");  
    131.         System.err.println("-o output   output data path.");  
    132.         System.err.println("-delimiter  data delimiter , default is blanket  .");  
    133.         System.err.println("-reducer  reducer number , default is 1  .");  
    134.     }  
    135.       
    136. }  


    MaxMapper:

    [java] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. package chain;  
    2.   
    3. import java.io.IOException;  
    4.   
    5. import org.apache.hadoop.io.IntWritable;  
    6. import org.apache.hadoop.io.LongWritable;  
    7. import org.apache.hadoop.io.Text;  
    8. import org.apache.hadoop.mapred.JobConf;  
    9. import org.apache.hadoop.mapred.MapReduceBase;  
    10. import org.apache.hadoop.mapred.Mapper;  
    11. import org.apache.hadoop.mapred.OutputCollector;  
    12. import org.apache.hadoop.mapred.Reporter;  
    13. import org.slf4j.Logger;  
    14. import org.slf4j.LoggerFactory;  
    15.   
    16. public class MaxMapper extends MapReduceBase implements Mapper<LongWritable ,Text,Text,IntWritable>{  
    17.     private Logger log = LoggerFactory.getLogger(MaxMapper.class);  
    18.     private String delimiter=null;  
    19.     @Override  
    20.     public void configure(JobConf conf){  
    21.         delimiter=conf.get("delimiter");  
    22.         log.info("delimiter:"+delimiter);  
    23.         log.info("This is the begin of MaxMapper");  
    24.     }  
    25.       
    26.     @Override  
    27.     public void map(LongWritable key, Text value,  
    28.             OutputCollector<Text, IntWritable> out, Reporter reporter)  
    29.             throws IOException {  
    30.         // TODO Auto-generated method stub  
    31.         String[] values= value.toString().split(delimiter);  
    32.         log.info(values[0]+"-->"+values[1]);  
    33.         out.collect(new Text(values[0]), new IntWritable(Integer.parseInt(values[1])));  
    34.           
    35.     }  
    36.     public void close(){  
    37.         log.info("This is the end of MaxMapper");  
    38.     }  
    39. }  


    MaxReducer:

    [java] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. package chain;  
    2.   
    3. import java.io.IOException;  
    4. import java.util.Iterator;  
    5.   
    6. import org.apache.hadoop.io.IntWritable;  
    7. import org.apache.hadoop.io.Text;  
    8. import org.apache.hadoop.mapred.JobConf;  
    9. import org.apache.hadoop.mapred.MapReduceBase;  
    10. import org.apache.hadoop.mapred.OutputCollector;  
    11. import org.apache.hadoop.mapred.Reducer;  
    12. import org.apache.hadoop.mapred.Reporter;  
    13. import org.slf4j.Logger;  
    14. import org.slf4j.LoggerFactory;  
    15.   
    16. public   class MaxReducer extends MapReduceBase implements Reducer<Text,IntWritable,Text,IntWritable>{  
    17.     private Logger log = LoggerFactory.getLogger(MaxReducer.class);  
    18.     @Override  
    19.     public void configure(JobConf conf){  
    20.         log.info("This is the begin of the MaxReducer");  
    21.     }  
    22.     @Override  
    23.     public void reduce(Text key, Iterator<IntWritable> values,  
    24.             OutputCollector<Text, IntWritable> out, Reporter reporter)  
    25.             throws IOException {  
    26.         // TODO Auto-generated method stub  
    27.         int max=-1;  
    28.         while(values.hasNext()){  
    29.             int value=values.next().get();  
    30.             if(value>max){  
    31.                 max=value;  
    32.             }  
    33.         }  
    34.         log.info(key+"-->"+max);  
    35.         out.collect(key, new IntWritable(max));  
    36.           
    37.     }  
    38.       
    39.     @Override  
    40.     public void close(){  
    41.         log.info("This is the end of the MaxReducer");  
    42.     }  
    43. }  


    MergeMaxMapper:

    [java] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. package chain;  
    2.   
    3. import java.io.IOException;  
    4. //import java.util.ArrayList;  
    5. //import java.util.HashMap;  
    6. //import java.util.Map;  
    7.   
    8. import org.apache.hadoop.io.IntWritable;  
    9. import org.apache.hadoop.io.Text;  
    10. import org.apache.hadoop.mapred.JobConf;  
    11. import org.apache.hadoop.mapred.MapReduceBase;  
    12. import org.apache.hadoop.mapred.Mapper;  
    13. import org.apache.hadoop.mapred.OutputCollector;  
    14. import org.apache.hadoop.mapred.Reporter;  
    15. import org.slf4j.Logger;  
    16. import org.slf4j.LoggerFactory;  
    17.   
    18. public class MergeMaxMapper extends MapReduceBase implements Mapper<Text ,IntWritable,Text,IntWritable>{  
    19.     private Logger log = LoggerFactory.getLogger(MergeMaxMapper.class);  
    20. //  private Map<Text,ArrayList<IntWritable>> outMap= new HashMap<Text,ArrayList<IntWritable>>();  
    21.     @Override  
    22.     public void configure(JobConf conf){  
    23.         log.info("This is the begin of MergeMaxMapper");  
    24.     }  
    25.       
    26.     @Override  
    27.     public void map(Text key, IntWritable value,  
    28.             OutputCollector<Text, IntWritable> out, Reporter reporter)  
    29.             throws IOException {  
    30.         log.info(key.toString()+"_MergeMaxMapper"+"-->"+value.get());  
    31.         out.collect(new Text(key.toString()+"_MergeMaxMapper"), value);  
    32.           
    33.     }  
    34.       
    35.     @Override  
    36.     public void close(){  
    37.         log.info("this is the end of MergeMaxMapper");  
    38.     }  
    39. }  


    编程思路如下:原始测试数据data1、data2首先经过MaxMapper(由于两个文件,所以生成了2个map),然后经过MergeMaxMapper,到MaxReducer,最后再次经过MergeMaxMapper。

    在程序中添加了输出数据的log,可以通过log来查看各个map和reduce的数据流程。

    mapper端的log(其中的一个mapper):

    [plain] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. 2014-05-14 17:23:51,307 INFO [main] chain.MaxMapper: delimiter:,  
    2. 2014-05-14 17:23:51,307 INFO [main] chain.MaxMapper: This is the begin of MaxMapper  
    3. 2014-05-14 17:23:51,454 INFO [main] chain.MergeMaxMapper: This is the begin of MergeMaxMapper  
    4. 2014-05-14 17:23:51,471 INFO [main] chain.MaxMapper: A-->20  
    5. 2014-05-14 17:23:51,476 INFO [main] chain.MergeMaxMapper: A_MergeMaxMapper-->20  
    6. 2014-05-14 17:23:51,476 INFO [main] chain.MaxMapper: A-->21  
    7. 2014-05-14 17:23:51,477 INFO [main] chain.MergeMaxMapper: A_MergeMaxMapper-->21  
    8. 2014-05-14 17:23:51,477 INFO [main] chain.MaxMapper: A-->22  
    9. 2014-05-14 17:23:51,477 INFO [main] chain.MergeMaxMapper: A_MergeMaxMapper-->22  
    10. 2014-05-14 17:23:51,477 INFO [main] chain.MaxMapper: A-->23  
    11. 2014-05-14 17:23:51,477 INFO [main] chain.MergeMaxMapper: A_MergeMaxMapper-->23  
    12. 2014-05-14 17:23:51,477 INFO [main] chain.MaxMapper: B-->201  
    13. 2014-05-14 17:23:51,477 INFO [main] chain.MergeMaxMapper: B_MergeMaxMapper-->201  
    14. 2014-05-14 17:23:51,477 INFO [main] chain.MaxMapper: B-->301  
    15. 2014-05-14 17:23:51,477 INFO [main] chain.MergeMaxMapper: B_MergeMaxMapper-->301  
    16. 2014-05-14 17:23:51,478 INFO [main] chain.MaxMapper: B-->401  
    17. 2014-05-14 17:23:51,478 INFO [main] chain.MergeMaxMapper: B_MergeMaxMapper-->401  
    18. 2014-05-14 17:23:51,478 INFO [main] chain.MaxMapper: B-->501  
    19. 2014-05-14 17:23:51,478 INFO [main] chain.MergeMaxMapper: B_MergeMaxMapper-->501  
    20. 2014-05-14 17:23:51,481 INFO [main] chain.MaxMapper: This is the end of MaxMapper  
    21. 2014-05-14 17:23:51,481 INFO [main] chain.MergeMaxMapper: this is the end of MergeMaxMapper  


    通过上面log,可以看出,通过ChainMapper添加mapper的方式的mapper的处理顺序为:首先初始化第一个mapper(即调用configure方法);接着初始第二个mapper(调用configure方法);然后开始map函数,map函数针对一条记录,首先采用mapper1进行处理,然后使用mapper2进行处理;最后是关闭阶段,关闭的顺序同样是首先关闭mapper1(调用close方法),然后关闭mapper2。

    reducer端的log(其中一个reducer)

    [plain] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. 2014-05-14 17:24:10,171 INFO [main] chain.MergeMaxMapper: This is the begin of MergeMaxMapper  
    2. 2014-05-14 17:24:10,311 INFO [main] chain.MaxReducer: This is the begin of the MaxReducer  
    3. 2014-05-14 17:24:10,671 INFO [main] chain.MaxReducer: B_MergeMaxMapper-->501  
    4. 2014-05-14 17:24:10,672 INFO [main] chain.MergeMaxMapper: B_MergeMaxMapper_MergeMaxMapper-->501  
    5. 2014-05-14 17:24:10,673 INFO [main] chain.MergeMaxMapper: this is the end of MergeMaxMapper  
    6. 2014-05-14 17:24:10,673 INFO [main] chain.MaxReducer: This is the end of the MaxReducer  


    通过上面的log可以看出,通过ChainReducer添加mapper的方式,其数据处理顺序为:首先初始化Reducer之后的Mapper,接着初始化Reducer(看configure函数即可知道);然后处理reducer,reducer的输出接着交给mapper处理;最后先关闭Mapper,接着关闭reducer。

    同时,注意到,reducer后面的mapper也是两个的,即有多少个reducer,就有多少个mapper。

    通过实验得到上面的ChainReducer的数据处理流程,且ChainReducer没有addReducer的方法,也即是不能添加reducer了,那么最开始提出的mapreduce数据流程就不能采用这种方式实现了。

    最后,前面提出的mapreduce数据流程应该是错的,在reducer out里面C组数据不会被拆分为两个reducer,相同的key只会向同一个reducer传输。这里同样做了个试验,通过对接近90M的数据(只有一个分组A)执行上面的程序,可以看到有2个mapper,2个reducer(此数值为设置值),但是在其中一个reducer中并没有A分组的任何数据,在另外一个reducer中才有数据。其实,不用试验也是可以的,以前看的书上一般都会说相同的key进入同一个reducer中。不过,如果是这样的话,那么这样的数据效率应该不高。

    返回最开始提出的场景,最开始提出的问题,如果相同的key只会进入一个reducer中,那么最后的2个数据文件(2个reducer生成2个数据文件)其实里面不会有key冲突的数据,所以在进行后面的操作的时候可以直接读多个文件即可,就像是读一个文件一样。

    会产生这样的认知错误,应该是对mapreduce 原理不清楚导致。

    分享,成长,快乐

    转载请注明blog地址:http://blog.csdn.net/fansy1990

  • 相关阅读:
    c:forTokens标签循环输出
    jsp转long类型为date,并且格式化
    spring中@Param和mybatis中@Param使用区别(暂时还没接触)
    734. Sentence Similarity 有字典数组的相似句子
    246. Strobogrammatic Number 上下对称的数字
    720. Longest Word in Dictionary 能连续拼接出来的最长单词
    599. Minimum Index Sum of Two Lists两个餐厅列表的索引和最小
    594. Longest Harmonious Subsequence强制差距为1的最长连续
    645. Set Mismatch挑出不匹配的元素和应该真正存在的元素
    409. Longest Palindrome 最长对称串
  • 原文地址:https://www.cnblogs.com/luolizhi/p/4928629.html
Copyright © 2011-2022 走看看