zoukankan      html  css  js  c++  java
  • RDD关键性能考量之 并行度

    《Spark快速大数据分析》 

    8.4 关键性能考量

    并行度

    RDD的逻辑表示其实是一个对象的集合。在物理执行期间,RDD会被分为一系列的分区,

    每个分区都是整个数据的子集。当Spark调度并运行任务时,Spark会为每个分区中的数据

    创建出一个任务,该任务在默认情况下会需要集群中的一个计算节点来执行。

    Spark也会针对RDD直接自动推断出合适的并行度,这对于大多数用例来说已经足够了。

    输入RDD一般会根据其底层的存储系统选择并行度。例如,从HDFS上读数据的输入RDD

    会为数据在HDFS上的每个文件区块创建一个分区。从数据混洗后的RDD派生下来的RDD

    则会采用与其父RDD相同的并行度。

    并行度会从两个方面影响程序的性能。

    首先,当并行度过低时,Spark集群会出现资源闲置的情况

    比如,假设你的应用有1000个可使用的计算节点,但所运行的步骤只有30个任务,你就应该提高并行度

    来充分利用更多的计算节点。

    并行度过高时,每个分区产生的间接开销累计起来会更大。评判并行度是否过高的标准包括

    任务是否是几乎在瞬间(毫秒级)完成的,或者是否观察到任务没有读写任务数据

    Spark提供了两种方法来对操作的并行度进行调优。

    第一种方法是在数据混洗操作时,使用参数的方式为混洗后的RDD指定并行度

    第二种方法是对于任何已有的RDD,可以进行重新分区来获取更多或者更少的分区数。

    重新分区操作通过repartition()实现,该操作会把RDD随机打乱并分成设定的分区数目。

    如果你确定要减少分区数,可以使用coalesce()操作。由于没有打乱数据,该操作比repartition()更为高效。

    如果你认为当前的并行度过高或者过低,可以利用这些方法对分区重新调整。

    举个栗子,假设我们从S3上读取了大量数据,然后马上进行fileter()操作筛选调数据集合中的

    大部分数据。默认情况下,filter()返回的RDD的分区数和其父RDD一样,这样会产生很多的空分区

    或者只有少量数据的分区。这时,可以通过 合并得到分区数更少的RDD来提高应用的性能。

     
    def testCoalesce = {
    
        val conf = new SparkConf().setMaster("local").setAppName("testCoalesce")
    
        val sc = new SparkContext(conf)
    
        val input = sc.parallelize(1 to 9999, 1000)
    
        logger.warn(s"RDD[input] partitionCount[${input.partitions.length}]")
    
     
    
        val test = input.filter { x => x % 2015 == 0  }
    
        logger.warn(s"RDD[test]  partitionCount[${test.partitions.length}]")
    
     
    
        val test2 = test.coalesce(2, true).cache()
    
        logger.warn(s"RDD[test2] partitionCount[${test2.partitions.length}]")
    
     
    
        val result = test2.collect()
    
     
    
        logger.warn(s"result [${result.mkString(",")}]")
    
     
    
        Thread.sleep(Int.MaxValue)
    
      }
    

      

    执行结果

    00:47:21 831 [main] WARN test.scala.spark.TestSpark2$.testCoalesce(TestSpark2.scala:19): RDD[input] partitionCount[1000]
    00:47:22 009 [main] WARN test.scala.spark.TestSpark2$.testCoalesce(TestSpark2.scala:22): RDD[test]  partitionCount[1000]
    00:47:22 122 [main] WARN test.scala.spark.TestSpark2$.testCoalesce(TestSpark2.scala:25): RDD[test2] partitionCount[2]
     
    [Stage 0:===>                                                   (58 + 1) / 1000]
    [Stage 0:=====>                                                 (95 + 1) / 1000]
    [Stage 0:=======>                                              (131 + 1) / 1000]
    [Stage 0:============>                                        (238 + 24) / 1000]
    [Stage 0:============>                                        (243 + 19) / 1000]
    [Stage 0:================>                                     (314 + 1) / 1000]
    [Stage 0:=================>                                    (330 + 1) / 1000]
    [Stage 0:=====================>                                (390 + 1) / 1000]
    [Stage 0:=======================>                              (443 + 1) / 1000]
    [Stage 0:===========================>                          (500 + 2) / 1000]
    [Stage 0:==============================>                       (557 + 1) / 1000]
    [Stage 0:=================================>                    (618 + 1) / 1000]
    [Stage 0:===================================>                  (662 + 1) / 1000]
    [Stage 0:=======================================>              (724 + 1) / 1000]
    [Stage 0:==========================================>           (791 + 1) / 1000]
    [Stage 0:==============================================>       (855 + 1) / 1000]
    [Stage 0:================================================>     (895 + 1) / 1000]
    [Stage 0:===================================================>  (953 + 1) / 1000]
     
    00:47:30 466 [main] WARN test.scala.spark.TestSpark2$.testCoalesce(TestSpark2.scala:29): result [2015,4030,6045,8060]
    打开http://localhost:4040/jobs/,可以看到任务执行统计。
     
  • 相关阅读:
    Java Web系统经常使用的第三方接口
    ExtJS笔记--applyTo和renderTo的差别
    ORACLE触发器具体解释
    java多线程样例
    RapeLay(电车之狼R)的结局介绍 (隐藏结局攻略)
    排序——选择排序
    常见hash算法的原理
    jdk和jre是什么?都有什么用?(转帖)
    Ubuntu下deb包的安装方法
    參加《全流程全要素的研发项目管理》培训记录与心得
  • 原文地址:https://www.cnblogs.com/ihongyan/p/4976414.html
Copyright © 2011-2022 走看看