zoukankan      html  css  js  c++  java
  • Hadoop on Mac with IntelliJ IDEA

    下午对着源码看陆喜恒. Hadoop实战(第2版)6.4.1  (Shuffle和排序)Map端,发现与Hadoop 1.2.1的源码有些出入。下面作个简单的记录,方便起见,引用自书本的语句都用斜体表示

    依书本,从MapTask.java开始。这个类有多个内部类:

    从书的描述可知,collect()并不在MapTask类,而在MapOutputBuffer类,其函数功能是

    1、定义输出内存缓冲区为环形结构
    2、定义输出内存缓冲区内容到磁盘的操作

    在collect函数中将缓冲区的内容写出时会调用sortAndSpill函数。好了,从这里开始就糊涂了,因为collect()没调用这个函数,接触Hadoop也就几天时间,啥都不懂,晕了。

    简单表示下当前的函数调用关系:

    0 ---- MapOutputBuffer::collect()

    达到写出阈值后,写了缓冲区内容,形成spill文件。即,调用了startSpill()。

    0 ---- MapOutputBuffer::collect()

    1 -------- startSpill()

    startSpill()触发了条件:spillReady.signal()。字段spillReady在SpillThread类中用到,SpillThread为Thread的子类,其run方法有如下内容:

    SpillThread::run()
        // ...
        spillReady.await();
        // ...
        MapOutputBuffer::sortAndSpill()
        // ...

    那么,这里第一次看到sortAndSpill方法被调用,接上了书本的描述。现在主要函数调用关系如下

    线程1 线程2(MapOutputBuffer构造函数中启动)

    0 ---- MapOutputBuffer::collect()

    1 -------- startSpill()

    2 ------------ spillReady.signal()

    SpillThread::run()
      // ...
      spillReady.await();
      // ...
      MapOutputBuffer::sortAndSpill()
      // ...

    sortAndSpill内部使用了快排:

    ...
    sorter = ReflectionUtils.newInstance(
                                    job.getClass(
                            "map.sort.class", 
                            QuickSort.class, 
                            IndexedSorter.class), 
                            job);
    ...
    sorter.sort();
    ...

    排序后,判断combinerRunner是否为空,为空直接写入spill,否则调用combinerRunner.combine方法,而不是combineAndSpill方法,Hadoop 1.2.1源码中没书上写的这句代码。combinerRunner在MapOutputBuffer的构造函数中定义

    combinerRunner = CombinerRunner.create(job, getTaskID(), 
                                                 combineInputCounter,
                                                 reporter, null);

    sortAndSpill()另一个调用者是flush(),此函数是MapOutputBuffer成员。这段代码位于

    MapTask::run()

      runOldMapper()

        collector = new MapOutputBuffer(umbilical, job, reporter);

        ...

        collector.flush()

          sortAndSpill()

          mergeParts()

    done(umbilical, reporter)

    上述的调用关系才符合书第112页最后一段的描述。mergeParts()执行合并操作,这个操作的主要目的是将Map生成的众多spill文件中的数据按照划分重新组织,以便于Reduce处理。这里的划分,应该是partition之意。

    待唯一的已分区且排序的Map输出文件写入最后一条记录后,Map端的shuffle阶段就结束了。从源码看,这步应该是执行done(umbilical, reporter)后才完成。

    ╮(╯_╰)╭  我不是为了情怀,我就是认真。

  • 相关阅读:
    vue中使用v-bind="$attrs"和v-on="$listeners"进行多层组件监听
    钉钉小程序开发遇到的坑
    promise、async、await、settimeout异步原理与执行顺序
    js获取url参数值的几种方式
    ES6解构赋值
    2019年前端面试题
    浏览器兼容问题踩坑收集
    react+antd分页 实现分页及页面刷新时回到刷新前的page
    无侵入埋点
    CSS学习笔记(三)空白符和换行
  • 原文地址:https://www.cnblogs.com/michaellfx/p/4012353.html
Copyright © 2011-2022 走看看