  • Notes on the RangePartitioner implementation

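    Outline:

      1. Background

      2. Walkthrough of the rangeBounds (upper-bound array) source

      3. Walkthrough of RangePartitioner's sketch source

      4. Walkthrough of the determineBounds source

      5. Experiments on RangePartitioner and sortByKey

    Content: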

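    1. Background: this post fills in the RangePartitioner gap I left in my earlier post summarizing Spark RDD core concepts. Along the way I found yet another gap, XORShiftRandom (an algorithm for generating random numbers), which I will write up when I have time.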

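    RangePartitioner is one of Spark's Partitioner implementations and is used by sorting operators such as sortByKey. Unlike HashPartitioner, which places keys by their hash code, RangePartitioner gives each partition a contiguous range of keys, so the partitions are ordered relative to each other, and it tries to keep the amount of data in each partition roughly even.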

    2. Walkthrough of the rangeBounds (upper-bound array) source

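    rangeBounds is an Array holding partition upper bounds. More precisely, it holds the upper bounds of the first (partitions - 1) partitions; the last partition takes every key above the final bound.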
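    The lookup that consumes rangeBounds can be sketched in plain Scala as follows. This is a simplified, hypothetical helper (RangeLookup.partitionFor is not a Spark API): it follows the linear-scan idea of RangePartitioner.getPartition, sending a key to the first partition whose upper bound is not smaller than the key. The real method also switches to binary search when there are many partitions, which is omitted here.

```scala
object RangeLookup {
  // Hypothetical helper: a key goes to the first partition whose upper bound is >= key;
  // keys above the last bound go to the extra last partition (index rangeBounds.length).
  def partitionFor[K](key: K, rangeBounds: Array[K], ascending: Boolean = true)
                     (implicit ord: Ordering[K]): Int = {
    var p = 0
    // Linear scan: advance while the key is strictly greater than the current bound.
    while (p < rangeBounds.length && ord.gt(key, rangeBounds(p))) p += 1
    // For a descending sort the partition index is mirrored.
    if (ascending) p else rangeBounds.length - p
  }
}

val bounds = Array(10, 20, 30) // 3 upper bounds => 4 partitions
println(Seq(5, 10, 11, 30, 31).map(k => RangeLookup.partitionFor(k, bounds)))
// List(0, 0, 1, 2, 3)
```

    Keys equal to a bound land in that bound's partition, and any key above the last bound falls into the extra last partition, which is why only partitions - 1 bounds are needed.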

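    sampleSize targets about 20 sampled keys per output partition, capped at 1M. sampleSizePerPartition then asks each input partition for 3 times its even share of that total; in other words, each input partition is over-sampled about 3x, which keeps the sample roughly balanced as long as the input partitions are not wildly uneven in size.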
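    The two sizing formulas can be checked with a small, hypothetical helper that copies the arithmetic from the rangeBounds excerpt (no Spark needed):

```scala
object SampleSizes {
  // Same formulas as the rangeBounds excerpt:
  // sampleSize = min(20 * outputPartitions, 1e6), then 3x over-sampling per input partition.
  def compute(outputPartitions: Int, inputPartitions: Int): (Double, Int) = {
    val sampleSize = math.min(20.0 * outputPartitions, 1e6)
    val perPartition = math.ceil(3.0 * sampleSize / inputPartitions).toInt
    (sampleSize, perPartition)
  }
}

println(SampleSizes.compute(100, 8)) // (2000.0,750)
```

    With 100 output partitions spread over 8 input partitions, the target is 2000 sampled keys and each input partition is asked for 750 of them, three times its even share of 250.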

    It then calls sketch(rdd.map(_._1), sampleSizePerPartition) to sample the keys; this method is covered in detail below.

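    If a partition holds far more items than average, namely when fraction * n > sampleSizePerPartition (n is the partition's item count and fraction = sampleSize / numItems), its capped reservoir sample under-represents it. Such partitions are collected into imbalancedPartitions and re-sampled with rdd.sample at probability fraction, each re-sampled key getting weight 1 / fraction; keys from the other partitions become candidates with weight n / sample.length.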
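    A minimal sketch of this split, assuming the sketch results are already available as (partition id, item count, sampled keys) triples. Candidates.split and Candidates.resample are hypothetical names; resample replaces rdd.sample(withReplacement = false, ...) with a simple per-item coin flip using scala.util.Random, so its output will not match Spark's.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

object Candidates {
  // Hypothetical helper: turns sketch output into weighted candidates plus the set of
  // partitions that must be re-sampled, following the loop in the rangeBounds excerpt.
  def split[K](sketched: Seq[(Int, Long, Seq[K])],
               sampleSize: Double,
               sampleSizePerPartition: Int): (Double, Seq[(K, Float)], Set[Int]) = {
    val numItems = sketched.map(_._2).sum
    // Desired overall sampling probability, at most 1.
    val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
    val candidates = ArrayBuffer.empty[(K, Float)]
    var imbalanced = Set.empty[Int]
    for ((idx, n, sample) <- sketched) {
      if (fraction * n > sampleSizePerPartition) {
        // The capped reservoir sample is too small for this partition's size.
        imbalanced += idx
      } else {
        // Each sampled key stands for n / sample.length original items.
        val weight = (n.toDouble / sample.length).toFloat
        sample.foreach(k => candidates += ((k, weight)))
      }
    }
    (fraction, candidates.toSeq, imbalanced)
  }

  // Stand-in for rdd.sample(withReplacement = false, fraction, seed): keeps each item
  // independently with probability `fraction` and weights every kept key by 1 / fraction.
  def resample[K](items: Seq[K], fraction: Double, seed: Long): Seq[(K, Float)] = {
    val rng = new Random(seed)
    val weight = (1.0 / fraction).toFloat
    items.filter(_ => rng.nextDouble() < fraction).map(k => (k, weight))
  }
}

// 4 input partitions; partition 3 holds 970 of the 1000 items.
val sketched = Seq(
  (0, 10L, 0 until 10),
  (1, 10L, 10 until 20),
  (2, 10L, 20 until 30),
  (3, 970L, 30 until 60) // its reservoir kept only 30 of 970 keys
)
val (fraction, cands, imbalanced) =
  Candidates.split(sketched, sampleSize = 40.0, sampleSizePerPartition = 30)
println(s"fraction=$fraction, candidates=${cands.length}, imbalanced=$imbalanced")
val extra = Candidates.resample(30 until 1000, fraction, seed = 7L) // items of partition 3
```

    Partition 3 holds 970 of the 1000 items, so fraction * n = 0.04 * 970 = 38.8 exceeds sampleSizePerPartition = 30 and it is marked imbalanced; the keys from partitions 0 to 2 each carry weight 10 / 10 = 1, while re-sampled keys carry 1 / 0.04 = 25.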

    Finally, it calls determineBounds(candidates, partitions) to return the rangeBounds for the partitions; this method is also covered in detail below.

    // An array of upper bounds for the first (partitions - 1) partitions
      private var rangeBounds: Array[K] = {
        if (partitions <= 1) {
          Array.empty
        } else {
          // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
          val sampleSize = math.min(20.0 * partitions, 1e6)
          // Assume the input partitions are roughly balanced and over-sample a little bit.
          val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
          val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
          if (numItems == 0L) {
            Array.empty
          } else {
            // If a partition contains much more than the average number of items, we re-sample from it
            // to ensure that enough items are collected from that partition.
            val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
            val candidates = ArrayBuffer.empty[(K, Float)]
            val imbalancedPartitions = mutable.Set.empty[Int]
            sketched.foreach { case (idx, n, sample) =>
              if (fraction * n > sampleSizePerPartition) {
                imbalancedPartitions += idx
              } else {
                // The weight is 1 over the sampling probability.
                val weight = (n.toDouble / sample.length).toFloat
                for (key <- sample) {
                  candidates += ((key, weight))
                }
              }
            }
            if (imbalancedPartitions.nonEmpty) {
              // Re-sample imbalanced partitions with the desired sampling probability.
              val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
              val seed = byteswap32(-rdd.id - 1)
              val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
              val weight = (1.0 / fraction).toFloat
              candidates ++= reSampled.map(x => (x, weight))
            }
            RangePartitioner.determineBounds(candidates, partitions)
          }
        }
      }
    

      

    3. Walkthrough of RangePartitioner's sketch source

      

    The following code comes from the RangePartitioner companion object, which mainly contains two methods, sketch and determineBounds:

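    sketch(rdd.map(_._1), sampleSizePerPartition) returns the total number of items in the RDD together with an Array of triples (partition id, number of items in that partition, sampled keys). Each partition is sampled with reservoir sampling; see my note on the reservoir sampling (Reservoir_sampling) algorithm.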

    private[spark] object RangePartitioner {
    
      /**
       * Sketches the input RDD via reservoir sampling on each partition.
       *
       * @param rdd the input RDD to sketch
       * @param sampleSizePerPartition max sample size per partition
       * @return (total number of items, an array of (partitionId, number of items, sample))
       */
      def sketch[K : ClassTag](
          rdd: RDD[K],
          sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
        val shift = rdd.id
       
        val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
          val seed = byteswap32(idx ^ (shift << 16))
          val (sample, n) = SamplingUtils.reservoirSampleAndCount(
            iter, sampleSizePerPartition, seed)
          Iterator((idx, n, sample))
        }.collect()
        val numItems = sketched.map(_._2).sum
        (numItems, sketched)
      }
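    To see what sketch computes without a cluster, here is a local re-implementation under stated assumptions: each inner Seq stands in for one RDD partition, rddId stands in for rdd.id, and sampleAndCount is a hypothetical stand-in for SamplingUtils.reservoirSampleAndCount, written as the classic reservoir sampling Algorithm R. It uses scala.util.Random instead of Spark's XORShiftRandom, so the sampled keys differ from Spark's, but the seed is derived with the same byteswap32(idx ^ (shift << 16)) expression as the excerpt.

```scala
import scala.reflect.ClassTag
import scala.util.Random
import scala.util.hashing.byteswap32

object LocalSketch {
  // Reservoir sampling (Algorithm R) that also counts the input: keep the first k items,
  // then the item at 0-based position i replaces a random slot with probability k / (i + 1).
  def sampleAndCount[T: ClassTag](input: Iterator[T], k: Int, seed: Long): (Array[T], Long) = {
    val reservoir = new Array[T](k)
    var i = 0L
    // Fill the reservoir with the first k items.
    while (i < k && input.hasNext) {
      reservoir(i.toInt) = input.next()
      i += 1
    }
    if (i < k) {
      // Input had fewer than k items: all of them form the sample.
      (reservoir.take(i.toInt), i)
    } else {
      val rng = new Random(seed)
      while (input.hasNext) {
        val item = input.next()
        // Uniform index in [0, i]; only indices below k hit the reservoir.
        val r = (rng.nextDouble() * (i + 1)).toLong
        if (r < k) reservoir(r.toInt) = item
        i += 1
      }
      (reservoir, i)
    }
  }

  // Local stand-in for RangePartitioner.sketch: each inner Seq plays one RDD partition.
  def sketch[K: ClassTag](partitions: Seq[Seq[K]], rddId: Int,
                          sampleSizePerPartition: Int): (Long, Seq[(Int, Long, Array[K])]) = {
    val sketched = partitions.zipWithIndex.map { case (part, idx) =>
      // Same seed expression as the excerpt, with rddId in place of rdd.id.
      val seed = byteswap32(idx ^ (rddId << 16))
      val (sample, n) = sampleAndCount(part.iterator, sampleSizePerPartition, seed)
      (idx, n, sample)
    }
    (sketched.map(_._2).sum, sketched)
  }
}

val parts = Seq(1 to 5, 6 to 1000)
val (numItems, sketched) = LocalSketch.sketch(parts, rddId = 3, sampleSizePerPartition = 20)
println(s"numItems=$numItems") // numItems=1000
sketched.foreach { case (idx, n, s) => println(s"partition $idx: n=$n, sampled=${s.length}") }
// partition 0: n=5, sampled=5
// partition 1: n=995, sampled=20
```

    The first partition has fewer items than the sample size, so all 5 items are returned; the second returns exactly 20 of its 995 items, and both report their full item count.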
    

    4. Walkthrough of the determineBounds source

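    determineBounds(candidates, partitions) returns the actual keys chosen as partition upper bounds. Each candidate is a (key, weight) pair, where weight is how many input items that sampled key stands for (1 over its sampling probability). The method sorts the candidates by key and walks them while accumulating weight, emitting a key as a bound each time the cumulative weight reaches the next multiple of step = sumWeights / partitions, and skipping keys that are not greater than the previous bound.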

    /**
       * Determines the bounds for range partitioning from candidates with weights indicating how many
       * items each represents. Usually this is 1 over the probability used to sample this candidate.
       *
       * @param candidates unordered candidates with weights
       * @param partitions number of partitions
       * @return selected bounds
       */
      def determineBounds[K : Ordering : ClassTag](
          candidates: ArrayBuffer[(K, Float)],
          partitions: Int): Array[K] = {
        val ordering = implicitly[Ordering[K]]
        val ordered = candidates.sortBy(_._1)
        val numCandidates = ordered.size
        val sumWeights = ordered.map(_._2.toDouble).sum
        val step = sumWeights / partitions
        var cumWeight = 0.0
        var target = step
        val bounds = ArrayBuffer.empty[K]
        var i = 0
        var j = 0
        var previousBound = Option.empty[K]
        while ((i < numCandidates) && (j < partitions - 1)) {
          val (key, weight) = ordered(i)
          cumWeight += weight
          if (cumWeight >= target) {
            // Skip duplicate values.
            if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
              bounds += key
              target += step
              j += 1
              previousBound = Some(key)
            }
          }
          i += 1
        }
        bounds.toArray
      }
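    The same algorithm can be run standalone to see how weights and duplicates affect the result. This is a local copy of the logic above (the object name Bounds is hypothetical), taking a Seq instead of an ArrayBuffer and tracking the bound count with bounds.length instead of the separate counter j.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

object Bounds {
  // Walk candidates in key order, accumulate weights, and emit a bound each time the
  // running weight reaches the next target (multiples of sumWeights / partitions).
  def determineBounds[K: Ordering: ClassTag](candidates: Seq[(K, Float)], partitions: Int): Array[K] = {
    val ordering = implicitly[Ordering[K]]
    val ordered = candidates.sortBy(_._1)
    val step = ordered.map(_._2.toDouble).sum / partitions
    var cumWeight = 0.0
    var target = step
    val bounds = ArrayBuffer.empty[K]
    var previousBound = Option.empty[K]
    val it = ordered.iterator
    while (it.hasNext && bounds.length < partitions - 1) {
      val (key, weight) = it.next()
      cumWeight += weight
      // Emit only keys strictly greater than the previous bound (skip duplicates).
      if (cumWeight >= target && (previousBound.isEmpty || ordering.gt(key, previousBound.get))) {
        bounds += key
        target += step
        previousBound = Some(key)
      }
    }
    bounds.toArray
  }
}

println(Bounds.determineBounds((1 to 12).map(k => (k, 1.0f)), 3).mkString(", "))           // 4, 8
println(Bounds.determineBounds(Seq((1, 1f), (2, 1f), (3, 1f), (4, 1f), (5, 10f)), 2).mkString(", ")) // 5
println(Bounds.determineBounds(Seq.fill(10)((5, 1.0f)), 3).mkString(", "))                 // 5
```

    With 12 equally weighted keys and 3 partitions, step is 4 and the bounds are 4 and 8. Giving key 5 a weight of 10 moves the single bound for 2 partitions from 3 (the unweighted median) to 5, and a candidate list made of one repeated key yields only one bound, so heavily skewed data can produce fewer than partitions - 1 bounds.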
    

      

    5. Experiments on RangePartitioner and sortByKey

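    How RangePartitioner is used in sortByKey:

    sortByKey returns a ShuffledRDD that uses a RangePartitioner as its partitioner and sets the key ordering (reversed for a descending sort):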

    def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)] = self.withScope
      {
        val part = new RangePartitioner(numPartitions, self, ascending)
        new ShuffledRDD[K, V, V](self, part)
          .setKeyOrdering(if (ascending) ordering else ordering.reverse)
      }
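    Why a range partitioner plus a per-partition sort gives a globally sorted result can be shown with a local, hypothetical LocalSortByKey.sortByKey (not Spark's API, and not the custom sortByKey from the experiments below). To keep it short, it takes bounds from evenly spaced positions in the fully sorted key list instead of a weighted sample, only handles ascending order, and replaces the shuffle with a groupBy.

```scala
object LocalSortByKey {
  // Hypothetical local stand-in: route records by key range, then sort each bucket.
  def sortByKey[K: Ordering, V](data: Seq[(K, V)], numPartitions: Int): Seq[Seq[(K, V)]] = {
    if (data.isEmpty || numPartitions <= 1) Seq(data.sortBy(_._1))
    else {
      val ord = implicitly[Ordering[K]]
      val keys = data.map(_._1).sorted
      // Evenly spaced keys act as upper bounds (no sampling, no weights here).
      val bounds = (1 until numPartitions)
        .map(i => keys(math.max(i * keys.length / numPartitions - 1, 0)))
        .distinct
      // Same lookup rule as RangePartitioner: first bound that is >= the key.
      def partitionFor(k: K): Int = {
        var p = 0
        while (p < bounds.length && ord.gt(k, bounds(p))) p += 1
        p
      }
      val buckets = data.groupBy(kv => partitionFor(kv._1))
      // Sorting inside each bucket plays the role of the key ordering on the ShuffledRDD.
      (0 to bounds.length).map(p => buckets.getOrElse(p, Seq.empty[(K, V)]).sortBy(_._1))
    }
  }
}

val data = Seq(5 -> "e", 3 -> "c", 9 -> "i", 1 -> "a", 7 -> "g", 2 -> "b", 8 -> "h", 4 -> "d", 6 -> "f")
val parts = LocalSortByKey.sortByKey(data, 3)
parts.zipWithIndex.foreach { case (p, i) => println(s"partition $i: ${p.map(_._1).mkString(" ")}") }
// partition 0: 1 2 3
// partition 1: 4 5 6
// partition 2: 7 8 9
```

    Reading the partitions in index order yields keys 1 to 9 in order. This mirrors the division of labor in Spark: the RangePartitioner decides which key range each partition receives, and the key ordering set through setKeyOrdering sorts the records within each partition.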
    

    Below are the experiments I ran on RangePartitioner and sortByKey:

    A custom sortByKey implementation

     

      

  • Original article: https://www.cnblogs.com/arachis/p/Range_Partitioner.html