zoukankan      html  css  js  c++  java
  • [Spark源代码剖析] DAGScheduler提交stage

    转载请标明出处:http://blog.csdn.net/bigbigdata/article/details/47310657

    DAGScheduler通过调用submitStage来提交stage。实现例如以下:

      private def submitStage(stage: Stage) {
        val jobId = activeJobForStage(stage)
        if (jobId.isDefined) {
          logDebug("submitStage(" + stage + ")")
          if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
            //< 获取该stage未提交的父stages,并按stage id从小到大排序
            val missing = getMissingParentStages(stage).sortBy(_.id)
            logDebug("missing: " + missing)
            if (missing.isEmpty) {
              logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
              //< 若无未提交的父stage, 则提交该stage相应的tasks
              submitMissingTasks(stage, jobId.get)
            } else {
              //< 若存在未提交的父stage, 依次提交全部父stage (若父stage也存在未提交的父stage, 则提交之, 依次类推); 并把该stage增加到等待stage队列中
              for (parent <- missing) {
                submitStage(parent)
              }
              waitingStages += stage
            }
          }
        } else {
          abortStage(stage, "No active job for stage " + stage.id)
        }
      }

    submitStage先调用getMissingParentStages来获取參数stageX(这里为了区分,取名为stageX)是否有未提交的父stages,若有。则依次递归(按stage id从小到大排列。也就是stage是从后往前提交的)提交父stages,并将stageX增加到waitingStages: HashSet[Stage]中。对于要依次提交的父stage。也是如此。

    getMissingParentStagesDAGScheduler划分stage中介绍的getParentStages有点像,但不同的是不再须要划分stage,并对每一个stage的状态做了推断,源代码及凝视例如以下:

    //< 以參数stage为起点,向前遍历全部stage,推断stage是否为未提交,若使则增加missing中
      private def getMissingParentStages(stage: Stage): List[Stage] = {
        //< 未提交的stage
        val missing = new HashSet[Stage]
        //< 存储已经被訪问到得RDD
        val visited = new HashSet[RDD[_]]
    
        val waitingForVisit = new Stack[RDD[_]]
        def visit(rdd: RDD[_]) {
          if (!visited(rdd)) {
            visited += rdd
            if (getCacheLocs(rdd).contains(Nil)) {
              for (dep <- rdd.dependencies) {
                dep match {
                  //< 若为宽依赖。生成新的stage
                  case shufDep: ShuffleDependency[_, _, _] =>
                    //< 这里调用getShuffleMapStage不像在getParentStages时须要划分stage。而是直接依据shufDep.shuffleId获取相应的ShuffleMapStage
                    val mapStage = getShuffleMapStage(shufDep, stage.jobId)
                    if (!mapStage.isAvailable) {
                      // 若stage得状态为available则为未提交stage
                      missing += mapStage
                    }
                  //< 若为窄依赖,那就属于同一个stage。并将依赖的RDD放入waitingForVisit中,以可以在以下的while中继续向上visit。直至遍历了整个DAG图
                  case narrowDep: NarrowDependency[_] =>
                    waitingForVisit.push(narrowDep.rdd)
                }
              }
            }
          }
        }
        waitingForVisit.push(stage.rdd)
        while (waitingForVisit.nonEmpty) {
          visit(waitingForVisit.pop())
        }
        missing.toList
      }

    上面提到,若stageX存在未提交的父stages。则先提交父stages;那么,假设stageX没有未提交的父stage呢(比方。包括从HDFS读取数据生成HadoopRDD的那个stage是没有父stage的)?

    这时会调用submitMissingTasks(stage, jobId.get),參数就是stageX及其相应的jobId.get。这个函数便是我们时常在其它文章或书籍中看到的将stage与taskSet相应起来,然后DAGScheduler将taskSet提交给TaskScheduler去运行的实施者。

    这个函数的实现比較长。以下分段说明。

    Step1: 得到RDD中须要计算的partition

    对于Shuffle类型的stage,须要推断stage中是否缓存了该结果;对于Result类型的Final Stage。则推断计算Job中该partition是否已经计算完毕。

    这么做(没有直接提交全部tasks)的原因是,stage中某个task运行失败其它运行成功的时候就须要找出这个失败的task相应要计算的partition而不是要计算全部partition

      private def submitMissingTasks(stage: Stage, jobId: Int) {
        stage.pendingTasks.clear()
    
    
        //< 首先得到RDD中须要计算的partition
        //< 对于Shuffle类型的stage,须要推断stage中是否缓存了该结果;
        //< 对于Result类型的Final Stage,则推断计算Job中该partition是否已经计算完毕
        //< 这么做的原因是。stage中某个task运行失败其它运行成功地时候就须要找出这个失败的task相应要计算的partition而不是要计算全部partition
        val partitionsToCompute: Seq[Int] = {
          stage match {
            case stage: ShuffleMapStage =>
              (0 until stage.numPartitions).filter(id => stage.outputLocs(id).isEmpty)
            case stage: ResultStage =>
              val job = stage.resultOfJob.get
              (0 until job.numPartitions).filter(id => !job.finished(id))
          }
        }

    Step2: 序列化task的binary

    Executor可以通过广播变量得到它。每一个task运行的时候首先会反序列化

    var taskBinary: Broadcast[Array[Byte]] = null
        try {
          // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
          // For ResultTask, serialize and broadcast (rdd, func).
          val taskBinaryBytes: Array[Byte] = stage match {
            case stage: ShuffleMapStage =>
              //< 对于ShuffleMapTask,将rdd及其依赖关系序列化。在Executor运行task之前会反序列化
              closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
              //< 对于ResultTask,对rdd及要在每一个partition上运行的func
            case stage: ResultStage =>
              closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func): AnyRef).array()
          }
    
          //< 将序列化好的信息广播给全部的executor
          taskBinary = sc.broadcast(taskBinaryBytes)
        } catch {
          // In the case of a failure during serialization, abort the stage.
          case e: NotSerializableException =>
            abortStage(stage, "Task not serializable: " + e.toString)
            runningStages -= stage
    
            // Abort execution
            return
          case NonFatal(e) =>
            abortStage(stage, s"Task serialization failed: $e
    ${e.getStackTraceString}")
            runningStages -= stage
            return
        }

    Step3: 为每一个须要计算的partiton生成一个task

    ShuffleMapStage相应的task全是ShuffleMapTask; ResultStage相应的全是ResultTask。task继承Serializable,要确保task是可序列化的。

    val tasks: Seq[Task[_]] = stage match {
          case stage: ShuffleMapStage =>
            partitionsToCompute.map { id =>
              val locs = getPreferredLocs(stage.rdd, id)
              //< RDD相应的partition
              val part = stage.rdd.partitions(id)
              new ShuffleMapTask(stage.id, taskBinary, part, locs)
            }
    
          case stage: ResultStage =>
            val job = stage.resultOfJob.get
            //< id为输出分区索引,表示reducerID
            partitionsToCompute.map { id =>
              val p: Int = job.partitions(id)
              val part = stage.rdd.partitions(p)
              val locs = getPreferredLocs(stage.rdd, p)
              new ResultTask(stage.id, taskBinary, part, locs, id)
            }
        }

    Step4: 提交tasks

    先用tasks来初始化一个TaskSet对象。再调用TaskScheduler.submitTasks提交

    stage.pendingTasks ++= tasks
          logDebug("New pending tasks: " + stage.pendingTasks)
          //< 提交TaskSet至TaskScheduler
          taskScheduler.submitTasks(
            new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
          //< 记录stage提交task的时间
          stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
        } else {

    以上,介绍了提交stage和提交tasks的实现。本文若有纰漏,请批评指正。

  • 相关阅读:
    5.19 省选模拟赛 T1 小B的棋盘 双指针 性质
    5.15 省选模拟赛 容斥 生成函数 dp
    5.15 省选模拟赛 T1 点分治 FFT
    5.15 牛客挑战赛40 B 小V的序列 关于随机均摊分析 二进制
    luogu P4929 【模板】舞蹈链 DLX
    CF 878E Numbers on the blackboard 并查集 离线 贪心
    5.10 省选模拟赛 拍卖 博弈 dp
    5.12 省选模拟赛 T2 贪心 dp 搜索 差分
    5.10 省选模拟赛 tree 树形dp 逆元
    luogu P6088 [JSOI2015]字符串树 可持久化trie 线段树合并 树链剖分 trie树
  • 原文地址:https://www.cnblogs.com/cynchanpin/p/7019668.html
Copyright © 2011-2022 走看看