在做spark作业调优的方向应该首先考虑资源分配和任务并行度,当我们资源足够的情况下,再采取考虑其他边边角角的调优
一、分配更多的资源
1、分配哪些资源 cpu,memory。
2、怎么分配
1)、在给driver分配内存的时候,因为driver只是做任务的提交,所以内存一般不需要太大,通常给1-2G即可。
2)、在给executor分配内存的时候,需要根据任务计算数据大小,同时还需要考虑是否对rdd进行了持久化,在executor中有20%的内存使用来执行task,60%内存是用来保存rdd数据,20%的内存
是用来拉取上一个stage计算完的结果即shuffle read
3)、cpu---->一个core决定同一时间能够执行的任务数量,在给executor分配core的时候,一般不要超过集群的总的core的1/3,executor分配的core越多,意味着同一时间并行执行的task数量就越多,
这样就大大提高了作业的运行效率。但是我们这里要注意并不是executor的core越多越好,还是需要根据任务的实际情况进行分配。
3、分配之后的效果
1)、给executor分配的内存越大,可以减少任务频繁GC,因为一旦GC我们的作业性能会立马下降。
2)、给executor分配的内存越大,可以减少RDD在做持久化的时候将部分数据写入到磁盘,提高我们的作业的性能。
3)、给executor分配的内存越大,意味着在shuffle阶段,拉取上一个stage的数据时,也会减少shuffle数据的落地,提高作业性能。
4)、给executor分配的内存越大,意味着同一时间并发执行task数量就多
二、调节任务并行度
如果分配的资源足够,但是你的任务并行度不够,那么性能还是上不来,并且浪费资源。这种是典型的运算资源和task任务不匹配的状况。
假如,给任务分配了10个executor,每个executor有5个core,那么这个任务总共有50个core,也就是统一时间能够并行执行50个task,假如你的任务只有30个task,30个task全部进行分配的话,也就20core会浪费掉。
针对以上情况,我们应该合理的调整每个阶段stage任务task的数量,spark官方建议stage阶段的任务数量为500-1000
怎么设置呢?可以通过这个参数来调整 sparkConf.set("spark.default.parallelism","500")
我们在代码中还可以动态的去调整并行度。
coalesce
reduceBykey(_+_,partition)
三、对公用的RDD进行持久化
1、概念
一个RDD,执行多次操作,每次操作都需要计算得到这个RDD,
持久化就是第一次计算的时候,把这个执行多次的RDD持久化到内存或磁盘,这个RDD就只计算一次,以后的多次操作都只需要从内存或磁盘读取这个RDD就可以了
那么我们就不需要多次计算同一个RDD,从而在很多场景下,可以大幅度提升我们spark应用程序的性能
官方文档说,合理使用RDD持久化机制,甚至可以提升spark应用程序的性能,10倍
2、实现持久化的方法
RDD.cache() cache()的底层就是调用了persist()方法的无参版本
RDD.persist(StorageLevel.MEMORY_ONLY) MEMORY_ONLY----持久化的级别
3、持久化级别
spark提供的多种持久化级别,主要是为了在cpu和内存消耗之间进行取舍
(1)、MEMORY_ONLY,persist()方法默认的策略,也是持久化优先选择的策略,如果可以缓存所有数据的话,那么就使用这种策略。因为纯内存速度最快,而且没有序列化,不需要消耗cpu进行反序列化操作。
(2)、MEMORY_ONLY_SER,这种策略如果缓存不了所有数据的话,会进行序列化存储,纯内存操作还是非常快的,只是要消耗cpu进行反序列化
(3)、后缀为_2,这种策略会进行数据的备份,可以进行快速的失败恢复,这样在失败时,就不需要重新计算了
(4)、DISK,持久化到磁盘,能不使用DISK相关的策略,就不用使用,有的时候,从磁盘读取数据,还不如重新计算一次
四、广播大变量
1、广播变量的使用场景
如果在rdd中使用了外部变量,那么每个task都会拷贝外部变量的一个副本到executor上去执行。
例如,一个job会产生100个task,这100个task加入都发送到同一个executor上去执行,那么意味着每个task都需要带一个外部变量的副本到executor上执行,如果这个外部变量大小是1G,就意味着总共会有100G的数据通过网络传输到executor上执行,这样会大大
消耗网络流量和浪费executor的内存,降低Spark的运行效率,增加了spark作业的运行时间
2、使用广播变量的优点
如果把外部变量做成广播变量,同样的一个job产生的100个task,发送到executor上执行,此时spark会将这个广播变量保存到每个executor的blockManager中,那么当task执行的时候就不需要从driver上拉取这个外部变量,这样的话会大大降低传输到executor上
的数据(100G--->1G),降低网络传输流量和降低executor的内存消耗
3、广播变量的运行机制
(1)、当executor上的第一个task运行的时候,首先去executor的blockManager上获取这个广播变量,如果不存在,会去邻居的executor的blockManager中获取,如果都获取不到,就会去Driver端拉取这个广播变量,然后将其保存到executor的blockManager中
(2)、当executor在执行之后的task任务时,则直接从executor的blockManager中获取这个广播变量
4、注意
广播变量是只读的,不可进行修改
五、使用Kryo序列化
默认情况下,Spark内部是使用Java的序列化机制,ObjectOutputStream/ObjectInputStream,对象输入输出流机制,来进行序列化,但是默认的序列化机制的效率不高,序列化的速度比较慢,序列化以后的数据,占用的内存空间相对
还是比较大的。
Spark支持使用Kryo序列化机制,Kryo序列化机制,比默认的Jave序列化机制,速度要快,序列化后的数据要更小,大概是Java序列化机制的1/10。
所以Kryo序列化优化以后,可以让网络传输的数据变小,在集群中耗费的内存资源大大减少。
Kryo序列化机制,一旦启用后,会生效的几个地方:
1、算子函数中使用到的外部变量:优化网络传输的性能,可以优化集群中内存的占用和消耗
2、持久化RDD时进行序列化,StorageLevel.MEMORY_ONLY_SER:优化集群中内存的占用和消耗,task执行的时候,创建的对象,就不至于频繁的占满内存,频繁发生GC。
3、shuffle:可以优化网络传输的性能
六、减少Shuffle的发生
1、尽量避免使用会发生shuffle的算子
2、在数据源头,对我们的数据提前进行聚合。
七、优化数据结构
1、避免对象套对象
2、减少数据集合的使用,尽量使用数组
3、因为对象需要序列化,能不用对象就尽量不用,建议使用字符串拼接
4、可以使用第三方提供的占用内存小,序列化速度快数据结构类库
八、避免数据倾斜
1、数据倾斜的概念
有的时候,我们可能会遇到大数据计算中一个棘手的问题——数据倾斜,此时Spark作业的性能会比期望差很多,数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题。以保证Spark作业的性能。
数据倾斜发生时的现象
绝大多数task执行的都非常快,但个别task执行极慢。比如,总共有1000个task,997个task都在1分钟之内执行完了,但是剩余两三个task却要一两个小时。整个Spark作业的运行进度是由运行时间最长的那个task决定的。
原本能够正常执行的Spark作业,某天突然报出00M(内存溢出)异常,观察异常栈,是我们写的业务代码造成的。
2、如何定位导致数据倾斜的代码
数据倾斜只会发生在shuffle过程中,这里给大家罗列一些常用的并且可能会触发shuffle操作的算子:groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等,出现数据倾斜时,可能就是你的代码中使用
了这些算子中的某一个所导致的。