1. repartition is essentially coalesce with shuffle forced on
In the Spark source, repartition simply calls: coalesce(numPartitions, shuffle = true)
```scala
/**
 * Return a new RDD that has exactly numPartitions partitions.
 *
 * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
 * a shuffle to redistribute data.
 *
 * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
 * which can avoid performing a shuffle.
 */
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}
```
- Because shuffle is forced on, repartition can both increase and decrease the number of partitions
- Also because shuffle is enabled, intermediate data is written to disk; the drawback is worse performance, the advantage is that it is less prone to OOM than coalesce
- It only takes a single Int argument (see the usage sketch after this list)
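A minimal usage sketch, assuming a SparkContext named `sc` is already available (e.g. in spark-shell):

```scala
// Start with an RDD that has 4 partitions
val rdd = sc.parallelize(1 to 100, 4)

// repartition always shuffles, so the partition count can go up or down
val wider  = rdd.repartition(8)   // 4 -> 8 partitions
val narrow = rdd.repartition(2)   // 4 -> 2 partitions

println(wider.getNumPartitions)   // 8
println(narrow.getNumPartitions)  // 2
```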
2. coalesce does not enable shuffle by default
The coalesce source in Spark:
```scala
def coalesce(numPartitions: Int, shuffle: Boolean = false,
             partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
            (implicit ord: Ordering[T] = null)
    : RDD[T] = withScope {
  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
  if (shuffle) {
    /** Distributes elements evenly across output partitions, starting from a random partition. */
    val distributePartition = (index: Int, items: Iterator[T]) => {
      var position = (new Random(index)).nextInt(numPartitions)
      items.map { t =>
        // Note that the hash code of the key will just be the key itself. The HashPartitioner
        // will mod it with the number of total partitions.
        position = position + 1
        (position, t)
      }
    } : Iterator[(Int, T)]

    // include a shuffle step so that our upstream tasks are still distributed
    new CoalescedRDD(
      new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
      new HashPartitioner(numPartitions)),
      numPartitions,
      partitionCoalescer).values
  } else {
    new CoalescedRDD(this, numPartitions, partitionCoalescer)
  }
}
```
- coalesce re-partitions the RDD and lets you choose whether to perform a shuffle, controlled by the shuffle: Boolean = false/true parameter
- Shuffle is off by default, so by default it can only shrink the number of partitions (requesting more partitions without a shuffle has no effect)
- With shuffle enabled, the effect is equivalent to repartition, and the data is redistributed with a HashPartitioner (as shown in the sketch after this list)
- Unlike repartition, coalesce can also take a custom PartitionCoalescer, which must be Serializable
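A minimal sketch of the shuffle/no-shuffle behaviour, again assuming a SparkContext `sc`:

```scala
val rdd = sc.parallelize(1 to 100, 8)        // 8 partitions

// Default (shuffle = false): partitions are merged locally, so only shrinking works
val merged = rdd.coalesce(2)
println(merged.getNumPartitions)             // 2

// Asking for more partitions without a shuffle is silently ignored
println(rdd.coalesce(16).getNumPartitions)   // still 8

// With shuffle = true it behaves like repartition and can grow the partition count
val grown = rdd.coalesce(16, shuffle = true)
println(grown.getNumPartitions)              // 16
```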
Summary: when reducing the number of partitions, use coalesce so the shuffle can be avoided
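A sketch of the typical case behind this advice: shrinking a heavily filtered RDD before writing it out, so that hundreds of near-empty partitions do not become hundreds of tiny output files. The paths and partition counts below are illustrative placeholders:

```scala
// Read with many input partitions (hypothetical path)
val logs = sc.textFile("/data/logs/*.log", 200)

// A selective filter leaves most of those partitions nearly empty
val errors = logs.filter(_.contains("ERROR"))

// Shrink without a shuffle before writing, avoiding 200 tiny output files
errors.coalesce(10).saveAsTextFile("/data/logs/errors")
```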