The shuffle process consists of two stages, shuffle write and shuffle read. Hash shuffle was removed in Spark 2.0, leaving only sort shuffle. The analysis below follows the source code:
1.ShuffleManager
When Spark initializes SparkEnv, the create() method instantiates the ShuffleManager:
// Let the user specify short names for shuffle managers
val shortShuffleMgrNames = Map(
"sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
"tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
val shuffleMgrName = conf.get(config.SHUFFLE_MANAGER)
val shuffleMgrClass =
shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
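As shown above, the short names sort and tungsten-sort both map to the same class, SortShuffleManager, which is instantiated via reflection. ShuffleManager is a trait whose core methods are: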
private[spark] trait ShuffleManager {
/**
* Registers a shuffle and returns a handle for it
*/
def registerShuffle[K, V, C](
shuffleId: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle
/** Gets a writer for a given partition. Called on executors by map tasks. */
def getWriter[K, V](
handle: ShuffleHandle,
mapId: Long,
context: TaskContext,
metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V]
/**
* Gets a reader for a range of reduce partitions. Called on executors by reduce tasks.
*/
def getReader[K, C](
handle: ShuffleHandle,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]
...
}
2.SortShuffleManager
SortShuffleManager is the only built-in implementation of ShuffleManager. It implements the three methods above as follows:
2.1 registerShuffle
/**
* Obtains a [[ShuffleHandle]] to pass to tasks.
*/
override def registerShuffle[K, V, C](
shuffleId: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
// 1. First check whether the bypass-merge-sort path applies
if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
// If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
// need map-side aggregation, then write numPartitions files directly and just concatenate
// them at the end. This avoids doing serialization and deserialization twice to merge
// together the spilled files, which would happen with the normal code path. The downside is
// having multiple files open at a time and thus more memory allocated to buffers.
new BypassMergeSortShuffleHandle[K, V](
shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
// 2. Otherwise check whether serialized shuffle can be used
} else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
// Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
new SerializedShuffleHandle[K, V](
shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else {
// Otherwise, buffer map outputs in a deserialized form:
new BaseShuffleHandle(shuffleId, dependency)
}
}
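1. First, check whether BypassMergeSort applies. Two conditions must hold: the shuffle dependency has no map-side aggregation, and the number of partitions is no greater than spark.shuffle.sort.bypassMergeThreshold (default 200). If both hold, a BypassMergeSortShuffleHandle is returned and the bypass merge-sort shuffle path is used: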
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
// We cannot bypass sorting if we need to do map-side aggregation.
if (dep.mapSideCombine) {
false
} else {
// Defaults to 200
val bypassMergeThreshold: Int = conf.get(config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD)
dep.partitioner.numPartitions <= bypassMergeThreshold
}
}
2. If those conditions are not met, check canUseSerializedShuffle(). If all three of its conditions hold, a SerializedShuffleHandle is returned and the tungsten-sort shuffle path is used:
def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
val shufId = dependency.shuffleId
val numPartitions = dependency.partitioner.numPartitions
// The serializer must support relocation of serialized objects
if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +
s"${dependency.serializer.getClass.getName}, does not support object relocation")
false
// There must be no map-side aggregation
} else if (dependency.mapSideCombine) {
log.debug(s"Can't use serialized shuffle for shuffle $shufId because we need to do " +
s"map-side aggregation")
false
// The number of partitions must not exceed 16777215 + 1 = 16777216 (2^24)
} else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +
s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")
false
} else {
log.debug(s"Can use serialized shuffle for shuffle $shufId")
true
}
}
3. If neither of the above applies, a BaseShuffleHandle is returned and the basic sort shuffle path is used.
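The three-way decision made by registerShuffle can be summarized in a small standalone Java sketch. HandleKind and HandleSelectionSketch are hypothetical names used only for illustration, and the two constants are assumed defaults (200 and 2^24), not values read from a SparkConf.

```java
/** Hypothetical stand-in for the three handle types; these are not Spark's classes. */
enum HandleKind { BYPASS_MERGE_SORT, SERIALIZED, BASE }

final class HandleSelectionSketch {
    // Assumed default of spark.shuffle.sort.bypassMergeThreshold.
    static final int BYPASS_MERGE_THRESHOLD = 200;
    // Assumed serialized-mode limit: 16777215 + 1 partitions.
    static final int MAX_SERIALIZED_PARTITIONS = 1 << 24;

    static HandleKind choose(boolean mapSideCombine,
                             boolean serializerSupportsRelocation,
                             int numPartitions) {
        // 1. Bypass: no map-side aggregation and few enough partitions.
        if (!mapSideCombine && numPartitions <= BYPASS_MERGE_THRESHOLD) {
            return HandleKind.BYPASS_MERGE_SORT;
        }
        // 2. Serialized: relocatable serializer, no aggregation, partition id fits in 24 bits.
        if (serializerSupportsRelocation && !mapSideCombine
                && numPartitions <= MAX_SERIALIZED_PARTITIONS) {
            return HandleKind.SERIALIZED;
        }
        // 3. Everything else falls back to the basic sort path.
        return HandleKind.BASE;
    }
}
```

Note that the bypass check comes first, so a shuffle with 200 or fewer partitions and no map-side aggregation never reaches the serialized path, even if its serializer supports relocation.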
2.2 getReader
/**
* Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
* Called on executors by reduce tasks.
*/
override def getReader[K, C](
handle: ShuffleHandle,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
handle.shuffleId, startPartition, endPartition)
new BlockStoreShuffleReader(
handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context))
}
This returns a BlockStoreShuffleReader.
2.3 getWriter
/** Get a writer for a given partition. Called on executors by map tasks. */
override def getWriter[K, V](
handle: ShuffleHandle,
mapId: Long,
context: TaskContext,
metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
handle.shuffleId, _ => new OpenHashSet[Long](16))
mapTaskIds.synchronized { mapTaskIds.add(context.taskAttemptId()) }
val env = SparkEnv.get
// Choose the ShuffleWriter according to the handle type
handle match {
case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
new UnsafeShuffleWriter(
env.blockManager,
context.taskMemoryManager(),
unsafeShuffleHandle,
mapId,
context,
env.conf,
metrics,
shuffleExecutorComponents)
case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
new BypassMergeSortShuffleWriter(
env.blockManager,
bypassMergeSortHandle,
mapId,
env.conf,
metrics,
shuffleExecutorComponents)
case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
new SortShuffleWriter(
shuffleBlockResolver, other, mapId, context, shuffleExecutorComponents)
}
}
The writer is chosen by handle type: a SerializedShuffleHandle gets an UnsafeShuffleWriter, a BypassMergeSortShuffleHandle gets a BypassMergeSortShuffleWriter, and anything else gets a SortShuffleWriter.
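3.The three Writer implementations
As described above, BypassMergeSortShuffleWriter is used when the bypass conditions are met. Otherwise, UnsafeShuffleWriter is used if all three of the following hold: the serializer supports relocation, there is no map-side aggregation, and the number of partitions does not exceed 16777215 + 1. In every other case SortShuffleWriter is used.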
3.1 BypassMergeSortShuffleWriter
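BypassMergeSortShuffleWriter extends ShuffleWriter and is written in Java. It writes one temporary file per partition, then merges them into a single output file and generates an index file that records the start offset of each partition. The write() method is: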
@Override
public void write(Iterator<Product2<K, V>> records) throws IOException {
assert (partitionWriters == null);
// Create a ShuffleMapOutputWriter
ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents
.createMapOutputWriter(shuffleId, mapId, numPartitions);
try {
// If there are no records
if (!records.hasNext()) {
// Commit and get the number of bytes written for every partition
partitionLengths = mapOutputWriter.commitAllPartitions();
// Update mapStatus
mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(), partitionLengths, mapId);
return;
}
final SerializerInstance serInstance = serializer.newInstance();
final long openStartTime = System.nanoTime();
// Create one DiskBlockObjectWriter and one FileSegment slot per partition
partitionWriters = new DiskBlockObjectWriter[numPartitions];
partitionWriterSegments = new FileSegment[numPartitions];
// For each partition
for (int i = 0; i < numPartitions; i++) {
// Create a temporary shuffle block
final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
blockManager.diskBlockManager().createTempShuffleBlock();
// Get the file and id of the temporary block
final File file = tempShuffleBlockIdPlusFile._2();
final BlockId blockId = tempShuffleBlockIdPlusFile._1();
// Create a DiskBlockObjectWriter for this partition
partitionWriters[i] =
blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
}
// Creating the file to write to and creating a disk writer both involve interacting with
// the disk, and can take a long time in aggregate when we open many files, so should be
// included in the shuffle write time.
writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
// Otherwise, process the records
while (records.hasNext()) {
final Product2<K, V> record = records.next();
final K key = record._1();
// Write each record to the file of the partition its key maps to
partitionWriters[partitioner.getPartition(key)].write(key, record._2());
}
for (int i = 0; i < numPartitions; i++) {
try (DiskBlockObjectWriter writer = partitionWriters[i]) {
// Commit the writes and keep the resulting file segment
partitionWriterSegments[i] = writer.commitAndGet();
}
}
// Merge all per-partition files into a single file
partitionLengths = writePartitionedData(mapOutputWriter);
// Update mapStatus
mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(), partitionLengths, mapId);
} catch (Exception e) {
try {
mapOutputWriter.abort(e);
} catch (Exception e2) {
logger.error("Failed to abort the writer after failing to write map output.", e2);
e.addSuppressed(e2);
}
throw e;
}
}
The merge method writePartitionedData() is shown below. By default it merges the files using zero-copy transfer:
private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) throws IOException {
// Track location of the partition starts in the output file
if (partitionWriters != null) {
// Start time
final long writeStartTime = System.nanoTime();
try {
for (int i = 0; i < numPartitions; i++) {
// Get the file of each partition
final File file = partitionWriterSegments[i].file();
ShufflePartitionWriter writer = mapOutputWriter.getPartitionWriter(i);
if (file.exists()) {
// Use zero-copy transfer when transferTo is enabled
if (transferToEnabled) {
// Using WritableByteChannelWrapper to make resource closing consistent between
// this implementation and UnsafeShuffleWriter.
Optional<WritableByteChannelWrapper> maybeOutputChannel = writer.openChannelWrapper();
// This path calls Utils.copyFileStreamNIO, which eventually calls FileChannel.transferTo to copy the file
if (maybeOutputChannel.isPresent()) {
writePartitionedDataWithChannel(file, maybeOutputChannel.get());
} else {
writePartitionedDataWithStream(file, writer);
}
} else {
// Otherwise copy through streams
writePartitionedDataWithStream(file, writer);
}
if (!file.delete()) {
logger.error("Unable to delete file for partition {}", i);
}
}
}
} finally {
writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
}
partitionWriters = null;
}
return mapOutputWriter.commitAllPartitions();
}
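The idea of concatenating per-partition files with FileChannel.transferTo, and of deriving index offsets from the partition lengths, can be sketched in plain Java. This is only an illustration under assumptions: ConcatWithTransferTo and its method names are hypothetical, and the sketch ignores metrics, error handling and the ShuffleMapOutputWriter abstraction that Spark uses.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

final class ConcatWithTransferTo {
    /** Appends each partition file to `output` and returns the byte length of every partition. */
    static long[] concat(List<Path> partitionFiles, Path output) throws IOException {
        long[] lengths = new long[partitionFiles.size()];
        try (FileChannel out = FileChannel.open(output, StandardOpenOption.CREATE,
                StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
            for (int i = 0; i < partitionFiles.size(); i++) {
                try (FileChannel in = FileChannel.open(partitionFiles.get(i), StandardOpenOption.READ)) {
                    long size = in.size();
                    long copied = 0;
                    // transferTo may copy fewer bytes than requested, so loop until done.
                    while (copied < size) {
                        copied += in.transferTo(copied, size - copied, out);
                    }
                    lengths[i] = size;
                }
            }
        }
        return lengths;
    }

    /** Start offset of each partition in the merged file, plus the total length as the last entry. */
    static long[] indexOffsets(long[] lengths) {
        long[] offsets = new long[lengths.length + 1];
        for (int i = 0; i < lengths.length; i++) {
            offsets[i + 1] = offsets[i] + lengths[i];
        }
        return offsets;
    }
}
```

With the offsets in hand, a reader can locate partition i as the byte range from offsets[i] to offsets[i + 1] in the merged file.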
3.2 UnsafeShuffleWriter
UnsafeShuffleWriter also extends ShuffleWriter and is written in Java. Its write() method is:
@Override
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
// Keep track of success so we know if we encountered an exception
// We do this rather than a standard try/catch/re-throw to handle
// generic throwables.
boolean success = false;
try {
while (records.hasNext()) {
// Insert the record into ShuffleExternalSorter for external sorting
insertRecordIntoSorter(records.next());
}
// Merge the spill files and write the output
closeAndWriteOutput();
success = true;
} finally {
if (sorter != null) {
try {
sorter.cleanupResources();
} catch (Exception e) {
// Only throw this error if we won't be masking another
// error.
if (success) {
throw e;
} else {
logger.error("In addition to a failure during writing, we failed during " +
"cleanup.", e);
}
}
}
}
}
There are two main methods here:
3.2.1 insertRecordIntoSorter()
@VisibleForTesting
void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
assert(sorter != null);
// Get the key and its partition
final K key = record._1();
final int partitionId = partitioner.getPartition(key);
// Reset the buffer
serBuffer.reset();
// Serialize the key and value into the buffer
serOutputStream.writeKey(key, OBJECT_CLASS_TAG);
serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
serOutputStream.flush();
// Get the size of the serialized record
final int serializedRecordSize = serBuffer.size();
assert (serializedRecordSize > 0);
// Hand the serialized record to ShuffleExternalSorter
sorter.insertRecord(
serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize, partitionId);
}
This method serializes the record and inserts the serialized bytes into the external sorter through insertRecord(), shown below:
public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId)
throws IOException {
// for tests
assert(inMemSorter != null);
// If the record count has reached the spill threshold, spill to disk immediately
if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
logger.info("Spilling data because number of spilledRecords crossed the threshold " +
numElementsForSpillThreshold);
spill();
}
// Checks whether there is enough space to insert an additional record in to the sort pointer
// array and grows the array if additional space is required. If the required space cannot be
// obtained, then the in-memory data will be spilled to disk.
growPointerArrayIfNecessary();
final int uaoSize = UnsafeAlignedOffset.getUaoSize();
// Need 4 or 8 bytes to store the record length.
final int required = length + uaoSize;
// Request a new page from TaskMemoryManager if more memory is needed
acquireNewPageIfNecessary(required);
assert(currentPage != null);
final Object base = currentPage.getBaseObject();
// Given a memory page and offset within that page, encode this address into a 64-bit long.
// This address will remain valid as long as the corresponding page has not been freed.
final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
// Write the record length
UnsafeAlignedOffset.putSize(base, pageCursor, length);
// Advance the cursor
pageCursor += uaoSize;
// Write the record data
Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
// Advance the cursor
pageCursor += length;
// Pass the encoded logical address and the partition id to ShuffleInMemorySorter for sorting
inMemSorter.insertRecord(recordAddress, partitionId);
}
Records are buffered and spilled here without any higher-level data structure; the code operates on memory pages directly.
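The length-prefixed layout that insertRecord() writes into a page can be illustrated with an ordinary ByteBuffer. This is a simplified sketch, not Spark's code: LengthPrefixedPageSketch is a hypothetical class, the length field is fixed at 4 bytes (Spark uses 4 or 8 depending on alignment), and a return value of -1 stands in for "acquire a new page or spill".

```java
import java.nio.ByteBuffer;

final class LengthPrefixedPageSketch {
    private final ByteBuffer page;

    LengthPrefixedPageSketch(int pageSize) {
        this.page = ByteBuffer.allocate(pageSize);
    }

    /** Appends [length][bytes] at the cursor and returns the record's offset, or -1 if it does not fit. */
    int append(byte[] serialized) {
        int required = Integer.BYTES + serialized.length;
        if (page.remaining() < required) {
            return -1; // the caller would need a new page (or a spill) here
        }
        int offset = page.position();
        page.putInt(serialized.length); // write the length, cursor advances by 4
        page.put(serialized);           // write the data, cursor advances by length
        return offset;
    }

    /** Reads back the record stored at the given offset. */
    byte[] read(int offset) {
        int length = page.getInt(offset);
        byte[] out = new byte[length];
        ByteBuffer view = page.duplicate(); // independent position, shared content
        view.position(offset + Integer.BYTES);
        view.get(out);
        return out;
    }
}
```

Only the offset returned by append() needs to be remembered for each record, which is why the sorter can later work on compact pointers instead of the records themselves.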
growPointerArrayIfNecessary() is as follows:
/**
* Checks whether there is enough space to insert an additional record in to the sort pointer
* array and grows the array if additional space is required. If the required space cannot be
* obtained, then the in-memory data will be spilled to disk.
*/
private void growPointerArrayIfNecessary() throws IOException {
assert(inMemSorter != null);
// If there is no room for another record
if (!inMemSorter.hasSpaceForAnotherRecord()) {
// Get the current memory usage
long used = inMemSorter.getMemoryUsage();
LongArray array;
try {
// could trigger spilling
// Allocate a new array with twice the current capacity
array = allocateArray(used / 8 * 2);
} catch (TooLargePageException e) {
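// The pointer array is too big to fit in a single page, so spill (the spill method is shown later).
// Note: the limit checked by allocateArray is TaskMemoryManager's maximum page size. The 128 MB limit
// (PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES = 1 << 27) applies to the record pages of this sorter.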
spill();
return;
} catch (SparkOutOfMemoryError e) {
// should have triggered spilling
if (!inMemSorter.hasSpaceForAnotherRecord()) {
logger.error("Unable to grow the pointer array");
throw e;
}
return;
}
// check if spilling is triggered or not
if (inMemSorter.hasSpaceForAnotherRecord()) {
// Spilling freed up space, so growing is unnecessary; release the newly allocated array
freeArray(array);
} else {
// Otherwise copy the old array into the new one
inMemSorter.expandPointerArray(array);
}
}
}
spill() is as follows:
@Override
public long spill(long size, MemoryConsumer trigger) throws IOException {
if (trigger != this || inMemSorter == null || inMemSorter.numRecords() == 0) {
return 0L;
}
logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
Thread.currentThread().getId(),
Utils.bytesToString(getMemoryUsage()),
spills.size(),
spills.size() > 1 ? " times" : " time");
// Sorts the in-memory records and writes the sorted records to an on-disk file.
// This method does not free the sort data structures.
writeSortedFile(false);
final long spillSize = freeMemory();
// Reset the ShuffleInMemorySorter
inMemSorter.reset();
// Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
// records. Otherwise, if the task is over allocated memory, then without freeing the memory
// pages, we might not be able to get memory for the pointer array.
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
return spillSize;
}
writeSortedFile():
private void writeSortedFile(boolean isLastFile) {
// This call performs the actual sort.
// Get an iterator over the sorted records
final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
inMemSorter.getSortedIterator();
// If there are no sorted records, we don't need to create an empty spill file.
if (!sortedRecords.hasNext()) {
return;
}
final ShuffleWriteMetricsReporter writeMetricsToUse;
// true means the final output file; false means a spill file
if (isLastFile) {
// We're writing the final non-spill file, so we _do_ want to count this as shuffle bytes.
writeMetricsToUse = writeMetrics;
} else {
// We're spilling, so bytes written should be counted towards spill rather than write.
// Create a dummy WriteMetrics object to absorb these metrics, since we don't want to count
// them towards shuffle bytes written.
writeMetricsToUse = new ShuffleWriteMetrics();
}
// Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem to
// be an API to directly transfer bytes from managed memory to the disk writer, we buffer
// data through a byte array. This array does not need to be large enough to hold a single
// record;
// Create a byte buffer array (1 MB by default)
final byte[] writeBuffer = new byte[diskWriteBufferSize];
// Because this output will be read during shuffle, its compression codec must be controlled by
// spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
// createTempShuffleBlock here; see SPARK-3426 for more details.
// Create a temporary shuffle block
final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
blockManager.diskBlockManager().createTempShuffleBlock();
// Get the file and id
final File file = spilledFileInfo._2();
final TempShuffleBlockId blockId = spilledFileInfo._1();
final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId);
// Unfortunately, we need a serializer instance in order to construct a DiskBlockObjectWriter.
// Our write path doesn't actually use this serializer (since we end up calling the `write()`
// OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work
// around this, we pass a dummy no-op serializer.
final SerializerInstance ser = DummySerializerInstance.INSTANCE;
int currentPartition = -1;
final FileSegment committedSegment;
try (DiskBlockObjectWriter writer =
blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse)) {
final int uaoSize = UnsafeAlignedOffset.getUaoSize();
// Iterate over the sorted records
while (sortedRecords.hasNext()) {
sortedRecords.loadNext();
final int partition = sortedRecords.packedRecordPointer.getPartitionId();
assert (partition >= currentPartition);
if (partition != currentPartition) {
// Switch to the new partition
// On switching to a new partition, commit the current partition and record its length
if (currentPartition != -1) {
final FileSegment fileSegment = writer.commitAndGet();
spillInfo.partitionLengths[currentPartition] = fileSegment.length();
}
// Then switch to the next partition
currentPartition = partition;
}
// Get the record pointer, then resolve the page and the offset within it
final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
final Object recordPage = taskMemoryManager.getPage(recordPointer);
final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer);
// Read the record length, i.e. the number of bytes still to be written
int dataRemaining = UnsafeAlignedOffset.getSize(recordPage, recordOffsetInPage);
long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length
while (dataRemaining > 0) {
final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
// Copy the data into the buffer array
Platform.copyMemory(
recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
// Write from the buffer array to the DiskBlockObjectWriter
writer.write(writeBuffer, 0, toTransfer);
// Advance the read position
recordReadPosition += toTransfer;
// Decrease the remaining byte count
dataRemaining -= toTransfer;
}
writer.recordWritten();
}
// Commit the last partition
committedSegment = writer.commitAndGet();
}
// If `writeSortedFile()` was called from `closeAndGetSpills()` and no records were inserted,
// then the file might be empty. Note that it might be better to avoid calling
// writeSortedFile() in that case.
// Add this spill to the list of spill files
if (currentPartition != -1) {
spillInfo.partitionLengths[currentPartition] = committedSegment.length();
spills.add(spillInfo);
}
// For a spill file, update the spill metrics
if (!isLastFile) {
writeMetrics.incRecordsWritten(
((ShuffleWriteMetrics)writeMetricsToUse).recordsWritten());
taskContext.taskMetrics().incDiskBytesSpilled(
((ShuffleWriteMetrics)writeMetricsToUse).bytesWritten());
}
}
encodePageNumberAndOffset() is as follows:
public long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) {
// With off-heap memory the offset is an absolute address; subtract the page's base offset to make it relative
if (tungstenMemoryMode == MemoryMode.OFF_HEAP) {
// In off-heap mode, an offset is an absolute address that may require a full 64 bits to
// encode. Due to our page size limitation, though, we can convert this into an offset that's
// relative to the page's base offset; this relative offset will fit in 51 bits.
offsetInPage -= page.getBaseOffset();
}
return encodePageNumberAndOffset(page.pageNumber, offsetInPage);
}
@VisibleForTesting
public static long encodePageNumberAndOffset(int pageNumber, long offsetInPage) {
assert (pageNumber >= 0) : "encodePageNumberAndOffset called with invalid page";
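// The upper 13 bits hold the page number and the lower 51 bits hold the offset
// Shift the page number left by 51 bits, then OR it with the offset masked by 0x7FFFFFFFFFFFFL (lower 51 bits all set)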
return (((long) pageNumber) << OFFSET_BITS) | (offsetInPage & MASK_LONG_LOWER_51_BITS);
}
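The 13-bit page number and 51-bit offset layout can be checked with a small round-trip sketch. PageAddressSketch and its decode helpers are hypothetical names written for this illustration; only the bit layout itself is taken from the code above.

```java
final class PageAddressSketch {
    static final int OFFSET_BITS = 51;
    // Lower 51 bits set to 1.
    static final long MASK_LOWER_51_BITS = (1L << OFFSET_BITS) - 1;

    /** Packs the page number into the upper 13 bits and the offset into the lower 51 bits. */
    static long encode(int pageNumber, long offsetInPage) {
        return (((long) pageNumber) << OFFSET_BITS) | (offsetInPage & MASK_LOWER_51_BITS);
    }

    /** Recovers the page number; the unsigned shift keeps page numbers with the top bit set correct. */
    static int decodePageNumber(long address) {
        return (int) (address >>> OFFSET_BITS);
    }

    /** Recovers the offset within the page. */
    static long decodeOffset(long address) {
        return address & MASK_LOWER_51_BITS;
    }
}
```

With 13 bits for the page number, one task can address at most 8192 pages (page numbers 0 through 8191).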
ShuffleInMemorySorter's insertRecord() is as follows:
public void insertRecord(long recordPointer, int partitionId) {
if (!hasSpaceForAnotherRecord()) {
throw new IllegalStateException("There is no space for new record");
}
array.set(pos, PackedRecordPointer.packPointer(recordPointer, partitionId));
pos++;
}
PackedRecordPointer.packPointer():
public static long packPointer(long recordPointer, int partitionId) {
assert (partitionId <= MAXIMUM_PARTITION_ID);
// Note that without word alignment we can address 2^27 bytes = 128 megabytes per page.
// Also note that this relies on some internals of how TaskMemoryManager encodes its addresses.
// Shift the page number right by 24 bits and combine it with the lower 27 bits, compressing the logical address to 40 bits
final long pageNumber = (recordPointer & MASK_LONG_UPPER_13_BITS) >>> 24;
final long compressedAddress = pageNumber | (recordPointer & MASK_LONG_LOWER_27_BITS);
// Store the partition id in the upper 24 bits
return (((long) partitionId) << 40) | compressedAddress;
}
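To see that the packed form loses no information (as long as the offset fits in 27 bits), the packing can be paired with its inverse. PackedPointerSketch is a hypothetical class for illustration; the unpacking helpers are written to mirror the layout described above, not copied from Spark.

```java
final class PackedPointerSketch {
    static final long MASK_LOWER_27_BITS = (1L << 27) - 1;
    static final long MASK_LOWER_51_BITS = (1L << 51) - 1;
    static final long MASK_UPPER_13_BITS = ~MASK_LOWER_51_BITS;

    /** Layout of the result: [24-bit partition id][13-bit page number][27-bit offset]. */
    static long pack(long recordPointer, int partitionId) {
        long pageNumber = (recordPointer & MASK_UPPER_13_BITS) >>> 24; // moves page bits to 27..39
        long compressed = pageNumber | (recordPointer & MASK_LOWER_27_BITS);
        return (((long) partitionId) << 40) | compressed;
    }

    /** Extracts the partition id from the upper 24 bits. */
    static int partitionId(long packed) {
        return (int) (packed >>> 40);
    }

    /** Rebuilds the 64-bit record pointer (13-bit page number, 51-bit offset). */
    static long recordPointer(long packed) {
        long pageNumber = (packed << 24) & MASK_UPPER_13_BITS; // page bits back to 51..63
        long offsetInPage = packed & MASK_LOWER_27_BITS;
        return pageNumber | offsetInPage;
    }
}
```

Because the partition id occupies the most significant bits, sorting the packed values by those bits alone groups records by partition, which is all the shuffle write path needs.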
getSortedIterator():
public ShuffleSorterIterator getSortedIterator() {
int offset = 0;
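// Sort the in-memory records by partition id with radix sort. Radix sort is much faster, but requires extra memory to be reserved when pointers are added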
if (useRadixSort) {
offset = RadixSort.sort(
array, pos,
PackedRecordPointer.PARTITION_ID_START_BYTE_INDEX,
PackedRecordPointer.PARTITION_ID_END_BYTE_INDEX, false, false);
// Otherwise sort with TimSort
} else {
MemoryBlock unused = new MemoryBlock(
array.getBaseObject(),
array.getBaseOffset() + pos * 8L,
(array.size() - pos) * 8L);
LongArray buffer = new LongArray(unused);
Sorter<PackedRecordPointer, LongArray> sorter =
new Sorter<>(new ShuffleSortDataFormat(buffer));
sorter.sort(array, 0, pos, SORT_COMPARATOR);
}
return new ShuffleSorterIterator(pos, array, offset);
}
3.2.2 closeAndWriteOutput()
@VisibleForTesting
void closeAndWriteOutput() throws IOException {
assert(sorter != null);
updatePeakMemoryUsed();
serBuffer = null;
serOutputStream = null;
// Get the spill files
final SpillInfo[] spills = sorter.closeAndGetSpills();
sorter = null;
final long[] partitionLengths;
try {
// Merge the spill files
partitionLengths = mergeSpills(spills);
} finally {
// Delete the spill files
for (SpillInfo spill : spills) {
if (spill.file.exists() && !spill.file.delete()) {
logger.error("Error while deleting spill file {}", spill.file.getPath());
}
}
}
// Update mapStatus
mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(), partitionLengths, mapId);
}
mergeSpills():
private long[] mergeSpills(SpillInfo[] spills) throws IOException {
long[] partitionLengths;
// With no spill files, create an empty output
if (spills.length == 0) {
final ShuffleMapOutputWriter mapWriter = shuffleExecutorComponents
.createMapOutputWriter(shuffleId, mapId, partitioner.numPartitions());
return mapWriter.commitAllPartitions();
// With a single spill file, transfer it directly as the output
} else if (spills.length == 1) {
Optional<SingleSpillShuffleMapOutputWriter> maybeSingleFileWriter =
shuffleExecutorComponents.createSingleFileMapOutputWriter(shuffleId, mapId);
if (maybeSingleFileWriter.isPresent()) {
// Here, we don't need to perform any metrics updates because the bytes written to this
// output file would have already been counted as shuffle bytes written.
partitionLengths = spills[0].partitionLengths;
maybeSingleFileWriter.get().transferMapSpillFile(spills[0].file, partitionLengths);
} else {
partitionLengths = mergeSpillsUsingStandardWriter(spills);
}
// With multiple spill files, merge them into the output; the merge can use either NIO or stream-based (BIO) copying
} else {
partitionLengths = mergeSpillsUsingStandardWriter(spills);
}
return partitionLengths;
}
3.3 SortShuffleWriter
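SortShuffleWriter sorts in memory using a PartitionedAppendOnlyMap or a PartitionedPairBuffer and spills to files when the memory limit is exceeded. When producing the final sorted output, it merge-sorts all previously spilled files together with the data still in memory and, if an aggregator is defined, combines elements with the same key using its functions. The entry point is write():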
override def write(records: Iterator[Product2[K, V]]): Unit = {
// Create an external sorter. With map-side pre-aggregation, pass the aggregator and keyOrdering; otherwise pass neither
sorter = if (dep.mapSideCombine) {
new ExternalSorter[K, V, C](
context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
} else {
// In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
// care whether the keys get sorted in each partition; that will be done on the reduce side
// if the operation being run is sortByKey.
new ExternalSorter[K, V, V](
context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
}
// Insert the records into ExternalSorter for sorting
sorter.insertAll(records)
// Don't bother including the time to open the merged output file in the shuffle write time,
// because it just opens a single file, so is typically too fast to measure accurately
// (see SPARK-3570).
// Create an output writer
val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
dep.shuffleId, mapId, dep.partitioner.numPartitions)
// Write the externally sorted data to the writer
sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)
val partitionLengths = mapOutputWriter.commitAllPartitions()
// Update mapStatus
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
}
insertAll():
def insertAll(records: Iterator[Product2[K, V]]): Unit = {
// TODO: stop combining if we find that the reduction factor isn't high
val shouldCombine = aggregator.isDefined
// Whether map-side aggregation is needed
if (shouldCombine) {
// Combine values in-memory first using our AppendOnlyMap
// mergeValue() merges a new value into the current aggregate
val mergeValue = aggregator.get.mergeValue
// createCombiner() creates the initial aggregate value
val createCombiner = aggregator.get.createCombiner
var kv: Product2[K, V] = null
// If the key already has an aggregate, merge into it; otherwise create the initial value
val update = (hadValue: Boolean, oldValue: C) => {
if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
}
// Iterate over the records
while (records.hasNext) {
// Increment the count of records read
addElementsRead()
kv = records.next()
// map is a PartitionedAppendOnlyMap keyed by (partition, key), with the aggregate as the value
map.changeValue((getPartition(kv._1), kv._1), update)
// Spill to disk if necessary
maybeSpillCollection(usingMap = true)
}
// If map-side aggregation is not needed
} else {
// Stick values into our buffer
while (records.hasNext) {
addElementsRead()
val kv = records.next()
// buffer is a PartitionedPairBuffer; insert the partition, key and value
buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
// Spill to disk if necessary
maybeSpillCollection(usingMap = false)
}
}
}
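This method decides whether records need map-side pre-aggregation as they are inserted, and stores them in one of two data structures accordingly.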
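The map-side combine branch boils down to an update keyed by (partition, key). The following is a minimal Java sketch using a plain HashMap; MapSideCombineSketch and its parameters are hypothetical, and the sketch leaves out size estimation and spilling entirely.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.ToIntFunction;

final class MapSideCombineSketch {
    /** Aggregates values per (partition, key), creating a combiner on first sight of a key. */
    static <K, V, C> Map<Map.Entry<Integer, K>, C> combine(
            List<? extends Map.Entry<K, V>> records,
            ToIntFunction<K> partitioner,
            Function<V, C> createCombiner,
            BiFunction<C, V, C> mergeValue) {
        Map<Map.Entry<Integer, K>, C> map = new HashMap<>();
        for (Map.Entry<K, V> kv : records) {
            Map.Entry<Integer, K> key = new AbstractMap.SimpleImmutableEntry<>(
                    partitioner.applyAsInt(kv.getKey()), kv.getKey());
            // Same rule as the update closure: merge if a value exists, otherwise create one.
            C combined = map.containsKey(key)
                    ? mergeValue.apply(map.get(key), kv.getValue())
                    : createCombiner.apply(kv.getValue());
            map.put(key, combined);
        }
        return map;
    }
}
```

Without an aggregator there is nothing to merge, so a simple append-only buffer of (partition, key, value) entries is enough, which is the role PartitionedPairBuffer plays.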
maybeSpillCollection() calls maybeSpill() to check whether a spill is needed. If a spill happens, a new map or buffer is created and buffering starts over:
private def maybeSpillCollection(usingMap: Boolean): Unit = {
var estimatedSize = 0L
if (usingMap) {
estimatedSize = map.estimateSize()
// Check whether a spill is needed
if (maybeSpill(map, estimatedSize)) {
map = new PartitionedAppendOnlyMap[K, C]
}
} else {
estimatedSize = buffer.estimateSize()
// Check whether a spill is needed
if (maybeSpill(buffer, estimatedSize)) {
buffer = new PartitionedPairBuffer[K, C]
}
}
if (estimatedSize > _peakMemoryUsedBytes) {
_peakMemoryUsedBytes = estimatedSize
}
}
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
var shouldSpill = false
// If the number of records read is a multiple of 32 and the estimated memory of the map or buffer has reached the threshold (5 MB by default)
if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
// Claim up to double our current memory from the shuffle memory pool
// Try to acquire 2 * currentMemory - myMemoryThreshold bytes (the threshold starts at 5 MB)
val amountToRequest = 2 * currentMemory - myMemoryThreshold
val granted = acquireMemory(amountToRequest)
// Update the threshold
myMemoryThreshold += granted
// If we were granted too little memory to grow further (either tryToAcquire returned 0,
// or we already had more memory than myMemoryThreshold), spill the current collection
// If the memory is still insufficient, decide to spill
shouldSpill = currentMemory >= myMemoryThreshold
}
// Even if shouldSpill is false, spill when the number of records read exceeds numElementsForceSpillThreshold (Integer.MAX_VALUE by default)
shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
// Actually spill
if (shouldSpill) {
// Increment the spill count
_spillCount += 1
logSpillage(currentMemory)
// Spill the buffered collection
spill(collection)
_elementsRead = 0
_memoryBytesSpilled += currentMemory
// Release memory
releaseMemory()
}
shouldSpill
}
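The threshold-growing rule in maybeSpill() is easy to misread, so here is a reduced Java sketch of just that decision. SpillDecisionSketch is a hypothetical class; the memory pool is modeled as a function from requested bytes to granted bytes, the 5 MB starting threshold is an assumed default, and the forced spill on record count is left out.

```java
import java.util.function.LongUnaryOperator;

final class SpillDecisionSketch {
    // Assumed initial threshold of 5 MB.
    long threshold = 5L * 1024 * 1024;
    // Hypothetical memory pool: maps a requested byte count to the bytes actually granted.
    private final LongUnaryOperator memoryPool;

    SpillDecisionSketch(LongUnaryOperator memoryPool) {
        this.memoryPool = memoryPool;
    }

    /** Returns true if the collection should be spilled; may raise the threshold as a side effect. */
    boolean shouldSpill(long elementsRead, long currentMemory) {
        boolean spill = false;
        // Only check every 32 records, and only once the estimate reaches the threshold.
        if (elementsRead % 32 == 0 && currentMemory >= threshold) {
            // Ask for enough memory to double the current footprint.
            long granted = memoryPool.applyAsLong(2 * currentMemory - threshold);
            threshold += granted;
            // If the grant was too small to get ahead of current usage, spill.
            spill = currentMemory >= threshold;
        }
        return spill;
    }
}
```

When the pool grants the full request, the threshold becomes twice the current usage and no spill happens; when it grants nothing, the threshold stays put and the collection is spilled.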
maybeSpill() calls spill() to perform the spill:
override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
// Sort with the given comparator and get an iterator over the sorted result
val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
// Spill the iterator's data to a disk file
val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
// spills is an ArrayBuffer that records all spill files
spills += spillFile
}
spillMemoryIteratorToDisk() is as follows:
private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator)
: SpilledFile = {
// Because these files may be read during shuffle, their compression must be controlled by
// spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
// createTempShuffleBlock here; see SPARK-3426 for more context.
// Create a temporary block
val (blockId, file) = diskBlockManager.createTempShuffleBlock()
// These variables are reset after each flush
var objectsWritten: Long = 0
val spillMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics
// Create the DiskBlockObjectWriter for the spill file
val writer: DiskBlockObjectWriter =
blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics)
// List of batch sizes (bytes) in the order they are written to disk
val batchSizes = new ArrayBuffer[Long]
// How many elements we have in each partition
val elementsPerPartition = new Array[Long](numPartitions)
// Flush the disk writer's contents to disk, and update relevant variables.
// The writer is committed at the end of this process, so data reaches disk batch by batch.
def flush(): Unit = {
val segment = writer.commitAndGet()
batchSizes += segment.length
_diskBytesSpilled += segment.length
objectsWritten = 0
}
var success = false
try {
// Iterate over the records in the map or buffer
while (inMemoryIterator.hasNext) {
val partitionId = inMemoryIterator.nextPartition()
require(partitionId >= 0 && partitionId < numPartitions,
s"partition Id: ${partitionId} should be in the range [0, ${numPartitions})")
// Write the record and update the counters
inMemoryIterator.writeNext(writer)
elementsPerPartition(partitionId) += 1
objectsWritten += 1
// Flush the batch to disk once serializerBatchSize records (10000 by default) have been written
if (objectsWritten == serializerBatchSize) {
flush()
}
}
// After the loop, flush whatever remains
if (objectsWritten > 0) {
flush()
} else {
writer.revertPartialWritesAndClose()
}
success = true
} finally {
if (success) {
writer.close()
} else {
// This code path only happens if an exception was thrown above before we set success;
// close our stuff and let the exception be thrown further
writer.revertPartialWritesAndClose()
if (file.exists()) {
if (!file.delete()) {
logWarning(s"Error deleting ${file}")
}
}
}
}
// Return the spill file
SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)
}
Next comes the sort-and-merge step, which calls ExternalSorter.writePartitionedMapOutput():
def writePartitionedMapOutput(
shuffleId: Int,
mapId: Long,
mapOutputWriter: ShuffleMapOutputWriter): Unit = {
var nextPartitionId = 0
// If no spill has occurred
if (spills.isEmpty) {
// Case where we only have in-memory data
val collection = if (aggregator.isDefined) map else buffer
// Sort with the given comparator
val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
while (it.hasNext()) {
val partitionId = it.nextPartition()
var partitionWriter: ShufflePartitionWriter = null
var partitionPairsWriter: ShufflePartitionPairsWriter = null
TryUtils.tryWithSafeFinally {
partitionWriter = mapOutputWriter.getPartitionWriter(partitionId)
val blockId = ShuffleBlockId(shuffleId, mapId, partitionId)
partitionPairsWriter = new ShufflePartitionPairsWriter(
partitionWriter,
serializerManager,
serInstance,
blockId,
context.taskMetrics().shuffleWriteMetrics)
// Write out the records of this partition one by one
while (it.hasNext && it.nextPartition() == partitionId) {
it.writeNext(partitionPairsWriter)
}
} {
if (partitionPairsWriter != null) {
partitionPairsWriter.close()
}
}
nextPartitionId = partitionId + 1
}
// If spills have occurred, merge-sort the spilled files with the in-memory data, then write the result partition by partition through ShufflePartitionPairsWriter
} else {
// We must perform merge-sort; get an iterator by partition and write everything directly.
// The merge sort happens here
for ((id, elements) <- this.partitionedIterator) {
val blockId = ShuffleBlockId(shuffleId, mapId, id)
var partitionWriter: ShufflePartitionWriter = null
var partitionPairsWriter: ShufflePartitionPairsWriter = null
TryUtils.tryWithSafeFinally {
partitionWriter = mapOutputWriter.getPartitionWriter(id)
partitionPairsWriter = new ShufflePartitionPairsWriter(
partitionWriter,
serializerManager,
serInstance,
blockId,
context.taskMetrics().shuffleWriteMetrics)
if (elements.hasNext) {
for (elem <- elements) {
partitionPairsWriter.write(elem._1, elem._2)
}
}
} {
if (partitionPairsWriter != null) {
partitionPairsWriter.close()
}
}
nextPartitionId = id + 1
}
}
context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)
}
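The in-memory branch above relies on the iterator already being sorted by partition ID, so each partition shows up as one consecutive run. A minimal sketch of that run-splitting pattern follows; PartitionRunsSketch and runs are hypothetical names, and plain (partitionId, value) pairs stand in for Spark's WritablePartitionedIterator.

```scala
object PartitionRunsSketch {
  // Split an iterator that is already sorted by partition ID into one
  // (partitionId, records) run per partition that actually contains data.
  def runs[V](sorted: Iterator[(Int, V)]): List[(Int, List[V])] = {
    val it = sorted.buffered
    val out = List.newBuilder[(Int, List[V])]
    while (it.hasNext) {
      val partitionId = it.head._1
      val records = List.newBuilder[V]
      // Consume records until the partition ID changes, like the inner while loop above.
      while (it.hasNext && it.head._1 == partitionId) {
        records += it.next()._2
      }
      out += ((partitionId, records.result()))
    }
    out.result()
  }
}
```

As in the in-memory branch, a partition without any records never appears in the result, so no writer would be opened for it.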
The partitionedIterator method:
def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
val usingMap = aggregator.isDefined
val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
if (spills.isEmpty) {
// Special case: if we have only in-memory data, we don't need to merge streams, and perhaps
// we don't even need to sort by anything other than partition ID
// No spill and no ordering requested: sort by partition ID only
if (ordering.isEmpty) {
// The user hasn't requested sorted keys, so only sort by partition ID, not key
groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
// No spill but an ordering is defined: sort by partition ID first, then by key
} else {
// We do need to sort by both partition ID and key
groupByPartition(destructiveIterator(
collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
}
} else {
// Merge spilled and in-memory data
// If there are spills, merge-sort the spilled files together with the in-memory data
merge(spills, destructiveIterator(
collection.partitionedDestructiveSortedIterator(comparator)))
}
}
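The two in-memory cases differ only in the comparator passed to the sort. The sketch below shows that difference on ordinary Scala collections; PartitionOrderingSketch and sortRecords are hypothetical names, and records use the same ((partitionId, key), value) shape as the code above.

```scala
object PartitionOrderingSketch {
  // Sort records of the form ((partitionId, key), value).
  // With keyOrdering = None only the partition ID is compared;
  // with Some(ordering) ties on partition ID are broken by key.
  def sortRecords[K, V](
      records: Seq[((Int, K), V)],
      keyOrdering: Option[Ordering[K]]): Seq[((Int, K), V)] = {
    val ordering = new Ordering[((Int, K), V)] {
      override def compare(a: ((Int, K), V), b: ((Int, K), V)): Int = {
        val byPartition = Integer.compare(a._1._1, b._1._1)
        if (byPartition != 0) byPartition
        else keyOrdering.map(_.compare(a._1._2, b._1._2)).getOrElse(0)
      }
    }
    records.sorted(ordering)
  }
}
```

Skipping the key comparison when no ordering is requested is the cheaper path, which is why the real method distinguishes the two cases.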
The merge method is as follows:
private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
: Iterator[(Int, Iterator[Product2[K, C]])] = {
// Open a reader for each spilled file
val readers = spills.map(new SpillReader(_))
val inMemBuffered = inMemory.buffered
// Iterate over the partitions
(0 until numPartitions).iterator.map { p =>
val inMemIterator = new IteratorForPartition(p, inMemBuffered)
// Combine the iterators over the spilled files with the in-memory iterator
val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)
// If an aggregator is defined, aggregate within the partition, ordering keys by keyComparator
if (aggregator.isDefined) {
// Perform partial aggregation across partitions
(p, mergeWithAggregation(
iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
// No aggregator but an ordering is defined: merge-sort by that ordering
} else if (ordering.isDefined) {
// No aggregator given, but we have an ordering (e.g. used by reduce tasks in sortByKey);
// sort the elements without trying to merge them
(p, mergeSort(iterators, ordering.get))
// Neither aggregator nor ordering: simply concatenate the iterators
} else {
(p, iterators.iterator.flatten)
}
}
}
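The ordering-only branch merges several iterators that are each already sorted. One common way to do this is a k-way merge driven by a priority queue of buffered iterators. The sketch below illustrates that technique under the assumption that every input is sorted; MergeSortSketch and mergeSorted are hypothetical names, not Spark's own mergeSort.

```scala
import scala.collection.mutable

object MergeSortSketch {
  // Merge several individually sorted iterators into a single sorted iterator.
  def mergeSorted[T](iterators: Seq[Iterator[T]])(implicit ord: Ordering[T]): Iterator[T] = {
    // PriorityQueue dequeues the largest element, so the ordering on heads is reversed
    // to make the iterator with the smallest head come out first.
    val headOrdering: Ordering[BufferedIterator[T]] =
      Ordering.by[BufferedIterator[T], T](_.head)(ord.reverse)
    val heap = new mutable.PriorityQueue[BufferedIterator[T]]()(headOrdering)
    // Only non-empty iterators may enter the heap, because head must be defined.
    heap ++= iterators.map(_.buffered).filter(_.hasNext)

    new Iterator[T] {
      override def hasNext: Boolean = heap.nonEmpty
      override def next(): T = {
        if (!hasNext) throw new NoSuchElementException("next on empty iterator")
        val it = heap.dequeue()
        val elem = it.next()
        // Re-insert the iterator so that its new head takes part in the next comparison.
        if (it.hasNext) heap.enqueue(it)
        elem
      }
    }
  }
}
```

Each call to next() costs one dequeue and at most one enqueue, so merging k inputs with n elements in total takes O(n log k) comparisons while holding only one element per input in memory.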
In write(), commitAllPartitions() is called to output the data. It in turn calls writeIndexFileAndCommit() to commit the data file and write the index file, as follows:
def writeIndexFileAndCommit(
shuffleId: Int,
mapId: Long,
lengths: Array[Long],
dataTmp: File): Unit = {
// Resolve the index file and a temporary index file
val indexFile = getIndexFile(shuffleId, mapId)
val indexTmp = Utils.tempFileWith(indexFile)
try {
// Get the shuffle data file
val dataFile = getDataFile(shuffleId, mapId)
// There is only one IndexShuffleBlockResolver per executor, this synchronization make sure
// the following check and rename are atomic.
// Each executor has a single IndexShuffleBlockResolver, so locking on it is enough to guarantee atomicity
synchronized {
// Check whether an index file matching the data file already exists
val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
if (existingLengths != null) {
// Another attempt for the same task has already written our map outputs successfully,
// so just use the existing partition lengths and delete our temporary map outputs.
// A match exists, so the shuffle write has already completed; delete our temporary data file
System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
if (dataTmp != null && dataTmp.exists()) {
dataTmp.delete()
}
} else {
// Otherwise, open a buffered output stream on the temporary index file
// This is the first successful attempt in writing the map outputs for this task,
// so override any existing index and data files with the ones we wrote.
val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
Utils.tryWithSafeFinally {
// We take in lengths of each block, need to convert it to offsets.
// Accumulate each partition's length into a running offset and write it to the temporary index file
var offset = 0L
out.writeLong(offset)
for (length <- lengths) {
offset += length
out.writeLong(offset)
}
} {
out.close()
}
// Delete any index file that may already exist
if (indexFile.exists()) {
indexFile.delete()
}
// Delete any data file that may already exist
if (dataFile.exists()) {
dataFile.delete()
}
// Rename the temporary files to their final names
if (!indexTmp.renameTo(indexFile)) {
throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
}
if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
}
}
}
} finally {
if (indexTmp.exists() && !indexTmp.delete()) {
logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}")
}
}
}
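The index file written above is simply a sequence of cumulative offsets, one more than the number of partitions. The sketch below reproduces that layout in memory and shows how a reader could recover the byte range of one reduce partition from it. IndexFileSketch, encodeIndex and segment are hypothetical names, and byte arrays replace the real files.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object IndexFileSketch {
  // Encode partition lengths as cumulative offsets, following the loop above:
  // a leading 0, then one running total per partition.
  def encodeIndex(lengths: Array[Long]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    try {
      var offset = 0L
      out.writeLong(offset)
      for (length <- lengths) {
        offset += length
        out.writeLong(offset)
      }
    } finally {
      out.close()
    }
    bytes.toByteArray
  }

  // Look up the (start offset, length) of one reduce partition in the data file.
  def segment(index: Array[Byte], reduceId: Int): (Long, Long) = {
    val in = new DataInputStream(new ByteArrayInputStream(index))
    try {
      // Every offset is one 8-byte long, so partition i starts at byte i * 8 of the index.
      in.skipBytes(reduceId * 8)
      val start = in.readLong()
      val end = in.readLong()
      (start, end - start)
    } finally {
      in.close()
    }
  }
}
```

Storing offsets instead of lengths means a reduce task needs just two adjacent longs to locate its block, and an empty partition is represented by two equal offsets.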
4.Summary
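- When Spark initializes SparkEnv, the create() method instantiates the ShuffleManager. Two short names are accepted, sort and tungsten-sort, and both map to SortShuffleManager.
- ShuffleManager is a trait whose core methods are registerShuffle(), getReader() and getWriter().
- SortShuffleManager is the only implementation of ShuffleManager. registerShuffle() decides which shuffle mechanism to use, getReader() always returns a BlockStoreShuffleReader, and getWriter() picks one of three Writers according to the handle type.
- BypassMergeSortShuffleWriter: used when the shuffle dependency has no map-side aggregation and the number of partitions does not exceed spark.shuffle.sort.bypassMergeThreshold (200 by default). Core methods: write() and writePartitionedData() (concatenates all per-partition files, using zero-copy by default).
- UnsafeShuffleWriter: used when all three of the following hold: the serializer supports relocation, there is no map-side aggregation, and the number of partitions is no greater than 16777215 + 1. Core methods: write(), insertRecordIntoSorter() (inserts data into the external sorter) and closeAndWriteOutput() (merges the spills and writes the output file). The key calls inside insertRecordIntoSorter() are insertRecord() (inserts the serialized data into the external sorter), growPointerArrayIfNecessary() (grows the pointer array or spills to disk when more space is needed), spill() (spills to disk), writeSortedFile() (sorts the in-memory data and writes it to a disk file) and encodePageNumberAndOffset() (encodes the logical address of the current record as a long). The key call inside closeAndWriteOutput() is mergeSpills() (merges the spill files), which can merge using either BIO or NIO.
- SortShuffleWriter: used when neither of the two Writers above applies. It sorts in memory using PartitionedAppendOnlyMap or PartitionedPairBuffer and spills to files when the memory limit is exceeded. When producing the final sorted output, it merge-sorts all previously spilled files together with the data still in memory, and elements with the same key are combined using the user-defined function. Core methods: write(), insertAll() (feeds data into the ExternalSorter for sorting), maybeSpillCollection() (decides whether to spill to disk), maybeSpill(), spill(), spillMemoryIteratorToDisk() (spills in-memory data to disk), writePartitionedMapOutput() and commitAllPartitions(), which calls writeIndexFileAndCommit() to write the data and index files.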
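The selection rules for the three Writers can be condensed into a small decision function. This is a sketch under stated assumptions, not SortShuffleManager's code: WriterSelectionSketch, ShuffleDep and chooseWriter are hypothetical names, the dependency is reduced to three fields, the threshold comparisons are assumed to be inclusive, and the result is the Writer's name, not a real handle.

```scala
object WriterSelectionSketch {
  // Hypothetical, simplified view of a shuffle dependency.
  final case class ShuffleDep(
      numPartitions: Int,
      mapSideCombine: Boolean,
      serializerSupportsRelocation: Boolean)

  // Upper bound on partitions for the serialized (Unsafe) path: 16777215 + 1.
  val MaxSerializedModePartitions: Int = 16777215 + 1

  // Return the name of the Writer that the rules in the summary would select.
  def chooseWriter(dep: ShuffleDep, bypassMergeThreshold: Int = 200): String = {
    if (!dep.mapSideCombine && dep.numPartitions <= bypassMergeThreshold) {
      // Checked first: few partitions and no map-side aggregation.
      "BypassMergeSortShuffleWriter"
    } else if (dep.serializerSupportsRelocation && !dep.mapSideCombine &&
        dep.numPartitions <= MaxSerializedModePartitions) {
      // All three conditions for the serialized path hold.
      "UnsafeShuffleWriter"
    } else {
      // Fallback for everything else.
      "SortShuffleWriter"
    }
  }
}
```

The order of the checks matters: a small shuffle without map-side aggregation takes the bypass path even if its serializer would also qualify for the Unsafe path.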