zoukankan      html  css  js  c++  java
  • spark--job和DAGScheduler源码

     一个job对应一个action操作，各个action按先后顺序执行；

    每个job在执行时会先构建一个DAG（有向无环图），一个job可以包含多个stage，这部分的主要逻辑在DAGScheduler中。

    Spark提交job的源码如下（SparkContext.scala中的runJob方法）：

      /**
       * Runs a job over the given partitions of `rdd`, handing each partition's result to
       * `resultHandler`. Scheduling itself is delegated to `dagScheduler.runJob`.
       *
       * @param rdd           RDD whose partitions the job computes
       * @param func          function applied to each partition; passed through `clean` before use
       * @param partitions    indices of the partitions to compute
       * @param resultHandler callback invoked with (partition index, partition result)
       * @throws IllegalStateException if this SparkContext has already been stopped
       */
      def runJob[T, U: ClassTag](
          rdd: RDD[T],
          func: (TaskContext, Iterator[T]) => U,
          partitions: Seq[Int],
          resultHandler: (Int, U) => Unit): Unit = {
        if (stopped.get()) {
          throw new IllegalStateException("SparkContext has been shutdown")
        }
        val callSite = getCallSite
        val cleanedFunc = clean(func)
        logInfo("Starting job: " + callSite.shortForm)
        // Optionally dump the RDD's full lineage; off unless spark.logLineage is set to true.
        if (conf.getBoolean("spark.logLineage", false)) {
          logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
        }
        // Hand the job to the DAGScheduler, which owns stage construction and task submission.
        dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
        // progressBar is optional (hence foreach); finish it if one is present.
        progressBar.foreach(_.finishAll())
        // NOTE(review): presumably checkpoints only if checkpointing was requested on this RDD — confirm.
        rdd.doCheckpoint()
      }
    

    DAGScheduler中job调度的核心入口（handleJobSubmitted方法）：

    /**
     * Handles a submitted job and performs these steps in order:
     *   - creates its final [[ResultStage]];
     *   - registers a new [[ActiveJob]] in the scheduler's in-memory bookkeeping;
     *   - posts a `SparkListenerJobStart` event;
     *   - submits the final stage.
     *
     * If creating the final stage throws an `Exception`, the following happens and no job
     * is registered:
     *   - a warning is logged;
     *   - `listener.jobFailed` is called.
     * One such failure is a HadoopRDD whose underlying HDFS files have been deleted.
     *
     * @param jobId      id of the submitted job
     * @param finalRDD   RDD whose partitions the job computes
     * @param func       function applied to each partition of `finalRDD`
     * @param partitions indices of the partitions to compute
     * @param callSite   user call site, used for the stage and in log messages
     * @param listener   notified via `jobFailed` if the final stage cannot be created
     * @param properties job properties, forwarded to the ActiveJob and the job-start event
     */
    private[scheduler] def handleJobSubmitted(
        jobId: Int,
        finalRDD: RDD[_],
        func: (TaskContext, Iterator[_]) => _,
        partitions: Array[Int],
        callSite: CallSite,
        listener: JobListener,
        properties: Properties): Unit = {
      // Create the final (result) stage, which also adds the stage to the DAGScheduler's
      // in-memory caches. New stage creation may throw an exception if, for example, jobs
      // are run on a HadoopRDD whose underlying HDFS files have been deleted.
      val finalStageOpt: Option[ResultStage] =
        try {
          Some(createResultStage(finalRDD, func, partitions, jobId, callSite))
        } catch {
          case e: Exception =>
            logWarning("Creating new stage failed due to exception - job: " + jobId, e)
            listener.jobFailed(e)
            None
        }

      // Only proceed when the final stage was created successfully.
      finalStageOpt.foreach { finalStage =>
        // Create the job object for this submission.
        val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
        clearCacheLocs()
        logInfo("Got job %s (%s) with %d output partitions".format(
          job.jobId, callSite.shortForm, partitions.length))
        logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
        logInfo("Parents of final stage: " + finalStage.parents)
        logInfo("Missing parents: " + getMissingParentStages(finalStage))

        val jobSubmissionTime = clock.getTimeMillis()
        // Add the job to the scheduler's in-memory bookkeeping.
        jobIdToActiveJob(jobId) = job
        activeJobs += job
        finalStage.setActiveJob(job)

        // Collect the latest info of every stage registered for this job for the start event.
        val stageIds = jobIdToStageIds(jobId).toArray
        val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
        listenerBus.post(
          SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))

        // Submit the final stage via submitStage().
        submitStage(finalStage)
      }
    }
  • 相关阅读:
    git版本库管理介绍,撤销git pull操作
    【laravel5.4】自定义404、503等页面
    【laravel5.4】{{$name}}、{{name}}、@{{$name}} 和 @{{name}} 的区别
    python 了解一点属性的延迟计算
    python 了解一下__dict__
    excel怎么把一个sheet的 全部内容打印到一页纸上
    python 简单了解一下 描述器
    python 调用父类方法, 重写父类构造方法, 不显式调用,会报错
    Python 今天抽空学习了@Property
    python 语法糖是什么意思
  • 原文地址:https://www.cnblogs.com/parent-absent-son/p/11747750.html
Copyright © 2011-2022 走看看