Summary:
1. Background
2. Source walkthrough: the rangeBounds upper-bounds array
3. Source walkthrough: RangePartitioner.sketch
4. Source walkthrough: determineBounds
5. Experiments with RangePartitioner and sortByKey
Contents:
1. Background: This post fills a gap left for RangePartitioner in the earlier post Spark RDD 核心总结 (Spark RDD core summary), and in the process uncovered yet another gap (XORShiftRandom, an algorithm for generating random numbers, to be written up another time).
RangePartitioner is one of Spark's Partitioner implementations and is the one used by the sorting operator sortByKey. Compared with HashPartitioner, RangePartitioner tries to keep the number of records in each partition roughly equal.
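A quick way to see this balance in action is the snippet below (a minimal sketch, assuming a Spark shell where sc is a live SparkContext; the numbers are illustrative, not from the post):

val pairs = sc.parallelize(1 to 100000).map(i => (i, i))
// sortByKey builds a RangePartitioner under the hood (see section 5 below).
val sorted = pairs.sortByKey(numPartitions = 4)
// With uniform keys, each of the 4 partitions ends up with roughly 25000 records.
sorted.mapPartitionsWithIndex((idx, it) => Iterator((idx, it.size))).collect().foreach(println)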
2. Source walkthrough: the rangeBounds upper-bounds array
rangeBounds is an Array holding the upper bound of each partition's key range (for the first partitions - 1 partitions).
The code first computes a target sample size (20 samples per output partition, capped at 1M) and over-samples each input partition at 3x the average, so the sample stays roughly balanced even if the input partitions are not.
It then calls sketch(rdd.map(_._1), sampleSizePerPartition) to draw the sample; this method is detailed below.
If a partition yields many more samples than the average (fraction * n > sampleSizePerPartition), that partition is re-sampled via RDD.sample with the desired sampling probability.
Finally, determineBounds(candidates, partitions) is called to compute the rangeBounds for the partitions; this method is also detailed below, and a worked numeric trace of the sizing follows the code.
// An array of upper bounds for the first (partitions - 1) partitions
private var rangeBounds: Array[K] = {
  if (partitions <= 1) {
    Array.empty
  } else {
    // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
    val sampleSize = math.min(20.0 * partitions, 1e6)
    // Assume the input partitions are roughly balanced and over-sample a little bit.
    val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
    val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
    if (numItems == 0L) {
      Array.empty
    } else {
      // If a partition contains much more than the average number of items, we re-sample from it
      // to ensure that enough items are collected from that partition.
      val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
      val candidates = ArrayBuffer.empty[(K, Float)]
      val imbalancedPartitions = mutable.Set.empty[Int]
      sketched.foreach { case (idx, n, sample) =>
        if (fraction * n > sampleSizePerPartition) {
          imbalancedPartitions += idx
        } else {
          // The weight is 1 over the sampling probability.
          val weight = (n.toDouble / sample.length).toFloat
          for (key <- sample) {
            candidates += ((key, weight))
          }
        }
      }
      if (imbalancedPartitions.nonEmpty) {
        // Re-sample imbalanced partitions with the desired sampling probability.
        val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
        val seed = byteswap32(-rdd.id - 1)
        val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
        val weight = (1.0 / fraction).toFloat
        candidates ++= reSampled.map(x => (x, weight))
      }
      RangePartitioner.determineBounds(candidates, partitions)
    }
  }
}
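To make the sizing concrete, here is a worked trace with hypothetical numbers (4 output partitions, 8 input partitions, 1,000,000 rows; none of these figures come from the post itself):

// partitions = 4, rdd.partitions.length = 8, numItems = 1000000
// sampleSize             = min(20.0 * 4, 1e6)             = 80.0
// sampleSizePerPartition = ceil(3.0 * 80.0 / 8).toInt     = 30     (the 3x over-sampling)
// fraction               = min(80.0 / 1000000, 1.0)       = 8.0e-5
// A partition with n items is "imbalanced" when fraction * n > 30, i.e. n > 375000,
// triple the 125000-row average; only such partitions are re-sampled.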
3. Source walkthrough: RangePartitioner.sketch
The code below comes from the RangePartitioner companion object, whose two main methods are sketch and determineBounds.
sketch(rdd.map(_._1), sampleSizePerPartition) returns the total number of items together with an Array of triples (partition id, item count in that partition, sampled keys). It uses reservoir sampling; see the separate note 蓄水池(Reservoir_sampling)抽样算法简记 for background, and a minimal sketch of the algorithm follows the code below.
private[spark] object RangePartitioner {

  /**
   * Sketches the input RDD via reservoir sampling on each partition.
   *
   * @param rdd the input RDD to sketch
   * @param sampleSizePerPartition max sample size per partition
   * @return (total number of items, an array of (partitionId, number of items, sample))
   */
  def sketch[K : ClassTag](
      rdd: RDD[K],
      sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
    val shift = rdd.id
    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
      val seed = byteswap32(idx ^ (shift << 16))
      val (sample, n) = SamplingUtils.reservoirSampleAndCount(
        iter, sampleSizePerPartition, seed)
      Iterator((idx, n, sample))
    }.collect()
    val numItems = sketched.map(_._2).sum
    (numItems, sketched)
  }
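SamplingUtils.reservoirSampleAndCount itself is not shown in the post; below is a minimal, self-contained sketch of the reservoir-sampling idea it implements (Algorithm R), written for illustration rather than copied from Spark:

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import scala.util.Random

// Keep a k-element reservoir; the i-th item (1-based) replaces a random slot
// with probability k / i, so every item survives with equal probability k / n.
def reservoirSampleAndCount[T: ClassTag](
    input: Iterator[T], k: Int, seed: Long): (Array[T], Long) = {
  val rng = new Random(seed)
  val reservoir = ArrayBuffer.empty[T]
  var n = 0L
  for (item <- input) {
    n += 1
    if (reservoir.length < k) {
      reservoir += item            // fill the reservoir with the first k items
    } else {
      val j = (rng.nextDouble() * n).toLong  // uniform in [0, n)
      if (j < k) reservoir(j.toInt) = item   // replace with probability k / n
    }
  }
  (reservoir.toArray, n)
}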
4. Source walkthrough: determineBounds
determineBounds(candidates, partitions) computes the actual upper-bound keys for the partitions. Each candidate pairs a key with a weight indicating how many items that key represents (usually 1 over the probability used to sample it); a hand-traced example follows the code.
/**
 * Determines the bounds for range partitioning from candidates with weights indicating how many
 * items each represents. Usually this is 1 over the probability used to sample this candidate.
 *
 * @param candidates unordered candidates with weights
 * @param partitions number of partitions
 * @return selected bounds
 */
def determineBounds[K : Ordering : ClassTag](
    candidates: ArrayBuffer[(K, Float)],
    partitions: Int): Array[K] = {
  val ordering = implicitly[Ordering[K]]
  val ordered = candidates.sortBy(_._1)
  val numCandidates = ordered.size
  val sumWeights = ordered.map(_._2.toDouble).sum
  val step = sumWeights / partitions
  var cumWeight = 0.0
  var target = step
  val bounds = ArrayBuffer.empty[K]
  var i = 0
  var j = 0
  var previousBound = Option.empty[K]
  while ((i < numCandidates) && (j < partitions - 1)) {
    val (key, weight) = ordered(i)
    cumWeight += weight
    if (cumWeight >= target) {
      // Skip duplicate values.
      if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
        bounds += key
        target += step
        j += 1
        previousBound = Some(key)
      }
    }
    i += 1
  }
  bounds.toArray
}
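A hand-traced example of the loop above, with hypothetical inputs (Int keys 1 through 9, each with weight 1.0f, and partitions = 3):

// ordered    = (1,1.0) (2,1.0) ... (9,1.0); sumWeights = 9.0; step = 3.0
// key 3: cumWeight = 3.0 >= target 3.0 -> bounds += 3, target = 6.0, j = 1
// key 6: cumWeight = 6.0 >= target 6.0 -> bounds += 6, j = 2 = partitions - 1, loop stops
// result: rangeBounds = Array(3, 6)
// getPartition then sends keys <= 3 to partition 0, keys in (3, 6] to partition 1,
// and keys > 6 to partition 2.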
5. Experiments with RangePartitioner and sortByKey
How RangePartitioner is used in sortByKey:
sortByKey simply returns a ShuffledRDD that uses a RangePartitioner as its partitioner:
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
    : RDD[(K, V)] = self.withScope {
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
Below is an experiment with RangePartitioner and sortByKey:
A self-implemented sortByKey:
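The experiment code itself did not survive in this copy; below is a minimal sketch of what a hand-rolled sortByKey can look like, mirroring the real implementation above (assumptions: a local master, Int keys, 2 partitions; the name MySortByKey and the sample data are made up for illustration):

import org.apache.spark.{RangePartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.ShuffledRDD

object MySortByKey {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("my-sortByKey"))
    val pairs = sc.parallelize(Seq(5 -> "e", 1 -> "a", 9 -> "i", 3 -> "c", 7 -> "g"))

    // The same two steps as the real sortByKey: a RangePartitioner decides which
    // key range lands in which partition, and the ShuffledRDD's key ordering
    // makes each partition internally sorted during the shuffle.
    val part = new RangePartitioner(2, pairs, true)
    val sorted = new ShuffledRDD[Int, String, String](pairs, part)
      .setKeyOrdering(Ordering[Int])

    // Partitions hold contiguous, internally sorted key ranges, so a plain
    // collect() already returns the globally sorted sequence.
    println(sorted.collect().mkString(", "))
    sc.stop()
  }
}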