zoukankan      html  css  js  c++  java
  • 5.hadoop流原理、实例和新旧API下Wordcount详解

    前四篇文章讲了Hadoop的配置和测试以及eclipse下的使用,有兴趣的可以先看下。

    1.Hadoop流简介

    用可执行文件作为Mapper和Reducer,接受的都是标准输入,输出的都是标准输出。

    当一个可执行文件作为Mapper时,每一个Map任务会以一个独立的进程启动这个可执行文件,然后在Map任务运行时,会把输入切分成行提供给可 执行文件,并作为它的标准输入(stdin)内容。当可执行文件运行出结果时,Map从标准输出(stdout)中收集数据,并将其转化 为<key, value>对,作为Map的输出。

    Reduce与Map相同,如果可执行文件做Reducer时,Reduce任务会启动这个可执行文件,并且将<key, value>对转化为行作为这个可执行文件的标准输入(stdin)。然后Reduce会收集这个可执行文件的标准输出(stdout)的内容。并 把每一行转化为<key, value>对,作为Reduce的输出。

    Map与Reduce将输出转化为<key , value>对的默认方法是:将每行的第一个tab符号(制表符)之前的内容作为key,之后的内容作为value。如果没有tab符号,那么这一 行的所有内容会作为key,而value值为null。当然这是可以更改的。

    值得一提的是,可以使用Java类作为Map,而用一个可执行程序作为Reduce;或使用Java类作为Reduce,而用可执行程序作为Map。

    下面先看一个简单例子,用/bin/cat作Mapper,用/usr/bin/wc作Reducer

    /input下两个文件为:

    hello world bye world

    hello hadoop bye hadoop

     我在root账户/usr/local/hadoop/hadoop-2.2.0/bin目录(和安装路径有关)下运行此代码,可以统计文件中的行数,单词数和字节数。

    root@master:/usr/local/hadoop/hadoop-2.2.0/bin# hadoop jar /usr/local/hadoop/hadoop-2.2.0/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -input /input -output /output/stream -mapper /bin/cat -reducer /usr/bin/wc 

    运行结果为:  2  8  46

    wc命令用来统计文件中的行数、单词数与字节数,可以看到,这个结果是正确的。

    2.1 Hadoop流命令


    参数         可选/必选 参数         可选/必选
    -input      必选   -cmdenv       可选
    -output     必选   -inputreader      可选
    -mapper    必选   -verbose       可选
    -reducer    必选   -lazyOutput       可选
    -file       可选   -numReduce tasks 可选
    -inputformat  可选   -mapdebug      可选
    -outputformat 可选   -reducedebug     可选
    -partitioner    可选   -io           可选
    -combiner    可选 

    Hadoop流命令中,必选的4个很好理解,分别用于指定输入/输出文件的位置及Map/Reduce函数。在其他的可选命令中,这里我们只解释常用的几个。

    -file

    这个指令用于将文件加入到Hadoop的Job中。上面的例子中,cat和wc都是Linux系统中的命令,而在Hadoop流的使用中,往往需要使用自己写的文件(作为Map函数或Reduce函数)。一般而言,这些文件是Hadoop集群中的机器上没有的,这时就需要使用Hadoop流中的-file命令将这个可执行文件加入到Hadoop的Job中。

    -combiner

    这个命令用来加入combiner程序。

    -inputformat和-outputformat

    这两个命令用来设置输入输出文件的处理方法,这两个命令后面的参数必须是Java类。

    2.2  Hadoop流通用的命令选项

    Hadoop流的通用命令用来配置Hadoop流的Job。需要注意的是,如果使用这部分配置,就必须将其置于流命令配置之前,否则命令会失败。这里简要列出命令列表,供大家参考。

    Hadoop流的Job设置命令
    参数   可选/必选  参数   可选/必选
    -conf  可选   -files       可选
    -D      可选   -libjars    可选
    -fs     可选   -archives  可选
    -jt      可选

      从上面的内容可以知道,Hadoop流的API是一个扩展性非常强的框架,它与程序相连的部分只有数据,因此可以接受任何适用于UNIX标准输入/输出的脚本语言,比如Bash、PHP、Ruby、Python等。下面举两个非常简单的例子来进一步说明它的特性。(来源:《Hadoop实战》-陆嘉恒,中国人民大学)

    3. 1 Bash

    MapReduce框架是一个非常适合在大规模的非结构化数据中查找数据的编程模型,grep就是这种类型的一个例子。

    在Linux中,grep命令用来在一个或多个文件中查找某个字符模式(这个字符模式可以代表字符串,多用正则表达式表示)。

    下面尝试在如下的数据中查找带有Hadoop字符串的行,如下所示。

    输入文件为:
    file01:
    hello world bye world
    file02:
    hello hadoop bye hadoop

    reduce文件为:
    reduce.sh:
    grep hadoop

    把reduce.sh放到/hadoop/bin下

    输入命令为:

    root@master:/usr/local/hadoop/hadoop-2.2.0/bin# jar /usr/local/hadoop/hadoop-2.2.0/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -input /input -output /output -mapper /bin/cat -reducer reduce.sh -file reduce.sh

    结果为:
    hello hadoop bye hadoop

    显然,这个结果是正确的。

    3.2 Python

    对于Python来说,情况有些特殊。因为Python是可以编译为JAR包的,如果将程序编译为JAR包,那么就可以采用运行JAR包的方式来运行了。

    不过,同样也可以用流的方式运行Python程序。请看如下代码:
    Reduce.py

     1   #!/usr/bin/python
     2   
     3   import sys;
     4   
     5   def generateLongCountToken(id):
     6       return "LongValueSum:" + id + "\t" + "1"
     7   def main(argv):
     8      line = sys.stdin.readline();
     9      try:
    10         while line:
    11              line = line[:-1];
    12              fields = line.split("\t");
    13              print generateLongCountToken(fields[0]);
    14              line = sys.stdin.readline();
    15      except "end of file":
    16          return None
    17  if __name__ == "__main__":
    18      main(sys.argv)

    注意把reduce.py放到hadoop/bin下。

    使用如下命令来运行:

    root@master:/usr/local/hadoop/hadoop-2.2.0/bin# jar /usr/local/hadoop/hadoop-2.2.0/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -input /input -output /pyoutput -mapper reduce.py -reducer aggregate -file reduce.py

    注意其中的aggregate是Hadoop提供的一个包,它提供一个Reduce函数和一个combine函数。这个函数实现一些简单的类似求和、取最大值最小值等的功能。

    结果为

    hello hadoop bye hadoop    1
    hello world bye world    1

    4.Hadoop Pipes

    Hadoop Pipes提供了一个在Hadoop上运行C++程序的方法。与流不同的是,流使用的是标准输入输出作为可执行程序与Hadoop相关进程间通信的工具,而Pipes使用的是Sockets。先看一个示例程序wordcount.cpp:

     1 #include "hadoop/Pipes.hh"
     2 #include "hadoop/TemplateFactory.hh"
     3 #include "hadoop/StringUtils.hh"
     4 
     5 const std::string WORDCOUNT = "WORDCOUNT";
     6 const std::string INPUT_WORDS = "INPUT_WORDS";
     7 const std::string OUTPUT_WORDS = "OUTPUT_WORDS";
     8 
     9 class WordCountMap: public HadoopPipes::Mapper {
    10 public:
    11   HadoopPipes::TaskContext::Counter* inputWords;
    12  
    13   WordCountMap(HadoopPipes::TaskContext& context) {
    14     inputWords = context.getCounter(WORDCOUNT, INPUT_WORDS);
    15   }
    16  
    17   void map(HadoopPipes::MapContext& context) {
    18     std::vector<std::string> words =
    19       HadoopUtils::splitString(context.getInputValue(), " ");
    20     for(unsigned int i=0; i < words.size(); ++i) {
    21       context.emit(words[i], "1");
    22     }
    23     context.incrementCounter(inputWords, words.size());
    24   }
    25 };
    26 
    27 class WordCountReduce: public HadoopPipes::Reducer {
    28 public:
    29   HadoopPipes::TaskContext::Counter* outputWords;
    30 
    31 WordCountReduce(HadoopPipes::TaskContext& context) {
    32     outputWords = context.getCounter(WORDCOUNT, OUTPUT_WORDS);
    33   }
    34 
    35 void reduce(HadoopPipes::ReduceContext& context) {
    36     int sum = 0;
    37     while (context.nextValue()) {
    38       sum += HadoopUtils::toInt(context.getInputValue());
    39     }
    40     context.emit(context.getInputKey(), HadoopUtils::toString(sum));
    41     context.incrementCounter(outputWords, 1);
    42   }
    43 };
    44 int main(int argc, char *argv[]) {
    45   return HadoopPipes::runTask(HadoopPipes::TemplateFactory<WordCountMap, WordCountReduce>());
    46 }

    这个程序连接的是一个C++库,结构类似于Java编写的程序。如新版API一样,这个程序使用context方法读入和收集<key, value>对。在使用时要重写HadoopPipes名字空间下的Mapper和Reducer函数,并用context.emit()方法输 出<key, value>对。main函数是应用程序的入口,它调用HadoopPipes::runTask方法,这个方法由一个 TemplateFactory参数来创建Map和Reduce实例,也可以重载factory设置combiner()、partitioner()、 record reader、record writer。

    接下来,编译这个程序。这个编译命令需要用到g++,读者可以使用apt自动安装这个程序。g++的命令格式如下所示:

    apt-get install g++

    然后建立文件Makerfile,如下所示:

    HADOOP_INSTALL="你的hadoop安装文件夹"
    
    PLATFORM=Linux-i386-32(如果是AMD的CPU,请使用Linux-amd64-64)
    
    CC = g++
    CPPFLAGS = -m32 -I$(HADOOP_INSTALL)/c++/$(PLATFORM)/include
    
    wordcount: wordcount.cpp
    $(CC) $(CPPFLAGS) $< -Wall -L$(HADOOP_INSTALL)/c++/$(PLATFORM)/lib -lhadooppipes -lhadooputils -lpthread -g -O2 -o $@

    注意在$(CC)前有一个<tab>符号,这个分隔符是很关键的。

    在当前目录下建立一个WordCount可执行文件。

    接着,上传可执行文件到HDFS上,这是为了TaskTracker能够获得这个可执行文件。这里上传到bin文件夹内。

    ~/hadoop/bin/hadoop fs –mkdir bin
    ~/hadoop/bin/hadoop dfs –put wordcount /bin

    然后,就可以运行这个MapReduce程序了,可以采用两种配置方式运行这个程序。一种方式是直接在命令中运行指定配置,如下所示:

    ~/hadoop/bin/hadoop pipes\
    -D hadoop.pipes.java.recordreader=true\
    -D hadoop.pipes.java.recordwriter=true\
    -input input\
    -output Coutput\
    -program /bin/wordcount

    另一种方式是预先将配置写入配置文件中,如下所示:

    <?xml version="1.0"?>
    <configuration>
      <property>
        // Set the binary path on DFS
        <name>hadoop.pipes.executable</name>
        <value>bin/wordcount</value>
      </property>
      <property>
        <name>hadoop.pipes.java.recordreader</name>
        <value>true</value>
      </property>
      <property>
        <name>hadoop.pipes.java.recordwriter</name>
        <value>true</value>
      </property>
    </configuration>

    然后通过如下命令运行这个程序:

    ~/hadoop/bin/hadoop pipes -conf word.xml -input /input -output /output

    将参数hadoop.pipes.executable和hadoop.pipes.java.recordreader设置为true表示使用 Hadoop默认的输入输出方式(即Java的)。同样的,也可以设置一个Java语言编写的Mapper函数、Reducer函数、combiner函 数和partitioner函数。实际上,在任何一个作业中,都可以混用Java类和C++类。

    5.1 旧API  WordCount分析

      1)源代码程序

    package org.apache.hadoop.examples;

    import java.io.IOException;
    import java.util.Iterator;
    import java.util.StringTokenizer;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hadoop.mapred.TextOutputFormat;

    public class WordCount {

        public static class Map extends MapReduceBase implements
                Mapper<LongWritable, Text, Text, IntWritable> {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();

            public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> output, Reporter reporter)
                    throws IOException {
                String line = value.toString();
                StringTokenizer tokenizer = new StringTokenizer(line);
                while (tokenizer.hasMoreTokens()) {
                    word.set(tokenizer.nextToken());
                    output.collect(word, one);
                }
            }
        }

        public static class Reduce extends MapReduceBase implements
                Reducer<Text, IntWritable, Text, IntWritable> {
            public void reduce(Text key, Iterator<IntWritable> values,
                    OutputCollector<Text, IntWritable> output, Reporter reporter)
                    throws IOException {
                int sum = 0;
                while (values.hasNext()) {
                    sum += values.next().get();
                }
                output.collect(key, new IntWritable(sum));
            }
        }

        public static void main(String[] args) throws Exception {
            JobConf conf = new JobConf(WordCount.class);
            conf.setJobName("wordcount");

            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(IntWritable.class);

            conf.setMapperClass(Map.class);
            conf.setCombinerClass(Reduce.class);
            conf.setReducerClass(Reduce.class);

            conf.setInputFormat(TextInputFormat.class);
            conf.setOutputFormat(TextOutputFormat.class);

            FileInputFormat.setInputPaths(conf, new Path(args[0]));
            FileOutputFormat.setOutputPath(conf, new Path(args[1]));

            JobClient.runJob(conf);
        }
    }

      3)主方法Main分析

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }

      首先讲解一下Job初始化过程main函数调用Jobconf类来对MapReduce Job进行初始化,然后调用setJobName()方法命名这个Job。对Job进行合理的命名有助于更快地找到Job,以便在JobTracker和Tasktracker的页面中对其进行监视

    JobConf conf = new JobConf(WordCount. class ); conf.setJobName("wordcount" );

      接着设置Job输出结果<key,value>的中key和value数据类型,因为结果是<单词,个数>,所以 key设置为"Text"类型,相当于Java中String类型。Value设置为"IntWritable",相当于Java中的int类型。

    conf.setOutputKeyClass(Text.class );

    conf.setOutputValueClass(IntWritable.class );

      然后设置Job处理的Map(拆分)、Combiner(中间结果合并)以及Reduce(合并)的相关处理类。这里用Reduce类来进行Map产生的中间结果合并,避免给网络数据传输产生压力。

    conf.setMapperClass(Map.class );

    conf.setCombinerClass(Reduce.class );

    conf.setReducerClass(Reduce.class );

      接着就是调用setInputPath()和setOutputPath()设置输入输出路径

    conf.setInputFormat(TextInputFormat.class );

    conf.setOutputFormat(TextOutputFormat.class );

      (1)InputFormat和InputSplit

      InputSplit是Hadoop定义的用来传送给每个单独map数据,InputSplit存储的并非数据本身,而是一个分片长度和一个记录数据位置数组生成InputSplit的方法可以通过InputFormat()设置

      当数据传送给map时,map会将输入分片传送到InputFormat,InputFormat则调用方法getRecordReader()生成RecordReader,RecordReader再通过creatKey()creatValue()方法创建可供map处理的<key,value>对。简而言之,InputFormat()方法是用来生成可供map处理的<key,value>对的。

      Hadoop预定义了多种方法将不同类型的输入数据转化为map能够处理的<key,value>对,它们都继承自InputFormat,分别是:

        InputFormat

            |

            |---BaileyBorweinPlouffe.BbpInputFormat

            |---ComposableInputFormat

            |---CompositeInputFormat

            |---DBInputFormat

            |---DistSum.Machine.AbstractInputFormat

            |---FileInputFormat

                |---CombineFileInputFormat

                |---KeyValueTextInputFormat

                |---NLineInputFormat

                |---SequenceFileInputFormat

                |---TeraInputFormat

                |---TextInputFormat

      其中TextInputFormat是Hadoop默认的输入方法,在TextInputFormat中,每个文件(或其一部分)都会单独地作为map的输入,而这个是继承自FileInputFormat的。之后,每行数据都会生成一条记录,每条记录则表示成<key,value>形式:

    • key值是每个数据的记录在数据分片中字节偏移量,数据类型是LongWritable;  

    value值是每行的内容,数据类型是Text。

      (2)OutputFormat

      每一种输入格式都有一种输出格式与其对应。默认的输出格式是TextOutputFormat,这种输出方式与输入类似,会将每条记录以一行的形式存入文本文件。不过,它的键和值可以是任意形式的,因为程序内容会调用toString()方法将键和值转换为String类型再输出。

      3)Map类中map方法分析

    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }

      Map类继承自MapReduceBase,并且它实现了Mapper接口,此接口是一个规范类型,它有4种形式的参数,分别用来指定map的输入key值类型、输入value值类型、输出key值类型和输出value 值类型。在本例中,因为使用的是TextInputFormat,它的输出key值是LongWritable类型,输出value值是Text类型,所 以map的输入类型为<LongWritable,Text>。在本例中需要输出<word,1>这样的形式,因此输出的key 值类型是Text,输出的value值类型是IntWritable。

      实现此接口类还需要实现map方法,map方法会具体负责对输入进行操作,在本例中,map方法对输入的行以空格为单位进行切分,然后使用OutputCollect收集输出的<word,1>。

      4)Reduce类中reduce方法分析

    public static class Reduce extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

      Reduce类也是继承自MapReduceBase的,需要实现Reducer接口。Reduce类以map的输出作为输入,因此Reduce的输入类型是<Text,Intwritable>。而Reduce的输出是单词它的数目,因此,它的输出类型是<Text,IntWritable>。Reduce类也要实现reduce方法,在此方法中,reduce函数将输入的key值作为输出的key值,然后将获得多个value值加起来,作为输出的值。

    5.2 新API WordCount分析

      1)源代码程序

    package org.apache.hadoop.examples;

    import java.io.IOException;

    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.fs.Path;

    import org.apache.hadoop.io.IntWritable;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Job;

    import org.apache.hadoop.mapreduce.Mapper;

    import org.apache.hadoop.mapreduce.Reducer;

    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import org.apache.hadoop.util.GenericOptionsParser;

    public class WordCount {

      public static class TokenizerMapper

          extends Mapper<Object, Text, Text, IntWritable>{

          private final static IntWritable one = new IntWritable(1);

          private Text word = new Text();

          public void map(Object key, Text value, Context context)

            throws IOException, InterruptedException {

            StringTokenizer itr = new StringTokenizer(value.toString());

            while (itr.hasMoreTokens()) {

            word.set(itr.nextToken());

            context.write(word, one);

          }

        }

      }

      public static class IntSumReducer

          extends Reducer<Text,IntWritable,Text,IntWritable> {

          private IntWritable result = new IntWritable();

          public void reduce(Text key, Iterable<IntWritable> values,Context context)

               throws IOException, InterruptedException {

            int sum = 0;

            for (IntWritable val : values) {

               sum += val.get();

            }

          result.set(sum);

          context.write(key, result);

        }

      }

      public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        if (otherArgs.length != 2) {

          System.err.println("Usage: wordcount <in> <out>");

          System.exit(2);

        }

        Job job = new Job(conf, "word count");

        job.setJarByClass(WordCount.class);

        job.setMapperClass(TokenizerMapper.class);

        job.setCombinerClass(IntSumReducer.class);

        job.setReducerClass(IntSumReducer.class);

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

    }

       1)Map过程

    public static class TokenizerMapper

      extends Mapper<Object, Text, Text, IntWritable>{

      private final static IntWritable one = new IntWritable(1);

      private Text word = new Text();

      public void map(Object key, Text value, Context context)

        throws IOException, InterruptedException {

        StringTokenizer itr = new StringTokenizer(value.toString());

        while (itr.hasMoreTokens()) {

          word.set(itr.nextToken());

          context.write(word, one);

      }

    }

      Map过程需要继承org.apache.hadoop.mapreduce包中Mapper类,并重写其 map方法。通过在map方法中添加两句把key值和value值输出到控制台的代码,可以发现map方法中value值存储的是文本文件中的一行(以回 车符为行结束标记),而key值为该行的首字母相对于文本文件的首地址的偏移量。然后StringTokenizer类将每一行拆分成为一个个的单词,并 将<word,1>作为map方法的结果输出,其余的工作都交有MapReduce框架处理。

      2)Reduce过程

    public static class IntSumReducer

      extends Reducer<Text,IntWritable,Text,IntWritable> {

      private IntWritable result = new IntWritable();

      public void reduce(Text key, Iterable<IntWritable> values,Context context)

         throws IOException, InterruptedException {

        int sum = 0;

        for (IntWritable val : values) {

          sum += val.get();

        }

        result.set(sum);

        context.write(key, result);

      }

    }

      Reduce过程需要继承org.apache.hadoop.mapreduce包中Reducer类,并重写其reduce方法。Map过程输出<key,values>中key为单个单词,而values是对应单词的计数值所组成的列表,Map的输出就是Reduce的输入,所以reduce方法只要遍历values并求和,即可得到某个单词的总次数。

        3)执行MapReduce任务

    public static void main(String[] args) throws Exception {

      Configuration conf = new Configuration();

      String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

      if (otherArgs.length != 2) {

        System.err.println("Usage: wordcount <in> <out>");

        System.exit(2);

      }

      Job job = new Job(conf, "word count");

      job.setJarByClass(WordCount.class);

      job.setMapperClass(TokenizerMapper.class);

      job.setCombinerClass(IntSumReducer.class);

      job.setReducerClass(IntSumReducer.class);

      job.setOutputKeyClass(Text.class);

      job.setOutputValueClass(IntWritable.class);

      FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

      FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

      System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

      在MapReduce中,由Job对象负责管理和运行一个计算任务,并通过Job的一些方法对任务的参数进行相关的设置。此处设置了使用 TokenizerMapper完成Map过程中的处理和使用IntSumReducer完成Combine和Reduce过程中的处理。还设置了Map 过程和Reduce过程的输出类型:key的类型为Text,value的类型为IntWritable。任务的输出和输入路径则由命令行参数指定,并由FileInputFormat和FileOutputFormat分别设定。完成相应任务的参数设定后,即可调用job.waitForCompletion()方法执行任务。

    5.3 新旧API区别

      Hadoop最新版本的MapReduce Release 0.20.0的API包括了一个全新的Mapreduce JAVA API,有时候也称为上下文对象。

      新的API类型上不兼容以前的API,所以,以前的应用程序需要重写才能使新的API发挥其作用 。

      新的API和旧的API之间有下面几个明显的区别。

    • 新的API倾向于使用抽象类,而不是接口,因为这更容易扩展。例如,你可以添加一个方法(用默认的实现)到一个抽象类而不需修改类之前的实现方法。在新的API中,Mapper和Reducer是抽象类。
    • 新的API是在org.apache.hadoop.mapreduce包(和子包)中的。之前版本的API则是放在org.apache.hadoop.mapred中的。
    • 新的API广泛使用context object(上下文对象),并允许用户代码与MapReduce系统进行通信。例如,MapContext基本上充当着JobConf的OutputCollector和Reporter的角色。
    • 新的API同时支持"推"和"拉"式的迭代。在这两个新老API中,键/值记录对被推mapper中,但除此之外,新的API允许把记录从map()方法中拉出,这也适用于reducer。"拉"式的一个有用的例子是分批处理记录,而不是一个接一个。
    • 新的API统一了配置。旧的API有一个特殊的JobConf对象用于作业配置, 这是一个对于Hadoop通常的Configuration对象的扩展。在新的API中,这种区别没有了,所以作业配置通过Configuration来 完成。作业控制的执行由Job类来负责,而不是JobClient,它在新的API中已经荡然无存。

     (最后的部分参考http://www.cnblogs.com/xia520pi/archive/2012/05/16/2504205.html)

  • 相关阅读:
    base64编码是什么1
    base64编码是什么
    算法
    spring 的 ApplicationContext.getBean(type) 无法获取bean,报错
    多个tomcat shutdown.sh 导致无法正常关闭的问题
    springboot server.address 配置问题
    jsp页面错误的全局处理
    ASP.NET MVC三层关系
    ASP.NET指定页面转PDF、JPG(插件)
    Java理论知识及面试题
  • 原文地址:https://www.cnblogs.com/dmyu/p/4657405.html
Copyright © 2011-2022 走看看