  • SparkStreaming Execution Flow Analysis and Checkpoint Operations

    This article uses the source code to trace how a SparkStreaming application runs, from job generation to job completion, along with the checkpoint operations performed along the way.

    Note: only the code relevant to this analysis is quoted below; the rest is omitted.

    1 Execution Flow Analysis

    Application entry point:

    val sparkConf = new SparkConf().setAppName("SparkStreaming")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(batchDuration.toLong))
    ssc.start()
    ssc.awaitTermination()
    

    Once ssc.start() is called, the application truly begins to run.
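
    For reference, a minimal complete driver might look like the sketch below. It is hedged: the socket source, host/port, and word-count logic are illustrative assumptions, not part of the original snippet. Note that at least one output operation (print(), foreachRDD(), etc.) must be registered before start(), otherwise start() fails with "No output operations registered".

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object StreamingDemo {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("SparkStreaming")
        val ssc = new StreamingContext(sparkConf, Seconds(5))
        // each batch counts the words received on the socket during that interval
        val lines = ssc.socketTextStream("localhost", 9999) // illustrative source
        lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
        ssc.start()          // hands control to JobScheduler / JobGenerator, analyzed below
        ssc.awaitTermination()
      }
    }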

    Step 1:
    StreamingContext.start() does the following main work:

    • Calls JobScheduler.start()
    • Posts a StreamingListenerStreamingStarted event
    StreamingContext.start():
    
def start(): Unit = synchronized {
    state match {
      case INITIALIZED =>
        StreamingContext.ACTIVATION_LOCK.synchronized {
          StreamingContext.assertNoOtherContextIsActive()
          try {
            ...
            scheduler.start()
            state = StreamingContextState.ACTIVE
            scheduler.listenerBus.post(
              StreamingListenerStreamingStarted(System.currentTimeMillis()))
          } catch {
            ...
          }
          StreamingContext.setActiveContext(this)
        }
        ...
      case ACTIVE =>
        logWarning("StreamingContext has already been started")
      case STOPPED =>
        throw new IllegalStateException("StreamingContext has already been stopped")
    }
  }
    

    Step 2:
    Calling JobScheduler.start() performs the following main operations:

    • Creates an EventLoop to handle incoming JobSchedulerEvents; processEvent is the actual handling logic
    • Calls jobGenerator.start()
    JobScheduler.start():
    
    def start(): Unit = synchronized {
        if (eventLoop != null) return // scheduler has already been started
    
        logDebug("Starting JobScheduler")
    // create an event loop for JobSchedulerEvents and start it
        eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
          override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    
          override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
        }
        eventLoop.start()
        ...
    // start the JobGenerator
        jobGenerator.start()
        ...
      }
    

    Step 3:
    JobGenerator.start() performs the following main operations:

    • Creates an EventLoop[JobGeneratorEvent] to handle JobGeneratorEvent events
    • Kicks off the job generation work:
      • A timer periodically executes eventLoop.post(GenerateJobs(new Time(longTime)))
      • When the EventLoop created in JobGenerator.start() receives a GenerateJobs event, it executes generateJobs(time)
      • generateJobs() builds a JobSet, submits it via jobScheduler.submitJobSet(), and then posts a DoCheckpoint event to trigger a checkpoint
    JobGenerator.start()
    
    def start(): Unit = synchronized {
        if (eventLoop != null) return // generator has already been started
    // force creation of the lazy checkpointWriter, used to persist checkpoint data
    checkpointWriter
    // create an event loop to receive and process JobGeneratorEvents
        eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
          override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
    
          override protected def onError(e: Throwable): Unit = {
            jobScheduler.reportError("Error in job generator", e)
          }
        }
        eventLoop.start()
    
        if (ssc.isCheckpointPresent) {
          // recover from an existing checkpoint
          restart()
        } else {
          // first start
          startFirstTime()
        }
    }
    

    On the first start, startFirstTime() is called. Its main work is to start the already-initialized RecurringTimer, which periodically posts GenerateJobs events; the period is ssc.graph.batchDuration.milliseconds, i.e. the batch interval you configured. Note that graph.start() is passed startTime - graph.batchDuration: the graph's zero time sits one batch before the first trigger, so the first batch covers exactly one interval. Each time the EventLoop created in JobGenerator.start() receives a GenerateJobs event, it runs processEvent() and therefore generateJobs(time), which performs the actual job generation.

      private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
        longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
    
      private def startFirstTime() {
        val startTime = new Time(timer.getStartTime())
        graph.start(startTime - graph.batchDuration)
        timer.start(startTime.milliseconds)
        logInfo("Started JobGenerator at " + startTime)
      }
    
      private def processEvent(event: JobGeneratorEvent) {
        logDebug("Got event " + event)
        event match {
          case GenerateJobs(time) => generateJobs(time)
          case ClearMetadata(time) => clearMetadata(time)
          case DoCheckpoint(time, clearCheckpointDataLater) =>
            doCheckpoint(time, clearCheckpointDataLater)
          case ClearCheckpointData(time) => clearCheckpointData(time)
        }
      }
    
    

    generateJobs does two main things:

    • Builds a JobSet and submits it via jobScheduler.submitJobSet()
    • Posts a DoCheckpoint event to trigger a checkpoint
      private def generateJobs(time: Time) {
        ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
        Try {
          jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
          graph.generateJobs(time) // generate jobs using allocated block
        } match {
          case Success(jobs) =>
            val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
            jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
          case Failure(e) =>
            jobScheduler.reportError("Error generating jobs for time " + time, e)
            PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
        }
        eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
      }
    

    First operation: jobScheduler.submitJobSet() iterates over the jobs in the jobSet, wraps each one in a JobHandler, and hands the JobHandler to jobExecutor, a fixed-size daemon thread pool (sized by spark.streaming.concurrentJobs, which defaults to 1). JobHandler implements Runnable, and its run() does three main things:

    • Posts a JobStarted event
    • Executes Job.run()
    • Posts a JobCompleted event
    def submitJobSet(jobSet: JobSet) {
        if (jobSet.jobs.isEmpty) {
          logInfo("No jobs added for time " + jobSet.time)
        } else {
          listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
          jobSets.put(jobSet.time, jobSet)
          jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
          logInfo("Added jobs for time " + jobSet.time)
        }
    }
    
private class JobHandler(job: Job) extends Runnable with Logging {
    import JobScheduler._
    def run() {
      // save the caller's local properties so they can be restored in finally
      val oldProps = ssc.sparkContext.getLocalProperties
      try {
        var _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobStarted(job, clock.getTimeMillis())) // post the JobStarted event
          SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
            job.run()
          }
          _eventLoop = eventLoop
          if (_eventLoop != null) {
            _eventLoop.post(JobCompleted(job, clock.getTimeMillis())) // post the JobCompleted event
          }
        } else {
          // JobScheduler has been stopped
        }
      } finally {
        ssc.sparkContext.setLocalProperties(oldProps)
      }
    }
  }
    

    Second operation: posting a DoCheckpoint event

    Recall that the EventLoop created in JobScheduler.start() only handles three event types: JobStarted, JobCompleted, and ErrorReported. The DoCheckpoint event posted at the end of generateJobs() goes to the EventLoop created in JobGenerator.start(); when that loop receives a DoCheckpoint event, it executes doCheckpoint():

      //JobGenerator.processEvent()
      private def processEvent(event: JobGeneratorEvent) {
        logDebug("Got event " + event)
        event match {
          ...
          case DoCheckpoint(time, clearCheckpointDataLater) =>
            doCheckpoint(time, clearCheckpointDataLater)
          ...
        }
      }
    

    doCheckpoint() calls graph.updateCheckpointData to update the checkpoint data, then calls checkpointWriter.write to persist it:

      private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
        if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
          logInfo("Checkpointing graph for time " + time)
          // update the graph's checkpoint data for this batch
          ssc.graph.updateCheckpointData(time)
          // persist the checkpoint to the file system
          checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
        } else if (clearCheckpointDataLater) {
          markBatchFullyProcessed(time)
        }
      }
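
    A note on shouldCheckpoint: it is true only when both a checkpoint directory and a checkpoint duration are set; calling ssc.checkpoint(directory) sets the directory, and the duration defaults to the batch interval. In application code this is usually paired with StreamingContext.getOrCreate for recovery, which is also what makes the isCheckpointPresent/restart() branch in JobGenerator.start() reachable. A minimal sketch, assuming a hypothetical checkpoint path and stream definition:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val checkpointDir = "hdfs:///tmp/streaming-checkpoint" // illustrative path

    def createContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("SparkStreaming")
      val ssc = new StreamingContext(conf, Seconds(5))
      ssc.checkpoint(checkpointDir) // makes shouldCheckpoint true in JobGenerator
      // ... define DStreams and output operations here ...
      ssc
    }

    // first run: calls createContext(); after a failure: rebuilds the
    // StreamingContext (and its DStreamGraph) from the checkpoint files
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()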
    

    The checkpoint update mainly calls DStreamGraph.updateCheckpointData:

    def updateCheckpointData(time: Time) {
        logInfo("Updating checkpoint data for time " + time)
        this.synchronized {
          outputStreams.foreach(_.updateCheckpointData(time))
        }
        logInfo("Updated checkpoint data for time " + time)
      }
    

    outputStreams.foreach(_.updateCheckpointData(time)) calls DStream.updateCheckpointData on each output stream. That method mainly calls checkpointData.update(currentTime) and then recursively updates the checkpoint data of every DStream this one depends on.

    private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()
    
    private[streaming] def updateCheckpointData(currentTime: Time) {
        logDebug(s"Updating checkpoint data for time $currentTime")
        checkpointData.update(currentTime)
        dependencies.foreach(_.updateCheckpointData(currentTime))
        logDebug(s"Updated checkpoint data for time $currentTime: $checkpointData")
      }
    

    Next, let's look at checkpointData.update(currentTime). In DStream we find the following definition:

    private[streaming] val checkpointData = new DStreamCheckpointData(this)
    

    That leads us to DStreamCheckpointData.update. DStreamCheckpointData also has subclasses that customize what gets saved and how (FileInputDStream, for example, ships its own FileInputDStreamCheckpointData):

  // key: the checkpoint time, value: the checkpoint file path
  @transient private var timeToCheckpointFile = new HashMap[Time, String]
  // key: the batch time, value: the time of the oldest checkpointed RDD in that batch
  @transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time]
  protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]]

def update(time: Time) {
    // collect the RDDs of this DStream that are to be checkpointed;
    // generatedRDDs is a HashMap[Time, RDD[T]]
    val checkpointFiles = dstream.generatedRDDs.filter(_._2.getCheckpointFile.isDefined)
                                       .map(x => (x._1, x._2.getCheckpointFile.get))
    logDebug("Current checkpoint files:\n" + checkpointFiles.toSeq.mkString("\n"))

    // add the checkpoint files to the HashMap that will eventually be serialized
    if (!checkpointFiles.isEmpty) {
      currentCheckpointFiles.clear()
      currentCheckpointFiles ++= checkpointFiles
      // update the time -> checkpoint file mapping
      timeToCheckpointFile ++= currentCheckpointFiles
      // key: the given batch time, value: the time of the earliest checkpointed RDD
      timeToOldestCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering)
    }
  }
    

    Step 4: job completion
    As mentioned above, jobScheduler.submitJobSet() (called from generateJobs) wraps each Job in a JobHandler and hands it to JobExecutor. JobHandler posts a JobStarted event, runs the job, and posts a JobCompleted event when it finishes. When JobScheduler's EventLoop receives that event, it executes the handleJobCompletion method:

     //JobScheduler.processEvent()
     private def processEvent(event: JobSchedulerEvent) {
        try {
          event match {
            case JobStarted(job, startTime) => handleJobStart(job, startTime)
            case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
            case ErrorReported(m, e) => handleError(m, e)
          }
        } catch {
          case e: Throwable =>
            reportError("Error in job scheduler", e)
        }
      }
    

    handleJobCompletion calls jobSet.handleJobCompletion(job) and, at the end, checks whether the whole jobSet has completed; if so, it executes jobGenerator.onBatchCompletion(jobSet.time):

    private def handleJobCompletion(job: Job, completedTime: Long) {
        val jobSet = jobSets.get(job.time)
        jobSet.handleJobCompletion(job)
        job.setEndTime(completedTime)
        listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
        logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
        if (jobSet.hasCompleted) {
          listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
        }
        job.result match {
          case Failure(e) =>
            reportError("Error running job " + job, e)
      case _ => // if every job in the JobSet has completed, the following runs
            if (jobSet.hasCompleted) {
              jobSets.remove(jobSet.time)
              jobGenerator.onBatchCompletion(jobSet.time)
              logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
                jobSet.totalDelay / 1000.0, jobSet.time.toString,
                jobSet.processingDelay / 1000.0
              ))
            }
        }
      }
    

    Now back in JobGenerator, onBatchCompletion():

    def onBatchCompletion(time: Time) {
        eventLoop.post(ClearMetadata(time))
    }
    

    JobGenerator.processEvent() then executes clearMetadata(time):

    private def processEvent(event: JobGeneratorEvent) {
        logDebug("Got event " + event)
        event match {
          case GenerateJobs(time) => generateJobs(time)
          case ClearMetadata(time) => clearMetadata(time)
          case DoCheckpoint(time, clearCheckpointDataLater) =>
            doCheckpoint(time, clearCheckpointDataLater)
          case ClearCheckpointData(time) => clearCheckpointData(time)
        }
    }
    

    clearMetadata() either checkpoints the metadata or deletes it:

    private def clearMetadata(time: Time) {
        ssc.graph.clearMetadata(time)
    
        // If checkpointing is enabled, then checkpoint,
        // else mark batch to be fully processed
        if (shouldCheckpoint) {
          // if checkpointing is enabled, post a DoCheckpoint event
          eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
        } else {
          // otherwise clean up the metadata for this batch
        }
    }
    
    

    2 Summary

    At this point we have covered SparkStreaming's startup, job generation, job completion, and checkpoint operations. To summarize, a SparkStreaming application runs as follows:

    • StreamingContext.start() starts the JobScheduler
    • JobScheduler startup
      • JobScheduler creates an EventLoop[JobSchedulerEvent] to handle JobStarted and JobCompleted events
      • It starts the JobGenerator
    • JobGenerator startup
      • JobGenerator creates an EventLoop[JobGeneratorEvent] to handle GenerateJobs, ClearMetadata, DoCheckpoint, and ClearCheckpointData events
      • It creates a RecurringTimer that periodically posts GenerateJobs events, driving the periodic creation and execution of jobs
    • Job generation in JobGenerator
      • generateJobs() builds a JobSet and submits it via JobScheduler.submitJobSet()
        • submitJobSet wraps each Job in a JobHandler and hands it to the JobExecutor thread pool
      • A DoCheckpoint event is posted to checkpoint the metadata
    • Job completion
      • When a job finishes, JobHandler posts a JobCompleted event; JobScheduler's EventLoop handles it, and once the whole JobSet has completed it posts a ClearMetadata event, which checkpoints or deletes the metadata

    Appendix: the source code of RecurringTimer and EventLoop, with brief comments

    The RecurringTimer code:

    private[streaming]
    class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
      extends Logging {
  // a daemon thread that runs loop()
      private val thread = new Thread("RecurringTimer - " + name) {
        setDaemon(true)
        override def run() { loop }
      }
    
      @volatile private var prevTime = -1L
      @volatile private var nextTime = -1L
      @volatile private var stopped = false
    
      def getStartTime(): Long = {
        (math.floor(clock.getTimeMillis().toDouble / period) + 1).toLong * period
      }
    
      def getRestartTime(originalStartTime: Long): Long = {
        val gap = clock.getTimeMillis() - originalStartTime
        (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
      }
    
  // start() records the first trigger time and starts the thread, which runs loop()
      def start(startTime: Long): Long = synchronized {
        nextTime = startTime
        thread.start()
        logInfo("Started timer for " + name + " at time " + nextTime)
        nextTime
      }
    
      def start(): Long = {
        start(getStartTime())
      }
    
      def stop(interruptTimer: Boolean): Long = synchronized {
        if (!stopped) {
          stopped = true
          if (interruptTimer) {
            thread.interrupt()
          }
          thread.join()
          logInfo("Stopped timer for " + name + " after time " + prevTime)
        }
        prevTime
      }
    
      private def triggerActionForNextInterval(): Unit = {
        clock.waitTillTime(nextTime)
        callback(nextTime)
        prevTime = nextTime
        nextTime += period
        logDebug("Callback for " + name + " called at time " + prevTime)
      }
    
  // trigger the callback once per period; after stop() is requested, trigger one final time for the last interval
      private def loop() {
        try {
          while (!stopped) {
            triggerActionForNextInterval()
          }
          triggerActionForNextInterval()
        } catch {
          case e: InterruptedException =>
        }
      }
    }
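
    To make the rounding in getStartTime() concrete, here is the same arithmetic as a standalone sketch (plain Scala with made-up numbers, not Spark API). The first trigger time is the next multiple of the period after the current clock time, which keeps batch boundaries aligned to the batch interval:

    // suppose period = 2000 ms and the clock currently reads 5300 ms
    val period = 2000L
    val now    = 5300L
    val startTime = (math.floor(now.toDouble / period) + 1).toLong * period
    assert(startTime == 6000L) // the first GenerateJobs event fires at t = 6000 ms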
    

    The EventLoop source: its main job is to run a background thread that waits for incoming events and dispatches each one for processing:

    private[spark] abstract class EventLoop[E](name: String) extends Logging {
    
      private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
    
      private val stopped = new AtomicBoolean(false)
    
      private val eventThread = new Thread(name) {
        setDaemon(true)
    
        override def run(): Unit = {
          try {
            while (!stopped.get) {
              val event = eventQueue.take()
              try {
                onReceive(event)
              } catch {
                case NonFatal(e) =>
                  try {
                    onError(e)
                  } catch {
                    case NonFatal(e) => logError("Unexpected error in " + name, e)
                  }
              }
            }
          } catch {
            case ie: InterruptedException => // exit even if eventQueue is not empty
            case NonFatal(e) => logError("Unexpected error in " + name, e)
          }
        }
    
      }
    
      def start(): Unit = {
        if (stopped.get) {
          throw new IllegalStateException(name + " has already been stopped")
        }
        // Call onStart before starting the event thread to make sure it happens before onReceive
        onStart()
        eventThread.start()
      }
    
      def stop(): Unit = {
        if (stopped.compareAndSet(false, true)) {
          eventThread.interrupt()
          var onStopCalled = false
          try {
            eventThread.join()
            // Call onStop after the event thread exits to make sure onReceive happens before onStop
            onStopCalled = true
            onStop()
          } catch {
            case ie: InterruptedException =>
              Thread.currentThread().interrupt()
              if (!onStopCalled) {
                // ie is thrown from `eventThread.join()`. Otherwise, we should not call `onStop` since
                // it's already called.
                onStop()
              }
          }
        } else {
          // Keep quiet to allow calling `stop` multiple times.
        }
      }
    
  // put the event on eventQueue; eventThread will take it and process it
  def post(event: E): Unit = {
    eventQueue.put(event)
  }

  // whether eventThread is still alive
  def isActive: Boolean = eventThread.isAlive

  // called before eventThread starts
  protected def onStart(): Unit = {}

  // called after eventThread stops
  protected def onStop(): Unit = {}

  // implementing classes put their event-handling logic here
  protected def onReceive(event: E): Unit

  // error-handling hook for exceptions thrown by onReceive
  protected def onError(e: Throwable): Unit
    
    }
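
    The same producer/consumer pattern is easy to reproduce outside Spark. Below is a simplified, illustrative sketch of the idea; it is not the Spark class (which is private[spark] and adds the onStart/onStop/onError hooks shown above):

    import java.util.concurrent.LinkedBlockingQueue

    class SimpleEventLoop[E](name: String)(handler: E => Unit) {
      private val queue = new LinkedBlockingQueue[E]()
      @volatile private var stopped = false

      // daemon thread that drains the queue and dispatches each event in order
      private val thread = new Thread(name) {
        setDaemon(true)
        override def run(): Unit =
          try { while (!stopped) handler(queue.take()) }
          catch { case _: InterruptedException => } // interrupted by stop()
      }

      def start(): Unit = thread.start()
      def post(event: E): Unit = queue.put(event) // producer side, any thread
      def stop(): Unit = { stopped = true; thread.interrupt(); thread.join() }
    }

    // usage: events posted from any thread are handled serially on the loop's thread
    val loop = new SimpleEventLoop[String]("demo")(e => println(s"got $e"))
    loop.start()
    loop.post("GenerateJobs")
    loop.stop()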
    
  • Original article: https://www.cnblogs.com/simple-focus/p/8463259.html