Hadoop Diary, Day 15 --- Comparing the old and new MapReduce APIs

  I use hadoop 1.1.2, while many companies still run hadoop 0.2x releases, so the Hadoop material in circulation targets a mix of versions. To broaden my own knowledge, I compared the old and new MapReduce APIs.
  In hadoop 1.x the API classes generally live in the mapreduce package.
  In hadoop 0.x the API classes generally live in the mapred package.
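  The difference is easiest to see in the import statements. Below is a minimal sketch of the two import styles (class names from the standard Hadoop distribution; only a representative subset is shown):

    // Old API (hadoop 0.x): classes live under org.apache.hadoop.mapred
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.Reducer;

    // New API (hadoop 1.x): classes live under org.apache.hadoop.mapreduce
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;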
  We again use word count as the example. The code is shown below in Listing 1.1:
    package old;
    
    import java.io.IOException;
    import java.net.URI;
    import java.util.Iterator;
    
    import mapreduce.WordCountApp;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;
    /**
     * hadoop 1.x classes generally live in the mapreduce package
     * hadoop 0.x classes generally live in the mapred package
     *
     */
    public class OldAPP {
        static final String INPUT_PATH = "hdfs://hadoop:9000/hello";
        static final String OUT_PATH = "hdfs://hadoop:9000/out";
        /**
         * Changes from the new API:
         * 1. Job is replaced by JobConf
         * 2. the classes come from the mapred package instead of mapreduce
         * 3. the job is submitted with JobClient.runJob(job) instead of job.waitForCompletion(true)
         * 
         */
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
            final Path outPath = new Path(OUT_PATH);
            if(fileSystem.exists(outPath)){
                fileSystem.delete(outPath, true);
            }
            
            final JobConf job = new JobConf(conf, WordCountApp.class);
            //1.1 specify where the input files are read from
            FileInputFormat.setInputPaths(job, INPUT_PATH);
            //specify how the input files are parsed: each line becomes a key/value pair
            //job.setInputFormat(TextInputFormat.class);
            
            //1.2 specify the custom map class
            job.setMapperClass(MyMapper.class);
            //the <k,v> types emitted by map; can be omitted when <k3,v3> has the same types as <k2,v2>
            //job.setMapOutputKeyClass(Text.class);
            //job.setMapOutputValueClass(LongWritable.class);
            
            //1.3 partitioning
            //job.setPartitionerClass(HashPartitioner.class);
            //run a single reduce task
            //job.setNumReduceTasks(1);
            
            //1.4 TODO sorting and grouping
            
            //1.5 TODO combiner
            
            //2.2 specify the custom reduce class
            job.setReducerClass(MyReducer.class);
            //specify the reduce output types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            
            //2.3 specify where the output is written
            FileOutputFormat.setOutputPath(job, outPath);
            //specify the output format class
            //job.setOutputFormat(TextOutputFormat.class);
            
            //submit the job to the JobTracker
            JobClient.runJob(job);
        }
    
        
        
        /**
         * new api: extends Mapper
         * old api: extends MapReduceBase implements Mapper
         */
        static class MyMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, LongWritable>{
            @Override
            public void map(LongWritable k1, Text v1,
                    OutputCollector<Text, LongWritable> collector, Reporter reporter)
                    throws IOException {
                final String[] splited = v1.toString().split("\t");
                for (String word : splited) {
                    collector.collect(new Text(word), new LongWritable(1));
                }
            }
        }
        
        static class MyReducer extends MapReduceBase implements Reducer<Text, LongWritable, Text, LongWritable>{
            @Override
            public void reduce(Text k2, Iterator<LongWritable> v2s,
                    OutputCollector<Text, LongWritable> collector, Reporter reporter)
                    throws IOException {
                long times = 0L;
                while (v2s.hasNext()) {
                    final long temp = v2s.next().get();
                    times += temp;
                }
                collector.collect(k2, new LongWritable(times));
            }
        }
    }

    Listing 1.1

    1. Differences in the custom Mapper class

      In the new API, a mapper extends the class org.apache.hadoop.mapreduce.Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>. In the old API, it extends the class org.apache.hadoop.mapred.MapReduceBase and implements the interface org.apache.hadoop.mapred.Mapper<K1, V1, K2, V2>. In the new API, the third parameter of the overridden map method is a Context; in the old API, the third and fourth parameters are an OutputCollector and a Reporter. The new API's Context merges the functionality of those two classes, which makes it simpler to use. The input is a set of key/value pairs, and the map function is called once per pair. The custom Mapper class written against the old API is shown in Listing 1.2, and a new-API sketch follows it for comparison.

    /**
     * new api: extends Mapper
     * old api: extends MapReduceBase implements Mapper
     */
    static class MyMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, LongWritable>{
        @Override
        public void map(LongWritable k1, Text v1,
                OutputCollector<Text, LongWritable> collector, Reporter reporter)
                throws IOException {
            final String[] splited = v1.toString().split("\t");
            for (String word : splited) {
                collector.collect(new Text(word), new LongWritable(1));
            }
        }
    }

    Listing 1.2
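      For comparison, here is a minimal sketch of the same Mapper written against the new API. This is an illustration only, not the author's original new-API class; like Listing 1.2 it is assumed to be nested inside the driver class, with org.apache.hadoop.mapreduce.Mapper imported.

    /**
     * New API: extend org.apache.hadoop.mapreduce.Mapper directly.
     * OutputCollector and Reporter are replaced by the single Context parameter.
     */
    static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            final String[] splited = v1.toString().split("\t");
            for (String word : splited) {
                context.write(new Text(word), new LongWritable(1));
            }
        }
    }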

    2. Differences in the custom Reducer class

      In the new API, a reducer extends the class org.apache.hadoop.mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>. In the old API, it extends the class org.apache.hadoop.mapred.MapReduceBase and implements the interface org.apache.hadoop.mapred.Reducer<K1, V1, K2, V2>. In the new API, the second parameter of the overridden reduce method is a java.lang.Iterable<VALUEIN>, which can be traversed with an enhanced for loop; in the old API it is a java.util.Iterator<V2>, which has to be traversed with a while loop. In the new API, the third parameter of the overridden reduce method is a Context; in the old API, the third and fourth parameters are an OutputCollector and a Reporter. Again, the new API's Context merges the functionality of those two classes, which makes it simpler to use. The custom Reducer class written against the old API is shown in Listing 2.1, and a new-API sketch follows it for comparison.

    static class MyReducer extends MapReduceBase implements Reducer<Text, LongWritable, Text, LongWritable>{
        @Override
        public void reduce(Text k2, Iterator<LongWritable> v2s,
                OutputCollector<Text, LongWritable> collector, Reporter reporter)
                throws IOException {
            long times = 0L;
            while (v2s.hasNext()) {
                final long temp = v2s.next().get();
                times += temp;
            }
            collector.collect(k2, new LongWritable(times));
        }
    }

    Listing 2.1
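      For comparison, here is a minimal sketch of the same Reducer written against the new API. Again this is an illustration only, not the author's original class; it is assumed to be nested inside the driver class, with org.apache.hadoop.mapreduce.Reducer imported.

    /**
     * New API: extend org.apache.hadoop.mapreduce.Reducer directly.
     * The values arrive as an Iterable, so an enhanced for loop replaces the while loop.
     */
    static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
                throws IOException, InterruptedException {
            long times = 0L;
            for (LongWritable count : v2s) {
                times += count.get();
            }
            context.write(k2, new LongWritable(times));
        }
    }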

    3. Differences in the driver's main method

      In the new API, the driver code is built around the org.apache.hadoop.mapreduce.Job class: it holds all the job configuration, and the job is submitted to the JobTracker by calling its waitForCompletion(boolean) method. In the old API, the driver code is built around org.apache.hadoop.mapred.JobConf, constructed with JobConf(Configuration, Class), which holds the configuration, while the job itself is submitted through the runJob(JobConf) method of org.apache.hadoop.mapred.JobClient. In other words, the new API merges the roles of JobConf and JobClient into Job, which makes it more convenient to use; the short sketch below contrasts the submission call in each API.
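      A minimal side-by-side sketch, assuming job is the appropriately typed object in each case:

    // Old API: job is a JobConf; JobClient submits it and blocks until it finishes.
    JobClient.runJob(job);

    // New API: job is an org.apache.hadoop.mapreduce.Job; it submits itself.
    job.waitForCompletion(true);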

      The method names on JobConf and Job are almost identical; what differs are the parameter types they accept. The setXXX(...) methods of the new API's Job class expect classes from org.apache.hadoop.mapreduce and its subpackages, whereas the setXXX(...) methods of the old API's JobConf class expect classes from org.apache.hadoop.mapred and its subpackages. The driver main method written against the old API is shown in Listing 3.1, followed by a sketch of an equivalent new-API driver for comparison.

    package old;
    
    import java.io.IOException;
    import java.net.URI;
    import java.util.Iterator;
    
    import mapreduce.WordCountApp;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hadoop.mapred.TextOutputFormat;
    import org.apache.hadoop.mapred.lib.HashPartitioner;
    /**
     * hadoop 1.x classes generally live in the mapreduce package
     * hadoop 0.x classes generally live in the mapred package
     *
     */
    public class OldAPP {
        static final String INPUT_PATH = "hdfs://hadoop:9000/hello";
        static final String OUT_PATH = "hdfs://hadoop:9000/out";
        /**
         * Changes from the new API:
         * 1. Job is replaced by JobConf
         * 2. the classes come from the mapred package instead of mapreduce
         * 3. the job is submitted with JobClient.runJob(job) instead of job.waitForCompletion(true)
         * 
         */
        public static void main(String[] args) throws Exception {
            
            Configuration conf = new Configuration();
            final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
            final Path outPath = new Path(OUT_PATH);
            if(fileSystem.exists(outPath)){
                fileSystem.delete(outPath, true);
            }
            
            final JobConf job = new JobConf(conf, WordCountApp.class);
            
            FileInputFormat.setInputPaths(job, INPUT_PATH);//1.1 specify where the input files are read from
            job.setMapperClass(MyMapper.class);//1.2 specify the custom map class
            job.setMapOutputKeyClass(Text.class);//the <k,v> types emitted by map; can be omitted when <k3,v3> has the same types as <k2,v2>
            job.setMapOutputValueClass(LongWritable.class);
            job.setPartitionerClass(HashPartitioner.class);//1.3 partitioning
            job.setNumReduceTasks(1);//run a single reduce task
            job.setReducerClass(MyReducer.class);//2.2 specify the custom reduce class
            job.setOutputKeyClass(Text.class);//specify the reduce output types
            job.setOutputValueClass(LongWritable.class);
            FileOutputFormat.setOutputPath(job, outPath);//2.3 specify where the output is written
            JobClient.runJob(job);//submit the job to the JobTracker
        }
    
        
        
        /**
         * new api: extends Mapper
         * old api: extends MapReduceBase implements Mapper
         */
        static class MyMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, LongWritable>{
            @Override
            public void map(LongWritable k1, Text v1,
                    OutputCollector<Text, LongWritable> collector, Reporter reporter)
                    throws IOException {
                final String[] splited = v1.toString().split("\t");
                for (String word : splited) {
                    collector.collect(new Text(word), new LongWritable(1));
                }
            }
        }
        
        static class MyReducer extends MapReduceBase implements Reducer<Text, LongWritable, Text, LongWritable>{
            @Override
            public void reduce(Text k2, Iterator<LongWritable> v2s,
                    OutputCollector<Text, LongWritable> collector, Reporter reporter)
                    throws IOException {
                long times = 0L;
                while (v2s.hasNext()) {
                    final long temp = v2s.next().get();
                    times += temp;
                }
                collector.collect(k2, new LongWritable(times));
            }
        }
    }

    Listing 3.1
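      For comparison, here is a minimal sketch of an equivalent driver written against the new API. NewAPP is a hypothetical class name used only for illustration (the author's actual new-API version, WordCountApp, is not shown in this post); the paths and key/value types are reused from Listing 3.1, and the Job(Configuration, String) constructor shown here is the one available in hadoop 1.1.2.

    package mapreduce;
    
    import java.io.IOException;
    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class NewAPP {
        static final String INPUT_PATH = "hdfs://hadoop:9000/hello";
        static final String OUT_PATH = "hdfs://hadoop:9000/out";
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
            final Path outPath = new Path(OUT_PATH);
            if (fileSystem.exists(outPath)) {
                fileSystem.delete(outPath, true);
            }
    
            // New API: org.apache.hadoop.mapreduce.Job replaces JobConf.
            final Job job = new Job(conf, "WordCount");
            job.setJarByClass(NewAPP.class);
    
            // FileInputFormat/FileOutputFormat now come from the mapreduce.lib.* subpackages.
            FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
            job.setMapperClass(MyMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            job.setReducerClass(MyReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            FileOutputFormat.setOutputPath(job, outPath);
    
            // waitForCompletion(true) replaces JobClient.runJob(job).
            job.waitForCompletion(true);
        }
    
        // New-API Mapper: OutputCollector and Reporter are merged into Context.
        static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
            @Override
            protected void map(LongWritable k1, Text v1, Context context)
                    throws IOException, InterruptedException {
                for (String word : v1.toString().split("\t")) {
                    context.write(new Text(word), new LongWritable(1));
                }
            }
        }
    
        // New-API Reducer: the values arrive as an Iterable instead of an Iterator.
        static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
            @Override
            protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
                    throws IOException, InterruptedException {
                long times = 0L;
                for (LongWritable count : v2s) {
                    times += count.get();
                }
                context.write(k2, new LongWritable(times));
            }
        }
    }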

    Original post: https://www.cnblogs.com/sunddenly/p/3997836.html