CP的步骤
1. 首先如果RDD需要CP, 调用RDD.checkpoint()来mark
注释说了, 这个需要在Job被执行前被mark, 原因后面看, 并且最好选择persist这个RDD, 否则在存CP文件时需要重新computeRDD内容
并且当RDD被CP后, 所有dependencies都会被清除, 因为既然RDD已经被CP, 那么就可以直接从文件读取, 没有必要保留之前的parents的dependencies(保留只是为了replay)
2. 在SparkContext.runJob中, 最后会调用rdd.doCheckpoint()
如果前面已经mark过, 那么这里就会将rdd真正CP到文件中去, 这也是前面为什么说, mark必须在run job之前完成
def runJob[T, U: ClassManifest]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], allowLocal: Boolean, resultHandler: (Int, U) => Unit) { val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler, localProperties.get) rdd.doCheckpoint() result }
3. 在RDDCheckpointData.doCheckpoint中
会调用rdd.markCheckpointed(newRDD), 清除dependencies信息
并最终将状态设为, Checkpointed, 以表示完成CP
4. Checkpoint如何使用, 在RDD.computeOrReadCheckpoint中, 看到如果已经完成CP, 会直接从firstParent中读数据, 刚开始会觉得比较奇怪
if (isCheckpointed) {
firstParent[T].iterator(split, context)
}
RDD.firstParent的定义如下, 就是从dependencies中取第一个dependency的rdd
dependencies.head.rdd.asInstanceOf[RDD[U]]
而RDD.dependencies的定义如下, 可用看到在完成CP的情况下, 从dependencies中读到的其实就是CP RDD, 所以可以直接用
checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {dependencies_}
RDD
/** An Option holding our checkpoint RDD, if we are checkpointed */ private def checkpointRDD: Option[RDD[T]] = checkpointData.flatMap(_.checkpointRDD)
private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None // Avoid handling doCheckpoint multiple times to prevent excessive recursion private var doCheckpointCalled = false
/** * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint * directory set with SparkContext.setCheckpointDir() and all references to its parent * RDDs will be removed. This function must be called before any job has been * executed on this RDD. It is strongly recommended that this RDD is persisted in * memory, otherwise saving it on a file will require recomputation. */ def checkpoint() { if (context.checkpointDir.isEmpty) { throw new Exception("Checkpoint directory has not been set in the SparkContext") } else if (checkpointData.isEmpty) { checkpointData = Some(new RDDCheckpointData(this)) // 创建RDDCheckpointData, 记录关于CP的所有信息 checkpointData.get.markForCheckpoint() // 标记为Marked } }
/** * Performs the checkpointing of this RDD by saving this. It is called by the DAGScheduler * after a job using this RDD has completed (therefore the RDD has been materialized and * potentially stored in memory). doCheckpoint() is called recursively on the parent RDDs. */ private[spark] def doCheckpoint() { if (!doCheckpointCalled) { doCheckpointCalled = true if (checkpointData.isDefined) { checkpointData.get.doCheckpoint() // 调用RDDCheckpointData.doCheckpoint } else { dependencies.foreach(_.rdd.doCheckpoint()) // 当checkpointData没有被创建, 就checkpoint所有的父RDD } } }
/** * Changes the dependencies of this RDD from its original parents to a new RDD (`newRDD`) * created from the checkpoint file, and forget its old dependencies and partitions. */ private[spark] def markCheckpointed(checkpointRDD: RDD[_]) { clearDependencies() // dependencies_ = null partitions_ = null deps = null // Forget the constructor argument for dependencies too }
/** * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing. */ private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = { if (isCheckpointed) { firstParent[T].iterator(split, context) } else { compute(split, context) } }
RDDCheckpointData
/** * Enumeration to manage state transitions of an RDD through checkpointing * [ Initialized --> marked for checkpointing --> checkpointing in progress --> checkpointed ] */ private[spark] object CheckpointState extends Enumeration { // 定义Checkpoint过程中的各个状态 type CheckpointState = Value val Initialized, MarkedForCheckpoint, CheckpointingInProgress, Checkpointed = Value } /** * This class contains all the information related to RDD checkpointing. Each instance of this class * is associated with a RDD. It manages process of checkpointing of the associated RDD, as well as, * manages the post-checkpoint state by providing the updated partitions, iterator and preferred locations * of the checkpointed RDD. */ private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T]) extends Logging with Serializable { import CheckpointState._ // The checkpoint state of the associated RDD. var cpState = Initialized // cp状态,先设为Initialized // The file to which the associated RDD has been checkpointed to @transient var cpFile: Option[String] = None // The CheckpointRDD created from the checkpoint file, that is, the new parent the associated RDD. var cpRDD: Option[RDD[T]] = None // Mark the RDD for checkpointing def markForCheckpoint() { RDDCheckpointData.synchronized { // 先mark, 防止两个同时做cp if (cpState == Initialized) cpState = MarkedForCheckpoint } } // Is the RDD already checkpointed def isCheckpointed: Boolean = { RDDCheckpointData.synchronized { cpState == Checkpointed } } // Get the file to which this RDD was checkpointed to as an Option def getCheckpointFile: Option[String] = { RDDCheckpointData.synchronized { cpFile } } // Do the checkpointing of the RDD. Called after the first job using that RDD is over. def doCheckpoint() { // If it is marked for checkpointing AND checkpointing is not already in progress, // then set it to be in progress, else return RDDCheckpointData.synchronized { if (cpState == MarkedForCheckpoint) { // 只有已经是MarkedForCheckpoint状态, 才能继续CP cpState = CheckpointingInProgress } else { return } } // Create the output path for the checkpoint val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id) val fs = path.getFileSystem(new Configuration()) if (!fs.mkdirs(path)) { throw new SparkException("Failed to create checkpoint path " + path) } // Save to file, and reload it as an RDD rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString) _) // 将RDD存入文件, 完成cp val newRDD = new CheckpointRDD[T](rdd.context, path.toString) // 将RDD从文件重新载入 // Change the dependencies and partitions of the RDD RDDCheckpointData.synchronized { cpFile = Some(path.toString) cpRDD = Some(newRDD) rdd.markCheckpointed(newRDD) // 调用rdd.markCheckpointed清除deps cpState = Checkpointed // 将状态置为Checkpointed RDDCheckpointData.clearTaskCaches() logInfo("Done checkpointing RDD " + rdd.id + ", new parent is RDD " + newRDD.id) } } // Get preferred location of a split after checkpointing def getPreferredLocations(split: Partition): Seq[String] = { RDDCheckpointData.synchronized { cpRDD.get.preferredLocations(split) } } def getPartitions: Array[Partition] = { RDDCheckpointData.synchronized { cpRDD.get.partitions } } def checkpointRDD: Option[RDD[T]] = { RDDCheckpointData.synchronized { cpRDD } } }
CheckpointRDD
private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {} /** * This RDD represents a RDD checkpoint file (similar to HadoopRDD). */ private[spark] class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: String) extends RDD[T](sc, Nil) { @transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration) override def getPartitions: Array[Partition] = { // 应该是每个partitions是一个文件, 所以就是看cp目录里面的文件个数 val cpath = new Path(checkpointPath) val numPartitions = // listStatus can throw exception if path does not exist. if (fs.exists(cpath)) { val dirContents = fs.listStatus(cpath) val partitionFiles = dirContents.map(_.getPath.toString).filter(_.contains("part-")).sorted val numPart = partitionFiles.size if (numPart > 0 && (! partitionFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) || ! partitionFiles(numPart-1).endsWith(CheckpointRDD.splitIdToFile(numPart-1)))) { throw new SparkException("Invalid checkpoint directory: " + checkpointPath) } numPart } else 0 Array.tabulate(numPartitions)(i => new CheckpointRDDPartition(i)) } checkpointData = Some(new RDDCheckpointData[T](this)) checkpointData.get.cpFile = Some(checkpointPath) override def getPreferredLocations(split: Partition): Seq[String] = { val status = fs.getFileStatus(new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index))) val locations = fs.getFileBlockLocations(status, 0, status.getLen) locations.headOption.toList.flatMap(_.getHosts).filter(_ != "localhost") } override def compute(split: Partition, context: TaskContext): Iterator[T] = { val file = new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index)) CheckpointRDD.readFromFile(file, context) // compute就是从cp文件里面把RDD读取出来 } override def checkpoint() { // Do nothing. CheckpointRDD should not be checkpointed. } }