  • Differences between repartition and coalesce in Spark, with source code analysis

    1. repartition is simply coalesce with the shuffle forced on

     In the Spark source, repartition just calls: coalesce(numPartitions, shuffle = true)



    /**
     * Return a new RDD that has exactly numPartitions partitions.
     *
     * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
     * a shuffle to redistribute data.
     *
     * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
     * which can avoid performing a shuffle.
     */
    def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
      coalesce(numPartitions, shuffle = true)
    }
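    • Because the shuffle is always on, it can both increase and decrease the number of partitions.
    • Also because of the shuffle, intermediate data is written to disk. The downside is worse performance. The upside is that it is less prone to OOM than coalesce, because without a shuffle each resulting partition is the union of several parent partitions and can become very large.
    • It takes only one explicit parameter: the target number of partitions, an Int.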
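The first two points above can be illustrated with a small Spark-free model. The model covers two behaviors: any target partition count is possible, and every output partition may receive data from every input partition.

This is a sketch under stated assumptions, not Spark's code:
- Partitions are plain in-memory `Seq[Seq[T]]`.
- `RepartitionModel` and `repartition` are hypothetical names.
- The placement rule imitates the `distributePartition` function from the coalesce source in section 2, using a simple modulo.

```scala
import scala.util.Random

// Hypothetical, Spark-free model of repartition(n) == coalesce(n, shuffle = true).
// Assumption: an RDD is modeled as in-memory partitions (Seq[Seq[T]]); no disk, no executors.
object RepartitionModel {

  def repartition[T](input: Seq[Seq[T]], numPartitions: Int): Seq[Seq[T]] = {
    require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
    // Map side: tag every record with a target output partition. Like distributePartition,
    // each input partition starts at a pseudo-random position seeded by its index.
    val tagged: Seq[(Int, T)] = input.zipWithIndex.flatMap { case (items, index) =>
      var position = new Random(index).nextInt(numPartitions)
      items.map { t =>
        position += 1
        (position % numPartitions, t)
      }
    }
    // Reduce side: every output partition collects its records from ALL input partitions,
    // so the output always has exactly numPartitions partitions, more or fewer than the input.
    (0 until numPartitions).map(p => tagged.filter(_._1 == p).map(_._2))
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(Seq(1, 2, 3), Seq(4, 5, 6))
    println(repartition(input, 5).map(_.size)) // sizes of 5 output partitions (more than the 2 inputs)
    println(repartition(input, 1).map(_.size)) // size of the single output partition
  }
}
```

The all-to-all exchange in the reduce step is what a real shuffle pays for with disk and network I/O. It is also what lets the partition count grow as well as shrink.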

     2. coalesce does not shuffle by default
     The source of coalesce in Spark:

    def coalesce(numPartitions: Int, shuffle: Boolean = false,
                 partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
                (implicit ord: Ordering[T] = null)
        : RDD[T] = withScope {
      require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
      if (shuffle) {
        /** Distributes elements evenly across output partitions, starting from a random partition. */
        val distributePartition = (index: Int, items: Iterator[T]) => {
          var position = (new Random(index)).nextInt(numPartitions)
          items.map { t =>
            // Note that the hash code of the key will just be the key itself. The HashPartitioner
            // will mod it with the number of total partitions.
            position = position + 1
            (position, t)
          }
        } : Iterator[(Int, T)]

        // include a shuffle step so that our upstream tasks are still distributed
        new CoalescedRDD(
          new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
            new HashPartitioner(numPartitions)),
          numPartitions,
          partitionCoalescer).values
      } else {
        new CoalescedRDD(this, numPartitions, partitionCoalescer)
      }
    }
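The following sketch makes the source comment about hash codes concrete. It re-creates just the key-to-partition arithmetic of the shuffle branch, without Spark. Because an Int key hashes to itself, consecutive keys land in consecutive output partitions. Each input partition therefore spreads its records round-robin, starting from a random offset.

Assumptions:
- `DistributePartitionDemo`, `targets` and `startPosition` are hypothetical names.
- `nonNegativeMod` is a re-implementation of the helper that `HashPartitioner.getPartition` applies to `key.hashCode`, not an import of it.

```scala
import scala.util.Random

// Hypothetical, Spark-free re-creation of the key-to-partition arithmetic in the shuffle branch.
object DistributePartitionDemo {

  // Re-implements the non-negative modulo that HashPartitioner.getPartition applies to key.hashCode.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

  // Start position as in the quoted source: (new Random(index)).nextInt(numPartitions).
  def startPosition(index: Int, numPartitions: Int): Int =
    new Random(index).nextInt(numPartitions)

  // Output partition chosen for each record of one input partition.
  def targets(start: Int, numRecords: Int, numPartitions: Int): List[Int] =
    (1 to numRecords).toList.map { i =>
      val key = start + i                         // position = position + 1 before each record
      nonNegativeMod(key.hashCode, numPartitions) // an Int's hashCode is the Int itself
    }

  def main(args: Array[String]): Unit = {
    val start = startPosition(index = 0, numPartitions = 4)
    println(s"start = $start, targets = ${targets(start, numRecords = 5, numPartitions = 4)}")
    println(targets(start = 3, numRecords = 5, numPartitions = 4)) // List(0, 1, 2, 3, 0)
  }
}
```

Take a start position of 3 and 4 output partitions. Five records get keys 4 to 8 and go to partitions 0, 1, 2, 3, 0, so no output partition receives more than one record above any other from the same input partition.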
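    • coalesce repartitions with or without a shuffle, as chosen by the parameter shuffle: Boolean (false or true).
    • The shuffle is off by default, so by default it can only reduce the number of partitions. Requesting more partitions than the parent RDD has leaves the count unchanged.
    • With the shuffle on, it behaves like repartition. Each record is keyed with an increasing Int and distributed by a HashPartitioner.
    • Unlike repartition, coalesce also accepts a custom PartitionCoalescer (the partitionCoalescer parameter), which must be serializable.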

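     Summary: to reduce the number of partitions, coalesce is enough and avoids a shuffle. The exception is a drastic reduction, for example down to one partition. Without a shuffle, the upstream computation then also runs in only that many tasks, so coalesce(n, shuffle = true), i.e. repartition, can be the better choice.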

  • Original article: https://www.cnblogs.com/successok/p/14218862.html