1. 写在前面
之前零散的写了一些spark在某一块的性能优化,比如sparkstreaming的性能优化,参数优化,sparkSQL的优化。本篇博文针对spark一些基本的核心优化做一个介绍分享,当然这里的介绍适合rdd,sparkstreaming,sparkSQL等。当然个人认为不管什么样的优化方案和方式都只是为了提供一个优化参考。具体实际的业务中,优化还是得看具体的实际的情况。还是引用某位大神的一句话,盲目暴力的优化行为都是在耍流氓。
2. 常用参数说明
--driver-memory 4g:driver内存大小,一般没有广播变量(broadcast)时,设置4g足够,
如果有广播变量,视情况而定,可设置6G,8G,12G等均可
--executor-memory 4g:每个executor的内存,正常情况下是4g足够,但有时处理大批量数据时容易内存不足,再多申请一点,如6G
--num-executors 15:总共申请的executor数目,普通任务十几个或者几十个足够了,
若是处理海量数据如百G上T的数据时可以申请多一些,100,200等
--executor-cores 2:每个executor内的核数,即每个executor中的任务task数目,此处设置为2,即2个task共享上面设置的6g内存,
每个map或reduce任务的并行度是executor数目*executor中的任务数yarn集群中一般有资源申请上限,
如,executor-memory*num-executors<400G 等,所以调试参数时要注意这一点
--spark.default.parallelism 200:Spark作业的默认为500~1000个比较合适,如果不设置,
spark会根据底层HDFS的block数量设置task的数量,这样会导致并行度偏少,资源利用不充分。该参数设为num-executors * executor-cores的2~3倍比较合适。
--spark.storage.memoryFraction 0.6:设置RDD持久化数据在Executor内存中能占的最大比例。默认值是0.6
--spark.shuffle.memoryFraction 0.2:设置shuffle过程中一个task拉取到上个stage的task的输出后,
进行聚合操作时能够使用的Executor内存的比例,默认是0.2,如果shuffle聚合时使用的内存超出了这个20%的限制,多余数据会
被溢写到磁盘文件中去,降低shuffle性能
--spark.yarn.executor.memoryOverhead 1G:executor执行的时候,用的内存可能会超过executor-memory,
所以会为executor额外预留一部分内存,spark.yarn.executor.memoryOverhead即代表这部分内存
3. Spark常用编程建议
1> 避免创建重复的RDD,尽量复用同一份数据。
2> 尽量避免使用shuffle类算子,因为shuffle操作是spark中最消耗性能的地方,reduceByKey、join、distinct、repartition等算子都会触发shuffle操作,尽量使用map类的非shuffle算子
3> 用aggregateByKey和reduceByKey替代groupByKey,因为前两个是预聚合操作,会在每个节点本地对相同的key做聚合,等其他节点拉取所有节点上相同的key时,会大大减少磁盘IO以及网络开销。
4> repartition适用于RDD[V], partitionBy适用于RDD[K, V]
5> mapPartitions操作替代普通map,foreachPartitions替代foreach
6> filter操作之后进行coalesce操作,可以减少RDD的partition数量
7> 如果有RDD复用,尤其是该RDD需要花费比较长的时间,建议对该RDD做cache,若该RDD每个partition需要消耗很多内存,建议开启Kryo序列化机制(据说可节省2到5倍空间),若还是有比较大的内存开销,可将storage_level设置为MEMORY_AND_DISK_SER
8> 尽量避免在一个Transformation中处理所有的逻辑,尽量分解成map、filter之类的操作
9> 多个RDD进行union操作时,避免使用rdd.union(rdd).union(rdd).union(rdd)这种多重union,rdd.union只适合2个RDD合并,合并多个时采用SparkContext.union(Array(RDD)),避免union嵌套层数太多,导致的调用链路太长,耗时太久,且容易引发StackOverFlow
10> spark中的Group/join/XXXByKey等操作,都可以指定partition的个数,不需要额外使用repartition和partitionBy函数
11> 尽量保证每轮Stage里每个task处理的数据量>128M
12> 如果2个RDD做join,其中一个数据量很小,可以采用Broadcast Join,将小的RDD数据collect到driver内存中,将其BroadCast到另外以RDD中,其他场景想优化后面会讲
13> 2个RDD做笛卡尔积时,把小的RDD作为参数传入,如BigRDD.certesian(smallRDD)
14> 若需要Broadcast一个大的对象到远端作为字典查询,可使用多executor-cores,大executor-memory。若将该占用内存较大的对象存储到外部系统,executor-cores=1, executor-memory=m(默认值2g),可以正常运行,那么当大字典占用空间为size(g)时,executor-memory为2*size
,executor-cores=size/m
(向上取整)
15> 如果对象太大无法BroadCast到远端,且需求是根据大的RDD中的key去索引小RDD中的key,可使用zipPartitions以hash join的方式实现,具体原理参考下一节的shuffle过程
16> 如果需要在repartition重分区之后还要进行排序,可直接使用repartitionAndSortWithinPartitions,比分解操作效率高,因为它可以一边shuffle一边排序
4. shuffle性能优化
1> 什么是shuffle操作
spark中的shuffle操作功能:将分布在集群中多个节点上的同一个key,拉取到同一个节点上,进行聚合或join操作,类似洗牌的操作。这些分布在各个存储节点上的数据重新打乱然后汇聚到不同节点的过程就是shuffle过程。
2> 哪些操作中包含shuffle操作
RDD的特性是不可变的带分区的记录集合,Spark提供了Transformation和Action两种操作RDD的方式。Transformation是生成新的RDD,包括map, flatMap, filter, union, sample, join, groupByKey, cogroup, ReduceByKey, cros, sortByKey, mapValues等;Action只是返回一个结果,包括collect,reduce,count,save,lookupKey等
Spark所有的算子操作中是否使用shuffle过程要看计算后对应多少分区:
若一个操作执行过程中,结果RDD的每个分区只依赖上一个RDD的同一个分区,即属于窄依赖,如map、filter、union等操作,这种情况是不需要进行shuffle的,同时还可以按照pipeline的方式,把一个分区上的多个操作放在同一个Task中进行
若结果RDD的每个分区需要依赖上一个RDD的全部分区,即属于宽依赖,如repartition相关操作(repartition,coalesce)、 * ByKey操作(groupByKey,ReduceByKey,combineByKey、aggregateByKey等)、join相关操作(cogroup,join)、distinct操作,这种依赖是需要进行shuffle操作的
3> shuffle操作过程
shuffle过程分为shuffle write和shuffle read两部分
shuffle write:分区数由上一阶段的RDD分区数控制,shuffle write过程主要是将计算的中间结果按某种规则临时放到各个executor所在的本地磁盘上(当前stage结束之后,每个task处理的数据按key进行分类,数据先写入内存缓冲区,缓冲区满,溢写spill到磁盘文件,最终相同key被写入同一个磁盘文件)创建的磁盘文件数量=当前stage中task数量 * 下一个stage的task数量
shuffle read:从上游stage的所有task节点上拉取属于自己的磁盘文件,每个read task会有自己的buffer缓冲,每次只能拉取与buffer缓冲相同大小的数据,然后聚合,聚合完一批后拉取下一批,边拉取边聚合。分区数由Spark提供的一些参数控制,如果这个参数值设置的很小,同时shuffle read的数据量很大,会导致一个task需要处理的数据非常大,容易发生JVM crash,从而导致shuffle数据失败,同时executor也丢失了,就会看到Failed to connect to host 的错误(即executor lost)。
shuffle过程中,各个节点会通过shuffle write过程将相同key都会先写入本地磁盘文件中,然后其他节点的shuffle read过程通过网络传输拉取各个节点上的磁盘文件中的相同key。这其中大量数据交换涉及到的网络传输和文件读写操作是shuffle操作十分耗时的根本原因
4> spark的shuffle类型
参数spark.shuffle.manager用于设置ShuffleManager的类型。Spark1.5以后,该参数有三个可选项:hash、sort和tungsten-sort。
HashShuffleManager是Spark1.2以前的默认值,Spark1.2之后的默认值都是SortShuffleManager。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。
由于SortShuffleManager默认会对数据进行排序,因此如果业务需求中需要排序的话,使用默认的SortShuffleManager就可以;但如果不需要排序,可以通过bypass机制或设置HashShuffleManager避免排序,同时也能提供较好的磁盘读写性能。
HashShuffleManager流程:
SortShuffleManager流程:
5> 如何开启bypass机制
bypass机制通过参数spark.shuffle.sort.bypassMergeThreshold设置,默认值是200,表示当ShuffleManager是SortShuffleManager时,若shuffle read task的数量小于这个阈值(默认200)时,则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式写数据,但最后会将每个task产生的所有临时磁盘文件合并成一个文件,并创建索引文件。
这里给出的调优建议是,当使用SortShuffleManager时,如果的确不需要排序,可以将这个参数值调大一些,大于shuffle read task的数量。那么此时就会自动开启bypass机制,map-side就不会进行排序了,减少排序的性能开销,提升shuffle操作效率。但这种方式并没有减少shuffle write过程产生的磁盘文件数量,所以写的性能没有改变。
6> HashShuffleManager优化建议
如果使用HashShuffleManager,可以设置spark.shuffle.consolidateFiles参数。该参数默认为false,只有当使用HashShuffleManager且该参数设置为True时,才会开启consolidate机制,大幅度合并shuffle write过程产生的输出文件,对于shuffle read task 数量特别多的情况下,可以极大地减少磁盘IO开销,提升shuffle性能。参考社区同学给出的数据,consolidate性能比开启bypass机制的SortShuffleManager高出10% ~ 30%。
7>shuffle调优建议
除了上述的几个参数调优,shuffle过程还有一些参数可以提高性能
--spark.shuffle.file.buffer : 默认32M,shuffle Write阶段写文件
时的buffer大小,若内存资源比较充足,可适当将其值调大一些(如64M),
减少executor的IO读写次数,提高shuffle性能
--spark.shuffle.io.maxRetries :默认3次,Shuffle Read阶段取数据
的重试次数,若shuffle处理的数据量很大,可适当将该参数调大。
9>处理shuffle类操作的注意事项
减少shuffle数据量:在shuffle前过滤掉不必要的数据,只选取需要的字段处理
针对SparkSQL和DataFrame的join、group by等操作:可以通过 spark.sql.shuffle.partitions控制分区数,默认设置为200,可根据shuffle的量以及计算的复杂度提高这个值,如2000等
RDD的join、group by、reduceByKey
等操作:通过spark.default.parallelism
控制shuffle
read与reduce处理的分区数,默认为运行任务的core总数,官方建议为设置成运行任务的core的2~3
倍
提高executor的内存:即spark.executor.memory的值
分析数据验证是否存在数据倾斜的问题:如空值如何处理,异常数据(某个key对应的数据量特别大)时是否可以单独处理,可以考虑自定义数据分区规则,如何自定义可以参考下面的join优化环节
5. join性能优化
Spark所有的操作中,join操作是最复杂、代价最大的操作,也是大部分业务场景的性能瓶颈所在。所以针对join操作的优化是使用spark必须要学会的技能。
spark的join操作也分为Spark SQL的join和Spark RDD的join
5.1 Spark SQL的join操作
5.1.1 Hash Join
Hash Join的执行方式是先将小表映射成Hash Table的方式,再将大表使用相同方式映射到Hash Table,在同一个hash分区内做join匹配。
hash join又分为broadcast hash join和shuffle hash join两种。其中Broadcast hash join,顾名思义,就是把小表广播到每一个节点上的内存中,大表按Key保存到各个分区中,小表和每个分区的大表做join匹配。这种情况适合一个小表和一个大表做join且小表能够在内存中保存的情况。如下图所示:
当Hash Join不能适用的场景就需要Shuffle Hash Join了,Shuffle Hash Join的原理是按照join Key分区,key相同的数据必然分配到同一分区中,将大表join分而治之,变成小表的join,可以提高并行度。执行过程也分为两个阶段:
shuffle阶段:分别将两个表按照join key进行分区,将相同的join key数据重分区到同一节点
hash join阶段:每个分区节点上的数据单独执行单机hash join算法
Shuffle Hash Join的过程如下图所示:
5.1.2 Sort-Merge Join
SparkSQL针对两张大表join的情况提供了全新的算法——Sort-merge join,整个过程分为三个步骤:
Shuffle阶段:将两张大表根据join key进行重新分区,两张表数据会分布到整个集群,以便分布式进行处理
sort阶段:对单个分区节点的两表数据,分别进行排序
merge阶段:对排好序的两张分区表数据执行join操作。分别遍历两个有序序列,遇到相同的join key就merge输出,否则继续取更小一边的key,即合并两个有序列表的方式。
sort-merge join流程如下图所示。
5.2 Spark RDD的join操作
Spark的RDD join没有上面这么多的分类,但是面临的业务需求是一样的。如果是大表join小表的情况,则可以将小表声明为broadcast变量,使用map操作快速实现join功能,但又不必执行Spark core中的join操作。
如果是两个大表join,则必须依赖Spark Core中的join操作了。Spark RDD Join的过程可以自行阅读源码了解,这里只做一个大概的讲解。
spark的join过程中最核心的函数是cogroup方法,这个方法中会判断join的两个RDD所使用的partitioner是否一样,如果分区相同,即存在OneToOneDependency依赖,不用进行hash分区,可直接join;如果要关联的RDD和当前RDD的分区不一致时,就要对RDD进行重新hash分区,分到正确的分区中,即存在ShuffleDependency,需要先进行shuffle操作再join。因此提升join效率的一个思路就是使得两个RDD具有相同的partitioners。
所以针对Spark RDD的join操作的优化建议是:
如果需要join的其中一个RDD比较小,可以直接将其存入内存,使用broadcast hash join
在对两个RDD进行join操作之前,使其使用同一个partitioners,避免join操作的shuffle过程
如果两个RDD其一存在重复的key也会导致join操作性能变低,因此最好先进行key值的去重处理
5.3 数据倾斜优化
均匀数据分布的情况下,前面所说的优化建议就足够了。但存在数据倾斜时,仍然会有性能问题。主要体现在绝大多数task执行得都非常快,个别task执行很慢,拖慢整个任务的执行进程,甚至可能因为某个task处理的数据量过大而爆出OOM错误。
5.4 分析数据分布
如果是Spark SQL中的group by、join语句导致的数据倾斜,可以使用SQL分析执行SQL中的表的key分布情况;如果是Spark RDD执行shuffle算子导致的数据倾斜,可以在Spark作业中加入分析Key分布的代码,使用countByKey()统计各个key对应的记录数