  • Spark Streaming job generation and execution

    Spark Streaming job generation

    Each time Spark Streaming submits jobs for a batch, how many does it actually submit?

    DStreamGraph#
      def generateJobs(time: Time): Seq[Job] = {
        logDebug("Generating jobs for time " + time)
        val jobs = this.synchronized {
          outputStreams.flatMap { outputStream =>
            val jobOption = outputStream.generateJob(time)
            jobOption.foreach(_.setCallSite(outputStream.creationSite))
            jobOption
          }
        }
        logDebug("Generated " + jobs.length + " jobs for time " + time)
        jobs
      }
    

    A job is generated for each outputStream; in other words, there will be as many jobs as there are outputStreams.
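
    A minimal sketch (hypothetical host and port): an application with two output operations registers two outputStreams, so DStreamGraph#generateJobs returns two jobs per batch.

      import org.apache.spark.SparkConf
      import org.apache.spark.streaming.{Seconds, StreamingContext}

      object TwoOutputStreams {
        def main(args: Array[String]): Unit = {
          val conf = new SparkConf().setMaster("local[2]").setAppName("TwoOutputStreams")
          val ssc = new StreamingContext(conf, Seconds(5))
          val lines = ssc.socketTextStream("localhost", 9999)

          lines.print()                                  // output operation #1 -> job #1 per batch
          lines.foreachRDD(rdd => println(rdd.count()))  // output operation #2 -> job #2 per batch

          ssc.start()
          ssc.awaitTermination()
        }
      }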

    How is an outputStream created?

    DStream#
      private def foreachRDD(
          foreachFunc: (RDD[T], Time) => Unit,
          displayInnerRDDOps: Boolean): Unit = {
        new ForEachDStream(this,
          context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
      }
    

    Here the register method registers the outputStream; the sketch after the next code block shows how the built-in output operations all end up here.

    DStream#
      /**
       * Register this streaming as an output stream. This would ensure that RDDs of this
       * DStream will be generated.
       */
      private[streaming] def register(): DStream[T] = {
        ssc.graph.addOutputStream(this)
        this
      }
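
    Built-in output operations funnel into the private foreachRDD above. For example, print() is implemented roughly like this (a simplified sketch of the Spark source):

      DStream#
        def print(num: Int): Unit = ssc.withScope {
          def foreachFunc: (RDD[T], Time) => Unit = {
            (rdd: RDD[T], time: Time) => {
              val firstNum = rdd.take(num + 1)
              println("-------------------------------------------")
              println("Time: " + time)
              println("-------------------------------------------")
              firstNum.take(num).foreach(println)
              if (firstNum.length > num) println("...")
              println()
            }
          }
          // ends in the private foreachRDD above, which calls register()
          foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
        }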
    

    At the code level, each output-operation call therefore adds one more job per batch at submission time. If the application contains no output operation at all, i.e. no outputStream, an exception is thrown when the context starts.

    DStreamGraph#
      def validate() {
        this.synchronized {
          require(batchDuration != null, "Batch duration has not been set")
          // assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration +
          // " is very low")
          require(getOutputStreams().size > 0, "No output operations registered, so nothing to execute")
        }
      }
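
    A minimal sketch (hypothetical host and port) of an application that fails this check:

      import org.apache.spark.SparkConf
      import org.apache.spark.streaming.{Seconds, StreamingContext}

      val conf = new SparkConf().setMaster("local[2]").setAppName("NoOutputOps")
      val ssc = new StreamingContext(conf, Seconds(5))
      ssc.socketTextStream("localhost", 9999).map(_.length)  // transformations only, nothing registered
      ssc.start()  // throws IllegalArgumentException:
                   // "No output operations registered, so nothing to execute"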
    

    Generating the job

    DStream#
      /**
       * Generate a SparkStreaming job for the given time. This is an internal method that
       * should not be called directly. This default implementation creates a job
       * that materializes the corresponding RDD. Subclasses of DStream may override this
       * to generate their own jobs.
       */
      private[streaming] def generateJob(time: Time): Option[Job] = {
        getOrCompute(time) match {
          case Some(rdd) => {
            val jobFunc = () => {
              val emptyFunc = { (iterator: Iterator[T]) => {} }
              context.sparkContext.runJob(rdd, emptyFunc)
            }
            Some(new Job(time, jobFunc))
          }
          case None => None
        }
      }
    

    Note the jobFunc here: it directly calls Spark's runJob method. For an analysis of runJob itself, see my other blog post.
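
    Also note that this is only the default implementation. ForEachDStream, the class registered by foreachRDD, overrides generateJob so that the job runs the user-supplied foreachFunc instead of merely materializing the RDD; roughly (a simplified sketch of the Spark source):

      ForEachDStream#
        override def generateJob(time: Time): Option[Job] = {
          parent.getOrCompute(time) match {
            case Some(rdd) =>
              // the job wraps the user's foreachFunc rather than an empty function
              val jobFunc = () => foreachFunc(rdd, time)
              Some(new Job(time, jobFunc))
            case None => None
          }
        }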

    Job submission

    JobGenerator#
      /** Generate jobs and perform checkpoint for the given `time`.  */
      private def generateJobs(time: Time) {
        // Set the SparkEnv in this thread, so that job generation code can access the environment
        // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
        // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
        SparkEnv.set(ssc.env)
        Try {
          jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
          // jobs are generated here
          graph.generateJobs(time) // generate jobs using allocated block
        } match {
          case Success(jobs) =>
            val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
            // jobs are submitted here
            jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
          case Failure(e) =>
            jobScheduler.reportError("Error generating jobs for time " + time, e)
        }
        eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
      }
    
    
    JobScheduler#
      def submitJobSet(jobSet: JobSet) {
        if (jobSet.jobs.isEmpty) {
          logInfo("No jobs added for time " + jobSet.time)
        } else {
          listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
          jobSets.put(jobSet.time, jobSet)
          jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
          logInfo("Added jobs for time " + jobSet.time)
        }
      }
    
    

    Inside JobScheduler there is a thread pool to which the JobHandlers are submitted; a configuration sketch for its size follows the code below.

      private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
      
      private val jobExecutor =
        ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
    
      private class JobHandler(job: Job) extends Runnable with Logging {
        import JobScheduler._
    
        def run() {
          try {
            ....
            PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
              // this is where the job is finally run
              job.run()
            }
            ....
          }
        }
      }
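
    By default the pool has a single thread (spark.streaming.concurrentJobs = 1), so the jobs of a batch run one after another. A minimal configuration sketch to let two jobs run concurrently:

      import org.apache.spark.SparkConf

      val conf = new SparkConf()
        .setAppName("ConcurrentJobs")
        .set("spark.streaming.concurrentJobs", "2")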
    
    Job#
      def run() {
        _result = Try(func())
      }
    

    The func invoked here is exactly the jobFunc created at job-generation time, i.e. the closure that calls runJob.
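
    Putting it all together, the per-batch call chain assembled from the code above is:

      JobGenerator.generateJobs(time)
        -> DStreamGraph.generateJobs(time)            // one job per outputStream
             -> outputStream.generateJob(time)        // wraps jobFunc
        -> JobScheduler.submitJobSet(JobSet(...))     // posts each job to the pool
             -> jobExecutor.execute(new JobHandler(job))
                  -> job.run()                        // invokes jobFunc
                       -> SparkContext.runJob(...)    // the actual Spark job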
