zoukankan      html  css  js  c++  java
  • Spark性能优化(2)——广播变量、本地缓存目录、RDD操作、数据倾斜

    广播变量

    背景

    一般Task大小超过10K时(Spark官方建议是20K),需要考虑使用广播变量进行优化。
    大表小表Join,小表使用广播的方式,减少Join操作。

    参考:Spark广播变量与累加器

    Local Dir

    背景

    shuffle过程中,临时数据需要写入本地磁盘。本地磁盘的临时目录通过参数spark.local.dir配置。

    性能优化点

    spark.local.dir支持配置多个目录。配置spark.local.dir有多个目录,每个目录对应不同的磁盘,这样可以提升IO效率。另外,可以采用IO性能较高的磁盘作为local dir的磁盘。

    注意:

    • 如果使用YARN、Mesos等资源框架,此参数应该通过相应资源框架的参数来设置。
    • 如果只有一个磁盘,配置了多个目录,性能提升不大。

    RDD操作:使用MapPartitions替代Map

    性能优化点

    map方法是对RDD的每一条记录逐一操作。mapPartitons是对整个RDD,以迭代器的方式逐一操作。比如对条记录的开销较大,比如需要连接、断开数据库。使用map方法需要对每一条记录都连接、断开数据库,效率差。此时,可以改用mapPartitons操作,只需要整个Partition连接、断开一次数据库即可。

    1
    rdd.map{x => conn=getDBConn;conn.write(x.toString);conn.close}
    

    改为:

    1
    rdd.mapPartitions(records => conn.getDBConn;for(item <- records) write(item.toString); conn.close)
    

    RDD操作:使用coalesce减小空运行的任务数量

    性能优化点

    • 场景一

    当对RDD进行多次过滤时,可能会形成很多空的、无数据的Partition。通过调用coalesce方法,可以减小Task个数。让有的Task可以同时管理多个Partition。

    • 场景二

    当任务数过多的时候,Shuffle压力太大导致程序挂住不动,或者出现linux资源受限的问题。此时,可以通过调用coalesce方法,可以减小Task个数,让程序得以继续运行。

    coalesce()方法接受一个参数,为减小后的目标Partition个数。

    RDD操作:collect

    Collect操作会将Executor的数据发送的Driver端。需要确保Driver有足够的内存。Driver的内存通过参数spark.driver.memory参数进行配置。

    RDD操作:使用reduceByKey替代groupByKey

    reduceByKey会在Map端做本地聚合,而groupByKey等Shuffle操作不会再Map端做聚合。 能使用reduceByKey的地方尽量使用该方式,避免出现.groupByKey().map(x=>(x.1,x.2.size))

    • 举例

    对于数据

    2015-05-01 13:00:00,B101,MEILIN
    2015-05-01 10:04:20,B101,GUANLAN
    2015-05-01 09:18:00,F301,MEILIN
    2015-05-01 12:00:00,B107,WUHE
    2015-05-01 18:20:00,F301,WUHE
    2015-05-02 12:00:02,T442,GUANLAN
    2015-05-01 07:00:00,B101,GUANLAN
    2015-05-01 21:31:00,M721,WUHE
    2015-05-01 09:00:00,Z007,MEILIN

    现在要统计各个车牌(第二列)出现的次数,则应使用:

    1
    2
    3
    4
    5
    var dataRDD = sc.textFile("file:///tmp/data.txt")
    var data2RDD = dataRDD.map(s => s.split(","))
    var data3RDD = data2RDD.map( a => (a(1),1) )
    var data4RDD = data3RDD.reduceByKey(_ + _)
    data4RDD.collect
    

    而不是:

    1
    2
    3
    4
    5
    6
    var dataRDD = sc.textFile("file:///tmp/data.txt")
    var data2RDD = dataRDD.map(s => s.split(","))
    var data3RDD = data2RDD.map( a => (a(1),Array(a(0),a(2))) )
    var data4RDD = data3RDD.groupByKey()
    var data5RDD = data4RDD.map(x => (x._1,x._2.size))
    data5RDD.collect
    

    避免数据倾斜

    如何检测数据倾斜?

    现象:没有GC,各Task执行时间严重不一致。

    性能优化点

    • 重新设计key,以更小粒度的key使得Task大小合理化。
    • 有时提升并行度,有助于解决数据倾斜

    作者:明翼(XGogo) 
    出处:http://www.cnblogs.com/seaspring/ 

  • 相关阅读:
    中国历史朝代公元对照简表
    [Solved] DashBoard – Excel Service: The data sources may be unreachable, may not be responding, or may have denied you access.
    Delete/Remove Project from TFS 2010
    Sharepoint site showing system account instead of my username on the top right corner.
    你的成功在于你每天养成的习惯
    Internet Information Services is running in 32bit emulation mode. Correct the issue listed above and rerun setup.
    Prepare to back up and restore a farm (Office SharePoint Server 2007)
    Word中字号与磅值的对应关系
    How to: Change the Frequency for Refreshing the Data Warehouse for Team System
    UI Automation in WPF/Silverlight
  • 原文地址:https://www.cnblogs.com/hd-zg/p/7850314.html
Copyright © 2011-2022 走看看