zoukankan      html  css  js  c++  java
  • hadoop核心逻辑shuffle代码分析-map端

    首先要推荐一下:http://www.alidata.org/archives/1470

    阿里的大牛在上面的文章中比较详细的介绍了shuffle过程中mapper和reduce的每个过程,强烈推荐先读一下。

     

    不过,上文没有写明一些实现的细节,比如:spill的过程,mapper生成文件的 partition是怎么做的等等,相信有很多人跟我一样在看了上面的文章后还是有很多疑问,我也是带着疑问花了很久的看了cdh4.1.0版本 shuffle的逻辑,整理成本文,为以后回顾所用。

     

     首先用一张图展示下map的流程:

     
    在上图中,我们假设此次mapreduce有多个mapper和2个reducer,p0 p1分别代表该数据应该分配到哪个reducer端。我将mapper的过程大致分为5个过程。
     
    1.prepare Input。
    Mapreduce程序都需要指定输入文件,输入的格式有很多种,最常见的是保存 在hdfs上的文本文件。在用户提交job到jobtrack(ResourceManager)前的job就会根据用户的输入文件计算出需要多少 mapper,多少reducer,mapper的输入InputSplit有多大,block块名称等。mapper在prepare input阶段只需要根据inputFormat类型创建对应的RecordReader打开对应的inputSplit分片即可。如果job配置了 combiner还需初始化combiner。代码见MapTask类run方法
     
    2.mapper process
    这里的mapper指用户使用或自己继承的mapper类,这也是所有初学mapreduce的同学首先看到的类。
    1. <span style="font-size:18px;">  /** 
    2.    * Called once for each key/value pair in the input split. Most applications 
    3.    * should override this, but the default is the identity function. 
    4.    */  
    5.   @SuppressWarnings("unchecked")  
    6.   protected void map(KEYIN key, VALUEIN value,   
    7.                      Context context) throws IOException, InterruptedException {  
    8.     context.write((KEYOUT) key, (VALUEOUT) value);  
    9.   }  
    10. </span>  
    可以看到mapper默认的map方法就是取出key,value并放到context对象中。context对象包装了一个内存中的buf,下面会介绍。
    1. <span style="font-size:18px;">public void run(Context context) throws IOException, InterruptedException {  
    2.     setup(context);  
    3.     while (context.nextKeyValue()) {  
    4.       map(context.getCurrentKey(), context.getCurrentValue(), context);  
    5.     }  
    6.     cleanup(context);  
    7.   }</span>  
    run方法就是mapper实际运行的过程:不停的从context的inputSplit对象中取出keyvalue对,通过map方法处理再保存到context包装的内存buf中。
     
    3.buffer in memery
    key value在写入context中后实际是写入MapOutputBuffer类中。在第一个阶段的初始化过程中,MapOutputBuffer类会根据配置文件初始化内存buffer,我们来看下都有哪些参数:
    1. <span style="font-size:18px;">partitions = job.getNumReduceTasks();  
    2.       rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();  
    3.   
    4.       //sanity checks  
    5.       final float spillper =  
    6.         job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);  
    7.       final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);  
    8.       indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,  
    9.                                          INDEX_CACHE_MEMORY_LIMIT_DEFAULT);  
    10.       if (spillper > (float)1.0 || spillper <= (float)0.0) {  
    11.         throw new IOException("Invalid "" + JobContext.MAP_SORT_SPILL_PERCENT +  
    12.             "": " + spillper);  
    13.       }  
    14.       if ((sortmb & 0x7FF) != sortmb) {  
    15.         throw new IOException(  
    16.             "Invalid "" + JobContext.IO_SORT_MB + "": " + sortmb);  
    17.       }  
    18.       sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",  
    19.             QuickSort.class, IndexedSorter.class), job);</span>  
    partition:mapper的数据需要分配到reduce端的个数,由用户的job指定,默认为1.
    spillper:内存buf使用到此比例就会触发spill,将内存中的数据flush成一个文件。默认为0.8
    sortmb:内存buf的大小,默认100MB
    indexCacheMemoryLimit:内存index的大小。默认为1024*1024
    sorter:对mapper输出的key的排序,默认是快排
     
    内存buffer比较复杂,贴一张图介绍一下这块内存buf的结构:
    当一对keyvalue写入时首先会从wrap buf的右侧开始往左写,同时,会把一条keyvalue的meta信息(partition,keystart,valuestart)写入到最左边的 index区域。当wrap buf大小达到spill的触发比例后会block写入,挖出一部分数据开始spill,直到spill完成后才能继续写,不过写入位置不会置零,而是类 似循环buf那样,在spill掉数据后可以重复利用内存中的buf区域。
     
    这里单独讲一下partition:
    1. <span style="font-size:18px;">@Override  
    2.     public void write(K key, V value) throws IOException, InterruptedException {  
    3.       collector.collect(key, value,  
    4.                         partitioner.getPartition(key, value, partitions));  
    5.     }</span>  

    在keyvalue对写入MapOutputBuffer时会调用 partitioner.getPartition方法计算partition即应该分配到哪个reducer,这里的partition只是在内存的 buf的index区写入一条记录而已,和下一个部分的partition不一样哦。看下默认的partitioner:HashPartition

    1. <span style="font-size:18px;">/** Use {@link Object#hashCode()} to partition. */  
    2.   public int getPartition(K key, V value,  
    3.                           int numReduceTasks) {  
    4.     return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;  
    5.   }</span>  

    HashPartition只是把key hash后按reduceTask的个数取模,因此一般来说,不同的key分配到哪个reducer是随即的!所以,reducer内的所有数据是有序的,但reducer之间的数据却是乱序的!要想数据整体排序,要不只设一个reducer,要不使用TotalOrderPartitioner!

     
    4.Partition Sort Store
    在第四步中,partition是和sort一起做的,负责Spill的线程在拿到一段内存buf后会调用QuickSort的sort方法进行内存中的快排。
    1. <span style="font-size:18px;">sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);</span>  
    排序的算法是先按keyvalue记录的partition排序后按key的compare方法:
    1. <span style="font-size:18px;">public int compare(final int mi, final int mj) {  
    2.       final int kvi = offsetFor(mi % maxRec);  
    3.       final int kvj = offsetFor(mj % maxRec);  
    4.       final int kvip = kvmeta.get(kvi + PARTITION);  
    5.       final int kvjp = kvmeta.get(kvj + PARTITION);  
    6.       // sort by partition  
    7.       if (kvip != kvjp) {  
    8.         return kvip - kvjp;  
    9.       }  
    10.       // sort by key  
    11.       return comparator.compare(kvbuffer,  
    12.           kvmeta.get(kvi + KEYSTART),  
    13.           kvmeta.get(kvi + VALSTART) - kvmeta.get(kvi + KEYSTART),  
    14.           kvbuffer,  
    15.           kvmeta.get(kvj + KEYSTART),  
    16.           kvmeta.get(kvj + VALSTART) - kvmeta.get(kvj + KEYSTART));  
    17.     }</span>  
    因此,mapper输出的keyvalue首先是按partition聚合。而我们如果指定key的compare方法会在这里生效并进行排序。最后,一次spill的输出文件类似下图。
    在对内存中的buf排序后开始写文件。
    1. <span style="font-size:18px;">for (int i = 0; i < partitions; ++i) {  
    2.           IFile.Writer<K, V> writer = null;  
    3.           try {  
    4.             long segmentStart = out.getPos();  
    5.             writer = new Writer<K, V>(job, out, keyClass, valClass, codec,  
    6.                                       spilledRecordsCounter);  
    7.             if (combinerRunner == null) {  
    8.               // spill directly  
    9.               DataInputBuffer key = new DataInputBuffer();  
    10.               while (spindex < mend &&  
    11.                   kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {  
    12.                 final int kvoff = offsetFor(spindex % maxRec);  
    13.                 key.reset(kvbuffer, kvmeta.get(kvoff + KEYSTART),  
    14.                           (kvmeta.get(kvoff + VALSTART) -  
    15.                            kvmeta.get(kvoff + KEYSTART)));  
    16.                 getVBytesForOffset(kvoff, value);  
    17.                 writer.append(key, value);  
    18.                 ++spindex;  
    19.               }  
    20.             } else {  
    21.               int spstart = spindex;  
    22.               while (spindex < mend &&  
    23.                   kvmeta.get(offsetFor(spindex % maxRec)  
    24.                             + PARTITION) == i) {  
    25.                 ++spindex;  
    26.               }  
    27.               // Note: we would like to avoid the combiner if we've fewer  
    28.               // than some threshold of records for a partition  
    29.               if (spstart != spindex) {  
    30.                 combineCollector.setWriter(writer);  
    31.                 RawKeyValueIterator kvIter =  
    32.                   new MRResultIterator(spstart, spindex);  
    33.                 combinerRunner.combine(kvIter, combineCollector);  
    34.               }  
    35.             }</span>  
    如果job没有定义combiner则直接写文件,如果有combiner则在这里进行combine。
    在生成spill文件后还会将此次spillRecord的记录写在一个index文件中。
    1. <span style="font-size:18px;">Path indexFilename =  
    2.               mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions  
    3.                   * MAP_OUTPUT_INDEX_RECORD_LENGTH);  
    4.           spillRec.writeToFile(indexFilename, job);</span>  
    1. <span style="font-size:18px;">rec.startOffset = segmentStart;  
    2.             rec.rawLength = writer.getRawLength();  
    3.             rec.partLength = writer.getCompressedLength();  
    4.             spillRec.putIndex(rec, i);</span>  
     
    5.merge
    当mapper执行完毕后,就进入merge阶段。首先看下相关的配置参数:
    1. <span style="font-size:18px;">int mergeFactor = job.getInt(JobContext.IO_SORT_FACTOR, 100);</span>  
    mergeFactor:同时merge的文件数。
     
    merge阶段的目的是将多个spill生成的中间文件合并为一个输出文件,这里 的合并不同于combiner,无论有没有配置combiner这里的merge都会执行。merge阶段的输出是一个数据文件 MapFinalOutputFile和一个index文件。看下相关代码:
    1. <span style="font-size:18px;">RawKeyValueIterator kvIter = Merger.merge(job, rfs,  
    2.                          keyClass, valClass, codec,  
    3.                          segmentList, mergeFactor,  
    4.                          new Path(mapId.toString()),  
    5.                          job.getOutputKeyComparator(), reporter, sortSegments,  
    6.                          null, spilledRecordsCounter, sortPhase.phase());  
    7.   
    8.           //write merged output to disk  
    9.           long segmentStart = finalOut.getPos();  
    10.           Writer<K, V> writer =  
    11.               new Writer<K, V>(job, finalOut, keyClass, valClass, codec,  
    12.                                spilledRecordsCounter);  
    13.           if (combinerRunner == null || numSpills < minSpillsForCombine) {  
    14.             Merger.writeFile(kvIter, writer, reporter, job);  
    15.           } else {  
    16.             combineCollector.setWriter(writer);  
    17.             combinerRunner.combine(kvIter, combineCollector);  
    18.           }</span>  
    说 下merge的算法。每个spill生成的文件中keyvalue都是有序的,但不同的文件却是乱序的,类似多个有序文件的多路归并算法。Merger分 别取出需要merge的spillfile的最小的keyvalue,放入一个内存堆中,每次从堆中取出一个最小的值,并把此值保存到merge的输出文 件中。这里和hbase中scan的算法非常相似,在分布式系统中多路归并排序真是当红小生啊!
    这里merge时不同的partition的key是不会比较的,只有相同的partition的keyvalue才会进行排序和合并。最后的输出文件类似下图。
    如果用户定义了combiner,在merge的过程中也会进行combine, 因为虽然第四步中combine过但那只是部分输入的combine,在merge时仍然需要combine。这里有人问了,既然这里有 combiner,为啥在spill输出时还要combine纳,我认为是因为每次combine都会大大减少输出文件的大小,spill时就 combine能减少一定的IO操作。
     
    在merge完后会把不同partition的信息保存进一个index文件以便之后reducer来拉自己部分的数据。
    1. <span style="font-size:18px;">// record offsets  
    2.           rec.startOffset = segmentStart;  
    3.           rec.rawLength = writer.getRawLength();  
    4.           rec.partLength = writer.getCompressedLength();  
    5.           spillRec.putIndex(rec, parts);</span>  

    最后,我们再对mapper过程中的要点总结一下:
    1.对map输出<key,value>的分区(partition)是在写入内存buf前就做好的了,方法是对key的hash。我们可以通过继承Partitioner类自己实现分区,将自己想要的数据分到同一个reducer中。
    2.写入内存buf速度是非常快的,但spill过程会block写入。因此,对内存buf相关参数的调优是mapreduce调优的重点之一。
    3.对数据的排序是基于MapOutKey排序的,因此,我们可以重载对应的方法实现customize的排序顺序

    4.combine在spill和merge中都是进行。多次的combine会减少mapreduce中的IO操作,如果使用得当会很好的提高性能。但需要注意的是要深刻理解combine的意义,比如平均值就不适合用combine。

  • 相关阅读:
    C#中使用MATLAB
    各种插值法及效果图对比
    Visual Studio中你所不知道的智能感知
    C#中使用Surfer
    XCode中连接使用GitHub
    个人作业Week1
    个人作业Week3
    个人作业Week2
    第一次作业——subway
    执行ajax加载页面中的js总结
  • 原文地址:https://www.cnblogs.com/pricks/p/3875026.html
Copyright © 2011-2022 走看看