zoukankan      html  css  js  c++  java
  • Apache Spark-1.0.0浅析(四):资源调度——Stage划分和提交

    回到dagScheduler.runJob:通过submitJob提交作业并返回waiter,waiter.awaitResult阻塞当前线程直到得到结果,然后判断Job是否执行成功;若失败,则记录日志并抛出对应的异常

    /**
     * Submits a job via `submitJob` and blocks the calling thread on the returned waiter
     * until a result is available.
     *
     * Returns normally when the waiter reports `JobSucceeded`. When it reports `JobFailed`,
     * the call site is logged and the carried exception is rethrown to the caller.
     */
    def runJob[T, U: ClassTag](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        callSite: String,
        allowLocal: Boolean,
        resultHandler: (Int, U) => Unit,
        properties: Properties = null)
    {
      val jobWaiter =
        submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
      val outcome = jobWaiter.awaitResult()
      outcome match {
        case JobFailed(exception: Exception) =>
          logInfo("Failed to run " + callSite)
          throw exception
        case JobSucceeded =>
          // Success: nothing further to do here.
          ()
      }
    }

    DAGScheduler中submitJob如下,submitJob中实例化JobWaiter最后返回,通过eventProcessActor发送JobSubmitted消息

    /**
     * Validates the requested partitions, allocates a job id and posts a `JobSubmitted`
     * event to `eventProcessActor`.
     *
     * @return a `JobWaiter` for the job; when `partitions` is empty the waiter is created
     *         with a task count of zero and no event is posted
     * @throws IllegalArgumentException if a requested partition index is negative or not
     *         smaller than `rdd.partitions.length`
     */
    def submitJob[T, U](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        callSite: String,
        allowLocal: Boolean,
        resultHandler: (Int, U) => Unit,
        properties: Properties = null): JobWaiter[U] =
    {
      // Reject the first out-of-range partition index before a job id is consumed.
      val partitionCount = rdd.partitions.length
      for (bad <- partitions.find(idx => idx < 0 || idx >= partitionCount)) {
        throw new IllegalArgumentException(
          "Attempting to access a non-existent partition: " + bad + ". " +
            "Total number of partitions: " + partitionCount)
      }

      val jobId = nextJobId.getAndIncrement()
      if (partitions.isEmpty) {
        // Nothing to run: return a waiter expecting zero tasks without posting an event.
        new JobWaiter[U](this, jobId, 0, resultHandler)
      } else {
        assert(partitions.size > 0)
        // The event carries the function with its element and result types erased.
        val erasedFunc = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
        val jobWaiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
        eventProcessActor ! JobSubmitted(
          jobId, rdd, erasedFunc, partitions.toArray, allowLocal, callSite, jobWaiter, properties)
        jobWaiter
      }
    }

    receive接收JobSubmitted消息,执行dagScheduler.handleJobSubmitted

    /**
       * The main event loop of the DAG scheduler.
       */
      def receive = {
        // Each message is forwarded to the matching dagScheduler handler; the case order
        // is the same as before.
        case JobSubmitted(id, finalRdd, fn, parts, local, site, jobListener, props) =>
          dagScheduler.handleJobSubmitted(id, finalRdd, fn, parts, local, site, jobListener, props)

        case StageCancelled(stage) => dagScheduler.handleStageCancellation(stage)

        case JobCancelled(job) => dagScheduler.handleJobCancellation(job)

        case JobGroupCancelled(group) => dagScheduler.handleJobGroupCancelled(group)

        case AllJobsCancelled => dagScheduler.doCancelAllJobs()

        case ExecutorAdded(executor, hostName) =>
          dagScheduler.handleExecutorAdded(executor, hostName)

        case ExecutorLost(executor) => dagScheduler.handleExecutorLost(executor)

        case BeginEvent(startedTask, info) => dagScheduler.handleBeginEvent(startedTask, info)

        case GettingResultEvent(info) => dagScheduler.handleGetTaskResult(info)

        // Only the event as a whole is needed, so the individual fields are left unbound.
        case completion @ CompletionEvent(_, _, _, _, _, _) =>
          dagScheduler.handleTaskCompletion(completion)

        case TaskSetFailed(failedSet, why) => dagScheduler.handleTaskSetFailed(failedSet, why)

        case ResubmitFailedStages => dagScheduler.resubmitFailedStages()
      }

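    handleJobSubmitted中,首先通过newStage创建finalStage;如果创建过程中抛出异常,则通过listener.jobFailed通知作业失败并直接返回。如果finalStage不为空,则以此finalStage实例化ActiveJob。接着判断job是否为short action(allowLocal为true、finalStage没有父stage且只有一个partition),是则发送SparkListenerJobStart后本地执行(runLocally);否则存入jobId和ActiveJob的对应关系,将job加入activeJobs,记录resultStage和Job的对应关系,将SparkListenerJobStart发送到listenerBus消息队列,并执行submitStage提交finalStage。最后还需要执行submitWaitingStages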

    /**
     * Handles a submitted job: creates its final stage, wraps it in an `ActiveJob`, and
     * either runs the job locally or registers it and submits the final stage.
     *
     * If creating the final stage throws, the failure is logged, reported through
     * `listener.jobFailed`, and the method returns without calling `submitWaitingStages`.
     */
    private[scheduler] def handleJobSubmitted(jobId: Int,
        finalRDD: RDD[_],
        func: (TaskContext, Iterator[_]) => _,
        partitions: Array[Int],
        allowLocal: Boolean,
        callSite: String,
        listener: JobListener,
        properties: Properties = null)
    {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      val finalStage: Stage =
        try {
          newStage(finalRDD, partitions.size, None, jobId, Some(callSite))
        } catch {
          case e: Exception =>
            logWarning("Creating new stage failed due to exception - job: " + jobId, e)
            listener.jobFailed(e)
            return
        }

      if (finalStage != null) {
        val activeJob =
          new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
        clearCacheLocs()
        logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
          activeJob.jobId, callSite, partitions.length, allowLocal))
        logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
        logInfo("Parents of final stage: " + finalStage.parents)
        logInfo("Missing parents: " + getMissingParentStages(finalStage))

        // Very short actions like first() or take() with no parent stages are computed locally.
        val canRunLocally = allowLocal && finalStage.parents.isEmpty && partitions.length == 1
        if (canRunLocally) {
          listenerBus.post(SparkListenerJobStart(activeJob.jobId, Array.empty[Int], properties))
          runLocally(activeJob)
        } else {
          // Register the job in the bookkeeping structures before submitting its final stage.
          jobIdToActiveJob(jobId) = activeJob
          activeJobs += activeJob
          resultStageToJob(finalStage) = activeJob
          val stageIds = jobIdToStageIds(jobId).toArray
          listenerBus.post(SparkListenerJobStart(activeJob.jobId, stageIds, properties))
          submitStage(finalStage)
        }
      }
      submitWaitingStages()
    }

    首先,看一下newStage,用以创建新的Stage。注释中说明了:如果需要创建shuffle map stage,必须使用newOrUsedStage,而不是直接调用newStage

    /**
       * Create a Stage -- either directly for use as a result stage, or as part of the (re)-creation
       * of a shuffle map stage in newOrUsedStage.  The stage will be associated with the provided
       * jobId. Production of shuffle map stages should always use newOrUsedStage, not newStage
       * directly.
       */
      private def newStage(
          rdd: RDD[_],
          numTasks: Int,
          shuffleDep: Option[ShuffleDependency[_,_]],
          jobId: Int,
          callSite: Option[String] = None)
        : Stage =
      {
        // Take this stage's id before resolving its parents, so the evaluation order stays
        // id first, parents second (resolving parents may presumably create further
        // stages and consume ids -- verify against newOrUsedStage).
        val stageId = nextStageId.getAndIncrement()
        val parentStages = getParentStages(rdd, jobId)
        val created = new Stage(stageId, rdd, numTasks, shuffleDep, parentStages, jobId, callSite)

        // Record the new stage in the scheduler's lookup tables before returning it.
        stageIdToStage(stageId) = created
        updateJobIdStageIdMaps(jobId, created)
        stageToInfos(created) = StageInfo.fromStage(created)
        created
      }

    newStage中实例化Stage类,参数中包含了stage的id,numTasks,shuffleDep,parents等,而且stage分为两类:shuffle map stage和result stage

    /**
     * A stage is a set of independent tasks all computing the same function that need to run as part
     * of a Spark job, where all the tasks have the same shuffle dependencies. Each DAG of tasks run
     * by the scheduler is split up into stages at the boundaries where shuffle occurs, and then the
     * DAGScheduler runs these stages in topological order.
     *
     * Each Stage can either be a shuffle map stage, in which case its tasks' results are input for
     * another stage, or a result stage, in which case its tasks directly compute the action that
     * initiated a job (e.g. count(), save(), etc). For shuffle map stages, we also track the nodes
     * that each output partition is on.
     *
     * Each Stage also has a jobId, identifying the job that first submitted the stage.  When FIFO
     * scheduling is used, this allows Stages from earlier jobs to be computed first or recovered
     * faster on failure.
     */
    private[spark] class Stage(
        val id: Int,  // Stage id; newStage draws it from nextStageId
        val rdd: RDD[_],  // RDD this stage was created for
        val numTasks: Int,  // Task count; handleJobSubmitted passes partitions.size here
        val shuffleDep: Option[ShuffleDependency[_,_]],  // Output shuffle if stage is a map stage
        val parents: List[Stage],  // Parent stages; newStage passes getParentStages(rdd, jobId)
        val jobId: Int,  // Job that first submitted the stage
        callSite: Option[String])  // Plain constructor parameter (no val), so no accessor
      extends Logging

    在Stage类中,parents是通过getParentStages得到的:遍历RDD的依赖,如果是ShuffleDependency,则通过getShuffleMapStage取得(或创建)对应的shuffle map stage作为父stage;否则沿着依赖继续回溯上一个RDD

    /**
       * Get or create the list of parent stages for a given RDD. The stages will be assigned the
       * provided jobId if they haven't already been created with a lower jobId.
       */
      private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
        val parentStages = new HashSet[Stage]
        val seenRdds = new HashSet[RDD[_]]

        // Recursive walk over the dependency graph: a shuffle dependency contributes its
        // map stage as a parent and is not followed further; any other dependency is
        // walked into.
        def walk(current: RDD[_]) {
          // add() returns false when the RDD was already recorded, so each RDD is
          // expanded at most once.
          if (seenRdds.add(current)) {
            current.dependencies.foreach {
              case shuffle: ShuffleDependency[_,_] =>
                parentStages += getShuffleMapStage(shuffle, jobId)
              case other =>
                walk(other.rdd)
            }
          }
        }

        walk(rdd)
        parentStages.toList
      }

    getShuffleMapStage如下:若该shuffle对应的stage已存在则直接返回,否则调用newOrUsedStage来创建shuffle map stage并记录到shuffleToMapStage中

    /**
       * Get or create a shuffle map stage for the given shuffle dependency's map side.
       * The jobId value passed in will be used if the stage doesn't already exist with
       * a lower jobId (jobId always increases across jobs.)
       */
      private def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_], jobId: Int): Stage = {
        val shuffleId = shuffleDep.shuffleId
        // The default block runs only when no stage is registered for this shuffle id.
        shuffleToMapStage.getOrElse(shuffleId, {
          // Build the stage over the dependency's RDD and remember it so later lookups
          // for the same shuffle id return the same stage.
          val mapSideRdd = shuffleDep.rdd
          val created = newOrUsedStage(mapSideRdd, mapSideRdd.partitions.size, shuffleDep, jobId)
          shuffleToMapStage(shuffleId) = created
          created
        })
      }

    如此,将Job依据RDD之间的依赖关系,stage划分完成

    进入submitStage,首先通过activeJobForStage获得该stage对应的jobId,如果不存在则abortStage;如果存在,则判断该stage是否已经在waitingStages、runningStages或failedStages中,只有都不在时才继续处理。接着通过getMissingParentStages获得缺失的父stage:如果不存在缺失的父stage,则执行submitMissingTasks提交该stage的tasks,并将其加入runningStages;如果存在,则对每个缺失的父stage递归调用submitStage,并将当前stage加入waitingStages。如此一直回溯到没有缺失依赖的初始stage,最终由submitMissingTasks提交tasks

    /** Submits stage, but first recursively submits any missing parents. */
      private def submitStage(stage: Stage) {
        activeJobForStage(stage) match {
          case None =>
            abortStage(stage, "No active job for stage " + stage.id)

          case Some(activeJobId) =>
            logDebug("submitStage(" + stage + ")")
            // A stage already waiting, running or failed is left untouched.
            val alreadyTracked =
              waitingStages(stage) || runningStages(stage) || failedStages(stage)
            if (!alreadyTracked) {
              val missingParents = getMissingParentStages(stage).sortBy(_.id)
              logDebug("missing: " + missingParents)
              if (missingParents.isEmpty) {
                logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
                submitMissingTasks(stage, activeJobId)
                runningStages += stage
              } else {
                // Submit the missing parents first (lowest stage id first), then park
                // this stage in the waiting set.
                missingParents.foreach(submitStage)
                waitingStages += stage
              }
            }
        }
      }

    getMissingParentStages如下,遍历RDD的依赖,找到所有尚不可用(isAvailable为false)的父shuffle map stage

    /**
     * Collects the shuffle map stages that `stage` depends on and that are not yet
     * available (`isAvailable` is false).
     *
     * Narrow dependencies are followed recursively; shuffle dependencies are not walked
     * past. An RDD's dependencies are only inspected when `getCacheLocs` reports an empty
     * location list (`Nil`) for at least one of its entries.
     */
    private def getMissingParentStages(stage: Stage): List[Stage] = {
      val missingStages = new HashSet[Stage]
      val seenRdds = new HashSet[RDD[_]]

      def walk(current: RDD[_]) {
        // add() is false for an RDD that was already seen; in that case getCacheLocs is
        // not consulted at all.
        if (seenRdds.add(current) && getCacheLocs(current).contains(Nil)) {
          current.dependencies.foreach {
            case shuffle: ShuffleDependency[_,_] =>
              val mapStage = getShuffleMapStage(shuffle, stage.jobId)
              if (!mapStage.isAvailable) {
                missingStages += mapStage
              }
            case narrow: NarrowDependency[_] =>
              walk(narrow.rdd)
          }
        }
      }

      walk(stage.rdd)
      missingStages.toList
    }

    最后,看一下submitWaitingStages,其作用在于检查等待中的stages现在是否可以运行:先复制并清空waitingStages,再按jobId排序后逐个重新调用submitStage提交(仍有缺失父stage的stage会被submitStage重新加入waitingStages)。按注释所述,通常每次事件循环都会执行一次

    /**
       * Check for waiting or failed stages which are now eligible for resubmission.
       * Ordinarily run on every iteration of the event loop.
       */
      private def submitWaitingStages() {
        // TODO: We might want to run this less often, when we are sure that something has become
        // runnable that wasn't before.
        logTrace("Checking for newly runnable parent stages")
        logTrace("running: " + runningStages)
        logTrace("waiting: " + waitingStages)
        logTrace("failed: " + failedStages)

        // Snapshot and empty the waiting set before resubmitting: submitStage puts a stage
        // back into waitingStages when it still has missing parents.
        val snapshot = waitingStages.toArray
        waitingStages.clear()
        snapshot.sortBy(_.jobId).foreach(submitStage)
      }

    至此,Stage划分提交完成。

    END

  • 相关阅读:
    (项目实战三)响应式首页内容介绍
    (项目实战二)响应式轮播图
    (一)简介
    (项目实战一)响应式导航
    HttpURLConnection 411错误解决
    JMeter使用(Linux)
    性能测试工具比较
    Java常见加密算法
    HttpURLConnection发送请求
    Java AES加密案例
  • 原文地址:https://www.cnblogs.com/kevingu/p/4678797.html
Copyright © 2011-2022 走看看