zoukankan      html  css  js  c++  java
  • spark调优篇-oom 优化(汇总)

    spark 之所以需要调优,一是代码执行效率低,二是经常 OOM 

    内存溢出

    内存溢出无非两点

    1. Driver 内存不够

    2. Executor 内存不够

    Driver 内存不够无非两点

    1. 读取数据太大

    2. 数据回传

    Executor 内存不够无非两点

    1. map 类操作产生大量数据,包括 map、flatMap、filter、mapPartitions 等

    2. shuffle 后产生数据倾斜

    Executor 内存不够

    有个通用的解决办法就是增加 Executor 内存

    --executor-memory MEM       Memory per executor (e.g. 1000M, 2G) (Default: 1G).

    但这并不一定是最好的办法

    map 过程产生大量对象

    造成 Executor 内存溢出

    解决思路是减少每个 task 的大小,从而减少每个 task 的输出;

    具体做法是在 会产生大量对象的 map 操作前 添加 repartition(重新分区) 方法,分区成更小的块传入 map

    rdd.flatMap(lambda x: ['%d'%x*50 for _ in range(100000000)]).count()      # 100 * 100000000 个对象,内存溢出
    rdd.flatMap(lambda x: len(['%d'%x*50 for _ in range(100000000)])).sum()     # 内存溢出
    
    rdd.repartition(1000000).flatMap(lambda x: ['%d'%x*50 for _ in range(100000000)]).count()   # 可执行

    数据倾斜

    参考我的博客 数据倾斜

    standalone 模式资源分配不均

    该模式下配置了 

    --total-executor-cores NUM  (Total cores for all executors.)   集群 executor 核数

    --executor-memory MEM       Memory per executor (e.g. 1000M, 2G) (Default: 1G).  每个 executor 内存

    而没有配置

    --executor-cores NUM Number of cores per executor. (Default: 1 in YARN mode,
    or all available cores on the worker in standalone mode)  每个 executor 核数

    假如各个 executor 核数不一样,核数多的 executor 执行的 task 就多,内存就容易溢出

    解决方法是配置参数 --executor-cores,或者是在 spark 中配置 spark.executor.cores

    在 RDD 中共用对象

    rdd = sc.parallelize(range(100))
    def myfunc(x): return x
    rdd.flatMap(lambda x: [('k', 'v') for _ in range(200000000)]).foreach(myfunc)     # 每次生成一个 tuple 对象,内存溢出
    rdd.flatMap(lambda x: ['k'+'v' for _ in range(2000000)]).count()        # 无需生成新的 string 对象,可执行

    tuple 为不可变对象,不过字符串也是可变对象

    此条方法有待进一步验证

    Driver 中需要读取大量数据

    造成 Driver 内存溢出

    解决思路是增加 Driver 内存,具体做法为设置参数

    --driver-memory MEM         Memory for driver (e.g. 1000M, 2G) (Default: 1024M). 

    示例

    from pyspark import SparkContext
    sc = SparkContext(master='yarn')
    rdd = sc.parallelize(range(300000000))
    # spark-submit --master yarn-client  --driver-memory 512M  driver_oom.py    内存溢出
    # spark-submit --master yarn-client  --driver-memory 3G  driver_oom.py  可以执行

    collect

    大量数据回传 Driver,造成内存溢出

    解决思路是分区输出,具体做法是 foreach

    rdd = sc.parallelize(range(100))
    rdd.flatMap(lambda x: ['%d'%x*50 for _ in range(100000)]).collect()     # 内存溢出
    
    def func(x): print(x)
    rdd.flatMap(lambda x: ['%d'%x*50 for _ in range(100000)]).foreach(func) # 分区输出

    或者增加 Driver 内存

    代码优化

    mapPartitions

    1. 批处理

    2. 减少中间输出

    用 mapPartitions 替代多个 map,减少 Executor 内存压力

    from pyspark import SparkContext
    sc = SparkContext(master='yarn')
    data = range(10)
    rdd = sc.parallelize(data, 2)
    
    ##### map
    rdd.map(lambda x: x % 3).filter(lambda x : x>1 ).countByValue().values()        # [3]
    
    ##### mapPartitions
    # 避免了中间 RDD 的产生,节约内存,防止 oom
    def myfunc(datas):
        # datas type is itertools.chain
        for data in datas:
            value = data % 3
            if value > 1:
                yield value
    
    print rdd.mapPartitions(myfunc).countByValue().values()     # [3]
    # spark-submit --master yarn-client mapVSmapPartitions.py   python 只支持 client 模式

    DataFrame 代替 RDD

    任务被划分成多个 stage,在每个 stage 内部,RDD 是无法自动优化的,

    rdd.map(lambda x: x+1).map(lambda x: x+1)   ==  rdd.map(lambda x: x+2)

    如上面两个操作是等价的,但是 RDD 并不会自动优化,

    而 DataFrame 使用 sql 查询,自带 sql 优化器,可自动找到最优方案

    broadcast join

    在分布式计算中,数据跨节点移动是非常影响性能的,网络传输耗时,多次传输消耗内存,broadcast 在某些场景可以减少数据移动;

    如 一个 小RDD 要和 一个 大RDD 进行 join 操作,常规情况下要互传 RDD,由于多个 task,故需多次传输,    【注意必须是有个小 RDD,否则这种做法意义不大,因为后面要遍历这个广播变量】

    如果把 小RDD 变成 broadcast 变量,就不用传输 大RDD,把 broadcast(小RDD) 缓存在对应 Executor 上即可

    对 大RDD 进行 map 操作,在 map 函数中调用 小RDD 的 value,遍历 小RDD

    map(lambda x: i for i in smallRDD.value if x == i)

    filter 之后再 join

    就是所谓的谓词下推,在 sparkSQL 中会自动这么操作;

    如果是自己操作 RDD,可以减少 shuffle 的数据量 

    cache and persist

    缓存 RDD 既可以节省内存,也可以提高性能;

    cahce 是缓存到内存,等同于 persist(Storage.MEMORY_ONLY),在内存不足时,这种缓存方式会丢失数据,再次使用时会重新计算;

    rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) 在内存不足时会写到磁盘,避免重复,只是耗费一点 IO 时间

    combineByKey

    在 hadoop 中也有 combine,有 combine 比 没有combine 效率高;

    比如 reduceByKey (combine操作) 就比 groupyByKey (非combine操作) 效率高

    import time
    from pyspark import SparkContext
    
    sc = SparkContext(master='yarn')
    
    strs = list('abcd')*10000000
    rdd = sc.parallelize(strs)
    
    time.clock()
    print rdd.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x+y).collect()     # combinByKey 操作耗时少3.2s
    # print rdd.map(lambda x: (x, 1)).groupByKey().mapValues(sum).collect()        # 非 combinByKey 操作耗时3.6s
    # 二者结果一样
    print(time.clock())
    
    strs = list('abcd')*10000000
    for i in strs:i = (i, 1)    # 6s,单机for循环做更少的事情,耗时更多

    图解如下

     

    参考资料:

    https://blog.csdn.net/yhb315279058/article/details/51035631

  • 相关阅读:
    非系统表空间损坏,rman备份恢复
    非系统数据文件损坏,rman备份恢复
    开启 控制文件自动备份下,参数文件、控制文件全部丢失恢复
    rman命令详解(三)
    Block Change Tracking (块改变跟踪)
    如何加快建 index 索引 的时间
    RMAN兼容性、控制文件自动备份、保存时间、备份策略、备份脚本(二)
    rman理论(一)
    动态参数与静态参数的判断、修改
    闪回之 Flashback Data Archive
  • 原文地址:https://www.cnblogs.com/yanshw/p/11988347.html
Copyright © 2011-2022 走看看