zoukankan      html  css  js  c++  java
  • Hadoop的ChainMapper和ChainReducer使用案例(链式处理)(四)

      不多说,直接上干货!

         Hadoop的MR作业支持链式处理,类似在一个生产牛奶的流水线上,每一个阶段都有特定的任务要处理,比如提供牛奶盒,装入牛奶,封盒,打印出厂日期,等等,通过这样进一步的分工,从而提高了生产效率,那么在我们的Hadoop的MapReduce中也是如此,支持链式的处理方式,这些Mapper像Linux管道一样,前一个Mapper的输出结果直接重定向到下一个Mapper的输入,形成一个流水线,而这一点与Lucene和Solr中的Filter机制是非常类似的,Hadoop项目源自Lucene,自然也借鉴了一些Lucene中的处理方式。

        举个例子,比如处理文本中的一些禁用词,或者敏感词,等等,Hadoop里的链式操作,支持的形式类似正则Map+ Rrduce Map*,代表的意思是全局只能有一个唯一的Reduce,但是在Reduce的前后是可以存在无限多个Mapper来进行一些预处理或者善后工作的

    注意:

      1. 本人目前使用的版本是1.2.1,因此ChainMapper使用的还是old api。 

      2. 老的API之中,只支持 N-Mapper + 1-Reducer的模式。 Reducer不在链式任务最开始即可。

    比如:

      Map1 -> Map2 -> Reducer -> Map3 -> Map4

      (不确定在新版的API之中是否支持 N-Reducer的模式。不过new api 确实要简单简洁很多)

         

      在编程的时候,我们可以借用源码提供给我们的程序!在此基础上进行修改和编写。

         

          比如我的源码本地目录如下:(找我的本地ChainMapper和ChainReducer案例

          D:SoftWarehadoop-2.2.0-srchadoop-mapreduce-projecthadoop-mapreduce-clienthadoop-mapreduce-client-coresrcmainjavaorgapachehadoopmapreducelibchain

    任务介绍:

      这个任务需要两步完成:

      1. 对一篇文章进行WordCount

      2. 统计出现次数超过5词的单词

    WordCount我们很熟悉,因为版本限制,先使用old api 实现一次:

    Java代码

    1. package hadoop_in_action_exersice;  
    2.   
    3. import java.io.IOException;  
    4. import java.util.Iterator;  
    5. import java.util.StringTokenizer;  
    6.   
    7. import org.apache.hadoop.fs.FileSystem;  
    8. import org.apache.hadoop.fs.Path;  
    9. import org.apache.hadoop.io.IntWritable;  
    10. import org.apache.hadoop.io.LongWritable;  
    11. import org.apache.hadoop.io.Text;  
    12. import org.apache.hadoop.mapred.FileInputFormat;  
    13. import org.apache.hadoop.mapred.FileOutputFormat;  
    14. import org.apache.hadoop.mapred.JobClient;  
    15. import org.apache.hadoop.mapred.JobConf;  
    16. import org.apache.hadoop.mapred.MapReduceBase;  
    17. import org.apache.hadoop.mapred.Mapper;  
    18. import org.apache.hadoop.mapred.OutputCollector;  
    19. import org.apache.hadoop.mapred.Reducer;  
    20. import org.apache.hadoop.mapred.Reporter;  
    21. import org.apache.hadoop.mapred.TextInputFormat;  
    22. import org.apache.hadoop.mapred.TextOutputFormat;  
    23.   
    24. public class ChainedJobs {  
    25.   
    26.     public static class TokenizeMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {  
    27.   
    28.         private final static IntWritable one = new IntWritable(1);  
    29.         public static final int LOW_LIMIT = 5;  
    30.         @Override  
    31.         public void map(LongWritable key, Text value,  
    32.                 OutputCollector<Text, IntWritable> output, Reporter reporter)  
    33.                 throws IOException {  
    34.             String line = value.toString();  
    35.             StringTokenizer st = new StringTokenizer(line);  
    36.             while(st.hasMoreTokens())  
    37.                 output.collect(new Text(st.nextToken()), one);  
    38.               
    39.         }  
    40.           
    41.     }  
    42.       
    43.     public static class TokenizeReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {  
    44.   
    45.         @Override  
    46.         public void reduce(Text key, Iterator<IntWritable> values,  
    47.                 OutputCollector<Text, IntWritable> output, Reporter reporter)  
    48.                 throws IOException {  
    49.             int sum = 0;  
    50.             while(values.hasNext()) {  
    51.                 sum += values.next().get();  
    52.             }  
    53.             output.collect(key, new IntWritable(sum));  
    54.         }  
    55.           
    56.     }  
    57.       
    58.       
    59.     public static void main(String[] args) throws IOException {  
    60.           
    61.           
    62.         JobConf conf = new JobConf(ChainedJobs.class);  
    63.         conf.setJobName("wordcount");           //设置一个用户定义的job名称  
    64.         conf.setOutputKeyClass(Text.class);    //为job的输出数据设置Key类  
    65.         conf.setOutputValueClass(IntWritable.class);   //为job输出设置value类  
    66.         conf.setMapperClass(TokenizeMapper.class);         //为job设置Mapper类  
    67.         conf.setCombinerClass(TokenizeReducer.class);      //为job设置Combiner类  
    68.         conf.setReducerClass(TokenizeReducer.class);        //为job设置Reduce类  
    69.         conf.setInputFormat(TextInputFormat.class);    //为map-reduce任务设置InputFormat实现类  
    70.         conf.setOutputFormat(TextOutputFormat.class);  //为map-reduce任务设置OutputFormat实现类  
    71.   
    72.         // Remove output folder before run job(s)  
    73.         FileSystem fs=FileSystem.get(conf);  
    74.         String outputPath = "/home/hadoop/DataSet/Hadoop/WordCount-OUTPUT";  
    75.         Path op=new Path(outputPath);          
    76.         if (fs.exists(op)) {  
    77.             fs.delete(op, true);  
    78.             System.out.println("存在此输出路径,已删除!!!");  
    79.         }  
    80.           
    81.         FileInputFormat.setInputPaths(conf, new Path("/home/hadoop/DataSet/Hadoop/WordCount"));  
    82.         FileOutputFormat.setOutputPath(conf, new Path(outputPath));  
    83.         JobClient.runJob(conf);         //运行一个job  
    84.     }  
    85.       
    86. }  

         上面是独立的一个Job,完成第一步。为了能紧接着完成第二步,我们需要在原来的基础上进行修改。

    为了方便理解,上面的输入的例子如下:

    Java代码

    1. accessed    3  
    2. accessible  4  
    3. accomplish  1  
    4. accounting  7  
    5. accurately  1  
    6. acquire 1  
    7. across  1  
    8. actual  1  
    9. actually    1  
    10. add 3  
    11. added   2  
    12. addition    1  
    13. additional  4  

        old api 的实现方式并不支持 setup() / cleanup() 操作这一点非常不好,因此在有可能的情况下最好还是要迁移到Hadoop 2.X 

    新的API会方便简洁很多

        下面是增加了一个Mapper 来过滤

    Java代码

    1. public static class RangeFilterMapper extends MapReduceBase implements Mapper<Text, IntWritable, Text, IntWritable> {  
    2.   
    3.     @Override  
    4.     public void map(Text key, IntWritable value,  
    5.             OutputCollector<Text, IntWritable> output, Reporter reporter)  
    6.             throws IOException {  
    7.           
    8.         if(value.get() >= LOW_LIMIT) {  
    9.             output.collect(key, value);  
    10.         }  
    11.           
    12.     }  
    13. }  

         这个Mapper做的事情很简单,就是针对每个key,如果他的value > LOW_LIMIT 那么就输出

    所以,目前为止,任务链如下:

    TokenizerMapper -> TokenizeReducer -> RangeFilterMapper 

        所以我们的main函数改成下面的样子:

    Java代码

    1. public static void main(String[] args) throws IOException {  
    2.       
    3.       
    4.     JobConf conf = new JobConf(ChainedJobs.class);  
    5.     conf.setJobName("wordcount");           //设置一个用户定义的job名称  
    6. //        conf.setOutputKeyClass(Text.class);    //为job的输出数据设置Key类  
    7. //        conf.setOutputValueClass(IntWritable.class);   //为job输出设置value类  
    8. //        conf.setMapperClass(TokenizeMapper.class);         //为job设置Mapper类  
    9. //        conf.setCombinerClass(TokenizeReducer.class);      //为job设置Combiner类  
    10. //        conf.setReducerClass(TokenizeReducer.class);        //为job设置Reduce类  
    11. //        conf.setInputFormat(TextInputFormat.class);    //为map-reduce任务设置InputFormat实现类  
    12. //        conf.setOutputFormat(TextOutputFormat.class);  //为map-reduce任务设置OutputFormat实现类  
    13.   
    14.     // Step1 : mapper forr word count   
    15.     JobConf wordCountMapper  = new JobConf(false);  
    16.     ChainMapper.addMapper(conf,   
    17.             TokenizeMapper.class,   
    18.             LongWritable.class,     // input key type   
    19.             Text.class,             // input value type  
    20.             Text.class,             // output key type  
    21.             IntWritable.class,      // output value type  
    22.             false,                  //byValue or byRefference 传值还是传引用  
    23.             wordCountMapper);  
    24.       
    25.     // Step2: reducer for word count  
    26.     JobConf wordCountReducer  = new JobConf(false);  
    27.     ChainReducer.setReducer(conf,   
    28.             TokenizeReducer.class,   
    29.             Text.class,   
    30.             IntWritable.class,   
    31.             Text.class,   
    32.             IntWritable.class,   
    33.             false,   
    34.             wordCountReducer);  
    35.       
    36.         // Step3: mapper used as filter  
    37.     JobConf rangeFilterMapper  = new JobConf(false);  
    38.     ChainReducer.addMapper(conf,   
    39.             RangeFilterMapper.class,   
    40.             Text.class,   
    41.             IntWritable.class,   
    42.             Text.class,   
    43.             IntWritable.class,   
    44.             false,   
    45.             rangeFilterMapper);  
    46.       
    47.       
    48.     // Remove output folder before run job(s)  
    49.     FileSystem fs=FileSystem.get(conf);  
    50.     String outputPath = "/home/hadoop/DataSet/Hadoop/WordCount-OUTPUT";  
    51.     Path op=new Path(outputPath);          
    52.     if (fs.exists(op)) {  
    53.         fs.delete(op, true);  
    54.         System.out.println("存在此输出路径,已删除!!!");  
    55.     }  
    56.       
    57.     FileInputFormat.setInputPaths(conf, new Path("/home/hadoop/DataSet/Hadoop/WordCount"));  
    58.     FileOutputFormat.setOutputPath(conf, new Path(outputPath));  
    59.     JobClient.runJob(conf);         //运行一个job  
    60. }  

    下面是运行结果的一部分:

    Java代码

    1. a   40  
    2. and 26  
    3. are 12  
    4. as  6  
    5. be  7  
    6. been    8  
    7. but 5  
    8. by  5  
    9. can 12  
    10. change  5  
    11. data    5  
    12. files   7  
    13. for 28  
    14. from    5  
    15. has 7  
    16. have    8  
    17. if  6  
    18. in  27  
    19. is  16  
    20. it  13  
    21. more    8  
    22. not 5  
    23. of  23  
    24. on  5  
    25. outputs 5  
    26. see 6  
    27. so  11  
    28. that    11  
    29. the 54  

         可以看到,英文之中,如果NLP不去除停用词(a, the, for ...) 等,效果确实会被大大的影响。

  • 相关阅读:
    HTTP报文(转)
    批处理增加开机启动项(转)
    HTTP代理服务程序介绍(copy)
    MP3文件格式说明 (转)
    [sql] SQL Server判断对象是否存在
    MSSQL 链接远程数据库 读取并操作数据
    将无线网卡变成“无线路由器(无线AP)”
    :DOS命令大全(经典收藏)
    java 使用 poi 操纵 excel2003 经验总结
    log4j.properties的配置详解(根据网络资料整理)
  • 原文地址:https://www.cnblogs.com/zlslch/p/6295485.html
Copyright © 2011-2022 走看看