Spark operators: repartition and coalesce

We know that RDDs are partitioned, but sometimes we need to change the partition count. Whether to increase or decrease it depends on the actual scenario. Also, since one output file is written per partition, setting the RDD partition count is a way to control how many files get generated.

There are two methods for repartitioning: repartition and coalesce.

Let's look at the (PySpark) source code first:

    def repartition(self, numPartitions):
        """
        Return a new RDD that has exactly numPartitions partitions.

        Can increase or decrease the level of parallelism in this RDD.
        Internally, this uses a shuffle to redistribute data.
        If you are decreasing the number of partitions in this RDD, consider
        using `coalesce`, which can avoid performing a shuffle.

        >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
        >>> sorted(rdd.glom().collect())
        [[1], [2, 3], [4, 5], [6, 7]]
        >>> len(rdd.repartition(2).glom().collect())
        2
        >>> len(rdd.repartition(10).glom().collect())
        10
        """
        return self.coalesce(numPartitions, shuffle=True)

    def coalesce(self, numPartitions, shuffle=False):
        """
        Return a new RDD that is reduced into `numPartitions` partitions.

        >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
        [[1], [2, 3], [4, 5]]
        >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
        [[1, 2, 3, 4, 5]]
        """
        if shuffle:
            # Decrease the batch size in order to distribute evenly the elements across output
            # partitions. Otherwise, repartition will possibly produce highly skewed partitions.
            batchSize = min(10, self.ctx._batchSize or 1024)
            ser = BatchedSerializer(PickleSerializer(), batchSize)
            selfCopy = self._reserialize(ser)
            jrdd_deserializer = selfCopy._jrdd_deserializer
            jrdd = selfCopy._jrdd.coalesce(numPartitions, shuffle)
        else:
            jrdd_deserializer = self._jrdd_deserializer
            jrdd = self._jrdd.coalesce(numPartitions, shuffle)
        return RDD(jrdd, self.ctx, jrdd_deserializer)

We can see that repartition ultimately just calls coalesce, with the shuffle parameter set to True;

so once you understand coalesce, you understand repartition.

If the result can be produced as a narrow dependency, no shuffle is needed; for example, merging 1000 partitions down into 10 (see the sketch after these definitions).

Narrow dependency: each partition of the parent RDD feeds exactly one partition of the child RDD (one-to-one, or several parent partitions merged into one child partition);
Wide dependency: a partition of the parent RDD feeds multiple partitions of the child RDD.
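Here is a minimal sketch of the narrow-dependency case (it assumes an existing SparkContext named sc; the sizes are only illustrative):

    # Shrinking 1000 partitions to 10 with coalesce (shuffle defaults to
    # False): each of the 10 output partitions simply absorbs ~100 parent
    # partitions (many-to-one), so no data moves across the network.
    rdd = sc.parallelize(range(10000), 1000)

    shrunk = rdd.coalesce(10)
    print(shrunk.getNumPartitions())    # 10, no shuffle performed

    # repartition reaches the same partition count, but pays for a shuffle:
    shuffled = rdd.repartition(10)
    print(shuffled.getNumPartitions())  # 10, via a full shuffle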

If the partition count changes drastically, say numPartitions=1, the computation may run on fewer nodes than you expect (with a single partition, everything upstream of the coalesce collapses into one task). To avoid this, set shuffle=True: Spark then inserts a shuffle step, so the stages before the coalesce keep their original parallelism; see the sketch below.
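A sketch of that escape hatch, reusing the 1000-partition rdd and sc from above (expensive_fn is a stand-in for whatever per-record work you actually do):

    expensive_fn = lambda x: x * x  # placeholder for real per-record work

    # coalesce(1) without a shuffle pipelines the map into the single final
    # task, so the map itself also ends up running as one task.
    single = rdd.map(expensive_fn).coalesce(1)

    # With shuffle=True, the map still runs across all 1000 input
    # partitions; only the stage after the shuffle is one task wide.
    single = rdd.map(expensive_fn).coalesce(1, shuffle=True)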

Also note that when increasing the partition count, coalesce with shuffle=False does not repartition at all; the partition count only grows if shuffle is set to True.
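A quick check of that behavior (same assumed sc):

    small = sc.parallelize(range(100), 3)

    # shuffle=False cannot create new partitions: the count stays at 3.
    print(small.coalesce(10).getNumPartitions())                # 3

    # Only shuffle=True (i.e. what repartition does) can grow it.
    print(small.coalesce(10, shuffle=True).getNumPartitions())  # 10
    print(small.repartition(10).getNumPartitions())             # 10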

In other words, repartition is just a special case of coalesce: it hard-codes the shuffle parameter to True.

To sum up:

When decreasing partitions, a shuffle is generally unnecessary, so either method works (coalesce avoids the shuffle, repartition pays for one);

When increasing partitions, a shuffle is required, so repartition is the usual choice, simply because it is more convenient.
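Finally, the file-count trick mentioned at the start: saveAsTextFile writes one part file per partition, so repartitioning just before the save fixes the number of output files (the path below is a made-up example):

    # Exactly 5 part-XXXXX files are written, one per partition.
    data = sc.parallelize(range(10000), 200)
    data.repartition(5).saveAsTextFile("/tmp/repartition_demo")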

References:

    https://www.cnblogs.com/fillPv/p/5392186.html
