1. rdd action -> sparkContext.runJob -> dagScheduler.runJob
/**
 * Submits a job through submitJob and waits for its outcome.
 *
 * Nothing is returned to the caller; `resultHandler` is forwarded unchanged to submitJob.
 * A failed job is logged and its exception is rethrown.
 *
 * @param rdd           RDD the job runs on
 * @param func          function applied to each requested partition
 * @param partitions    indices of the partitions to compute
 * @param callSite      description of the calling code, used in the failure log message
 * @param allowLocal    forwarded to submitJob
 * @param resultHandler callback forwarded to submitJob
 * @param properties    forwarded to submitJob; may be null
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: String,
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit,
    properties: Properties = null): Unit = {
  val jobWaiter =
    submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
  val outcome = jobWaiter.awaitResult()
  outcome match {
    case JobFailed(exception: Exception) =>
      logInfo(s"Failed to run $callSite")
      throw exception
    case JobSucceeded =>
      // Success needs no further action here.
      ()
  }
}
2. submitJob
/**
 * Submit a job to the job scheduler and get a JobWaiter object back. The JobWaiter object
 * can be used to block until the job finishes executing or can be used to cancel the job.
 *
 * A job id is always allocated, even when `partitions` is empty; in that case a waiter for
 * zero tasks is returned and no JobSubmitted message is sent.
 *
 * @throws IllegalArgumentException if any requested partition index is negative or not below
 *                                  `rdd.partitions.length`
 */
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: String,
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit,
    properties: Properties = null): JobWaiter[U] = {
  // Refuse to launch a task on a partition that does not exist.
  val partitionCount = rdd.partitions.length
  for (bad <- partitions.find(idx => idx < 0 || idx >= partitionCount)) {
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + bad + ". " +
        "Total number of partitions: " + partitionCount)
  }

  val jobId = nextJobId.getAndIncrement()
  if (partitions.isEmpty) {
    new JobWaiter[U](this, jobId, 0, resultHandler)
  } else {
    assert(partitions.size > 0)
    val erasedFunc = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val jobWaiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    // Hand the job to the scheduler's event-processing actor.
    // NOTE(review): open question from the original notes -- how does the RDD travel inside
    // this message? Is its metadata (partition/location info) serialized? Presumably the
    // message just carries object references if the actor is in the same JVM -- confirm.
    eventProcessActor ! JobSubmitted(
      jobId, rdd, erasedFunc, partitions.toArray, allowLocal, callSite, jobWaiter, properties)
    jobWaiter
  }
}
3. DAGSchedulerEventProcessActor
1 private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler) 2 extends Actor with Logging { 3 4 override def preStart() { 5 // set DAGScheduler for taskScheduler to ensure eventProcessActor is always 6 // valid when the messages arrive 7 dagScheduler.taskScheduler.setDAGScheduler(dagScheduler) 8 } 9 10 /** 11 * The main event loop of the DAG scheduler. 12 */ 13 def receive = { 14 case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) => 15 dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, 16 listener, properties) 17 18 case StageCancelled(stageId) => 19 dagScheduler.handleStageCancellation(stageId) 20 21 case JobCancelled(jobId) => 22 dagScheduler.handleJobCancellation(jobId) 23 24 case JobGroupCancelled(groupId) => 25 dagScheduler.handleJobGroupCancelled(groupId) 26 27 case AllJobsCancelled => 28 dagScheduler.doCancelAllJobs()
4. actor调用 handleJobSubmitted
/**
 * Handles a JobSubmitted event: creates the final stage, then either runs the job locally
 * or registers it and submits the final stage.
 *
 * If creating the final stage throws, the listener is told the job failed and the method
 * returns immediately, without calling submitWaitingStages().
 */
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    allowLocal: Boolean,
    callSite: String,
    listener: JobListener,
    properties: Properties = null): Unit = {
  val finalStage: Stage =
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      newStage(finalRDD, partitions.size, None, jobId, Some(callSite))
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }

  if (finalStage != null) {
    val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
      job.jobId, callSite, partitions.length, allowLocal))
    logInfo(s"Final stage: ${finalStage}(${finalStage.name})")
    logInfo(s"Parents of final stage: ${finalStage.parents}")
    logInfo(s"Missing parents: ${getMissingParentStages(finalStage)}")

    val canRunLocally = allowLocal && finalStage.parents.size == 0 && partitions.length == 1
    if (canRunLocally) {
      // Compute very short actions like first() or take() with no parent stages locally:
      // one partition, no parents, and local execution allowed.
      listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
      runLocally(job)
    } else {
      jobIdToActiveJob(jobId) = job
      activeJobs += job
      resultStageToJob(finalStage) = job
      val stageIds = jobIdToStageIds(jobId).toArray
      listenerBus.post(SparkListenerJobStart(job.jobId, stageIds, properties))
      submitStage(finalStage)
    }
  }
  submitWaitingStages()
}
/**
 * Create a Stage -- either directly for use as a result stage, or as part of the (re)-creation
 * of a shuffle map stage in newOrUsedStage. The stage will be associated with the provided
 * jobId. Production of shuffle map stages should always use newOrUsedStage, not newStage
 * directly.
 *
 * The stage id is reserved before the parent stages are looked up, and the new stage is
 * recorded in stageIdToStage, the job/stage id maps and stageToInfos before it is returned.
 */
private def newStage(
    rdd: RDD[_],
    numTasks: Int,
    shuffleDep: Option[ShuffleDependency[_,_]],
    jobId: Int,
    callSite: Option[String] = None): Stage = {
  val stageId = nextStageId.getAndIncrement()
  // Resolved after the id is taken, matching the original argument evaluation order.
  val parentStages = getParentStages(rdd, jobId)
  val created = new Stage(stageId, rdd, numTasks, shuffleDep, parentStages, jobId, callSite)
  stageIdToStage(stageId) = created
  updateJobIdStageIdMaps(jobId, created)
  stageToInfos(created) = StageInfo.fromStage(created)
  created
}
/**
 * Run a job on an RDD locally, assuming it has only a single partition and no dependencies.
 * The work is handed to a freshly started thread so that a slow computation does not block
 * the DAGScheduler event loop or other concurrent jobs.
 */
protected def runLocally(job: ActiveJob): Unit = {
  logInfo("Computing the requested partition locally")
  val worker = new Thread("Local computation of job " + job.jobId) {
    override def run(): Unit = {
      runLocallyWithinThread(job)
    }
  }
  worker.start()
}
5. submitStage: 如果parent stage有缺失,则先递归提交缺失的parent stage,并把当前stage放入waitingStages;如果没有缺失,则直接调用submitMissingTasks提交当前stage的tasks。
/**
 * Submits stage, but first recursively submits any missing parents.
 *
 * If no active job is found for the stage, the stage is aborted. A stage that is already
 * waiting, running or failed is left untouched. When parents are missing they are submitted
 * in ascending stage-id order and this stage is parked in waitingStages.
 */
private def submitStage(stage: Stage): Unit = {
  activeJobForStage(stage) match {
    case Some(jobId) =>
      logDebug("submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          // Mark the stage as running BEFORE submitting its tasks. submitMissingTasks removes
          // the stage from runningStages when the task is not serializable or when there is
          // nothing left to run; adding it afterwards would undo that removal and leave the
          // stage marked as running forever.
          runningStages += stage
          submitMissingTasks(stage, jobId)
        } else {
          missing.foreach(submitStage)
          waitingStages += stage
        }
      }
    case None =>
      abortStage(stage, "No active job for stage " + stage.id)
  }
}
/**
 * Collects the parent shuffle map stages of `stage` that are not available.
 *
 * Walks the RDD graph from `stage.rdd`. An RDD's dependencies are only examined when its
 * cache locations contain Nil. Narrow dependencies are followed recursively; a shuffle
 * dependency contributes its shuffle map stage when that stage is not available.
 *
 * @return the unavailable parent stages, in no particular order
 */
private def getMissingParentStages(stage: Stage): List[Stage] = {
  val unavailable = new HashSet[Stage]
  val seen = new HashSet[RDD[_]]

  def walk(current: RDD[_]): Unit = {
    // `add` returns true only the first time an RDD is seen, so each RDD is expanded once.
    if (seen.add(current) && getCacheLocs(current).contains(Nil)) {
      current.dependencies.foreach {
        case shuffle: ShuffleDependency[_,_] =>
          // Shuffle boundary: the parent is a separate shuffle map stage.
          val parentStage = getShuffleMapStage(shuffle, stage.jobId)
          if (!parentStage.isAvailable) {
            unavailable += parentStage
          }
        case narrow: NarrowDependency[_] =>
          // Narrow dependency: keep walking up through the parent RDD.
          walk(narrow.rdd)
      }
    }
  }

  walk(stage.rdd)
  unavailable.toList
}
6. submitMissingTasks
/**
 * Called when stage's parents are available and we can now do its task.
 *
 * Builds one task per partition that still needs computing, posts
 * SparkListenerStageSubmitted, and hands the tasks to the task scheduler as a single
 * TaskSet. If the first task cannot be serialized the stage is aborted; if there is nothing
 * to compute the stage is simply removed from runningStages.
 *
 * @param stage stage whose missing tasks should be submitted
 * @param jobId id of the job on whose behalf the stage is submitted; its properties (if the
 *              job is still active) are attached to the listener event and the TaskSet
 */
private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
  logDebug("submitMissingTasks(" + stage + ")")
  // Get our pending tasks and remember them in our pendingTasks entry
  val myPending = pendingTasks.getOrElseUpdate(stage, new HashSet)
  myPending.clear()
  val tasks = ArrayBuffer[Task[_]]()
  if (stage.isShuffleMap) {
    // One ShuffleMapTask for every partition whose outputLocs entry is still Nil.
    for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {
      val locs = getPreferredLocs(stage.rdd, p)
      tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs)
    }
  } else {
    // This is a final stage; figure out its job's missing partitions
    val job = resultStageToJob(stage)
    for (id <- 0 until job.numPartitions if !job.finished(id)) {
      val partition = job.partitions(id)
      val locs = getPreferredLocs(stage.rdd, partition)
      // One ResultTask per unfinished output partition.
      tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
    }
  }

  // Look the job up once, by the same key that is tested. The previous code checked
  // `contains(jobId)` but then indexed with `stage.jobId`, which throws
  // NoSuchElementException when the two ids differ and the stage's own job is no longer
  // active. A null result means the stage will be assigned to the "default" pool.
  val properties = jobIdToActiveJob.get(jobId).map(_.properties).orNull

  // must be run listener before possible NotSerializableException
  // should be "StageSubmitted" first and then "JobEnded"
  listenerBus.post(SparkListenerStageSubmitted(stageToInfos(stage), properties))

  if (tasks.nonEmpty) {
    // Preemptively serialize a task to make sure it can be serialized. We are catching this
    // exception here because it would be fairly hard to catch the non-serializable exception
    // down the road, where we have several different implementations for local scheduler and
    // cluster schedulers.
    try {
      SparkEnv.get.closureSerializer.newInstance().serialize(tasks.head)
    } catch {
      case e: NotSerializableException =>
        abortStage(stage, "Task not serializable: " + e.toString)
        runningStages -= stage
        return
    }

    logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
    myPending ++= tasks
    logDebug("New pending tasks: " + myPending)
    // Bundle the tasks into one TaskSet and hand it to the task scheduler.
    taskScheduler.submitTasks(
      new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
    stageToInfos(stage).submissionTime = Some(System.currentTimeMillis())
  } else {
    logDebug("Stage " + stage + " is actually done; %b %d %d".format(
      stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
    runningStages -= stage
  }
}
7. taskSet: 某个rdd的一部分partition missing了,会通过上面的步骤找到,并将这些partition生成对应的tasks,通过taskSet来一起调度。
/**
 * A set of tasks submitted together to the low-level TaskScheduler, usually representing
 * missing partitions of a particular stage.
 *
 * @param tasks      the tasks in this set
 * @param stageId    id of the stage the tasks belong to
 * @param attempt    attempt number, used together with stageId to form `id`
 * @param priority   priority value carried with the set
 * @param properties properties carried with the set
 */
private[spark] class TaskSet(
    val tasks: Array[Task[_]],
    val stageId: Int,
    val attempt: Int,
    val priority: Int,
    val properties: Properties) {

  /** Identifier of the form "<stageId>.<attempt>". */
  val id: String = s"$stageId.$attempt"

  /** Calls kill(interruptThread) on every task in the set. */
  def kill(interruptThread: Boolean): Unit = {
    for (task <- tasks) {
      task.kill(interruptThread)
    }
  }

  override def toString: String = s"TaskSet $id"
}
8. taskScheduler.submitTasks
/**
 * Registers a new task set with the scheduler and asks the backend to revive offers.
 *
 * Under the scheduler's lock, a TaskSetManager is created for the set, recorded in
 * activeTaskSets and added to the schedulable builder. If the scheduler is not local and has
 * not received a task set before, a repeating timer task is scheduled: it logs a warning
 * while no task has been launched and cancels itself once one has.
 */
override def submitTasks(taskSet: TaskSet): Unit = {
  val submitted = taskSet.tasks
  logInfo(s"Adding task set ${taskSet.id} with ${submitted.length} tasks")
  this.synchronized {
    val setManager = new TaskSetManager(this, taskSet, maxTaskFailures)
    activeTaskSets(taskSet.id) = setManager
    schedulableBuilder.addTaskSetManager(setManager, setManager.taskSet.properties)

    val firstNonLocalSubmission = !isLocal && !hasReceivedTask
    if (firstNonLocalSubmission) {
      val starvationCheck = new TimerTask() {
        override def run(): Unit = {
          if (hasLaunchedTask) {
            // `this` is the TimerTask: stop checking once a task has been launched.
            this.cancel()
          } else {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient memory")
          }
        }
      }
      starvationTimer.scheduleAtFixedRate(starvationCheck, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
    }
    hasReceivedTask = true
  }
  backend.reviveOffers()
}