1、Shuffle流程
spark的shuffle过程如下图所示,和mapreduce中的类似,但在spark2.0及之后的版本中只存在SortShuffleManager而将原来的HashShuffleManager废弃掉(但是shuffleWriter的子类BypassMergeSortShuffleWriter和已经被废弃掉的HashShuffleWriter类似)。这样,每个mapTask在shuffle的sort阶段只会生成一个结果文件,单个文件按照partitionId分成多个region。reducer阶段根据partitionId来fetch对应的region数据。
整个shuffle过程分为两个阶段,write(核心)和read阶段,其中write阶段比较重要的实现类为ExternalSorter(后面会重点分析该类)。
2、Shuffle Write
- BypassMergeSortShuffleWriter -
这种方式是对partition(对应的reduce)数量较少且不需要map-side aggregation的shuffle优化,将每个partition的数据直接写到对应的文件,在所有数据都写入完成后进行一次合并,下面是部分代码:
[BypassMergeSortShuffleWriter]->write
public void write(Iterator<Product2<K, V>> records) throws IOException {
...
partitionWriters = new DiskBlockObjectWriter[numPartitions];
/**
为每个partition创建一个DiskWriter用于写临时文件
**/
for (int i = 0; i < numPartitions; i++) {
final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
blockManager.diskBlockManager().createTempShuffleBlock();
final File file = tempShuffleBlockIdPlusFile._2();
final BlockId blockId = tempShuffleBlockIdPlusFile._1();
partitionWriters[i] =
blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
}
...
/**
对每个record用对应的writer进行文件写入操作
**/
while (records.hasNext()) {
final Product2<K, V> record = records.next();
final K key = record._1();
partitionWriters[partitioner.getPartition(key)].write(key, record._2());
}
//flush
for (DiskBlockObjectWriter writer : partitionWriters) {
writer.commitAndClose();
}
/**
构造最终的输出文件实例,其中文件名为(reduceId为0):
"shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
文件所在的local文件夹是根据该文件名的hash值确定。
1、如果运行在yarn上,yarn在启动的时候会根据配置项'LOCAL_DIRS'在本地创建
文件夹
**/
File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
//在实际结果文件名后加上uuid用于标识文件正在写入,结束后重命名
File tmp = Utils.tempFileWith(output);
try {
//合并每个partition对应的文件到一个文件中
partitionLengths = writePartitionedFile(tmp);
//将每个partition的offset写入index文件方便reduce端fetch数据
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
} finally {
if (tmp.exists() && !tmp.delete()) {
logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
}
}
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(),
partitionLengths);
}
-
UnsafeShuffleWriter(详见project tungsten)
该writer可将数据序列化后写入到堆外内存,只需要按照partitionid对地址进行排序,整个过程不涉及反序列化。
条件:
1、使用的序列化类需要支持object relocation.目前只能使用kryoSerializer
2、不需要map side aggregate即不能定义aggregator
3、partition数量不能大于支持的上限(2^24)
内存模型:
每条数据地址由一个64位的指针确定,其构成为:[24 bit partition number][13 bit memory page number][27 bit offset in page]
在内存为非8字节对齐的情况下,每个page的容量为227bits=128Mb,page总数为213,因此每个task可操作内存总量为:227*213bits=1Tb,在内存按字节对齐的情况下允许每个page的size有1g(即128*8,实际64位系统的内存都是8字节对齐的)的容量,数据存放在off heap上。在地址中加入partitionID 是为了排序阶段只需要对record的地址排序。
4、Shuffle过程中涉及到的几个参数
- spark.shuffle.sort.bypassMergeThreshold
当partition的数量小于该值并且不需要进行map-side aggregation时使用BypassMergeSortShuffleWriter来进行shuffle的write操作,默认值为200.
[SortShuffleWriter]->shouldBypassMergeSort
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
if (dep.mapSideCombine) {
require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
false
} else {
val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
dep.partitioner.numPartitions <= bypassMergeThreshold
}
}```
- *spark.shuffle.compress*、*spark.shuffle.file.buffer*
**[DiskBlockObjectWriter]->open**
def open(): DiskBlockObjectWriter = {
...
/**
'spark.shuffle.compress'-该参数决定是否对写入文件的序列化数据进行压缩。
'spark.shuffle.file.buffer'-设置buffer stream的buffersize,每write
一个byte时会检查当前buffer容量,容量满的时候则会flush到磁盘。该参数值在代码中
会乘以1024转换为字节长度。默认值为'32k',该值太大可能导致内存溢出。
**/
bs = compressStream(new BufferedOutputStream(ts, bufferSize))
...
}```
spark.file.transferTo
决定在使用BypassMergeWriter过程中,最后对文件进行合并时是否使用NIO方式进行file stream的copy。默认为true,在为false的情况下合并文件效率比较低(创建一个大小为8192的字节数组作为buffer,从in stream中读满后写入out stream,单线程读写),版本号为2.6.32的linux内核在使用NIO方式会产生bug,需要将该参数设置为false。
spark.shuffle.spill.numElementsForceSpillThreshold
在使用UnsafeShuffleWriter时,如果内存中的数据超过这个值则对当前内存数据进行排序并写入磁盘临时文件。