zoukankan      html  css  js  c++  java
  • 大数据之性能调优方面(数据倾斜、shuffle、JVM等方面)

    一、对于数据倾斜的发生一般都是一个key对应的数据过大,而导致Task执行过慢,或者内存溢出(OOM),一般是发生在shuffle的时候,比如reduceByKey,groupByKey,sortByKey等,容易产生数据倾斜。

    那么针对数据倾斜我们如何解决呢?我们可以首先观看log日志,以为log日志报错的时候会提示在哪些行,然后就去检查发生shuffle的地方,这些地方比较容易发生数据倾斜。

    其次,因为我们都是测试的,所以都是在client端进行的,也可以观察WebUI,上面也会有所有对应的stage的划分等。

    解决方案:

    ①聚合源数据,我们的数据一般来自Hive表,那么在生成Hive表的时候对数据进行聚合,按照key进行分组,将key所对应的value以另一种方式存储,比如拼接成一个字符串这样的,我们就可以省略groupByKey和reduceByKey的操作,那么我们就避免了shuffle的产生,如果不能完美的拼接成字符串,那我们也至少可以减少数据量,提高一点性能

    ②过滤key操作,这种方式就有点粗暴了,如果你老大允许的话,这也是一种不错的方案。

    ③提高并行度,我们可以通过提高shuffle的reduce的并行度来提高reduce端的task执行数量,从而分担数据压力,但是如果出现之前运行时OOM了,加大了reduce端的task的数量,可以运行了,但是执行时间一长就要放弃这种方案。

    ④双重聚合,用于groupByKey和reduceByKey,比较使用join(hive中join优化也有类似的双重聚合操作设置参数hive.mao.aggr=true和hive.groupby.skewindata=true具体过程这里不做介绍)先加入随机数进行分组,然后前缀去掉在进行分组

    ⑤将reduce的join转换成map的join,如果两个RDD进行join,有一个表比较小的话,可以将小表进行broadcast,这样每个节点都会用一个小表,如果两个表都很大,可以先将两个表按相同的方式进行分区操作,最后合并。虽然map join替换了reduce join ,但这是我们消耗了部分内存换来的,所以我们需要考虑OOM现象。

    ⑥sample抽样分解聚合,也就是说将倾斜的key单独拉出来,然后用一个RDD进行打乱join

    ⑦随机数+扩容表,也就是说通过flatMap进行扩容,然后将随机数打入进去,再进行join,但是这样不能从根本上解决数据倾斜,只能缓解这种现象。

    ⑧如果你所在公司很有钱,可以直接加机器,硬件足够高,这些也就不是问题了。

    ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

    shuffle方面的性能调优:

    设置map输出文件的合并参数  .set(“spark.shuffle.consolidation”,"true"),默认是不开启的,设置为true,则无法并发执行。

    设置map端的内存缓冲区大小,和reduce端内存的大小,这个主要针对文件过大,导致性能低,但是调优效果不是很明显。  

      map端参数:  .set("spark.shuffle.file.buffer","64k")

      reduce端参数  .set("spark.shuffle.memoryFraction","0.3")

    这两个参数根据我们观察日志的读写文件的多少来调节的,适量调节,如果是stand-alone模式观察4040页面,如果是yarn模式直接进入yarn Ulog日志查看。

    shuffle中有以下几种shuffle,具体情况根据你的业务要求来取舍。hashshuffle+consolidation、sortshuffle、钨丝shuffle(tungsten-sort他里面有自己的内存机制,可以有效的防止内存溢出现象)

    mappartitions的使用必须要慎重!因为mappartition是取出一个分区的数据进行操作,那么如果数据过大我们就有可能造成OOM溢出现象的发生,所以mappartitions的使用应该只能在数据量小的情况下。

    默认shuffle的内存占用为20%,持久化占用60%,根据具体业务调整。

    ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

    JVM方面性能调优:

    内存不足的时候会导致minor gc的频繁,导致spark停止工作。

    频繁进行gc的时候,可能有些年轻代里面的对象被回收,但是因为内存性能不足的问题,导致传入了老年代,而如果老年代里面内存溢满,就会进行full gc操作,也就是全局清理机制,这个过程时间会很长,十几秒中至几十分钟,这样就导致了spark的性能变低。

    调优:

    降低cache操作内存的占比,大不了用persist操作,将一部分数据写入磁盘或者是进行序列化操作,减少RDD缓存的内存占用,降低cache操作的内存占用,那么算子函数的内存占比就上去了,可以减少频繁的gc操作,简单来说就是让task执行算子函数的时候拥有更多可用的内存。

    spark.storage.memoryFraction=0.6  cache的默认占用内存是60%。上述已说明

    executor对外内存的设置,如果我们发现shuffle output  file  not  found的错误,那么我们就需要调节一下对外内存了

    (写在spark-submit参数中)--conf  spark.yarn.executor.memoryOverhead=2048(基于yarn模式)

    这个参数不是在sparkContext中调节的,而是在spark-submit中指定的参数,可以防止OOM溢出现象。

    偶尔也有可能出现连接等待超时的现象,因为executor跨接点拉取数据的时候,可能另一个executor正在进行JVM垃圾回收(所有线程停止),导致连接报错,not found,这时候我们就需要调节连接参数,要在spark-submit中指定spark.core.connection.ack.wait.timeout=300设置等待时间稍微长一点即可。

  • 相关阅读:
    机器学习(深度学习)
    机器学习(六)
    机器学习一-三
    Leetcode 90. 子集 II dfs
    Leetcode 83. 删除排序链表中的重复元素 链表操作
    《算法竞赛进阶指南》 第二章 Acwing 139. 回文子串的最大长度
    LeetCode 80. 删除有序数组中的重复项 II 双指针
    LeetCode 86 分割链表
    《算法竞赛进阶指南》 第二章 Acwing 138. 兔子与兔子 哈希
    《算法竞赛进阶指南》 第二章 Acwing 137. 雪花雪花雪花 哈希
  • 原文地址:https://www.cnblogs.com/skyyuan/p/9957487.html
Copyright © 2011-2022 走看看