Spark checkpoint mechanism

    First of all, rdd.checkpoint() itself performs no write. It only checks that checkpointDir has been set and then creates a ReliableRDDCheckpointData object, stored in checkpointData; that object does most of the actual checkpoint work.
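
    As a minimal usage sketch (the master, the checkpoint path and the RDD contents below are made up for illustration), the typical call sequence is: set the checkpoint directory, mark the RDD, then run an action:

      import org.apache.spark.{SparkConf, SparkContext}

      val sc = new SparkContext(new SparkConf().setAppName("checkpoint-demo").setMaster("local[2]"))
      // The directory must be set before rdd.checkpoint(); otherwise the
      // SparkException below ("Checkpoint directory has not been set ...") is thrown.
      sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")   // illustrative path

      val rdd = sc.parallelize(1 to 100).map(_ * 2)
      rdd.cache()        // recommended: avoids recomputing the lineage when the checkpoint write job runs
      rdd.checkpoint()   // only marks the RDD; nothing is written yet
      rdd.count()        // the first action triggers doCheckpoint() and the actual write

    The checkpoint() method itself looks like this: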

    /**
        * Only creates a ReliableRDDCheckpointData object here; nothing is actually written yet.
        * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
        * directory set with `SparkContext#setCheckpointDir` and all references to its parent
        * RDDs will be removed. This function must be called before any job has been
        * executed on this RDD. It is strongly recommended that this RDD is persisted in
        * memory, otherwise saving it on a file will require recomputation.
        */
      def checkpoint(): Unit = RDDCheckpointData.synchronized {
        // NOTE: we use a global lock here due to complexities downstream with ensuring
        // children RDD partitions point to the correct parent partitions. In the future
        // we should revisit this consideration.
        if (context.checkpointDir.isEmpty) {
          throw new SparkException("Checkpoint directory has not been set in the SparkContext")
        } else if (checkpointData.isEmpty) {
          checkpointData = Some(new ReliableRDDCheckpointData(this))
        }
      }

    The write is actually triggered by the first action executed after rdd.checkpoint() has been called. SparkContext.runJob, the entry point for all actions, calls rdd.doCheckpoint() once the job completes:

      /**
        * Run a function on a given set of partitions in an RDD and pass the results to the given
        * handler function. This is the main entry point for all actions in Spark.
        */
      def runJob[T, U: ClassTag](
          rdd: RDD[T],
          func: (TaskContext, Iterator[T]) => U,
          partitions: Seq[Int],
          resultHandler: (Int, U) => Unit): Unit = {
        if (stopped.get()) {
          throw new IllegalStateException("SparkContext has been shutdown")
        }
        val callSite = getCallSite
        val cleanedFunc = clean(func)
        logInfo("Starting job: " + callSite.shortForm)
        if (conf.getBoolean("spark.logLineage", false)) {
          logInfo("RDD's recursive dependencies:
    " + rdd.toDebugString)
        }
        dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
        progressBar.foreach(_.finishAll())
        rdd.doCheckpoint()
      }

    doCheckpoint, called at the end of runJob, looks like this:

    /**
        * Performs the checkpointing of this RDD by saving this. It is called after a job using this RDD
        * has completed (therefore the RDD has been materialized and potentially stored in memory).
        * doCheckpoint() is called recursively on the parent RDDs.
        *
        * checkpointData.get.checkpoint() does the actual write; it is triggered by an action on the SparkContext. If this RDD itself is not marked for checkpointing, the call is pushed up the dependency chain.
        */
      private[spark] def doCheckpoint(): Unit = {
        RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
          if (!doCheckpointCalled) {
            doCheckpointCalled = true
            if (checkpointData.isDefined) {
              if (checkpointAllMarkedAncestors) {
                // TODO We can collect all the RDDs that needs to be checkpointed, and then checkpoint
                // them in parallel.
                // Checkpoint parents first because our lineage will be truncated after we
                // checkpoint ourselves
                dependencies.foreach(_.rdd.doCheckpoint())
              }
              checkpointData.get.checkpoint()
            } else {
              dependencies.foreach(_.rdd.doCheckpoint())
            }
          }
        }
      }

    checkpointData.get.checkpoint() performs the actual write. doCheckpoint() itself only decides what gets checkpointed: if this RDD has been marked for checkpointing and checkpointAllMarkedAncestors is enabled, it first walks up the dependency chain and checkpoints all marked ancestors, then checkpoints itself (since its own lineage is truncated afterwards); if it was marked but the flag is off, only this RDD is written; if it was never marked, the call is simply forwarded to its parents along the dependencies. A small sketch of the ancestor case follows.
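
    checkpointAllMarkedAncestors is read from a SparkContext local property. The sketch below reuses the sc from the first example and assumes the internal property name spark.checkpoint.checkpointAllMarkedAncestors (off by default, mainly used by Spark Streaming), so treat it as illustrative rather than a public API:

      // Local property consulted by RDD.doCheckpoint (name is an assumption based on the Spark source of this era)
      sc.setLocalProperty("spark.checkpoint.checkpointAllMarkedAncestors", "true")

      val parent = sc.parallelize(1 to 10)
      parent.checkpoint()            // marked, but by default never written once the child truncates the lineage
      val child = parent.map(_ + 1)
      child.checkpoint()
      child.count()                  // with the property set, parent is checkpointed first, then child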

    Next, look at what checkpointData.get.checkpoint() actually does. For a reliable checkpoint it ends up in ReliableRDDCheckpointData.doCheckpoint(), whose main work is the call to ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir):

      /**
        * Materialize this RDD and write its content to a reliable DFS.
        * This is called immediately after the first action invoked on this RDD has completed.
        *
        * writeRDDToCheckpointDirectory writes the RDD to the given checkpoint directory.
        */
      protected override def doCheckpoint(): CheckpointRDD[T] = {
        val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)
    
        // Optionally clean our checkpoint files if the reference is out of scope
        if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
          rdd.context.cleaner.foreach { cleaner =>
            cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
          }
        }
    
        logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
        newRDD
      }
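
    The spark.cleaner.referenceTracking.cleanCheckpoints flag seen above is off by default; setting it lets the ContextCleaner delete the checkpoint files once the checkpointed RDD is garbage-collected on the driver. A small configuration sketch (the app name is made up):

      import org.apache.spark.{SparkConf, SparkContext}

      val conf = new SparkConf()
        .setAppName("checkpoint-cleanup-demo")
        // Allow the ContextCleaner to delete checkpoint files once the RDD is no longer referenced.
        .set("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
      val sc = new SparkContext(conf)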

    Below is the implementation of ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir). It consists of two parts: writing the partition data and writing the partitioner:

      /**
        * Write RDD to checkpoint files and return a ReliableCheckpointRDD representing the RDD.
        * Writes the RDD to HDFS, including both the partition data and the partitioner.
        */
      def writeRDDToCheckpointDirectory[T: ClassTag](
          originalRDD: RDD[T],
          checkpointDir: String,
          blockSize: Int = -1): ReliableCheckpointRDD[T] = {
    
        val sc = originalRDD.sparkContext
    
        // Create the output path for the checkpoint
        val checkpointDirPath = new Path(checkpointDir)
        val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
        if (!fs.mkdirs(checkpointDirPath)) {
          throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
        }
    
        // Save to file, and reload it as an RDD
        val broadcastedConf = sc.broadcast(
          new SerializableConfiguration(sc.hadoopConfiguration))
        // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
        sc.runJob(originalRDD,
          writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)
    
        if (originalRDD.partitioner.nonEmpty) {
          writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
        }
    
        val newRDD = new ReliableCheckpointRDD[T](
          sc, checkpointDirPath.toString, originalRDD.partitioner)
        if (newRDD.partitions.length != originalRDD.partitions.length) {
          throw new SparkException(
            s"Checkpoint RDD $newRDD(${newRDD.partitions.length}) has different " +
              s"number of partitions from original RDD $originalRDD(${originalRDD.partitions.length})")
        }
        newRDD
      }
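
    Note the sc.runJob call above: the checkpoint write is a separate job, so the lineage is computed a second time unless the RDD is cached first (the SPARK-8582 TODO in the code). A small sketch of the usual workaround, reusing the sc from the first example and a made-up input path:

      import org.apache.spark.storage.StorageLevel

      val data = sc.textFile("hdfs:///data/input")     // made-up path
        .map(_.split(",").length)
      data.persist(StorageLevel.MEMORY_AND_DISK)       // cache first ...
      data.checkpoint()
      data.count()   // ... so the checkpoint write job reads the cached partitions instead of recomputing them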

    Writing the partition data:

    sc.runJob(originalRDD,
          writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)
    /**
        * Write an RDD partition's data to a checkpoint file.
        */
      def writePartitionToCheckpointFile[T: ClassTag](
          path: String,
          broadcastedConf: Broadcast[SerializableConfiguration],
          blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
        val env = SparkEnv.get
        val outputDir = new Path(path)
        val fs = outputDir.getFileSystem(broadcastedConf.value.value)
    
        val finalOutputName = ReliableCheckpointRDD.checkpointFileName(ctx.partitionId())
        val finalOutputPath = new Path(outputDir, finalOutputName)
        val tempOutputPath =
          new Path(outputDir, s".$finalOutputName-attempt-${ctx.attemptNumber()}")
    
        val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
    
        val fileOutputStream = if (blockSize < 0) {
          fs.create(tempOutputPath, false, bufferSize)
        } else {
          // This is mainly for testing purpose
          fs.create(tempOutputPath, false, bufferSize,
            fs.getDefaultReplication(fs.getWorkingDirectory), blockSize)
        }
        val serializer = env.serializer.newInstance()
        val serializeStream = serializer.serializeStream(fileOutputStream)
        Utils.tryWithSafeFinally {
          serializeStream.writeAll(iterator)
        } {
          serializeStream.close()
        }
    
        if (!fs.rename(tempOutputPath, finalOutputPath)) {
          if (!fs.exists(finalOutputPath)) {
            logInfo(s"Deleting tempOutputPath $tempOutputPath")
            fs.delete(tempOutputPath, false)
            throw new IOException("Checkpoint failed: failed to save output of task: " +
              s"${ctx.attemptNumber()} and final output path does not exist: $finalOutputPath")
          } else {
            // Some other copy of this task must've finished before us and renamed it
            logInfo(s"Final output path $finalOutputPath already exists; not overwriting it")
            if (!fs.delete(tempOutputPath, false)) {
              logWarning(s"Error deleting ${tempOutputPath}")
            }
          }
        }
      }
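
    Each partition is first written to a hidden temporary file and then renamed to the final part file, so readers never see half-written data. Putting the naming pieces together (the rdd-<id> subdirectory comes from ReliableRDDCheckpointData.checkpointPath, part-NNNNN from checkpointFileName, and _partitioner from checkpointPartitionerFileName; the RDD id 42 and the partition count are made up), a checkpoint directory roughly looks like this:

      <checkpointDir>/<random-uuid>/rdd-42/part-00000
      <checkpointDir>/<random-uuid>/rdd-42/part-00001
      <checkpointDir>/<random-uuid>/rdd-42/part-00002
      <checkpointDir>/<random-uuid>/rdd-42/_partitioner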

    Writing the partitioner:

    /**
        * Write a partitioner to the given RDD checkpoint directory. This is done on a best-effort
        * basis; any exception while writing the partitioner is caught, logged and ignored.
        */
      private def writePartitionerToCheckpointDir(
          sc: SparkContext, partitioner: Partitioner, checkpointDirPath: Path): Unit = {
        try {
          val partitionerFilePath = new Path(checkpointDirPath, checkpointPartitionerFileName)
          val bufferSize = sc.conf.getInt("spark.buffer.size", 65536)
          val fs = partitionerFilePath.getFileSystem(sc.hadoopConfiguration)
          val fileOutputStream = fs.create(partitionerFilePath, false, bufferSize)
          val serializer = SparkEnv.get.serializer.newInstance()
          val serializeStream = serializer.serializeStream(fileOutputStream)
          Utils.tryWithSafeFinally {
            serializeStream.writeObject(partitioner)
          } {
            serializeStream.close()
          }
          logDebug(s"Written partitioner to $partitionerFilePath")
        } catch {
          case NonFatal(e) =>
            logWarning(s"Error writing partitioner $partitioner to $checkpointDirPath")
        }
      }
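
    Once doCheckpoint() finishes, the new ReliableCheckpointRDD replaces the original lineage, which can be observed from the driver. A short sketch with a made-up RDD (it assumes a SparkContext with a checkpoint directory already set, as in the first example):

      val rdd = sc.parallelize(1 to 100).map(_ * 2)
      rdd.checkpoint()
      println(rdd.isCheckpointed)        // false: nothing has been written yet
      rdd.count()                        // triggers the write path described above
      println(rdd.isCheckpointed)        // true
      println(rdd.getCheckpointFile)     // Some(<checkpointDir>/<uuid>/rdd-<id>)
      println(rdd.toDebugString)         // the lineage now starts from a ReliableCheckpointRDD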