  • Combiner

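    Applies selectively: suitable for sum and max, not for avg (an average of partial averages is generally not the overall average)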
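The sum/max versus avg distinction can be checked with a few lines of plain Java. This is only a sketch with no Hadoop dependency; the class name `CombinerSafetyDemo` and the sample values are made up for illustration. It applies each function once per simulated map task and then again over the partial results, which is what a combiner followed by a reducer amounts to.

```java
/** Plain-Java illustration (no Hadoop): pre-aggregating is safe for sum and max, not for avg. */
class CombinerSafetyDemo {

    static double sum(double... xs) {
        double total = 0;
        for (double x : xs) total += x;
        return total;
    }

    static double max(double... xs) {
        double best = Double.NEGATIVE_INFINITY;
        for (double x : xs) best = Math.max(best, x);
        return best;
    }

    static double avg(double... xs) {
        return sum(xs) / xs.length;
    }

    public static void main(String[] args) {
        // Hypothetical values emitted for one key by two map tasks: {1, 2, 3} and {10}.
        System.out.println("sum, combined first: " + sum(sum(1, 2, 3), sum(10)));
        System.out.println("sum, all at once:    " + sum(1, 2, 3, 10));
        System.out.println("max, combined first: " + max(max(1, 2, 3), max(10)));
        System.out.println("max, all at once:    " + max(1, 2, 3, 10));
        // The two avg lines print different numbers, so avg cannot be used as a combiner as-is.
        System.out.println("avg, combined first: " + avg(avg(1, 2, 3), avg(10)));
        System.out.println("avg, all at once:    " + avg(1, 2, 3, 10));
    }
}
```

A common workaround for averages is to have the combiner pass along (sum, count) pairs and divide only in the final reduce.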

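    1. It is an optimization, on the precondition that the final result is unaffected:

      a. It reduces the data transferred over the network from the map side to the reduce side (network I/O)

      b. It reduces the data written out by map tasks (disk I/O)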

    2. A combiner is in effect a Reducer: the combiner's output is fed to reduce as its input
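Points 1 and 2 can be illustrated with a toy word count in plain Java. This is a sketch under stated assumptions, not Hadoop code: `WordCountCombinerDemo`, `map`, `sumByKey` and `shuffle` are hypothetical names, each input string stands for the split of one map task, and the "network" is just a list. The same `sumByKey` function plays both the combiner and the reducer role.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy word count: the same summing function serves as combiner and as reducer. */
class WordCountCombinerDemo {

    /** Emits (word, 1) per whitespace-separated token, like a word-count map function. */
    static List<Map.Entry<String, Integer>> map(String split) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : split.trim().split("\\s+")) {
            if (!word.isEmpty()) out.add(Map.entry(word, 1));
        }
        return out;
    }

    /** Sums the values of each key; usable both before and after the shuffle. */
    static Map<String, Integer> sumByKey(List<Map.Entry<String, Integer>> records) {
        Map<String, Integer> out = new TreeMap<>();
        for (Map.Entry<String, Integer> r : records) {
            out.merge(r.getKey(), r.getValue(), Integer::sum);
        }
        return out;
    }

    /** Returns the records that would travel from the map tasks to the reduce side. */
    static List<Map.Entry<String, Integer>> shuffle(List<String> splits, boolean useCombiner) {
        List<Map.Entry<String, Integer>> shuffled = new ArrayList<>();
        for (String split : splits) { // one split per simulated map task
            List<Map.Entry<String, Integer>> mapOutput = map(split);
            if (useCombiner) {
                // Combiner step: pre-aggregate locally, so fewer records leave the map task.
                shuffled.addAll(new ArrayList<>(sumByKey(mapOutput).entrySet()));
            } else {
                shuffled.addAll(mapOutput);
            }
        }
        return shuffled;
    }

    public static void main(String[] args) {
        List<String> splits = List.of("a b a a", "b b c");
        for (boolean useCombiner : new boolean[] {false, true}) {
            List<Map.Entry<String, Integer>> shuffled = shuffle(splits, useCombiner);
            System.out.println("combiner=" + useCombiner
                    + " shuffled records=" + shuffled.size()
                    + " final result=" + sumByKey(shuffled));
        }
    }
}
```

Both runs end with the same counts; only the number of records crossing the simulated shuffle differs.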

    3. Question: what does the shuffle flow look like once a combiner is added?

      a. Without a combiner, the shuffle flow is:

        InputFormat-->map function-->circular buffer-->partition-->sort (quickSort)-->spill-->merge-->sort (Collections.sort)-->fetch-->merge-->sort (Collections.sort)-->reduce function-->output

      b. With a combiner, the shuffle flow is:

        InputFormat-->map function-->circular buffer-->partition-->sort (quickSort)-->combiner-->spill-->merge-->sort (Collections.sort)-->combiner-->fetch-->merge-->sort (Collections.sort)-->combiner-->reduce function-->output
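The first few stages of flow b (partition, sort, combiner, spill) can be sketched in plain Java as below. This is a simplified model, not Hadoop's `MapOutputBuffer`: `SortAndSpillDemo` and `Rec` are hypothetical names, every record carries the value 1, the combiner is assumed to be a sum, and the partition formula is the hash-modulo scheme used by Hadoop's default `HashPartitioner`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Simplified model of partition --> sort --> combiner --> spill on the map side. */
class SortAndSpillDemo {

    /** One buffered map-output record. */
    static final class Rec {
        final int partition;
        final String key;
        final int value;

        Rec(int partition, String key, int value) {
            this.partition = partition;
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return "p" + partition + ":" + key + "=" + value;
        }
    }

    /** Hash-modulo partitioning, kept non-negative by masking the sign bit. */
    static int partitionOf(String key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    /** Returns the records that one spill would write, in (partition, key) order. */
    static List<Rec> sortAndSpill(List<String> keys, int numPartitions, boolean useCombiner) {
        List<Rec> buffer = new ArrayList<>();
        for (String key : keys) {
            buffer.add(new Rec(partitionOf(key, numPartitions), key, 1));
        }
        // Sort by partition first, then by key within each partition.
        buffer.sort(Comparator.comparingInt((Rec r) -> r.partition).thenComparing(r -> r.key));
        if (!useCombiner) {
            return buffer; // spill directly
        }
        // Combiner (assumed to be a sum): fold runs of equal keys inside one partition.
        List<Rec> spill = new ArrayList<>();
        for (Rec r : buffer) {
            int last = spill.size() - 1;
            if (last >= 0 && spill.get(last).partition == r.partition
                    && spill.get(last).key.equals(r.key)) {
                spill.set(last, new Rec(r.partition, r.key, spill.get(last).value + r.value));
            } else {
                spill.add(r);
            }
        }
        return spill;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("b", "a", "b", "a", "a");
        System.out.println("no combiner:   " + sortAndSpill(keys, 2, false));
        System.out.println("with combiner: " + sortAndSpill(keys, 2, true));
    }
}
```

Because the combiner runs after the sort, equal keys are already adjacent and a single pass is enough to fold them.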

    4. Source code notes

    -----------------------------------Map-side combiner-------------------------------------------------------------------
    【MapTask.class$MapOutputBuffer.class】sortAndSpill()
    -->after sorter.sort(MapOutputBuffer.this, mstart, mend, reporter):
    if (combinerRunner == null) {
      // spill directly
      DataInputBuffer key = new DataInputBuffer();
      ....
      }
    } else {
      .....
      if (spstart != spindex) {
        combineCollector.setWriter(writer);
        RawKeyValueIterator kvIter =
          new MRResultIterator(spstart, spindex);
        combinerRunner.combine(kvIter, combineCollector);
      }
    }

    -->mergeParts()
    -->after Merger.merge(), the combiner is invoked:
    if (combinerRunner == null || numSpills < minSpillsForCombine) {
      Merger.writeFile(kvIter, writer, reporter, job);
    } else {
      combineCollector.setWriter(writer);
      combinerRunner.combine(kvIter, combineCollector);
    }
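    Note: during this merge the combiner runs only when the number of spill files is at least 3 (the default); the controlling property is mapreduce.map.combine.minspills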
    ----------------------------------Map-side combiner----------------------------------------------------------------
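The merge-then-combine decision in mergeParts() can be modelled in plain Java as follows. This is a sketch only: `MergePartsDemo` and its methods are hypothetical names, a generic priority-queue k-way merge stands in for `Merger.merge()`, the combiner is assumed to be a sum, and each spill is assumed to be a key-sorted list for a single partition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

/** Models mergeParts(): merge the sorted spills, then combine only if there are enough spills. */
class MergePartsDemo {

    /** Mirrors the quoted condition: a combiner exists and numSpills >= minSpillsForCombine. */
    static boolean combineOnMerge(boolean hasCombiner, int numSpills, int minSpillsForCombine) {
        return hasCombiner && numSpills >= minSpillsForCombine;
    }

    static List<Map.Entry<String, Integer>> mergeSpills(
            List<List<Map.Entry<String, Integer>>> spills,
            boolean hasCombiner, int minSpillsForCombine) {
        // k-way merge: each heap element is {spill index, offset within that spill}.
        PriorityQueue<int[]> heap = new PriorityQueue<>((x, y) ->
                spills.get(x[0]).get(x[1]).getKey()
                        .compareTo(spills.get(y[0]).get(y[1]).getKey()));
        for (int i = 0; i < spills.size(); i++) {
            if (!spills.get(i).isEmpty()) heap.add(new int[] {i, 0});
        }
        List<Map.Entry<String, Integer>> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            merged.add(spills.get(top[0]).get(top[1]));
            if (top[1] + 1 < spills.get(top[0]).size()) {
                heap.add(new int[] {top[0], top[1] + 1});
            }
        }
        if (!combineOnMerge(hasCombiner, spills.size(), minSpillsForCombine)) {
            return merged; // corresponds to the Merger.writeFile(...) branch
        }
        // Combiner branch (assumed sum): fold adjacent records that share a key.
        List<Map.Entry<String, Integer>> combined = new ArrayList<>();
        for (Map.Entry<String, Integer> e : merged) {
            int last = combined.size() - 1;
            if (last >= 0 && combined.get(last).getKey().equals(e.getKey())) {
                combined.set(last,
                        Map.entry(e.getKey(), combined.get(last).getValue() + e.getValue()));
            } else {
                combined.add(e);
            }
        }
        return combined;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> s1 = List.of(Map.entry("a", 1), Map.entry("b", 1));
        List<Map.Entry<String, Integer>> s2 = List.of(Map.entry("a", 2));
        List<Map.Entry<String, Integer>> s3 = List.of(Map.entry("b", 5));
        System.out.println("2 spills: " + mergeSpills(List.of(s1, s2), true, 3));
        System.out.println("3 spills: " + mergeSpills(List.of(s1, s2, s3), true, 3));
    }
}
```

With only two spills the merged records are written unchanged even though a combiner is configured; the third spill is what crosses the threshold of 3.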
    ----------------------------------Reduce-side combiner----------------------------------------------------------------
    -->【ReduceTask.class】run()
      -->ShuffleConsumerPlugin.Context shuffleContext = new ShuffleConsumerPlugin.Context(...)
      -->【ShuffleConsumerPlugin interface】loads the implementation class Shuffle.class
        -->【Shuffle.class】init()
          -->【Shuffle.class】createMergeManager(context)
            -->【Shuffle.class】 new MergeManagerImpl()
              -->【MergeManagerImpl.class】merge(List<InMemoryMapOutput<K,V>> inputs)
                -->after Merger.merge(), the combiner is invoked:
    if (null == combinerClass) {
      Merger.writeFile(rIter, writer, reporter, jobConf);
    } else {
      combineCollector.setWriter(writer);
      combineAndSpill(rIter, reduceCombineInputCounter);
    }

    ----------------------------------Reduce-side combiner----------------------------------------------------------------

  • Original article: https://www.cnblogs.com/lyr999736/p/9381500.html