zoukankan      html  css  js  c++  java
  • spark调优--shuffle调优

    1、shuffle 的原理  

      在 spark 中,发生 shuffle 操作主要是以下几个算子:groupByKey、reduceByKey、 countByKey、join,等等。

    (1)Shuffle 原理介绍:

       groupByKey,要把分布在集群各个节点上的数据中的同一个 key,对应的 values,都给集中到一块儿,集中到集群中同一个节点上,更严密一点说,就是 集中到一个节点的一个 executor 的一个 task 中。然后呢,集中一个 key 对应的 values 之后,才能交给我们来进行处理,<key, Iterable<value>>;

      reduceByKey,算子函数去对 values 集合进行 reduce 操作,最后变成一个 value;

      countByKey,需要在一个 task 中,获取到一个 key 对应的所有的 value,然 后进行计数,统计总共有多少个 value;

      join,RDD<key, value>,RDD<key, value>,只要是两个 RDD 中,key 相同 对应的 2 个 value,都能到一个节点的 executor 的 task 中,给我们进行处理。

      Shuffle,一定是分为两个 stage 来完成的。因为这其实是个逆向的过程,不是 stage 决定 shuffle,是 shuffle 决定 stage。

      reduceByKey(_+_),在某个 action 触发 job 的时候,DAGScheduler,会负责 将 job 划分为多个 stage。划分的依据,就是,如果发现有会触发 shuffle 操作的 算子,比如 reduceByKey,就将这个操作的前半部分,以及之前所有的 RDD 和 transformation 操作,划分为一个 stage;shuffle 操作的后半部分,以及后面的, 直到 action 为止的 RDD 和 transformation 操作,划分为另外一个 stage。

      每一个 shuffle 的前半部分 stage 的 task,每个 task 都会创建下一个 stage 的 task 数量相同的文件,比如下一个 stage 会有 100 个 task,那么当前 stage 每个 task 都会创建 100 份文件;会将同一个 key 对应的 values,一定是写入同一个文件中 的;不同节点上的 task,也一定会将同一个 key 对应的 values,写入下一个 stage, 同一个 task 对应的文件中。

      shuffle 的后半部分 stage 的 task,每个 task 都会从各个节点上的 task 写的属 于自己的那一份文件中,拉取 key, value 对;然后 task 会有一个内存缓冲区,然 后会用 HashMap,进行 key, values 的汇聚;(key ,values);
    task 会用我们自己定义的聚合函数,比如 reduceByKey(_+_),把所有 values 进行一对一的累加;聚合出来最终的值。就完成了 shuffle。

      shuffle 前半部分的 task 在写入数据到磁盘文件之前,都会先写入一个一个 的内存缓冲,内存缓冲满溢之后,再 spill 溢写到磁盘文件中。

    (2)Shuffle 过程:

      第一个 stage,每个 task,都会给第二个 stage 的每个 task 创建一份 map 端的输 出文件。 第二个 stage,每个 task,会到各个节点上面去,拉取第一个 stage 每个 task 输出 的,属于自己的那一份文件。

      默认的这种 shuffle 行为,对性能有什么样的恶劣影响呢?

      实际生产环境的条件:(每一个 task 会创建下一个 stage 的 task 数量的文件,例 子中的 stage 并行度为 1000)

      100 个节点(每个节点一个 executor): 100 个 executor 每个 executor:2 个 cpu core 总共 1000 个 task:每个 executor 平均 10 个 task 每个节点,10 个 task,每个节点会输出多少份 map 端文件? 单节点输出:10 * 1000=1 万个文件 总共有多少份 map 端输出文件? 总节点输出:100 * 10000 = 100 万个文件

      分析: shuffle 中的写磁盘的操作,基本上就是 shuffle 中性能消耗最为严重的部分。一 个普通的生产环境的 spark job 的一个 shuffle 环节,会写入磁盘 100 万个文件。 磁盘 IO 对性能和 spark 作业执行速度的影响,是极其惊人和吓人的。

    2、调优方法

    (1)合并 map 端输出文件(旧版本hash shuffle)

      针对旧版本,使用hashshuffle的spark。

      new SparkConf().set("spark.shuffle.consolidateFiles", "true")

      开启 shuffle map 端输出文件合并的机制;默认情况下,是不开启的,就是会发生如上所述 的大量 map 端输出文件的操作,严重影响性能。

      开启了 map 端输出文件的合并机制之后: 第一个 stage,同时就运行 cpu core 个 task,比如 cpu core 是 2 个,并行运行 2 个 task; 每个 task 都创建下一个 stage 的 task 数量个文件; 第一个 stage,并行运行的 2 个 task 执行完以后;就会执行另外两个 task;另外 2 个 task 不会再重新创建输出文件;而是复用之前的 task 创建的 map 端输出文件,将数据写入上一 批 task 的输出文件中。 第二个 stage,task 在拉取数据的时候,就不会去拉取上一个 stage 每一个 task 为自己创 建的那份输出文件了;而是拉取少量的输出文件,每个输出文件中,可能包含了多个 task 给自己的 map 端输出。

      开启了 map 端输出文件合并机制之后,生产环境上的例子,会有什么样的变化? 实际生产环境的条件: 100 个节点(每个节点一个 executor): 100 个 executor 每个 executor:2 个 cpu core 总共 1000 个 task:每个 executor 平均 10 个 task // 上一个 stage 的每一个 task 会创建生成下一个 stage 的并行度 task 数的文件 每个节点,2 个 cpu core,有多少份输出文件呢?2 * 1000 = 2000 个(文件复用生效,创建文 件嫌少了,原来是总共创建 10 个) 总共 100 个节点,总共创建多少份输出文件呢?100 * 2000 = 20 万个文件 相比较开启合并机制之前的情况,100 万个 map 端输出文件,在生产环境中,立减 5 倍!

      合并 map 端输出文件,对咱们的 spark 的性能有哪些方面的影响呢?

      1、map task 写入磁盘文件的 IO,减少:100 万文件 -> 20 万文件

       2、第二个 stage,原本要拉取第一个 stage 的 task 数量份文件,即 1000 个 task,第二个 stage 的每个 task,都要拉取 1000 份文件,走网络传输;合并以后,100 个节点,每个节点 2 个 cpu core,第二个 stage 的每个 task,主要拉取 100 * 2 = 200 份文件即可;网络传输的性能消 耗是不是也大大减少

      3、分享一下,实际在生产环境中,使用了 spark.shuffle.consolidateFiles 机制以后,实际的性 能调优的效果:对于上述的这种生产环境的配置,性能的提升,还是相当的客观的。spark 作业,5 个小时 -> 2~3 个小时。

    (2)shuffle 调优之 map 端内存缓存与 reduce 端内存占比(已弃用,需要额外开启其他参数支持)

      spark.shuffle.file.buffer,默认 32k

      spark.shuffle.memoryFraction,0.2  (spark统一内存管理下,参数已经弃用)

      默认,map 端内存缓冲是每个 task,32kb。

      默认,reduce 端聚合内存比例,是 0.2,也就是 20%。

      默认情况下,shuffle 的 map task,输出到磁盘文件的时候,统一都会先写入每个 task 自己 关联的一个内存缓冲区。这个缓冲区大小,默认是 32kb。每一次,当内存缓冲区满溢之后, 才会进行 spill 操作,溢写操作,溢写到磁盘文件中去。

    spark.shuffle.file.buffer 32k Size of the in-memory buffer for each shuffle file output stream, in KiB unless otherwise specified. These buffers reduce the number of disk seeks and system calls made in creating intermediate shuffle files.

    spark.shuffle.memoryFraction 0.2 (deprecated) This is read only if spark.memory.useLegacyMode is enabled. Fraction of Java heap to use for aggregation and cogroups during shuffles. At any given time, the collective size of all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will begin to spill to disk. If spills are often, consider increasing this value at the expense of spark.storage.memoryFraction.

      默认情况下,shuffle 的 map task,输出到磁盘文件的时候,统一都会先写入每个 task 自己 关联的一个内存缓冲区。这个缓冲区大小,默认是 32kb。每一次,当内存缓冲区满溢之后, 才会进行 spill 操作,溢写操作,溢写到磁盘文件中去。

      如果 map 端的 task,处理的数据量比较大,可能会出现什么样的情况?

      每个 task 就处理 320kb,32kb,总共会向磁盘溢写 320 / 32 = 10 次。 每个 task 处理 32000kb,32kb,总共会向磁盘溢写 32000 / 32 = 1000 次。

      在 map task 处理的数据量比较大的情况下,而你的 task 的内存缓冲默认是比较小的,32kb。 可能会造成多次的 map 端往磁盘文件的 spill 溢写操作,发生大量的磁盘 IO,从而降低性能。

      如果数据量比较大,reduce 端聚合时可能会出现什么样的情况? reduce 端聚合内存,占比。默认是 0.2。如果数据量比较大,reduce task 拉取过来的数据很 多,那么就会频繁发生 reduce 端聚合内存不够用,频繁发生 spill 操作,溢写到磁盘上去。 而且最要命的是,磁盘上溢写的数据量越大,后面在进行聚合操作的时候,很可能会多次读 取磁盘中的数据,进行聚合。默认不调优,在数据量比较大的情况下,可能频繁地发生 reduce 端的磁盘文件的读写。

      生产调优过程: 在实际生产环境中,我们在什么时候来调节两个参数?

      看 Spark UI,如果是 standalone 模式,通过 Spark UI 的地址,4040 的端口,进去看,依次 点击进去,可以看到每个 stage 的详情,有哪些 executor,有哪些 task,每个 task 的 shuffle write 和 shuffle read 的量,shuffle 的磁盘和内存,读写的数据量;如果是 yarn 模式来提交,课程 最前面,从 yarn 的界面进去,点击对应的 application,进入 Spark UI,查看详情。

      如果发现 shuffle 磁盘的 write 和 read 很大。这个时候,就意味着最好调节一些 shuffle 的 参数。进行调优。首先当然是考虑开启 map 端输出文件合并机制。

      调节上面说的那两个参数。调节的时候的原则。spark.shuffle.file.buffer,每次扩大一倍, 然后看看效果,64,128;spark.shuffle.memoryFraction,每次提高 0.1,看看效果。

      调节了以后,map task 内存缓冲变大了,减少 spill 到磁盘文件的次数;reduce 端聚合内存变 大了,减少 spill 到磁盘的次数,而且减少了后面聚合读取磁盘文件的数量。(不能调节的太 大)

    (3)shuffle 调优之 HashShuffleManager 与 SortShuffleManager,若非业务需要尽量使用bypass机制的sortShuffle

      spark.shuffle.manager:hash、sort、tungsten-sort(自己实现内存管理) spark.shuffle.sort.bypassMergeThreshold:200 (输出文件小于等于 200 的;最后只会将 所有的输出文件合并为一份文件,并不会进行 sort 操作)

      SortShuffleManager 与 HashShuffleManager 两点不同:

      1、SortShuffleManager 会对每个 reduce task 要处理的数据,进行排序(默认的)。

      2、SortShuffleManager 会避免像 HashShuffleManager 那样,不会去创建多份磁盘文件。一 个 task,只会写入一个磁盘文件,不同 reduce task 的数据,用 offset 来划分界定。

      3、自己可以设定一个阈值,默认是 200,当 reduce task 数量少于等于 200;map task 创建 的输出文件小于等于 200 的;最后只会将所有的输出文件合并为一份文件,并不会进行 sort 操作。这样做的好处,就是避免了 sort 排序,节省了性能开销。而且还能将多个 reduce task 的文件合并成一份文件。节省了 reduce task 拉取数据的时候的磁盘 IO 的开销。

      钨丝 sort shuffle manager,效果跟 sort shuffle manager 是差不多的。 但是,唯一的不同之处在于,钨丝 manager,是使用了自己实现的一套内存管理机制,性能 上有很大的提升, 而且可以避免 shuffle 过程中产生的大量的 OOM,GC,等等内存相关的 异常。
    hash、sort、tungsten-sort。如何来选择?

      1、需不需要数据默认就让 spark 给你进行排序?就好像 mapreduce,默认就是有按照 key 的 排序。如果不需要的话,其实还是建议搭建就使用最基本的 HashShuffleManager,因为最开 始就是考虑的是不排序,换取高性能;

      2、什么时候需要用 sort shuffle manager?如果你需要你的那些数据按 key 排序了,那么就选 择这种吧,而且要注意,reduce task 的数量应该是超过 200 的,这样 sort、merge(多个文件 合并成一个)的机制,才能生效把(否则 reduce task 的数量少于等于 200,只会将多个文 件合并成一个,而不会进行 sort 操作)。但是这里要注意,你一定要自己考量一下,有没有 必要在 shuffle 的过程中,就做这个事情,毕竟对性能是有影响的。

    spark.shuffle.manager:hash、sort(默认就是)、tungsten-sort

    new SparkConf().set("spark.shuffle.manager", "hash")

    new SparkConf().set("spark.shuffle.manager", "tungsten-sort")

    // 默认就是

    new SparkConf().set("spark.shuffle.manager", "sort")

    new SparkConf().set("spark.shuffle.sort.bypassMergeThreshold", "550")

  • 相关阅读:
    事务隔离级别,数据库存储过程,Mysql视图,Mysql语句
    Spring注解; Spring Bean
    Java Thread Api
    Java 年轻代、年老代、GC
    Java 线程同步方式
    HashMap
    ArrayList
    安装zabbix环境
    线上应用——高内存占用
    Python入门笔记
  • 原文地址:https://www.cnblogs.com/guoyu1/p/13803720.html
Copyright © 2011-2022 走看看