  • 17. Stage division algorithm and DAGScheduler source code analysis

    I. How the stage division algorithm works

    1. Diagram

    [Figure: diagram of how a job is divided into stages (image not reproduced)]

    Job->Stage->Task
    
    Once an application has been developed and submitted to the Spark cluster, it is called an Application. Inside the application's code, every action operation that is encountered produces a job.
    
    An Application has one or more jobs. Each job is divided by the DAGScheduler into stages to execute, and a stage is a set of tasks. The tasks each compute the data of one partition, so
    the number of tasks equals the number of partitions.
    
    Stage division principle:
    Summary of the DAGScheduler's stage division algorithm: it works backwards from the rdd on which the action operation was triggered. First it creates a stage for that last rdd; then, while walking backwards, whenever it finds that some rdd is reached through a wide dependency,
    it creates a new stage for that wide-depended-on rdd, and that rdd becomes the last rdd of the new stage. It continues backwards in the same way, dividing stages according to narrow and wide dependencies, until all
    of the rdds have been traversed.

    In short: every wide dependency starts a new stage.
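
    To make this concrete, here is a minimal sketch (my own example, not from the original post; it assumes a local SparkContext and a hypothetical input path). The single collect() action triggers one job, which the DAGScheduler splits at the reduceByKey shuffle into two stages, each running one task per partition:

    import org.apache.spark.{SparkConf, SparkContext}

    object StageDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("StageDemo").setMaster("local[2]"))

        val pairs = sc.textFile("hdfs://.../words.txt", 4)   // hypothetical path; minPartitions hint of 4
          .flatMap(_.split(" "))                             // narrow dependency -> stays in the same stage
          .map(word => (word, 1))                            // narrow dependency -> stays in the same stage

        val counts = pairs.reduceByKey(_ + _)                // wide (shuffle) dependency -> new stage

        // One action -> one job; the job runs as stage 0 (ShuffleMapTasks up to the shuffle)
        // followed by stage 1 (ResultTasks), with one task per partition in each stage.
        counts.collect()
        sc.stop()
      }
    }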

    II. DAGScheduler source code analysis

    1. Source code walkthrough

    ###org.apache.spark/SparkContext.scala
    
    // Call the runJob() method of the dagScheduler that was created earlier, when the SparkContext was initialized
        dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
          resultHandler, localProperties.get)
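
    As a quick driver-side illustration (my own sketch, reusing the `sc` and local-mode setup from the example above): every action call ends up in SparkContext.runJob, i.e. in the dagScheduler.runJob call shown here, so an application with two actions submits two jobs:

    val nums = sc.parallelize(1 to 100, 4)

    nums.count()                  // action #1 -> SparkContext.runJob -> dagScheduler.runJob (job 0)
    nums.map(_ * 2).collect()     // action #2 -> SparkContext.runJob -> dagScheduler.runJob (job 1)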
    
    
    
    
    ###org.apache.spark.scheduler/DAGScheduler.scala
    
    /**
        * Core entry point for the DAGScheduler's job scheduling
        */
      private[scheduler] def handleJobSubmitted(jobId: Int,
          finalRDD: RDD[_],
          func: (TaskContext, Iterator[_]) => _,
          partitions: Array[Int],
          allowLocal: Boolean,
          callSite: CallSite,
          listener: JobListener,
          properties: Properties = null)
      {
        // Step 1: create the finalStage from the last RDD of the job (the RDD the action was triggered on)
        var finalStage: Stage = null
        try {
          // New stage creation may throw an exception if, for example, jobs are run on a
          // HadoopRDD whose underlying HDFS files have been deleted.
          // Create a stage object and register the stage in the DAGScheduler's internal caches
          finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
        } catch {
          case e: Exception =>
            logWarning("Creating new stage failed due to exception - job: " + jobId, e)
            listener.jobFailed(e)
            return
        }
        if (finalStage != null) {
          // Step 2: create a job from the finalStage; this job's last stage is the finalStage
          val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
          clearCacheLocs()
          logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
            job.jobId, callSite.shortForm, partitions.length, allowLocal))
          logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
          logInfo("Parents of final stage: " + finalStage.parents)
          logInfo("Missing parents: " + getMissingParentStages(finalStage))
          val shouldRunLocally =
            localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
          val jobSubmissionTime = clock.getTimeMillis()
          if (shouldRunLocally) {
            // Compute very short actions like first() or take() with no parent stages locally.
            listenerBus.post(
              SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))
            runLocally(job)
          } else {
            // Step 3: register the job in the in-memory caches
            jobIdToActiveJob(jobId) = job
            activeJobs += job
            finalStage.resultOfJob = Some(job)
            val stageIds = jobIdToStageIds(jobId).toArray
            val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
            listenerBus.post(
              SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
            // Step 4: submit the finalStage via the submitStage() method
            // This call actually causes the first stage to be submitted and all of the other stages to be placed into the waitingStages queue
            submitStage(finalStage)
            // The stage division algorithm really matters: you must be clear about how many jobs your Spark application is split into and how many stages each job is divided into,
            // and which of your code each stage covers. Only when you know which code belongs to which stage can you, in production, when some stage runs particularly slowly
            // or keeps failing, go to the code behind that stage to troubleshoot the problem or tune its performance.
     
            // Summary of the stage division algorithm
            // 1. Walk backwards from the finalStage
            // 2. Start a new stage at every wide dependency
            // 3. Use recursion to submit parent stages first
          }
        }
        // Submit any waiting stages
        submitWaitingStages()
      }
    
    
    
    
    
    
    ###org.apache.spark.scheduler/DAGScheduler.scala
    
      // Method that submits a stage
      // This is really the entry point of the stage division algorithm, but the algorithm itself is implemented jointly by submitStage() and getMissingParentStages()
      private def submitStage(stage: Stage) {
        val jobId = activeJobForStage(stage)
        if (jobId.isDefined) {
          logDebug("submitStage(" + stage + ")")
          if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
            // Call getMissingParentStages() to get the parent stages of the current stage
            val missing = getMissingParentStages(stage).sortBy(_.id)
            logDebug("missing: " + missing)
            // This point is reached recursively, again and again, until the very first stage, which has no parent stages, is submitted first (stage 0);
            // at that moment all of the other stages are sitting in waitingStages
            if (missing == Nil) {
              logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
              submitMissingTasks(stage, jobId.get)
            } else {
              // Recursively call submitStage() to submit the parent stages
              // This recursion is what drives the stage division algorithm; it is its essence (a simplified, self-contained sketch follows after this method)
              for (parent <- missing) {
                submitStage(parent)
              }
              // and put the current stage into waitingStages, the queue of stages waiting to execute
              waitingStages += stage
            }
          }
        } else {
          abortStage(stage, "No active job for stage " + stage.id)
        }
      }
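
    To make the traversal order easier to see, here is a stripped-down, self-contained sketch of the recursion (my own simplification of the submitStage()/getMissingParentStages() pair; ToyStage and the toy submit() below are not Spark types, and the de-duplication checks against waiting/running/failed stages are omitted):

    // A stage in this toy model only knows its parent stages.
    case class ToyStage(id: Int, parents: List[ToyStage])

    val waiting = scala.collection.mutable.LinkedHashSet[ToyStage]()

    def submit(stage: ToyStage): Unit = {
      val missing = stage.parents                 // stand-in for getMissingParentStages()
      if (missing.isEmpty) {
        println(s"submitMissingTasks(stage ${stage.id})")   // only a stage with no missing parents runs now
      } else {
        missing.foreach(submit)                   // recurse: parents are submitted first
        waiting += stage                          // the child waits until its parents have finished
      }
    }

    // stage0 <- stage1 <- stage2 (finalStage): only stage0 is submitted immediately;
    // stage1 and stage2 end up in `waiting`, mirroring waitingStages above.
    val stage0 = ToyStage(0, Nil)
    val stage1 = ToyStage(1, List(stage0))
    val stage2 = ToyStage(2, List(stage1))
    submit(stage2)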
    
    
    
    
    
    
    ###org.apache.spark.scheduler/DAGScheduler.scala
    
    // Get the parent stages of a given stage
      // What this method does: for a stage, if all of the dependencies of its last rdd are narrow, no new stage is created;
      // but as soon as the stage's rdd is found to have a wide dependency on some rdd, a new stage is created from that wide-depended-on rdd and is returned immediately
      private def getMissingParentStages(stage: Stage): List[Stage] = {
        val missing = new HashSet[Stage]
        val visited = new HashSet[RDD[_]]
        // We are manually maintaining a stack here to prevent StackOverflowError
        // caused by recursively visiting
        val waitingForVisit = new Stack[RDD[_]]
        def visit(rdd: RDD[_]) {
          if (!visited(rdd)) {
            visited += rdd
            if (getCacheLocs(rdd).contains(Nil)) {
              // Iterate over the rdd's dependencies
              // So, with respect to the earlier diagram: every operation that involves a shuffle, such as groupByKey, reduceByKey or countByKey,
              // corresponds under the hood to three RDDs (MapPartitionsRDD, ShuffledRDD, MapPartitionsRDD) and is therefore split into two stages (the toDebugString sketch after this method shows this boundary)
              for (dep <- rdd.dependencies) {
                dep match {
                  // If it is a wide (shuffle) dependency
                  case shufDep: ShuffleDependency[_, _, _] =>
                    // then create a stage from the rdd on the wide-dependency side, and isShuffleMap will be set to true
                    // By default the last stage is not a shuffle-map stage, but every stage before the finalStage is a shuffle-map stage
                    val mapStage = getShuffleMapStage(shufDep, stage.jobId)
                    if (!mapStage.isAvailable) {
                      missing += mapStage
                    }
                  // If it is a narrow dependency, push the depended-on rdd onto the stack
                  case narrowDep: NarrowDependency[_] =>
                    waitingForVisit.push(narrowDep.rdd)
                }
              }
            }
          }
        }
        // First, push the stage's last rdd onto the stack
        waitingForVisit.push(stage.rdd)
        // Loop while the stack is not empty
        while (!waitingForVisit.isEmpty) {
          // Pop an rdd (starting with the stage's last rdd) and call the locally defined visit() method on it
          visit(waitingForVisit.pop())
        }
        missing.toList
      }
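
    You can observe this boundary from the driver with RDD.toDebugString (a small sketch of my own, assuming the `sc` from the earlier example; the exact output format differs between Spark versions). The ShuffledRDD in the printed lineage is exactly where the two stages meet:

    val counts = sc.parallelize(Seq("a", "b", "a"), 2)
      .map(word => (word, 1))                 // MapPartitionsRDD, narrow dependency
      .reduceByKey(_ + _)                     // ShuffledRDD, wide dependency -> stage boundary
      .map { case (w, n) => s"$w=$n" }        // MapPartitionsRDD, narrow dependency

    // Prints the lineage; the indentation step at the ShuffledRDD marks the start of a new stage, roughly:
    //   MapPartitionsRDD
    //    ShuffledRDD
    //     +- MapPartitionsRDD
    //        ParallelCollectionRDD
    println(counts.toDebugString)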
    
    
    
    
    
    
    ###org.apache.spark.scheduler/DAGScheduler.scala
    
    // Submit a stage: create a batch of tasks for it, with as many tasks as there are partitions to compute (a short user-level example follows after this method)
      private def submitMissingTasks(stage: Stage, jobId: Int) {
        logDebug("submitMissingTasks(" + stage + ")")
        // Get our pending tasks and remember them in our pendingTasks entry
        stage.pendingTasks.clear()
     
        // First figure out the indexes of partition ids to compute.
        // Work out how many tasks need to be created, i.e. which partitions still have to be computed
        val partitionsToCompute: Seq[Int] = {
          if (stage.isShuffleMap) {
            (0 until stage.numPartitions).filter(id => stage.outputLocs(id) == Nil)
          } else {
            val job = stage.resultOfJob.get
            (0 until job.numPartitions).filter(id => !job.finished(id))
          }
        }
     
        val properties = if (jobIdToActiveJob.contains(jobId)) {
          jobIdToActiveJob(stage.jobId).properties
        } else {
          // this stage will be assigned to "default" pool
          null
        }
     
        // Add the stage to the runningStages set
        runningStages += stage
        // SparkListenerStageSubmitted should be posted before testing whether tasks are
        // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
        // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
        // event.
        stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size))
        outputCommitCoordinator.stageStart(stage.id)
        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
     
        // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
        // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
        // the serialized copy of the RDD and for each task we will deserialize it, which means each
        // task gets a different copy of the RDD. This provides stronger isolation between tasks that
        // might modify state of objects referenced in their closures. This is necessary in Hadoop
        // where the JobConf/Configuration object is not thread-safe.
        var taskBinary: Broadcast[Array[Byte]] = null
        try {
          // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
          // For ResultTask, serialize and broadcast (rdd, func).
          val taskBinaryBytes: Array[Byte] =
            if (stage.isShuffleMap) {
              closureSerializer.serialize((stage.rdd, stage.shuffleDep.get) : AnyRef).array()
            } else {
              closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func) : AnyRef).array()
            }
          taskBinary = sc.broadcast(taskBinaryBytes)
        } catch {
          // In the case of a failure during serialization, abort the stage.
          case e: NotSerializableException =>
            abortStage(stage, "Task not serializable: " + e.toString)
            runningStages -= stage
            return
          case NonFatal(e) =>
            abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}")
            runningStages -= stage
            return
        }
     
        // Create the required number of tasks for the stage
        // The key point here is the algorithm that computes the best (preferred) location for each task
        val tasks: Seq[Task[_]] = if (stage.isShuffleMap) {
          partitionsToCompute.map { id =>
            // Create one task per partition and compute the best location for each task
            val locs = getPreferredLocs(stage.rdd, id)
            val part = stage.rdd.partitions(id)
            // For every stage other than the finalStage, isShuffleMap is true, so ShuffleMapTasks are created
            new ShuffleMapTask(stage.id, taskBinary, part, locs)
          }
        } else {
          // If it is not a shuffle-map stage, it is the finalStage, and the finalStage creates ResultTasks
          val job = stage.resultOfJob.get
          partitionsToCompute.map { id =>
            val p: Int = job.partitions(id)
            val part = stage.rdd.partitions(p)
            val locs = getPreferredLocs(stage.rdd, p)
            new ResultTask(stage.id, taskBinary, part, locs, id)
          }
        }
     
        if (tasks.size > 0) {
          logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
          stage.pendingTasks ++= tasks
          logDebug("New pending tasks: " + stage.pendingTasks)
          // Finally, wrap the stage's tasks in a TaskSet and submit it by calling taskScheduler.submitTasks()
          taskScheduler.submitTasks(
            new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
          stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
        } else {
          // Because we posted SparkListenerStageSubmitted earlier, we should post
          // SparkListenerStageCompleted here in case there are no tasks to run.
          outputCommitCoordinator.stageEnd(stage.id)
          listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
          logDebug("Stage " + stage + " is actually done; %b %d %d".format(
            stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
          runningStages -= stage
        }
      }
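
    Relating this back to user code (a sketch under the same assumptions as the earlier examples, with a hypothetical input path): the number of tasks created for a stage follows directly from the partition count, so changing the partitioning changes the task count:

    val logs = sc.textFile("hdfs://.../logs", 8)   // hypothetical path; at least 8 partitions (minPartitions hint)

    logs.count()                                   // one result stage, one ResultTask per partition

    logs.map(line => (line.length, 1))
      .reduceByKey(_ + _, 4)                       // shuffle into 4 partitions
      .count()                                     // stage 0: one ShuffleMapTask per input partition; stage 1: 4 ResultTasks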
    
    
    
    
    
    
    ###org.apache.spark.scheduler/DAGScheduler.scala
    
      private[spark]
      def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
        getPreferredLocsInternal(rdd, partition, new HashSet)
      }
    
    
    
    
    
    
    ###org.apache.spark.scheduler/DAGScheduler.scala
    
    /**
        * Compute the best location for the partition each task works on. In short: starting from the stage's last rdd, look for an rdd whose partition has been cached or checkpointed;
        * the task's best location is then the location of that cached / checkpointed partition,
        * because if the task runs on that node, none of the earlier rdds have to be recomputed. (A simplified sketch of this lookup order follows after this method.)
        */
      private def getPreferredLocsInternal(
          rdd: RDD[_],
          partition: Int,
          visited: HashSet[(RDD[_],Int)])
        : Seq[TaskLocation] =
      {
        // If the partition has already been visited, no need to re-visit.
        // This avoids exponential path exploration.  SPARK-695
        if (!visited.add((rdd,partition))) {
          // Nil has already been returned for previously visited partitions.
          return Nil
        }
        // If the partition is cached, return the cache locations
        // Check whether the current rdd's partition is cached
        val cached = getCacheLocs(rdd)(partition)
        if (!cached.isEmpty) {
          return cached
        }
        // If the RDD has some placement preferences (as is the case for input RDDs), get those
        // Check the rdd's own preferred locations; for a checkpointed rdd these are the checkpoint locations, for input RDDs the locations of the underlying blocks
        val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
        if (!rddPrefs.isEmpty) {
          return rddPrefs.map(TaskLocation(_))
        }
        // If the RDD has narrow dependencies, pick the first partition of the first narrow dep
        // that has any placement preferences. Ideally we would choose based on transfer sizes,
        // but this will do for now.
        // Finally, recurse into the rdd's narrow-dependency parents and check whether the corresponding parent partitions are cached or checkpointed
        rdd.dependencies.foreach {
          case n: NarrowDependency[_] =>
            for (inPart <- n.getParents(partition)) {
              val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
              if (locs != Nil) {
                return locs
              }
            }
          case _ =>
        }
        // If, from the stage's last rdd all the way back to its first rdd, no partition is cached or checkpointed, then the task has no preferred locations (Nil)
     
        Nil
      }
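
    The lookup order above can be summarised as a tiny standalone function (my own simplification over a toy model, not Spark's types; it also leaves out the visited set that the real method uses to avoid revisiting (rdd, partition) pairs): cached locations win, then the rdd's own preferred locations, then the first narrow parent partition that has any, otherwise no preference at all:

    // Toy model of the preference chain in getPreferredLocsInternal().
    case class ToyRdd(
        cacheLocs: Map[Int, Seq[String]],                // partition index -> hosts holding cached blocks
        ownPrefs: Map[Int, Seq[String]],                 // e.g. HDFS block or checkpoint locations
        narrowParents: Seq[(ToyRdd, Int => Seq[Int])])   // parent rdd + child-partition -> parent-partitions mapping

    def preferredLocs(rdd: ToyRdd, partition: Int): Seq[String] = {
      val cached = rdd.cacheLocs.getOrElse(partition, Nil)
      val own    = rdd.ownPrefs.getOrElse(partition, Nil)
      if (cached.nonEmpty) cached                        // 1. locations of the cached partition win
      else if (own.nonEmpty) own                         // 2. the rdd's own preferences (input / checkpoint)
      else rdd.narrowParents                             // 3. otherwise the first narrow parent with a preference
        .flatMap { case (parent, toParent) => toParent(partition).map(p => preferredLocs(parent, p)) }
        .find(_.nonEmpty)
        .getOrElse(Nil)                                  // 4. nothing found anywhere -> Nil, no preferred location
    }

    Unlike the real method, this sketch checks all narrow parents eagerly instead of returning as soon as one location is found, but the order of preference is the same.
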
  • Original article: https://www.cnblogs.com/weiyiming007/p/11226310.html