zoukankan      html  css  js  c++  java
  • Spark中repartition和coalesce的用法区别及源码分析

    1.reparttion 实际就是强制shuffle的coalesce

     repartition 在spark中源码中实际执行的是: coalesce(numPartitions, shuffle = true)



    * Return a new RDD that has exactly numPartitions partitions.
    * * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
    * a shuffle to redistribute data.
    * * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
    * which can avoid performing a shuffle.
    */
    def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
    • 由于强制开启shuffle,所以既可以扩大分区数,也可以缩小分区数量
    • 同样因为开启了shuffle,中间会有写磁盘操作,所以缺点是性能差,优点是相比coalesce不易OOM
    • 只能接受一个Int参数

     2.coalesce默认不开启shuffle
     coalesce 在spark中的源码:  def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty) (implicit ord: Ordering[T] = null)

    def coalesce(numPartitions: Int, shuffle: Boolean = false,
    partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
    (implicit ord: Ordering[T] = null)
    : RDD[T] = withScope {
    require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
    if (shuffle) {
    /** Distributes elements evenly across output partitions, starting from a random partition. */
    val distributePartition = (index: Int, items: Iterator[T]) => {
    var position = (new Random(index)).nextInt(numPartitions)
    items.map { t =>
    // Note that the hash code of the key will just be the key itself. The HashPartitioner
    // will mod it with the number of total partitions.
    position = position + 1
    (position, t)
    }
    } : Iterator[(Int, T)]
    
    // include a shuffle step so that our upstream tasks are still distributed
    new CoalescedRDD(
    new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
    new HashPartitioner(numPartitions)),
    numPartitions,
    partitionCoalescer).values
    } else {
    new CoalescedRDD(this, numPartitions, partitionCoalescer)
    }
    }
    • coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定
    • 默认不开启shuffle,所以默认情况下只能缩小分区
    • 如果开启了shuffle,则效果等同repartition,使用hash partitioner分区
    • 相比repartition,coalasce还可以传入一个自定义分区器,分区器必须实现serializable序列化

     总结:如果是减少分区, 用coalasce即可,尽量避免 shuffle

  • 相关阅读:
    js 正则验证输入框只允许输入正实数和正整数和负整数
    阿里maven镜像服务器配置
    JDK环境变量配置
    AndroidStudio OpenCv的配置,不用安装opencv manager
    Java实现红黑树
    基于红黑树的骨架提取Java
    基于Mat变换的骨架提取Java
    Java实现二叉树的四种遍历
    Java实现常见的几种排序
    hough变换检测直线Java
  • 原文地址:https://www.cnblogs.com/successok/p/14218862.html
Copyright © 2011-2022 走看看