zoukankan      html  css  js  c++  java
  • hadoop2 作业执行过程之reduce过程


    reduce阶段就是处理map的输出数据,大部分过程和map差不多


      1 //ReduceTask.run方法开始和MapTask类似,包括initialize()初始化,根据情况看是否调用runJobCleanupTask(),
      2   //runJobSetupTask(),runTaskCleanupTask()。之后进入正式的工作,主要有这么三个步骤:Copy、Sort、Reduce。
      3   @Override
      4   @SuppressWarnings("unchecked")
      5   public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
      6     throws IOException, InterruptedException, ClassNotFoundException {
      7     this.umbilical = umbilical;
      8     job.setBoolean("mapred.skip.on", isSkipping());
      9     /*添加reduce过程需要经过的几个阶段。以便通知TaskTracker目前运   行的情况*/ 
     10     if (isMapOrReduce()) {
     11       copyPhase = getProgress().addPhase("copy");
     12       sortPhase  = getProgress().addPhase("sort");
     13       reducePhase = getProgress().addPhase("reduce");
     14     }
     15     // start thread that will handle communication with parent
     16  // 设置并启动reporter进程以便和TaskTracker进行交流 
     17     TaskReporter reporter = new TaskReporter(getProgress(), umbilical,
     18         jvmContext);
     19     reporter.startCommunicationThread();
     20     //在job client中初始化job时,默认就是用新的API,详见Job.setUseNewAPI()方法
     21     boolean useNewApi = job.getUseNewReducer();        
     22     /*用来初始化任务,主要是进行一些和任务输出相关的设置,比如创建commiter,设置工作目录等*/ 
     23     initialize(job, getJobID(), reporter, useNewApi);//这里将会处理输出目录
     24     /*以下4个if语句均是根据任务类型的不同进行相应的操作,这些方 法均是Task类的方法,所以与任务是MapTask还是ReduceTask无关*/ 
     25     // check if it is a cleanupJobTask
     26     if (jobCleanup) {
     27       runJobCleanupTask(umbilical, reporter);
     28       return;
     29     }
     30     if (jobSetup) {
     31         //主要是创建工作目录的FileSystem对象
     32       runJobSetupTask(umbilical, reporter);
     33       return;
     34     }
     35     if (taskCleanup) {
     36          //设置任务目前所处的阶段为结束阶段,并且删除工作目录 
     37       runTaskCleanupTask(umbilical, reporter);
     38       return;
     39     }
     40     
     41     // Initialize the codec
     42     codec = initCodec();
     43 
     44     boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));  //判断是否是单机hadoop
     45     if (!isLocal) {
     46         //1. Copy.就是从执行各个Map任务的服务器那里,收到map的输出文件。拷贝的任务,是由ReduceTask.ReduceCopier 类来负责。
     47         //ReduceCopier对象负责将Map函数的输出拷贝至Reduce所在机器 
     48       reduceCopier = new ReduceCopier(umbilical, job, reporter);
     49       if (!reduceCopier.fetchOutputs()) {////fetchOutputs函数负责拷贝各个Map函数的输出 
     50         if(reduceCopier.mergeThrowable instanceof FSError) {
     51           throw (FSError)reduceCopier.mergeThrowable;
     52         }
     53         throw new IOException("Task: " + getTaskID() + 
     54             " - The reduce copier failed", reduceCopier.mergeThrowable);
     55       }
     56     }
     57     copyPhase.complete();                         // copy is already complete
     58     setPhase(TaskStatus.Phase.SORT);
     59     statusUpdate(umbilical);
     60 
     61     final FileSystem rfs = FileSystem.getLocal(job).getRaw();
     62     //2.Sort(其实相当于合并).排序工作,就相当于上述排序工作的一个延续。它会在所有的文件都拷贝完毕后进行。
     63     //使用工具类Merger归并所有的文件。经过这一个流程,一个合并了所有所需Map任务输出文件的新文件产生了。
     64     //而那些从其他各个服务器网罗过来的 Map任务输出文件,全部删除了。
     65     
     66     //根据hadoop是否分布式来决定调用哪种排序方式 
     67     RawKeyValueIterator rIter = isLocal
     68       ? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
     69           job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
     70           !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
     71           new Path(getTaskID().toString()), job.getOutputKeyComparator(),
     72           reporter, spilledRecordsCounter, null)
     73       : reduceCopier.createKVIterator(job, rfs, reporter);
     74         
     75     // free up the data structures
     76     mapOutputFilesOnDisk.clear();
     77     
     78     sortPhase.complete();                         // sort is complete
     79     setPhase(TaskStatus.Phase.REDUCE); 
     80     statusUpdate(umbilical);
     81     //3.Reduce 1.Reduce任务的最后一个阶段。它会准备好Map的 keyClass("mapred.output.key.class"或"mapred.mapoutput.key.class"), 
     82     //valueClass("mapred.mapoutput.value.class"或"mapred.output.value.class")
     83     //和 Comparator (“mapred.output.value.groupfn.class”或 “mapred.output.key.comparator.class”)
     84     Class keyClass = job.getMapOutputKeyClass();
     85     Class valueClass = job.getMapOutputValueClass();
     86     RawComparator comparator = job.getOutputValueGroupingComparator();
     87     //2.根据参数useNewAPI判断执行runNewReduce还是runOldReduce。分析润runNewReduce
     88     if (useNewApi) {
     89         //3.runNewReducer
     90         //0.像报告进程书写一些信息
     91         //1.获得一个TaskAttemptContext对象。通过这个对象创建reduce、output及用于跟踪的统计output的RecordWrit、最后创建用于收集reduce结果的Context
     92         //2.reducer.run(reducerContext)开始执行reduce
     93       runNewReducer(job, umbilical, reporter, rIter, comparator, 
     94                     keyClass, valueClass);
     95     } else {
     96       runOldReducer(job, umbilical, reporter, rIter, comparator, 
     97                     keyClass, valueClass);
     98     }
     99     done(umbilical, reporter);
    100   }

    1.reduce过程中三个大的阶段比较重要:Copy、Sort、Reduce;

    2.codec = initCodec()这句是检查map的输出是否是压缩的,压缩的则返回压缩codec实例,否则返回null,这里讨论不压缩的;

    3.实际中使用完全分布式的hadoop,即isLocal==false,然后构造一个ReduceCopier对象reduceCopier,并调用reduceCopier.fetchOutputs()方法拷贝各个Mapper的输出,到本地;

    4.done(umbilical, reporter)这个方法用于做结束任务的一些清理工作:更新计数器updateCounters();如果任务需要提交,设置Taks状态为COMMIT_PENDING,并利用TaskUmbilicalProtocol,汇报Task完成,等待提交,然后调用commit提交任务;设置任务结束标志位;结束Reporter通信线程;发送最后一次统计报告(通过sendLastUpdate方法);利用TaskUmbilicalProtocol报告结束状态(通过sendDone方法)。


  • 相关阅读:
    顺序容器删除元素 vector list deque
    smart_pointer example
    c++ new bad_alloc
    mat工具MemoryAnalyzer进行分析java内存溢出hprof文件
    centos7.2环境elasticsearch-5.0.1+kibana-5.0.1+zookeeper3.4.6+kafka_2.9.2-0.8.2.1部署详解
    配置zabbix当内存剩余不足10%的时候触发报警
    zabbix通过curl命令判断web服务是否正常并自动重启服务
    日志分析工具ELK配置详解
    zabbix3.0.4监控mysql主从同步
    使用percona-xtrabackup实现对线上zabbix监控系统数据库mariadb5.5.47的主从同步
  • 原文地址:https://www.cnblogs.com/admln/p/hadoop2-work-excute-reduce.html
Copyright © 2011-2022 走看看