  • Dataset repartition and coalesce

      /**
       * Returns a new Dataset that has exactly `numPartitions` partitions, when fewer partitions
       * are requested. If a larger number of partitions is requested, it will stay at the current
       * number of partitions. Similar to coalesce defined on an `RDD`, this operation results in
       * a narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not
       * be a shuffle, instead each of the 100 new partitions will claim 10 of the current partitions.
       *
       * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
       * this may result in your computation taking place on fewer nodes than
       * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
       * you can call repartition. This will add a shuffle step, but means the
       * current upstream partitions will be executed in parallel (per whatever
       * the current partitioning is).
       *
       * @group typedrel
       * @since 1.6.0
       */
      def coalesce(numPartitions: Int): Dataset[T] = withTypedPlan {
        Repartition(numPartitions, shuffle = false, planWithBarrier)
      }

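To make "each of the 100 new partitions will claim 10 of the current partitions" concrete, here is a small Spark-free Scala model of a narrow coalesce. It is a sketch under assumptions: it splits the parent partitions into contiguous index ranges, which approximates coalescing a parent that has no preferred locations; Spark's actual partition coalescer also takes data locality into account. `NarrowCoalesceModel` and `narrowCoalesce` are hypothetical names, not Spark APIs.

```scala
// Simplified model of a narrow (shuffle-free) coalesce.
// Assumption: parent partitions have no preferred locations, so they are split
// into contiguous index ranges; Spark's real coalescer also weighs data locality.
// `narrowCoalesce` is a hypothetical helper, not a Spark API.
object NarrowCoalesceModel {

  /** Returns, for each output partition, the parent partition indices it claims. */
  def narrowCoalesce(parentPartitions: Int, requested: Int): Vector[Vector[Int]] = {
    require(requested > 0, s"Number of partitions ($requested) must be positive.")
    // Without a shuffle the count can only shrink: asking for more keeps the current count.
    val target = math.min(requested, parentPartitions)
    Vector.tabulate(target) { i =>
      // Output partition i claims the parent range [start, end).
      val start = ((i.toLong * parentPartitions) / target).toInt
      val end = (((i.toLong + 1) * parentPartitions) / target).toInt
      (start until end).toVector
    }
  }

  def main(args: Array[String]): Unit = {
    val groups = narrowCoalesce(1000, 100)
    println(groups.size)                     // 100 output partitions
    println(groups.map(_.size).toSet)        // each claims 10 parent partitions
    println(groups.head)                     // the first claims parents 0 to 9
    println(narrowCoalesce(1000, 2000).size) // stays at 1000
  }
}
```

No data moves between partitions in this model: each output simply reads a fixed set of parent partitions, which is why no shuffle is needed and why coalescing to 1 funnels all upstream work into a single task.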
      /**
       * Returns a new Dataset that has exactly `numPartitions` partitions.
       *
       * @group typedrel
       * @since 1.6.0
       */
      def repartition(numPartitions: Int): Dataset[T] = withTypedPlan {
        Repartition(numPartitions, shuffle = true, planWithBarrier)
      }

      /**
       * Return a new RDD that has exactly numPartitions partitions.
       *
       * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
       * a shuffle to redistribute data.
       *
       * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
       * which can avoid performing a shuffle.
       *
       * TODO Fix the Shuffle+Repartition data loss issue described in SPARK-23207.
       */
      def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
        coalesce(numPartitions, shuffle = true)
      }

      /**
       * Return a new RDD that is reduced into `numPartitions` partitions.
       *
       * This results in a narrow dependency, e.g. if you go from 1000 partitions
       * to 100 partitions, there will not be a shuffle, instead each of the 100
       * new partitions will claim 10 of the current partitions. If a larger number
       * of partitions is requested, it will stay at the current number of partitions.
       *
       * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
       * this may result in your computation taking place on fewer nodes than
       * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
       * you can pass shuffle = true. This will add a shuffle step, but means the
       * current upstream partitions will be executed in parallel (per whatever
       * the current partitioning is).
       *
       * @note With shuffle = true, you can actually coalesce to a larger number
       * of partitions. This is useful if you have a small number of partitions,
       * say 100, potentially with a few partitions being abnormally large. Calling
       * coalesce(1000, shuffle = true) will result in 1000 partitions with the
       * data distributed using a hash partitioner. The optional partition coalescer
       * passed in must be serializable.
       */
      def coalesce(numPartitions: Int, shuffle: Boolean = false,
                   partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
                  (implicit ord: Ordering[T] = null)
          : RDD[T] = withScope {
        require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
        if (shuffle) {
          /** Distributes elements evenly across output partitions, starting from a random partition. */
          val distributePartition = (index: Int, items: Iterator[T]) => {
            var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
            items.map { t =>
              // Note that the hash code of the key will just be the key itself. The HashPartitioner
              // will mod it with the number of total partitions.
              position = position + 1
              (position, t)
            }
          } : Iterator[(Int, T)]

          // include a shuffle step so that our upstream tasks are still distributed
          new CoalescedRDD(
            new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
            new HashPartitioner(numPartitions)),
            numPartitions,
            partitionCoalescer).values
        } else {
          new CoalescedRDD(this, numPartitions, partitionCoalescer)
        }
      }
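The shuffle branch of `coalesce` can also be modeled without Spark. This hedged sketch reuses the `distributePartition` idea (keys counting up from a random start seeded by the input partition index) and assumes that HashPartitioner reduces an Int key to a non-negative modulo of its `hashCode`, which for an Int is the value itself. `ShuffleCoalesceModel`, `outputSizes` and `nonNegativeMod` are hypothetical helpers, not Spark APIs. The model illustrates why `coalesce(n, shuffle = true)` can both increase the partition count and even out a skewed input.

```scala
import scala.util.Random
import scala.util.hashing

// Simplified, Spark-free model of coalesce(n, shuffle = true).
// Assumptions: each input partition is a plain Seq, and `nonNegativeMod` mirrors the
// modulo applied to an Int key (whose hashCode is the value itself).
// All names here are hypothetical, not Spark APIs.
object ShuffleCoalesceModel {

  /** Modulo that never returns a negative partition index. */
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

  /** Same idea as distributePartition: keys count up from a random start per input partition. */
  def distributePartition[T](index: Int, items: Iterator[T], numPartitions: Int): Iterator[(Int, T)] = {
    var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
    items.map { t =>
      position = position + 1
      (position, t)
    }
  }

  /** Returns how many elements land in each of the `numPartitions` outputs. */
  def outputSizes[T](input: Seq[Seq[T]], numPartitions: Int): Vector[Int] = {
    val counts = Array.fill(numPartitions)(0)
    for {
      (part, index) <- input.zipWithIndex
      (key, _) <- distributePartition(index, part.iterator, numPartitions)
    } counts(nonNegativeMod(key.hashCode, numPartitions)) += 1
    counts.toVector
  }

  def main(args: Array[String]): Unit = {
    // Three skewed input partitions: one abnormally large, two tiny.
    val input = Seq(Seq.fill(1000)("x"), Seq.fill(10)("y"), Seq.fill(10)("z"))
    val sizes = outputSizes(input, 20)
    println(sizes.mkString(", "))
    println(s"total = ${sizes.sum}, max - min = ${sizes.max - sizes.min}")
  }
}
```

With one input partition of 1000 elements and two of 10, each of the 20 outputs receives between 50 and 52 elements: consecutive keys are dealt round-robin across all outputs, so the large partition is spread evenly instead of staying in one place. The random, index-seeded start keeps small input partitions from all piling onto output 0.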
  • Original article: https://www.cnblogs.com/zz-ksw/p/11570571.html