  • Spark -- job and DAGScheduler source code

     One job corresponds to one action; the actions in an application run one after another.

    Before a job runs, a DAG is built for it; one job can contain multiple stages, and the main logic lives in DAGScheduler.
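
    To make the one-action-one-job relation concrete, here is a minimal sketch (not from the original post; input.txt and counts-out are hypothetical paths) in which two actions trigger two separate jobs, the first spanning two stages because of the shuffle introduced by reduceByKey:

      import org.apache.spark.{SparkConf, SparkContext}

      object TwoJobsExample {
        def main(args: Array[String]): Unit = {
          val sc = new SparkContext(new SparkConf().setAppName("two-jobs").setMaster("local[*]"))

          // reduceByKey adds a shuffle dependency, so each job over this RDD has two stages
          val counts = sc.textFile("input.txt")        // hypothetical input path
            .flatMap(_.split("\\s+"))
            .map((_, 1))
            .reduceByKey(_ + _)

          val distinct = counts.count()                // action #1 -> job #1
          counts.saveAsTextFile("counts-out")          // action #2 -> job #2
          println(s"distinct words: $distinct")
          sc.stop()
        }
      }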

    The source for Spark's job submission (the runJob method in SparkContext.scala):

      def runJob[T, U: ClassTag](
          rdd: RDD[T],
          func: (TaskContext, Iterator[T]) => U,
          partitions: Seq[Int],
          resultHandler: (Int, U) => Unit): Unit = {
        if (stopped.get()) {
          throw new IllegalStateException("SparkContext has been shutdown")
        }
        val callSite = getCallSite
        val cleanedFunc = clean(func)
        logInfo("Starting job: " + callSite.shortForm)
        if (conf.getBoolean("spark.logLineage", false)) {
          logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
        }
        dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
        progressBar.foreach(_.finishAll())
        rdd.doCheckpoint()
      }
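
    For context (a sketch, not from the post): every RDD action eventually calls one of SparkContext's runJob overloads. Something like the following drives the same code path; the RDD and partition count are made up for illustration:

      // Sketch: reaching SparkContext.runJob the way an action such as count() does.
      // One task runs per partition; the per-partition results come back to the driver.
      val rdd = sc.parallelize(1 to 100, numSlices = 4)
      val perPartitionSums: Array[Int] = sc.runJob(rdd, (iter: Iterator[Int]) => iter.sum)
      println(perPartitionSums.sum)   // combine the partial results on the driver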
    

    DAGScheduler -- the core entry point for job scheduling, handleJobSubmitted:

    private[scheduler] def handleJobSubmitted(jobId: Int,
          finalRDD: RDD[_],
          func: (TaskContext, Iterator[_]) => _,
          partitions: Array[Int],
          callSite: CallSite,
          listener: JobListener,
          properties: Properties) {
      // Create the finalStage
      var finalStage: ResultStage = null
      try {
        // New stage creation may throw an exception if, for example, jobs are run on a
        // HadoopRDD whose underlying HDFS files have been deleted.
        // Create the result stage and register it in the DAGScheduler's in-memory caches
        finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
      } catch {
        case e: Exception =>
          logWarning("Creating new stage failed due to exception - job: " + jobId, e)
          listener.jobFailed(e)
          return
      }

      // Create the job
      val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
      clearCacheLocs()
      logInfo("Got job %s (%s) with %d output partitions".format(
        job.jobId, callSite.shortForm, partitions.length))
      logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
      logInfo("Parents of final stage: " + finalStage.parents)
      logInfo("Missing parents: " + getMissingParentStages(finalStage))

      val jobSubmissionTime = clock.getTimeMillis()
      // Register the job in the in-memory caches
      jobIdToActiveJob(jobId) = job
      activeJobs += job
      finalStage.setActiveJob(job)
      val stageIds = jobIdToStageIds(jobId).toArray
      val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
      listenerBus.post(
        SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
      // Submit the finalStage via submitStage()
      submitStage(finalStage)
    }
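
    The method ends by calling submitStage(finalStage). As a simplified sketch (not Spark's actual implementation, though the helper names mirror DAGScheduler's), the idea behind submitStage is to recursively submit any missing parent stages first and only run the stage itself once its parents are done:

      // Simplified sketch of the recursion behind submitStage (not the real Spark code)
      def submitStageSketch(stage: Stage): Unit = {
        val missing = getMissingParentStages(stage).sortBy(_.id)
        if (missing.isEmpty) {
          submitMissingTasks(stage)            // all parents available: launch this stage's tasks
        } else {
          missing.foreach(submitStageSketch)   // walk up the DAG, parents first
          waitingStages += stage               // retried once its parent stages finish
        }
      }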
  • Original post: https://www.cnblogs.com/parent-absent-son/p/11747750.html