  • combineByKey

    Reading the source of reduceByKey and groupByKey shows that both operators are implemented on top of combineByKey, so let's analyze the combineByKey operator itself. Its two overloads in PairRDDFunctions are shown below.
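
    For reference, here is roughly how the two operators delegate to combineByKey in the Spark 1.x PairRDDFunctions source (paraphrased, so exact code varies by version; CompactBuffer is Spark's internal append-only buffer):

    // reduceByKey: V and C are the same type, and func serves as both
    // mergeValue (within a partition) and mergeCombiners (across partitions).
    def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
      combineByKey[V]((v: V) => v, func, func, partitioner)
    }

    // groupByKey: C is a buffer of values; map-side combining is disabled
    // because it would not reduce the amount of data shuffled.
    def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = {
      val bufs = combineByKey[CompactBuffer[V]](
        (v: V) => CompactBuffer(v),                                // createCombiner
        (buf: CompactBuffer[V], v: V) => buf += v,                 // mergeValue
        (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2, // mergeCombiners
        partitioner, mapSideCombine = false)
      bufs.asInstanceOf[RDD[(K, Iterable[V])]]
    }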

    /**
     * Simplified version of combineByKey that hash-partitions the output RDD.
     */
    def combineByKey[C](
        createCombiner: V => C,
        mergeValue: (C, V) => C,
        mergeCombiners: (C, C) => C,
        numPartitions: Int): RDD[(K, C)] = {
      combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
    }
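
    A quick usage sketch of this overload (names hypothetical; assumes an active SparkContext named sc): collect each key's values into a list, with the output hash-partitioned into 3 partitions.

    val pairs = sc.parallelize(Seq(("x", 1), ("y", 2), ("x", 3)))
    val grouped = pairs.combineByKey(
      (v: Int) => List(v),                          // createCombiner: first value of a key in a partition
      (c: List[Int], v: Int) => v :: c,             // mergeValue: fold a value into the partition-local list
      (c1: List[Int], c2: List[Int]) => c1 ::: c2,  // mergeCombiners: concatenate lists from different partitions
      3)                                            // numPartitions => new HashPartitioner(3)
    // grouped: RDD[(String, List[Int])]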
    
    
    /**
     * Generic function to combine the elements for each key using a custom set of aggregation
     * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
     * Note that V and C can be different -- for example, one might group an RDD of type
     * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
     *
     * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
     * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
     * - `mergeCombiners`, to combine two C's into a single one.
     *
     * In addition, users can control the partitioning of the output RDD, and whether to perform
     * map-side aggregation (if a mapper can produce multiple items with the same key).
     */
    def combineByKey[C](createCombiner: V => C,
        mergeValue: (C, V) => C,
        mergeCombiners: (C, C) => C,
        partitioner: Partitioner,
        mapSideCombine: Boolean = true,
        serializer: Serializer = null): RDD[(K, C)] = {
      require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
      if (keyClass.isArray) {
        if (mapSideCombine) {
          throw new SparkException("Cannot use map-side combining with array keys.")
        }
        if (partitioner.isInstanceOf[HashPartitioner]) {
          throw new SparkException("Default partitioner cannot partition array keys.")
        }
      }
      val aggregator = new Aggregator[K, V, C](
        self.context.clean(createCombiner),
        self.context.clean(mergeValue),
        self.context.clean(mergeCombiners))
      if (self.partitioner == Some(partitioner)) {
        self.mapPartitions(iter => {
          val context = TaskContext.get()
          new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
        }, preservesPartitioning = true)
      } else {
        new ShuffledRDD[K, V, C](self, partitioner)
          .setSerializer(serializer)
          .setAggregator(aggregator)
          .setMapSideCombine(mapSideCombine)
      }
    }
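
    Note the if/else at the end: when the input RDD is already partitioned by the requested partitioner, combineByKey simply runs the Aggregator inside each partition via mapPartitions and no shuffle happens; otherwise it builds a ShuffledRDD, with map-side combining controlled by mapSideCombine. A minimal sketch of the two branches (assuming an active SparkContext named sc):

    import org.apache.spark.HashPartitioner

    val counts = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
      .partitionBy(new HashPartitioner(4))  // counts.partitioner == Some(HashPartitioner(4))

    // Same partitioner as the input: mapPartitions branch, no shuffle.
    val summedLocally = counts.combineByKey(
      (v: Int) => v, (c: Int, v: Int) => c + v, (c1: Int, c2: Int) => c1 + c2,
      new HashPartitioner(4))

    // Different partitioner: ShuffledRDD branch, the data is repartitioned.
    val summedShuffled = counts.combineByKey(
      (v: Int) => v, (c: Int, v: Int) => c + v, (c1: Int, c2: Int) => c1 + c2,
      new HashPartitioner(8))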

    The combineByKey function takes three user-supplied functions: createCombiner, mergeValue, and mergeCombiners. A worked example follows the three descriptions below.

    createCombiner: V => C turns a V into a C (e.g., creates a one-element list). It is invoked the first time a given key is encountered within a partition, and converts that key's first Value into the initial combiner C.

    mergeValue: (C, V) => C merges a V into a C (e.g., appends it to the end of a list). It is invoked when a key has already been seen within the current partition; the newly arrived Value is folded into that key's existing combiner. This function runs inside each partition.

    mergeCombiners: (C, C) => C combines two C's into a single one. It merges the combiners produced for the same key in different partitions.
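
    Putting the three functions together, a classic worked example (a minimal sketch, assuming an active SparkContext named sc) computes the average value per key, using a (sum, count) pair as the combiner type C:

    val scores = sc.parallelize(Seq(("a", 88), ("b", 95), ("a", 91), ("b", 93)))

    val sumCount = scores.combineByKey(
      (v: Int) => (v, 1),                                                 // createCombiner: first score of a key in a partition
      (c: (Int, Int), v: Int) => (c._1 + v, c._2 + 1),                    // mergeValue: fold another score into (sum, count)
      (c1: (Int, Int), c2: (Int, Int)) => (c1._1 + c2._1, c1._2 + c2._2)) // mergeCombiners: merge per-partition (sum, count) pairs

    val avgByKey = sumCount.mapValues { case (sum, count) => sum.toDouble / count }
    avgByKey.collect().foreach(println)  // (a,89.5), (b,94.0)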

    Reference:

    http://codingjunkie.net/spark-combine-by-key/

  • Original post: https://www.cnblogs.com/0xcafedaddy/p/7630254.html