zoukankan      html  css  js  c++  java
  • 027_编写MapReduce的模板类Mapper、Reducer和Driver

    模板类编写好后,写MapReduce程序时只需要修改相应参数即可,代码如下:

      1 package org.dragon.hadoop.mr.module;
      2 
      3 import java.io.IOException;
      4 
      5 import org.apache.hadoop.conf.Configuration;
      6 import org.apache.hadoop.conf.Configured;
      7 import org.apache.hadoop.fs.Path;
      8 import org.apache.hadoop.io.LongWritable;
      9 import org.apache.hadoop.io.Text;
     10 import org.apache.hadoop.mapreduce.Job;
     11 import org.apache.hadoop.mapreduce.Mapper;
     12 import org.apache.hadoop.mapreduce.Reducer;
     13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     14 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
     15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     16 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
     17 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
     18 import org.apache.hadoop.util.Tool;
     19 import org.apache.hadoop.util.ToolRunner;
     20 
     21 /**
     22  * 
     23  * ########################################### 
     24  * ############     MapReduce 模板类        ########## 
     25  * ###########################################
     26  * 
     27  * @author ZhuXY
     28  * @time 2016-3-13 下午10:21:06
     29  * 
     30  */
     31 public class ModuleMapReduce extends Configured implements Tool {
     32 
     33     /**
     34      * Mapper Class
     35      */
     36     public static class ModuleMapper extends
     37             Mapper<LongWritable, Text, LongWritable, Text> {
     38 
     39         @Override
     40         protected void setup(Context context) throws IOException,
     41                 InterruptedException {
     42             super.setup(context);
     43         }
     44 
     45         @Override
     46         protected void map(LongWritable key, Text value, Context context)
     47                 throws IOException, InterruptedException {
     48             super.map(key, value, context);
     49         }
     50 
     51         @Override
     52         protected void cleanup(Context context) throws IOException,
     53                 InterruptedException {
     54             super.cleanup(context);
     55         }
     56     }
     57 
     58     /**
     59      * Reducer Class
     60      */
     61     public static class ModuleReducer extends
     62             Reducer<LongWritable, Text, LongWritable, Text> {
     63 
     64         @Override
     65         protected void setup(Context context) throws IOException,
     66                 InterruptedException {
     67             // TODO Auto-generated method stub
     68             super.setup(context);
     69         }
     70 
     71         @Override
     72         protected void reduce(LongWritable key, Iterable<Text> values,
     73                 Context context) throws IOException, InterruptedException {
     74             // TODO Auto-generated method stub
     75             super.reduce(key, values, context);
     76         }
     77 
     78         @Override
     79         protected void cleanup(Context context) throws IOException,
     80                 InterruptedException {
     81             // TODO Auto-generated method stub
     82             super.cleanup(context);
     83         }
     84 
     85     }
     86 
     87     /**
     88      * Driver Class
     89      */
     90 
     91     //    专门抽取一个方法出来用于设置
     92     public Job parseInputAndOutput(Tool tool,Configuration conf,String[] args) throws IOException
     93     {
     94         if (args.length!=2) {
     95             System.err.printf("Usage:%s [generic options] <input> <output>
    ", tool.getClass().getSimpleName());
     96             ToolRunner.printGenericCommandUsage(System.err);
     97             return null;
     98         }
     99         
    100         //创建job,并设置配置信息和job名称
    101         Job job=new Job(conf, ModuleMapReduce.class.getSimpleName());
    102         
    103         //设置job的运行类
    104         // step 3:set job
    105         // 1) set run jar class
    106         job.setJarByClass(tool.getClass());
    107         
    108         // 14) job output path
    109         FileOutputFormat.setOutputPath(job, new Path(args[1]));
    110         
    111         return job;
    112     }
    113     
    114     @Override
    115     public int run(String[] args) throws Exception {
    116         
    117         // step 1:get conf
    118         Configuration conf = new Configuration();
    119 
    120         // step 2:create job
    121         Job job = parseInputAndOutput(this, conf, args);
    122 
    123 
    124         // 2) set input format
    125         job.setInputFormatClass(TextInputFormat.class); // 可省
    126 
    127         // 3) set input path
    128         FileInputFormat.addInputPath(job, new Path(args[0]));
    129 
    130         // 4) set mapper class
    131         job.setMapperClass(ModuleMapper.class); // 可省
    132 
    133         // 5)set map input key/value class
    134         job.setMapOutputKeyClass(LongWritable.class); // 可省
    135         job.setMapOutputValueClass(Text.class); // 可省
    136 
    137         // 6) set partitioner class
    138         job.setPartitionerClass(HashPartitioner.class); // 可省
    139 
    140         // 7) set reducer number
    141         job.setNumReduceTasks(1);// default 1 //可省
    142         
    143         // 8)set sort comparator class
    144         //job.setSortComparatorClass(LongWritable.Comparator.class); // 可省
    145 
    146         // 9) set group comparator class
    147         //job.setGroupingComparatorClass(LongWritable.Comparator.class); // 可省
    148 
    149         // 10) set combiner class
    150         // job.setCombinerClass(null);默认是null,但是此处不能写 //可省
    151 
    152         // 11) set reducer class
    153         job.setReducerClass(ModuleReducer.class); // 可省
    154 
    155         // 12) set output format
    156         job.setOutputFormatClass(TextOutputFormat.class); // 可省
    157 
    158         // 13) job output key/value class
    159         job.setOutputKeyClass(LongWritable.class); // 可省
    160         job.setOutputValueClass(Text.class); // 可省
    161 
    162 
    163         // step 4: submit job
    164         boolean isSuccess = job.waitForCompletion(true);
    165 
    166         // step 5: return status
    167         return isSuccess ? 0 : 1;
    168     }
    169 
    170     public static void main(String[] args) throws Exception {
    171         
    172         args = new String[] {
    173                 "hdfs://hadoop-master.dragon.org:9000/wc/mininput/",
    174                 "hdfs://hadoop-master.dragon.org:9000/wc/minoutput" 
    175                 };
    176 
    177         //run mapreduce
    178         int status=ToolRunner.run(new ModuleMapReduce(), args);
    179         
    180         //exit
    181         System.exit(status);
    182     }
    183 }
    View Module Code

    模板使用步骤:

    1) 改名称(MapReduce类的名称、Mapper类的名称、Reducer类的名称)

    2) 依据实际的业务逻辑修改Mapper类和Reducer类的Key/Value输入输出参数的类型

    3) 修改驱动Driver部分的Job的参数设置(Mapper类和Reducer类的输出类型)

    4) 在Mapper类中编写实际的业务逻辑(setup()、map()、cleanup())

    5) 在Reducer类中编写实际的业务逻辑(setup()、reduce()、cleanup())

    6) 检查并修改驱动Driver代码(模板类中的run()方法)

    7) 设置输入输出路径,进行MR测试。

    使用ModuleMapReduce编写wordcount程序

      1 package org.dragon.hadoop.mr.module;
      2 
      3 import java.io.IOException;
      4 import java.util.StringTokenizer;
      5 
      6 import org.apache.hadoop.conf.Configuration;
      7 import org.apache.hadoop.conf.Configured;
      8 import org.apache.hadoop.fs.Path;
      9 import org.apache.hadoop.io.LongWritable;
     10 import org.apache.hadoop.io.Text;
     11 import org.apache.hadoop.mapreduce.Job;
     12 import org.apache.hadoop.mapreduce.Mapper;
     13 import org.apache.hadoop.mapreduce.Reducer;
     14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
     16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
     18 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
     19 import org.apache.hadoop.util.Tool;
     20 import org.apache.hadoop.util.ToolRunner;
     21 
     22 /**
     23  * 
     24  * ########################################### 
     25  * ############        MapReduce 模板类     ########## 
     26  * ###########################################
     27  * 
     28  * @author ZhuXY
     29  * @time 2016-3-13 下午10:21:06
     30  * 
     31  */
     32 public class WordcountByModuleMapReduce extends Configured implements Tool {
     33 
     34     /**
     35      * Mapper Class
     36      */
     37     public static class WordcountMapper extends
     38             Mapper<LongWritable, Text, Text, LongWritable> {
     39 
     40         @Override
     41         protected void setup(Context context) throws IOException,
     42                 InterruptedException {
     43             super.setup(context);
     44         }
     45 
     46         private Text word = new Text();
     47         private final static LongWritable one = new LongWritable(1);
     48 
     49         @Override
     50         protected void map(LongWritable key, Text value, Context context)
     51                 throws IOException, InterruptedException {
     52 
     53             // 获取每行数据的值
     54             String lineValue = value.toString();
     55 
     56             // 进行分割
     57             StringTokenizer stringTokenizer = new StringTokenizer(lineValue);
     58 
     59             // 遍历
     60             while (stringTokenizer.hasMoreElements()) {
     61 
     62                 // 获取每个值
     63                 String worldValue = stringTokenizer.nextToken();
     64 
     65                 // 设置map, 输入的key值
     66                 word.set(worldValue);
     67                 context.write(word, one); // 如果出现就出现一次,存在每行出现几次,这时候键的值一样,多个键值对
     68             }
     69         }
     70 
     71         @Override
     72         protected void cleanup(Context context) throws IOException,
     73                 InterruptedException {
     74             super.cleanup(context);
     75         }
     76     }
     77 
     78     /**
     79      * Reducer Class
     80      */
     81     public static class WordcountReducer extends
     82             Reducer<Text, LongWritable, Text, LongWritable> {
     83         private LongWritable resultLongWritable = new LongWritable();
     84 
     85         @Override
     86         protected void setup(Context context) throws IOException,
     87                 InterruptedException {
     88             // TODO Auto-generated method stub
     89             super.setup(context);
     90         }
     91 
     92         @Override
     93         protected void reduce(Text key, Iterable<LongWritable> values,
     94                 Context context) throws IOException, InterruptedException {
     95             int sum = 0;
     96             // 循环遍历Interable
     97             for (LongWritable value : values) {
     98                 // 累加
     99                 sum += value.get();
    100             }
    101 
    102             // 设置总次数
    103             resultLongWritable.set(sum);
    104             context.write(key, resultLongWritable);
    105         }
    106 
    107         @Override
    108         protected void cleanup(Context context) throws IOException,
    109                 InterruptedException {
    110             // TODO Auto-generated method stub
    111             super.cleanup(context);
    112         }
    113 
    114     }
    115 
    116     /**
    117      * Driver Class
    118      */
    119 
    120     // 专门抽取一个方法出来用于设置
    121     public Job parseInputAndOutput(Tool tool, Configuration conf, String[] args)
    122             throws IOException {
    123         if (args.length != 2) {
    124             System.err.printf("Usage:%s [generic options] <input> <output>
    ",
    125                     tool.getClass().getSimpleName());
    126             ToolRunner.printGenericCommandUsage(System.err);
    127             return null;
    128         }
    129 
    130         // 创建job,并设置配置信息和job名称
    131         Job job = new Job(conf,
    132                 WordcountByModuleMapReduce.class.getSimpleName());
    133 
    134         // 设置job的运行类
    135         // step 3:set job
    136         // 1) set run jar class
    137         job.setJarByClass(tool.getClass());
    138 
    139         // 14) job output path
    140         FileOutputFormat.setOutputPath(job, new Path(args[1]));
    141 
    142         return job;
    143     }
    144 
    145     @Override
    146     public int run(String[] args) throws Exception {
    147 
    148         // step 1:get conf
    149         Configuration conf = new Configuration();
    150 
    151         // step 2:create job
    152         Job job = parseInputAndOutput(this, conf, args);
    153 
    154         // 2) set input format
    155         job.setInputFormatClass(TextInputFormat.class); // 可省
    156 
    157         // 3) set input path
    158         FileInputFormat.addInputPath(job, new Path(args[0]));
    159 
    160         // 4) set mapper class
    161         job.setMapperClass(WordcountMapper.class); // 可省
    162 
    163         // 5)set map input key/value class
    164         job.setMapOutputKeyClass(Text.class); // 可省
    165         job.setMapOutputValueClass(LongWritable.class); // 可省
    166 
    167         // 6) set partitioner class
    168         job.setPartitionerClass(HashPartitioner.class); // 可省
    169 
    170         // 7) set reducer number
    171         job.setNumReduceTasks(1);// default 1 //可省
    172 
    173         // 8)set sort comparator class
    174         // job.setSortComparatorClass(LongWritable.Comparator.class); // 可省
    175 
    176         // 9) set group comparator class
    177         // job.setGroupingComparatorClass(LongWritable.Comparator.class); // 可省
    178 
    179         // 10) set combiner class
    180         // job.setCombinerClass(null);默认是null,但是此处不能写 //可省
    181 
    182         // 11) set reducer class
    183         job.setReducerClass(WordcountReducer.class); // 可省
    184 
    185         // 12) set output format
    186         job.setOutputFormatClass(TextOutputFormat.class); // 可省
    187 
    188         // 13) job output key/value class
    189         job.setOutputKeyClass(Text.class); // 可省
    190         job.setOutputValueClass(LongWritable.class); // 可省
    191 
    192         // step 4: submit job
    193         boolean isSuccess = job.waitForCompletion(true);
    194 
    195         // step 5: return status
    196         return isSuccess ? 0 : 1;
    197     }
    198 
    199     public static void main(String[] args) throws Exception {
    200 
    201         args = new String[] {
    202                 "hdfs://hadoop-master.dragon.org:9000/wc/mininput/",
    203                 "hdfs://hadoop-master.dragon.org:9000/wc/minoutput" };
    204 
    205         // run mapreduce
    206         int status = ToolRunner.run(new WordcountByModuleMapReduce(), args);
    207 
    208         // exit
    209         System.exit(status);
    210     }
    211 }
    View WordcountByModuleMapReduce Code
  • 相关阅读:
    Notepad++语言格式设置,自定义扩展名关联文件格式
    Windows使用SSH Secure Shell实现免密码登录CentOS
    如何从jks文件中导出公私钥
    tomcat运行监控脚本,自动启动
    2016年统计用区划代码和城乡划分代码(截止2016年07月31日)
    jquery动态出操作select
    阿里负载均衡的一个坑~~~备忘
    神奇的空格
    centos安装tomcat7.0.70
    驱动相关的内核函数分析
  • 原文地址:https://www.cnblogs.com/xiangyangzhu/p/5281129.html
Copyright © 2011-2022 走看看