  • [Original] Big Data Basics: Spark (6) - How Spark RDD Sort Works

    spark 2.1.1

    In Spark you can sort distributed data with RDD.sortBy. How is this actually implemented? Let's look at the code:

    org.apache.spark.rdd.RDD

      /**
       * Return this RDD sorted by the given key function.
       */
      def sortBy[K](
          f: (T) => K,
          ascending: Boolean = true,
          numPartitions: Int = this.partitions.length)
          (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
        this.keyBy[K](f)
            .sortByKey(ascending, numPartitions)
            .values
      }
      
      /**
       * Creates tuples of the elements in this RDD by applying `f`.
       */
      def keyBy[K](f: T => K): RDD[(K, T)] = withScope {
        val cleanedF = sc.clean(f)
        map(x => (cleanedF(x), x))
      }
      
      /**
       * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
       * `collect` or `save` on the resulting RDD will return or output an ordered list of records
       * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
       * order of the keys).
       */
      // TODO: this currently doesn't work on P other than Tuple2!
      def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
          : RDD[(K, V)] = self.withScope
      {
        val part = new RangePartitioner(numPartitions, self, ascending)
        new ShuffledRDD[K, V, V](self, part)
          .setKeyOrdering(if (ascending) ordering else ordering.reverse)
      }

    The code is fairly simple: sort is a transformation. You supply a keyBy function (i.e., what to sort by); a map step then turns each item into (keyBy(item), item); a Partitioner is created to define the partitioning strategy (how many partitions, ascending or descending, etc.); and finally a ShuffledRDD is returned. A minimal usage sketch follows below.
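    The sketch below shows sortBy end to end (the data, app name and local-mode setup are illustrative, not from the original post):

      import org.apache.spark.{SparkConf, SparkContext}

      object SortByExample {
        def main(args: Array[String]): Unit = {
          val conf = new SparkConf().setAppName("sortBy-demo").setMaster("local[*]")
          val sc = new SparkContext(conf)

          // two input partitions of (word, count) pairs
          val rdd = sc.parallelize(Seq(("b", 3), ("a", 1), ("c", 2)), numSlices = 2)

          // sortBy keys each element with the given function, sorts by that key, then drops it
          val sorted = rdd.sortBy(_._2, ascending = true, numPartitions = 2)

          sorted.collect().foreach(println)   // (a,1) (c,2) (b,3)
          sc.stop()
        }
      }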

    For how ShuffledRDD works, see https://www.cnblogs.com/barneywill/p/10158457.html

    The part worth a closer look is RangePartitioner:

    org.apache.spark.RangePartitioner

    /**
     * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
     * equal ranges. The ranges are determined by sampling the content of the RDD passed in.
     *
     * @note The actual number of partitions created by the RangePartitioner might not be the same
     * as the `partitions` parameter, in the case where the number of sampled records is less than
     * the value of `partitions`.
     */
    class RangePartitioner[K : Ordering : ClassTag, V](
        partitions: Int,
        rdd: RDD[_ <: Product2[K, V]],
        private var ascending: Boolean = true)
      extends Partitioner {
    
      // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
      require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")
    
      private var ordering = implicitly[Ordering[K]]
    
      // An array of upper bounds for the first (partitions - 1) partitions
      private var rangeBounds: Array[K] = {
        if (partitions <= 1) {
          Array.empty
        } else {
          // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
          val sampleSize = math.min(20.0 * partitions, 1e6)
          // Assume the input partitions are roughly balanced and over-sample a little bit.
          val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
          val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
          if (numItems == 0L) {
            Array.empty
          } else {
            // If a partition contains much more than the average number of items, we re-sample from it
            // to ensure that enough items are collected from that partition.
            val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
            val candidates = ArrayBuffer.empty[(K, Float)]
            val imbalancedPartitions = mutable.Set.empty[Int]
            sketched.foreach { case (idx, n, sample) =>
              if (fraction * n > sampleSizePerPartition) {
                imbalancedPartitions += idx
              } else {
                // The weight is 1 over the sampling probability.
                val weight = (n.toDouble / sample.length).toFloat
                for (key <- sample) {
                  candidates += ((key, weight))
                }
              }
            }
            if (imbalancedPartitions.nonEmpty) {
              // Re-sample imbalanced partitions with the desired sampling probability.
              val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
              val seed = byteswap32(-rdd.id - 1)
              val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
              val weight = (1.0 / fraction).toFloat
              candidates ++= reSampled.map(x => (x, weight))
            }
            RangePartitioner.determineBounds(candidates, partitions)
          }
        }
      }
    
      def numPartitions: Int = rangeBounds.length + 1
    
      private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]
    
      def getPartition(key: Any): Int = {
        val k = key.asInstanceOf[K]
        var partition = 0
        if (rangeBounds.length <= 128) {
          // If we have less than 128 partitions naive search
          while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
            partition += 1
          }
        } else {
          // Determine which binary search method to use only once.
          partition = binarySearch(rangeBounds, k)
          // binarySearch either returns the match location or -[insertion point]-1
          if (partition < 0) {
            partition = -partition-1
          }
          if (partition > rangeBounds.length) {
            partition = rangeBounds.length
          }
        }
        if (ascending) {
          partition
        } else {
          rangeBounds.length - partition
        }
      }

    Here rangeBounds is determined according to the number of partitions; the range bounds play much the same role as the pivots in QuickSort.

    For example: suppose the cluster has 10 nodes and we sort 100 million records into 100 partitions. Ideally the 100 million records would be split evenly into 100 shares, 10 shares per node, and each share would then be sorted locally with no data skew.
    That is hard to achieve exactly: splitting the data evenly is really the process of choosing boundaries, i.e., determining the minimum and maximum key of each share, which would require a full scan and count of all the data.

    Instead, Spark samples the data to learn its distribution and gets an approximately even split. It draws about sampleSize records in total, sampleSizePerPartition from each input partition; partitions that turn out to be much larger than average are re-sampled at the desired fraction so the sample stays representative. The sampled (key, weight) pairs are then used to probe for boundaries, producing rangeBounds. Once rangeBounds is known, getPartition can place every one of the 100 million records into its target partition, using a linear scan when there are at most 128 bounds and a binary search otherwise.
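    The boundary-probing step can be pictured with a small sketch. This is not Spark's determineBounds, just an illustration of the idea: order the weighted sample by key, then walk it and cut a boundary each time the accumulated weight crosses another 1/partitions of the estimated total:

      import scala.collection.mutable.ArrayBuffer

      object RangeBoundsSketch {
        // candidates: sampled keys with weights (weight = 1 / sampling probability)
        def determineBounds(candidates: Seq[(Int, Float)], partitions: Int): Array[Int] = {
          val ordered = candidates.sortBy(_._1)               // order samples by key
          val totalWeight = ordered.map(_._2.toDouble).sum    // estimated total number of records
          val step = totalWeight / partitions                 // ideal weight per partition
          val bounds = ArrayBuffer.empty[Int]
          var cumWeight = 0.0
          var target = step
          for ((key, weight) <- ordered if bounds.length < partitions - 1) {
            cumWeight += weight
            if (cumWeight >= target) {                        // crossed a boundary: record this key
              bounds += key
              target += step
            }
          }
          bounds.toArray
        }

        def main(args: Array[String]): Unit = {
          // pretend we sampled keys 1..100, each sample standing for ~10 real records
          val sample = (1 to 100).map(k => (k, 10.0f))
          println(determineBounds(sample, 4).mkString(", "))  // roughly: 25, 50, 75
        }
      }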

    One more question: after the sort, if the data is collected to the driver, is the resulting array still in sorted order?

    org.apache.spark.rdd.RDD

      /**
       * Return an array that contains all of the elements in this RDD.
       *
       * @note This method should only be used if the resulting array is expected to be small, as
       * all the data is loaded into the driver's memory.
       */
      def collect(): Array[T] = withScope {
        val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
        Array.concat(results: _*)
      }

    The answer is yes: sc.runJob returns one array per partition, indexed by partition, and Array.concat joins them in that order, so the driver-side array preserves the global sort order.
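    A quick way to see this (sc as in the earlier sketch, numbers made up): glom() exposes the per-partition arrays, which are contiguous sorted ranges, and collect() concatenates them in partition index order.

      val data = sc.parallelize(Seq(5, 1, 4, 2, 3), numSlices = 3)
      val sorted = data.sortBy(x => x, numPartitions = 3)

      // each partition holds a contiguous, already-sorted range of keys
      sorted.glom().collect().foreach(part => println(part.mkString("[", ",", "]")))
      // e.g. [1] [2,3] [4,5]  (the exact split depends on the sampled range bounds)

      // the array on the driver is therefore globally sorted
      println(sorted.collect().mkString(","))   // 1,2,3,4,5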
