Spark Source Code Analysis -- PairRDD

    The biggest difference from an ordinary RDD is the two type parameters: [K, V] expresses the notion of a pair.
    The key function is combineByKey, the abstraction underlying all pair-related operations.

    A combine is the following operation: it turns an RDD[(K, V)] into a result of type RDD[(K, C)].
    C may be just a plain type, but it is often a seq, e.g. (Int, Int) to (Int, Seq[Int]).
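
    To make this concrete, here is a usage sketch of that (Int, Int) to (Int, Seq[Int]) case. It is illustrative only: it assumes a SparkContext named sc and the SparkContext._ implicits that expose PairRDDFunctions.

      import org.apache.spark.SparkContext._ // brings the implicit conversion to PairRDDFunctions into scope

      val pairs = sc.parallelize(Seq((1, 10), (1, 20), (2, 30)))
      val combined = pairs.combineByKey[Seq[Int]](
        (v: Int) => Seq(v),                        // createCombiner: start a seq from the first value of a key
        (c: Seq[Int], v: Int) => c :+ v,           // mergeValue: append a value to an existing seq
        (c1: Seq[Int], c2: Seq[Int]) => c1 ++ c2)  // mergeCombiners: concatenate seqs built by different tasks
      // combined is an RDD[(Int, Seq[Int])], roughly (1, Seq(10, 20)) and (2, Seq(30))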

    Now let's look at the parameters of combineByKey.
    The user first has to supply a few operations:
    createCombiner: V => C, used when no C exists yet for a key, e.g. create a seq C from a V
    mergeValue: (C, V) => C, used when a C already exists and a value must be merged into it, e.g. append item V to seq C, or add it to a running sum
    mergeCombiners: (C, C) => C, merges two C's
    partitioner: Partitioner, the Partitioner needed for the shuffle
    mapSideCombine: Boolean = true, to reduce the amount of data transferred, much of the combining can be done on the map side first; for a sum, for example, values with the same key can be added up within a partition before the shuffle
    serializerClass: String = null, data must be serialized for the transfer, and the user can supply a custom serializer class

    /**
     * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
     * Import `org.apache.spark.SparkContext._` at the top of your program to use these functions.
     */
    class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
      extends Logging
      with SparkHadoopMapReduceUtil
      with Serializable {
    
      /**
       * Generic function to combine the elements for each key using a custom set of aggregation
       * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
       * Note that V and C can be different -- for example, one might group an RDD of type
       * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
       *
       * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
       * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
       * - `mergeCombiners`, to combine two C's into a single one.
       *
       * In addition, users can control the partitioning of the output RDD, and whether to perform
       * map-side aggregation (if a mapper can produce multiple items with the same key).
       */
      def combineByKey[C](createCombiner: V => C,
          mergeValue: (C, V) => C,
          mergeCombiners: (C, C) => C,
          partitioner: Partitioner,
          mapSideCombine: Boolean = true,
          serializerClass: String = null): RDD[(K, C)] = {
        val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners) // 1. Aggregator
        // If the RDD's own partitioner equals the one passed in, no re-shuffle is needed; just map directly
        if (self.partitioner == Some(partitioner)) {
          self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true) // 2. mapPartitions: call combineValuesByKey directly on the map side
        } else if (mapSideCombine) { // map-side combine is requested
          val combined = self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true) // first combine within each partition on the map side
          val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner).setSerializer(serializerClass) // 3. ShuffledRDD: perform the shuffle
          partitioned.mapPartitions(aggregator.combineCombinersByKey, preservesPartitioning = true) // after the shuffle, combine again on the reduce side using combineCombinersByKey
        } else {
          // Don't apply map-side combiner. The only difference from the branch above is that the map-side combine is skipped.
          // A sanity check to make sure mergeCombiners is not defined.
          assert(mergeCombiners == null)
          val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
          values.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
        }
      }
    }
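
    Which branch fires is easy to see from the lineage of the resulting RDD. A rough sketch (assumes a SparkContext named sc; the RDD class names are what this era of Spark produces, so treat them as approximate):

      val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))
      // with map-side combine (the default), e.g. reduceByKey:
      //   MapPartitionsRDD <- ShuffledRDD <- MapPartitionsRDD <- ParallelCollectionRDD
      val sums = pairs.reduceByKey(_ + _)
      // without map-side combine, e.g. groupByKey:
      //   MapPartitionsRDD <- ShuffledRDD <- ParallelCollectionRDD
      val groups = pairs.groupByKey()
      // sums.toDebugString and groups.toDebugString print lineages along these lines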

    1. Aggregator

    In combineByKey an Aggregator is created first. The Aggregator really just wraps two functions:
    combineValuesByKey handles merging a V into a C, e.g. adding an item to a seq; it is used on the map side.
    combineCombinersByKey handles merging two C's, e.g. merging two seqs; it is used on the reduce side.

    case class Aggregator[K, V, C] (
        createCombiner: V => C,
        mergeValue: (C, V) => C,
        mergeCombiners: (C, C) => C) {
    
      def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
        val combiners = new JHashMap[K, C]
        for (kv <- iter) {
          val oldC = combiners.get(kv._1)
          if (oldC == null) {
            combiners.put(kv._1, createCombiner(kv._2))
          } else {
            combiners.put(kv._1, mergeValue(oldC, kv._2))
          }
        }
        combiners.iterator
      }
    
      def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
        val combiners = new JHashMap[K, C]
        iter.foreach { case(k, c) =>
          val oldC = combiners.get(k)
          if (oldC == null) {
            combiners.put(k, c)
          } else {
            combiners.put(k, mergeCombiners(oldC, c))
          }
        }
        combiners.iterator
      }
    }
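
    A small sketch of what these two functions do on a toy input, assuming the Aggregator case class above is in scope (output ordering comes from the hash map and is only illustrative):

      val agg = Aggregator[String, Int, Seq[Int]](
        (v: Int) => Seq(v),                        // createCombiner
        (c: Seq[Int], v: Int) => c :+ v,           // mergeValue
        (c1: Seq[Int], c2: Seq[Int]) => c1 ++ c2)  // mergeCombiners

      // map side: fold each V into the C of its key
      val mapSide = agg.combineValuesByKey(Iterator(("a", 1), ("a", 2), ("b", 3)))
      // roughly Iterator(("a", Seq(1, 2)), ("b", Seq(3)))

      // reduce side: merge the C's produced for the same key by different map tasks
      val reduceSide = agg.combineCombinersByKey(Iterator(("a", Seq(1, 2)), ("a", Seq(4))))
      // roughly Iterator(("a", Seq(1, 2, 4)))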

    2. mapPartitions

    mapPartitions is really just implemented with MapPartitionsRDD.
    What it does is apply the map function f, Iterator[T] => Iterator[U], to the current partition.
    For example, it runs combineValuesByKey: Iterator[_ <: Product2[K, V]] to Iterator[(K, C)].

      /**
       * Return a new RDD by applying a function to each partition of this RDD.
       */
      def mapPartitions[U: ClassManifest](f: Iterator[T] => Iterator[U],
        preservesPartitioning: Boolean = false): RDD[U] =
        new MapPartitionsRDD(this, sc.clean(f), preservesPartitioning)
     
    private[spark]
    class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
        prev: RDD[T],
        f: Iterator[T] => Iterator[U],
        preservesPartitioning: Boolean = false)
      extends RDD[U](prev) {
    
      override val partitioner =
        if (preservesPartitioning) firstParent[T].partitioner else None
    
      override def getPartitions: Array[Partition] = firstParent[T].partitions
    
      override def compute(split: Partition, context: TaskContext) =
        f(firstParent[T].iterator(split, context)) // for the map, just apply f to the parent partition's iterator
    }
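
    A quick usage sketch of mapPartitions itself (illustrative; assumes an existing RDD[(String, Int)] named pairs):

      // f here is Iterator[(String, Int)] => Iterator[Int]: emit one element count per partition
      val partitionSizes = pairs.mapPartitions(iter => Iterator(iter.size))
      // partitionSizes contains exactly one Int per partition of pairs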

    3. ShuffledRDD

    The shuffle itself is actually carried out by the system's shuffleFetcher; Spark's layering hides it cleanly,
    so you cannot see here how the shuffle is actually performed; that only becomes visible when the shuffleFetcher is analyzed.
    Every shuffle has a globally unique shuffleId,
    so inside compute all you see is the BlockStoreShuffleFetcher fetching the already-shuffled data directly by shuffleId and partition index.

    /**
    * The resulting RDD from a shuffle (e.g. repartitioning of data).
    * @param prev the parent RDD.
    * @param part the partitioner used to partition the RDD
    * @tparam K the key class.
    * @tparam V the value class.
    */
    class ShuffledRDD[K, V, P <: Product2[K, V] : ClassManifest](
        @transient var prev: RDD[P],
        part: Partitioner)
      extends RDD[P](prev.context, Nil) {
      override val partitioner = Some(part)  
      // ShuffledRDD repartitions the data, so take the new partition count from the Partitioner
      // and use Array.tabulate to create that many ShuffledRDDPartitions
      override def getPartitions: Array[Partition] = {
        Array.tabulate[Partition](part.numPartitions)(i => new ShuffledRDDPartition(i)) 
      }
      
      override def compute(split: Partition, context: TaskContext): Iterator[P] = {
        val shuffledId = dependencies.head.asInstanceOf[ShuffleDependency[K, V]].shuffleId
        SparkEnv.get.shuffleFetcher.fetch[P](shuffledId, split.index, context.taskMetrics,
          SparkEnv.get.serializerManager.get(serializerClass))
      }
    }
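
    The ShuffleDependency that compute reads via dependencies.head, and the serializerClass it passes to the fetcher, are declared in the same class but omitted from the excerpt above; in the Spark source of this era they look roughly like this (a sketch, details approximate):

      private var serializerClass: String = null

      def setSerializer(cls: String): ShuffledRDD[K, V, P] = {
        serializerClass = cls
        this
      }

      // the single dependency is a ShuffleDependency on prev, which is what registers the shuffleId
      override def getDependencies: Seq[Dependency[_]] = {
        List(new ShuffleDependency(prev, part, serializerClass))
      }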

    ShuffledRDDPartition is nothing special; like other partitions it just records its id.

    private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
      override val index = idx
      override def hashCode(): Int = idx
    }

    Next, let's see how combineByKey is used to implement the other operations.

    group

    group is the typical example: (Int, Int) to (Int, Seq[Int]).
    groupByKey does not use map-side combine, because it would not reduce the amount of data transferred anyway, so mergeCombiners does not need to be implemented.

      /**
       * Group the values for each key in the RDD into a single sequence. Allows controlling the
       * partitioning of the resulting key-value pair RDD by passing a Partitioner.
       */
      def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
        // groupByKey shouldn't use map side combine because map side combine does not
        // reduce the amount of data shuffled and requires all map side data be inserted
        // into a hash table, leading to more objects in the old gen.
        def createCombiner(v: V) = ArrayBuffer(v) // create the seq
        def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v // append the item to the seq
        val bufs = combineByKey[ArrayBuffer[V]](
          createCombiner _, mergeValue _, null, partitioner, mapSideCombine=false)
        bufs.asInstanceOf[RDD[(K, Seq[V])]]
      }
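
    A usage sketch (illustrative; assumes a SparkContext named sc):

      val pairs = sc.parallelize(Seq((1, 2), (1, 3), (2, 4)))
      val grouped = pairs.groupByKey()   // RDD[(Int, Seq[Int])]
      // grouped.collect() gives roughly Array((1, ArrayBuffer(2, 3)), (2, ArrayBuffer(4)))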

    reduce

    reduce is the even simpler case: two values are merged into a single value, (Int, V: Int) to (Int, C: Int), e.g. summation.
    So createCombiner is trivial: it just returns v,
    and mergeValue and mergeCombiners share exactly the same logic; there is no difference between them.

      /**
       * Merge the values for each key using an associative reduce function. This will also perform
       * the merging locally on each mapper before sending results to a reducer, similarly to a
       * "combiner" in MapReduce.
       */
      def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
        combineByKey[V]((v: V) => v, func, func, partitioner)
      }
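
    A usage sketch, the classic word count (illustrative; assumes a SparkContext named sc):

      val counts = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))
        .reduceByKey(_ + _)   // createCombiner is the identity; mergeValue and mergeCombiners are both +
      // counts.collect() gives roughly Array(("a", 2), ("b", 1))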
Original article: https://www.cnblogs.com/fxjwind/p/3489111.html