  • YARN Source Code Analysis (8): The Reduce Shuffle Process

    Preface

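    Among the stages of a Hadoop job, Shuffle has always been one of the more mysterious. Because it is a sub-phase of the Reduce process, it is often overlooked. Yet Shuffle is what carries data from the map side to the reduce side of a MapReduce job. Because this module matters so much, Hadoop makes it pluggable: users can write their own Shuffle implementation to suit the characteristics of their application. I recently skimmed the relevant code, and this post records what I learned.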


    The Shuffle Process

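    Shuffle is the first phase of a reduce task. Put simply, it is a remote data copy whose source is the intermediate output of the map tasks. Before a reduce task can do any further processing, it must first obtain this data. A map task generally writes its intermediate output to the node it ran on, so the reduce task usually has to copy the data over the network, and a large volume of data consumes a corresponding share of network bandwidth.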


    Source Code Analysis of the Shuffle Module

    The rest of this post walks through the module at the source level. Since this phase belongs to the Reduce process, we start from the ReduceTask code.

    @Override
      @SuppressWarnings("unchecked")
      public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
        throws IOException, InterruptedException, ClassNotFoundException {
        job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
    
        if (isMapOrReduce()) {
          copyPhase = getProgress().addPhase("copy");
          sortPhase  = getProgress().addPhase("sort");
          reducePhase = getProgress().addPhase("reduce");
        }
        ....
        
        // Initialize the codec
        codec = initCodec();
        RawKeyValueIterator rIter = null;
        ShuffleConsumerPlugin shuffleConsumerPlugin = null;
        
        Class combinerClass = conf.getCombinerClass();
        CombineOutputCollector combineCollector = 
          (null != combinerClass) ? 
         new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
    
        Class<? extends ShuffleConsumerPlugin> clazz =
              job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
    					
        shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
        LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
    
        ShuffleConsumerPlugin.Context shuffleContext = 
          new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical, 
                      super.lDirAlloc, reporter, codec, 
                      combinerClass, combineCollector, 
                      spilledRecordsCounter, reduceCombineInputCounter,
                      shuffledMapsCounter,
                      reduceShuffleBytes, failedShuffleCounter,
                      mergedMapOutputsCounter,
                      taskStatus, copyPhase, sortPhase, this,
                      mapOutputFile, localMapFiles);
        shuffleConsumerPlugin.init(shuffleContext);
    
        rIter = shuffleConsumerPlugin.run();
        ....
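    The plugin lookup in the code above (read a class from the job configuration, fall back to the built-in Shuffle class, instantiate it reflectively) can be illustrated with a minimal, self-contained sketch. Everything in it is hypothetical and simplified: ShufflePlugin, DefaultShuffle, CustomShuffle, PluginLoader, and the key example.shuffle.plugin.class are stand-ins invented for illustration, and a plain Map replaces JobConf. It is not Hadoop's API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the ShuffleConsumerPlugin interface.
interface ShufflePlugin {
    String describe();
}

// Plays the role of the built-in default implementation.
class DefaultShuffle implements ShufflePlugin {
    @Override
    public String describe() {
        return "default shuffle";
    }
}

// Plays the role of a user-supplied replacement.
class CustomShuffle implements ShufflePlugin {
    @Override
    public String describe() {
        return "custom shuffle";
    }
}

class PluginLoader {
    // Illustrative key name only; not a real Hadoop configuration constant.
    static final String PLUGIN_KEY = "example.shuffle.plugin.class";

    // Looks up the configured class name, falls back to the default,
    // and creates an instance through its no-argument constructor.
    static ShufflePlugin load(Map<String, String> conf) {
        String className = conf.getOrDefault(PLUGIN_KEY, DefaultShuffle.class.getName());
        try {
            Class<? extends ShufflePlugin> clazz =
                Class.forName(className).asSubclass(ShufflePlugin.class);
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot load shuffle plugin " + className, e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(load(conf).describe());   // no key set: default is used
        conf.put(PLUGIN_KEY, CustomShuffle.class.getName());
        System.out.println(load(conf).describe());   // key set: custom class is used
    }
}
```

    Keeping the choice of implementation in configuration is what lets the reduce task stay unchanged when a different shuffle is plugged in.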
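    The shuffle-related code is visible in the run method of ReduceTask. It first constructs the Shuffle context, then initializes the plugin, and then runs the main shuffle operation. Start with the context: it is a nested class (ShuffleConsumerPlugin.Context) whose constructor takes a large number of arguments, all of which are used during the Shuffle. The values are passed in from the enclosing reduce task. The fields defined in Context are: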

    @InterfaceAudience.LimitedPrivate("mapreduce")
      @InterfaceStability.Unstable
      public static class Context<K,V> {
        private final org.apache.hadoop.mapreduce.TaskAttemptID reduceId;
        private final JobConf jobConf;
        private final FileSystem localFS;
        private final TaskUmbilicalProtocol umbilical;
        private final LocalDirAllocator localDirAllocator;
        private final Reporter reporter;
        private final CompressionCodec codec;
        private final Class<? extends Reducer> combinerClass;
        private final CombineOutputCollector<K, V> combineCollector;
        // Counters related to the Shuffle process
        private final Counters.Counter spilledRecordsCounter;
        private final Counters.Counter reduceCombineInputCounter;
        private final Counters.Counter shuffledMapsCounter;
        private final Counters.Counter reduceShuffleBytes;
        private final Counters.Counter failedShuffleCounter;
        private final Counters.Counter mergedMapOutputsCounter;
        private final TaskStatus status;
        private final Progress copyPhase;
        private final Progress mergePhase;
        private final Task reduceTask;
        private final MapOutputFile mapOutputFile;
        private final Map<TaskAttemptID, MapOutputFile> localMapFiles;
    Once the context has been constructed, init is called with it:

    shuffleConsumerPlugin.init(shuffleContext);
    ShuffleConsumerPlugin is an interface, so we need to find the concrete implementation. In Hadoop 2.7.1 it is the class named Shuffle. Its initialization is shown below:

    @InterfaceAudience.LimitedPrivate({"MapReduce"})
    @InterfaceStability.Unstable
    @SuppressWarnings({"unchecked", "rawtypes"})
    public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionReporter {
      private static final int PROGRESS_FREQUENCY = 2000;
      private static final int MAX_EVENTS_TO_FETCH = 10000;
      private static final int MIN_EVENTS_TO_FETCH = 100;
      private static final int MAX_RPC_OUTSTANDING_EVENTS = 3000000;
      
      private ShuffleConsumerPlugin.Context context;
    
      .....
    
      @Override
      public void init(ShuffleConsumerPlugin.Context context) {
        this.context = context;
    
        this.reduceId = context.getReduceId();
        this.jobConf = context.getJobConf();
        this.umbilical = context.getUmbilical();
        this.reporter = context.getReporter();
        this.metrics = new ShuffleClientMetrics(reduceId, jobConf);
        this.copyPhase = context.getCopyPhase();
        this.taskStatus = context.getStatus();
        this.reduceTask = context.getReduceTask();
        this.localMapFiles = context.getLocalMapFiles();
        
        scheduler = new ShuffleSchedulerImpl<K, V>(jobConf, taskStatus, reduceId,
            this, copyPhase, context.getShuffledMapsCounter(),
            context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
        merger = createMergeManager(context);
      }
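    init copies the values from the context into the plugin's own fields. Also note the last line, which creates the merge manager (merger); the merge runs toward the end of the shuffle phase. Next comes the main method: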

    rIter = shuffleConsumerPlugin.run();
    Jumping to the concrete implementation:

    @Override
      public RawKeyValueIterator run() throws IOException, InterruptedException {
        ......
    
        // Start the map-completion events fetcher thread
        final EventFetcher<K,V> eventFetcher = 
          new EventFetcher<K,V>(reduceId, umbilical, scheduler, this,
              maxEventsToFetch);
        eventFetcher.start();
        
        // Start the map-output fetcher threads
        boolean isLocal = localMapFiles != null;
        final int numFetchers = isLocal ? 1 :
          jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
        Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
        if (isLocal) {
          fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
              merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
              localMapFiles);
          fetchers[0].start();
        } else {
          for (int i=0; i < numFetchers; ++i) {
            fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger, 
                                           reporter, metrics, this, 
                                           reduceTask.getShuffleSecret());
            fetchers[i].start();
          }
        }
        ......
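    The main operation first checks whether the map outputs are local (localMapFiles is non-null). If they are, a single LocalFetcher is created and given the local map files; otherwise several Fetcher threads are created (5 by default, per SHUFFLE_PARALLEL_COPIES) to fetch the data remotely. The core work is therefore in the Fetcher class: calling start() launches the thread, and its run() method drives the copy.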

    public void run() {
        try {
          while (!stopped && !Thread.currentThread().isInterrupted()) {
            MapHost host = null;
            try {
              // If merge is on, block
              merger.waitForResource();
    
              // Get a host to shuffle from
              host = scheduler.getHost();
              metrics.threadBusy();
    
              // Shuffle
              copyFromHost(host);
            } finally {
              if (host != null) {
                scheduler.freeHost(host);
                metrics.threadFree();            
              }
            }
          }
        } catch (InterruptedException ie) {
          return;
        } catch (Throwable t) {
          exceptionReporter.reportException(t);
        }
      }
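    The pattern in Fetcher.run(), several worker threads that each repeatedly take a host and copy from it, can be sketched without Hadoop as follows. This is a simplified illustration under stated assumptions: FetcherPoolSketch and fetchAll are hypothetical names, a ConcurrentLinkedQueue stands in for the scheduler, and the "copy" is just a string append. Unlike the real Fetcher, which blocks in scheduler.getHost() until it is interrupted, these workers stop as soon as the queue is empty.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;

class FetcherPoolSketch {
    // Starts numFetchers threads that drain the host list and
    // returns one "copied:<host>" entry per host.
    static List<String> fetchAll(List<String> hosts, int numFetchers) {
        Queue<String> pending = new ConcurrentLinkedQueue<>(hosts);  // stand-in for the scheduler
        List<String> copied = new CopyOnWriteArrayList<>();
        Thread[] fetchers = new Thread[numFetchers];

        for (int i = 0; i < numFetchers; i++) {
            fetchers[i] = new Thread(() -> {
                String host;
                // Same loop shape as Fetcher.run(): keep going until interrupted
                // (or, in this sketch only, until no host is left).
                while (!Thread.currentThread().isInterrupted()
                        && (host = pending.poll()) != null) {
                    copied.add("copied:" + host);  // stand-in for copyFromHost(host)
                }
            }, "fetcher#" + i);
            fetchers[i].start();
        }

        try {
            for (Thread fetcher : fetchers) {
                fetcher.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for fetchers", e);
        }
        return copied;
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(List.of("host-a", "host-b", "host-c"), 2).size());
    }
}
```

    Because every worker polls the same queue, each host is handed to exactly one worker, which is the property the real scheduler also has to provide.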
    Next, copyFromHost. At the start of the method, it obtains the list of map tasks that have completed on the target host.

      @VisibleForTesting
      protected void copyFromHost(MapHost host) throws IOException {
        // reset retryStartTime for a new host
        retryStartTime = 0;
        // Get completed maps on 'host'
        List<TaskAttemptID> maps = scheduler.getMapsForHost(host);
        ....
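    In other words, the fetch that follows pulls the output of these map tasks from the node they ran on. Before copying, the method sets up a remaining set of the map outputs still to be fetched: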

    // List of maps to be fetched yet
        Set<TaskAttemptID> remaining = new HashSet<TaskAttemptID>(maps);
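    It then builds the URL and opens an input stream to the host. If no stream could be opened (input is null), there is nothing to read from this host and the method returns: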

     // Construct the url and connect
        URL url = getMapOutputURL(host, maps);
        DataInputStream input = openShuffleUrl(host, remaining, url);
        if (input == null) {
          return;
        }
        
    The outputs are then copied in a loop:

    TaskAttemptID[] failedTasks = null;
          while (!remaining.isEmpty() && failedTasks == null) {
            try {
              failedTasks = copyMapOutput(host, input, remaining, fetchRetryEnabled);
            } catch (IOException e) {
              //
              // Setup connection again if disconnected by NM
              connection.disconnect();
              // Get map output from remaining tasks only.
              url = getMapOutputURL(host, remaining);
              input = openShuffleUrl(host, remaining, url);
              if (input == null) {
                return;
              }
            }
          }
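    The loop exits early if a task fails (failedTasks becomes non-null); in the normal case it ends when the remaining set is empty. If an IOException occurs, the connection is re-established with a URL built from only the remaining maps. The details of the copy itself are in copyMapOutput.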
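    The "remaining set plus reconnect" idea in this loop can be shown in isolation. The sketch below is an assumption-laden illustration, not Hadoop code: RetryLoopSketch and FlakyHost are hypothetical, map outputs are plain strings, and the host simulates exactly one dropped connection on its second read. It also omits the failedTasks exit path of the real loop.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class RetryLoopSketch {
    // Hypothetical host that drops the connection once, on the second read.
    static class FlakyHost {
        int connects = 0;
        private int reads = 0;

        // Opens a fresh "stream" that serves exactly the requested map IDs.
        Iterator<String> open(Set<String> wanted) {
            connects++;
            return new ArrayList<>(wanted).iterator();
        }

        // Reads the next map output, or fails once to simulate a disconnect.
        String read(Iterator<String> input) throws IOException {
            if (++reads == 2) {
                throw new IOException("connection reset");
            }
            return input.next();
        }
    }

    static List<String> copyAll(FlakyHost host, List<String> maps) {
        Set<String> remaining = new LinkedHashSet<>(maps);  // outputs still to fetch
        List<String> copied = new ArrayList<>();
        Iterator<String> input = host.open(remaining);

        while (!remaining.isEmpty()) {
            try {
                String mapId = host.read(input);
                copied.add(mapId);
                remaining.remove(mapId);
            } catch (IOException e) {
                // Reconnect and request only what is still missing,
                // so already-copied outputs are not transferred again.
                input = host.open(remaining);
            }
        }
        return copied;
    }

    public static void main(String[] args) {
        FlakyHost host = new FlakyHost();
        System.out.println(copyAll(host, List.of("m1", "m2", "m3")) + " connects=" + host.connects);
    }
}
```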

    Before copying, the sizes of the data to copy are read from the shuffle header on the input stream.

    private TaskAttemptID[] copyMapOutput(MapHost host,
                                    DataInputStream input,
                                    Set<TaskAttemptID> remaining,
                                    boolean canRetry) throws IOException {
        MapOutput<K,V> mapOutput = null;
        TaskAttemptID mapId = null;
        long decompressedLength = -1;
        long compressedLength = -1;
        
        try {
          long startTime = Time.monotonicNow();
          int forReduce = -1;
          //Read the shuffle header
          try {
            ShuffleHeader header = new ShuffleHeader();
            header.readFields(input);
            mapId = TaskAttemptID.forName(header.mapId);
            compressedLength = header.compressedLength;
            decompressedLength = header.uncompressedLength;
            forReduce = header.forReduce;
            .....
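    To make the header step concrete, here is a small sketch of a length-prefixed header being written to and read back from a stream. It is only an illustration: HeaderSketch is a hypothetical class, and it uses the plain java.io encodings (writeUTF, writeLong, writeInt) rather than the Writable encoding that Hadoop's ShuffleHeader uses, so the byte layout is an assumption and does not match the real wire format. The field names mirror those read in the code above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class HeaderSketch {
    final String mapId;
    final long compressedLength;
    final long uncompressedLength;
    final int forReduce;

    HeaderSketch(String mapId, long compressedLength, long uncompressedLength, int forReduce) {
        this.mapId = mapId;
        this.compressedLength = compressedLength;
        this.uncompressedLength = uncompressedLength;
        this.forReduce = forReduce;
    }

    // Writes the four fields in a fixed order (illustrative layout only).
    void write(DataOutputStream out) throws IOException {
        out.writeUTF(mapId);
        out.writeLong(compressedLength);
        out.writeLong(uncompressedLength);
        out.writeInt(forReduce);
    }

    // Reads the fields back in the same order they were written.
    static HeaderSketch read(DataInputStream in) throws IOException {
        String mapId = in.readUTF();
        long compressed = in.readLong();
        long uncompressed = in.readLong();
        int forReduce = in.readInt();
        return new HeaderSketch(mapId, compressed, uncompressed, forReduce);
    }

    // Serializes to a byte array and parses it again, as a receiver would.
    static HeaderSketch roundTrip(HeaderSketch header) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            header.write(new DataOutputStream(bytes));
            return read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        HeaderSketch h = roundTrip(new HeaderSketch("example_map_0", 100L, 250L, 3));
        System.out.println(h.mapId + " len=" + h.compressedLength + " decomp=" + h.uncompressedLength);
    }
}
```

    Reading the sizes before the payload is what allows the receiver to decide where the data should go before a single byte of it has been copied.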
    Based on that size, the merger decides whether the data should be copied into memory or to disk and returns the corresponding output object.

    // Do some basic sanity verification
          if (!verifySanity(compressedLength, decompressedLength, forReduce,
              remaining, mapId)) {
            return new TaskAttemptID[] {mapId};
          }
          
          if(LOG.isDebugEnabled()) {
            LOG.debug("header: " + mapId + ", len: " + compressedLength + 
                ", decomp len: " + decompressedLength);
          }
          
          // Get the location for the map output - either in-memory or on-disk
          try {
            mapOutput = merger.reserve(mapId, decompressedLength, id);
            .... 
    The reserve method is where this decision is made:

    @Override
      public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId, 
                                                 long requestedSize,
                                                 int fetcher
                                                 ) throws IOException {
        if (!canShuffleToMemory(requestedSize)) {
          LOG.info(mapId + ": Shuffling to disk since " + requestedSize + 
                   " is greater than maxSingleShuffleLimit (" + 
                   maxSingleShuffleLimit + ")");
          return new OnDiskMapOutput<K,V>(mapId, reduceId, this, requestedSize,
                                          jobConf, mapOutputFile, fetcher, true);
        }
        
        .....
        
        if (usedMemory > memoryLimit) {
          LOG.debug(mapId + ": Stalling shuffle since usedMemory (" + usedMemory
              + ") is greater than memoryLimit (" + memoryLimit + ")." + 
              " CommitMemory is (" + commitMemory + ")"); 
          return null;
        }
        
        // Allow the in-memory shuffle to progress
        LOG.debug(mapId + ": Proceeding with shuffle since usedMemory ("
            + usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")."
            + "CommitMemory is (" + commitMemory + ")"); 
        return unconditionalReserve(mapId, requestedSize, true);
      }

    /**
       * Unconditional Reserve is used by the Memory-to-Memory thread
       * @return
       */
      private synchronized InMemoryMapOutput<K, V> unconditionalReserve(
          TaskAttemptID mapId, long requestedSize, boolean primaryMapOutput) {
        usedMemory += requestedSize;
        return new InMemoryMapOutput<K,V>(jobConf, mapId, this, (int)requestedSize,
                                          codec, primaryMapOutput);
      }
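    The three outcomes of reserve (shuffle to disk, stall, or shuffle to memory) can be condensed into a small decision sketch. This is a simplified model, not the MergeManagerImpl code: ReserveSketch, Target, and release are hypothetical names, the limits are arbitrary numbers, and the rule "a request at or above maxSingleShuffleLimit goes to disk" is an assumption about what canShuffleToMemory checks. As in the code above, the memory check looks at usage before the new request is added, so usage can overshoot the limit by at most one reservation.

```java
class ReserveSketch {
    enum Target { MEMORY, DISK, STALL }

    private final long memoryLimit;            // total budget for in-memory outputs
    private final long maxSingleShuffleLimit;  // largest single output kept in memory
    private long usedMemory = 0;

    ReserveSketch(long memoryLimit, long maxSingleShuffleLimit) {
        this.memoryLimit = memoryLimit;
        this.maxSingleShuffleLimit = maxSingleShuffleLimit;
    }

    synchronized Target reserve(long requestedSize) {
        // Too large for memory on its own: send it straight to disk.
        if (requestedSize >= maxSingleShuffleLimit) {
            return Target.DISK;
        }
        // Memory is already over budget: the caller has to wait and retry.
        if (usedMemory > memoryLimit) {
            return Target.STALL;
        }
        // Otherwise account for the request and keep it in memory.
        usedMemory += requestedSize;
        return Target.MEMORY;
    }

    // Hypothetical counterpart called when an in-memory output is merged away.
    synchronized void release(long size) {
        usedMemory -= size;
    }

    public static void main(String[] args) {
        ReserveSketch merger = new ReserveSketch(100, 40);
        System.out.println(merger.reserve(50));
        System.out.println(merger.reserve(30));
    }
}
```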
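    Two kinds of object can be returned: InMemoryMapOutput, which copies into memory, and OnDiskMapOutput, which copies to disk. (reserve can also return null, which stalls the fetch while memory usage is over the limit.) Once the target is chosen, the remote shuffle copy is performed: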

     ....
          // The codec for lz0,lz4,snappy,bz2,etc. throw java.lang.InternalError
          // on decompression failures. Catching and re-throwing as IOException
          // to allow fetch failure logic to be processed
          try {
            // Go!
            LOG.info("fetcher#" + id + " about to shuffle output of map "
                + mapOutput.getMapId() + " decomp: " + decompressedLength
                + " len: " + compressedLength + " to " + mapOutput.getDescription());
            mapOutput.shuffle(host, is, compressedLength, decompressedLength,
                metrics, reporter);
          } catch (java.lang.InternalError e) {
            LOG.warn("Failed to shuffle for fetcher#"+id, e);
            throw new IOException(e);
          }
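    Finally, once these operations are complete, a merge is performed to keep the in-memory data from taking up too much space and to avoid too many small files on disk. It is triggered by the merger.close() call at the end of the run method of the Shuffle class.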

    // Start the map-output fetcher threads
        boolean isLocal = localMapFiles != null;
        final int numFetchers = isLocal ? 1 :
          jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
        Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
        if (isLocal) {
          fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
              merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
              localMapFiles);
          fetchers[0].start();
        } else {
          for (int i=0; i < numFetchers; ++i) {
            fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger, 
                                           reporter, metrics, this, 
                                           reduceTask.getShuffleSecret());
            fetchers[i].start();
          }
        }
        
        .....
        // Finish the on-going merges...
        RawKeyValueIterator kvIter = null;
        try {
          kvIter = merger.close();
        } catch (Throwable e) {
          throw new ShuffleError("Error while doing final merge " , e);
        }

    @Override
      public RawKeyValueIterator close() throws Throwable {
        // Wait for on-going merges to complete
        if (memToMemMerger != null) { 
          memToMemMerger.close();
        }
        inMemoryMerger.close();
        onDiskMerger.close();
        
        List<InMemoryMapOutput<K, V>> memory = 
          new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
        inMemoryMergedMapOutputs.clear();
        memory.addAll(inMemoryMapOutputs);
        inMemoryMapOutputs.clear();
        List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);
        onDiskMapOutputs.clear();
        return finalMerge(jobConf, rfs, memory, disk);
      }
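    The body of finalMerge is not shown here. Conceptually, it combines the already-sorted in-memory and on-disk segments into one sorted stream for the reducer. A common way to do that is a k-way merge driven by a priority queue, sketched below on plain integer lists. This illustrates the general technique only: FinalMergeSketch is a hypothetical class and does not reproduce Hadoop's merge code, its key comparators, or its handling of disk files.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

class FinalMergeSketch {
    // Merges several individually sorted runs into one sorted list.
    static List<Integer> merge(List<List<Integer>> sortedRuns) {
        // Each heap entry is {value, runIndex, positionInRun}, ordered by value.
        PriorityQueue<int[]> heap =
            new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));

        // Seed the heap with the first element of every non-empty run.
        for (int run = 0; run < sortedRuns.size(); run++) {
            if (!sortedRuns.get(run).isEmpty()) {
                heap.add(new int[] {sortedRuns.get(run).get(0), run, 0});
            }
        }

        List<Integer> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] smallest = heap.poll();
            merged.add(smallest[0]);

            // Replace the consumed element with the next one from the same run.
            List<Integer> source = sortedRuns.get(smallest[1]);
            int next = smallest[2] + 1;
            if (next < source.size()) {
                heap.add(new int[] {source.get(next), smallest[1], next});
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        System.out.println(merge(List.of(List.of(1, 4, 9), List.of(2, 3, 10), List.of(5))));
    }
}
```

    The heap never holds more than one element per run, so memory use depends on the number of segments rather than on their total size.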
    That completes the whole Reduce Shuffle process. A simple flow diagram of the analysis follows.


    For analysis of other parts of the code, see https://github.com/linyiqun/hadoop-yarn. Analyses of other parts of YARN will follow.


    Reference source

    Apache-hadoop-2.7.1 (hadoop-mapreduce-project)

  • Original article: https://www.cnblogs.com/bianqi/p/12183850.html