zoukankan      html  css  js  c++  java
  • spark1.1.0源码阅读-dagscheduler and stage

    1. rdd action ->sparkContext.runJob->dagscheduler.runJob

     // Runs a job synchronously: submits it through submitJob and then waits on the returned
     // waiter. A JobSucceeded result returns normally; a JobFailed result is logged and its
     // exception is rethrown to the caller.
     1   def runJob[T, U: ClassTag](
     2       rdd: RDD[T],
     3       func: (TaskContext, Iterator[T]) => U,
     4       partitions: Seq[Int],
     5       callSite: String,
     6       allowLocal: Boolean,
     7       resultHandler: (Int, U) => Unit,
     8       properties: Properties = null)
     9   {
    10     val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
           // Wait for the job outcome reported through the waiter.
    11     waiter.awaitResult() match {
    12       case JobSucceeded => {}
    13       case JobFailed(exception: Exception) =>
    14         logInfo("Failed to run " + callSite)
    15         throw exception
    16     }
    17   }

    2. submitJob

     1   /**
     2    * Submit a job to the job scheduler and get a JobWaiter object back. The JobWaiter object
     3    * can be used to block until the job finishes executing or can be used to cancel the job.
          *
          * Throws IllegalArgumentException if any requested partition index is negative or not
          * below rdd.partitions.length. When `partitions` is empty, a waiter for zero tasks is
          * returned immediately and no JobSubmitted message is sent.
     4    */
     5   def submitJob[T, U](
     6       rdd: RDD[T],
     7       func: (TaskContext, Iterator[T]) => U,
     8       partitions: Seq[Int],
     9       callSite: String,
    10       allowLocal: Boolean,
    11       resultHandler: (Int, U) => Unit,
    12       properties: Properties = null): JobWaiter[U] =
    13   {
    14     // Check to make sure we are not launching a task on a partition that does not exist.
    15     val maxPartitions = rdd.partitions.length
    16     partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    17       throw new IllegalArgumentException(
    18         "Attempting to access a non-existent partition: " + p + ". " +
    19           "Total number of partitions: " + maxPartitions)
    20     }
    21 
           // A job id is allocated even for the empty-partition case below.
    22     val jobId = nextJobId.getAndIncrement()
    23     if (partitions.size == 0) {
    24       return new JobWaiter[U](this, jobId, 0, resultHandler)
    25     }
    26 
    27     assert(partitions.size > 0)
           // Erase the element/result types so the function can travel in the JobSubmitted message.
    28     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    29     val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
           // Send the JobSubmitted message to eventProcessActor.
           // Open question from the article's author: how does the rdd become part of a message
           // here? Is its metadata (partition and other location information) serialized?
    30     eventProcessActor ! JobSubmitted(
    31       jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)
    32     waiter
    33   }

    3. DAGSchedulerEventProcessActor

     1 private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)
     2   extends Actor with Logging {
     3 
     4   override def preStart() {
     5     // set DAGScheduler for taskScheduler to ensure eventProcessActor is always
     6     // valid when the messages arrive
     7     dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
     8   }
     9 
    10   /**
    11    * The main event loop of the DAG scheduler.
    12    */
    13   def receive = {
    14     case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
    15       dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
    16         listener, properties)
    17 
    18     case StageCancelled(stageId) =>
    19       dagScheduler.handleStageCancellation(stageId)
    20 
    21     case JobCancelled(jobId) =>
    22       dagScheduler.handleJobCancellation(jobId)
    23 
    24     case JobGroupCancelled(groupId) =>
    25       dagScheduler.handleJobGroupCancelled(groupId)
    26 
    27     case AllJobsCancelled =>
    28       dagScheduler.doCancelAllJobs()

    4. actor调用 handleJobSubmitted

     // Handles a JobSubmitted event: builds the final stage for finalRDD, then either runs the
     // job locally (single partition, no parent stages, allowLocal set) or registers it as an
     // active job and submits the final stage. If stage creation throws, the listener is
     // notified via jobFailed and the method returns without submitting anything.
     1   private[scheduler] def handleJobSubmitted(jobId: Int,
     2       finalRDD: RDD[_],
     3       func: (TaskContext, Iterator[_]) => _,
     4       partitions: Array[Int],
     5       allowLocal: Boolean,
     6       callSite: String,
     7       listener: JobListener,
     8       properties: Properties = null)
     9   {
    10     var finalStage: Stage = null
    11     try {
    12       // New stage creation may throw an exception if, for example, jobs are run on a
    13       // HadoopRDD whose underlying HDFS files have been deleted.
    14       finalStage = newStage(finalRDD, partitions.size, None, jobId, Some(callSite))
    15     } catch {
    16       case e: Exception =>
    17         logWarning("Creating new stage failed due to exception - job: " + jobId, e)
    18         listener.jobFailed(e)
    19         return
    20     }
    21     if (finalStage != null) {
    22       val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
    23       clearCacheLocs()
    24       logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
    25         job.jobId, callSite, partitions.length, allowLocal))
    26       logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
    27       logInfo("Parents of final stage: " + finalStage.parents)
    28       logInfo("Missing parents: " + getMissingParentStages(finalStage))
    29       if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
    30         // Compute very short actions like first() or take() with no parent stages locally.
    31         listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
               // With exactly one partition, no parent stages and local execution allowed, the
               // job is executed in a dedicated thread (see runLocally).
    32         runLocally(job)
    33       } else {
               // Register the job in the scheduler's bookkeeping before submitting its stage.
    34         jobIdToActiveJob(jobId) = job
    35         activeJobs += job
    36         resultStageToJob(finalStage) = job
    37         listenerBus.post(SparkListenerJobStart(job.jobId, jobIdToStageIds(jobId).toArray,
    38           properties))
    39         submitStage(finalStage)
    40       }
    41     }
    42     submitWaitingStages()
    43   }
     1   /**
     2    * Create a Stage -- either directly for use as a result stage, or as part of the (re)-creation
     3    * of a shuffle map stage in newOrUsedStage.  The stage will be associated with the provided
     4    * jobId. Production of shuffle map stages should always use newOrUsedStage, not newStage
     5    * directly.
          *
          * The new stage receives a fresh id from nextStageId and is recorded in stageIdToStage,
          * the job/stage id maps and stageToInfos before it is returned.
     6    */
     7   private def newStage(
     8       rdd: RDD[_],
     9       numTasks: Int,
    10       shuffleDep: Option[ShuffleDependency[_,_]],
    11       jobId: Int,
    12       callSite: Option[String] = None)
    13     : Stage =
    14   {
    15     val id = nextStageId.getAndIncrement()
           // Parent stages are resolved through getParentStages while the Stage is constructed.
    16     val stage =
    17       new Stage(id, rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
    18     stageIdToStage(id) = stage
    19     updateJobIdStageIdMaps(jobId, stage)
    20     stageToInfos(stage) = StageInfo.fromStage(stage)
    21     stage
    22   }
     1   /**
     2    * Run a job on an RDD locally, assuming it has only a single partition and no dependencies.
     3    * We run the operation in a separate thread just in case it takes a bunch of time, so that we
     4    * don't block the DAGScheduler event loop or other concurrent jobs.
          *
          * The thread is named after the job id and started immediately; this method does not
          * wait for it to finish.
     5    */
     6   protected def runLocally(job: ActiveJob) {
     7     logInfo("Computing the requested partition locally")
     8     new Thread("Local computation of job " + job.jobId) {
     9       override def run() {
    10         runLocallyWithinThread(job)
    11       }
    12     }.start()
    13   }

    5. submitStage: 如果parent stage有缺失,则先递归提交缺失的parent stage,并将当前stage加入waitingStages;否则直接调用submitMissingTasks提交当前stage

     1   /** Submits stage, but first recursively submits any missing parents. */
     2   private def submitStage(stage: Stage) {
     3     val jobId = activeJobForStage(stage)
     4     if (jobId.isDefined) {
     5       logDebug("submitStage(" + stage + ")")
             // Skip stages that are already waiting, running, or marked as failed.
     6       if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
               // Missing parents are processed in ascending stage-id order.
     7         val missing = getMissingParentStages(stage).sortBy(_.id) 
     8         logDebug("missing: " + missing)
     9         if (missing == Nil) {
                 // No missing parents: submit this stage's tasks now and mark it as running.
    10           logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
    11           submitMissingTasks(stage, jobId.get) 
    12           runningStages += stage
    13         } else {
                 // Submit every missing parent first, then park this stage in waitingStages.
    14           for (parent <- missing) {
    15             submitStage(parent)
    16           }
    17           waitingStages += stage
    18         }
    19       }
    20     } else {
             // No active job is associated with this stage, so it is aborted.
    21       abortStage(stage, "No active job for stage " + stage.id)
    22     }
    23   }
     // Collects the parent shuffle map stages of `stage` that are not yet available. Starting
     // from stage.rdd, the RDD graph is walked through narrow dependencies; every shuffle
     // dependency encountered contributes its map stage if that stage is not available.
     1   private def getMissingParentStages(stage: Stage): List[Stage] = {
     2     val missing = new HashSet[Stage]
           // RDDs already traversed, so each RDD is visited at most once.
     3     val visited = new HashSet[RDD[_]]
     4     def visit(rdd: RDD[_]) {
     5       if (!visited(rdd)) {
     6         visited += rdd
               // If the cache locations contain Nil, this rdd is considered missing.
     7         if (getCacheLocs(rdd).contains(Nil)) {
     8           for (dep <- rdd.dependencies) {
                   // Two cases: a shuffle dependency yields a shuffle map stage, while a narrow
                   // dependency is followed recursively.
     9             dep match {
    10               case shufDep: ShuffleDependency[_,_] =>
    11                 val mapStage = getShuffleMapStage(shufDep, stage.jobId)
    12                 if (!mapStage.isAvailable) {
    13                   missing += mapStage
    14                 }
    15               case narrowDep: NarrowDependency[_] =>
    16                 visit(narrowDep.rdd)
    17             }
    18           }
    19         }
    20       }
    21     }
    22     visit(stage.rdd)
    23     missing.toList
    24   }

    6. submitMissingTasks

     1   /** Called when stage's parents are available and we can now do its task. */
     2   private def submitMissingTasks(stage: Stage, jobId: Int) {
     3     logDebug("submitMissingTasks(" + stage + ")")
     4     // Get our pending tasks and remember them in our pendingTasks entry
     5     val myPending = pendingTasks.getOrElseUpdate(stage, new HashSet)
     6     myPending.clear()
     7     var tasks = ArrayBuffer[Task[_]]()
     8     if (stage.isShuffleMap) {
             // Create a ShuffleMapTask for every partition of the stage whose outputLocs is Nil.
     9       for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {
    10         val locs = getPreferredLocs(stage.rdd, p)
    11         tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs)
    12       }
    13     } else {
    14       // This is a final stage; figure out its job's missing partitions
    15       val job = resultStageToJob(stage)
    16       for (id <- 0 until job.numPartitions if !job.finished(id)) {
    17         val partition = job.partitions(id)
    18         val locs = getPreferredLocs(stage.rdd, partition)
               // Create a ResultTask for each partition that has not finished yet.
    19         tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
    20       }
    21     }
    22 
           // NOTE(review): the guard checks `jobId` but the lookup uses `stage.jobId`; if the two
           // can differ, the lookup could fail for a missing key -- confirm against callers.
    23     val properties = if (jobIdToActiveJob.contains(jobId)) {
    24       jobIdToActiveJob(stage.jobId).properties
    25     } else {
    26       // this stage will be assigned to "default" pool
    27       null
    28     }
    29 
    30     // must be run listener before possible NotSerializableException
    31     // should be "StageSubmitted" first and then "JobEnded"
    32     listenerBus.post(SparkListenerStageSubmitted(stageToInfos(stage), properties))
    33 
    34     if (tasks.size > 0) {
    35       // Preemptively serialize a task to make sure it can be serialized. We are catching this
    36       // exception here because it would be fairly hard to catch the non-serializable exception
    37       // down the road, where we have several different implementations for local scheduler and
    38       // cluster schedulers.
    39       try {
    40         SparkEnv.get.closureSerializer.newInstance().serialize(tasks.head)
    41       } catch {
    42         case e: NotSerializableException =>
    43           abortStage(stage, "Task not serializable: " + e.toString)
    44           runningStages -= stage
    45           return
    46       }
    47 
    48       logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
    49       myPending ++= tasks
    50       logDebug("New pending tasks: " + myPending)
             // Wrap the tasks in a TaskSet and hand it to taskScheduler.submitTasks.
    51       taskScheduler.submitTasks(
    52         new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
    53       stageToInfos(stage).submissionTime = Some(System.currentTimeMillis())
    54     } else {
             // Nothing left to compute for this stage, so it is removed from runningStages.
    55       logDebug("Stage " + stage + " is actually done; %b %d %d".format(
    56         stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
    57       runningStages -= stage
    58     }
    59   }

    7. taskSet: 某个rdd的一部分partition missing了,会通过上面的步骤找到,并将这些partition生成对应的tasks,通过taskSet来一起调度。

     1 /**
     2  * A set of tasks submitted together to the low-level TaskScheduler, usually representing
     3  * missing partitions of a particular stage.
        *
        * The `id` is the string "<stageId>.<attempt>". In the submitMissingTasks excerpt shown
        * in this article, the value passed for `priority` is the stage's jobId.
     4  */
     5 private[spark] class TaskSet(
     6     val tasks: Array[Task[_]],
     7     val stageId: Int,
     8     val attempt: Int,
     9     val priority: Int,
    10     val properties: Properties) {
    11     val id: String = stageId + "." + attempt
    12 
         // Forwards the kill request to every task in the set.
    13   def kill(interruptThread: Boolean) {
    14     tasks.foreach(_.kill(interruptThread))
    15   }
    16 
    17   override def toString: String = "TaskSet " + id
    18 }

    8. taskScheduler.submitTasks

     // Accepts a TaskSet from the DAG scheduler: wraps it in a TaskSetManager, registers the
     // manager under the task set's id and with the schedulableBuilder, and finally asks the
     // backend to revive offers. Registration happens while holding this object's lock.
     1   override def submitTasks(taskSet: TaskSet) {
     2     val tasks = taskSet.tasks
     3     logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
     4     this.synchronized {
     5       val manager = new TaskSetManager(this, taskSet, maxTaskFailures)
     6       activeTaskSets(taskSet.id) = manager
     7       schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
     8 
             // Only for the first task set received in non-local mode: schedule a periodic check
             // that logs a warning until a task has been launched, then cancels itself.
     9       if (!isLocal && !hasReceivedTask) {
    10         starvationTimer.scheduleAtFixedRate(new TimerTask() {
    11           override def run() {
    12             if (!hasLaunchedTask) {
    13               logWarning("Initial job has not accepted any resources; " +
    14                 "check your cluster UI to ensure that workers are registered " +
    15                 "and have sufficient memory")
    16             } else {
    17               this.cancel()
    18             }
    19           }
    20         }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
    21       }
    22       hasReceivedTask = true
    23     }
    24     backend.reviveOffers()
    25   }
  • 相关阅读:
    BZOJ1033:[ZJOI2008]杀蚂蚁antbuster(模拟)
    BZOJ4001:[TJOI2015]概率论(卡特兰数,概率期望)
    BZOJ1820:[JSOI2010]Express Service 快递服务(DP)
    BZOJ4066:简单题(K-D Tree)
    2110. [NOIP2015普及]金币
    73. 找最佳通路
    cogs 7. 通信线路
    codevs 3295 落单的数
    151. [USACO Dec07] 建造路径
    必备算法之二叉树的相关操作
  • 原文地址:https://www.cnblogs.com/Torstan/p/4158459.html
Copyright © 2011-2022 走看看