zoukankan      html  css  js  c++  java
  • Reduce Task的学习笔记

                MapReduce五大过程已经分析过半了,上次分析完Map的过程,着实花费了我的很多时间,不过收获很大,值得了额,这次用同样的方法分析完了Reduce的过程,也算是彻底摸透了MapReduce思想的2个最最重要的思想了吧。好,废话不多,切入正题,在学习Reduce过程分析的之前,我特意查了书籍上或网络上相关的资料,我发现很大都是大同小异,缺乏对于源码的参照分析,所以我个人认为,我了可以在某些细节上讲得跟明白些,也许会比较好。因为Map和Reduce的过程的整体流程是非常相近的,如果你看过之前我写的Map Task的分析,相信你也能很快理解我的Reduce过程的分析的。Reduce过程的集中表现体现于Reduce Task中,Reduce Task与Map Reduce一样,分为Job-setup Task,  Job-cleanup Task, Task-cleanup Task和Reduce Task。我分析的主要是最后一个Reduce Task 。Reduce Task 主要分为5个阶段:

    Shuffle------------------->Merge------------------->Sort------------------->Reduce------------------->Write

    其中最重要的部分为前3部分,我也会花最多的时间描述前面3个阶段的任务。

           Shuffle阶段。我们知道,Reduce的任务在最最开始的时候,就是接收Map任务中输出的中间结果的数据,key-value根据特定的分区算法,给相应的Reduce任务做处理,所以这时需要Reduce任务去远程拷贝Map输出的中间数据了,这个过程就称作Shuffle阶段,所以这个阶段也称为Copy阶段。在Shuffle阶段中,有个GetMapEventsThread,会定期发送RPC请求,获取远程执行好的Map Task的列表,把他们的输出location映射到mapLocation中。

    ....
            	//GetMapEventsThread线程是远程调用获得已经完成的Map任务的列表
                int numNewMaps = getMapCompletionEvents();
                if (LOG.isDebugEnabled()) {
                  if (numNewMaps > 0) {
                    LOG.debug(reduceTask.getTaskID() + ": " +  
                        "Got " + numNewMaps + " new map-outputs"); 
                  }
                }
                Thread.sleep(SLEEP_TIME);
              } 
    进入getMapCompletionEvents方法,继续看:

    ...
            for (TaskCompletionEvent event : events) {
              switch (event.getTaskStatus()) {
                case SUCCEEDED:
                {
                  URI u = URI.create(event.getTaskTrackerHttp());
                  String host = u.getHost();
                  TaskAttemptID taskId = event.getTaskAttemptId();
                  URL mapOutputLocation = new URL(event.getTaskTrackerHttp() + 
                                          "/mapOutput?job=" + taskId.getJobID() +
                                          "&map=" + taskId + 
                                          "&reduce=" + getPartition());
                  List<MapOutputLocation> loc = mapLocations.get(host);
                  if (loc == null) {
                    loc = Collections.synchronizedList
                      (new LinkedList<MapOutputLocation>());
                    mapLocations.put(host, loc);
                   }
                  //loc中添加新的已经完成的,mapOutputLocation,mapLocations是全局共享的
                  loc.add(new MapOutputLocation(taskId, host, mapOutputLocation));
                  numNewMaps ++;
                }
                break;
                ....
    为了避免出现网络热点,Reduce Task对输出的位置进行了混洗的操作,然后保存到scheduleCopies中,后续的拷贝操作都是围绕着这个列表进行的。这个变量保存在了一个叫ReduceCopier的类里面。确认拷贝的目标位置,还只是Shuffle阶段的前半部分,这时看一下,执行的入口代码在哪里。回到Reduce Task的入口run()代码:

    public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
        throws IOException, InterruptedException, ClassNotFoundException {
        this.umbilical = umbilical;
        job.setBoolean("mapred.skip.on", isSkipping());
    
        if (isMapOrReduce()) {
          //设置不同阶段任务的进度
          copyPhase = getProgress().addPhase("copy");
          sortPhase  = getProgress().addPhase("sort");
          reducePhase = getProgress().addPhase("reduce");
        }
        // start thread that will handle communication with parent
        //创建Task任务报告,与父进程进行联系沟通
        TaskReporter reporter = new TaskReporter(getProgress(), umbilical,
            jvmContext);
        reporter.startCommunicationThread();
        //判断是否使用的是新的额API
        boolean useNewApi = job.getUseNewReducer();
        initialize(job, getJobID(), reporter, useNewApi);
    
        // check if it is a cleanupJobTask
        //和map任务一样,Task有4种,Job-setup Task, Job-cleanup Task, Task-cleanup Task和ReduceTask
        if (jobCleanup) {
          //这里执行的是Job-cleanup Task
          runJobCleanupTask(umbilical, reporter);
          return;
        }
        if (jobSetup) {
          //这里执行的是Job-setup Task
          runJobSetupTask(umbilical, reporter);
          return;
        }
        if (taskCleanup) {
          //这里执行的是Task-cleanup Task
          runTaskCleanupTask(umbilical, reporter);
          return;
        }
        
        /*  后面的内容就是开始执行Reduce的Task */
        
        // Initialize the codec
        codec = initCodec();
    
        boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));
        if (!isLocal) {
          reduceCopier = new ReduceCopier(umbilical, job, reporter);
          if (!reduceCopier.fetchOutputs()) {
        	  ......
    到了reduceCopier.fetchOutps()这里必须停一步了,因为后面的Shuffle阶段和Merge阶段都在这里实现:

    /**
         * 开启n个线程远程拷贝Map中的输出数据
         * @return
         * @throws IOException
         */
        public boolean fetchOutputs() throws IOException {
          int totalFailures = 0;
          int            numInFlight = 0, numCopied = 0;
          DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
          final Progress copyPhase = 
            reduceTask.getProgress().phase();
          //单独的线程用于对本地磁盘的文件进行定期的合并
          LocalFSMerger localFSMergerThread = null;
          //单独的线程用于对内存上的文件进行进行定期的合并
          InMemFSMergeThread inMemFSMergeThread = null;
          GetMapEventsThread getMapEventsThread = null;
          
          for (int i = 0; i < numMaps; i++) {
            copyPhase.addPhase();       // add sub-phase per file
          }
          
          //建立拷贝线程列表容器
          copiers = new ArrayList<MapOutputCopier>(numCopiers);
          
          // start all the copying threads
          for (int i=0; i < numCopiers; i++) {
        	//新建拷贝线程,逐一开启拷贝线程
            MapOutputCopier copier = new MapOutputCopier(conf, reporter, 
                reduceTask.getJobTokenSecret());
            copiers.add(copier);
            //添加到列表容器中,并开启此线程
            copier.start();
          }
          
          //start the on-disk-merge thread
          localFSMergerThread = new LocalFSMerger((LocalFileSystem)localFileSys);
          //start the in memory merger thread
          inMemFSMergeThread = new InMemFSMergeThread();
          //定期合并的2个线程也开启,也就是说copy阶段和merge阶段是并行操作的
          localFSMergerThread.start();
          inMemFSMergeThread.start();
          
          // start the map events thread
          getMapEventsThread = new GetMapEventsThread();
          getMapEventsThread.start();
          .....
    在上面的代码中出现很多陌生的Thread的定义,这个可以先不用管,我们发现getMapEventsThread就是在这里开启的,去获取了最新的位置,位置获取完成当然是要启动很多的拷贝线程了,这里叫做MapOutputCopier线程,作者是把他放入一个线程列表中,逐个开启。看看里面的具体实现,他是如何进行远程拷贝的呢。

    @Override
          public void run() {
            while (true) {        
              try {
                MapOutputLocation loc = null;
                long size = -1;
                
                synchronized (scheduledCopies) {
                  //从scheduledCopies列表中获取获取map Task的输出数据的位置
                  while (scheduledCopies.isEmpty()) {
                	//如果scheduledCopies我空,则等待
                    scheduledCopies.wait();
                  }
                  //获取列表中的第一个数据作为拷贝的地址
                  loc = scheduledCopies.remove(0);
                }
               
                CopyOutputErrorType error = CopyOutputErrorType.OTHER_ERROR;
                readError = false;
                try {
                  shuffleClientMetrics.threadBusy();
                  //标记当前的map输出位置为loc
                  start(loc);
                  //进行只要的copy操作,返回拷贝字节数的大小
                  size = copyOutput(loc);
                  shuffleClientMetrics.successFetch();
                  //如果进行到这里,说明拷贝成功吗,标记此error的标记为NO_ERROR
                  error = CopyOutputErrorType.NO_ERROR;
                } catch (IOException e) {
                  //抛出异常,做异常处理
                  ....
    从location列表中去取出,然后进行拷贝操作,核心方法在copyOutput(),接着往里跟踪:

    .....
            // Copy the map output
            //根据loc Map任务的数据输出位置,进行RPC的拷贝
            MapOutput mapOutput = getMapOutput(loc, tmpMapOutput,
                                               reduceId.getTaskID().getId());
    继续往里:

    private MapOutput getMapOutput(MapOutputLocation mapOutputLoc, 
                                         Path filename, int reduce)
          throws IOException, InterruptedException {
            // Connect
        	//打开url资源定位符的连接
            URL url = mapOutputLoc.getOutputLocation();
            URLConnection connection = url.openConnection();
            
            //得到远程数据的输入流
            InputStream input = setupSecureConnection(mapOutputLoc, connection);
     
            ......
            //We will put a file in memory if it meets certain criteria:
            //1. The size of the (decompressed) file should be less than 25% of 
            //    the total inmem fs
            //2. There is space available in the inmem fs
            
            // Check if this map-output can be saved in-memory
            //向ShuffleRamManager申请内存存放拷贝的数据,判断内存是否内存是否装得下,装不下则放入DISK磁盘
            boolean shuffleInMemory = ramManager.canFitInMemory(decompressedLength); 
    
            // Shuffle
            MapOutput mapOutput = null;
            if (shuffleInMemory) {
              if (LOG.isDebugEnabled()) {
                LOG.debug("Shuffling " + decompressedLength + " bytes (" + 
                    compressedLength + " raw bytes) " + 
                    "into RAM from " + mapOutputLoc.getTaskAttemptId());
              }
    
              //如果内存装得下,则将输入流中的数据放入内存
              mapOutput = shuffleInMemory(mapOutputLoc, connection, input,
                                          (int)decompressedLength,
                                          (int)compressedLength);
            } else {
              if (LOG.isDebugEnabled()) {
                LOG.debug("Shuffling " + decompressedLength + " bytes (" + 
                    compressedLength + " raw bytes) " + 
                    "into Local-FS from " + mapOutputLoc.getTaskAttemptId());
              }
              
              //装不下,则放入文件中
              mapOutput = shuffleToDisk(mapOutputLoc, input, filename, 
                  compressedLength);
            }
                
            return mapOutput;
          }
    
    在这里我们看到了,Hadoop通过URL资源定位符,获取远程输入流,进行操作的,在拷贝到本地的时候,还分了2种情况处理,当当前的内存能方得下当前数据的时候,放入内存中,放不下则写入到磁盘中。这里还出现了ShuffleRamManager的用法。至此,Shuffle阶段宣告完成。还是比较深的,一层,又一层的。

           Merger阶段。Merge阶段其实是和Shuffle阶段并行进行的,刚刚也看到了,在fetchOutputs中,这些相关进程都是同时开启的,

    public boolean fetchOutputs() throws IOException {
          int totalFailures = 0;
          int            numInFlight = 0, numCopied = 0;
          DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
          final Progress copyPhase = 
            reduceTask.getProgress().phase();
          //单独的线程用于对本地磁盘的文件进行定期的合并
          LocalFSMerger localFSMergerThread = null;
          //单独的线程用于对内存上的文件进行进行定期的合并
          InMemFSMergeThread inMemFSMergeThread = null;
          ....
    Merge的主要工作就是合并数据,当内存中或者磁盘中的文件比较多的时候,将小文件进行合并变成大文件。挑出其中的一个run方法
    ....
          public void run() {
            LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
            try {
              boolean exit = false;
              do {
                exit = ramManager.waitForDataToMerge();
                if (!exit) {
                  //进行内存merger操作
                  doInMemMerge();
    目的非常明确,就是Merge操作,这是内存文件的合并线程的run方法,LocalFSMerger与此类似,不分析了。这个Mergr处理是并与Shuffle阶段的。在这里这2个阶段都完成了。还是有点复杂的。下面是相关的一些类关系图,主要要搞清4个线程是什么作用的。


    4个线程的调用都是在ReduceCopier.fetchOutput()方法中进行的。在Shuffle,Merge阶段的后面就来到了,Sort阶段。

           Sort阶段,的任务和轻松,就是完成一次对内存和磁盘总的一次Merge合并操作,其中还会对其中进行一次sort排序操作。

    ....
        //标识copy操作已经完成
        copyPhase.complete();                         // copy is already complete
        setPhase(TaskStatus.Phase.SORT);
        statusUpdate(umbilical);
    
        //进行内存和磁盘中的总的merge阶段的操作,Sort包含其中执行
        final FileSystem rfs = FileSystem.getLocal(job).getRaw();
        RawKeyValueIterator rIter = isLocal
          ? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
              job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
              !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
              new Path(getTaskID().toString()), job.getOutputKeyComparator(),
              reporter, spilledRecordsCounter, null)
          : reduceCopier.createKVIterator(job, rfs, reporter);
    那么Sort操作在哪里呢,就在最下面的createKVIterator中:

    private RawKeyValueIterator createKVIterator(
            JobConf job, FileSystem fs, Reporter reporter) throws IOException {
    
          .....
          //在Merge阶段对所有的数据进行归并排序
          Collections.sort(diskSegments, new Comparator<Segment<K,V>>() {
            public int compare(Segment<K, V> o1, Segment<K, V> o2) {
              if (o1.getLength() == o2.getLength()) {
                return 0;
              }
              return o1.getLength() < o2.getLength() ? -1 : 1;
            }
          });
    
          // build final list of segments from merged backed by disk + in-mem
          List<Segment<K,V>> finalSegments = new ArrayList<Segment<K,V>>();
    ,Sort阶段的任务就是这么简单。下面看一下前3个阶段主要的执行流程,这3个阶段构成了Reduce Task的核心。



           Reduce阶段,跟随这个图的执行方向,接下来我们应该执行的是key-value的reduce()函数了,没错就是循环键值对,执行此函数

    ....
        //判断执行的是新的API还是旧的API
        if (useNewApi) {
          runNewReducer(job, umbilical, reporter, rIter, comparator, 
                        keyClass, valueClass);
        } else {
          runOldReducer(job, umbilical, reporter, rIter, comparator, 
                        keyClass, valueClass);
        }
    在这里我们执行的就是runReducer方法了,我们往老的API跳:

      private <INKEY,INVALUE,OUTKEY,OUTVALUE>
      void runOldReducer(JobConf job,
                         TaskUmbilicalProtocol umbilical,
                         final TaskReporter reporter,
                         RawKeyValueIterator rIter,
                         RawComparator<INKEY> comparator,
                         Class<INKEY> keyClass,
                         Class<INVALUE> valueClass) throws IOException {
        Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer = 
          ReflectionUtils.newInstance(job.getReducerClass(), job);
        // make output collector
        String finalName = getOutputName(getPartition());
    
        //获取输出的key,value
        final RecordWriter<OUTKEY, OUTVALUE> out = new OldTrackingRecordWriter<OUTKEY, OUTVALUE>(
            reduceOutputCounter, job, reporter, finalName);
        
        OutputCollector<OUTKEY,OUTVALUE> collector = 
          new OutputCollector<OUTKEY,OUTVALUE>() {
            public void collect(OUTKEY key, OUTVALUE value)
              throws IOException {
              //将处理后的key,value写入输出流中,最后写入HDFS作为最终结果
              out.write(key, value);
              // indicate that progress update needs to be sent
              reporter.progress();
            }
          };
        
        // apply reduce function
        try {
          //increment processed counter only if skipping feature is enabled
          boolean incrProcCount = SkipBadRecords.getReducerMaxSkipGroups(job)>0 &&
            SkipBadRecords.getAutoIncrReducerProcCount(job);
          
          //判断是否为跳过错误记录模式
          ReduceValuesIterator<INKEY,INVALUE> values = isSkipping() ? 
              new SkippingReduceValuesIterator<INKEY,INVALUE>(rIter, 
                  comparator, keyClass, valueClass, 
                  job, reporter, umbilical) :
              new ReduceValuesIterator<INKEY,INVALUE>(rIter, 
              job.getOutputValueGroupingComparator(), keyClass, valueClass, 
              job, reporter);
          values.informReduceProgress();
          while (values.more()) {
            reduceInputKeyCounter.increment(1);
            //Record迭代器中获取每一对,执行用户定义的Reduce函数,此阶段为Reduce阶段
            reducer.reduce(values.getKey(), values, collector, reporter);
            if(incrProcCount) {
              reporter.incrCounter(SkipBadRecords.COUNTER_GROUP, 
                  SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS, 1);
            }
            //获取下一个key,value
            values.nextKey();
            values.informReduceProgress();
          }
         //...
    和Map Task的过程很类似,也正如我们预期的那样,循环迭代执行,这就是Reduce阶段。

            Write阶段。Write阶段是最后一个阶段,在用户自定义的reduce中,一般用户都会调用collect.collect方法,这时候就是写入的操作了。这时的写入就是将最后的结果写入HDFS作为最终结果了。这里先定义了OutputCollector的collect方法:

    OutputCollector<OUTKEY,OUTVALUE> collector = 
          new OutputCollector<OUTKEY,OUTVALUE>() {
            public void collect(OUTKEY key, OUTVALUE value)
              throws IOException {
              //将处理后的key,value写入输出流中,最后写入HDFS作为最终结果
              out.write(key, value);
              // indicate that progress update needs to be sent
              reporter.progress();
            }
          };
    至此,完成了Reduce任务的所有阶段。下面是一张时序图,便于理解:


    掌握了Map ,Reduce2个过程核心实现的过程将会帮助我们更加理解Hadoop作业运行的整个流程。整个分析的过程也许会有点枯燥,但是苦中作乐。


  • 相关阅读:
    Civil 3D 二次开发 创建Civil 3D 对象—— 01 —— 创建几何空间点
    Civil 3D 二次开发 创建Civil 3D 对象—— 00 ——
    Civil 3D 二次开发 创建AutoCAD对象—— 01 —— 创建直线
    Civil 3D 二次开发 新建CLR项目出现错误C2143
    Civil 3D 二次开发 创建AutoCAD对象—— 00 ——
    了解AutoCAD对象层次结构 —— 6 ——块表记录
    datepicker97使用
    使用angular 外接 templateUrl,使用ng-include
    angularJs 遮罩
    网上找的有关css兼容问题
  • 原文地址:https://www.cnblogs.com/bianqi/p/12184163.html
Copyright © 2011-2022 走看看