  • The Spark job execution process -- DAGScheduler

    DAGScheduler -- stage division, stage creation, and stage submission

    In this post I use the execution of one Spark job as the entry point and string together every step involved in running it: dividing the DAG into stages, creating task sets, allocating resources, serializing tasks, dispatching them to the executors, executing them, and returning the results to the driver. Following the call chain of a complete job run lets us connect the pieces of infrastructure inside spark-core and form an overall picture of what each module does. With that picture in mind we can then dig into each module separately; this step-by-step approach is, in my view, the way to reach a reasonably deep and complete understanding of spark-core. The main goal of this article, of course, is to lay out the end-to-end flow of a Spark job.

    Entry point: SparkContext.runJob

    As we know, job execution in Spark is lazy. The biggest benefit of laziness is that operators can be chained together like a pipeline, giving a streaming style of computation; in my view this is an important reason why Spark outperforms MapReduce. Later frameworks that optimize on top of MapReduce, such as Tez and Mahout, also rely heavily on chaining pipelinable operators together to avoid writing intermediate results to disk multiple times. But back to the method at hand: as its comment says, this method is the entry point for every action in Spark.
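    As a quick, hedged illustration (the input path and variable names below are made up, not from this post), the snippet builds a lineage out of chained transformations; nothing is computed until the count() action fires, and that action is what ends up calling SparkContext.runJob:

    // Transformations only build the RDD lineage lazily; they get chained into a single stage.
    val lineLengths = sc.textFile("hdfs:///tmp/input")   // placeholder input path
      .map(_.length)
      .filter(_ > 0)

    // The action triggers SparkContext.runJob, which delegates to DAGScheduler.runJob.
    val n = lineLengths.count()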

    /**
    * Run a function on a given set of partitions in an RDD and pass the results to the given
    * handler function. This is the main entry point for all actions in Spark.
    *
    * @param rdd           target RDD to run tasks on
    * @param func          a function to run on each partition of the RDD
    * @param partitions    set of partitions to run on; some jobs may not want to compute on all
    *                      partitions of the target RDD, e.g. for operations like `first()`
    * @param resultHandler callback to pass each result to
    */
    

    def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    // Delegate to DAGScheduler.runJob
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    // Update the progress bar printed on the console
    progressBar.foreach(_.finishAll())
    // Handle RDD checkpointing, if any
    rdd.doCheckpoint()
    }

    • First, unnecessary references are cleaned out of the closure. This is mainly to make serialization possible: a stray reference may point at a non-serializable object and render the whole function unserializable. User code is often not written with this in mind, and Spark cleans closures precisely to lower that burden on developers (a small sketch follows the next paragraph).
    • Then the DAGScheduler is invoked to carry out the actual job-submission logic.

    This method is straightforward and needs no further discussion.
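    As a side note on why the closure cleaning above matters, here is a minimal sketch; Helper and its fields are made-up names, and this shows the user-side pattern rather than Spark's ClosureCleaner itself. If a closure captures an enclosing object that holds something non-serializable, task serialization fails; copying only the needed value into a local variable keeps the captured closure small, and clean(func) tries to strip such unneeded references automatically.

    import java.io.FileInputStream
    import org.apache.spark.rdd.RDD

    class Helper(val factor: Int) {
      // A non-serializable member; a closure that captured `this` could not be serialized.
      val stream = new FileInputStream("/dev/null")

      def scale(rdd: RDD[Int]): RDD[Int] = {
        val localFactor = factor   // copy the needed field into a local val
        rdd.map(_ * localFactor)   // the closure now captures only an Int
      }
    }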

    DAGScheduler.submitJob

    After a few intermediate calls, execution eventually reaches this method.

    def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    // Check to make sure we are not launching a task on a partition that does not exist.
    val maxPartitions = rdd.partitions.length
    // Check for invalid partition ids
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)
    }
    
    // nextJobId is incremented by 1 for every job
    val jobId = nextJobId.getAndIncrement()
    // If there are zero partitions to run, there is nothing to do; return a completed waiter immediately
    if (partitions.size == 0) {
      // Return immediately if the job is running 0 tasks
      return new JobWaiter[U](this, jobId, 0, resultHandler)
    }
    
    assert(partitions.size > 0)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    // Post a job-submission event to the DAGScheduler's event loop
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
    waiter
    

    }

    The logic here is also simple: after a few sanity checks, a job-submission event is posted to an event processor inside the DAG scheduler. DAGScheduler has its own event processor, a very conventional event loop that drains the event queue on a single thread; the logic is simple, so I will not expand on it here (a minimal sketch of the pattern follows). Once the JobSubmitted event is posted, it eventually ends up in DAGScheduler's handleJobSubmitted method. DAGScheduler has many other handler methods of the same shape, one per event type; the dispatch logic lives in DAGSchedulerEventProcessLoop.doOnReceive, which I also will not expand on. Let us stay on the main line of job execution and look at handleJobSubmitted.
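    For readers who want a feel for the pattern, below is a minimal, simplified sketch of such a single-threaded event loop. The names (SchedulerEvent, SimpleEventLoop) are illustrative only; the real DAGSchedulerEventProcessLoop extends Spark's internal EventLoop class and adds error handling.

    import java.util.concurrent.LinkedBlockingDeque

    sealed trait SchedulerEvent
    case class JobSubmittedEvent(jobId: Int) extends SchedulerEvent

    class SimpleEventLoop(name: String) {
      private val eventQueue = new LinkedBlockingDeque[SchedulerEvent]()

      private val eventThread = new Thread(name) {
        setDaemon(true)
        override def run(): Unit = {
          while (!Thread.currentThread().isInterrupted) {
            onReceive(eventQueue.take())   // drain and handle events on this single thread
          }
        }
      }

      def start(): Unit = eventThread.start()

      // Callers such as submitJob only enqueue; all handling happens on the event thread.
      def post(event: SchedulerEvent): Unit = eventQueue.put(event)

      // Plays the role of doOnReceive: dispatch each event type to its handler.
      protected def onReceive(event: SchedulerEvent): Unit = event match {
        case JobSubmittedEvent(jobId) => println(s"handleJobSubmitted($jobId)")
      }
    }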

    handleJobSubmitted

    private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      // Create the final stage
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }
    
    // Register the job as an active job
    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))
    
    // Update various bookkeeping structures
    val jobSubmissionTime = clock.getTimeMillis()
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    finalStage.setActiveJob(job)
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    // Post a job-start event to the listener bus
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    // Submit the final stage
    submitStage(finalStage)
    

    }

    • I will not go into the bookkeeping updates in detail.

    • Creating the final stage is the interesting part: the RDD computation graph (the DAG) is divided into stages along its shuffle dependencies, the final action produces a ResultStage, and that final stage is then submitted.

    In the next section we focus on how the DAG is divided and how stages are created, which is the core responsibility of DAGScheduler.

    Stage division and creation

    DAGScheduler.createResultStage

    private def createResultStage(
      rdd: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      jobId: Int,
      callSite: CallSite): ResultStage = {
    // First create the parent stages this stage depends on
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    // With the parent stages in place, the final ResultStage can be created
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
    }
    

    The key step is creating the parent stages.

    private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    getShuffleDependencies(rdd).map { shuffleDep =>
      getOrCreateShuffleMapStage(shuffleDep, firstJobId)
    }.toList
    }
    

    getShuffleDependencies

    This method performs a depth-first traversal of the RDD lineage using a stack. Note that as soon as a shuffle dependency is found it is recorded, and the traversal does not continue past it into the dependencies behind that shuffle.
    In other words, this method only returns the shuffle dependencies immediately above the given rdd in the DAG; it never crosses more than one level of shuffle dependencies.
    private[scheduler] def getShuffleDependencies(
        rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
    val parents = new HashSet[ShuffleDependency[_, _, _]]
    val visited = new HashSet[RDD[_]]
    // A stack is used for the depth-first traversal of the lineage
    val waitingForVisit = new ArrayStack[RDD[_]]
    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.pop()
      if (!visited(toVisit)) {
        visited += toVisit
        toVisit.dependencies.foreach {
          // A shuffle dependency is recorded, and the traversal does not continue past it
          case shuffleDep: ShuffleDependency[_, _, _] =>
            parents += shuffleDep
          case dependency =>
            // A narrow dependency: keep walking up the lineage
            waitingForVisit.push(dependency.rdd)
        }
      }
    }
    parents
    }
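    A small hypothetical lineage (made up for illustration) shows what this means in practice. The word-count-then-sort job below contains two shuffle dependencies, introduced by reduceByKey and sortByKey; calling getShuffleDependencies on finalRdd would return only the sortByKey dependency, because the traversal stops there and never reaches the reduceByKey shuffle behind it -- that one is discovered later, when the parents of the sortByKey stage are created.

    val words   = sc.textFile("hdfs:///tmp/words").flatMap(_.split(" ")).map((_, 1))
    val counts  = words.reduceByKey(_ + _)                   // shuffle dependency #1
    val swapped = counts.map(_.swap)                         // narrow dependency
    val sorted  = swapped.sortByKey()                        // shuffle dependency #2
    val finalRdd = sorted.map { case (c, w) => s"$w: $c" }   // narrow dependency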

    getOrCreateShuffleMapStage

    Let us continue with another important method, the one that creates the shuffle map stages.

    private def getOrCreateShuffleMapStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {
    shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
      case Some(stage) =>
        stage
    
      case None =>
        // Create stages for all missing ancestor shuffle dependencies.
        // Find all ancestor shuffle dependencies that do not yet have a stage
        getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
          // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
          // that were not already in shuffleIdToMapStage, it's possible that by the time we
          // get to a particular dependency in the foreach loop, it's been added to
          // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
          // SPARK-13902 for more information.
          if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
            createShuffleMapStage(dep, firstJobId)
          }
        }
        // Finally, create a stage for the given shuffle dependency.
        createShuffleMapStage(shuffleDep, firstJobId)
    }
    }
    

    As the code shows, this method creates stages for every ancestor shuffle dependency that does not yet have one.
    Now let us look at how a ShuffleMapStage is actually created:

    def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
    // Note that a ShuffleMapStage's rdd is the RDD on the map (input) side of the shuffle
    val rdd = shuffleDep.rdd
    val numTasks = rdd.partitions.length
    // This calls back into getOrCreateParentStages, so these methods form a mutual recursion
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    // A Stage is mostly a wrapper around references; the mapOutputTracker reference is the notable one
    val stage = new ShuffleMapStage(
      id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
    
    // Update bookkeeping structures
    stageIdToStage(id) = stage
    shuffleIdToMapStage(shuffleDep.shuffleId) = stage
    updateJobIdStageIdMaps(jobId, stage)
    
    // Register this shuffle with the map output tracker
    if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
      // Kind of ugly: need to register RDDs with the cache and map output tracker here
      // since we can't do it in the RDD constructor because # of partitions is unknown
      logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
      mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
    }
    stage
    }
    

    The key steps are:

    • Create all the parent stages.
    • Wrap everything into a ShuffleMapStage object; the most important piece is the reference to the mapOutputTracker. This object tracks the locations of the map-side output of the shuffle: as we will see later, the map output is partitioned, sorted, and serialized by the shuffleManager and then stored through the blockManager; each piece of map output is identified by a blockId, and its location is reported back to the driver, where a MapOutputTrackerMaster component keeps the output locations of every map task of every stage (a conceptual sketch follows this list).
    • Register the newly created shuffle with MapOutputTrackerMaster, which amounts to adding an entry to a map.
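    To make the tracker's role concrete, here is a purely conceptual sketch; the class and field names are invented, and the real MapOutputTrackerMaster stores MapStatus objects (a BlockManagerId plus per-reducer output sizes) and serves them to executors over the network.

    import scala.collection.concurrent.TrieMap

    case class FakeMapStatus(blockManagerHost: String, mapId: Int)

    class ToyMapOutputTracker {
      // shuffleId -> one output-location slot per map partition
      private val shuffleStatuses = new TrieMap[Int, Array[Option[FakeMapStatus]]]()

      // Corresponds to the registerShuffle call in createShuffleMapStage: reserve a slot per map partition.
      def registerShuffle(shuffleId: Int, numMaps: Int): Unit =
        shuffleStatuses.putIfAbsent(shuffleId, Array.fill(numMaps)(None))

      // Called as each map task finishes, so that reducers can later locate their input blocks.
      def registerMapOutput(shuffleId: Int, mapId: Int, status: FakeMapStatus): Unit =
        shuffleStatuses(shuffleId)(mapId) = Some(status)
    }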

    Summary

    To sum up stage creation: a handful of methods call each other recursively; the RDD lineage is walked depth-first, a new stage is created for every shuffle dependency encountered, and once all upstream stages exist, a ResultStage is created last.
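    Applied to the hypothetical word-count-then-sort lineage sketched earlier, and assuming stage ids are assigned from zero, an action on finalRdd would produce three stages:

    // ShuffleMapStage 0 : words ... reduceByKey map side   (writes shuffle output #1)
    // ShuffleMapStage 1 : counts.map(_.swap) ... sortByKey map side   (writes shuffle output #2)
    // ResultStage 2     : sorted.map(...) plus the action's result handler
    finalRdd.collect()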

    Stage submission

    Next, let us look at the last step DAGScheduler is responsible for during job execution: stage submission.

    submitStage

    First, the submitStage method.

    private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, jobId.get)
        } else {
          for (parent <- missing) {
            submitStage(parent)
          }
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
    }
    

    This method is fairly simple:

    • If some parent stages have not run yet, submit those parent stages first and put the current stage into the waiting queue (an illustrative trace follows this list).

    • If all parent stages have already finished, or there are none, submit the current stage by calling submitMissingTasks.
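    An illustrative trace for the three stages of the earlier hypothetical example, assuming none of them has run yet:

    // submitStage(ResultStage 2)
    //   missing parents: [ShuffleMapStage 1] -> submitStage(ShuffleMapStage 1); ResultStage 2 joins waitingStages
    //     missing parents: [ShuffleMapStage 0] -> submitStage(ShuffleMapStage 0); ShuffleMapStage 1 joins waitingStages
    //       no missing parents -> submitMissingTasks(ShuffleMapStage 0)
    // As each parent completes, submitWaitingChildStages resubmits the stages that were waiting on it.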

    submitMissingTasks

    private def submitMissingTasks(stage: Stage, jobId: Int) {
    logDebug("submitMissingTasks(" + stage + ")")
    
    // First figure out the indexes of partition ids to compute.
    // First figure out which partitions still need to be computed
    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
    
    // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
    // with this Stage
    val properties = jobIdToActiveJob(jobId).properties
    
    // Update bookkeeping
    runningStages += stage
    // SparkListenerStageSubmitted should be posted before testing whether tasks are
    // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
    // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
    // event.
    // Update the outputCommitCoordinator's internal bookkeeping
    stage match {
      case s: ShuffleMapStage =>
        outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
      case s: ResultStage =>
        outputCommitCoordinator.stageStart(
          stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
    }
    
    // Work out each task's preferred locations; for a stage whose RDD reads shuffle output, these are computed via the mapOutputTracker
    val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
        case s: ResultStage =>
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    } catch {
      case NonFatal(e) =>
        stage.makeNewStageAttempt(partitionsToCompute.size)
        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }
    
    // Record the information for the stage's latest attempt
    stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
    
    // If there are tasks to execute, record the submission time of the stage. Otherwise,
    // post the even without the submission time, which indicates that this stage was
    // skipped.
    if (partitionsToCompute.nonEmpty) {
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    }
    // Post a stage-submitted event to the listener bus
    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
    
    // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
    // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
    // the serialized copy of the RDD and for each task we will deserialize it, which means each
    // task gets a different copy of the RDD. This provides stronger isolation between tasks that
    // might modify state of objects referenced in their closures. This is necessary in Hadoop
    // where the JobConf/Configuration object is not thread-safe.
    // Serialize the task payload: the RDD plus either the ShuffleDependency (ShuffleMapStage) or the result function (ResultStage)
    var taskBinary: Broadcast[Array[Byte]] = null
    try {
      // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
      // For ResultTask, serialize and broadcast (rdd, func).
      val taskBinaryBytes: Array[Byte] = stage match {
        case stage: ShuffleMapStage =>
          JavaUtils.bufferToArray(
            closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
        case stage: ResultStage =>
          JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
      }
    
      // The serialized RDD and dependency/function are shipped to the executors via a broadcast variable.
      // A broadcast variable is itself first written to memory or disk through the blockManager, and executors then pull the data remotely.
      taskBinary = sc.broadcast(taskBinaryBytes)
    } catch {
      // In the case of a failure during serialization, abort the stage.
      case e: NotSerializableException =>
        abortStage(stage, "Task not serializable: " + e.toString, Some(e))
        runningStages -= stage
    
        // Abort execution
        return
      case NonFatal(e) =>
        abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }
    
    val tasks: Seq[Task[_]] = try {
      // Serialize the accumulator objects used for task metrics.
      // Accumulator serialization has an interesting twist in its readObject method, worth a look.
      val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
      stage match {
        case stage: ShuffleMapStage =>
          stage.pendingPartitions.clear()
          // Create one task per partition to compute
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = stage.rdd.partitions(id)
            stage.pendingPartitions += id
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId)
          }
    
        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = stage.rdd.partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, id, properties, serializedTaskMetrics,
              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
          }
      }
    } catch {
      case NonFatal(e) =>
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }
    
    if (tasks.size > 0) {
      logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
        s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
      // This is where the DAGScheduler hands the baton to the task scheduler
      taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
    } else {
      // Because we posted SparkListenerStageSubmitted earlier, we should mark
      // the stage as completed here in case there are no tasks to run
      markStageAsFinished(stage, None)
    
      val debugString = stage match {
        case stage: ShuffleMapStage =>
          s"Stage ${stage} is actually done; " +
            s"(available: ${stage.isAvailable}," +
            s"available outputs: ${stage.numAvailableOutputs}," +
            s"partitions: ${stage.numPartitions})"
        case stage : ResultStage =>
          s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
      }
      logDebug(debugString)
    
      submitWaitingChildStages(stage)
    }
    }
    

    This is a long method, but arguably the most important one in the DAG scheduler's job-submission path. What it does, in essence, is build a task set for the stage being submitted: one Task is created per partition to compute, and all of those tasks together form a TaskSet.

    • Update various bookkeeping structures.
    • Work out each task's preferred locations; for a stage whose RDD reads shuffle output, these are computed via the mapOutputTracker.
    • Post a stage-submitted event to the listener bus.
    • Serialize the RDD together with the ShuffleDependency, or with the computation function func for a ResultStage, so the payload can be shipped to executors.
    • Serialize the accumulator objects used for task metrics; their serialization has an interesting twist in the readObject method, worth a look.
    • Create one Task per partition to compute -- a ShuffleMapTask or a ResultTask depending on the stage type.
    • Finally, call the TaskScheduler's submitTasks method to hand over the task set.

    At this point the DAGScheduler has completed its mission and handed the baton to the TaskScheduler; the rest of the show belongs to the TaskScheduler.
    In the next post we will continue with what TaskSchedulerImpl does when tasks are submitted, mainly resource allocation, which has to take locality, blacklisting, and fairness into account.

    Open questions

    • How are a task's preferred locations computed?
    • What is the outputCommitCoordinator for?
    • How do broadcast variables work under the hood? We will analyze them separately later; essentially they are built on the block manager, blockManager (arguably the most important piece of infrastructure).
  • Original post: https://www.cnblogs.com/zhuge134/p/10961742.html