由于Spark自己的调优guidance已经覆盖了很多很有价值的点,因此这里直接翻译一份过来。也作为一个积累。
Spark 调优 (Tuning Spark)
由于大多数Spark计算任务是在内存中运行计算,任何集群中的资源限制都可能成为Spark程序的瓶颈,比如:CPU、网络、带宽、内存。通常情况下,如果内存能容纳所处理数据,主要的瓶颈则仅是网络带宽。但有些时候您也需要做一些调优,比如利用RDD序列化存储来降低内存消耗。本手册将会涵盖以下两个大点:数据序列化(对优化网络传输和降低内存开销有显著效果)、内存优化。我们同时会介绍另外的几个小点。
数据序列化 (Data Serialization)
在分布式应用中序列化起着非常重要的作用。序列化的性能较慢或序列化的结果较大都会拖慢了整体计算的性能。通常来说,序列化应是你在调优Spark程序时首要考虑的因素。Spark做了许多努力来平衡序列化的易用性(即方便地将任意Java类型对象序列化)和性能。Spark提供了两种序列化库:
- Java serialization:默认选择。Spark利用Java的
ObjectOutputStream
方式进行序列化,适用于任意实现了java.io.Serializable
接口的类。同时你也可以利用java.io.Externalizable
接口去实现自定义的序列化。Java serialization方式比较灵活可移植,但性能较低,而且序列化后的结果较大。 - Kryo serialization:Spark 同时也支持Kryo库(version 2)去实现更快速的序列化。Kryo一般要比Java自带序列化快10倍,序列化结果也更小,但需要用户手动将需要序列化的类手动注册到Kryo的Register上去使其生效。
配置 sparkConf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
可是Kryo生效,该序列化指定会在shuffle阶段和rdd持久序列化(persist)阶段生效。我们推荐您使用Kryo,没有将其作为默认序列化方案的唯一原因是需要用户手动注册。在Spark 2.0.0以后的版本中,Spark内部的shuffle 已使用 Kryo来序列化原始类型、原始类型的数组类型和string类型。
Kryo已在Spark core中,无需另外引入依赖。SparkConf中使用registerKryoClasses
方法即可。
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
您可通过Kryo官方文档 来获取更多的高级配置及自定义内容,包括添加自定义的序列化代码。如果您的对象确实很大,您可能需要考虑调整spark.kryoserializer.buffer
参数来保证缓冲区有足够的内存去容纳大对象。
最后,即使用户不手动注册类到Kryo,Kryo依然能够起效,只是这种情况下Kryo会对每个类存储完整的类名,造成不必要的浪费。
内存调优 (Memory Tuning)
内存调优中通常要考虑三个方面:Java对象占用的内存大小、访问这些对象的开销以及GC开销。
通常来说,访问Java对象时很快的,但访问对象内部成员字段的时候却可能会造成2-5倍的损耗,可能原因包括如下:
- Java object都会有个对象头,16字节大小,保存着指向该类的指针。在一些字段很小(比如只有一个int)的情况下,对象头大小甚至比对象数据本身还要大。
- Java String 类型会有40字节左右的额外开销,用于保存长度等信息。同时string内默认用utf-16编码存储,导致存储的开销也比纯粹的char[]要大。
- 集合类型的类(List、Set、Map等)许多用了链式结构或树形结构,相当于每个Entry的一个Wrapper对象。这会导致多余的对象头和链表指针存储的开销。
- 泛型中的原始类型会被强制装箱成对应的对象类型。
本小节会先从一个Spark内存管理的overview引入,然后再讨论用户的优化策略。我们还会着重描述如何计算你的对象所占用内存的大小以及如何去通过改变数据结构或序列化去优化它。接着我们还会讨论一下Spark缓存大小的调优和GC策略的调优。
Spark 内存管理 Overview
Spark的内存主要分为两类:运行与存储。运行内存包括shuffle、join、sort、aggregation那个计算所占用的内存,存储内存包括cache、集群中数据传输所占用的内存。Spark程序里运行和存储实际上是共用一块内存区域 M,即如果运行不怎么消耗内存,那么存储模块课请求所有的可用内存,反之亦然。运行内存在内存不足时会抢占存储内存,除非存储内存已经降低到一个特定阈值 R。换句话说,R代表一块存储区域的大小,这块存储区域是不会被运行占用的。但反过来,由于逻辑过于复杂,存储不会去抢占运行内存。
如上设计保证了几个期望的特性。首先,纯运行类的应用可以尽可能地申请足够多的内存,避免了无谓的内存分割浪费。其次,需要cache内存的应用也能保证一个最小大小的存储不会被清除。最后,这种设计在大多数场景下提供了开箱即用且性能可接受的方案,用户不必过于关注实际spark程序的内存分布。
以下为两个相关的配置,但我们不建议普通用户去主动修改,因为该配置可适用于大多数场景。
spark.memory.fraction
表示 M 相对于Java堆空间的比例大小,默认0.6 。剩下 40% 的内存会给用户的数据结构、Spark内部元数据等,并能作为一个预防因超大对象而导致 OOM的缓冲。该值设定需与JVM的老年代空间(old/tenured)相适应(即不能超过)。spark.memory.storageFraction
表示 R 相对于 M 的比例大小,默认 0.5 。如上文所描述,R 是存储内存保持最小的不会被抢占的阈值。
内存消耗估计
最好的衡量一个dataset内存消耗大小的方法是创建一个rdd,cache到memory,然后再Spark WebUI的Storage页面去观察具体数值。
若要估计一个特定对象的内存消耗,可使用SizeEstimator.estimate()
的方法。这是个有效地估计不同数据结构并优化内存的方法,也可用于去衡量预估广播变量(即java rdd代码块中一些外部final传入的变量)在每个executor的堆内存上的大小。
数据结构类型优化
这里的数据指的是用于传输的数据。降低内存消耗的最首要的方法便是避免Java对象机制带来额外开销,比如引用类型的对象或wrapper类对象。有几种优化方法:
- 优先选择原始类型、数组,而不是集合类。
- 避免嵌套或wrapper结构
- id方面能用数值型的考虑不要用string类型
- 对于32gb内存以下的机器,添加
-XX:+UseCompressedOops
进行指针压缩,把8-byte指针压缩成 4-byte。可在spark-env.sh中配置。
序列化的RDD存储
当你的对象在经过了如上优化后依然较大时,一个更简单的方法是序列化的方式去存储这些对象。这可以通过在cache/persist中设置StorageLevel实现,比如MEMORY_ONLY_SER
。Spark会将对象用一个序列化后的large byte array去存储。当然由于需要即时的反序列化,这会带来访问上的性能开销,因此这方面是需要用户自行平衡的。Spark极力推荐使用Kryo序列化来提高序列化/反序列化性能以及压缩存储的字节数。
垃圾回收调优 (GC调优)
JVM垃圾回收在一些临时对象创建频繁的程序里是一个大问题。JVM GC 大致思路就是JVM寻找不再被引用的对象,通过如分代或标记等机制进行清除释放内存。需要注意的是,Java GC的开销是和Java对象数量成正比的,所以建议尽可能使用简单的数据结构或原始类型来降低这部分开销。另一个更好的方法是,将对象序列化成byte[]保存,则实际上每个RDD分区的主要存储就是一个大型byte数组。因此,在做其他GC参数优化前,若性能影响不大,上一小节提及的RDD序列化存储方案是个强烈建议试用的方案。
用户自己的task执行和RDD的cache缓存的平衡也会产生GC问题(即上文的运行内存与存储内存),下文将会对此进行讨论。
评估GC的影响
GC调优第一步是先收集gc触发的频率、gc耗时、造成程序pause的最长耗时等统计数据,在Java参数中添加-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
可获得日志 (参见 configuration guide做jvm参数调整)。executor的日志在executor的stdout中查看,而并非driver的log。
调优GC
首先需要知道JVM内存管理的一些信息:
- Java heap被划分为 young 和 old。young负责一些生命周期短的对象,old负责一些生命周期较长以及超过young分配大小的对象。
- young 被划分为 eden, survivor1, survivor2。
- 简单描述GC过程:eden满了,触发minor GC,将对象从eden和survivor1拷贝到survivor2。如果对象存活时间足够长或survivor2满了,则将其刷入old。如果old接近满了,则触发full gc,会同时对young和old进行gc清理。一般认为,full gc会影响整体程序的性能,应尽量的减少。
Spark GC调优的目标是保证只有那些长时间存活的RDD保存在old,同时young能持续有足够空间存储短时间存活的对象。这样可以帮助避免full gc频繁地去回收task 运行阶段创建的临时对象。以下为一些技巧:
- 检查gc统计中是否发生了过多的gc,如果full gc在一个task中发生了过多次,用户需考虑适当添加executor内存。
- 如果minor GC较多,major GC/full GC 较少,尝试分配多点内存给Eden。你可以根据你自己的task的大致使用内存来估计eden区大小 E ,因此 young区大小一般为 4/3 * E 。使用Java参数
-Xmn=4/3*E
。 - 如果GC统计中 old 接近满了,则适当降低
spark.memory.fraction
,毕竟减少点缓存比GC影响执行性能更能让人接受。或者,考虑减少young大小;或者,调大JVM的NewRatio
参数,大部分JVM该值默认为2,表示old 占了2/3的堆内存,这个比例应当足够大,且比spark.memory.fraction
的比例大。 - 尝试 G1 GC
-XX:+UseG1GC
。在一些大堆 JVM的场景下,有助于提高gc性能。G1需要额外的堆空间进行对象移动,考虑调大-XX:G1HeapRegionSize
。 - 举个例子,如果你的task在从HDFS读取数据,task的内存使用可根据读取的HDFS block大小来估计。需注意通常一个解压的block是原block的2-3倍大,假设一个executor有个3-4个task在运行,HDFS block size为128M,那么eden区大小考虑为
4 * 3 * 128 MB
。 - 每当使用新配置后,需继续监控GC的耗时及频率。
Spark executors的GC 参数可通过spark.executor.extraJavaOptions
配置。关于GC还有很多可以探讨的,请大家自行上网去深入学习。
其他优化
并行度
如果并行度设置不够高,集群资源可能不能被较充分利用。Spark默认会根据textFile文件数或parallelize的去设置map task的个数,reduce task的个数默认则使用最大的父RDD的分区数。用户可通过设置大多数并行方法的第二个参数指定并行度或修改默认配置spark.default.parallelism
。我们认为每个CPU核上并行运行2-3个task是推荐配置。
Reduce Task的内存使用
由于你的RDD无法适配空余内存,程序会报OOM。有时候导致这种情况的可能只是你的其中一个数据集。通常的优化方法有增大并行度使得每个分区的输入变小或提供自己的partitioner使得数据更加均匀分布。Spark最小可支持的Task运行时长为200ms(依靠executor的JVM重用以及Spark自身的启动task的低开销),因此用户可以放心的增大并行度/分区数甚至可以超过几倍的core数。
广播大变量
使用 sparkcontext的broadcast函数可以显著减少每个序列化task的大小,以及启动一个job的开销。如果你的task需要使用driver程序中的一些大变量,则考虑把它变为一个广播变量。Spark会吧每个task的序列化大小打印到master的log中,用户可以观察日志来判断自己的tasks是否过大。通常来说,一个task序列化大小超过20KB则被认为应当做优化了。
数据本地性
数据本地性会对spark job的性能产生极大影响。如果代码和数据在一起,则计算会变得很快;但如果两者分开了,则其中一方需移动到另一方的进程去执行。一般来说,迁移代码比迁移数据要快。这是Spark构建数据本地性机制的基本原则。
数据本地性可被表示为数据离代码有多近,从近到远可被表示为几个级别:
- PROCESS_LOCAL 数据在同一个JVM中
- NODE_LOCAL 数据在同一个结点,比如HDFS的同一个DataNode或同结点另一个executor上。
- NO_PREF 数据从任何地方访问速度都一样,无任何本地性偏好。
- RACK_LOCAL 数据在同一个机架上,一般定义即是同一个交换机子网下。
- ANY 数据不在同一个机架上,可能在网络上。
Spark优先调度tasks都有最佳的本地性,但这不总是有可能做到的。如果有空闲的executor和未处理的数据,Spark会降低数据本地性。此时有两个选择:一是等待直到当前节点cpu释放足够起一个新task去处理同结点数据;二是立即在别的结点起个新task兵移动数据。
通常Spark依然采取第一种,等待cpu空闲在本地起新task。如果等待超时了,才执行第二种方案移动数据。每个级别间的等待超时参数可以统一配置也可以分别配置,参见 spark.locality
。如果你的任务的本地性很差并且执行时间较长,你可能需要调大超时时间。一般来说默认配置适用大多数场景。