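Task execution process analysis
Tasks are executed by an Executor instance. The executor backend started on the Worker creates this instance once it has registered with the driver:
case RegisteredExecutor(sparkProperties) =>
  logInfo("Successfully registered with driver")
  // Make this host instead of hostPort ?
  executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties)
A task is then started through the executor's launchTask method.
def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
  val tr = new TaskRunner(context, taskId, serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}
launchTask creates a TaskRunner and passes it the task together with the ExecutorBackend started on the current Worker. On YARN, that backend is CoarseGrainedExecutorBackend.
The TaskRunner is registered in runningTasks and then run on the threadPool thread pool.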
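The launchTask pattern registers a Runnable under its task ID in a concurrent map, runs it on a shared pool, and deregisters it when it finishes. The sketch below reproduces that pattern with only the JDK. It assumes a cached thread pool and plain String payloads instead of serialized tasks. `LaunchTaskSketch` and its members are hypothetical names, not Spark's API.

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, ExecutorService, Executors, TimeUnit}

// Hypothetical miniature of Executor.launchTask: not Spark code.
object LaunchTaskSketch {
  val runningTasks = new ConcurrentHashMap[Long, Runnable]()
  val threadPool: ExecutorService = Executors.newCachedThreadPool()
  val results = new ConcurrentHashMap[Long, String]()

  // Stand-in for TaskRunner: "runs" the task, then removes itself from runningTasks
  // in a finally block, like runningTasks.remove(taskId) at the end of TaskRunner.run.
  class TaskRunner(taskId: Long, payload: String, done: CountDownLatch) extends Runnable {
    override def run(): Unit = {
      try {
        results.put(taskId, payload.toUpperCase)
      } finally {
        runningTasks.remove(taskId)
        done.countDown()
      }
    }
  }

  // Register first, then hand the runner to the pool.
  def launchTask(taskId: Long, payload: String, done: CountDownLatch): Unit = {
    val tr = new TaskRunner(taskId, payload, done)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }
}
```

Registering before calling `execute` guarantees that the runner's own `remove` call always finds its entry.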
The TaskRunner.run function:
This is the thread that runs the task.
override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
  val startTime = System.currentTimeMillis()
SparkEnv is analyzed later.
  SparkEnv.set(env)
  Thread.currentThread.setContextClassLoader(replClassLoader)
  val ser = SparkEnv.get.closureSerializer.newInstance()
  logInfo("Running task ID " + taskId)
Update the task's state through execBackend. Setting it to RUNNING sends a StatusUpdate message to the driver.
  execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
  var attemptedTask: Option[Task[Any]] = None
  var taskStart: Long = 0
  def gcTime = ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum
  val startGCTime = gcTime
  try {
    SparkEnv.set(env)
    Accumulators.clear()
Unpack the task's dependencies: the files and jars it needs, plus the serialized task definition.
    val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
Fetch any new files and jars, and add the task's jars to the current thread's ClassLoader.
    updateDependencies(taskFiles, taskJars)
Deserialize the task definition with the serializer configured in SparkEnv to obtain the Task instance.
The concrete Task is either a ShuffleMapTask or a ResultTask.
    task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
If killed is true, do not run the task. Instead, throw TaskKilledException so that the catch block handles it.
    // If this task has been killed before we deserialized it, let's quit now. Otherwise,
    // continue executing the task.
    if (killed) {
      // Throw an exception rather than returning, because returning within a try{} block
      // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
      // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
      // for the task.
      throw TaskKilledException
    }
    attemptedTask = Some(task)
    logDebug("Task " + taskId + "'s epoch is " + task.epoch)
    env.mapOutputTracker.updateEpoch(task.epoch)
Task.run creates the TaskContext and calls runTask. This thread blocks until the task finishes.
    // Run the actual task and measure its runtime.
    taskStart = System.currentTimeMillis()
    val value = task.run(taskId.toInt)
    val taskFinish = System.currentTimeMillis()
The task has now finished. If it was killed in the meantime, throw so that the catch block handles it.
    // If the task has been killed, let's fail it.
    if (task.killed) {
      throw TaskKilledException
    }
Serialize the task's return value.
    val resultSer = SparkEnv.get.serializer.newInstance()
    val beforeSerialization = System.currentTimeMillis()
    val valueBytes = resultSer.serialize(value)
    val afterSerialization = System.currentTimeMillis()
Fill in the task metrics that are sent back with the result.
    for (m <- task.metrics) {
      m.hostname = Utils.localHostName()
      m.executorDeserializeTime = (taskStart - startTime).toInt
      m.executorRunTime = (taskFinish - taskStart).toInt
      m.jvmGCTime = gcTime - startGCTime
      m.resultSerializationTime = (afterSerialization - beforeSerialization).toInt
    }
    val accumUpdates = Accumulators.values
Wrap the return value in a DirectTaskResult and serialize it.
    val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.getOrElse(null))
    val serializedDirectResult = ser.serialize(directResult)
    logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)
Check whether the serialized result is too large for an Akka message (akkaFrameSize minus 1024 bytes of headroom).
If it is, store it in the local BlockManager with storage level MEMORY_AND_DISK_SER and send back only an IndirectTaskResult that carries the block ID. Otherwise, send the DirectTaskResult as is.
Either way, the resulting bytes are the payload of the StatusUpdate message sent to the scheduler.
    val serializedResult = {
      if (serializedDirectResult.limit >= akkaFrameSize - 1024) {
        logInfo("Storing result for " + taskId + " in local BlockManager")
        val blockId = TaskResultBlockId(taskId)
        env.blockManager.putBytes(
          blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
        ser.serialize(new IndirectTaskResult[Any](blockId))
      } else {
        logInfo("Sending result for " + taskId + " directly to driver")
        serializedDirectResult
      }
    }
Update the task's state through execBackend. Setting it to FINISHED sends a StatusUpdate message to the driver.
    execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
    logInfo("Finished task ID " + taskId)
  } catch {
If an exception occurs, report the corresponding failure state.
    case ffe: FetchFailedException => {
      val reason = ffe.toTaskEndReason
      execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
    }
    case TaskKilledException => {
      logInfo("Executor killed task " + taskId)
      execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
    }
    case t: Throwable => {
      val serviceTime = (System.currentTimeMillis() - taskStart).toInt
      val metrics = attemptedTask.flatMap(t => t.metrics)
      for (m <- metrics) {
        m.executorRunTime = serviceTime
        m.jvmGCTime = gcTime - startGCTime
      }
      val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
      execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
      // TODO: Should we exit the whole executor here? On the one hand, the failed task may
      // have left some weird state around depending on when the exception was thrown, but on
      // the other hand, maybe we could detect that when future tasks fail and exit then.
      logError("Exception in task ID " + taskId, t)
      //System.exit(1)
    }
  } finally {
Remove this thread's entry from shuffleMemoryMap, which releases the shuffle memory it had reserved.
    // TODO: Unregister shuffle memory only for ResultTask
    val shuffleMemoryMap = env.shuffleMemoryMap
    shuffleMemoryMap.synchronized {
      shuffleMemoryMap.remove(Thread.currentThread().getId)
    }
Remove this task from runningTasks.
    runningTasks.remove(taskId)
  }
}
}
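TaskRunner.run chooses between sending a result directly and storing it in a block store, and that choice comes down to a single size comparison. The sketch below isolates it. It assumes a plain mutable map in place of the BlockManager, and it uses a block ID string format invented for illustration. `ResultRoutingSketch`, `Direct` and `Indirect` are hypothetical names.

```scala
import scala.collection.mutable

// Hypothetical sketch of the size check in TaskRunner.run: not Spark code.
object ResultRoutingSketch {
  sealed trait TaskResultMsg
  final case class Direct(bytes: Array[Byte]) extends TaskResultMsg
  final case class Indirect(blockId: String) extends TaskResultMsg

  // Stand-in for the executor's BlockManager.
  val localBlocks = mutable.Map.empty[String, Array[Byte]]

  // Results at or above (frameSize - 1024) bytes are stored locally and only a
  // small handle is returned; smaller results travel inside the message itself.
  def route(taskId: Long, serialized: Array[Byte], frameSize: Int): TaskResultMsg = {
    if (serialized.length >= frameSize - 1024) {
      val blockId = s"taskresult_$taskId" // hypothetical ID format for this sketch
      localBlocks(blockId) = serialized
      Indirect(blockId)
    } else {
      Direct(serialized)
    }
  }
}
```

The 1024-byte margin leaves room for the rest of the status message around the payload.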
Status updates while a task runs
ExecutorBackend.statusUpdate
On YARN the implementation is CoarseGrainedExecutorBackend, which sends a StatusUpdate message to the driver's actor.
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
  driver ! StatusUpdate(executorId, taskId, state, data)
}
On the driver, the scheduler backend handles the status update.
Implementation class: CoarseGrainedSchedulerBackend.DriverActor
case StatusUpdate(executorId, taskId, state, data) =>
Pass the update to TaskSchedulerImpl.statusUpdate.
  scheduler.statusUpdate(taskId, state, data.value)
If the task has reached a terminal state (FINISHED, FAILED, KILLED or LOST):
  if (TaskState.isFinished(state)) {
    if (executorActor.contains(executorId)) {
Each task occupies one CPU core. The task is done, so add one to the executor's free cores.
      freeCores(executorId) += 1
Offer this executor's resources to the remaining tasks. The scheduling-process analysis covers this step in more detail.
      makeOffers(executorId)
    } else {
      // Ignoring the update since we don't know about the executor.
      val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s"
      logWarning(msg.format(taskId, state, sender, executorId))
    }
  }
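The DriverActor bookkeeping follows two rules: only a terminal state releases a core, and updates from unknown executors are ignored. The sketch below models both with a small Enumeration patterned on TaskState. `FreeCoresSketch` and its members are hypothetical, and the map starts with a single made-up executor.

```scala
import scala.collection.mutable

// Hypothetical model of the DriverActor's core accounting: not Spark code.
object FreeCoresSketch {
  object TaskState extends Enumeration {
    val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value
    private val FINISHED_STATES = Set(FINISHED, FAILED, KILLED, LOST)
    def isFinished(state: Value): Boolean = FINISHED_STATES.contains(state)
  }

  val freeCores = mutable.Map("exec-1" -> 0)

  // Returns true when a core was released, i.e. the point where makeOffers would run.
  def onStatusUpdate(executorId: String, state: TaskState.Value): Boolean = {
    if (TaskState.isFinished(state) && freeCores.contains(executorId)) {
      freeCores(executorId) += 1
      true
    } else {
      false
    }
  }
}
```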
How TaskSchedulerImpl.statusUpdate works
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
  var failedExecutor: Option[String] = None
  synchronized {
    try {
If the reported state is LOST and the task is still mapped to an executor,
      if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
look up the ID of the executor that ran the task.
        // We lost this entire executor, so remember that it's gone
        val execId = taskIdToExecutorId(tid)
If that executor is still active, call removeExecutor, which in turn calls TaskSetManager.executorLost on the affected TaskSetManagers.
Also record the executor in failedExecutor for use at the end of the function.
        if (activeExecutorIds.contains(execId)) {
          removeExecutor(execId)
          failedExecutor = Some(execId)
        }
      }
      taskIdToTaskSetId.get(tid) match {
        case Some(taskSetId) =>
If the task is in a terminal state (no longer RUNNING), remove it from the bookkeeping maps.
          if (TaskState.isFinished(state)) {
            taskIdToTaskSetId.remove(tid)
            if (taskSetTaskIds.contains(taskSetId)) {
              taskSetTaskIds(taskSetId) -= tid
            }
            taskIdToExecutorId.remove(tid)
          }
          activeTaskSets.get(taskSetId).foreach { taskSet =>
If the task finished successfully, remove it from the TaskSet's running tasks and fetch its result through TaskResultGetter.
            if (state == TaskState.FINISHED) {
              taskSet.removeRunningTask(tid)
              taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
            } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
Handling for a failed task:
              taskSet.removeRunningTask(tid)
              taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
            }
          }
        case None =>
          logInfo("Ignoring update with state %s from TID %s because its task set is gone"
            .format(state, tid))
      }
    } catch {
      case e: Exception => logError("Exception in statusUpdate", e)
    }
  }
If an executor was lost, let the DAGScheduler handle it.
  // Update the DAGScheduler without holding a lock on this, since that can deadlock
  if (failedExecutor != None) {
    dagScheduler.executorLost(failedExecutor.get)
Then trigger another round of task allocation and launch.
    backend.reviveOffers()
  }
}
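statusUpdate mutates scheduler state while holding the lock, but it calls the DAGScheduler only after releasing it, to avoid deadlock. The sketch below shows that pattern in isolation. It assumes every update is terminal, so the task's mapping is always dropped. `LockThenNotifySketch` and its maps are hypothetical stand-ins.

```scala
import scala.collection.mutable

// Hypothetical sketch of "decide under the lock, notify outside it": not Spark code.
object LockThenNotifySketch {
  private val lock = new Object
  private val taskIdToExecutorId = mutable.Map(1L -> "exec-1", 2L -> "exec-2")
  private val activeExecutorIds = mutable.Set("exec-1", "exec-2")
  val notified = mutable.ArrayBuffer.empty[String]

  // Stand-in for dagScheduler.executorLost; it must not run while `lock` is held.
  private def executorLost(execId: String): Unit = notified += execId

  // In this sketch every update is treated as terminal, so the mapping is always removed.
  def statusUpdate(tid: Long, lost: Boolean): Unit = {
    var failedExecutor: Option[String] = None
    lock.synchronized {
      if (lost && taskIdToExecutorId.contains(tid)) {
        val execId = taskIdToExecutorId(tid)
        if (activeExecutorIds.contains(execId)) {
          activeExecutorIds -= execId
          failedExecutor = Some(execId)
        }
      }
      taskIdToExecutorId.remove(tid)
    }
    // The lock has been released; now it is safe to call into the other component.
    failedExecutor.foreach(executorLost)
  }
}
```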
TaskState.LOST with the executor in activeExecutorIds
When the state is LOST and the executor is an active executor (one that has run tasks), removeExecutor is called:
private def removeExecutor(executorId: String) {
Remove this executor from activeExecutorIds.
  activeExecutorIds -= executorId
Look up the host of the worker that this executor runs on.
  val host = executorIdToHost(executorId)
Get all executors on that host and remove this one. If none remain, drop the host entry.
  val execs = executorsByHost.getOrElse(host, new HashSet)
  execs -= executorId
  if (execs.isEmpty) {
    executorsByHost -= host
  }
Remove this executor from the executor-to-host map.
  executorIdToHost -= executorId
The main effect of this call is to run TaskSetManager.executorLost.
  rootPool.executorLost(executorId, host)
}
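removeExecutor keeps three indexes consistent: the set of active executors, the executor-to-host map, and the host-to-executors map. The sketch below adds the matching `addExecutor` so that the removal steps can be exercised. It omits the rootPool callback. `ExecutorIndexSketch` and `addExecutor` are hypothetical.

```scala
import scala.collection.mutable.{HashMap, HashSet}

// Hypothetical miniature of TaskSchedulerImpl's executor/host indexes: not Spark code.
object ExecutorIndexSketch {
  val activeExecutorIds = HashSet[String]()
  val executorIdToHost = HashMap[String, String]()
  val executorsByHost = HashMap[String, HashSet[String]]()

  def addExecutor(execId: String, host: String): Unit = {
    activeExecutorIds += execId
    executorIdToHost(execId) = host
    executorsByHost.getOrElseUpdate(host, HashSet[String]()) += execId
  }

  // Same order of steps as removeExecutor above, minus rootPool.executorLost.
  def removeExecutor(execId: String): Unit = {
    activeExecutorIds -= execId
    val host = executorIdToHost(execId)
    val execs = executorsByHost.getOrElse(host, HashSet[String]())
    execs -= execId
    if (execs.isEmpty) executorsByHost -= host
    executorIdToHost -= execId
  }
}
```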
TaskSetManager.executorLost
This function handles tasks that were lost because an executor went away. It puts that executor's tasks back into the pending task lists.
override def executorLost(execId: String, host: String) {
  logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)
  // Re-enqueue pending tasks for this host based on the status of the cluster -- for example, a
  // task that used to have locations on only this host might now go to the no-prefs list. Note
  // that it's okay if we add a task to the same queue twice (if it had multiple preferred
  // locations), because findTaskFromList will skip already-running tasks.
Rebuild this TaskSet's pending queues, because the executor has been removed.
  for (index <- getPendingTasksForExecutor(execId)) {
    addPendingTask(index, readding = true)
  }
  for (index <- getPendingTasksForHost(host)) {
    addPendingTask(index, readding = true)
  }
  // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage
If this is a shuffle map stage, any map output written on the lost executor is gone, so successful tasks that ran there must run again:
  if (tasks(0).isInstanceOf[ShuffleMapTask]) {
    for ((tid, info) <- taskInfos if info.executorId == execId) {
      val index = taskInfos(tid).index
      if (successful(index)) {
        successful(index) = false
        copiesRunning(index) -= 1
        tasksSuccessful -= 1
        addPendingTask(index)
        // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
        // stage finishes when a total of tasks.size tasks finish.
Send the DAGScheduler a CompletionEvent whose reason is Resubmitted.
        sched.dagScheduler.taskEnded(tasks(index), Resubmitted, null, null, info, null)
      }
    }
  }
If a task is still running on the lost executor,
  // Also re-enqueue any tasks that were running on the node
  for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
mark it as failed, remove it from the running set and put it back in the pending queue.
    handleFailedTask(tid, TaskState.FAILED, None)
  }
}
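In the shuffle-map branch of executorLost, a task that had succeeded on the lost executor is flipped back to "not successful" and re-queued. The sketch below captures only that part of the logic. It assumes a plain Map of task infos and an ArrayBuffer as the pending queue. `ResubmitSketch`, `Info` and `requeueLostOutputs` are hypothetical names, and the DAGScheduler notification is left out.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical model of the shuffle-map branch of executorLost: not Spark code.
object ResubmitSketch {
  final case class Info(index: Int, executorId: String)

  // Returns how many tasks were resubmitted.
  def requeueLostOutputs(
      taskInfos: Map[Long, Info],
      successful: Array[Boolean],
      pending: ArrayBuffer[Int],
      lostExecId: String): Int = {
    var resubmitted = 0
    for ((_, info) <- taskInfos if info.executorId == lostExecId) {
      if (successful(info.index)) {
        successful(info.index) = false
        pending += info.index
        resubmitted += 1 // the real code also reports Resubmitted to the DAGScheduler
      }
    }
    resubmitted
  }
}
```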
The DAGScheduler handles the CompletionEvent:
...........................
case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
  listenerBus.post(SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics))
  handleTaskCompletion(completion)
.........................
case Resubmitted =>
  logInfo("Resubmitted " + task + ", so marking it as still running")
  pendingTasks(stage) += task
The FAILED, KILLED and LOST states (TaskState.FAILED, TaskState.KILLED, TaskState.LOST)
.........................
} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
Remove the task from the running set.
  taskSet.removeRunningTask(tid)
This call extracts the failure reason and passes it to TaskSchedulerImpl.handleFailedTask.
  taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
}
TaskSchedulerImpl.handleFailedTask
def handleFailedTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    taskState: TaskState,
    reason: Option[TaskEndReason]) = synchronized {
  taskSetManager.handleFailedTask(tid, taskState, reason)
If the task was not KILLED, trigger task allocation and launch again.
  if (taskState != TaskState.KILLED) {
    // Need to revive offers again now that the task set manager state has been updated to
    // reflect failed tasks that need to be re-run.
    backend.reviveOffers()
  }
}
How TaskSetManager.handleFailedTask works
TaskSetManager.handleFailedTask processes the failure information reported for a task.
def handleFailedTask(tid: Long, state: TaskState, reason: Option[TaskEndReason]) {
  val info = taskInfos(tid)
  if (info.failed) {
    return
  }
  removeRunningTask(tid)
  val index = info.index
  info.markFailed()
  var failureReason = "unknown"
  if (!successful(index)) {
    logWarning("Lost TID %s (task %s:%d)".format(tid, taskSet.id, index))
    copiesRunning(index) -= 1
If this call came from TaskSetManager.executorLost, reason is None and none of the cases below run.
Otherwise the task itself failed, meaning the status update carried a state other than LOST.
    // Check if the problem is a map output fetch failure. In that case, this
    // task will never succeed on any node, so tell the scheduler about it.
    reason.foreach {
      case fetchFailed: FetchFailed =>
Fetch failure: report it to the DAGScheduler, tell the scheduler that this TaskSet is finished, remove all of its running tasks, and return without running the rest.
        logWarning("Loss was due to fetch failure from " + fetchFailed.bmAddress)
        sched.dagScheduler.taskEnded(tasks(index), fetchFailed, null, null, info, null)
        successful(index) = true
        tasksSuccessful += 1
        sched.taskSetFinished(this)
        removeAllRunningTasks()
        return
      case TaskKilled =>
The task was killed: report it and return without running the rest.
        logWarning("Task %d was killed.".format(tid))
        sched.dagScheduler.taskEnded(tasks(index), reason.get, null, null, info, null)
        return
      case ef: ExceptionFailure =>
        sched.dagScheduler.taskEnded(
          tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
        if (ef.className == classOf[NotSerializableException].getName()) {
          // If the task result wasn't serializable, there's no point in trying to re-execute it.
          logError("Task %s:%s had a not serializable result: %s; not retrying".format(
            taskSet.id, index, ef.description))
          abort("Task %s:%s had a not serializable result: %s".format(
            taskSet.id, index, ef.description))
          return
        }
        val key = ef.description
        failureReason = "Exception failure: %s".format(ef.description)
        val now = clock.getTime()
        val (printFull, dupCount) = {
          if (recentExceptions.contains(key)) {
            val (dupCount, printTime) = recentExceptions(key)
            if (now - printTime > EXCEPTION_PRINT_INTERVAL) {
              recentExceptions(key) = (0, now)
              (true, 0)
            } else {
              recentExceptions(key) = (dupCount + 1, printTime)
              (false, dupCount + 1)
            }
          } else {
            recentExceptions(key) = (0, now)
            (true, 0)
          }
        }
        if (printFull) {
          val locs = ef.stackTrace.map(loc => " at %s".format(loc.toString))
          logWarning("Loss was due to %s %s %s".format(
            ef.className, ef.description, locs.mkString(" ")))
        } else {
          logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount))
        }
      case TaskResultLost =>
        failureReason = "Lost result for TID %s on host %s".format(tid, info.host)
        logWarning(failureReason)
        sched.dagScheduler.taskEnded(tasks(index), TaskResultLost, null, null, info, null)
      case _ => {}
    }
Put the task back in the pending queue. Unless it was KILLED, increment its failure count and abort once the count reaches maxTaskFailures.
    // On non-fetch failures, re-enqueue the task as pending for a max number of retries
    addPendingTask(index)
    if (state != TaskState.KILLED) {
      numFailures(index) += 1
      if (numFailures(index) >= maxTaskFailures) {
        logError("Task %s:%d failed %d times; aborting job".format(
          taskSet.id, index, maxTaskFailures))
        abort("Task %s:%d failed %d times (most recent failure: %s)".format(
          taskSet.id, index, maxTaskFailures, failureReason))
      }
    }
  } else {
    logInfo("Ignoring task-lost event for TID " + tid +
      " because task " + index + " is already finished")
  }
}
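The recentExceptions logic above throttles log output. It prints a full stack trace the first time a given exception description appears, and again only after EXCEPTION_PRINT_INTERVAL has elapsed. In between, it only counts duplicates. The sketch below re-creates that decision as a standalone class that takes the interval as a constructor parameter. `ExceptionThrottleSketch` is a hypothetical name, and the timestamps in the usage are made up.

```scala
import scala.collection.mutable

// Hypothetical re-creation of the recentExceptions throttle: not Spark code.
class ExceptionThrottleSketch(intervalMs: Long) {
  // key -> (duplicates since last full print, time of last full print)
  private val recentExceptions = mutable.HashMap[String, (Int, Long)]()

  // Returns (printFull, dupCount), like the tuple computed in handleFailedTask.
  def record(key: String, now: Long): (Boolean, Int) = {
    recentExceptions.get(key) match {
      case Some((dupCount, printTime)) =>
        if (now - printTime > intervalMs) {
          recentExceptions(key) = (0, now)
          (true, 0)
        } else {
          recentExceptions(key) = (dupCount + 1, printTime)
          (false, dupCount + 1)
        }
      case None =>
        recentExceptions(key) = (0, now)
        (true, 0)
    }
  }
}
```

For example, with a 10-second interval, calls at 0 ms and 500 ms yield `(true, 0)` and then `(false, 1)`. A call at 20000 ms prints the full trace again.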
How the DAGScheduler handles taskEnded:
def taskEnded(
    task: Task[_],
    reason: TaskEndReason,
    result: Any,
    accumUpdates: Map[Long, Any],
    taskInfo: TaskInfo,
    taskMetrics: TaskMetrics) {
  eventProcessActor ! CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics)
}
Handling the CompletionEvent:
case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
  listenerBus.post(SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics))
  handleTaskCompletion(completion)
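taskEnded does not process the event itself. It sends a CompletionEvent to eventProcessActor, which handles events one at a time. The sketch below imitates that hand-off with a LinkedBlockingQueue and a single consumer thread instead of an Akka actor. `EventLoopSketch`, `EventLoop` and the `Stop` message are hypothetical names.

```scala
import java.util.concurrent.{CopyOnWriteArrayList, LinkedBlockingQueue}

// Hypothetical stand-in for eventProcessActor: not Spark code.
object EventLoopSketch {
  sealed trait Event
  final case class CompletionEvent(taskId: Long, reason: String) extends Event
  case object Stop extends Event
}

class EventLoop {
  import EventLoopSketch._

  private val queue = new LinkedBlockingQueue[Event]()
  val handled = new CopyOnWriteArrayList[String]()

  // A single consumer thread, so handlers never run concurrently with each other.
  private val consumer = new Thread(new Runnable {
    override def run(): Unit = {
      var running = true
      while (running) {
        queue.take() match {
          case CompletionEvent(tid, reason) => handled.add(s"$tid:$reason")
          case Stop => running = false
        }
      }
    }
  })

  def start(): Unit = consumer.start()

  // Analogue of taskEnded: fire-and-forget, returns as soon as the event is queued.
  def taskEnded(taskId: Long, reason: String): Unit = queue.put(CompletionEvent(taskId, reason))

  def stopAndWait(): Unit = { queue.put(Stop); consumer.join() }
}
```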
DAGScheduler.handleTaskCompletion
The fetch-failure case:
case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
  // Mark the stage that the reducer was in as unrunnable
  val failedStage = stageIdToStage(task.stageId)
  running -= failedStage
  failed += failedStage
  ..............................
  // Mark the map whose fetch failed as broken in the map stage
  val mapStage = shuffleToMapStage(shuffleId)
  if (mapId != -1) {
    mapStage.removeOutputLoc(mapId, bmAddress)
    mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
  }
  ...........................
  failed += mapStage
  // Remember that a fetch failed now; this is used to resubmit the broken
  // stages later, after a small wait (to give other tasks the chance to fail)
  lastFetchFailureTime = System.currentTimeMillis() // TODO: Use pluggable clock
  // TODO: mark the executor as failed only if there were lots of fetch failures on it
  if (bmAddress != null) {
Remove every map output location registered for this executor from the stages' partitions.
    handleExecutorLost(bmAddress.executorId, Some(task.epoch))
  }
case ExceptionFailure(className, description, stackTrace, metrics) =>
  // Do nothing here, left up to the TaskScheduler to decide how to handle user failures
case TaskResultLost =>
  // Do nothing here; the TaskScheduler handles these failures and resubmits the task.
TaskState.FINISHED
This state means the task completed normally.
if (state == TaskState.FINISHED) {
Remove the task from the TaskSet's running set.
  taskSet.removeRunningTask(tid)
Fetch the task's result.
  taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
TaskResultGetter.enqueueSuccessfulTask
def enqueueSuccessfulTask(
    taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
  getTaskResultExecutor.execute(new Runnable {
    override def run() {
      try {
The result data has to be deserialized first.
        val result = serializer.get().deserialize[TaskResult[_]](serializedData) match {
If the result was small enough to fit in an Akka message, it is the DirectTaskResult itself and is returned as is.
          case directResult: DirectTaskResult[_] => directResult
          case IndirectTaskResult(blockId) =>
Otherwise the result was too large and was stored in the BlockManager, so fetch it from there.
            logDebug("Fetching indirect task result for TID %s".format(tid))
Report a GettingResultEvent to the DAGScheduler (see TaskSchedulerImpl.handleTaskGettingResult below).
            scheduler.handleTaskGettingResult(taskSetManager, tid)
Fetch the serialized task result.
            val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
If the task finished but its result could not be fetched, see the TaskResultLost case in the failure handling above.
            if (!serializedTaskResult.isDefined) {
              /* We won't be able to get the task result if the machine that ran the task failed
               * between when the task ended and when we tried to fetch the result, or if the
               * block manager had to flush the result. */
              scheduler.handleFailedTask(
                taskSetManager, tid, TaskState.FINISHED, Some(TaskResultLost))
              return
            }
Deserialize the task result.
            val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
              serializedTaskResult.get)
With the result in hand, remove the corresponding block.
            sparkEnv.blockManager.master.removeBlock(blockId)
            deserializedResult
        }
        result.metrics.resultSize = serializedData.limit()
See TaskSchedulerImpl.handleSuccessfulTask below.
        scheduler.handleSuccessfulTask(taskSetManager, tid, result)
      } catch {
        case cnf: ClassNotFoundException =>
          val loader = Thread.currentThread.getContextClassLoader
          taskSetManager.abort("ClassNotFound with classloader: " + loader)
        case ex: Throwable =>
          taskSetManager.abort("Exception while deserializing and fetching task: %s".format(ex))
      }
    }
  })
}
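On the driver side, enqueueSuccessfulTask resolves a result message to bytes in one of three ways. A direct result is used as is. An indirect result is fetched from the block store and the block is then deleted. A missing block becomes a lost result. The sketch below models those three outcomes with a mutable map as the block store. `ResultFetchSketch`, `Succeeded` and `ResultLost` are hypothetical names; `ResultLost` only plays the role of TaskResultLost.

```scala
import scala.collection.mutable

// Hypothetical driver-side resolution of task results: not Spark code.
object ResultFetchSketch {
  sealed trait TaskResultMsg
  final case class Direct(bytes: Vector[Byte]) extends TaskResultMsg
  final case class Indirect(blockId: String) extends TaskResultMsg

  sealed trait Outcome
  final case class Succeeded(bytes: Vector[Byte]) extends Outcome
  case object ResultLost extends Outcome

  def resolve(msg: TaskResultMsg, blockStore: mutable.Map[String, Vector[Byte]]): Outcome =
    msg match {
      case Direct(bytes) => Succeeded(bytes)
      case Indirect(blockId) =>
        blockStore.get(blockId) match {
          case None => ResultLost // e.g. the executor died before the fetch
          case Some(bytes) =>
            blockStore -= blockId // like blockManager.master.removeBlock(blockId)
            Succeeded(bytes)
        }
    }
}
```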
TaskSchedulerImpl.handleTaskGettingResult
def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long) {
  taskSetManager.handleTaskGettingResult(tid)
}
In TaskSetManager:
def handleTaskGettingResult(tid: Long) = {
  val info = taskInfos(tid)
  info.markGettingResult()
  sched.dagScheduler.taskGettingResult(tasks(info.index), info)
}
The DAGScheduler then sends a GettingResultEvent.
def taskGettingResult(task: Task[_], taskInfo: TaskInfo) {
  eventProcessActor ! GettingResultEvent(task, taskInfo)
}
Handling of GettingResultEvent: it does no real work beyond posting a listener event.
case GettingResultEvent(task, taskInfo) =>
  listenerBus.post(SparkListenerTaskGettingResult(task, taskInfo))
TaskSchedulerImpl.handleSuccessfulTask
def handleSuccessfulTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    taskResult: DirectTaskResult[_]) = synchronized {
  taskSetManager.handleSuccessfulTask(tid, taskResult)
}
In TaskSetManager:
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]) = {
  val info = taskInfos(tid)
  val index = info.index
  info.markSuccessful()
Remove the task from the running set.
  removeRunningTask(tid)
  if (!successful(index)) {
    logInfo("Finished TID %s in %d ms on %s (progress: %d/%d)".format(
      tid, info.duration, info.host, tasksSuccessful, numTasks))
Send a Success completion to the DAGScheduler.
    sched.dagScheduler.taskEnded(
      tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)
Increment the count of successfully completed tasks and mark this task's index as successful.
    // Mark successful and stop if all the tasks have succeeded.
    tasksSuccessful += 1
    successful(index) = true
Once the number of completed tasks reaches the total, this TaskSet (and therefore the stage it was submitted for) is finished.
    if (tasksSuccessful == numTasks) {
      sched.taskSetFinished(this)
    }
  } else {
    logInfo("Ignorning task-finished event for TID " + tid + " because task " +
      index + " has already completed successfully")
  }
}
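In handleSuccessfulTask, the per-index `successful` flag makes a second completion for the same index (for example from a speculative copy) a no-op. The TaskSet is finished exactly when the counter reaches the task count. The sketch below models just that bookkeeping. `TaskSetProgressSketch` and its `finished` flag are hypothetical; the real code calls `sched.taskSetFinished(this)` instead of setting a flag.

```scala
// Hypothetical model of TaskSetManager's success bookkeeping: not Spark code.
class TaskSetProgressSketch(numTasks: Int) {
  private val successful = new Array[Boolean](numTasks)
  private var tasksSuccessful = 0
  var finished = false // set where the real code calls sched.taskSetFinished(this)

  // Returns true if this completion was counted, false if it was ignored as a duplicate.
  def handleSuccessfulTask(index: Int): Boolean = {
    if (!successful(index)) {
      tasksSuccessful += 1
      successful(index) = true
      if (tasksSuccessful == numTasks) finished = true
      true
    } else {
      false
    }
  }
}
```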
How the DAGScheduler handles a CompletionEvent whose reason is Success:
case Success =>
  logInfo("Completed " + task)
  if (event.accumUpdates != null) {
    Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted
  }
Remove the task from the stage's pending tasks.
  pendingTasks(stage) -= task
  stageToInfos(stage).taskInfos += event.taskInfo -> event.taskMetrics
Handle the two task types separately.
  task match {
A ResultTask, which needs no shuffle:
    case rt: ResultTask[_, _] =>
      resultStageToJob.get(stage) match {
        case Some(job) =>
If the stage's ActiveJob has not yet marked this task's output partition as finished,
          if (!job.finished(rt.outputId)) {
mark it as finished,
            job.finished(rt.outputId) = true
then increment the job's finished-task count and check whether all tasks are done. If they are, remove the job and its stage from the bookkeeping structures.
            job.numFinished += 1
            // If the whole job has finished, remove it
            if (job.numFinished == job.numPartitions) {
              idToActiveJob -= stage.jobId
              activeJobs -= job
              resultStageToJob -= stage
              markStageAsFinished(stage)
              jobIdToStageIdsRemove(job.jobId)
              listenerBus.post(SparkListenerJobEnd(job, JobSucceeded))
            }
Call taskSucceeded on the ActiveJob's listener (a JobWaiter). This marks the task complete and passes its result on for output handling.
            job.listener.taskSucceeded(rt.outputId, event.result)
          }
        case None =>
          logInfo("Ignoring result from " + rt + " because its job has finished")
      }
Completion handling for a ShuffleMapTask:
    case smt: ShuffleMapTask =>
      val status = event.result.asInstanceOf[MapStatus]
      val execId = status.location.executorId
      logDebug("ShuffleMapTask finished on " + execId)
      if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
        logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
      } else {
Record the shuffle result (a MapStatus) in the stage's outputLocs. Each addition increments numAvailableOutputs.
When numAvailableOutputs equals numPartitions, the map side of the shuffle is complete.
        stage.addOutputLoc(smt.partitionId, status)
      }
If this stage is still running and all of its pendingTasks have been processed,
      if (running.contains(stage) && pendingTasks(stage).isEmpty) {
mark the stage as finished.
        markStageAsFinished(stage)
        .......................................
The shuffle stage is now complete, so register the shuffle ID and the stage's outputLocs with mapOutputTracker.
This records, for each shuffle map task, the executor and host it ran on and the size of its output.
Every task's shuffle writer carries the shuffleId, so once registration succeeds,
the next stage reads its input based on this shuffleId's entry in mapOutputTracker.
          mapOutputTracker.registerMapOutputs(
            stage.shuffleDep.get.shuffleId,
            stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
            changeEpoch = true)
        }
        clearCacheLocs()
Each partition's outputLoc defaults to Nil. If any partition is still Nil, some task did not succeed,
so the stage is resubmitted and only the tasks without output run again.
        if (stage.outputLocs.exists(_ == Nil)) {
          .........................................
          submitStage(stage)
        } else {
          val newlyRunnable = new ArrayBuffer[Stage]
          for (stage <- waiting) {
            logInfo("Missing parents for " + stage + ": " + getMissingParentStages(stage))
          }
Check every waiting stage and collect those whose parent (shuffle) stages are all available,
i.e. those for which getMissingParentStages returns Nil.
          for (stage <- waiting if getMissingParentStages(stage) == Nil) {
            newlyRunnable += stage
          }
Move these stages out of waiting
          waiting --= newlyRunnable
and into running.
          running ++= newlyRunnable
          for {
            stage <- newlyRunnable.sortBy(_.id)
            jobId <- activeJobForStage(stage)
          } {
Submit each newly runnable stage's tasks for allocation and execution.
            logInfo("Submitting " + stage + " (" + stage.rdd + "), which is now runnable")
            submitMissingTasks(stage, jobId)
          }
        }
      }
  }
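The "newly runnable" step splits the waiting stages into two groups: those whose parent stages all have output available, and those still missing a parent. The sketch below reduces that split to plain data. It assumes each stage lists its parent stage IDs directly and that a set of stage IDs with complete output is given. The real getMissingParentStages walks the RDD dependency graph instead. `NewlyRunnableSketch` and its `Stage` class are hypothetical.

```scala
// Hypothetical reduction of the newly-runnable step: not Spark code.
object NewlyRunnableSketch {
  final case class Stage(id: Int, parents: List[Int])

  def getMissingParentStages(stage: Stage, available: Set[Int]): List[Int] =
    stage.parents.filterNot(available.contains)

  // Returns (still waiting, newly runnable sorted by id), mirroring
  // `waiting --= newlyRunnable` and `newlyRunnable.sortBy(_.id)` above.
  def split(waiting: Set[Stage], available: Set[Int]): (Set[Stage], List[Stage]) = {
    val newlyRunnable = waiting.filter(s => getMissingParentStages(s, available) == Nil)
    (waiting -- newlyRunnable, newlyRunnable.toList.sortBy(_.id))
  }
}
```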
JobWaiter.taskSucceeded
This function is called when a task completes.
override def taskSucceeded(index: Int, result: Any): Unit = synchronized {
  if (_jobFinished) {
    throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
  }
Process the result with resultHandler, the function passed in when the JobWaiter was created.
  resultHandler(index, result.asInstanceOf[T])
Increment the count of finished tasks.
  finishedTasks += 1
  if (finishedTasks == totalTasks) {
When the number of finished tasks equals the total, set _jobFinished to true and jobResult to JobSucceeded.
notifyAll then wakes the thread that is waiting for the job to complete.
    _jobFinished = true
    jobResult = JobSucceeded
    this.notifyAll()
  }
}
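taskSucceeded pairs with a waiting thread through the object's monitor. Each result goes to resultHandler, and after the last one the waiter flips a flag and calls notifyAll. The sketch below shows both sides of that hand-off. Its `awaitResult` loops on `wait()` to guard against spurious wakeups. `JobWaiterSketch` and `awaitResult` are hypothetical names for this illustration.

```scala
// Hypothetical minimal JobWaiter built on the object monitor: not Spark code.
class JobWaiterSketch[T](totalTasks: Int, resultHandler: (Int, T) => Unit) {
  private var finishedTasks = 0
  private var jobFinished = totalTasks == 0

  def taskSucceeded(index: Int, result: Any): Unit = synchronized {
    if (jobFinished) {
      throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
    }
    resultHandler(index, result.asInstanceOf[T])
    finishedTasks += 1
    if (finishedTasks == totalTasks) {
      jobFinished = true
      this.notifyAll() // wake every thread blocked in awaitResult
    }
  }

  // Blocks the caller until every task has reported success.
  def awaitResult(): Unit = synchronized {
    while (!jobFinished) {
      this.wait()
    }
  }
}
```

In the usage below, a worker thread reports three results while the main thread blocks in `awaitResult`.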
Task.runTask implementations
There are two kinds of Task:
ShuffleMapTask, which performs a shuffle, and
ResultTask, which does not.
ResultTask.runTask
override def runTask(context: TaskContext): U = {
  metrics = Some(context.taskMetrics)
  try {
func is the function supplied when the task was created, that is, when DAGScheduler.runJob was called.
An example is the writeToFile function defined in PairRDDFunctions.saveAsHadoopDataset.
rdd.iterator calls the compute function of the concrete RDD implementation,
and compute in turn applies the function supplied in the application code, such as the one passed to map.
    func(context, rdd.iterator(split, context))
  } finally {
    context.executeOnCompleteCallbacks()
  }
}
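ResultTask.runTask applies the job's function to the partition iterator and always runs the completion callbacks in a finally block, even if the function throws. The sketch below shows that shape with a minimal context that stores callbacks. `ResultTaskSketch`, its `Context` class and `addOnCompleteCallback` are hypothetical stand-ins. This sketch runs callbacks in registration order, which may differ from Spark's.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical miniature of ResultTask.runTask: not Spark code.
object ResultTaskSketch {
  class Context {
    private val callbacks = ArrayBuffer.empty[() => Unit]
    def addOnCompleteCallback(f: () => Unit): Unit = callbacks += f
    def executeOnCompleteCallbacks(): Unit = callbacks.foreach(cb => cb())
  }

  def runTask[T, U](context: Context, partition: Iterator[T], func: (Context, Iterator[T]) => U): U = {
    try {
      func(context, partition)
    } finally {
      context.executeOnCompleteCallbacks() // runs on success and on failure
    }
  }
}
```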
ShuffleMapTask.runTask
override def runTask(context: TaskContext): MapStatus = {
  val numOutputSplits = dep.partitioner.numPartitions
  metrics = Some(context.taskMetrics)
  val blockManager = SparkEnv.get.blockManager
  val shuffleBlockManager = blockManager.shuffleBlockManager
  var shuffle: ShuffleWriterGroup = null
  var success = false
  try {
Use the shuffleId to obtain a group of shuffle writers for this map task.
    // Obtain all the block writers for shuffle blocks.
    val ser = SparkEnv.get.serializerManager.get(dep.serializerClass, SparkEnv.get.conf)
    shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)
Iterate over rdd.iterator, which yields pairs (Product2), and write each pair to the bucket file selected by its key.
Once all shuffle map tasks of the stage finish, the stage's outputs are registered with mapOutputTracker,
where the next stage finds its input before running its own operations. Each finished shuffle map task produces a MapStatus,
which records the executor and host the task ran on and the size of each of its output buckets.
Shuffle reading is covered in the later shuffle analysis.
    // Write the map output to its associated buckets.
    for (elem <- rdd.iterator(split, context)) {
      val pair = elem.asInstanceOf[Product2[Any, Any]]
      val bucketId = dep.partitioner.getPartition(pair._1)
      shuffle.writers(bucketId).write(pair)
    }
    // Commit the writes. Get the size of each bucket block (total block size).
    var totalBytes = 0L
    var totalTime = 0L
    val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter =>
      writer.commit()
      writer.close()
      val size = writer.fileSegment().length
      totalBytes += size
      totalTime += writer.timeWriting()
      MapOutputTracker.compressSize(size)
    }
    // Update shuffle metrics.
    val shuffleMetrics = new ShuffleWriteMetrics
    shuffleMetrics.shuffleBytesWritten = totalBytes
    shuffleMetrics.shuffleWriteTime = totalTime
    metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)
    success = true
    new MapStatus(blockManager.blockManagerId, compressedSizes)
  } catch { case e: Exception =>
    // If there is an exception from running the task, revert the partial writes
    // and throw the exception upstream to Spark.
    if (shuffle != null && shuffle.writers != null) {
      for (writer <- shuffle.writers) {
        writer.revertPartialWrites()
        writer.close()
      }
    }
    throw e
  } finally {
    // Release the writers back to the shuffle block manager.
    if (shuffle != null && shuffle.writers != null) {
      shuffle.releaseWriters(success)
    }
    // Execute the callbacks on task completion.
    context.executeOnCompleteCallbacks()
  }
}
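The ShuffleMapTask write loop sends each pair to the bucket returned by `getPartition(key)`, then collects a size per bucket to build the MapStatus. The sketch below is an in-memory version of that loop. It assumes hash partitioning with a non-negative modulo, similar in spirit to Spark's HashPartitioner, and it uses ArrayBuffers instead of files. The per-bucket "size" here is simply a record count. `ShuffleWriteSketch` and its methods are hypothetical.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical in-memory version of the ShuffleMapTask write loop: not Spark code.
object ShuffleWriteSketch {
  // Hash partitioning with a non-negative modulo (hashCode can be negative).
  def getPartition(key: Any, numPartitions: Int): Int = {
    val raw = if (key == null) 0 else key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // One bucket per output split; each pair goes to the bucket chosen by its key.
  def writeBuckets[K, V](records: Iterator[Product2[K, V]], numOutputSplits: Int): Array[ArrayBuffer[Product2[K, V]]] = {
    val buckets = Array.fill(numOutputSplits)(ArrayBuffer.empty[Product2[K, V]])
    for (pair <- records) {
      buckets(getPartition(pair._1, numOutputSplits)) += pair
    }
    buckets
  }

  // Stand-in for writer.fileSegment().length: here just the record count per bucket.
  def bucketSizes[K, V](buckets: Array[ArrayBuffer[Product2[K, V]]]): Array[Long] =
    buckets.map(_.size.toLong)
}
```

With two output splits, the integer keys 0 and 2 land in bucket 0. Keys 1 and -3 land in bucket 1, because -3 % 2 is -1, which becomes 1 after adding numPartitions.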