zoukankan      html  css  js  c++  java
  • Spark性能优化(2)——广播变量、本地缓存目录、RDD操作、数据倾斜

    广播变量

    背景

    一般Task大小超过10K时(Spark官方建议是20K),需要考虑使用广播变量进行优化。
    大表小表Join,小表使用广播的方式,减少Join操作。

    参考:Spark广播变量与累加器

    Local Dir

    背景

    shuffle过程中,临时数据需要写入本地磁盘。本地磁盘的临时目录通过参数spark.local.dir配置。

    性能优化点

    spark.local.dir支持配置多个目录。配置spark.local.dir有多个目录,每个目录对应不同的磁盘,这样可以提升IO效率。另外,可以采用IO性能较高的磁盘作为local dir的磁盘。

    注意:

    • 如果使用YARN、Mesos等资源框架,此参数应该通过相应资源框架的参数来设置。
    • 如果只有一个磁盘,配置了多个目录,性能提升不大。

    RDD操作:使用MapPartitions替代Map

    性能优化点

    map方法是对RDD的每一条记录逐一操作。mapPartitons是对整个RDD,以迭代器的方式逐一操作。比如对条记录的开销较大,比如需要连接、断开数据库。使用map方法需要对每一条记录都连接、断开数据库,效率差。此时,可以改用mapPartitons操作,只需要整个Partition连接、断开一次数据库即可。

    1
    rdd.map{x => conn=getDBConn;conn.write(x.toString);conn.close}
    

    改为:

    1
    rdd.mapPartitions(records => conn.getDBConn;for(item <- records) write(item.toString); conn.close)
    

    RDD操作:使用coalesce减小空运行的任务数量

    性能优化点

    • 场景一

    当对RDD进行多次过滤时,可能会形成很多空的、无数据的Partition。通过调用coalesce方法,可以减小Task个数。让有的Task可以同时管理多个Partition。

    • 场景二

    当任务数过多的时候,Shuffle压力太大导致程序挂住不动,或者出现linux资源受限的问题。此时,可以通过调用coalesce方法,可以减小Task个数,让程序得以继续运行。

    coalesce()方法接受一个参数,为减小后的目标Partition个数。

    RDD操作:collect

    Collect操作会将Executor的数据发送的Driver端。需要确保Driver有足够的内存。Driver的内存通过参数spark.driver.memory参数进行配置。

    RDD操作:使用reduceByKey替代groupByKey

    reduceByKey会在Map端做本地聚合,而groupByKey等Shuffle操作不会再Map端做聚合。 能使用reduceByKey的地方尽量使用该方式,避免出现.groupByKey().map(x=>(x.1,x.2.size))

    • 举例

    对于数据

    2015-05-01 13:00:00,B101,MEILIN
    2015-05-01 10:04:20,B101,GUANLAN
    2015-05-01 09:18:00,F301,MEILIN
    2015-05-01 12:00:00,B107,WUHE
    2015-05-01 18:20:00,F301,WUHE
    2015-05-02 12:00:02,T442,GUANLAN
    2015-05-01 07:00:00,B101,GUANLAN
    2015-05-01 21:31:00,M721,WUHE
    2015-05-01 09:00:00,Z007,MEILIN

    现在要统计各个车牌(第二列)出现的次数,则应使用:

    1
    2
    3
    4
    5
    var dataRDD = sc.textFile("file:///tmp/data.txt")
    var data2RDD = dataRDD.map(s => s.split(","))
    var data3RDD = data2RDD.map( a => (a(1),1) )
    var data4RDD = data3RDD.reduceByKey(_ + _)
    data4RDD.collect
    

    而不是:

    1
    2
    3
    4
    5
    6
    var dataRDD = sc.textFile("file:///tmp/data.txt")
    var data2RDD = dataRDD.map(s => s.split(","))
    var data3RDD = data2RDD.map( a => (a(1),Array(a(0),a(2))) )
    var data4RDD = data3RDD.groupByKey()
    var data5RDD = data4RDD.map(x => (x._1,x._2.size))
    data5RDD.collect
    

    避免数据倾斜

    如何检测数据倾斜?

    现象:没有GC,各Task执行时间严重不一致。

    性能优化点

    • 重新设计key,以更小粒度的key使得Task大小合理化。
    • 有时提升并行度,有助于解决数据倾斜

    作者:明翼(XGogo) 
    出处:http://www.cnblogs.com/seaspring/ 

  • 相关阅读:
    一个简单的makefile,一次性编译本文件夹下所有的cpp文件
    c++ 最短路两种算法
    C++语言十进制数,CDecimal(未完成)
    C语言面向对象的简便方法
    C语言2048
    C图书借还示例
    Javascript 备忘
    原型与原型链
    css3动画-跳动圈
    学习css3动画
  • 原文地址:https://www.cnblogs.com/hd-zg/p/7850314.html
Copyright © 2011-2022 走看看