zoukankan      html  css  js  c++  java
  • spark调优篇-数据倾斜(汇总)

    数据倾斜

    为什么会数据倾斜

    spark 中的数据倾斜并不是说原始数据存在倾斜,原始数据都是一个一个的 block,大小都一样,不存在数据倾斜;

    而是指 shuffle 过程中产生的数据倾斜,由于不同的 key 对应的数据量不同导致不同 task 处理的数据量不同

    注意:数据倾斜与数据过量不同,数据倾斜是某几个 task 处理的数据量很大,数据过量是所有 task 处理的数据量都很大

    数据倾斜的表现

    大部分 task 都快速执行完毕,少数 task 执行缓慢,甚至报错 OOM,即使最终运行完毕,也叫数据倾斜

    数据倾斜的后果

    1. 程序执行缓慢

    2. 报错 OOM

    定位数据倾斜问题

    1. 经验:查看代码中的 shuffle 算子,如 reduceByKey、countByKey、groupByKey、join 等,根据代码逻辑判断是否会出现数据倾斜

    2. spark log 文件:log 记录错误发生在代码的哪一行,从而再根据自己的理解定位哪个 shuffle 算子

    3. spark web UI:

    解决方案

    聚合原数据

    避免 shuffle 

    1. 既然数据倾斜是 shuffle 产生的,那没有 shuffle 肯定就没有数据倾斜了;

    2. 没有 shuffle 意味着在 原数据层面已经进行了 “shuffle”

    这种方法适合 原数据方便实现大量数据合并操作 的数据源,确切的说就是 Hive;

    比如在 Hive 中按照 key 进行分组,把每个 key 的 value 拼成一个 字符串(当然也可以是其他操作,根据需求定),那么这种数据进入 spark 后,无需 shuffle 了,直接 map 即可

    缩小 key 粒度

    比如把按 月 统计变成按 周 统计;

    key 的粒度缩小,每个 key 对应的数据量降低,task 处理的数据量减少;

    这样可以保证程序顺利执行,减少 OOM,但是可能引起更大的数据倾斜

    增大 key 粒度

    比如把按 周 统计变成按 月 统计;

    key 的粒度增大,每个 key 对应的数据量增大,task 处理的数据量增大;

    这样可以降低数据倾斜的可能,但是 可能引起 OOM

    过滤导致倾斜的 key

    极端方法,扔掉导致倾斜的 key,不建议

    提高 shuffle 操作的 reduce 并行度

    增加 reduce 并行度其实就是增加 reduce 端 task 的数量, 这样每个 task 处理的数据量减少,避免 oom

    reduce 端并行度设置

    1. 在很多 shuffle 算子中可直接指定并行度,如 reduceByKey(lambda x, y: x+y, 10)

      // 注意如果并行度大于 executor 数 x executor core 数,以小为准

    2. 在 sparkSQL 中 groupBy、join 等 shuffle 操作,需要设定 spark 参数:spark.sql.shuffle.partitions

      // 该参数默认为 200

    这样做是有缺陷的

    map 端不断地写入数据,reduce task 不断地从指定位置读取数据,如果 task 很多,读取的速度增加,但是每个 key 对应的 reduce 处理的总量没变,

    所以它并没有从根本上解决数据倾斜的问题,只是尽量去减少 reduce task 的数据量,适用于 较多 key 对应的数据量都很大的问题;

    试想,如果只有 1 个 key 数据量较大,那么其他 key 高并行就是资源的浪费;

    使用随机 key 进行双重聚合

    双重聚合,就是聚合多次,大致思路为:

    由于一个 key 对应的数据量太大,我先给这个 key 加个随机数,num_key,强行把一个 key 变成 多个 key,这样每个 key 的数据量减小,然后按  num_key 进行聚合,聚合之后,把 num_key 再转回 key,然后对 key 再次聚合;

    把一次聚合变成多次聚合,如图

    这种方法适合于 reduceByKey、groupByKey 等算子,不适合 join 算子

    将 reduce join 变成 map join 

    正常情况下,join 会产生 shuffle 过程,而且是 reduce join,即先将相同 key 对应的 value 汇聚到一个 reduce task 中,再进行 join,如下图

     

    如果其中有一个 RDD 很小,就可以采用 广播小 RDD + map 大 RDD 实现 join 功能,

    此时没有 shuffle 操作,自然不会有数据倾斜

    注意:RDD 是不能广播的,只有将 RDD 通过 collect 拉取到 Driver 内存中才能进行广播

    如果是两个 RDD 都很大,则不适合这种方法 

    sample 采样对倾斜 key 单独进行 join

    思考一个问题,如果 RDD 中只有一个 key,shuffle 如何做?

    在 spark 中,如果只有一个 key,shuffle 过程会将 所有 value 打散,分配到不同的 reduce task 中;

    那么如果某个 RDD 中因为单个 key 导致数据倾斜,我们可以把这个 key 拿出来形成一个新的 RDD1,其他 key 形成一个 RDD2,然后用 RDD1 再与目标 RDD 进行 join 操作,根据 spark 运行机制,他会把 RDD1 打散分配到多个 task 中进行 join 操作;

    这样可以避免因 这个 key 对应的数据量太大导致数据倾斜,甚至 OOM

    如果一个 RDD 中有多个 key 导致数据倾斜,则此法不适用

    使用随机数以及扩容进行 join

    如果有多个 key 导致数据倾斜,无法通过采样确定哪个 key 数据量大,只能把多个 key 都提出来,这样效率貌似不太高;

    此时我们只有一种解决方案:对一个 RDD 扩容,对另一个 RDD 稀释,再进行 join

    具体做法如图

    注意:如果 RDD 过大,进行 随机数扩容后,可能产生 OOM

    repartition

    这个也是较常用的方法,它的本质就是减少 task 处理的数据量,一般发生在 shuffle 之前,当然它本身也是个 shuffle 操作

    总结

    数据倾斜 是 某些 task 处理了大量数据,所以数据倾斜很可能引起 OOM,数据倾斜和 OOM 的某些解决办法可以通用;

    解决数据倾斜的大致思路为:

    1. 避免 shuffle,可在数据输入 spark 之前进行 shuffle,或者 用 map 等替代 shuffle

      // 聚合原始数据、广播小 RDD

    2. 如果避免不了 shuffle,就减少 reduce task 的数据量

      // 缩小 key 粒度、增加 reduce task 数量

      // 通过随机数多次聚合,减少每次聚合的数据量,针对 reduceByKey、groupByKey 等

    3. 其他情况,单个 key 数据量大,多个 key 数据量大,针对 join

  • 相关阅读:
    android-6
    android-5
    android-购物商城
    安卓简易计算器
    安卓第四周作业
    安卓第一周作业
    第十五周作业
    第十三周作业
    第十三周上机练习
    第三次安卓作业
  • 原文地址:https://www.cnblogs.com/yanshw/p/12058160.html
Copyright © 2011-2022 走看看