zoukankan      html  css  js  c++  java
  • Spark分析之DAGScheduler

    DAGScheduler概述:是一个面向Stage层面的调度器;

    主要入参有:

    dagScheduler.runJob(rddcleanedFunc, partitions, callSite, allowLocal,resultHandler, localProperties.get)

    rdd: final RDD;

    cleanedFunc: 计算每个分区的函数;

    resultHander: 结果侦听器;

    主要功能如下:

    1、接收用户提交的job;

    2、将job根据类型划分为不同的stage,记录哪些RDD、Stage被物化,并在每一个stage内产生一系列的task,并封装成TaskSet;

    3、决定每个Task的最佳位置(任务在数据所在的节点上运行),并结合当前的缓存情况;将TaskSet提交给TaskScheduler;

    4、重新提交Shuffle输出丢失的Stage给TaskScheduler;

      注:一个Stage内部的错误不是由shuffle输出丢失造成的,DAGScheduler是不管的,由TaskScheduler负责尝试重新提交task执行;

    以如下示例描述Job提交过程:

    val sc = new SparkContext("local[2]", "WordCount", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR")))
    val textFile = sc.textFile("xxx")
    val result = textFile.flatMap(line => line.split("	")).map(word => (word, 1)).reduceByKey(_ + _)
    result.collect

    RDD.collect

      ==>sc.runJob                  #####至此完成了将RDD提交DAGScheduler#####

        val results = new Array[U](partitions.size) //result存放的是返回值,数组大小为最后一个RDD的partition的个数

        ==>dagScheduler.runJob(rdd, func, partitions, resultHandler......)     //DAGScheduler的输入:RDD and partitions to compute

          ==>dagScheduler.submitJob

            ==>eventProcessActor ! JobSubmitted

    def receive = {
        case JobSubmitted(jobId, rdd, func, partitions, allowLocal...) =>
          dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal...)
    }
    
    
    //完成job到stage的转换,生成finalStage并提交
    private[scheduler] def handleJobSubmitted(jobId: Int,
          finalRDD: RDD[_],
          func: (TaskContext, Iterator[_]) => _,
          partitions: Array[Int],
          allowLocal: Boolean...){
         //注意:该RDD是final RDD,而不是一系列的RDD,用finalRDD来创建finalStage
         //newStage操作对应会生成新的result stage或者shuffle stage:内部有一个isShuffleMap变量来标识该stage是shuffle or result
         var finalStage: Stage = newStage(rdd, partitions.size, None, jobId, Some(callSite))
     
        //使用finalStage来构建job
        val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
    
        //对于简单的job,没有依赖关系并且只有一个partition,该类job会使用local thread处理而并非提交到TaskScheduler上处理
        if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
            runLocally(job)
        } else {
            submitStage(finalStage) //提交finalStage
        }
    }

    handleJobSubmitted方法完成了job到stage的转换,生成finalStage;每个job都有一个finalStage。

    newStage()方法分析:根据finalRDD生成finalStage

    private def newStage(
          rdd: RDD[_],  numTasks: Int,     //task个数就是partitions个数
          shuffleDep: Option[ShuffleDependency[_,_]],
          jobId: Int, callSite: Option[String] = None) : Stage = {
        val id = nextStageId.getAndIncrement() 
        val stage = new Stage(id, rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
       ......
    }
    
    
    private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
        val parents = new HashSet[Stage]
        val visited = new HashSet[RDD[_]]
        def visit(r: RDD[_]) {
          if (!visited(r)) {
            visited += r
            for (dep <- r.dependencies) {
              dep match {
                case shufDep: ShuffleDependency[_,_] =>
                  parents += getShuffleMapStage(shufDep, jobId)
                case _ =>
                  visit(dep.rdd)
              }
            }
          }
        }
        visit(rdd)
        parents.toList
    }
    
    private def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_], jobId: Int): Stage = {
        shuffleToMapStage.get(shuffleDep.shuffleId) match {
          case Some(stage) => stage
          case None =>
            val stage =
              newOrUsedStage(shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId)
            shuffleToMapStage(shuffleDep.shuffleId) = stage
            stage
        }

    newStage()后产生的finalStage中已经包含了该stage的所有依赖的父Stage;

    通过getParentStages()方法构建该stage的依赖关系;反向visit RDD DAG图,遇到窄依赖就将依赖的RDD加入到stage,遇到宽依赖就切开并递归宽依赖的stage;

    生成stage实例,stage的id通过nextStageId的值加一得到,task的个数就是partitions的个数;

    有两种类型的Stage:ShuffleStage和ResultStage;

    Stage内部有一个isShuffleMap变量标识该Stage是shuffle还是result类型;

    Spark对stage的划分是按照宽依赖来进行区分的:根据RDD的依赖关系,如果遇到宽依赖则创建ShuffleStage;

    submitStage()方法分析:计算stage之间的依赖关系(Stage DAG)并对依赖关系进行处理

    private def submitStage(stage: Stage) { 
     if (!waiting(stage) && !running(stage) && !failed(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)  //根据final stage发现是否有parent stage
      if (missing == Nil) { // 如果计算中发现当前的stage没有任何依赖或者所有的依赖都已经准备完毕,则提交task
       submitMissingTasks(stage, jobId.get)
       running += stage //设置当前的stage为running,因为当前的stage没有未处理完的依赖的stage
      } else { //如果有parent stage,需要先submit parent, 因为stage之间需要顺序执行
       for (parent <- missing) {
        submitStage(parent)
       }
       waiting += stage   //当前stage放入到waiting列表中,表示该stage需要等待parent先执行完成
      }
     }
    }
    
    //根据final stage的parents找出所有的parent stage
    private def getMissingParentStages(stage: Stage): List[Stage] = { 
     ......
     dep match {
      //如果是ShuffleDependency,则新建一个shuffle map stage,且该stage是可用的话则加入missing中
      case shufDep: ShuffleDependency[_,_] =>  //ShuffleDependecy
       val mapStage = getShuffleMapStage(shufDep, stage.jobId)
       if (!mapStage.isAvailable) {
        missing += mapStage
       }
      case narrowDep: NarrowDependency[_] =>  //NarrowDependecy
       visit(narrowDep.rdd)
     }
    }

    getMissParentStages(stage)处理步骤:

    1、根据该stage得到该stage的parent,也就是RDD的依赖关系,生成parentStage是通过RDD的dependencies;

    2、如果依赖关系是宽依赖,则生成一个mapStage来作为finalStage的parent;也就是说对于需要shuffle操作的job,会生成mapStage和finalStage进行处理

    3、如果依赖关系是窄依赖,不会生成新的stage。也就是说对于不需要shuffle的job只需要一个finalStage;

    注意:getMissParentStages(stage)得到的结果集是按照stageid的降序排列的

    submitStage()处理步骤:

    1、计算该stage的getMissParentStages(),如果当前stage没有任何依赖或者所有的依赖都已执行完,则提交该stage;

    2、如果发现该stage有依赖的stage未执行,则先执行完所有依赖的父stage(根据getMissParentStages()方法得到的结果集降序来执行stage);

    submitMissingTasks()方法分析:把stage根据parition拆分成task(决定每个Task的最佳位置)生成TaskSet,并提交到TaskScheduler

    private def submitMissingTasks(stage: Stage, jobId: Int) {
     //首先根据stage所依赖的RDD的partition的分布,会产生出与partition数量相等的task
     var tasks = ArrayBuffer[Task[_]]()
    
     //对于finalStage或是mapStage会产生不同的task。
     //检查该stage时是否ShuffleMap,如果是则生成ShuffleMapTask
     if (stage.isShuffleMap) { //mapStage:表示还有其他stage依赖此stage
      for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {
       //task根据partition的locality进行分布
       val locs = getPreferredLocs(stage.rdd, p)
       tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs)
      }
     } else { //finalStage:该类型stage直接输出结果生成ResultTask
      val job = resultStageToJob(stage)
      for (id <- 0 until job.numPartitions if !job.finished(id)) {
       val partition = job.partitions(id)
       val locs = getPreferredLocs(stage.rdd, partition)
       //由于是ResultTask,因此需要传入定义的func,也就是如果处理结果返回
       tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
      }
     }
     //向TaskSchuduler提交任务,以stage为单位,一个stage对应一个TaskSet
     taskSched.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
    }

     submitMissingTask()方法的处理步骤:

    1、通过stage.isShuffleMap来决定生成的是ShuffleMapTask还是ResultTask;

    2、如果是ShuffleMapTask则根据stage所依赖的RDD的partition分布,产生和partition数量相同的task,这些task根据partition的locality进行分布’

    3、把stage对应生成所有的task封装到一个TaskSet中,提交给TaskScheduler的submitTasks()方法进行调度;

    重新提交shuffle输出丢失的stage

    case ResubmitFailedStages =>
          dagScheduler.resubmitFailedStages()
    
    private[scheduler] def resubmitFailedStages() {
        if (failedStages.size > 0) {
          logInfo("Resubmitting failed stages")
          clearCacheLocs()
          val failedStagesCopy = failedStages.toArray
          failedStages.clear()
          for (stage <- failedStagesCopy.sortBy(_.jobId)) {
            submitStage(stage)
          }
        }
        submitWaitingStages()
    }

    ####至此完成了DAGScheduler提交TaskSet到TaskSchuduler#####

    Job的生成:

    一旦driver程序中出现action,就会生成一个job,比如:count等,向DAGScheduler提交job;如果driver程序后面还有action,那么其他action也会对应生成相应的job;

    所以:driver有多少个action就会生成多少个job。为什么spark将driver程序称为application而不是job的原因,估计就是这吧。

    每一个job可能会包含多个stage,最后一个stage产生result。在提交job过程中,DAGScheduler会首先划分stage,然后先提交无parent stage的stages,并在提交过程中计算该stage的task数目和类型,并提交具体的task;无parent stage的stage提交完后,依赖该stage的stage才能提交。

  • 相关阅读:
    【Qt】splitter
    android 使用AsyncHttpClient框架上传文件以及使用HttpURLConnection下载文件
    poj 1879 Truck History
    在LinuxMint中对firefox进行手动安装flash插件
    王立平--include在Android中的应用
    【IPC进程间通讯之二】管道Pipe
    我组织类时无意间遵守了依赖倒置原则
    百度2016笔试(算法春招实习)
    制作翻转效果动画
    vim常用命令行备忘总结
  • 原文地址:https://www.cnblogs.com/luogankun/p/3826245.html
Copyright © 2011-2022 走看看