  • spark-streaming的checkpoint机制源码分析

    http://www.cnblogs.com/dongxiao-yang/p/7994357.html

         Spark Streaming 会定时对 DStreamGraph 和 JobScheduler 做 Checkpoint,用来记录整个 DStreamGraph 的变化以及每个 batch 的 job 的完成情况。Checkpoint 发起的间隔默认与 batchDuration 一致,即每次 batch 发起、提交了需要运行的 job 之后就做一次 Checkpoint;另外,在 job 完成并更新任务状态的时候会再做一次 Checkpoint。

    一 checkpoint生成


      private def generateJobs(time: Time) {
        // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
        // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
        ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
        Try {
          jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
          graph.generateJobs(time) // generate jobs using allocated block
        } match {
          case Success(jobs) =>
            val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
            jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
          case Failure(e) =>
            jobScheduler.reportError("Error generating jobs for time " + time, e)
        eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))

     job 完成

      private def clearMetadata(time: Time) {
        // If checkpointing is enabled, then checkpoint,
        // else mark batch to be fully processed
        if (shouldCheckpoint) {
          eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
        } else {
          // If checkpointing is not enabled, then delete metadata information about
          // received blocks (block data not saved in any case). Otherwise, wait for
          // checkpointing of this batch to complete.
          val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
          jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
          jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)

    上文中的 eventLoop 是 JobGenerator 内部对消息事件队列的一个封装,eventLoop 内部有一个后台线程不断地消费事件,所以 DoCheckpoint 这种类型的事件会依次经过 processEvent -> doCheckpoint,最终由 checkpointWriter 把生成的 Checkpoint 对象写到外部存储:

      /** Processes all events */
      private def processEvent(event: JobGeneratorEvent) {
        logDebug("Got event " + event)
        event match {
          case GenerateJobs(time) => generateJobs(time)
          case ClearMetadata(time) => clearMetadata(time)
          case DoCheckpoint(time, clearCheckpointDataLater) =>
            doCheckpoint(time, clearCheckpointDataLater)
          case ClearCheckpointData(time) => clearCheckpointData(time)
      /** Perform checkpoint for the give `time`. */
      private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
        if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
          logInfo("Checkpointing graph for time " + time)
          checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)

    doCheckpoint 在调用 checkpointWriter 把数据写到 HDFS 之前,首先会运行 ssc.graph.updateCheckpointData(time)。这个方法的主要作用是更新 DStreamGraph 里所有 input stream 和 output stream 对应的 checkpointData 属性,调用链路为 DStreamGraph.updateCheckpointData -> DStream.updateCheckpointData -> checkpointData.update

      def updateCheckpointData(time: Time) {
        logInfo("Updating checkpoint data for time " + time)
        this.synchronized {
        logInfo("Updated checkpoint data for time " + time)
      private[streaming] def updateCheckpointData(currentTime: Time) {
        logDebug(s"Updating checkpoint data for time $currentTime")
        logDebug(s"Updated checkpoint data for time $currentTime: $checkpointData")
      class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
        def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = {
          data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
        override def update(time: Time): Unit = {
          generatedRDDs.foreach { kv =>
            val a = kv._2.asInstanceOf[KafkaRDD[K, V]].offsetRanges.map(_.toTuple).toArray
            batchForTime += kv._1 -> a
        override def cleanup(time: Time): Unit = { }
        override def restore(): Unit = {
          batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
             logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
             generatedRDDs += t -> new KafkaRDD[K, V](
               // during restore, it's possible same partition will be consumed from multiple
               // threads, so dont use cache



    class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
      extends Logging with Serializable {
      val master = ssc.sc.master
      val framework = ssc.sc.appName
      val jars = ssc.sc.jars
      val graph = ssc.graph
      val checkpointDir = ssc.checkpointDir
      val checkpointDuration = ssc.checkpointDuration
      val pendingTimes = ssc.scheduler.getPendingTimes().toArray
      val sparkConfPairs = ssc.conf.getAll

    二 从checkpoint恢复服务



      def getOrCreate(
          checkpointPath: String,
          creatingFunc: () => StreamingContext,
          hadoopConf: Configuration = SparkHadoopUtil.get.conf,
          createOnError: Boolean = false
        ): StreamingContext = {
        val checkpointOption = CheckpointReader.read(
          checkpointPath, new SparkConf(), hadoopConf, createOnError)
        checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc())
      def read(
          checkpointDir: String,
          conf: SparkConf,
          hadoopConf: Configuration,
          ignoreReadError: Boolean = false): Option[Checkpoint] = {
        val checkpointPath = new Path(checkpointDir)
        val fs = checkpointPath.getFileSystem(hadoopConf)
        // Try to find the checkpoint files
        val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)).reverse
        if (checkpointFiles.isEmpty) {
          return None
        // Try to read the checkpoint files in the order
        logInfo(s"Checkpoint files found: ${checkpointFiles.mkString(",")}")
        var readError: Exception = null
        checkpointFiles.foreach { file =>
          logInfo(s"Attempting to load checkpoint from file $file")
          try {
            val fis = fs.open(file)
            val cp = Checkpoint.deserialize(fis, conf)
            logInfo(s"Checkpoint successfully loaded from file $file")
            logInfo(s"Checkpoint was generated at time ${cp.checkpointTime}")
            return Some(cp)
          } catch {
            case e: Exception =>
              readError = e
              logWarning(s"Error reading checkpoint from file $file", e)
        // If none of checkpoint files could be read, then throw exception
        if (!ignoreReadError) {
          throw new SparkException(
            s"Failed to read checkpoint from directory $checkpointPath", readError)

     在从 checkpoint 恢复服务的过程中,DStreamGraph 也由 checkpoint 恢复。下文代码的调用路径为 StreamingContext.graph -> DStreamGraph.restoreCheckpointData -> DStream.restoreCheckpointData -> checkpointData.restore

      private[streaming] val graph: DStreamGraph = {
        if (isCheckpointPresent) {
        } else {
          require(_batchDur != null, "Batch duration for StreamingContext cannot be null")
          val newGraph = new DStreamGraph()
      def restoreCheckpointData() {
        logInfo("Restoring checkpoint data")
        this.synchronized {
        logInfo("Restored checkpoint data")
      private[streaming] def restoreCheckpointData() {
        if (!restoredFromCheckpointData) {
          // Create RDDs from the checkpoint data
          logInfo("Restoring checkpoint data")
          restoredFromCheckpointData = true
          logInfo("Restored checkpoint data")
        override def restore(): Unit = {
          batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
             logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
             generatedRDDs += t -> new KafkaRDD[K, V](
               // during restore, it's possible same partition will be consumed from multiple
               // threads, so dont use cache


      private def restart() {
        // If manual clock is being used for testing, then
        // either set the manual clock to the last checkpointed time,
        // or if the property is defined set it to that time
        if (clock.isInstanceOf[ManualClock]) {
          val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
          val jumpTime = ssc.sc.conf.getLong("spark.streaming.manualClock.jump", 0)
          clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
        val batchDuration = ssc.graph.batchDuration
        // Batches when the master was down, that is,
        // between the checkpoint and current restart time
        val checkpointTime = ssc.initialCheckpoint.checkpointTime
        val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
        val downTimes = checkpointTime.until(restartTime, batchDuration)
        logInfo("Batches during down time (" + downTimes.size + " batches): "
          + downTimes.mkString(", "))
        // Batches that were unprocessed before failure
        val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering)
        logInfo("Batches pending processing (" + pendingTimes.length + " batches): " +
          pendingTimes.mkString(", "))
        // Reschedule jobs for these times
        val timesToReschedule = (pendingTimes ++ downTimes).filter { _ < restartTime }
        logInfo("Batches to reschedule (" + timesToReschedule.length + " batches): " +
          timesToReschedule.mkString(", "))
        timesToReschedule.foreach { time =>
          // Allocate the related blocks when recovering from failure, because some blocks that were
          // added but not allocated, are dangling in the queue after recovering, we have to allocate
          // those blocks to the next batch, which is the batch they were supposed to go.
          jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
          jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time)))
        // Restart the timer
        logInfo("Restarted JobGenerator at " + restartTime)




