Shuffle是性能调优的重点,Shuffle是为了汇聚有共同特征的一类数据到一个计算节点上进行计算。
Shuffle过程非常复杂:数据量大、将数据汇聚到正确的Partition和节点时产生多次磁盘交互、节省带宽而无可避免的压缩、网络传输所需的序列化
Shuffle需要持久化计算的中间结果,因为一旦数据丢失就要重新计算所有依赖的RDD
所以主要分析如何持久化(Shuffle Write),使下游的Task顺利获得(Shuffle Read),Shuffle Map Task是如何计算结果的
Hash Based Shuffle Write
Spark1.0只支持这种shuffle,因为不需要运用到排序。
每个Shuffle Map Task根据key的哈希值,计算每个key需要写入的partition,将数据单独写入一个文件,这个partition对应了下游的shuffle map task或者result task,下游的task读取这个文件并进行计算。
Basic Shuffle Writer
在Executor上执行shuffle Map Task时,最终调用了scheduler.ShuffleMapTask#runTask,其核心逻辑为:
1 从SparkEnv中获得shuffleManager,不仅支持Hash和Sort,还支持自定义的shuffle
2 从manager中取得writer,hash情况下获得的是shuffle.hash.HashShuffleWriter
3 调用rdd运算,运算结果通过writer进行持久化,也就是HashShuffleWriter#write,通过ShuffleDependency定义是否有Aggregator来确定是否作Map端聚合,将原始结果或聚合结果通过shuffle.FileShuffleBlockManager#forMapTask方法写入,写完后将元数据信息写入scheduler.MapStatus,下游的Task根据MapStatus取得需要处理的数据。
书本还对步骤3进行了详细的解释
存在的问题:
每个Shuffle Map Task需要为每个下游的Task创建独立的文件,这样会导致文件数量很多
1 每个节点同时打开多个文件,需要巨大的内存
2 随机写入硬盘的性能比顺序写入差
Shuffle Consolidate Writer
解决Shuffle过程中产生文件过多的问题。在Spark0.8.1加入Shuffle Consolidate Files机制。将shuffle.consolidateFiles设置为true开启。
也就是运行在同一个core里的shuffle map task,会共用一个文件,由第一个task创建,之后的task都追加在这个文件上,当shuffle map task的数量明显多于core时,此种writer就比basic的文件数少很多
源码上,与basic不同的地方就是forMaskTask#writers的实现不同。在同一个core上运行的shuffle map task相当于写了这个文件的不同部分。使用这种writer时下游的task通过shuffle.FileShuffleBlockManager.ShuffleFileGroup#getFileSegmentFor来区分。
虽然Shuffle Consolidate机制缓解了文件过多的问题,但没办法解决同时打开多个文件造成内存使用过大,以及随机读影响性能的问题。
为了解决hash based shuffle的缺陷:
spark1.0开始建立了shuffle pluggable,可自定义external shuffle service
spark1.1实现了sort based shuffle,1.2版本sort based shuffle成为shuffle默认选项
Shuffle Pluggable框架:
需要实现新的shuffle机制,就要实现以下接口:
org.apache.spark.shuffle.ShuffleManager
Driver和Executor都有一个ShuffleManager,通过配置项spark.shuffle.manager制定,由SparkEnv创建。
Driver的ShuffleManager负责注册Shuffle元数据(shuffleId、Map Task数量),Executor的ShuffleManager负责读写Shuffle数据
需要实现的功能有:
1 Driver注册元数据信息
2 获得Shuffle Writer,根据Shuffle Map Task的ID创建Shuffle Writer
3 获得Shuffle Reader,根据shuffleId和Partition的ID创建Shuffle Reader
4 为数据成员shuffleBlockManager赋值,保存实际的ShuffleBlockManager
5 删除本地的Shuffle元数据
6 停止Shuffle Manager
源码可看shuffle.sort.SortShuffleManager和hash.HashShuffleManger两种shuffle方案的做法
org.apache.spark.shuffle.ShuffleBlockManager
主要是从本地读取Shuffle数据的功能,接口通过spark.storage.BlockManager调用
主要功能有:
1 def getBytes,输入ShuffleBlockId,输出ByteBuffer
2 def getBlockData,输入ShuffleBlocked,输出ManagedBuffer,就是用来读取的方法
3 def stop,停止Manager
源码看shuffle.IndexShuffleBlockManager和shuffle.FileShuffleBlockManager
org.apache.spark.shuffle.ShuffleReader
实现了下游Task如何读取上游ShuffleMapTask的输出,通过spark.MapOutputTracker获得数据位信息,在本地则调用spark.storage.BlockManager的getBlockManager(其实也调用了shuffle.ShuffleBlockManager的getBlockManager)
只要实现了上面的模块就能实现shuffle机制,实现时要考虑到超大规模数据场景下的性能问题和资源消耗问题
Sort Based Write
1.2中默认为Sort Based Write,shuffle manager从hash换成了sort
Hash based shuffle设计目的是避免排序,但处理超大规模数据集时产生了大量的IO和内存消耗,而sort based shuffle将所有结果写到一个文件里,同时生成一个index文件,reducer通过index文件取得需要处理的数据,这样就节省了内存的使用和disk带来的高延时,减少了GC的风险和频率,避免同时读写多个文件的压力。
Shuffle Map Task按照key相应的partition ID进行sort,同一个partition的key不进行sort
这个过程中内存若不够用,就把排序好的内容写入外部存储,结束时进行一次归并排序
Index文件记录不同partition的位置信息,spark.storage.BlockManager实现了这种寻址方式,task能够获取到所需的partition
SortShuffleWriter的实现逻辑:
1 对于每个Partition创建一个scala.Array存储key/value对
2 scala.Array超过阈值时将内存数据写入外部存储文件,文件开始部分记录partition ID和数据条数
3 将所有外部存储的文件进行归并排序,一般每次同时打开10~100个文件,太多会产生内存溢出或垃圾回收,太少会提高延时
4 生成数据文件时同时生成Index索引文件,记录不同partition的起始位置
至于SortShuffleManager书本上没有详细讲,可在gitHub的apache.spark.sparkCore中看源码。
Shuffle Map Task运算结果处理:
分为两部分:Executor端直接处理Task结果、Driver端接到Task运行结束信息时对Shuffle Write的结果处理,使下游Task能得到需要的数据
与第6章executor的工作类似,就不重复看了
Shuffle Read
在Spark1.2中,不管Hash Based Shuffle还是Sort Based Shuffle,内置的Shuffle Reader都是spark.shuffle.hash.HashShuffleReader
整体流程是从ShuffledRDD#compute开始,调用ShuffleManager的getReader方法获取到ShuffleReader,调用read()方法进行读取。
storage.ShuffleBlockFetcherIterator通过splitLocalRemoteBlocks划分数据读取策略:
如果数据在本地,可以直接从BlockManager获取,如果需要从其他节点获取,就通过网络,由于Shuffle数据量很大,分为以下几种网络读取策略:
1 每次最多启动5个线程到最多5个节点上读取数据
2 每次请求的数据大小不超过spark.reducer.maxMbInFlight(默认为48MB)的五分之一
避免目标机器占用过多带宽,而且请求数据可以平行化
本地读取:
FetchLocalBlocks()负责本地Block获取,其实就是调用ShuffleBlockManager中的getBlockData方法
远程读取:
两种方式,netty和nio,通过spark.shuffle.blockTransferService来设置
其中storage.ShuffleBlockFetcherIterator#sendRequest向远程节点发起读取Block请求
书中还有很多具体实现的源码
性能调优:
spark.shuffle.manager
选择hash还是sort,取决于内存、排序、文件操作综合因素:
对于不需要排序且shuffle产生文件数量不多的情况下,hash更优,因为sort会按照reducer的partition进行排序浪费时间
sort的优势在于可扩展性,还在不断演进中
spark.shuffle.spill
参数默认为true,指定Shuffle过程中如果内存中的数据超过阈值是否将部分数据临时写入外部存储。设置为false会一直使用内存,有内存溢出风险
为了防止内存溢出,hash包装了spark.util.collection.ExternalAppendOnlyMap和spark.util.collection.ExternalSorter
Shuffle Read在聚合时也可能将数据写入外部以防止内存溢出
spark.shuffle.memoryFraction和spark.shuffle.safetyFraction
spark.shuffle.memoryFraction决定shuffle使用的内存到达总内存多少比例时开始写入外部存储,默认为0.2,它影响了写入外部存储的频率和垃圾回收的频率
为了降低写入外部存储的频率,适当提高spark.shuffle.memoryFraction,而且为了避免内存溢出,增加它时需要减少RDD cache的内存,即减少spark.storage.memoryFraction
为了处理实际使用的内存比估算值要大的情况,spark.shuffle.safetyFraction可降低实际shuffle过程中用到的内存,防止超出配置值
spark.shuffle.sort.bypassMergeThreshold
设置reducer的partition少于一定数量时(默认为200),sort based shuffle内部不使用归并排序,而是直接将每个partition写入单独文件,类似于hash方式,区别在于最后还是会合成一个文件,并通过index文件标记位置
存在同时打开文件过多导致内存占用过大的风险,如果内存比较紧张,可降低这个值
spark.shuffle.blockTransferService
spark1.2默认为netty,之前版本是nio,netty实现更加简洁
spark.shuffle.consolidateFiles
默认为false,处理hash方式产生过多文件的问题,若选为true,就是同一个core上运行的shuffle map task共用同一个文件。这个策略没什么用,并不能减少内存消耗
Spark.shuffle.compress和spark.shuffle.spill.cpmpress
默认为true,都是设置shuffle过程中是否对数据进行压缩的参数。前者针对最终写入本地文件系统的输出文件,而后者针对处理过程中需要写入外部存储的中间数据
前者主要考虑到网络IO是否成为瓶颈,若网络比较慢,则需要进行压缩,如果计算是CPU密集型的,就设置为false
后者应对CPU的压缩解压时间和Disk IO的时间进行比较,若Disk IO较慢时应设置为true,若硬盘时SSD,false可能更快
spark.reducer.maxMbInFlight
限制Reducer Task向其他Executor请求Shuffle数据所占最大内存,默认为48MB,主要考虑网络带宽问题
总结:
Spark 0.6和0.7,shuffle结果都先放在内存中,导致了OC和OOM概率非常大。在0.8增加了shuffle结果写入磁盘,并且为下游task生成单独的文件,解决了结果都需要存入内存的问题,但引入了文件过多的问题。0.8.1引入FileConsolidation机制减少了文件数。1.0引入Shuffle Pluggabkle框架,允许自定义shuffle机制。1.1引入sort based shuffle。1.2版本sort变成了默认。