zoukankan      html  css  js  c++  java
  • Hadoop-2.4.1学习之Map任务源代码分析(下)

          在Map任务源码分析(上)中,对MAP阶段的代码进行了学习,这篇文章文章将学习Map任务的SORT阶段。假设Reducer的数量不为0。则还须要进行SORT阶段。但从上面的学习中并未发现与MAP阶段运行完毕调用mapPhase.complete()相似的在SORT阶段运行完毕调用sortPhase.complete()的源码,那SORT阶段是在什么时候启动的?对于Map任务来说,有输入就有输出。输入由RecordReader负责。输出则由RecordWriter负责,当Reducer的数量不为0时,RecordWriter为NewOutputCollector(该类为MapTask的私有内部类),SORT阶段对map的输出进行处理,由此判断SORT阶段的工作是由NewOutputCollector完毕的。以下将通过分析NewOutputCollector的源码要验证这一判断是否成立。该类继承自RecordWriter。拥有的变量例如以下:

    private final MapOutputCollector<K,V> collector;//负责实际的输出操作
    private final org.apache.hadoop.mapreduce.Partitioner<K,V> partitioner;//对键空间进行分区
    private final int partitions;//分区数量。与Reducer的数量同样
    
          该类的构造函数例如以下:

    collector = createSortingCollector(job, reporter);
    partitions = jobContext.getNumReduceTasks();
    if (partitions > 1) {
    partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
    ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
    } else {
    partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
    @Override
    public int getPartition(K key, V value, int numPartitions) {
    return partitions - 1;
    }
    };
    }
    

          在该段代码中还是未发现与SORT阶段有关的不论什么信息,但却发现了Sorting,据此判断方法createSortingCollector具有最大的可能性。该方法的源码例如以下:

    //依据mapreduce.job.map.output.collector.class的值构建MapOutputCollector
    //在未指定该參数值的情况,返回MapOutputBuffer对象
    MapOutputCollector<KEY, VALUE> collector= (MapOutputCollector<KEY, VALUE>)
    ReflectionUtils.newInstance.
    (job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
    MapOutputBuffer.class, MapOutputCollector.class), job);
    LOG.info("Map output collector class = " + collector.getClass().getName());
    MapOutputCollector.Context context =new MapOutputCollector.Context(this, job, reporter);
    //默认调用MapOutputBuffer的init方法
    collector.init(context);
    return collector;
    

          经过上面的一系列分析可知,SORT阶段的工作由NewOutputCollector完毕,而NewOutputCollector又将SORT工作交给了MapOutputCollector。终于由该接口的实现类MapOutputBuffer完毕,该类做为MapTask的内部类占用了MapTask源码中超过一半的行数(MapTask行数为2000行,MapOutputBuffer约为1100行)。但从行数就能够得出该类的重要性。

    MapOutputBuffer的init方法的第一部分代码依据设置构建缓存,代码已经加入了相关凝视:

    //sanity checks
    //当kvbuffer缓存达到该值时,溢出线程将缓存内容写入硬盘
    final float spillper =job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
    //用于排序文件的缓存的大小,默觉得100MB
    final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
    //mapreduce.task.index.cache.limit.bytes,默认值为1024 * 1024(1M)
    indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,                                     INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
    if (spillper > (float)1.0 || spillper <= (float)0.0) {
        throw new IOException("Invalid "" + JobContext.MAP_SORT_SPILL_PERCENT +
         "": " + spillper);
    }
    //sortmb的最大值为2047Mb(111 1111 1111),取sortmb的最低11位
    //若大于2047Mb,以下的左移20位将导致溢出
    if ((sortmb & 0x7FF) != sortmb) {
        throw new IOException("Invalid "" + JobContext.IO_SORT_MB + "": " + sortmb);
    }
    //默认使用高速排序
    sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
    QuickSort.class, IndexedSorter.class), job);
    // buffers and accounting
    //sortbm*(2的20次方)。将sortbm转换为字节数(1024*1024,2的十次方乘以2的十次方)
    int maxMemUsage = sortmb << 20;
    //METASIZE=16,maxMemUsage=sortmb << 20
    maxMemUsage -= maxMemUsage % METASIZE;
    kvbuffer = new byte[maxMemUsage];
    bufvoid = kvbuffer.length;
    kvmeta = ByteBuffer.wrap(kvbuffer).order(ByteOrder.nativeOrder()).asIntBuffer();
    setEquator(0);
    //equator:标记元数据或者序列化数据的起源
    //bufstart:标记溢出的起始位置,bufend:标记可收集收据的起始位置
    //bufindex:标记已收集数据的结束位置。

    所有初始化为0 bufstart = bufend = bufindex = equator; //kvstart:标记溢出元数据的起源,kvend:标记溢出元数据的结束位置 //kvindex:标记全然序列化的记录的结束位置 kvstart = kvend = kvindex; maxRec = kvmeta.capacity() / NMETA; softLimit = (int)(kvbuffer.length * spillper); bufferRemaining = softLimit;

          MapOutputBuffer的init方法的第二部分启动SpillThread线程,该线程用于完毕SORT阶段的工作,并负责溢出缓存中的数据。

    在该线程中的run方法中调用了sortAndSpill方法,由方法名就能够得知该方法负责map输出的排序和溢出工作。排序部分的源码例如以下:

    final int mstart = kvend / NMETA;
    // kvend is a valid record
    final int mend = 1 + (kvstart >= kvend? kvstart
              : kvmeta.capacity() + kvstart) / NMETA;
    //对指定范围的数据进行排序。默认使用的QuickSort
    sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);
    

          对数据排完序后就须要将已排序数据写出到文件里。源码例如以下:

    int spindex = mstart;
    //记录索引的startOffset、rawLength和partLength
    final IndexRecord rec = new IndexRecord();
    //封装value的字节表示的内部类
    final InMemValBytes value = new InMemValBytes();
    for (int i = 0; i < partitions; ++i) {
       //负责将map的输出写入中间文件
       IFile.Writer<K, V> writer = null;
       try {
           long segmentStart = out.getPos();
           writer = new Writer<K, V>(job, out, keyClass, valClass, codec,spilledRecordsCounter);
           if (combinerRunner == null) {
              // spill directly
              DataInputBuffer key = new DataInputBuffer();
              while (spindex < mend &&
    kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
                    final int kvoff = offsetFor(spindex % maxRec);
                    int keystart = kvmeta.get(kvoff + KEYSTART);
                    int valstart = kvmeta.get(kvoff + VALSTART);
                    key.reset(kvbuffer, keystart, valstart - keystart);
                    getVBytesForOffset(kvoff, value);
                    writer.append(key, value);
                    ++spindex;
               }
            } else {
               int spstart = spindex;
               while (spindex < mend
    &&kvmeta.get(offsetFor(spindex % maxRec)+ PARTITION) == i) {
                    ++spindex;
               }
               // Note: we would like to avoid the combiner if we've fewer
               // than some threshold of records for a partition
               if (spstart != spindex) {
                 combineCollector.setWriter(writer);
                 RawKeyValueIterator kvIter =new MRResultIterator(spstart, spindex);
                 combinerRunner.combine(kvIter, combineCollector);
                }
            }
           // close the writer
           writer.close();
           // record offsets
           rec.startOffset = segmentStart;
           rec.rawLength = writer.getRawLength();
           rec.partLength = writer.getCompressedLength();
           spillRec.putIndex(rec, i);
           writer = null;
       } finally {
           if (null != writer) writer.close();
       }
    }
    

          当保存索引的缓存超过限制时,就将索引保存到文件里,源码例如以下:

    if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
     // create spill index file
    //MAP_OUTPUT_INDEX_RECORD_LENGTH值为24,表示索引文件里每条记录的大小
     Path indexFilename =mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
                      * MAP_OUTPUT_INDEX_RECORD_LENGTH);
     spillRec.writeToFile(indexFilename, job);
    } else {
     indexCacheList.add(spillRec);
     totalIndexCacheMemory +=spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }
    

          综合上面的分析可知,当在map方法中运行context.write时,将先数据写入到缓存中,当缓存中的数据达到预先设置的阈值时由后台SpillThread线程负责数据排序并将数据溢出到map任务的中间输出文件里。

  • 相关阅读:
    SharePoint中获取当前登录的用户名
    SharePoint 2013 图文开发系列之InfoPath入门
    在InfoPath中如何获取当前用户的信息(Profile)
    更新当前列并添加其他列
    poj3067 Japan
    poj2481 Cows
    poj1195 Mobile phones
    poj2299 Ultra-QuickSort
    lower_bound()和upper_bound
    hdu4339 Query
  • 原文地址:https://www.cnblogs.com/blfshiye/p/5225092.html
Copyright © 2011-2022 走看看