zoukankan      html  css  js  c++  java
  • hadoop map任务Combiner被调用的源码逻辑简要分析

     
    从MapTask类中分析下去,看一下map任务是如何被调用并执行的。
     
    入口方法是MapTask的run方法,看一下run方法的相关介绍:
     
    org.apache.hadoop.mapred.Task
    public abstract void run(JobConf job,
                             TaskUmbilicalProtocol umbilical)
                     throws IOException,
                            ClassNotFoundException,
                            InterruptedException
    Run this task as a part of the named job. This method is executed in the child process and is what invokes user-supplied map, reduce, etc. methods.
    Parameters:
    umbilical - for progress reports
     
     
    可以看出,这个方法接收两个参数,jobconf就是任务的相关配置,而umbilical是作为任务进度reporter的身份存在的。
     
    map任务根据是否有reduce部分,而将进度分成两种样式,
    // If there are no reducers then there won't be any sort. Hence the map
          // phase will govern the entire attempt's progress.
          if (conf.getNumReduceTasks() == 0) {
            mapPhase = getProgress().addPhase("map", 1.0f);
          } else {
            // If there are reducers then the entire attempt's progress will be
            // split between the map phase (67%) and the sort phase (33%).
            mapPhase = getProgress().addPhase("map", 0.667f);
            sortPhase  = getProgress().addPhase("sort", 0.333f);
          }
     
     
    如果没有reduce任务,就没有必要为map的结果进行归并排序操作了,那么整个map过程将以100%进度执行;相反,如果其中含有reduce任务,那么map的任务被分成两部分,map函数执行的部分占整个进度的66.7%(此时我们在RecordReader中的getProgress仅仅给出的是相对这部分的百分比值),剩下的33.3%赋予归并排序的过程。
     
    是否使用当前的new-api执行会有所不同,建议都使用最新的api,org.apache.hadoop.mapreduce是最新的api,org.apache.hadoop.mapred是旧的api,我们这里分析的也是new-api。
    if (useNewApi) {
         runNewMapper(job, splitMetaInfo, umbilical, reporter);
    } else {
         runOldMapper(job, splitMetaInfo, umbilical, reporter);
    }
     
     
    整个map执行过程大概如下:
        
     input.initialize(split, mapperContext);
          mapper.run(mapperContext);
          mapPhase.complete();
          setPhase(TaskStatus.Phase.SORT);
          statusUpdate(umbilical);
          input.close();
          input = null;
          output.close(mapperContext);
          output = null;
     
     
    首先,初始化RecordReader,执行map具体的函数,待所有的map函数执行完成之后,进入SORT阶段,排序完成后,整个map过程就执行完成了。
     
    在new-api中的map函数执行的具体过程:
    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        try {
          while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
          }
        } finally {
          cleanup(context);
        }
      }
     
     
    执行完初始化方法之后,从RecordReader中依次执行nextKeyValue()方法,当返回true时,拿到其key和value,并执行map函数,直到该分片内的数据都已经读取完毕(nextKeyValue()返回false),最后执行清理操作。由于new-api与old-api的差异比较大,分析这段代码可以让我们更加深入地理解这个过程,避免陷入新的编程模型的坑。
     
    在MapTask执行时,根据是否有Reduce任务,其RecordWriter(OutputCollector) 也会有所不同,如果没有直接返回不需要排序,否则构造一个NewOutputCollector类。在NewOutputCollector的构造函数中,会调用方法createSortingCollector(job, reporter),会根据job的参数:mapreduce.job.map.output.collector.class构造一个OutputCollector(需要实现接口:org.apache.hadoop.mapred.MapOutputCollector,主要负责Map端数据的收集,虽然好像很少有人定制这个),默认使用org.apache.hadoop.mapred.MapOutputBuffer类。
     
    在初始化init方法中,mapreduce.map.sort.spill.percent用于控制缓冲区达到哪个百分比后,开始进行Spill操作,默认值80%,只能是[0, 1]中间的数;mapreduce.task.io.sort.mb确定缓冲区的大小,默认值100M,这部分是占用Map堆内存的,设置太大,Map端堆内存也需要设置较大,设置太小,spill的次数可能变多,最大不能设置操作32G,需要具体情况衡量;
     
    //sanity checks
          final float spillper =
            job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
          final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
          indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                             INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
          if (spillper > (float)1.0 || spillper <= (float)0.0) {
            throw new IOException("Invalid "" + JobContext.MAP_SORT_SPILL_PERCENT +
                "": " + spillper);
          }
          if ((sortmb & 0x7FF) != sortmb) {
            throw new IOException(
                "Invalid "" + JobContext.IO_SORT_MB + "": " + sortmb);
          }
          sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
                QuickSort.class, IndexedSorter.class), job);
     
     
    甚至map.sort.class排序的类都可以定制,默认使用的是快速排序。
     
     
    SpillThread就是控制溢写的线程,是一个守护线程,在MapOutputBuffer中被调用:
     
    spillThread.setDaemon(true);
          spillThread.setName("SpillThread");
          spillLock.lock();
          try {
            spillThread.start();
            while (!spillThreadRunning) {
              spillDone.await();
            }
          } catch (InterruptedException e) {
            throw new IOException("Spill thread failed to initialize", e);
          } finally {
            spillLock.unlock();
          }
          if (sortSpillException != null) {
            throw new IOException("Spill thread failed to initialize",
                sortSpillException);
          }
     
     
    spillLock是一个可重入锁对象,带有两种类型的条件,done以及ready,分别表示溢写完成和准备开始溢写。
    final ReentrantLock spillLock = new ReentrantLock();
        final Condition spillDone = spillLock.newCondition();
        final Condition spillReady = spillLock.newCondition();
     
     
    根据代码理解,当累积的index内存超出mapreduce.task.index.cache.limit.bytes的限制后(1024*1024),就会将索引的文件放到本地磁盘上,否则一直在内存里。
     
    indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                             INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
    
    if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
              // create spill index file
              Path indexFilename =
                  mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
                      * MAP_OUTPUT_INDEX_RECORD_LENGTH);
              spillRec.writeToFile(indexFilename, job);
            } else {
              indexCacheList.add(spillRec);
              totalIndexCacheMemory +=
                spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
            }
     
     
     在spill的过程中,还会同时进行Combine操作:
    if (combinerRunner == null) {
                  // spill directly
                  DataInputBuffer key = new DataInputBuffer();
                  while (spindex < mend &&
                      kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
                    final int kvoff = offsetFor(spindex % maxRec);
                    int keystart = kvmeta.get(kvoff + KEYSTART);
                    int valstart = kvmeta.get(kvoff + VALSTART);
                    key.reset(kvbuffer, keystart, valstart - keystart);
                    getVBytesForOffset(kvoff, value);
                    writer.append(key, value);
                    ++spindex;
                  }
                } else {
                  int spstart = spindex;
                  while (spindex < mend &&
                      kvmeta.get(offsetFor(spindex % maxRec)
                                + PARTITION) == i) {
                    ++spindex;
                  }
                  // Note: we would like to avoid the combiner if we've fewer
                  // than some threshold of records for a partition
                  if (spstart != spindex) {
                    combineCollector.setWriter(writer);
                    RawKeyValueIterator kvIter =
                      new MRResultIterator(spstart, spindex);
                    combinerRunner.combine(kvIter, combineCollector);
                  }
                }
     
     
    如果combineRunner是空,表示没有设置Combine Class,这时就会直接溢写;否则,就可能进行Combiner过程。
     
    注意下面的注释,”在某个partition的记录数量过少时,会避免使用Combiner”,可以理解为,为了性能考虑,避免无用的一些操作。
     
    执行Combiner的方法,CombinerRunner.combine,由于要兼容老版本的类型,分为Old/NewCombineRunner,我们查看的是NewCombineRunner,
     
    public void combine(RawKeyValueIterator iterator,
                     OutputCollector<K,V> collector
                     ) throws IOException, InterruptedException,
                              ClassNotFoundException {
          // make a reducer
          org.apache.hadoop.mapreduce.Reducer<K,V,K,V> reducer =
            (org.apache.hadoop.mapreduce.Reducer<K,V,K,V>)
              ReflectionUtils.newInstance(reducerClass, job);
          org.apache.hadoop.mapreduce.Reducer.Context
               reducerContext = createReduceContext(reducer, job, taskId,
                                                    iterator, null, inputCounter,
                                                    new OutputConverter(collector),
                                                    committer,
                                                    reporter, comparator, keyClass,
                                                    valueClass);
          reducer.run(reducerContext);
        }
     
     
    从代码中可以看出,Combiner就是Map端的Reducer,执行方式所有参数都与Reducer没有差别。
     
    Combine会在两个时间点被调用,当map端缓冲区内存占用达到mapreduce.map.sort.spill.percent时,SpillThread进行,进行完成后,会执行一次Combine操作(合理地设置内存大小能够避免过多地spill,尽量放到内存操作);还有一次,就是当Map端Collector收集数据完成之后,在MapOutputBuffer.flush()方法刷新缓冲区之后,也会调用sortAndSpill操作。
     
    但是要注意,combine调用的不确定性,一定不能根据combine执行的次数编程,要做到Combine是幂等操作(@Idempotence, hadoop的源码中这种操作非常多)。
     
    Spill的时候可以打印出来的一些日志:
    2014-11-27 21:06:35,113 INFO [main] org.apache.hadoop.mapred.MapTask: Spilling map output
    2014-11-27 21:06:35,113 INFO [main] org.apache.hadoop.mapred.MapTask: bufstart = 328052900; bufend = 93461297; bufvoid = 536870912
    2014-11-27 21:06:35,113 INFO [main] org.apache.hadoop.mapred.MapTask: kvstart = 82013220(328052880); kvend = 50208864(200835456); length = 31804357/33554432
    2014-11-27 21:06:35,113 INFO [main] org.apache.hadoop.mapred.MapTask: (EQUATOR) 125876129 kvi 31469028(125876112)
    2014-11-27 21:07:04,801 INFO [main] org.apache.hadoop.mapred.MapTask: Finished spill 99
     
    最后合理地使用Combiner可以大大减少中间结果的大小,同时带来Reducer端数据处理速度的提升,下面就是我们加入Combiner之后的输入输出纪录对比:


     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     

     

  • 相关阅读:
    vuejs学习小结(数据处理)
    vuejs的遇到的问题小结
    ES6 对象扩展
    webpack的两个难点
    Sass入门:第二章
    Sass入门:第一章
    Effective JavaScript :第六章
    Effective JavaScript :第五章
    Effective JavaScript :第四章
    Effective JavaScript :第三章
  • 原文地址:https://www.cnblogs.com/mmaa/p/5789898.html
Copyright © 2011-2022 走看看