  • Spark源码分析 – Checkpoint


    1. 首先如果RDD需要CP, 调用RDD.checkpoint()来mark
    注释说了, 这个需要在Job被执行前被mark, 原因后面看, 并且最好选择persist这个RDD, 否则在存CP文件时需要重新computeRDD内容
    并且当RDD被CP后, 所有dependencies都会被清除, 因为既然RDD已经被CP, 那么就可以直接从文件读取, 没有必要保留之前的parents的dependencies(保留只是为了replay)

    2. 在SparkContext.runJob中, 最后会调用rdd.doCheckpoint()
    如果前面已经mark过, 那么这里就会将rdd真正CP到文件中去, 这也是前面为什么说, mark必须在run job之前完成

      def runJob[T, U: ClassManifest](
          rdd: RDD[T],
          func: (TaskContext, Iterator[T]) => U,
          partitions: Seq[Int],
          allowLocal: Boolean,
          resultHandler: (Int, U) => Unit) {
        val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler,

    3. 在RDDCheckpointData.doCheckpoint中

    会调用rdd.markCheckpointed(newRDD), 清除dependencies信息
    并最终将状态设为, Checkpointed, 以表示完成CP

    4. Checkpoint如何使用, 在RDD.computeOrReadCheckpoint中, 看到如果已经完成CP, 会直接从firstParent中读数据, 刚开始会觉得比较奇怪

        if (isCheckpointed) {
          firstParent[T].iterator(split, context)

    RDD.firstParent的定义如下, 就是从dependencies中取第一个dependency的rdd


    而RDD.dependencies的定义如下, 可用看到在完成CP的情况下, 从dependencies中读到的其实就是CP RDD, 所以可以直接用

     checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {dependencies_}


      /** An Option holding our checkpoint RDD, if we are checkpointed */
      private def checkpointRDD: Option[RDD[T]] = checkpointData.flatMap(_.checkpointRDD)
      private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
      // Avoid handling doCheckpoint multiple times to prevent excessive recursion
      private var doCheckpointCalled = false
       * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
       * directory set with SparkContext.setCheckpointDir() and all references to its parent
       * RDDs will be removed. This function must be called before any job has been
       * executed on this RDD. It is strongly recommended that this RDD is persisted in
       * memory, otherwise saving it on a file will require recomputation.
      def checkpoint() {
        if (context.checkpointDir.isEmpty) {
          throw new Exception("Checkpoint directory has not been set in the SparkContext")
        } else if (checkpointData.isEmpty) {
          checkpointData = Some(new RDDCheckpointData(this)) // 创建RDDCheckpointData, 记录关于CP的所有信息
          checkpointData.get.markForCheckpoint()  // 标记为Marked
       * Performs the checkpointing of this RDD by saving this. It is called by the DAGScheduler
       * after a job using this RDD has completed (therefore the RDD has been materialized and
       * potentially stored in memory). doCheckpoint() is called recursively on the parent RDDs.
      private[spark] def doCheckpoint() {
        if (!doCheckpointCalled) {
          doCheckpointCalled = true
          if (checkpointData.isDefined) { 
            checkpointData.get.doCheckpoint() // 调用RDDCheckpointData.doCheckpoint
          } else {
            dependencies.foreach(_.rdd.doCheckpoint()) // 当checkpointData没有被创建, 就checkpoint所有的父RDD
       * Changes the dependencies of this RDD from its original parents to a new RDD (`newRDD`)
       * created from the checkpoint file, and forget its old dependencies and partitions.
      private[spark] def markCheckpointed(checkpointRDD: RDD[_]) {
        clearDependencies()  // dependencies_ = null
        partitions_ = null
        deps = null    // Forget the constructor argument for dependencies too
       * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
      private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
        if (isCheckpointed) {
          firstParent[T].iterator(split, context)
        } else {
          compute(split, context)




     * Enumeration to manage state transitions of an RDD through checkpointing
     * [ Initialized --> marked for checkpointing --> checkpointing in progress --> checkpointed ]
    private[spark] object CheckpointState extends Enumeration { // 定义Checkpoint过程中的各个状态
      type CheckpointState = Value
      val Initialized, MarkedForCheckpoint, CheckpointingInProgress, Checkpointed = Value
     * This class contains all the information related to RDD checkpointing. Each instance of this class
     * is associated with a RDD. It manages process of checkpointing of the associated RDD, as well as,
     * manages the post-checkpoint state by providing the updated partitions, iterator and preferred locations
     * of the checkpointed RDD.
    private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T])
      extends Logging with Serializable {
      import CheckpointState._
      // The checkpoint state of the associated RDD.
      var cpState = Initialized  // cp状态,先设为Initialized 
      // The file to which the associated RDD has been checkpointed to
      @transient var cpFile: Option[String] = None
      // The CheckpointRDD created from the checkpoint file, that is, the new parent the associated RDD.
      var cpRDD: Option[RDD[T]] = None
      // Mark the RDD for checkpointing
      def markForCheckpoint() {
        RDDCheckpointData.synchronized { // 先mark, 防止两个同时做cp
          if (cpState == Initialized) cpState = MarkedForCheckpoint
      // Is the RDD already checkpointed
      def isCheckpointed: Boolean = {
        RDDCheckpointData.synchronized { cpState == Checkpointed }
      // Get the file to which this RDD was checkpointed to as an Option
      def getCheckpointFile: Option[String] = {
        RDDCheckpointData.synchronized { cpFile }
      // Do the checkpointing of the RDD. Called after the first job using that RDD is over.
      def doCheckpoint() {
        // If it is marked for checkpointing AND checkpointing is not already in progress,
        // then set it to be in progress, else return
        RDDCheckpointData.synchronized {
          if (cpState == MarkedForCheckpoint) { // 只有已经是MarkedForCheckpoint状态, 才能继续CP
            cpState = CheckpointingInProgress
          } else {
        // Create the output path for the checkpoint
        val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)
        val fs = path.getFileSystem(new Configuration())
        if (!fs.mkdirs(path)) {
          throw new SparkException("Failed to create checkpoint path " + path)
        // Save to file, and reload it as an RDD
        rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString) _)  // 将RDD存入文件, 完成cp
        val newRDD = new CheckpointRDD[T](rdd.context, path.toString) // 将RDD从文件重新载入
        // Change the dependencies and partitions of the RDD
        RDDCheckpointData.synchronized {
          cpFile = Some(path.toString)
          cpRDD = Some(newRDD)
          rdd.markCheckpointed(newRDD)   // 调用rdd.markCheckpointed清除deps
          cpState = Checkpointed // 将状态置为Checkpointed 
          logInfo("Done checkpointing RDD " + rdd.id + ", new parent is RDD " + newRDD.id)
      // Get preferred location of a split after checkpointing
      def getPreferredLocations(split: Partition): Seq[String] = {
        RDDCheckpointData.synchronized {
      def getPartitions: Array[Partition] = {
        RDDCheckpointData.synchronized {
      def checkpointRDD: Option[RDD[T]] = {
        RDDCheckpointData.synchronized {




    private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
     * This RDD represents a RDD checkpoint file (similar to HadoopRDD).
    class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: String)
      extends RDD[T](sc, Nil) {
      @transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration)
      override def getPartitions: Array[Partition] = { // 应该是每个partitions是一个文件, 所以就是看cp目录里面的文件个数
        val cpath = new Path(checkpointPath)
        val numPartitions =
        // listStatus can throw exception if path does not exist.
        if (fs.exists(cpath)) {
          val dirContents = fs.listStatus(cpath)
          val partitionFiles = dirContents.map(_.getPath.toString).filter(_.contains("part-")).sorted
          val numPart =  partitionFiles.size
          if (numPart > 0 && (! partitionFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) ||
              ! partitionFiles(numPart-1).endsWith(CheckpointRDD.splitIdToFile(numPart-1)))) {
            throw new SparkException("Invalid checkpoint directory: " + checkpointPath)
        } else 0
        Array.tabulate(numPartitions)(i => new CheckpointRDDPartition(i))
      checkpointData = Some(new RDDCheckpointData[T](this))
      checkpointData.get.cpFile = Some(checkpointPath)
      override def getPreferredLocations(split: Partition): Seq[String] = {
        val status = fs.getFileStatus(new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index)))
        val locations = fs.getFileBlockLocations(status, 0, status.getLen)
        locations.headOption.toList.flatMap(_.getHosts).filter(_ != "localhost")
      override def compute(split: Partition, context: TaskContext): Iterator[T] = {
        val file = new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index))
        CheckpointRDD.readFromFile(file, context) // compute就是从cp文件里面把RDD读取出来
      override def checkpoint() {
        // Do nothing. CheckpointRDD should not be checkpointed.
