  • Mapper and Reducer Explained

    1. Mapper/Reducer in the Old API

    Mapper and Reducer encapsulate the application's data-processing logic. To keep the interface simple, MapReduce requires that all data stored on the underlying distributed file system be interpreted as key/value pairs and handed to the map/reduce functions of the Mapper/Reducer, which in turn produce new key/value pairs. The class hierarchies of Mapper and Reducer are very similar, so we take Mapper as the example. As the class diagram in the figure shows, a Mapper consists of three parts: initialization, the map operation, and cleanup.

    (1) Initialization
    Mapper extends the JobConfigurable interface, whose configure method allows the Mapper to be initialized through a JobConf parameter.

    (2) The map operation
    The MapReduce framework uses the RecordReader of the InputFormat to read key/value pairs from the InputSplit one by one and passes each pair to the map() function below:

    void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter) throws IOException;

    Besides key and value, the function takes two more parameters, an OutputCollector and a Reporter, used to emit results and to update Counter values, respectively.

    (3) Cleanup
    Mapper obtains a close method by extending the Closeable interface (which in turn extends Java IO's Closeable interface); users can implement this method to clean up the Mapper. A minimal old-API Mapper covering all three phases is sketched below.
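
    The following sketch exercises all three phases under the old API. The class name, the counter names and the "myapp.delimiter" parameter are made up for illustration; MapReduceBase supplies empty configure()/close() implementations, so only what is needed is overridden.

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    // Old-API Mapper: counts the fields of each input line.
    public class FieldCountMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {

      private static final IntWritable ONE = new IntWritable(1);
      private String delimiter;

      // (1) Initialization: read job parameters from the JobConf.
      public void configure(JobConf job) {
        delimiter = job.get("myapp.delimiter", "\t");  // hypothetical parameter
      }

      // (2) Map operation: emit (field, 1) for every field of the line and
      //     report counters through the Reporter.
      public void map(LongWritable key, Text value,
                      OutputCollector<Text, IntWritable> output, Reporter reporter)
          throws IOException {
        for (String field : value.toString().split(delimiter)) {
          output.collect(new Text(field), ONE);
        }
        reporter.incrCounter("MyApp", "LINES", 1);
      }

      // (3) Cleanup: release any resources opened in configure().
      public void close() throws IOException {
        // nothing to release in this sketch
      }
    }
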
    MapReduce ships with many Mapper/Reducer implementations, but most of them are fairly simple. As shown in the figure, their functions are as follows (a usage sketch follows the list):

    ❑ ChainMapper/ChainReducer: support chained jobs.

    ❑ IdentityMapper/IdentityReducer: emit the input key/value unchanged.

    ❑ InverseMapper: swap the positions of key and value.

    ❑ RegexMapper: match strings against a regular expression.

    ❑ TokenCountMapper: split a string into tokens (words); usable as the Mapper of WordCount.

    ❑ LongSumReducer: for each key, sum the long-typed values.
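
    For example, TokenCountMapper and LongSumReducer already make up a complete WordCount job, so no user Mapper/Reducer code is needed. A hedged old-API driver sketch (the class name, job name and argument handling are placeholders):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.LongSumReducer;
    import org.apache.hadoop.mapred.lib.TokenCountMapper;

    public class LibWordCount {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(LibWordCount.class);
        conf.setJobName("lib-wordcount");

        // Reuse the library classes instead of hand-written Mapper/Reducer.
        conf.setMapperClass(TokenCountMapper.class);   // emits (word, 1) per token
        conf.setCombinerClass(LongSumReducer.class);   // local aggregation
        conf.setReducerClass(LongSumReducer.class);    // sums the counts per word

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(LongWritable.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
      }
    }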

    A MapReduce application does not necessarily have to provide a Mapper. The framework offers an interface more general than Mapper: MapRunnable, shown in the figure. Users can implement this interface to customize how the Mapper is invoked or to implement the key/value processing logic themselves (a minimal custom MapRunnable is sketched below); for example, Hadoop Pipes implements its own MapRunnable that sends the data over a socket to another process for processing. Another benefit of this interface is that it allows multi-threaded Mappers.
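
    A custom MapRunnable owns the read loop: it pulls key/value pairs from the RecordReader and decides how (and on which threads) to call the Mapper. The sketch below (class name made up) merely reproduces the default single-threaded behavior of MapRunner:

    import java.io.IOException;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapRunnable;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.util.ReflectionUtils;

    public class SimpleMapRunner<K1, V1, K2, V2> implements MapRunnable<K1, V1, K2, V2> {

      private Mapper<K1, V1, K2, V2> mapper;

      @SuppressWarnings("unchecked")
      public void configure(JobConf job) {
        // Instantiate the user's Mapper class configured in the JobConf.
        this.mapper = ReflectionUtils.newInstance(job.getMapperClass(), job);
      }

      public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                      Reporter reporter) throws IOException {
        try {
          K1 key = input.createKey();
          V1 value = input.createValue();
          // Read one key/value pair at a time and hand it to the Mapper.
          while (input.next(key, value)) {
            mapper.map(key, value, output, reporter);
          }
        } finally {
          mapper.close();
        }
      }
    }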

    As the figure shows, MapReduce provides two MapRunnable implementations, MapRunner and MultithreadedMapRunner, with MapRunner being the default. MultithreadedMapRunner is a multi-threaded MapRunnable; by default each map task runs 10 threads, and it is typically used for non-CPU-bound jobs to improve throughput.
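
    Switching to the multithreaded runner is a driver-side configuration change. A hedged sketch; the thread-count property name is the one read by the 1.x-era MultithreadedMapRunner and may differ in other versions:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.MultithreadedMapRunner;

    public class MultithreadedRunnerConfig {
      // Swap in the multithreaded runner and size its thread pool.
      public static void configure(JobConf conf) {
        conf.setMapRunnerClass(MultithreadedMapRunner.class);
        // Default is 10 threads per map task; raise it for I/O-bound map logic.
        conf.setInt("mapred.map.multithreadedrunner.threads", 20);  // property name: assumption
      }
    }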

    2. Mapper/Reducer in the New API

    As the figure shows, the new API changes the old one in the following ways:

    ❑ Mapper changes from an interface to a class and no longer extends the JobConfigurable and Closeable interfaces; instead, the class itself provides setup and cleanup methods for initialization and cleanup (see the sketch after this list).

    ❑ The parameters are wrapped in a Context object, which gives the interface good extensibility.

    ❑ The MapRunnable interface is removed and a run method is added to Mapper, so users can customize how map() is invoked; the default run implementation is the same as MapRunner's run in the old API.

    ❑ In the new API, the Reducer iterates over the values with a java.lang.Iterable, so users can walk through all values with a "foreach" loop, as shown below:

    void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException {
        for(VALUEIN value: values)  { // note the foreach-style iteration
            context.write((KEYOUT) key, (VALUEOUT) value);
        }
    }
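
    To make the setup/cleanup/run changes concrete, here is a hedged new-API counterpart of the old-API sketch shown earlier; the class name, counter names and the "myapp.delimiter" parameter are again made up:

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class FieldCountNewMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

      private static final IntWritable ONE = new IntWritable(1);
      private String delimiter;

      // Replaces the old configure(JobConf): parameters come from the Context.
      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
        delimiter = context.getConfiguration().get("myapp.delimiter", "\t");
      }

      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        for (String field : value.toString().split(delimiter)) {
          context.write(new Text(field), ONE);
        }
      }

      // Replaces the old close(): runs once after the last map() call.
      @Override
      protected void cleanup(Context context) throws IOException, InterruptedException {
        context.setStatus("FieldCountNewMapper finished");
      }

      // Optional: take over the record loop, mirroring the default run()
      // (and the old MapRunner), here just to maintain a record counter.
      @Override
      public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        while (context.nextKeyValue()) {
          context.getCounter("MyApp", "RECORDS").increment(1);
          map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
        cleanup(context);
      }
    }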

    The complete code of the Mapper class is as follows:

    package org.apache.hadoop.mapreduce;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.RawComparator;
    import org.apache.hadoop.io.compress.CompressionCodec;
    /** 
     * Maps input key/value pairs to a set of intermediate key/value pairs.  
     * 
     * <p>Maps are the individual tasks which transform input records into a 
     * intermediate records. The transformed intermediate records need not be of 
     * the same type as the input records. A given input pair may map to zero or 
     * many output pairs.</p> 
     * 
     * <p>The Hadoop Map-Reduce framework spawns one map task for each 
     * {@link InputSplit} generated by the {@link InputFormat} for the job.
     * <code>Mapper</code> implementations can access the {@link Configuration} for 
     * the job via the {@link JobContext#getConfiguration()}.
     * 
     * <p>The framework first calls 
     * {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by
     * {@link #map(Object, Object, Context)} 
     * for each key/value pair in the <code>InputSplit</code>. Finally 
     * {@link #cleanup(Context)} is called.</p>
     * 
     * <p>All intermediate values associated with a given output key are 
     * subsequently grouped by the framework, and passed to a {@link Reducer} to  
     * determine the final output. Users can control the sorting and grouping by 
     * specifying two key {@link RawComparator} classes.</p>
     *
     * <p>The <code>Mapper</code> outputs are partitioned per 
     * <code>Reducer</code>. Users can control which keys (and hence records) go to 
     * which <code>Reducer</code> by implementing a custom {@link Partitioner}.
     * 
     * <p>Users can optionally specify a <code>combiner</code>, via 
     * {@link Job#setCombinerClass(Class)}, to perform local aggregation of the 
     * intermediate outputs, which helps to cut down the amount of data transferred 
     * from the <code>Mapper</code> to the <code>Reducer</code>.
     * 
     * <p>Applications can specify if and how the intermediate
     * outputs are to be compressed and which {@link CompressionCodec}s are to be
     * used via the <code>Configuration</code>.</p>
     *  
     * <p>If the job has zero
     * reduces then the output of the <code>Mapper</code> is directly written
     * to the {@link OutputFormat} without sorting by keys.</p>
     * 
     * <p>Example:</p>
     * <p><blockquote><pre>
     * public class TokenCounterMapper 
     *     extends Mapper<Object, Text, Text, IntWritable>{
     *    
     *   private final static IntWritable one = new IntWritable(1);
     *   private Text word = new Text();
     *   
     *   public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
     *     StringTokenizer itr = new StringTokenizer(value.toString());
     *     while (itr.hasMoreTokens()) {
     *       word.set(itr.nextToken());
     *       context.write(word, one);
     *     }
     *   }
     * }
     * </pre></blockquote></p>
     *
     * <p>Applications may override the {@link #run(Context)} method to exert 
     * greater control on map processing e.g. multi-threaded <code>Mapper</code>s 
     * etc.</p>
     * 
     * @see InputFormat
     * @see JobContext
     * @see Partitioner  
     * @see Reducer
     */
    public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    
      public class Context 
        extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
        public Context(Configuration conf, TaskAttemptID taskid,
                       RecordReader<KEYIN,VALUEIN> reader,
                       RecordWriter<KEYOUT,VALUEOUT> writer,
                       OutputCommitter committer,
                       StatusReporter reporter,
                       InputSplit split) throws IOException, InterruptedException {
          super(conf, taskid, reader, writer, committer, reporter, split);
        }
      }
      /**
       * Called once at the beginning of the task.
       */
      protected void setup(Context context
                           ) throws IOException, InterruptedException {
        // NOTHING
      }
      /**
       * Called once for each key/value pair in the input split. Most applications
       * should override this, but the default is the identity function.
       */
      @SuppressWarnings("unchecked")
      protected void map(KEYIN key, VALUEIN value, 
                         Context context) throws IOException, InterruptedException {
        context.write((KEYOUT) key, (VALUEOUT) value);
      }
      /**
       * Called once at the end of the task.
       */
      protected void cleanup(Context context
                             ) throws IOException, InterruptedException {
        // NOTHING
      }
      /**
       * Expert users can override this method for more complete control over the
       * execution of the Mapper.
       * @param context
       * @throws IOException
       */
      public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        while (context.nextKeyValue()) {
          map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
        cleanup(context);
      }
    }

    As the code shows, the Mapper class defines a new inner class, Context, which extends MapContext.

    Let's look at the source code of the MapContext class:

    package org.apache.hadoop.mapreduce;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    /**
     * The context that is given to the {@link Mapper}.
     * @param <KEYIN> the key input type to the Mapper
     * @param <VALUEIN> the value input type to the Mapper
     * @param <KEYOUT> the key output type from the Mapper
     * @param <VALUEOUT> the value output type from the Mapper
     */
    public class MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
      extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
      private RecordReader<KEYIN,VALUEIN> reader;
      private InputSplit split;
    
      public MapContext(Configuration conf, TaskAttemptID taskid,
                        RecordReader<KEYIN,VALUEIN> reader,
                        RecordWriter<KEYOUT,VALUEOUT> writer,
                        OutputCommitter committer,
                        StatusReporter reporter,
                        InputSplit split) {
        super(conf, taskid, writer, committer, reporter);
        this.reader = reader;
        this.split = split;
      }
      /**
       * Get the input split for this map.
       */
      public InputSplit getInputSplit() {
        return split;
      }
      @Override
      public KEYIN getCurrentKey() throws IOException, InterruptedException {
        return reader.getCurrentKey();
      }
      @Override
      public VALUEIN getCurrentValue() throws IOException, InterruptedException {
        return reader.getCurrentValue();
      }
      @Override
      public boolean nextKeyValue() throws IOException, InterruptedException {
        return reader.nextKeyValue();
      }
    }
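
    One common use of MapContext is getInputSplit(): inside setup() or map() a task can recover which file it is reading by downcasting the split. A hedged sketch (class name made up; it assumes FileInputFormat-style splits):

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    // Tags every output line with the name of the file it came from.
    public class FileTaggingMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

      private String fileName;

      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
        // Context extends MapContext, so getInputSplit() is available here.
        FileSplit split = (FileSplit) context.getInputSplit();
        fileName = split.getPath().getName();
      }

      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        context.write(new Text(fileName + "\t" + value.toString()), NullWritable.get());
      }
    }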

    MapContext extends TaskInputOutputContext. Next, look at the code of the TaskInputOutputContext class:

    package org.apache.hadoop.mapreduce;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.Progressable;
    /**
     * A context object that allows input and output from the task. It is only
     * supplied to the {@link Mapper} or {@link Reducer}.
     * @param <KEYIN> the input key type for the task
     * @param <VALUEIN> the input value type for the task
     * @param <KEYOUT> the output key type for the task
     * @param <VALUEOUT> the output value type for the task
     */
    public abstract class TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
           extends TaskAttemptContext implements Progressable {
      private RecordWriter<KEYOUT,VALUEOUT> output;
      private StatusReporter reporter;
      private OutputCommitter committer;
    
      public TaskInputOutputContext(Configuration conf, TaskAttemptID taskid,
                                    RecordWriter<KEYOUT,VALUEOUT> output,
                                    OutputCommitter committer,
                                    StatusReporter reporter) {
        super(conf, taskid);
        this.output = output;
        this.reporter = reporter;
        this.committer = committer;
      }
      /**
       * Advance to the next key, value pair.
       * @return true if a key/value pair was read, false if at end of input
       */
      public abstract 
      boolean nextKeyValue() throws IOException, InterruptedException;
      /**
       * Get the current key.
       * @return the current key object or null if there isn't one
       * @throws IOException
       * @throws InterruptedException
       */
      public abstract 
      KEYIN getCurrentKey() throws IOException, InterruptedException;
      /**
       * Get the current value.
       * @return the value object that was read into
       * @throws IOException
       * @throws InterruptedException
       */
      public abstract VALUEIN getCurrentValue() throws IOException, 
                                                       InterruptedException;
      /**
       * Generate an output key/value pair.
       */
      public void write(KEYOUT key, VALUEOUT value
                        ) throws IOException, InterruptedException {
        output.write(key, value);
      }
      public Counter getCounter(Enum<?> counterName) {
        return reporter.getCounter(counterName);
      }
      public Counter getCounter(String groupName, String counterName) {
        return reporter.getCounter(groupName, counterName);
      }
      @Override
      public void progress() {
        reporter.progress();
      }
      @Override
      public void setStatus(String status) {
        reporter.setStatus(status);
      }
      public OutputCommitter getOutputCommitter() {
        return committer;
      }
    }

    TaskInputOutputContext extends TaskAttemptContext and implements the Progressable interface. First, the code of the Progressable interface:

    package org.apache.hadoop.util;
    /**
     * A facility for reporting progress.
     * 
     * <p>Clients and/or applications can use the provided <code>Progressable</code>
     * to explicitly report progress to the Hadoop framework. This is especially
     * important for operations which take a significant amount of time since,
     * in-lieu of the reported progress, the framework has to assume that an error
     * has occurred and time-out the operation.</p>
     */
    public interface Progressable {
      /**
       * Report progress to the Hadoop framework.
       */
      public void progress();
    }
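
    Because a task's Context ultimately implements Progressable (through TaskInputOutputContext), a map() that does slow per-record work can keep the task alive and update status/counters as it goes. A hedged sketch with made-up names:

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class SlowWorkMapper extends Mapper<LongWritable, Text, Text, Text> {

      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        String result = expensiveTransform(value.toString());   // stands in for slow work

        context.progress();                                  // Progressable: "still alive"
        context.setStatus("processed offset " + key.get());  // shown in the web UI
        context.getCounter("MyApp", "SLOW_RECORDS").increment(1);

        context.write(new Text(result), value);
      }

      // Hypothetical slow, CPU- or I/O-heavy step.
      private String expensiveTransform(String line) {
        return line.toUpperCase();
      }
    }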

    The code of the TaskAttemptContext class:

    package org.apache.hadoop.mapreduce;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.Progressable;
    /**
     * The context for task attempts.
     */
    public class TaskAttemptContext extends JobContext implements Progressable {
      private final TaskAttemptID taskId;
      private String status = "";
      
      public TaskAttemptContext(Configuration conf, 
                                TaskAttemptID taskId) {
        super(conf, taskId.getJobID());
        this.taskId = taskId;
      }
      /**
       * Get the unique name for this task attempt.
       */
      public TaskAttemptID getTaskAttemptID() {
        return taskId;
      }
    
      /**
       * Set the current status of the task to the given string.
       */
      public void setStatus(String msg) throws IOException {
        status = msg;
      }
      /**
       * Get the last set status message.
       * @return the current status message
       */
      public String getStatus() {
        return status;
      }
      /**
       * Report progress. The subtypes actually do work in this method.
       */
      public void progress() { 
      }
    }

    TaskAttemptContext extends the JobContext class. Finally, the source code of JobContext:

    package org.apache.hadoop.mapreduce;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.RawComparator;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
    /**
     * A read-only view of the job that is provided to the tasks while they
     * are running.
     */
    public class JobContext {
      // Put all of the attribute names in here so that Job and JobContext are
      // consistent.
      protected static final String INPUT_FORMAT_CLASS_ATTR = 
        "mapreduce.inputformat.class";
      protected static final String MAP_CLASS_ATTR = "mapreduce.map.class";
      protected static final String COMBINE_CLASS_ATTR = "mapreduce.combine.class";
      protected static final String REDUCE_CLASS_ATTR = "mapreduce.reduce.class";
      protected static final String OUTPUT_FORMAT_CLASS_ATTR = 
        "mapreduce.outputformat.class";
      protected static final String PARTITIONER_CLASS_ATTR = 
        "mapreduce.partitioner.class";
    
      protected final org.apache.hadoop.mapred.JobConf conf;
      private final JobID jobId;
      
      public JobContext(Configuration conf, JobID jobId) {
        this.conf = new org.apache.hadoop.mapred.JobConf(conf);
        this.jobId = jobId;
      }
      /**
       * Return the configuration for the job.
       * @return the shared configuration object
       */
      public Configuration getConfiguration() {
        return conf;
      }
    
      /**
       * Get the unique ID for the job.
       * @return the object with the job id
       */
      public JobID getJobID() {
        return jobId;
      }
      /**
       * Get configured the number of reduce tasks for this job. Defaults to 
       * <code>1</code>.
       * @return the number of reduce tasks for this job.
       */
      public int getNumReduceTasks() {
        return conf.getNumReduceTasks();
      }
      /**
       * Get the current working directory for the default file system.
       * 
       * @return the directory name.
       */
      public Path getWorkingDirectory() throws IOException {
        return conf.getWorkingDirectory();
      }
      /**
       * Get the key class for the job output data.
       * @return the key class for the job output data.
       */
      public Class<?> getOutputKeyClass() {
        return conf.getOutputKeyClass();
      }
      /**
       * Get the value class for job outputs.
       * @return the value class for job outputs.
       */
      public Class<?> getOutputValueClass() {
        return conf.getOutputValueClass();
      }
      /**
       * Get the key class for the map output data. If it is not set, use the
       * (final) output key class. This allows the map output key class to be
       * different than the final output key class.
       * @return the map output key class.
       */
      public Class<?> getMapOutputKeyClass() {
        return conf.getMapOutputKeyClass();
      }
      /**
       * Get the value class for the map output data. If it is not set, use the
       * (final) output value class This allows the map output value class to be
       * different than the final output value class.
       *  
       * @return the map output value class.
       */
      public Class<?> getMapOutputValueClass() {
        return conf.getMapOutputValueClass();
      }
      /**
       * Get the user-specified job name. This is only used to identify the 
       * job to the user.
       * 
       * @return the job's name, defaulting to "".
       */
      public String getJobName() {
        return conf.getJobName();
      }
      /**
       * Get the {@link InputFormat} class for the job.
       * 
       * @return the {@link InputFormat} class for the job.
       */
      @SuppressWarnings("unchecked")
      public Class<? extends InputFormat<?,?>> getInputFormatClass() 
         throws ClassNotFoundException {
        return (Class<? extends InputFormat<?,?>>) 
          conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
      }
      /**
       * Get the {@link Mapper} class for the job.
       * 
       * @return the {@link Mapper} class for the job.
       */
      @SuppressWarnings("unchecked")
      public Class<? extends Mapper<?,?,?,?>> getMapperClass() 
         throws ClassNotFoundException {
        return (Class<? extends Mapper<?,?,?,?>>) 
          conf.getClass(MAP_CLASS_ATTR, Mapper.class);
      }
      /**
       * Get the combiner class for the job.
       * 
       * @return the combiner class for the job.
       */
      @SuppressWarnings("unchecked")
      public Class<? extends Reducer<?,?,?,?>> getCombinerClass() 
         throws ClassNotFoundException {
        return (Class<? extends Reducer<?,?,?,?>>) 
          conf.getClass(COMBINE_CLASS_ATTR, null);
      }
      /**
       * Get the {@link Reducer} class for the job.
       * 
       * @return the {@link Reducer} class for the job.
       */
      @SuppressWarnings("unchecked")
      public Class<? extends Reducer<?,?,?,?>> getReducerClass() 
         throws ClassNotFoundException {
        return (Class<? extends Reducer<?,?,?,?>>) 
          conf.getClass(REDUCE_CLASS_ATTR, Reducer.class);
      }
      /**
       * Get the {@link OutputFormat} class for the job.
       * 
       * @return the {@link OutputFormat} class for the job.
       */
      @SuppressWarnings("unchecked")
      public Class<? extends OutputFormat<?,?>> getOutputFormatClass() 
         throws ClassNotFoundException {
        return (Class<? extends OutputFormat<?,?>>) 
          conf.getClass(OUTPUT_FORMAT_CLASS_ATTR, TextOutputFormat.class);
      }
      /**
       * Get the {@link Partitioner} class for the job.
       * 
       * @return the {@link Partitioner} class for the job.
       */
      @SuppressWarnings("unchecked")
      public Class<? extends Partitioner<?,?>> getPartitionerClass() 
         throws ClassNotFoundException {
        return (Class<? extends Partitioner<?,?>>) 
          conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
      }
      /**
       * Get the {@link RawComparator} comparator used to compare keys.
       * 
       * @return the {@link RawComparator} comparator used to compare keys.
       */
      public RawComparator<?> getSortComparator() {
        return conf.getOutputKeyComparator();
      }
      /**
       * Get the pathname of the job's jar.
       * @return the pathname
       */
      public String getJar() {
        return conf.getJar();
      }
      /** 
       * Get the user defined {@link RawComparator} comparator for 
       * grouping keys of inputs to the reduce.
       * 
       * @return comparator set by the user for grouping values.
       * @see Job#setGroupingComparatorClass(Class) for details.  
       */
      public RawComparator<?> getGroupingComparator() {
        return conf.getOutputValueGroupingComparator();
      }
    }
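
    The getters above read the mapreduce.*.class attributes that the Job class writes. A hedged new-API driver sketch showing where those values come from (the paths, the job name and FieldCountNewMapper refer back to the earlier sketch and are placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

    public class NewApiDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "field-count");          // Job extends JobContext

        // These setters store class names under the attributes listed above
        // (e.g. mapreduce.map.class), which getMapperClass() etc. later read back.
        job.setJarByClass(NewApiDriver.class);
        job.setMapperClass(FieldCountNewMapper.class);   // the Mapper sketched earlier
        job.setReducerClass(IntSumReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }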

    References

    《Hadoop技术内幕 深入理解MapReduce架构设计与实现原理》
