zoukankan      html  css  js  c++  java
  • Spark源码系列:RDD repartition、coalesce 对比

    在上一篇文章中 Spark源码系列:DataFrame repartition、coalesce 对比 对DataFrame的repartition、coalesce进行了对比,在这篇文章中,将会对RDD的repartition、coalesce进行对比。

    RDD重新分区的手段与DataFrame类似,有repartition、coalesce两个方法

    repartition

    • def repartition(numPartitions: Int): JavaRDD[T]
     1   /**
     2    * Return a new RDD that has exactly numPartitions partitions.
     3    *
     4    * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
     5    * a shuffle to redistribute data.
     6    *
     7    * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
     8    * which can avoid performing a shuffle.
     9    */
    10   def repartition(numPartitions: Int): JavaRDD[T] = rdd.repartition(numPartitions)  

    返回一个新的RDD,该RDD恰好具有numPartitions分区。

    repartition这个方法可以增加或减少此RDD中的并行度。在内部,这使用shuffle来重新分配数据。

    如果要减少RDD中的分区数量,请考虑使用“coalesce”,这样可以避免执行shuffle。 

    这个方法在org.apache.spark.api.java.JavaRDD里面

    真正调用的是org.apache.spark.rdd.RDD里面的repartition 

     1   /**
     2    * Return a new RDD that has exactly numPartitions partitions.
     3    *
     4    * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
     5    * a shuffle to redistribute data.
     6    *
     7    * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
     8    * which can avoid performing a shuffle.
     9    */
    10   def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    11     coalesce(numPartitions, shuffle = true)
    12   } 

    从上面可以看出,在此处还不是方法最终的,还调用了coalesce(numPartitions, shuffle = true) 这个方法,这个方法实现如下:

     1   /**
     2    * Return a new RDD that is reduced into `numPartitions` partitions.
     3    *
     4    * This results in a narrow dependency, e.g. if you go from 1000 partitions
     5    * to 100 partitions, there will not be a shuffle, instead each of the 100
     6    * new partitions will claim 10 of the current partitions.
     7    *
     8    * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
     9    * this may result in your computation taking place on fewer nodes than
    10    * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
    11    * you can pass shuffle = true. This will add a shuffle step, but means the
    12    * current upstream partitions will be executed in parallel (per whatever
    13    * the current partitioning is).
    14    *
    15    * Note: With shuffle = true, you can actually coalesce to a larger number
    16    * of partitions. This is useful if you have a small number of partitions,
    17    * say 100, potentially with a few partitions being abnormally large. Calling
    18    * coalesce(1000, shuffle = true) will result in 1000 partitions with the
    19    * data distributed using a hash partitioner.
    20    */
    21   def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
    22       : RDD[T] = withScope {
    23     if (shuffle) {
    24       /** Distributes elements evenly across output partitions, starting from a random partition. 注意,键的哈希代码就是键本身。HashPartitioner将用分区的总数对它进行修改。*/
    25       val distributePartition = (index: Int, items: Iterator[T]) => {
    26         var position = (new Random(index)).nextInt(numPartitions)
    27         items.map { t =>
    28           // Note that the hash code of the key will just be the key itself. The HashPartitioner
    29           // will mod it with the number of total partitions.
    30           position = position + 1
    31           (position, t)
    32         }
    33       } : Iterator[(Int, T)]
    34 
    35       // include a shuffle step so that our upstream tasks are still distributed 包含一个shuffle步骤,以便我们的上游任务仍然是分布式的。
    36       new CoalescedRDD(
    37         new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
    38         new HashPartitioner(numPartitions)),
    39         numPartitions).values
    40     } else {
    41       new CoalescedRDD(this, numPartitions)
    42     }
    43   } 

    这个方法返回一个新的RDD,它被简化为"numpartition"分区。

    这导致了一个狭窄的依赖关系,例如,如果从1000个分区到100个分区,将不会有一个shuffle,而是100个新分区中的每一个都会声明10个当前分区。

    然而,如果你正在做一个剧烈的合并,例如当numPartitions = 1时,这可能导致您的计算发生在比您期待的更少的节点上(例如numpartition=1的情况下只有一个节点),即可能导致并行度下降,无法充分利用分布式环境的优势。

    为了避免这种情况,可以传递shuffle = true。这将添加一个shuffle步骤,但意味着当前的上游分区将并行执行(无论当前分区是什么)。

    注意:使用shuffle = true,您实际上可以合并到更多的分区。

    如果您有少量的分区(比如100个),可能有一些分区非常大,那么这是非常有用的,调用coalesce(1000, shuffle = true)将产生1000个分区,使用散列分区器分发数据。

    从上面的源码可以看到,def repartition(numPartitions: Int): JavaRDD[T] 其实调用的是coalesce(numPartitions, shuffle = true)这个方法,而且这个方法产生shuffle操作,分区的规则采用的个是哈希分区。

    coalesce

    • def coalesce(numPartitions: Int): JavaRDD[T]
    1  
    2  /**
    3    * Return a new RDD that is reduced into `numPartitions` partitions.
    4    */
    5   def coalesce(numPartitions: Int): JavaRDD[T] = rdd.coalesce(numPartitions) 

    而这个方法调用的是org.apache.spark.rdd.RDD里面的def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null) : RDD[T]。

    这个方法和上面repartitions的是一样的,只不过此处的shuffle参数是默认的false。

    真正调用的是new CoalescedRDD(this, numPartitions)此时不会触发shuffle。

    • def coalesce(numPartitions: Int, shuffle: Boolean): JavaRDD[T]
    1 /**
    2    * Return a new RDD that is reduced into `numPartitions` partitions.
    3    */
    4   def coalesce(numPartitions: Int, shuffle: Boolean): JavaRDD[T] =
    5     rdd.coalesce(numPartitions, shuffle) 

    这个和上面的coalesce(numPartitions: Int)类似,只是此处的shuffle参数不再是默认的false,而是自己指定的了,当shuffletrue是会触发shuffle,反之不会。

    演示

     1 scala> var rdd1=sc.textFile("hdfs://file.txt")
     2 rdd1: org.apache.spark.rdd.RDD[String] = hdfs://file.txt MapPartitionsRDD[20] at textFile at <console>:27
     3 
     4 //默认分区数量为177
     5 scala> rdd1.partitions.size
     6 res12: Int = 177
     7 
     8 //调用coalesce(10) 减少分区数量
     9 scala> var rdd2 = rdd1.coalesce(10)
    10 rdd2: org.apache.spark.rdd.RDD[String] = CoalescedRDD[21] at coalesce at <console>:29
    11 
    12 //分区数量减少到10个
    13 scala> rdd2.partitions.size
    14 res13: Int = 10
    15 
    16 //直接增加分区数量到200
    17 scala> var rdd2 = rdd1.coalesce(200)
    18 rdd2: org.apache.spark.rdd.RDD[String] = CoalescedRDD[22] at coalesce at <console>:29
    19 
    20 //方法没有生效
    21 scala> rdd2.partitions.size
    22 res14: Int = 177
    23 
    24 //将shuffle设置为true,增加分区到200
    25 scala> var rdd2 = rdd1.coalesce(200,true)
    26 rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[26] at coalesce at <console>:29
    27 
    28 //重新分区生效
    29 scala> rdd2.partitions.size
    30 res15: Int = 200
    31 
    32 ------------------------------------------------------------------------------------------------
    33 //对于repartition增加分区到200
    34 scala> var rdd2 = rdd1.repartition 直接增加o(200)
    35 rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[30] at repartition at <console>:29
    36 
    37 //增加分区生效
    38 scala> rdd2.partitions.size
    39 res16: Int = 200
    40 
    41 //对于repartition减少分区到10
    42 scala> var rdd2 = rdd1.repartition(10)
    43 rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[34] at repartition at <console>:29
    44 
    45 //减少分区生效
    46 scala> rdd2.partitions.size
    47 res17: Int = 10 

    总结

    • coalesce(numPartitions: Int)

    当新的分区数小于原来的分区时,分区生效切并且不会触发shuffle

    当新的分区数大于原来的分区时,分区无效还是原来的数量。

    • coalesce(numPartitions: Int, shuffle: Boolean)

    shuffletrue时候,无论新的分区比原来的大还是小,分区均生效,并且触发shuffle操作,此时等同于repartition(numPartitions: Int)

    shufflefalse时候,等同于coalesce(numPartitions: Int)

    • def repartition(numPartitions: Int)

    无论新的分区比原来的大还是小,分区均生效,并且触发shuffle操作;

    很明显repartition就是当shuffletrue时候的coalesce(numPartitions: Int, shuffle: Boolean)方法。

    此为本人学习工作总结,转载请注明出处!!!!

  • 相关阅读:
    美化滚动条
    js 格式转化
    vue 实现 前端生成随机验证码
    Vue.js CLI4 Vue.config.js标准配置
    在鼠标右键 新建 添加md文件
    节流和防抖
    关于IE 浏览器 GET 请求缓存问题
    VSCode 背景插件
    Java后台开发Tomcat添加https支持小程序开发过程
    InnoDB与MyISAM等存储引擎对比
  • 原文地址:https://www.cnblogs.com/lillcol/p/9889162.html
Copyright © 2011-2022 走看看