  • combineByKey

    Reading the source of reduceByKey and groupByKey shows that both operators are built on combineByKey, so this post walks through combineByKey itself.

    /**
     * Simplified version of combineByKey that hash-partitions the output RDD.
     */
    def combineByKey[C](createCombiner: V => C,
        mergeValue: (C, V) => C,
        mergeCombiners: (C, C) => C,
        numPartitions: Int): RDD[(K, C)] = {
      combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
    }

    /**
     * Generic function to combine the elements for each key using a custom set of aggregation
     * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
     * Note that V and C can be different -- for example, one might group an RDD of type
     * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
     *
     * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
     * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
     * - `mergeCombiners`, to combine two C's into a single one.
     *
     * In addition, users can control the partitioning of the output RDD, and whether to perform
     * map-side aggregation (if a mapper can produce multiple items with the same key).
     */
    def combineByKey[C](createCombiner: V => C,
        mergeValue: (C, V) => C,
        mergeCombiners: (C, C) => C,
        partitioner: Partitioner,
        mapSideCombine: Boolean = true,
        serializer: Serializer = null): RDD[(K, C)] = {
      require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
      if (keyClass.isArray) {
        if (mapSideCombine) {
          throw new SparkException("Cannot use map-side combining with array keys.")
        }
        if (partitioner.isInstanceOf[HashPartitioner]) {
          throw new SparkException("Default partitioner cannot partition array keys.")
        }
      }
      val aggregator = new Aggregator[K, V, C](
        self.context.clean(createCombiner),
        self.context.clean(mergeValue),
        self.context.clean(mergeCombiners))
      if (self.partitioner == Some(partitioner)) {
        self.mapPartitions(iter => {
          val context = TaskContext.get()
          new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
        }, preservesPartitioning = true)
      } else {
        new ShuffledRDD[K, V, C](self, partitioner)
          .setSerializer(serializer)
          .setAggregator(aggregator)
          .setMapSideCombine(mapSideCombine)
      }
    }

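    combineByKey takes three user-supplied functions: createCombiner, mergeValue, and mergeCombiners.

    createCombiner: V => C : `createCombiner`, which turns a V into a C (e.g., creates a one-element list). It is called the first time a key is encountered within a partition, and it converts that first value into the initial combiner for the key (for example, a one-element list).

    mergeValue: (C, V) => C : `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list). It is called when the key already has a combiner in the current partition, and it merges the new value into that combiner (for example, by appending it to the end of the list). This function runs within each partition.

    mergeCombiners: (C, C) => C : `mergeCombiners`, to combine two C's into a single one. It merges the combiners that different partitions produced for the same key into a single combiner. This function runs across partitions rather than within one.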
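
    The division of labor between the three functions can be sketched in plain Scala, without Spark. This is only an illustration of the semantics described above, not Spark's implementation: the helper `combineByKeyLocal` and the object name are hypothetical, and each inner `Seq` merely stands in for one partition of an RDD[(K, V)].

```scala
// Plain-Scala simulation of combineByKey semantics (no Spark dependency).
// combineByKeyLocal is a hypothetical helper written for this illustration.
object CombineByKeySketch {

  def combineByKeyLocal[K, V, C](
      partitions: Seq[Seq[(K, V)]],        // each inner Seq plays the role of one partition
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {

    // Step 1, inside each partition: build one combiner per key.
    val perPartition: Seq[Map[K, C]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.get(k) match {
          case None    => acc.updated(k, createCombiner(v)) // first value for k in this partition
          case Some(c) => acc.updated(k, mergeValue(c, v))  // k already has a combiner here
        }
      }
    }

    // Step 2, across partitions: merge the combiners that share a key.
    perPartition.foldLeft(Map.empty[K, C]) { (acc, m) =>
      m.foldLeft(acc) { case (a, (k, c)) =>
        a.get(k) match {
          case None     => a.updated(k, c)
          case Some(c0) => a.updated(k, mergeCombiners(c0, c))
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(
      Seq("a" -> 1, "b" -> 2, "a" -> 3),
      Seq("a" -> 5, "b" -> 6)
    )

    // C = List[Int]: the "one-element list" example from the doc comment.
    val grouped = combineByKeyLocal[String, Int, List[Int]](
      partitions,
      v => List(v),        // createCombiner
      (c, v) => c :+ v,    // mergeValue: append to the end of the list
      (c1, c2) => c1 ++ c2 // mergeCombiners: concatenate lists from two partitions
    )
    println(grouped)

    // C = (sum, count): V and C can be different types.
    val sumCount = combineByKeyLocal[String, Int, (Int, Int)](
      partitions,
      v => (v, 1),
      (c, v) => (c._1 + v, c._2 + 1),
      (c1, c2) => (c1._1 + c2._1, c1._2 + c2._2)
    )
    println(sumCount.map { case (k, (s, n)) => k -> s.toDouble / n })
  }
}
```

    With a real RDD the same three lambdas would be passed to combineByKey directly. The simulation only makes visible which function runs inside a partition and which runs when per-partition results are merged.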

    References:

    http://codingjunkie.net/spark-combine-by-key/

  • Original article: https://www.cnblogs.com/0xcafedaddy/p/7630254.html