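We know that an RDD is partitioned, but sometimes we need to change the number of partitions. Whether to increase or decrease it depends on the actual scenario. The number of partitions of an RDD also determines how many output files are generated when the RDD is saved.
There are two methods for repartitioning: repartition and coalesce.
Let's look at the source code first.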
def repartition(self, numPartitions):
    """
    Return a new RDD that has exactly numPartitions partitions.

    Can increase or decrease the level of parallelism in this RDD.
    Internally, this uses a shuffle to redistribute data.
    If you are decreasing the number of partitions in this RDD, consider
    using `coalesce`, which can avoid performing a shuffle.

    >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
    >>> sorted(rdd.glom().collect())
    [[1], [2, 3], [4, 5], [6, 7]]
    >>> len(rdd.repartition(2).glom().collect())
    2
    >>> len(rdd.repartition(10).glom().collect())
    10
    """
    return self.coalesce(numPartitions, shuffle=True)

def coalesce(self, numPartitions, shuffle=False):
    """
    Return a new RDD that is reduced into `numPartitions` partitions.

    >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
    [[1], [2, 3], [4, 5]]
    >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
    [[1, 2, 3, 4, 5]]
    """
    if shuffle:
        # Decrease the batch size in order to distribute evenly the elements across output
        # partitions. Otherwise, repartition will possibly produce highly skewed partitions.
        batchSize = min(10, self.ctx._batchSize or 1024)
        ser = BatchedSerializer(PickleSerializer(), batchSize)
        selfCopy = self._reserialize(ser)
        jrdd_deserializer = selfCopy._jrdd_deserializer
        jrdd = selfCopy._jrdd.coalesce(numPartitions, shuffle)
    else:
        jrdd_deserializer = self._jrdd_deserializer
        jrdd = self._jrdd.coalesce(numPartitions, shuffle)
    return RDD(jrdd, self.ctx, jrdd_deserializer)
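We can see that repartition ultimately calls coalesce, passing shuffle=True.
So once you understand coalesce, you also understand repartition.
If the result is a narrow dependency, no shuffle is needed, for example when 1000 partitions are merged into 10 partitions.
Narrow dependency: one parent RDD partition maps to one child RDD partition, or several parent RDD partitions map to one child RDD partition.
Wide dependency: one parent RDD partition maps to multiple child RDD partitions.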
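The two definitions can be written down as a small check. This is only a sketch in plain Python, not Spark code: the function name `dependency_kind` and the example mappings are made up for illustration, and a dependency is represented as a dict from parent partition index to the set of child partitions it feeds.

```python
def dependency_kind(parent_to_children):
    """Classify a dependency given {parent partition: set of child partitions}."""
    # Narrow: every parent partition feeds at most one child partition.
    if all(len(children) <= 1 for children in parent_to_children.values()):
        return "narrow"
    # Wide: at least one parent partition is split across several child partitions.
    return "wide"


# Illustrative mapping for merging 4 partitions into 2 without a shuffle:
# whole parent partitions are grouped, none is split.
merge = {0: {0}, 1: {0}, 2: {1}, 3: {1}}

# Illustrative mapping for a shuffle from 2 partitions to 2 partitions:
# each parent may send records to every child.
shuffled = {0: {0, 1}, 1: {0, 1}}

print(dependency_kind(merge))     # narrow
print(dependency_kind(shuffled))  # wide
```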
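If the number of partitions changes drastically, for example numPartitions=1, the computation may run on fewer nodes than you expect (a single node when numPartitions=1), because without a shuffle the upstream work is executed with the reduced number of partitions too. To avoid this, set shuffle=True: this adds a shuffle step, but the upstream partitions are still processed in parallel.
In addition, if you need to increase the number of partitions, coalesce with shuffle=False does not repartition at all and the partition count stays unchanged. Only shuffle=True can increase it.
In other words, repartition is a special case of coalesce, with the shuffle parameter hard-coded to True.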
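The behaviour described above can be imitated with a toy model on plain Python lists. This is a simplified sketch and not Spark's actual algorithm: `coalesce_model` is a hypothetical name, contiguous grouping stands in for Spark's partition grouping, and round-robin placement stands in for the real shuffle.

```python
def coalesce_model(partitions, num_partitions, shuffle=False):
    """Toy model of coalesce on a list of partitions (each a list of elements)."""
    if shuffle:
        # With a shuffle every element may move, so any target count is reachable.
        out = [[] for _ in range(num_partitions)]
        flat = [x for part in partitions for x in part]
        for i, x in enumerate(flat):
            out[i % num_partitions].append(x)
        return out
    if num_partitions >= len(partitions):
        # Without a shuffle, parent partitions can only be merged, never split,
        # so asking for more partitions leaves the layout unchanged.
        return [list(part) for part in partitions]
    # Without a shuffle and with fewer partitions: merge whole parent partitions.
    out = [[] for _ in range(num_partitions)]
    n = len(partitions)
    for i, part in enumerate(partitions):
        out[i * num_partitions // n].extend(part)
    return out


parts = [[1], [2, 3], [4, 5], [6, 7]]
print(len(coalesce_model(parts, 2)))                 # 2
print(len(coalesce_model(parts, 10)))                # 4
print(len(coalesce_model(parts, 10, shuffle=True)))  # 10
```

In this model, `coalesce_model(parts, n, shuffle=True)` plays the role of repartition(n).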
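To summarize:
When decreasing the number of partitions, a shuffle is usually not needed and either method works; coalesce avoids the shuffle.
When increasing the number of partitions, a shuffle is required; repartition is normally used because it is more convenient.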
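The rule of thumb above could be wrapped in a small helper. This is a minimal sketch: `resize_partitions` is a hypothetical name, not part of PySpark, and it assumes `rdd` is a PySpark RDD, using only its getNumPartitions, coalesce and repartition methods.

```python
def resize_partitions(rdd, target):
    """Pick coalesce or repartition based on the direction of the change.

    Hypothetical helper for illustration; `rdd` is assumed to be a PySpark RDD.
    """
    current = rdd.getNumPartitions()
    if target < current:
        # Decreasing: merge partitions without a shuffle.
        return rdd.coalesce(target)
    if target > current:
        # Increasing: a shuffle is required, so use repartition.
        return rdd.repartition(target)
    # Same count: nothing to do.
    return rdd
```

With an existing SparkContext `sc`, calling `resize_partitions(sc.parallelize(range(100), 8), 2).getNumPartitions()` should report 2 partitions. If a large reduction (such as down to 1) hurts upstream parallelism, calling `rdd.coalesce(target, shuffle=True)` directly may be the better choice.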
References:
https://www.cnblogs.com/fillPv/p/5392186.html