spark的持久化机制做的相对隐晦一些,没有一个显示的调用入口。
首先通过rdd.persist(newLevel: StorageLevel)对此rdd的StorageLevel进行赋值,同checkpoint一样,本身没有进行之久化操作。真正进行持久化操作实在之后的第一个action 中通过iterator方法进行调用:
final def iterator(split: Partition, context: TaskContext): Iterator[T] = { if (storageLevel != StorageLevel.NONE) { getOrCompute(split, context) } else { computeOrReadCheckpoint(split, context) } }
其中调用过持久化的rdd的StorageLevel不为NONE,所以会执行getOrCompute方法
if (storageLevel != StorageLevel.NONE) { getOrCompute(split, context) }
/** * Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */ private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = { //TODO block和partition的关系 val blockId = RDDBlockId(id, partition.index) var readCachedBlock = true // This method is called on executors, so we need call SparkEnv.get instead of sc.env. SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => { readCachedBlock = false computeOrReadCheckpoint(partition, context) }) match { case Left(blockResult) => if (readCachedBlock) { val existingMetrics = context.taskMetrics().inputMetrics existingMetrics.incBytesRead(blockResult.bytes) new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) { override def next(): T = { existingMetrics.incRecordsRead(1) delegate.next() } } } else { new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]]) } case Right(iter) => new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]]) } }
getOrCompute方法中,调用了blockManager.getOrElseUpdate方法实现了block的读取和持久化操作:
SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => { readCachedBlock = false computeOrReadCheckpoint(partition, context) })
在getOrElseUpdate中调用doPutIterator方法,具体实现存储方式和级别的逻辑判断进而调用相应的存储实现MemoryStore或者DiskStore进行具体实现。
private def doPutIterator[T]( blockId: BlockId, iterator: () => Iterator[T], level: StorageLevel, classTag: ClassTag[T], tellMaster: Boolean = true, keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator[T]] = { doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info => val startTimeMs = System.currentTimeMillis var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator[T]] = None // Size of the block in bytes var size = 0L if (level.useMemory) { // Put it in memory first, even if it also has useDisk set to true; // We will drop it to disk later if the memory store can't hold it. if (level.deserialized) { memoryStore.putIteratorAsValues(blockId, iterator(), classTag) match { case Right(s) => size = s case Left(iter) => // Not enough space to unroll this block; drop to disk if applicable if (level.useDisk) { logWarning(s"Persisting block $blockId to disk instead.") diskStore.put(blockId) { fileOutputStream => serializerManager.dataSerializeStream(blockId, fileOutputStream, iter)(classTag) } size = diskStore.getSize(blockId) } else { iteratorFromFailedMemoryStorePut = Some(iter) } } } else { // !level.deserialized memoryStore.putIteratorAsBytes(blockId, iterator(), classTag, level.memoryMode) match { case Right(s) => size = s case Left(partiallySerializedValues) => // Not enough space to unroll this block; drop to disk if applicable if (level.useDisk) { logWarning(s"Persisting block $blockId to disk instead.") diskStore.put(blockId) { fileOutputStream => partiallySerializedValues.finishWritingToStream(fileOutputStream) } size = diskStore.getSize(blockId) } else { iteratorFromFailedMemoryStorePut = Some(partiallySerializedValues.valuesIterator) } } } } else if (level.useDisk) { diskStore.put(blockId) { fileOutputStream => serializerManager.dataSerializeStream(blockId, fileOutputStream, iterator())(classTag) } size = diskStore.getSize(blockId) } val putBlockStatus = getCurrentBlockStatus(blockId, info) val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid if (blockWasSuccessfullyStored) { // Now that the block is in either the memory or disk store, tell the master about it. info.size = size if (tellMaster && info.tellMaster) { reportBlockStatus(blockId, putBlockStatus) } addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus) logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs))) if (level.replication > 1) { val remoteStartTime = System.currentTimeMillis val bytesToReplicate = doGetLocalBytes(blockId, info) // [SPARK-16550] Erase the typed classTag when using default serialization, since // NettyBlockRpcServer crashes when deserializing repl-defined classes. // TODO(ekl) remove this once the classloader issue on the remote end is fixed. val remoteClassTag = if (!serializerManager.canUseKryo(classTag)) { scala.reflect.classTag[Any] } else { classTag } try { replicate(blockId, bytesToReplicate, level, remoteClassTag) } finally { bytesToReplicate.unmap() } logDebug("Put block %s remotely took %s" .format(blockId, Utils.getUsedTimeMs(remoteStartTime))) } } assert(blockWasSuccessfullyStored == iteratorFromFailedMemoryStorePut.isEmpty) iteratorFromFailedMemoryStorePut } }