zoukankan      html  css  js  c++  java
  • [Spark源代码剖析] DAGScheduler提交stage

    转载请标明出处:http://blog.csdn.net/bigbigdata/article/details/47310657

    DAGScheduler通过调用submitStage来提交stage。实现例如以下:

      private def submitStage(stage: Stage) {
        val jobId = activeJobForStage(stage)
        if (jobId.isDefined) {
          logDebug("submitStage(" + stage + ")")
          if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
            //< 获取该stage未提交的父stages,并按stage id从小到大排序
            val missing = getMissingParentStages(stage).sortBy(_.id)
            logDebug("missing: " + missing)
            if (missing.isEmpty) {
              logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
              //< 若无未提交的父stage, 则提交该stage相应的tasks
              submitMissingTasks(stage, jobId.get)
            } else {
              //< 若存在未提交的父stage, 依次提交全部父stage (若父stage也存在未提交的父stage, 则提交之, 依次类推); 并把该stage增加到等待stage队列中
              for (parent <- missing) {
                submitStage(parent)
              }
              waitingStages += stage
            }
          }
        } else {
          abortStage(stage, "No active job for stage " + stage.id)
        }
      }

    submitStage先调用getMissingParentStages来获取參数stageX(这里为了区分,取名为stageX)是否有未提交的父stages,若有。则依次递归(按stage id从小到大排列。也就是stage是从后往前提交的)提交父stages,并将stageX增加到waitingStages: HashSet[Stage]中。对于要依次提交的父stage。也是如此。

    getMissingParentStagesDAGScheduler划分stage中介绍的getParentStages有点像,但不同的是不再须要划分stage,并对每一个stage的状态做了推断,源代码及凝视例如以下:

    //< 以參数stage为起点,向前遍历全部stage,推断stage是否为未提交,若使则增加missing中
      private def getMissingParentStages(stage: Stage): List[Stage] = {
        //< 未提交的stage
        val missing = new HashSet[Stage]
        //< 存储已经被訪问到得RDD
        val visited = new HashSet[RDD[_]]
    
        val waitingForVisit = new Stack[RDD[_]]
        def visit(rdd: RDD[_]) {
          if (!visited(rdd)) {
            visited += rdd
            if (getCacheLocs(rdd).contains(Nil)) {
              for (dep <- rdd.dependencies) {
                dep match {
                  //< 若为宽依赖。生成新的stage
                  case shufDep: ShuffleDependency[_, _, _] =>
                    //< 这里调用getShuffleMapStage不像在getParentStages时须要划分stage。而是直接依据shufDep.shuffleId获取相应的ShuffleMapStage
                    val mapStage = getShuffleMapStage(shufDep, stage.jobId)
                    if (!mapStage.isAvailable) {
                      // 若stage得状态为available则为未提交stage
                      missing += mapStage
                    }
                  //< 若为窄依赖,那就属于同一个stage。并将依赖的RDD放入waitingForVisit中,以可以在以下的while中继续向上visit。直至遍历了整个DAG图
                  case narrowDep: NarrowDependency[_] =>
                    waitingForVisit.push(narrowDep.rdd)
                }
              }
            }
          }
        }
        waitingForVisit.push(stage.rdd)
        while (waitingForVisit.nonEmpty) {
          visit(waitingForVisit.pop())
        }
        missing.toList
      }

    上面提到,若stageX存在未提交的父stages。则先提交父stages;那么,假设stageX没有未提交的父stage呢(比方。包括从HDFS读取数据生成HadoopRDD的那个stage是没有父stage的)?

    这时会调用submitMissingTasks(stage, jobId.get),參数就是stageX及其相应的jobId.get。这个函数便是我们时常在其它文章或书籍中看到的将stage与taskSet相应起来,然后DAGScheduler将taskSet提交给TaskScheduler去运行的实施者。

    这个函数的实现比較长。以下分段说明。

    Step1: 得到RDD中须要计算的partition

    对于Shuffle类型的stage,须要推断stage中是否缓存了该结果;对于Result类型的Final Stage。则推断计算Job中该partition是否已经计算完毕。

    这么做(没有直接提交全部tasks)的原因是,stage中某个task运行失败其它运行成功的时候就须要找出这个失败的task相应要计算的partition而不是要计算全部partition

      private def submitMissingTasks(stage: Stage, jobId: Int) {
        stage.pendingTasks.clear()
    
    
        //< 首先得到RDD中须要计算的partition
        //< 对于Shuffle类型的stage,须要推断stage中是否缓存了该结果;
        //< 对于Result类型的Final Stage,则推断计算Job中该partition是否已经计算完毕
        //< 这么做的原因是。stage中某个task运行失败其它运行成功地时候就须要找出这个失败的task相应要计算的partition而不是要计算全部partition
        val partitionsToCompute: Seq[Int] = {
          stage match {
            case stage: ShuffleMapStage =>
              (0 until stage.numPartitions).filter(id => stage.outputLocs(id).isEmpty)
            case stage: ResultStage =>
              val job = stage.resultOfJob.get
              (0 until job.numPartitions).filter(id => !job.finished(id))
          }
        }

    Step2: 序列化task的binary

    Executor可以通过广播变量得到它。每一个task运行的时候首先会反序列化

    var taskBinary: Broadcast[Array[Byte]] = null
        try {
          // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
          // For ResultTask, serialize and broadcast (rdd, func).
          val taskBinaryBytes: Array[Byte] = stage match {
            case stage: ShuffleMapStage =>
              //< 对于ShuffleMapTask,将rdd及其依赖关系序列化。在Executor运行task之前会反序列化
              closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
              //< 对于ResultTask,对rdd及要在每一个partition上运行的func
            case stage: ResultStage =>
              closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func): AnyRef).array()
          }
    
          //< 将序列化好的信息广播给全部的executor
          taskBinary = sc.broadcast(taskBinaryBytes)
        } catch {
          // In the case of a failure during serialization, abort the stage.
          case e: NotSerializableException =>
            abortStage(stage, "Task not serializable: " + e.toString)
            runningStages -= stage
    
            // Abort execution
            return
          case NonFatal(e) =>
            abortStage(stage, s"Task serialization failed: $e
    ${e.getStackTraceString}")
            runningStages -= stage
            return
        }

    Step3: 为每一个须要计算的partiton生成一个task

    ShuffleMapStage相应的task全是ShuffleMapTask; ResultStage相应的全是ResultTask。task继承Serializable,要确保task是可序列化的。

    val tasks: Seq[Task[_]] = stage match {
          case stage: ShuffleMapStage =>
            partitionsToCompute.map { id =>
              val locs = getPreferredLocs(stage.rdd, id)
              //< RDD相应的partition
              val part = stage.rdd.partitions(id)
              new ShuffleMapTask(stage.id, taskBinary, part, locs)
            }
    
          case stage: ResultStage =>
            val job = stage.resultOfJob.get
            //< id为输出分区索引,表示reducerID
            partitionsToCompute.map { id =>
              val p: Int = job.partitions(id)
              val part = stage.rdd.partitions(p)
              val locs = getPreferredLocs(stage.rdd, p)
              new ResultTask(stage.id, taskBinary, part, locs, id)
            }
        }

    Step4: 提交tasks

    先用tasks来初始化一个TaskSet对象。再调用TaskScheduler.submitTasks提交

    stage.pendingTasks ++= tasks
          logDebug("New pending tasks: " + stage.pendingTasks)
          //< 提交TaskSet至TaskScheduler
          taskScheduler.submitTasks(
            new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
          //< 记录stage提交task的时间
          stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
        } else {

    以上,介绍了提交stage和提交tasks的实现。本文若有纰漏,请批评指正。

  • 相关阅读:
    在DataList控件中删除数据记录
    java中进行二进制,八进制,十六进制,十进制间进行相互转换
    Java中重载重写
    WCF 第七章 寄宿 在Windows 进程激活服务中寄宿服务
    .NET 中的十进制浮点类型(译文)
    .NET 中的二进制浮点类型(译文)
    WCF 第六章 序列化和编码 总结
    WCF 第六章 序列化和编码 为自定义序列化使用XmlSerializer
    WCF 第七章 寄宿 在IIS7中寄宿服务
    WCF 第七章 寄宿
  • 原文地址:https://www.cnblogs.com/cynchanpin/p/7019668.html
Copyright © 2011-2022 走看看