zoukankan      html  css  js  c++  java
  • combineByKey

    通过分析reduceByKey和groupByKey的源码,发现两个算子都使用了combineByKey这个算子,那么现在来分析一下combineByKey算子。

    /**
       * Simplified version of combineByKey that hash-partitions the output RDD.
       */
      def combineByKey[C](
        createCombiner: V => C,
          mergeValue: (C, V) => C,
          mergeCombiners: (C, C) => C,
          numPartitions: Int): RDD[(K, C)] = {
        combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
      }
    
    
    /**
     * Generic function to combine the elements for each key using a custom set of aggregation
     * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
     * Note that V and C can be different -- for example, one might group an RDD of type
     * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
     *
     * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
     * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
     * - `mergeCombiners`, to combine two C's into a single one.
     *
     * In addition, users can control the partitioning of the output RDD, and whether to perform
     * map-side aggregation (if a mapper can produce multiple items with the same key).
     */
    def combineByKey[C](createCombiner: V => C,
        mergeValue: (C, V) => C,
        mergeCombiners: (C, C) => C,
        partitioner: Partitioner,
        mapSideCombine: Boolean = true,
        serializer: Serializer = null): RDD[(K, C)] = {
      require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
      if (keyClass.isArray) {
        if (mapSideCombine) {
          throw new SparkException("Cannot use map-side combining with array keys.")
        }
        if (partitioner.isInstanceOf[HashPartitioner]) {
          throw new SparkException("Default partitioner cannot partition array keys.")
        }
      }
      val aggregator = new Aggregator[K, V, C](
        self.context.clean(createCombiner),
        self.context.clean(mergeValue),
        self.context.clean(mergeCombiners))
      if (self.partitioner == Some(partitioner)) {
        self.mapPartitions(iter => {
          val context = TaskContext.get()
          new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
        }, preservesPartitioning = true)
      } else {
        new ShuffledRDD[K, V, C](self, partitioner)
          .setSerializer(serializer)
          .setAggregator(aggregator)
          .setMapSideCombine(mapSideCombine)
      }
    }

    在combineByKey函数中包含 createCombiner、mergeValue、mergeCombiners函数

    createCombiner: V => C :`createCombiner`, which turns a V into a C (e.g., creates a one-element list) 。如果在第一次执行combineByKey时,此时会调用此函数,它会负责将一个Value值转换成一个Iterator

    mergeValue: (C, V) => C :`mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)  如果不是第一次执行combinByKey,此时会将新传入的Value参数添加到原有的Iterator集合尾部,此函数负责将一个Value值加入到List的尾部(此函数在每个Parttition中执行)

    mergeCombiners: (C, C) => C :`mergeCombiners`, to combine two C's into a single one. 此函数的作用是将多个集合合并成一个集合(此函数在不同的Partition中执行)

    参考:

    http://codingjunkie.net/spark-combine-by-key/

  • 相关阅读:
    spring 环绕通知 ProceedingJoinPoint 执行proceed方法的作用是什么
    SpringMVC之RequestContextHolder分析
    MySQL中索引不会被用到的情况
    使用Stream快速对List进行一些操作
    Vue中this.$refs[name].resetFields();的使用
    好看的字体
    转,javascript中call()、apply()、bind()的用法终于理解
    vue中的$props
    手机端页面自适应解决方案-rem布局
    查看项目里特定npm包的版本号
  • 原文地址:https://www.cnblogs.com/0xcafedaddy/p/7630254.html
Copyright © 2011-2022 走看看