zoukankan      html  css  js  c++  java
  • combine组合器

    具有选择性(适合sum,max,不适合avg)

    1.做优化:前提是不影响最终结果;

      a.实现map端到reduce端减少数据网络传输(网络IO)

      b.减少map Task数据输出(磁盘IO)

    2.combine其实是Reduce,combine的输出作为reduce的输入

    3.疑问:添加combine后的shuffle流程?

      a.无combine情况:shuffle流程

        inputformat-->map函数-->环形缓冲区-->partititon分区-->sort排序(quickSort)-->spill溢写-->merge合并-->sort排序(Collections.sort)-->fetch拉取-->merge合并-->sort排序(Collections.sort)-->reduce函数-->output

      b.含combine情况:shuffle流程

        inputformat-->map函数-->环形缓冲区-->partititon分区-->sort排序(quickSort)-->combiner-->spill溢写-->merge合并-->sort排序(Collections.sort)-->combiner-->fetch拉取-->merge合并-->sort排序(Collections.sort)-->combiner-->reduce函数-->output

    4.源码说明

    -----------------------------------Map端combiner-------------------------------------------------------------------
    【MapTask.class$MapOutputBuffer.class】sortAndSpill()
    -->sorter.sort(MapOutputBuffer.this, mstart, mend, reporter)下面:
    if (combinerRunner == null) {
    // spill directly
    DataInputBuffer key = new DataInputBuffer();
    ....
    }
    } else {
    .....
    if (spstart != spindex) {
    combineCollector.setWriter(writer);
    RawKeyValueIterator kvIter =
    new MRResultIterator(spstart, spindex);
    combinerRunner.combine(kvIter, combineCollector);
    }
    }

    -->mergeParts()
    -->Merger.merge()之后,调用combiner
    -->调用combiner
    if (combinerRunner == null || numSpills < minSpillsForCombine) {
    Merger.writeFile(kvIter, writer, reporter, job);
    } else {
    combineCollector.setWriter(writer);
    combinerRunner.combine(kvIter, combineCollector);
    }
    说明:当溢写个数大于等于3,开启combiner操作;参照属性为:mapreduce.map.combine.minspills
    ----------------------------------Map端combiner----------------------------------------------------------------
    ----------------------------------Reducer端combiner----------------------------------------------------------------
    -->【ReduceTask.class】run()
      -->ShuffleConsumerPlugin.Context shuffleContext = new ShuffleConsumerPlugin.Context(...)
      -->【ShuffleConsumerPlugin接口】加载实现类Shuffle.class
        -->【Shuffle.class】init()
          -->【Shuffle.class】createMergeManager(context)
            -->【Shuffle.class】 new MergeManagerImpl()
              -->【MergeManagerImp.class】merge(List<InMemoryMapOutput<K,V>> inputs)
                -->Merger.merge()之后调用combiner
    if (null == combinerClass) {
    Merger.writeFile(rIter, writer, reporter, jobConf);
    } else {
    combineCollector.setWriter(writer);
    combineAndSpill(rIter, reduceCombineInputCounter);
    }

    ----------------------------------Reducer端combiner----------------------------------------------------------------

  • 相关阅读:
    linux进程间通信之消息队列
    本地安装discuz x2.5(论坛站)程序
    缩小IO/CPU瓶颈:linux平台加速编译速度的几种方法
    php mcrypt
    Nginx工作原理和优化、漏洞。
    Linux下两种TCP网络服务器实现方式:循环服务&并发服务
    version `GLIBC_2.14' not found 解决方法.
    Flex Ant自动构建
    函数传指针和传引用
    JEECG 列表行编辑模式下实现文本的xheditor富文本框编辑器
  • 原文地址:https://www.cnblogs.com/lyr999736/p/9381500.html
Copyright © 2011-2022 走看看