首先,rdd.checkpoint()本身并没有执行任何写操作:它只是检查checkpointDir是否已经设置(未设置则抛出SparkException),然后在checkpointData尚未定义时生成一个ReliableRDDCheckpointData对象并赋给checkpointData,checkpoint的大部分工作由这个对象完成。
/**
 * Mark this RDD for checkpointing.
 *
 * This call performs no I/O: it only creates a `ReliableRDDCheckpointData` for this RDD
 * (if one has not been created yet). The RDD will later be saved to a file inside the
 * checkpoint directory set with `SparkContext#setCheckpointDir`, and all references to its
 * parent RDDs will be removed.
 *
 * This function must be called before any job has been executed on this RDD. It is strongly
 * recommended that this RDD is persisted in memory, otherwise saving it on a file will
 * require recomputation.
 *
 * @throws SparkException if no checkpoint directory has been set in the SparkContext
 */
def checkpoint(): Unit = RDDCheckpointData.synchronized {
  // NOTE: a global lock is used here because of downstream complexities in making sure
  // children RDD partitions point to the correct parent partitions. This should be
  // revisited in the future.

  // Guard: a checkpoint directory is mandatory.
  if (context.checkpointDir.isEmpty) {
    throw new SparkException("Checkpoint directory has not been set in the SparkContext")
  }

  // Only create the checkpoint data once; repeated calls are no-ops.
  if (checkpointData.isEmpty) {
    checkpointData = Some(new ReliableRDDCheckpointData(this))
  }
}
真正触发checkpoint操作的,是rdd调用checkpoint()之后执行的第一个action操作:作业运行结束后,runJob会在最后调用rdd.doCheckpoint()。
/**
 * Run a function on a given set of partitions in an RDD and pass the results to the given
 * handler function. This is the main entry point for all actions in Spark.
 *
 * Once the job has been run, `rdd.doCheckpoint()` is invoked.
 *
 * @param rdd target RDD to run the function on
 * @param func function applied to each selected partition
 * @param partitions the partitions of `rdd` to run on
 * @param resultHandler callback to which the results are passed
 * @throws IllegalStateException if the `stopped` flag is already set
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }

  val site = getCallSite
  val closure = clean(func)
  logInfo("Starting job: " + site.shortForm)

  // Optionally dump the lineage of the RDD being computed.
  val lineageLoggingEnabled = conf.getBoolean("spark.logLineage", false)
  if (lineageLoggingEnabled) {
    logInfo("RDD's recursive dependencies: " + rdd.toDebugString)
  }

  dagScheduler.runJob(rdd, closure, partitions, site, resultHandler, localProperties.get)
  progressBar.foreach { bar => bar.finishAll() }

  // Checkpointing is attempted only after the job itself has been run.
  rdd.doCheckpoint()
}
其中调用rdd.doCheckpoint(),doCheckpoint代码如下:
/**
 * Performs the checkpointing of this RDD by saving this. It is called after a job using this
 * RDD has completed (therefore the RDD has been materialized and potentially stored in
 * memory). doCheckpoint() is called recursively on the parent RDDs.
 *
 * `checkpointData.get.checkpoint()` does the actual work for an RDD that has checkpoint
 * data. An RDD without checkpoint data just walks up its dependencies.
 * The `doCheckpointCalled` flag makes every call after the first one a no-op.
 */
private[spark] def doCheckpoint(): Unit = {
  RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
    if (!doCheckpointCalled) {
      doCheckpointCalled = true
      val markedForCheckpoint = checkpointData.isDefined

      // Parents are visited when this RDD is not marked itself, or when it is marked and
      // all marked ancestors must be checkpointed as well.
      // TODO We can collect all the RDDs that needs to be checkpointed, and then checkpoint
      // them in parallel.
      if (!markedForCheckpoint || checkpointAllMarkedAncestors) {
        // Parents go first because our lineage will be truncated after we checkpoint
        // ourselves.
        dependencies.foreach { dep => dep.rdd.doCheckpoint() }
      }

      if (markedForCheckpoint) {
        checkpointData.get.checkpoint()
      }
    }
  }
}
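其中checkpointData.get.checkpoint()执行了最基本的写任务。doCheckpoint的职责是:如果该RDD调用过checkpoint()(即checkpointData已定义),那么在checkpointAllMarkedAncestors为true时,先根据依赖关系依次对父RDD递归调用doCheckpoint()(因为自身checkpoint之后血缘关系会被截断),然后再对自身执行checkpoint;
如果RDD本身没有调用过checkpoint(),那么就根据依赖关系依次对父RDD递归调用doCheckpoint()。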
接下来看checkpointData.get.checkpoint的具体实现,其中主要功能在于ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)方法。
/**
 * Materialize this RDD and write its content to a reliable DFS.
 * This is called immediately after the first action invoked on this RDD has completed.
 *
 * The write itself is delegated to `ReliableCheckpointRDD.writeRDDToCheckpointDirectory`,
 * which stores the RDD under `cpDir`.
 *
 * @return the RDD returned by `writeRDDToCheckpointDirectory`
 */
protected override def doCheckpoint(): CheckpointRDD[T] = {
  val checkpointed = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)

  // Optionally clean our checkpoint files if the reference is out of scope
  val cleanupEnabled =
    rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)
  if (cleanupEnabled) {
    rdd.context.cleaner.foreach(_.registerRDDCheckpointDataForCleanup(checkpointed, rdd.id))
  }

  logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${checkpointed.id}")
  checkpointed
}
以下是ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)的方法实现。主要包含两部分:写partition数据和写partitioner。具体如下:
/**
 * Write RDD to checkpoint files and return a ReliableCheckpointRDD representing the RDD.
 *
 * Two things are written under `checkpointDir`: the data of every partition (via a job that
 * runs `writePartitionToCheckpointFile`) and, when the RDD has one, its partitioner.
 *
 * @param originalRDD the RDD to checkpoint
 * @param checkpointDir directory the checkpoint files are written to (created if needed)
 * @param blockSize block size forwarded to `writePartitionToCheckpointFile`; a negative
 *                  value (the default) lets that method use its default file creation
 * @return a `ReliableCheckpointRDD` backed by the files just written
 * @throws SparkException if the checkpoint directory cannot be created, or if the new RDD
 *                        does not have the same number of partitions as `originalRDD`
 */
def writeRDDToCheckpointDirectory[T: ClassTag](
    originalRDD: RDD[T],
    checkpointDir: String,
    blockSize: Int = -1): ReliableCheckpointRDD[T] = {
  val sc = originalRDD.sparkContext

  // Create the output path for the checkpoint
  val checkpointDirPath = new Path(checkpointDir)
  val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
  if (!fs.mkdirs(checkpointDirPath)) {
    throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
  }

  // Save to file, and reload it as an RDD
  val broadcastedConf = sc.broadcast(
    new SerializableConfiguration(sc.hadoopConfiguration))
  // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
  // blockSize is forwarded so that a caller-supplied value is no longer silently ignored.
  sc.runJob(originalRDD,
    writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf, blockSize) _)

  // The partitioner, if any, is written next to the partition files.
  originalRDD.partitioner.foreach { partitioner =>
    writePartitionerToCheckpointDir(sc, partitioner, checkpointDirPath)
  }

  val newRDD = new ReliableCheckpointRDD[T](
    sc, checkpointDirPath.toString, originalRDD.partitioner)
  // Sanity check: the reloaded RDD must have the same number of partitions as the original.
  if (newRDD.partitions.length != originalRDD.partitions.length) {
    throw new SparkException(
      s"Checkpoint RDD $newRDD(${newRDD.partitions.length}) has different " +
        s"number of partitions from original RDD $originalRDD(${originalRDD.partitions.length})")
  }
  newRDD
}
写partition数据:
// Excerpt: the partition data is written by a job over originalRDD. Only the first parameter
// list of writePartitionToCheckpointFile is applied here; the trailing `_` leaves its
// (TaskContext, Iterator[T]) parameter list open so the result can be passed as a function.
sc.runJob(originalRDD,
writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)
/**
 * Write an RDD partition's data to a checkpoint file.
 *
 * The data is first serialized into a temporary file named
 * `.<finalName>-attempt-<attemptNumber>` inside `path`, which is then renamed to the final
 * checkpoint file name for the partition. If the rename fails but the final file already
 * exists, the temporary file is deleted and the existing file is kept.
 *
 * @param path output directory for the checkpoint files
 * @param broadcastedConf broadcast Hadoop configuration used to obtain the file system
 * @param blockSize when non-negative, the block size passed to `fs.create` (mainly for
 *                  testing); when negative, the shorter `fs.create` overload is used
 * @param ctx context of the running task; supplies the partition id and attempt number
 * @param iterator the partition's elements to serialize
 * @throws IOException if the rename fails and the final output path does not exist
 */
def writePartitionToCheckpointFile[T: ClassTag](
    path: String,
    broadcastedConf: Broadcast[SerializableConfiguration],
    blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]): Unit = {
  val env = SparkEnv.get
  val outputDir = new Path(path)
  val fs = outputDir.getFileSystem(broadcastedConf.value.value)

  val finalOutputName = ReliableCheckpointRDD.checkpointFileName(ctx.partitionId())
  val finalOutputPath = new Path(outputDir, finalOutputName)
  // Each attempt writes to its own temporary file, distinguished by the attempt number.
  val tempOutputPath =
    new Path(outputDir, s".$finalOutputName-attempt-${ctx.attemptNumber()}")

  val bufferSize = env.conf.getInt("spark.buffer.size", 65536)

  val fileOutputStream = if (blockSize < 0) {
    fs.create(tempOutputPath, false, bufferSize)
  } else {
    // This is mainly for testing purpose
    fs.create(tempOutputPath, false, bufferSize,
      fs.getDefaultReplication(fs.getWorkingDirectory), blockSize)
  }
  val serializer = env.serializer.newInstance()
  val serializeStream = serializer.serializeStream(fileOutputStream)
  // The stream is closed whether or not writing the elements succeeds.
  Utils.tryWithSafeFinally {
    serializeStream.writeAll(iterator)
  } {
    serializeStream.close()
  }

  if (!fs.rename(tempOutputPath, finalOutputPath)) {
    if (!fs.exists(finalOutputPath)) {
      // Rename failed and nothing is at the final path: clean up and fail.
      logInfo(s"Deleting tempOutputPath $tempOutputPath")
      fs.delete(tempOutputPath, false)
      throw new IOException("Checkpoint failed: failed to save output of task: " +
        s"${ctx.attemptNumber()} and final output path does not exist: $finalOutputPath")
    } else {
      // Some other copy of this task must've finished before us and renamed it
      logInfo(s"Final output path $finalOutputPath already exists; not overwriting it")
      if (!fs.delete(tempOutputPath, false)) {
        logWarning(s"Error deleting ${tempOutputPath}")
      }
    }
  }
}
111
写partitioner如下:
/**
 * Write a partitioner to the given RDD checkpoint directory. This is done on a best-effort
 * basis; any non-fatal exception while writing the partitioner is caught, logged (together
 * with its cause) and ignored.
 *
 * @param sc SparkContext supplying the Spark and Hadoop configuration
 * @param partitioner the partitioner to serialize
 * @param checkpointDirPath checkpoint directory in which the partitioner file is created
 */
private def writePartitionerToCheckpointDir(
    sc: SparkContext,
    partitioner: Partitioner,
    checkpointDirPath: Path): Unit = {
  try {
    val partitionerFilePath = new Path(checkpointDirPath, checkpointPartitionerFileName)
    val bufferSize = sc.conf.getInt("spark.buffer.size", 65536)
    val fs = partitionerFilePath.getFileSystem(sc.hadoopConfiguration)
    val fileOutputStream = fs.create(partitionerFilePath, false, bufferSize)
    val serializer = SparkEnv.get.serializer.newInstance()
    val serializeStream = serializer.serializeStream(fileOutputStream)
    // The stream is closed whether or not writing the partitioner succeeds.
    Utils.tryWithSafeFinally {
      serializeStream.writeObject(partitioner)
    } {
      serializeStream.close()
    }
    logDebug(s"Written partitioner to $partitionerFilePath")
  } catch {
    case NonFatal(e) =>
      // Still best-effort, but keep the cause in the log so the failure can be diagnosed.
      logWarning(s"Error writing partitioner $partitioner to $checkpointDirPath", e)
  }
}