  • [Repost] Hadoop Source Code Analysis (6): MapReduce - the MapTask class

    Reposted from: http://www.cnblogs.com/lucius/p/3449959.html

    The MapTask class extends Task. Its most important method is run(), which carries out the map task.

      run() first creates and starts a TaskReporter, then calls JobConf.getUseNewMapper() to decide whether the new API is in use (how the new API gets switched on was covered earlier in [Hadoop源码解读](三)MapReduce篇之Job类, the post on the Job class). It then calls the initialize() method inherited from Task to initialize this task. After that, depending on the task's role, it runs runJobCleanupTask(), runJobSetupTask(), runTaskCleanupTask(), or the appropriate Mapper; when the Mapper is run, which version of the MapReduce API is used is determined by that configuration setting.

    @Override
    public void run(final JobConf job, final TaskUmbilicalProtocol umbilical) 
      throws IOException, ClassNotFoundException, InterruptedException {
      this.umbilical = umbilical;

      // start thread that will handle communication with parent
      TaskReporter reporter = new TaskReporter(getProgress(), umbilical,
          jvmContext);
      reporter.startCommunicationThread();
      boolean useNewApi = job.getUseNewMapper();  // comes from the JobConf (the new-API JobContext wraps a JobConf).
                                                  // Job has a setUseNewAPI() method that Job.submit() calls, so by the
                                                  // time waitForCompletion() has submitted the job, this flag is set.
      initialize(job, getJobID(), reporter, useNewApi);  // per-task initialization (jobContext, taskContext,
                                                         // output path, etc.), done by Task.initialize()

      // check if it is a cleanupJobTask
      if (jobCleanup) {
        runJobCleanupTask(umbilical, reporter);
        return;
      }
      if (jobSetup) {
        runJobSetupTask(umbilical, reporter);
        return;
      }
      if (taskCleanup) {
        runTaskCleanupTask(umbilical, reporter);
        return;
      }

      if (useNewApi) {  // run the Mapper through the appropriate MapReduce API version
        runNewMapper(job, splitMetaInfo, umbilical, reporter);
      } else {
        runOldMapper(job, splitMetaInfo, umbilical, reporter);
      }
      done(umbilical, reporter);
    }

     runNewMapper corresponds to the new-API MapReduce path, while runOldMapper corresponds to the old API.

       runNewMapper first creates the TaskAttemptContext, the Mapper object, the InputFormat, the InputSplit, and the RecordReader. Then, depending on whether the job has reduce tasks, it creates one of two output collectors: NewDirectOutputCollector (no reducers) or NewOutputCollector (with reducers). Next it calls input.initialize() to get the RecordReader ready, which mainly means preparing the input: the RecordReader, the input path, and so on. Then comes the central call, mapper.run(). This is the Mapper.run() method discussed earlier in [Hadoop源码解读](二)MapReduce篇之Mapper类, the post on the Mapper class; it reads the K/V pairs one by one, and that is where the two sides connect.

    @SuppressWarnings("unchecked")
    private <INKEY,INVALUE,OUTKEY,OUTVALUE>
    void runNewMapper(final JobConf job,
                      final TaskSplitIndex splitIndex,
                      final TaskUmbilicalProtocol umbilical,
                      TaskReporter reporter
                      ) throws IOException, ClassNotFoundException,
                               InterruptedException {
      // make a task context so we can get the classes
      org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
        new org.apache.hadoop.mapreduce.TaskAttemptContext(job, getTaskID());
      // make a mapper
      org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
        (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
          ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
      // make the input format
      org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
        (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
          ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
      // rebuild the input split
      org.apache.hadoop.mapreduce.InputSplit split = null;
      split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
          splitIndex.getStartOffset());

      org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
        new NewTrackingRecordReader<INKEY,INVALUE>
            (split, inputFormat, reporter, job, taskContext);

      job.setBoolean("mapred.skip.on", isSkipping());
      org.apache.hadoop.mapreduce.RecordWriter output = null;
      org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
           mapperContext = null;
      try {
        Constructor<org.apache.hadoop.mapreduce.Mapper.Context> contextConstructor =
          org.apache.hadoop.mapreduce.Mapper.Context.class.getConstructor
          (new Class[]{org.apache.hadoop.mapreduce.Mapper.class,
                       Configuration.class,
                       org.apache.hadoop.mapreduce.TaskAttemptID.class,
                       org.apache.hadoop.mapreduce.RecordReader.class,
                       org.apache.hadoop.mapreduce.RecordWriter.class,
                       org.apache.hadoop.mapreduce.OutputCommitter.class,
                       org.apache.hadoop.mapreduce.StatusReporter.class,
                       org.apache.hadoop.mapreduce.InputSplit.class});

        // get an output object
        if (job.getNumReduceTasks() == 0) {
          output =
            new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
        } else {
          output = new NewOutputCollector(taskContext, job, umbilical, reporter);
        }

        mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(),
                                                       input, output, committer,
                                                       reporter, split);

        input.initialize(split, mapperContext);
        mapper.run(mapperContext);
        input.close();
        output.close(mapperContext);
      } catch (NoSuchMethodException e) {
        throw new IOException("Can't find Context constructor", e);
      } catch (InstantiationException e) {
        throw new IOException("Can't create Context", e);
      } catch (InvocationTargetException e) {
        throw new IOException("Can't invoke Context constructor", e);
      } catch (IllegalAccessException e) {
        throw new IOException("Can't invoke Context constructor", e);
      }
    }

    As for which Mapper class actually runs: it is normally the one we set with job.setMapperClass(SelectGradeMapper.class). How is that setting retrieved later, and what is the default if nothing was set? The trace below follows it down.

                MapTask.runNewMapper()

    =>       (TaskAttemptContext) taskContext.getMapperClass();     // used when runNewMapper creates the mapper

    =>       JobContext.getMapperClass()

    =>       JobConf.getClass(MAP_CLASS_ATTR,Mapper.class)

    =>       Configuration.getClass(name,default)

    Following the call chain above layer by layer, we find that the default value is Mapper.class, and how the configured class is retrieved becomes clear at a glance; a short driver sketch follows for context.
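    The sketch below shows where that setting normally comes from. It is only an illustration: it assumes a user-defined SelectGradeMapper (the class named above) exists on the classpath, and the input/output paths and key/value types are made up.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class SelectGradeDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "select-grade");       // constructor style of this API generation

        job.setJarByClass(SelectGradeDriver.class);
        job.setMapperClass(SelectGradeMapper.class);   // stored in the conf under the map-class attribute;
                                                       // taskContext.getMapperClass() reads it back in runNewMapper()
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // waitForCompletion() calls submit(), which calls setUseNewAPI(); that is the flag
        // MapTask.run() later checks via job.getUseNewMapper().
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }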

    Now take a closer look at Configuration.getClass():

    public Class<?> getClass(String name, Class<?> defaultValue) {
      String valueString = get(name);
      if (valueString == null)
        return defaultValue;
      try {
        return getClassByName(valueString);
      } catch (ClassNotFoundException e) {
        throw new RuntimeException(e);
      }
    }

    It first checks whether the property has been set. If it has, it calls getClassByName() to load and return the class named by that property; otherwise it returns the default value.
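    To make that behaviour concrete, here is a small standalone sketch that exercises Configuration directly. It assumes the MAP_CLASS_ATTR constant resolves to "mapreduce.map.class" (the usual value in this generation of Hadoop), and it borrows the stock TokenCounterMapper simply to have a class to set.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper;

    public class GetClassDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Nothing set yet: getClass() falls back to the supplied default, Mapper.class.
        System.out.println(conf.getClass("mapreduce.map.class", Mapper.class));

        // Once the attribute is set, getClass() loads that class by name via getClassByName().
        conf.setClass("mapreduce.map.class", TokenCounterMapper.class, Mapper.class);
        System.out.println(conf.getClass("mapreduce.map.class", Mapper.class));
      }
    }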
    Once the Mapper has finished, closing the RecordReader, the output collector, and the other resources wraps the task up.

    Now let's focus on mapper.run(mapperContext) in runNewMapper() above. As mentioned in the discussion of Mapper.class, this mapperContext is used both to read the K/V pairs of the input split and to write out the resulting K/V pairs. And from

          mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(),
                                                         input, output, committer,
                                                         reporter, split);

    we can see that this Context is wired up with the mapper, the RecordReader, and the other objects we configured.
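    To make the usage side concrete, here is a minimal, hypothetical WordCount-style mapper (not taken from the original post); its map() does nothing except emit K/V pairs through context.write():

    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class WordCountMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

      private static final IntWritable ONE = new IntWritable(1);
      private final Text word = new Text();

      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        StringTokenizer tokens = new StringTokenizer(value.toString());
        while (tokens.hasMoreTokens()) {
          word.set(tokens.nextToken());
          context.write(word, ONE);   // lands in TaskInputOutputContext.write(), traced below
        }
      }
    }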

    The Mapper's map() method keeps calling context.write(K, V) to emit output, so let's see how that call proceeds. In the Context class hierarchy, Mapper.Context extends MapContext, which extends TaskInputOutputContext, which extends TaskAttemptContext, which in turn extends JobContext.

    The write() method comes from TaskInputOutputContext:

      public void write(KEYOUT key, VALUEOUT value
                        ) throws IOException, InterruptedException {
        output.write(key, value);
      }

    It delegates to RecordWriter.write(). RecordWriter is an abstract class whose main job is to define the write contract:

    public abstract class RecordWriter<K, V> {
      public abstract void write(K key, V value
                                 ) throws IOException, InterruptedException;
    
      public abstract void close(TaskAttemptContext context
                                 ) throws IOException, InterruptedException;
    }

    Next, look at one implementation of RecordWriter, NewOutputCollector, an inner class of MapTask:

    private class NewOutputCollector<K,V>
      extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
      private final MapOutputCollector<K,V> collector;
      private final org.apache.hadoop.mapreduce.Partitioner<K,V> partitioner;
      private final int partitions;

      @SuppressWarnings("unchecked")
      NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                         JobConf job,
                         TaskUmbilicalProtocol umbilical,
                         TaskReporter reporter
                         ) throws IOException, ClassNotFoundException {
        collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
        partitions = jobContext.getNumReduceTasks();
        if (partitions > 0) {
          partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
            ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
        } else {
          partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
            @Override
            public int getPartition(K key, V value, int numPartitions) {
              return -1;
            }
          };
        }
      }

      @Override
      public void write(K key, V value) throws IOException, InterruptedException {
        collector.collect(key, value,
                          partitioner.getPartition(key, value, partitions));
      }

      @Override
      public void close(TaskAttemptContext context
                        ) throws IOException,InterruptedException {
        try {
          collector.flush();
        } catch (ClassNotFoundException cnf) {
          throw new IOException("can't find class ", cnf);
        }
        collector.close();
      }
    }

    From its write() method we have now traced context.write(K, V) down to collector.collect(K, V, partition). Note that the output needs the Partitioner's getPartition() to supply the partition the current K/V pair belongs to: K/V pairs are partitioned so that different partitions go to different reducers. The default Partitioner is HashPartitioner, it can be overridden, and the number of reduce tasks determines the number of partitions (see the sketch below).
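    For reference, the default hash partitioning boils down to the following sketch, which mirrors what HashPartitioner does: mask off the sign bit of the hash, then take it modulo the number of reducers.

    import org.apache.hadoop.mapreduce.Partitioner;

    // A sketch of hash partitioning, equivalent in spirit to the default HashPartitioner.
    public class SimpleHashPartitioner<K, V> extends Partitioner<K, V> {
      @Override
      public int getPartition(K key, V value, int numReduceTasks) {
        // Non-negative hash modulo the reducer count gives a partition in [0, numReduceTasks).
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
      }
    }

    A job can swap in its own partitioner with job.setPartitionerClass(...), which is what jobContext.getPartitionerClass() above reads back.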

    From NewOutputCollector we can see that it is essentially a wrapper around MapOutputBuffer. MapOutputBuffer already existed in the old API; it is complex but crucial, so we set it aside for now. In short, it collects the output K/V pairs. It implements the MapOutputCollector interface:

      interface MapOutputCollector<K, V> {
        public void collect(K key, V value, int partition
                            ) throws IOException, InterruptedException;
        public void close() throws IOException, InterruptedException;
        public void flush() throws IOException, InterruptedException, 
                                   ClassNotFoundException;
      }

    This interface tells us that a collector must implement collect(), close(), and flush(); a toy illustration of the contract follows.
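    Purely to illustrate that contract (this is not how the real MapOutputBuffer works), here is a hypothetical toy collector that just buffers records in memory; the interface is re-declared locally so the sketch stands alone, since the real one is a member of MapTask.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    public class ToyCollectorDemo {

      // Re-declared locally for the sketch; the real interface lives inside MapTask.
      interface MapOutputCollector<K, V> {
        void collect(K key, V value, int partition) throws IOException, InterruptedException;
        void close() throws IOException, InterruptedException;
        void flush() throws IOException, InterruptedException, ClassNotFoundException;
      }

      // Toy implementation: collect() buffers, flush() prints, close() releases.
      static class InMemoryCollector<K, V> implements MapOutputCollector<K, V> {
        private final List<String> buffer = new ArrayList<String>();

        public void collect(K key, V value, int partition) {
          buffer.add(partition + "\t" + key + "\t" + value);
        }

        public void flush() {
          for (String record : buffer) {
            System.out.println(record);   // the real collector sorts and spills to disk instead
          }
          buffer.clear();
        }

        public void close() {
          buffer.clear();
        }
      }

      public static void main(String[] args) throws Exception {
        InMemoryCollector<String, Integer> c = new InMemoryCollector<String, Integer>();
        c.collect("hadoop", 1, 0);
        c.collect("mapreduce", 1, 1);
        c.flush();
        c.close();
      }
    }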

    Now for a simple one: NewDirectOutputCollector. It is used when there are no reduce tasks. It mainly obtains the RecordWriter of the configured OutputFormat and then writes output directly through that RecordWriter's write() method, which is how it ties back to the output format we set.

    private class NewDirectOutputCollector<K,V>
    extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
      private final org.apache.hadoop.mapreduce.RecordWriter out;

      private final TaskReporter reporter;

      private final Counters.Counter mapOutputRecordCounter;
      private final Counters.Counter fileOutputByteCounter; 
      private final Statistics fsStats;

      @SuppressWarnings("unchecked")
      NewDirectOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
          JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter) 
      throws IOException, ClassNotFoundException, InterruptedException {
        this.reporter = reporter;
        Statistics matchedStats = null;
        if (outputFormat instanceof org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) { 
          // outputFormat comes from the enclosing Task; an inner class can access the outer class's fields
          matchedStats = getFsStatistics(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
              .getOutputPath(jobContext), job);
        }
        fsStats = matchedStats;
        mapOutputRecordCounter = 
          reporter.getCounter(MAP_OUTPUT_RECORDS);
        fileOutputByteCounter = reporter
            .getCounter(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.Counter.BYTES_WRITTEN);

        long bytesOutPrev = getOutputBytes(fsStats);
        out = outputFormat.getRecordWriter(taskContext); // the key line: get the RecordWriter from the configured OutputFormat
        long bytesOutCurr = getOutputBytes(fsStats);
        fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
      }

      @Override
      @SuppressWarnings("unchecked")
      public void write(K key, V value) 
      throws IOException, InterruptedException {
        reporter.progress();  // report progress
        long bytesOutPrev = getOutputBytes(fsStats);
        out.write(key, value);  // write one record through out, the RecordWriter of the configured OutputFormat
        long bytesOutCurr = getOutputBytes(fsStats);
        fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);  // update the output byte counter
        mapOutputRecordCounter.increment(1);      // update the count of output K/V pairs
      }

      @Override
      public void close(TaskAttemptContext context) 
      throws IOException,InterruptedException {
        reporter.progress();
        if (out != null) {
          long bytesOutPrev = getOutputBytes(fsStats);
          out.close(context);
          long bytesOutCurr = getOutputBytes(fsStats);
          fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
        }
      }

      private long getOutputBytes(Statistics stats) {
        return stats == null ? 0 : stats.getBytesWritten();
      }
    }

    There is also the old-API path driven by runOldMapper(), which we will not go into here.
