zoukankan      html  css  js  c++  java
  • hadoop知识整理(2)之MapReduce

    之前写的关于MR的文章的前半部分已丢。

    所以下面重点从3个部分来谈MR:

      1)Job任务执行过程,以及主要进程-ResourceManager和NodeManager作用;

      2)shuffle过程;

      3)主要代码;

    一、Job任务执行过程

      

      这里是hadoop2.0-ResourceManager的Job的执行过程:

      1)run job阶段,由提交Job客户端JVM完成,主要做job环境信息的收集,各个组件类,如Mapper、Reducer类,输出输入的K-V类型做检验是否合法,并且检验输入hdfs路径的合法性,还有输出hdfs目录是否已经存在,检测不通过,则Job停止。

      2)1阶段通过后,Job会获取一个Application对象,同时给一个应用ID,用于MapReduce的作业ID;

      3)再次检查输入输出目录的合法性,hdfs目录的合法性,计算作业的输入分片,如果分片无法计算,作业将不会提交,错误将返回给MR客户端程序,如果没有问题,将运行作业的所需资源,包括MR程序的JAR文件,配置文件以及输入分片,复制到一个以应用ID命名的hdfs目录下的共享文件系统中,JOB的jar的副本较多,所以在运行job时,集群的所有节点都可访问job的副本;

      4)MR客户端通过调用submitApplication()方法提交Job给RM;

      5)RM即资源管理器(ResourceManager)收到Job作业后,并将请求传递给YARN的调度器(scheduler),调度器会分配一个容器container,然后资源管理器在节点管理器(NodeManager)中启动application master进程;

      6)application master是一个java应用程序,主类为MRAppMaster,他将接收来自Job的进度和完成报告;

      7)application master对Job的初始化,是创建了很多薄记对象,以保持对于job进度的跟踪,然后他将从hdfs共享存储中获得由MR客户端计算的输入分片,然后对每一个split创建一个Map任务,以及确定有几个redece任务;

      8)分配资源,application master程序,会计算构成MR的job的所有任务,判断是在一个节点上进行还是多个节点进行并行计算,简单来说,通过MR的数量来将这个job定性为小任务还是超级(uber)任务;

        小job指的是,少于10个mapper且只有1个reducer,且输入大小小于一个HDFS块的job。通过设置mapreduce.job.ubertask.enable设置为true才可确保启动超级任务作业。

      如果非uber任务,application master会向资源管理器RM请求需要的所有容器资源;当然,请求中先为map任务请求,然后是reduce任务,通常,完成有5%的map任务完成之后,为reduce任务请求资源的信息才会发出; 

      reduce任务可以在集群的任何节点运行,但是map任务尽量本着本地化的策略在进行,尽量减少磁盘的IO操作,通常情况之下,每个map任务和reduce任务都会申请获得1核的cpu以及1GB的内存,参数可配。

      9)一旦RM分配了一个特定节点的容器,那么application master就与该nodeManager进行通信来启动容器;

      10)执行任务的主类为YarnChild,一个JAVA程序,在运行任务之前,首先将需要的资源本地化,从共享的hdfs中取得,包括作业的配置,jar包和其他所有缓存文件等等;

      11)执行MR任务。

      以上是整个Job的生命周期。

      

    ResourceManager(RM)
    RM是一个全局的资源管理器,负责整个系统的资源管理和分配。它主要由两个组件构成:调度器(Scheduler)和应用程序管理器(Applications Manager,ASM)。
    调度器 调度器根据容量、队列等限制条件(如每个队列分配一定的资源,最多执行一定数量的作业等),将系统中的资源分配给各个正在运行的应用程序。需要注意的是,该调度器是一个“纯调度器”,它不再从事任何与具体应用程序相关的工作,比如不负责监控或者跟踪应用的执行状态等,也不负责重新启动因应用执行失败或者硬件故障而产生的失败任务,这些均交由应用程序相关的ApplicationMaster完成。调度器仅根据各个应用程序的资源需求进行资源分配,而资源分配单位用一个抽象概念“资源容器”(Resource Container,简称Container)表示,Container是一个动态资源分配单位,它将内存、CPU资源封装在一起,从而限定每个任务使用的资源量。

    应用程序管理器(Applications Manager)负责管理整个系统中所有应用程序,包括应用程序提交、与调度器协商资源以启动ApplicationMaster、监控ApplicationMaster运行状态并在失败时重新启动它等。

    ApplicationMaster(AM)
    用户提交的每个应用程序均包含一个AM,主要功能包括:
    与RM调度器协商以获取资源(用Container表示);
    将得到的任务进一步分配给内部的任务(资源的二次分配);
    与NM通信以启动/停止任务;
    监控所有任务运行状态,并在任务运行失败时重新为任务申请资源以重启任务。

    NodeManager(NM)
    NM是每个节点上的资源和任务管理器,一方面,它会定时地向RM汇报本节点上的资源使用情况和各个Container的运行状态;另一方面,它接收并处理来自AM的Container启动/停止等各种请求。


    Container

    Container是YARN中的资源抽象,它封装了某个节点上的内存、CPU资源,当AM向RM申请资源时,RM为AM返回的资源便是用Container表示。YARN会为每个任务分配一个Container,且该任务只能使用该Container中描述的资源。

    二、shuffle

      MapReduce确保每个reducer的输入都是按照Key来进行排序的。系统执行排序,且将Map输出作为输入给Reduce的过程称之为shuffer。

       1)map端在输出时,会首先输出到一个内存缓冲区,英文名字为spill,他的默认大小为100M,可以理解为在内存中一部分首尾相连的内存区域,这个内存缓冲区的阈值为80%,可通过mapreduce.task.io.spill.percent参数改变,当map的输出达到阈值时,会把溢出的旧输出内容写入磁盘,新的输出继续往缓冲区去写,至于为什么是80%,是因为内存区的IO远快于物理磁盘的IO速度,所以在达到阈值时,开始溢写,如果spill写满时,仍未写到物理磁盘上,那么map会处于wait状态;

      2)而在将map输出结果写磁盘之前,会根据最后返回给reduce的数据划分成对应的分区,且在每个分区中,后台线程会按照key进行排序,这个时候如果存在一个conbiner,那么conbiner的函数redece是在排序后进行的。运行combiner会使map的输出结果更加紧凑,因此会减少写磁盘IO的压力。

      3)这时的疑问在于,当spill内存缓冲区不足以支撑map的输出时,那么会将输出溢写到本地磁盘中,那么map的输出会有多个磁盘文件,所以,在map任务完成之前,会对他们进行合并排序。

       至于存在combine的情况时, 任务会判断溢出文件的数量,假如溢出文件的数量大于3,那么有必要再对此进行一次combine操作,这个操作的时间是,map任务的最终输出准备向磁盘上写时。所以由此判断,combine可以在map任务中多次执行,也不会影响最终的结果,至于是否再次进行combine操作,那么由map来进行判断,通过溢出文件的数量来进行判断,其主要目的时判断进行combine带来的开销是否足够抵消IO磁盘操作。

      4)在这里,map任务的输出,对这个输出文件进行压缩,然后放到磁盘会更好,这是一个典型的节省磁盘IO的有效操作。这样同样可以减少通过网络IO传输给reduce的文件大小。压缩的配置参数为:mapreduce.map.output.compress设置为true,hadoop便会启动map结果的压缩功能。

      5)至于reduce任务,前面在分析job的执行过程的时候,知道有一个参数会影响application master向RM为reduce申请资源的时间,那便是map任务完成的比率,比率默认是5%,即有5%的map任务完成时,那么reduce任务将开始进行工作。在上图中,reduce是通过fetch(抓)过来map的输出结果,其实是通过网络通信将map的输出结果复制过来。reduce任务有少量的复制线程,默认值为5个,这5个线程可以从多个执行完毕的map任务中复制过来其输出结果。而这个线程的数量,可以通过mapreduce.reduce.shuffle.parallelcopies属性。

      6)这里的问题关键点在于,reduce任务如何得知map任务已经结束,且从哪里获得其输出结果?其实还在于强大的application master,全程负责所有任务的调度工作,当map任务完成后,会通过心跳机制,告知application master,而reduce任务一旦开启,也会有一个线程,不停轮询application master的map任务完成情况,这里推测,完成的map任务的网络主机情况,输出结果的磁盘存储情况,会保存在application master的一个对象中,大概率是一个(HashMap),而当reduce取得map的输出结果之后,并不会马上删除此map的结果释放资源,他会等待application master的通知,这是在整体job完成后执行的。

      7)下一步当reduce取得map任务的输出结果只会,需要进行的就是不停的merge工作。

      如果,map的输出结果非常小,那么直接在reduce任务的jvm内存中进行合并了,但往往这种情况并不会经常发生。

      当有很多个map的输出,且输出文件都比较大,redece会将map的输出结果复制到磁盘,如果磁盘上的副本太多了,那么reduce会将这些个文件合并成更大的文件,而之前在于被压缩的map输出,都会在内存中被解压缩。

      直到将所有的map复制完毕,那么下一步会进行真正的reduce合并操作。

      reduce合并这一块很有意思,hadoop为了减少磁盘的IO,做了很多构想,很巧妙。

      首先有一个指定参数,名字为合并因子,通过:mapreduce.task.io.sort.factor属性设置,默认为10。

      这个因子决定你的map输出数量合并多少次,假如有40个map的输出结果,那么将会合并4次。

      如上图所看,一共有40个map输出,那么hadoop不会每10个文件合并一次,将合并完成的4个文件交给reduce task。

      他会第一次合并4个文件形成s1,第二次、三次、四次分别合并10个文件形成s2和s3以及s4,然后它会将s1、s2、s3、s4以及剩下未合并的6个文件直接交给reduce函数。

      因为map输出结果本身为排序状态,这样操作可以减少了6个map结果文件的多一次通过磁盘IO进行合并的操作,而hadoop这样做,也只是为了减少磁盘IO,多用内存。

      而做完这个操作之后,reduce调用reduce函数,将输出结果复用到HDFS之前配置地输出目录当中。至此,shuffle结束。

    而MR任务的魔幻点就在于shuffle过程,他神奇地将乱序地文件,通过一系列map和reduce操作,通过强大地设计application master地控制中心,完美的完成了整理数据工作。

    所以,知晓了MR任务地执行过程和shuffle内容,那么MR任务地优化点也来了,不论是通过参数还是在编码中刻意进行改变,都会很好地优化MR。

       推测执行:application master会跟踪每个task的执行情况,当某个task执行过慢时,会创建出这个task的副本,从而进一步判定task是否存在执行失误的情况,假如副本task先行执行完成,那么会废掉原task。

          从中体现application master强大的task线程调度能力。而这个参数的配置方法为:mapreduce.map.speculative/reduce.speculative->true。

        这个推测执行功能有点过于屌了,但并不推荐使用,因为它是以整个集群的资源为代价的,应该根据具体情况开启此功能。

    参数优化点

      

    调优的总体纲领为:

      1)减少数据传输--》增加conbine操作和map输出压缩操作;

      2)尽量使用内存-》增加spill内存缓冲区的大小-增加map和reduce的jvm内存参数->mapred.child.java.opts,这个是task任务执行时的jvm内存大小;

      3)减少磁盘IO-》压缩map输出,减少reduce合并次数,即增大合并因子参数;

      4)增大任务并行数-》增加reduce的fetch数量,尽量更改此参数数量与map数量一致,达到并行抽取;

      5)剩下就是推测执行了,根据集群网络情况和机器性能进行调优操作。

     jvm调优,jvm重用机制:

      1)默认不允许JVM重用;

      2)一旦开启JVM的重用,所有的task都将在一个jvm中执行,简单表达,即是,所有的包括application master、map task、reduce task都会在一个jvm-container中执行;

      3)此种情况适用于小的MR任务,默认为10个及其以下的map任务,1个的reduce任务,且reduce输入大小为小于一个hdfs文件块的任务。

      4)此种情况依旧适用于海量小文件的情况,减少jvm的频繁启停;

    对于海量的小文件,应该将多个小文件处理成为一个文件,以减少map的任务数量。

    三、部分源码,主要代码

      MapTask部分,即map小任务部分:

      启动map任务,调用的是其中的run方法

    @Override
      public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
        throws IOException, ClassNotFoundException, InterruptedException {
        this.umbilical = umbilical;
    
        if (isMapTask()) {
          // If there are no reducers then there won't be any sort. Hence the map 
          // phase will govern the entire attempt's progress.
          if (conf.getNumReduceTasks() == 0) {
            mapPhase = getProgress().addPhase("map", 1.0f);
          } else {
            // If there are reducers then the entire attempt's progress will be 
            // split between the map phase (67%) and the sort phase (33%).
            mapPhase = getProgress().addPhase("map", 0.667f);
            sortPhase  = getProgress().addPhase("sort", 0.333f);
          }
        }
        TaskReporter reporter = startReporter(umbilical);
     
        boolean useNewApi = job.getUseNewMapper();
        initialize(job, getJobID(), reporter, useNewApi);
    
        // check if it is a cleanupJobTask
        if (jobCleanup) {
          runJobCleanupTask(umbilical, reporter);
          return;
        }
        if (jobSetup) {
          runJobSetupTask(umbilical, reporter);
          return;
        }
        if (taskCleanup) {
          runTaskCleanupTask(umbilical, reporter);
          return;
        }
    
        if (useNewApi) {
          runNewMapper(job, splitMetaInfo, umbilical, reporter);
        } else {
          runOldMapper(job, splitMetaInfo, umbilical, reporter);
        }
        done(umbilical, reporter);
      }

      这里启动了taskReporter,向application master报告执行情况,并且初始化了整个Job任务,且在2.0中,调用了runNewMapper方法;

      

      @SuppressWarnings("unchecked")
      private <INKEY,INVALUE,OUTKEY,OUTVALUE>
      void runNewMapper(final JobConf job,
                        final TaskSplitIndex splitIndex,
                        final TaskUmbilicalProtocol umbilical,
                        TaskReporter reporter
                        ) throws IOException, ClassNotFoundException,
                                 InterruptedException {
        // make a task context so we can get the classes
        org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
          new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, 
                                                                      getTaskID(),
                                                                      reporter);
        // make a mapper
        org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
          (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
            ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
        // make the input format
        org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
          (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
            ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
        // rebuild the input split
        org.apache.hadoop.mapreduce.InputSplit split = null;
        split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
            splitIndex.getStartOffset());
        LOG.info("Processing split: " + split);
    
        org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
          new NewTrackingRecordReader<INKEY,INVALUE>
            (split, inputFormat, reporter, taskContext);
        
        job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
        org.apache.hadoop.mapreduce.RecordWriter output = null;
        
        // get an output object
        if (job.getNumReduceTasks() == 0) {
          output = 
            new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
        } else {
          output = new NewOutputCollector(taskContext, job, umbilical, reporter);
        }
    
        org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
        mapContext = 
          new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), 
              input, output, 
              committer, 
              reporter, split);
    
        org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
            mapperContext = 
              new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
                  mapContext);
    
        try {
          input.initialize(split, mapperContext);
          mapper.run(mapperContext);
          mapPhase.complete();
          setPhase(TaskStatus.Phase.SORT);
          statusUpdate(umbilical);
          input.close();
          input = null;
          output.close(mapperContext);
          output = null;
        } finally {
          closeQuietly(input);
          closeQuietly(output, mapperContext);
        }
      }
    View Code

    在代码中可见, 在这个方法中解析了job中的计算出的,InputSplit信息,这里面封装了所有的map文件的切片信息,而InputSplit对象的初始化,由 private <T> T getSplitDetails(Path file, long offset)方法获得。

    这个方法里去获取 T split = deserializer.deserialize(null);切片信息,而切片信息,又通过AvroSerialization获得,代码如下,现在就可以串起来,Job客户端从RM获取了一个输入流,而这个输入流中存储了map所需输入文件的切片信息,类似上文讲的,从hdfs文件系统中下载文件的过程之一,先从NN节点获取文件的切片信息:

    @Override
        public T deserialize(T t) throws IOException {
          return reader.read(t, decoder);
        }
    View Code

     InputSplit中包含了切片信息,拿到本map任务需要的切片后,通过RecordReader,获取文件内容,然后反射调用程序员缩写的Mapper类。此项代码在runNewMapper方法的第722行:

    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
          (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
            ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
    View Code

    然后在最后有如下代码:

    try {
          input.initialize(split, mapperContext);
          mapper.run(mapperContext);
          mapPhase.complete();
          setPhase(TaskStatus.Phase.SORT);
          statusUpdate(umbilical);
          input.close();
          input = null;
          output.close(mapperContext);
          output = null;
        } finally {
          closeQuietly(input);
          closeQuietly(output, mapperContext);
        }
    View Code

    这里,初始化输入文件切片,然后run程序员写的mapper再之后输出结果,关闭资源。

    需要注意的是,NewOutputCollector这个在上文代码中的作用:

    他会每次收集调用map新的kv对,然后将他们spill到内存或者文件中,还可以做进一步的partition和sort和combine操作,当存在reduce的时候,此类代码如下:

      private class NewOutputCollector<K,V>
        extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
        private final MapOutputCollector<K,V> collector;
        private final org.apache.hadoop.mapreduce.Partitioner<K,V> partitioner;
        private final int partitions;
    
        @SuppressWarnings("unchecked")
        NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                           JobConf job,
                           TaskUmbilicalProtocol umbilical,
                           TaskReporter reporter
                           ) throws IOException, ClassNotFoundException {
          collector = createSortingCollector(job, reporter);
          partitions = jobContext.getNumReduceTasks();
          if (partitions > 1) {
            partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
              ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
          } else {
            partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
              @Override
              public int getPartition(K key, V value, int numPartitions) {
                return partitions - 1;
              }
            };
          }
        }
    
        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
          collector.collect(key, value,
                            partitioner.getPartition(key, value, partitions));
        }
    
        @Override
        public void close(TaskAttemptContext context
                          ) throws IOException,InterruptedException {
          try {
            collector.flush();
          } catch (ClassNotFoundException cnf) {
            throw new IOException("can't find class ", cnf);
          }
          collector.close();
        }
      }
    View Code

    还有一个MapOutputBuffer需要注意,他是在实例化NewOutputCollector时被创建的:

    构造方法:

     private final MapOutputCollector<K,V> collector;

    collector = createSortingCollector(job, reporter);

    然后在

     

    创建了这个buffer对象,在这个对象中

     

    ReduceTask部分源码:

      

  • 相关阅读:
    leetcode[164] Maximum Gap
    leetcode[162] Find Peak Element
    leetcode[160] Intersection of Two Linked Lists
    leetcode[156] binary tree upside down
    leetcode[155] Min Stack
    leetcode Find Minimum in Rotated Sorted Array II
    leetcode Find Minimum in Rotated Sorted Array
    leetcode Maximum Product Subarray
    ROP
    windbg bp condition
  • 原文地址:https://www.cnblogs.com/qfxydtk/p/11167437.html
Copyright © 2011-2022 走看看