  • Analysis of the Task Execution Process

A task runs inside the Executor instance that is created when the Worker starts:

case RegisteredExecutor(sparkProperties) =>
  logInfo("Successfully registered with driver")
  // Make this host instead of hostPort ?
  executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties)


The executor instance's launchTask method then starts execution of the task.


def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
  val tr = new TaskRunner(context, taskId, serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}


This creates a TaskRunner, passing in the task and the ExecutorBackend started on the current Worker (in YARN mode this is CoarseGrainedExecutorBackend), and then submits the TaskRunner to the thread pool for execution.
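To make the pattern concrete, here is a minimal, self-contained Scala sketch of this launch mechanism: a thread pool plus a concurrent registry of running tasks. The names are illustrative only and are not Spark's actual classes.

import java.util.concurrent.{ConcurrentHashMap, Executors}

object LaunchTaskSketch {
  private val threadPool = Executors.newCachedThreadPool()
  private val runningTasks = new ConcurrentHashMap[Long, Runnable]()

  def launchTask(taskId: Long, body: () => Unit): Unit = {
    val runner: Runnable = new Runnable {
      // mirror the finally-cleanup done in TaskRunner.run
      override def run(): Unit = try body() finally runningTasks.remove(taskId)
    }
    runningTasks.put(taskId, runner)
    threadPool.execute(runner)
  }

  def main(args: Array[String]): Unit = {
    launchTask(1L, () => println("task 1 done"))
    threadPool.shutdown()
  }
}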


TaskRunner.run, the thread body that actually executes the task:

override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
  val startTime = System.currentTimeMillis()
  (SparkEnv is analyzed later.)
  SparkEnv.set(env)
  Thread.currentThread.setContextClassLoader(replClassLoader)
  val ser = SparkEnv.get.closureSerializer.newInstance()
  logInfo("Running task ID " + taskId)
  Update this task's state through execBackend: set it to RUNNING and send a StatusUpdate event to the driver.
  execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
  var attemptedTask: Option[Task[Any]] = None
  var taskStart: Long = 0
  def gcTime = ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum
  val startGCTime = gcTime


  try {
    SparkEnv.set(env)
    Accumulators.clear()
    Extract the task's resources: the jars and files it depends on, plus the serialized task definition.
    val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
    Download any updated dependencies and add the task's jars to the current thread's class loader.
    updateDependencies(taskFiles, taskJars)
    Deserialize the task definition with the serializer configured in SparkEnv to obtain the Task instance.
    The concrete Task implementation is either ShuffleMapTask or ResultTask.
    task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)

    If killed is already true, do not run the task; control passes to the catch block instead.
    // If this task has been killed before we deserialized it, let's quit now. Otherwise,
    // continue executing the task.
    if (killed) {
      // Throw an exception rather than returning, because returning within a try{} block
      // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
      // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
      // for the task.
      throw TaskKilledException
    }

    attemptedTask = Some(task)
    logDebug("Task " + taskId + "'s epoch is " + task.epoch)
    env.mapOutputTracker.updateEpoch(task.epoch)

    Create a TaskContext and run the task through Task.run, waiting for it to finish.
    // Run the actual task and measure its runtime.
    taskStart = System.currentTimeMillis()
    val value = task.run(taskId.toInt)
    val taskFinish = System.currentTimeMillis()

    Now that the task has finished, check whether it was killed while running; if so, control again passes to the catch block.
    // If the task has been killed, let's fail it.
    if (task.killed) {
      throw TaskKilledException
    }

    Serialize the task's return value.
    val resultSer = SparkEnv.get.serializer.newInstance()
    val beforeSerialization = System.currentTimeMillis()
    val valueBytes = resultSer.serialize(value)
    val afterSerialization = System.currentTimeMillis()
    Record the task metrics.
    for (m <- task.metrics) {
      m.hostname = Utils.localHostName()
      m.executorDeserializeTime = (taskStart - startTime).toInt
      m.executorRunTime = (taskFinish - taskStart).toInt
      m.jvmGCTime = gcTime - startGCTime
      m.resultSerializationTime = (afterSerialization - beforeSerialization).toInt
    }


    val accumUpdates = Accumulators.values
    Wrap the task's return value in a DirectTaskResult and serialize it.
    val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.getOrElse(null))
    val serializedDirectResult = ser.serialize(directResult)
    logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)
    Check whether the serialized result exceeds the Akka frame size. If it does, the result is
    stored through the BlockManager with storage level MEMORY_AND_DISK_SER and only an
    IndirectTaskResult referencing the block is sent back; otherwise the DirectTaskResult is small
    enough to be sent as is. Either way, the result is delivered to the scheduler by the
    StatusUpdate call below.
    val serializedResult = {
      if (serializedDirectResult.limit >= akkaFrameSize - 1024) {
        logInfo("Storing result for " + taskId + " in local BlockManager")
        val blockId = TaskResultBlockId(taskId)
        env.blockManager.putBytes(
          blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
        ser.serialize(new IndirectTaskResult[Any](blockId))
      } else {
        logInfo("Sending result for " + taskId + " directly to driver")
        serializedDirectResult
      }
    }
    Update this task's state through execBackend: set it to FINISHED and send a StatusUpdate event to the driver.
    execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
    logInfo("Finished task ID " + taskId)

  } catch {
    On an exception, report a FAILED (or KILLED) state to the driver.
    case ffe: FetchFailedException => {
      val reason = ffe.toTaskEndReason
      execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
    }

    case TaskKilledException => {
      logInfo("Executor killed task " + taskId)
      execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
    }

    case t: Throwable => {
      val serviceTime = (System.currentTimeMillis() - taskStart).toInt
      val metrics = attemptedTask.flatMap(t => t.metrics)
      for (m <- metrics) {
        m.executorRunTime = serviceTime
        m.jvmGCTime = gcTime - startGCTime
      }
      val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
      execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

      // TODO: Should we exit the whole executor here? On the one hand, the failed task may
      // have left some weird state around depending on when the exception was thrown, but on
      // the other hand, maybe we could detect that when future tasks fail and exit then.
      logError("Exception in task ID " + taskId, t)
      // System.exit(1)
    }

  } finally {
    Remove this thread's entry from shuffleMemoryMap, releasing its shuffle memory.
    // TODO: Unregister shuffle memory only for ResultTask
    val shuffleMemoryMap = env.shuffleMemoryMap
    shuffleMemoryMap.synchronized {
      shuffleMemoryMap.remove(Thread.currentThread().getId)
    }
    Remove this task from runningTasks.
    runningTasks.remove(taskId)
  }
 }
}
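The size check above decides between sending the result inline and parking it in the block store. The following standalone sketch models just that decision; the frame size, block-id scheme and store callback are assumptions for illustration, not Spark's API.

sealed trait SketchResult
case class Direct(bytes: Array[Byte]) extends SketchResult
case class Indirect(blockId: String) extends SketchResult

object ResultRouting {
  val frameSize = 10 * 1024 * 1024   // assumed 10 MB frame limit
  val reserved = 1024                // headroom for message overhead

  def route(taskId: Long, bytes: Array[Byte], store: (String, Array[Byte]) => Unit): SketchResult =
    if (bytes.length >= frameSize - reserved) {
      val blockId = s"taskresult_$taskId"   // hypothetical block-id scheme
      store(blockId, bytes)                 // stand-in for blockManager.putBytes
      Indirect(blockId)                     // only a small reference goes back to the driver
    } else {
      Direct(bytes)                         // small result travels inline
    }
}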


Status updates during task execution

ExecutorBackend.statusUpdate

In YARN mode the implementation class is CoarseGrainedExecutorBackend, which forwards the StatusUpdate event to the driver actor.

override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
  driver ! StatusUpdate(executorId, taskId, state, data)
}


On the driver side the status update is handled by:
Implementation class: CoarseGrainedSchedulerBackend.DriverActor
case StatusUpdate(executorId, taskId, state, data) =>
  TaskSchedulerImpl.statusUpdate processes the state change.
  scheduler.statusUpdate(taskId, state, data.value)
  If the task has reached a terminal state (FINISHED, FAILED, KILLED or LOST):
  if (TaskState.isFinished(state)) {
    if (executorActor.contains(executorId)) {
      Each task occupies one CPU core; now that this task is done, add one core back to the executor's free-core count.
      freeCores(executorId) += 1
      Then continue launching the remaining tasks on this executor; see the analysis of the scheduling process for details.
      makeOffers(executorId)
    } else {
      // Ignoring the update since we don't know about the executor.
      val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s"
      logWarning(msg.format(taskId, state, sender, executorId))
    }
  }
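As a rough illustration (assumed data structures, not CoarseGrainedSchedulerBackend itself), the core bookkeeping the driver performs on a terminal status update looks like this: give the CPU core back to the executor and try to schedule more work on it.

import scala.collection.mutable

object DriverCoreBookkeeping {
  val freeCores = mutable.Map[String, Int]()          // executorId -> free cores

  def onTerminalStatusUpdate(executorId: String, makeOffers: String => Unit): Unit = {
    if (freeCores.contains(executorId)) {
      freeCores(executorId) += 1                      // the finished task releases one core
      makeOffers(executorId)                          // offer the freed core to pending tasks
    } else {
      println(s"Ignored status update from unknown executor $executorId")
    }
  }
}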


Processing flow of TaskSchedulerImpl.statusUpdate


def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
  var failedExecutor: Option[String] = None
  synchronized {
    try {
      If the incoming state is TaskState.LOST and the task still has an entry in the task-to-executor map:
      if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
        Look up the executor ID of the worker this task ran on.
        // We lost this entire executor, so remember that it's gone
        val execId = taskIdToExecutorId(tid)
        If this executor is an active executor, remove it from the scheduler; this eventually runs
        TaskSetManager.executorLost on every TaskSetManager. Remember it as failedExecutor for use
        at the end of the function.
        if (activeExecutorIds.contains(execId)) {
          removeExecutor(execId)
          failedExecutor = Some(execId)
        }
      }
      taskIdToTaskSetId.get(tid) match {
        case Some(taskSetId) =>
          If the task is in a terminal (non-RUNNING) state, remove its entries from the bookkeeping maps.
          if (TaskState.isFinished(state)) {
            taskIdToTaskSetId.remove(tid)
            if (taskSetTaskIds.contains(taskSetId)) {
              taskSetTaskIds(taskSetId) -= tid
            }
            taskIdToExecutorId.remove(tid)
          }
          activeTaskSets.get(taskSetId).foreach { taskSet =>
            If the task finished successfully, remove it from the TaskSet's running set and fetch its result through TaskResultGetter.
            if (state == TaskState.FINISHED) {
              taskSet.removeRunningTask(tid)
              taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
            } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
              Handling for a failed task:
              taskSet.removeRunningTask(tid)
              taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
            }
          }
        case None =>
          logInfo("Ignoring update with state %s from TID %s because its task set is gone"
            .format(state, tid))
      }
    } catch {
      case e: Exception => logError("Exception in statusUpdate", e)
    }
  }
  If an executor failed, let the DAGScheduler handle the lost executor.
  // Update the DAGScheduler without holding a lock on this, since that can deadlock
  if (failedExecutor != None) {
    dagScheduler.executorLost(failedExecutor.get)
    Then trigger a new round of resource offers so tasks can be assigned and run.
    backend.reviveOffers()
  }
}
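For reference, the terminal-state test used throughout this flow can be modeled with a small enumeration (an assumed sketch, not Spark's TaskState object): a task is "finished" once it reaches any of the four terminal states.

object TaskStateSketch extends Enumeration {
  val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value

  // terminal states: the task will never report progress again
  def isFinished(state: Value): Boolean =
    Set(FINISHED, FAILED, KILLED, LOST).contains(state)
}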


The TaskState.LOST case, where the executor is still in activeExecutorIds (that is, an executor that has actually been running tasks):

private def removeExecutor(executorId: String) {
  Remove this executor from activeExecutorIds.
  activeExecutorIds -= executorId
  Look up the host of the worker this executor ran on.
  val host = executorIdToHost(executorId)
  Take the set of all executors on that host and remove the current executor from it.
  val execs = executorsByHost.getOrElse(host, new HashSet)
  execs -= executorId
  if (execs.isEmpty) {
    executorsByHost -= host
  }
  Remove this executor from the executor-to-host map.
  executorIdToHost -= executorId
  This call mainly ends up running TaskSetManager.executorLost.
  rootPool.executorLost(executorId, host)
}
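A compact sketch of this host bookkeeping (assumed maps, not the scheduler's actual fields): drop the executor from its host's set, and drop the host entirely once its last executor is gone.

import scala.collection.mutable

object ExecutorRemovalSketch {
  val executorIdToHost = mutable.Map[String, String]()
  val executorsByHost = mutable.Map[String, mutable.HashSet[String]]()

  def removeExecutor(executorId: String): Unit = {
    for (host <- executorIdToHost.get(executorId)) {
      val execs = executorsByHost.getOrElse(host, mutable.HashSet[String]())
      execs -= executorId
      if (execs.isEmpty) executorsByHost -= host   // no executors left on this host
      executorIdToHost -= executorId
    }
  }
}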


TaskSetManager.executorLost: this function handles tasks lost because their executor was lost, re-adding those tasks to the pending-task lists.

override def executorLost(execId: String, host: String) {
  logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)

  // Re-enqueue pending tasks for this host based on the status of the cluster -- for example, a
  // task that used to have locations on only this host might now go to the no-prefs list. Note
  // that it's okay if we add a task to the same queue twice (if it had multiple preferred
  // locations), because findTaskFromList will skip already-running tasks.
  Rebuild this TaskSet's pending queues: the executor instance has been removed, so the location
  preferences have to be regenerated.
  for (index <- getPendingTasksForExecutor(execId)) {
    addPendingTask(index, readding = true)
  }
  for (index <- getPendingTasksForHost(host)) {
    addPendingTask(index, readding = true)
  }

  // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage
  If this stage's tasks are ShuffleMapTasks (a shuffle RDD):
  if (tasks(0).isInstanceOf[ShuffleMapTask]) {
    for ((tid, info) <- taskInfos if info.executorId == execId) {
      val index = taskInfos(tid).index
      if (successful(index)) {
        successful(index) = false
        copiesRunning(index) -= 1
        tasksSuccessful -= 1
        addPendingTask(index)
        // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
        // stage finishes when a total of tasks.size tasks finish.
        Send a CompletionEvent to the DAGScheduler with reason Resubmitted.
        sched.dagScheduler.taskEnded(tasks(index), Resubmitted, null, null, info, null)
      }
    }
  }
  For any task that is still in the running state and was running on the lost executor:
  // Also re-enqueue any tasks that were running on the node
  for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
    mark the task as failed, remove it from the running set, and re-add it to the pending queues.
    handleFailedTask(tid, TaskState.FAILED, None)
  }
}
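A simplified sketch of this re-enqueueing step (assumed field names, not TaskSetManager's real ones): any task still running on the lost executor is marked failed and put back on the pending queue for rescheduling.

import scala.collection.mutable

case class SketchTaskInfo(index: Int, executorId: String, var running: Boolean)

class ExecutorLostSketch {
  val taskInfos = mutable.Map[Long, SketchTaskInfo]()
  val pendingTasks = mutable.Queue[Int]()

  def addPendingTask(index: Int): Unit = pendingTasks.enqueue(index)

  def executorLost(execId: String): Unit = {
    for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
      info.running = false        // treat the task as failed ...
      addPendingTask(info.index)  // ... and put it back on the pending queue
    }
  }
}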


How the DAGScheduler handles the CompletionEvent:
...........................
case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
  listenerBus.post(SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics))
  handleTaskCompletion(completion)
.........................
case Resubmitted =>
  logInfo("Resubmitted " + task + ", so marking it as still running")
  pendingTasks(stage) += task


The (TaskState.FAILED, TaskState.KILLED, TaskState.LOST) states
.........................
} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
  Remove the task from the running set.
  taskSet.removeRunningTask(tid)
  This call extracts the failure information and handles the exception through TaskSchedulerImpl.handleFailedTask.
  taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
}



TaskSchedulerImpl.handleFailedTask:
def handleFailedTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    taskState: TaskState,
    reason: Option[TaskEndReason]) = synchronized {
  taskSetManager.handleFailedTask(tid, taskState, reason)
  If the task was not killed, trigger another round of task offers and execution.
  if (taskState != TaskState.KILLED) {
    // Need to revive offers again now that the task set manager state has been updated to
    // reflect failed tasks that need to be re-run.
    backend.reviveOffers()
  }
}


Flow of TaskSetManager.handleFailedTask, which processes the exception information from a failed task:

def handleFailedTask(tid: Long, state: TaskState, reason: Option[TaskEndReason]) {
  val info = taskInfos(tid)
  if (info.failed) {
    return
  }
  removeRunningTask(tid)
  val index = info.index
  info.markFailed()
  var failureReason = "unknown"
  if (!successful(index)) {
    logWarning("Lost TID %s (task %s:%d)".format(tid, taskSet.id, index))
    copiesRunning(index) -= 1
    If this call was made from TaskSetManager.executorLost (state Task.LOST), reason is None and
    the case matches below do not run; otherwise the task failed with an exception (a non-LOST
    status update) and one of the cases below applies.
    // Check if the problem is a map output fetch failure. In that case, this
    // task will never succeed on any node, so tell the scheduler about it.
    reason.foreach {
      case fetchFailed: FetchFailed =>
        A fetch failure: stop running all tasks of this TaskSet, remove the TaskSet from the
        scheduler, and return without executing the rest of this function.
        logWarning("Loss was due to fetch failure from " + fetchFailed.bmAddress)
        sched.dagScheduler.taskEnded(tasks(index), fetchFailed, null, null, info, null)
        successful(index) = true
        tasksSuccessful += 1
        sched.taskSetFinished(this)
        removeAllRunningTasks()
        return

      case TaskKilled =>
        The task was killed: remove it and return without executing the rest of this function.
        logWarning("Task %d was killed.".format(tid))
        sched.dagScheduler.taskEnded(tasks(index), reason.get, null, null, info, null)
        return


      case ef: ExceptionFailure =>
        sched.dagScheduler.taskEnded(
          tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
        if (ef.className == classOf[NotSerializableException].getName()) {
          // If the task result wasn't serializable, there's no point in trying to re-execute it.
          logError("Task %s:%s had a not serializable result: %s; not retrying".format(
            taskSet.id, index, ef.description))
          abort("Task %s:%s had a not serializable result: %s".format(
            taskSet.id, index, ef.description))
          return
        }
        val key = ef.description
        failureReason = "Exception failure: %s".format(ef.description)
        val now = clock.getTime()
        val (printFull, dupCount) = {
          if (recentExceptions.contains(key)) {
            val (dupCount, printTime) = recentExceptions(key)
            if (now - printTime > EXCEPTION_PRINT_INTERVAL) {
              recentExceptions(key) = (0, now)
              (true, 0)
            } else {
              recentExceptions(key) = (dupCount + 1, printTime)
              (false, dupCount + 1)
            }
          } else {
            recentExceptions(key) = (0, now)
            (true, 0)
          }
        }
        if (printFull) {
          val locs = ef.stackTrace.map(loc => " at %s".format(loc.toString))
          logWarning("Loss was due to %s %s %s".format(
            ef.className, ef.description, locs.mkString(" ")))
        } else {
          logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount))
        }

      case TaskResultLost =>
        failureReason = "Lost result for TID %s on host %s".format(tid, info.host)
        logWarning(failureReason)
        sched.dagScheduler.taskEnded(tasks(index), TaskResultLost, null, null, info, null)

      case _ => {}
    }
    Re-add the task to the pending queues; if its state is not KILLED, increment its failure count
    and check whether it has reached the maximum number of retries.
    // On non-fetch failures, re-enqueue the task as pending for a max number of retries
    addPendingTask(index)
    if (state != TaskState.KILLED) {
      numFailures(index) += 1
      if (numFailures(index) >= maxTaskFailures) {
        logError("Task %s:%d failed %d times; aborting job".format(
          taskSet.id, index, maxTaskFailures))
        abort("Task %s:%d failed %d times (most recent failure: %s)".format(
          taskSet.id, index, maxTaskFailures, failureReason))
      }
    }
  } else {
    logInfo("Ignoring task-lost event for TID " + tid +
      " because task " + index + " is already finished")
  }
}
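The retry accounting at the end of this function can be sketched in isolation as follows (assumed names; the limit of 4 retries is an assumption chosen for the example): each non-killed failure increments a per-task counter, and the job is aborted once a task has failed too often.

import scala.collection.mutable

class RetryAccounting(numTasks: Int, maxTaskFailures: Int = 4) {
  private val numFailures = Array.fill(numTasks)(0)
  val pendingTasks = mutable.Queue[Int]()

  def onTaskFailed(index: Int, killed: Boolean, abort: String => Unit): Unit = {
    pendingTasks.enqueue(index)                       // always re-enqueue for another attempt
    if (!killed) {
      numFailures(index) += 1
      if (numFailures(index) >= maxTaskFailures) {
        abort(s"Task $index failed ${numFailures(index)} times; aborting job")
      }
    }
  }
}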


How the DAGScheduler handles taskEnded:
def taskEnded(
    task: Task[_],
    reason: TaskEndReason,
    result: Any,
    accumUpdates: Map[Long, Any],
    taskInfo: TaskInfo,
    taskMetrics: TaskMetrics) {
  eventProcessActor ! CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics)
}
Handling the CompletionEvent:
case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
  listenerBus.post(SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics))
  handleTaskCompletion(completion)


DAGScheduler.handleTaskCompletion
The fetch-failure case:
case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
  // Mark the stage that the reducer was in as unrunnable
  val failedStage = stageIdToStage(task.stageId)
  running -= failedStage
  failed += failedStage
  ..............................
  // Mark the map whose fetch failed as broken in the map stage
  val mapStage = shuffleToMapStage(shuffleId)
  if (mapId != -1) {
    mapStage.removeOutputLoc(mapId, bmAddress)
    mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
  }
  ...........................
  failed += mapStage
  // Remember that a fetch failed now; this is used to resubmit the broken
  // stages later, after a small wait (to give other tasks the chance to fail)
  lastFetchFailureTime = System.currentTimeMillis() // TODO: Use pluggable clock
  // TODO: mark the executor as failed only if there were lots of fetch failures on it
  if (bmAddress != null) {
    Remove this executor ID from the output locations of every partition of the stage.
    handleExecutorLost(bmAddress.executorId, Some(task.epoch))
  }

case ExceptionFailure(className, description, stackTrace, metrics) =>
  // Do nothing here, left up to the TaskScheduler to decide how to handle user failures

case TaskResultLost =>
  // Do nothing here; the TaskScheduler handles these failures and resubmits the task.



The TaskState.FINISHED state
This state means the task completed normally.
if (state == TaskState.FINISHED) {
  Remove this task from the TaskSet's running set.
  taskSet.removeRunningTask(tid)
  Then fetch the task's result data.
  taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)


TaskResultGetter.enqueueSuccessfulTask:

def enqueueSuccessfulTask(
    taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
  getTaskResultExecutor.execute(new Runnable {
    override def run() {
      try {
        Deserialize the response to obtain the task result.
        val result = serializer.get().deserialize[TaskResult[_]](serializedData) match {
          If the result was smaller than the Akka frame size, it is the task's result itself.
          case directResult: DirectTaskResult[_] => directResult
          case IndirectTaskResult(blockId) =>
            Otherwise the result was too large and is managed by the BlockManager; fetch it from
            the BlockManager using the block ID.
            logDebug("Fetching indirect task result for TID %s".format(tid))
            Have the DAGScheduler post a GettingResultEvent; see
            TaskSchedulerImpl.handleTaskGettingResult below.
            scheduler.handleTaskGettingResult(taskSetManager, tid)
            Fetch the task's result bytes.
            val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
            The task finished but its result could not be fetched; see the TaskResultLost case in
            the failure handling above.
            if (!serializedTaskResult.isDefined) {
              /* We won't be able to get the task result if the machine that ran the task failed
               * between when the task ended and when we tried to fetch the result, or if the
               * block manager had to flush the result. */
              scheduler.handleFailedTask(
                taskSetManager, tid, TaskState.FINISHED, Some(TaskResultLost))
              return
            }
            Deserialize the fetched task result.
            val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
              serializedTaskResult.get)
            The result is now in hand, so remove the corresponding block.
            sparkEnv.blockManager.master.removeBlock(blockId)
            deserializedResult
        }
        result.metrics.resultSize = serializedData.limit()
        See TaskSchedulerImpl.handleSuccessfulTask below.
        scheduler.handleSuccessfulTask(taskSetManager, tid, result)
      } catch {
        case cnf: ClassNotFoundException =>
          val loader = Thread.currentThread.getContextClassLoader
          taskSetManager.abort("ClassNotFound with classloader: " + loader)
        case ex: Throwable =>
          taskSetManager.abort("Exception while deserializing and fetching task: %s".format(ex))
      }
    }
  })
}


TaskSchedulerImpl.handleTaskGettingResult:

def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long) {
  taskSetManager.handleTaskGettingResult(tid)
}
In TaskSetManager:
def handleTaskGettingResult(tid: Long) = {
  val info = taskInfos(tid)
  info.markGettingResult()
  sched.dagScheduler.taskGettingResult(tasks(info.index), info)
}
The DAGScheduler then posts a GettingResultEvent:
def taskGettingResult(task: Task[_], taskInfo: TaskInfo) {
  eventProcessActor ! GettingResultEvent(task, taskInfo)
}

Handling of the GettingResultEvent: it only notifies the listener bus and performs no other processing.
case GettingResultEvent(task, taskInfo) =>
  listenerBus.post(SparkListenerTaskGettingResult(task, taskInfo))



TaskSchedulerImpl.handleSuccessfulTask:
def handleSuccessfulTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    taskResult: DirectTaskResult[_]) = synchronized {
  taskSetManager.handleSuccessfulTask(tid, taskResult)
}
In TaskSetManager:
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]) = {
  val info = taskInfos(tid)
  val index = info.index
  info.markSuccessful()
  Remove the task from the running set.
  removeRunningTask(tid)
  if (!successful(index)) {
    logInfo("Finished TID %s in %d ms on %s (progress: %d/%d)".format(
      tid, info.duration, info.host, tasksSuccessful, numTasks))
    Send a Success message to the DAGScheduler.
    sched.dagScheduler.taskEnded(
      tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)
    Increment the number of successfully finished tasks and mark this task as successful in the successful array.
    // Mark successful and stop if all the tasks have succeeded.
    tasksSuccessful += 1
    successful(index) = true
    If the number of finished tasks now equals the total number of tasks, the TaskSet (and with it one stage) is finished.
    if (tasksSuccessful == numTasks) {
      sched.taskSetFinished(this)
    }
  } else {
    logInfo("Ignorning task-finished event for TID " + tid + " because task " +
      index + " has already completed successfully")
  }
}
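The success bookkeeping above boils down to the following sketch (assumed fields, not TaskSetManager's real ones): ignore duplicate completions, count each task's first success, and finish the task set once every task is done.

class SuccessAccounting(numTasks: Int, onTaskSetFinished: () => Unit) {
  private val successful = Array.fill(numTasks)(false)
  private var tasksSuccessful = 0

  def onTaskSucceeded(index: Int): Unit = {
    if (!successful(index)) {       // only the first completion of each task counts
      successful(index) = true
      tasksSuccessful += 1
      if (tasksSuccessful == numTasks) onTaskSetFinished()
    }
  }
}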


How the DAGScheduler handles a CompletionEvent whose reason is Success:
case Success =>
  logInfo("Completed " + task)
  if (event.accumUpdates != null) {
    Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted
  }
  Remove the task from this stage's pending queue.
  pendingTasks(stage) -= task
  stageToInfos(stage).taskInfos += event.taskInfo -> event.taskMetrics
  Handle the two task types separately.
  task match {
    If the task is a ResultTask, no shuffle output is produced.
    case rt: ResultTask[_, _] =>
      resultStageToJob.get(stage) match {
        case Some(job) =>
          If the finished flag of the partition this task computed is still false in the ActiveJob:
          if (!job.finished(rt.outputId)) {
            Mark the partition as finished.
            job.finished(rt.outputId) = true
            Increment the job's count of finished tasks and check whether all tasks have finished;
            if so, remove the job and its stage from the relevant bookkeeping structures.
            job.numFinished += 1
            // If the whole job has finished, remove it
            if (job.numFinished == job.numPartitions) {
              idToActiveJob -= stage.jobId
              activeJobs -= job
              resultStageToJob -= stage
              markStageAsFinished(stage)
              jobIdToStageIdsRemove(job.jobId)
              listenerBus.post(SparkListenerJobEnd(job, JobSucceeded))
            }
            Call JobWaiter.taskSucceeded through the ActiveJob's listener to mark this task as
            finished and hand its result over for output processing.
            job.listener.taskSucceeded(rt.outputId, event.result)
          }
        case None =>
          logInfo("Ignoring result from " + rt + " because its job has finished")
      }

    Handling for a finished shuffle task:
    case smt: ShuffleMapTask =>
      val status = event.result.asInstanceOf[MapStatus]
      val execId = status.location.executorId
      logDebug("ShuffleMapTask finished on " + execId)
      if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
        logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
      } else {
        Record the shuffle result (MapStatus) in the stage's outputLocs. Each added location
        increments numAvailableOutputs; when numAvailableOutputs equals numPartitions, the shuffle
        map stage is complete.
        stage.addOutputLoc(smt.partitionId, status)
      }
      If the stage is still in the running state and all of its pending tasks have been processed:
      if (running.contains(stage) && pendingTasks(stage).isEmpty) {
        Update the stage's status.
        markStageAsFinished(stage)
        .......................................

        At this point the shuffle stage is complete, so register its shuffle ID and outputLocs with
        the mapOutputTracker. This records, for every shuffle task, the executor and host it ran on
        and the size of each of its outputs. Each task's shuffle writers carry the shuffle ID; once
        registration succeeds, the next stage reads its input by looking up this shuffle ID in the
        mapOutputTracker.
        mapOutputTracker.registerMapOutputs(
          stage.shuffleDep.get.shuffleId,
          stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
          changeEpoch = true)
      }
      clearCacheLocs()
      The outputLoc of each partition in the stage defaults to Nil; if any partition's value is
      still Nil, some task failed, so resubmit the stage, which reruns only the unsuccessful tasks.
      if (stage.outputLocs.exists(_ == Nil)) {
        .........................................
        submitStage(stage)
      } else {
        val newlyRunnable = new ArrayBuffer[Stage]
        for (stage <- waiting) {
          logInfo("Missing parents for " + stage + ": " + getMissingParentStages(stage))
        }
        Check all waiting stages and collect those whose parent shuffle dependencies are now all
        satisfied, i.e. those with no missing parent stages.
        for (stage <- waiting if getMissingParentStages(stage) == Nil) {
          newlyRunnable += stage
        }
        Run every stage that has now become runnable: remove them from the waiting set,
        waiting --= newlyRunnable
        and add them to the running set.
        running ++= newlyRunnable
        for {
          stage <- newlyRunnable.sortBy(_.id)
          jobId <- activeJobForStage(stage)
        } {
          Submit the next stage for task assignment and execution.
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which is now runnable")
          submitMissingTasks(stage, jobId)
        }
      }
    }
  }
}
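The "newly runnable stages" step above can be sketched as a small helper (assumed types; stages are represented by plain Int ids here): among the waiting stages, those with no missing parent stages move from the waiting set to the running set and get submitted.

import scala.collection.mutable

object StagePromotionSketch {
  def promote(
      waiting: mutable.Set[Int],
      running: mutable.Set[Int],
      missingParents: Int => Seq[Int],
      submit: Int => Unit): Unit = {
    // a stage becomes runnable once all of its parent shuffle stages have produced output
    val newlyRunnable = waiting.filter(stage => missingParents(stage).isEmpty).toSeq.sorted
    waiting --= newlyRunnable
    running ++= newlyRunnable
    newlyRunnable.foreach(submit)
  }
}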


JobWaiter.taskSucceeded,
the handler invoked after each task finishes:
override def taskSucceeded(index: Int, result: Any): Unit = synchronized {
  if (_jobFinished) {
    throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
  }
  Process the result through resultHandler, the function passed in when the JobWaiter was created.
  resultHandler(index, result.asInstanceOf[T])
  Increment the number of finished tasks.
  finishedTasks += 1
  if (finishedTasks == totalTasks) {
    If the number of finished tasks equals the total number of tasks, mark the job as finished and
    set its result to JobSucceeded; this wakes up the caller that has been waiting for the job to
    complete.
    _jobFinished = true
    jobResult = JobSucceeded
    this.notifyAll()
  }
}
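A minimal sketch of this waiter pattern (assumed shape, not Spark's JobWaiter): taskSucceeded feeds each result to a handler and wakes up awaitResult once all partitions have reported in.

class JobWaiterSketch[T](totalTasks: Int, resultHandler: (Int, T) => Unit) {
  private var finishedTasks = 0
  private var jobFinished = false

  def taskSucceeded(index: Int, result: T): Unit = synchronized {
    resultHandler(index, result)    // hand the partition's result to the caller's handler
    finishedTasks += 1
    if (finishedTasks == totalTasks) {
      jobFinished = true
      notifyAll()                   // wake up the thread blocked in awaitResult
    }
  }

  def awaitResult(): Unit = synchronized {
    while (!jobFinished) wait()
  }
}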



The Task.runTask implementations
There are two kinds of Task:
ShuffleMapTask, which produces shuffle output, and
ResultTask, which does not.


ResultTask.runTask
override def runTask(context: TaskContext): U = {
  metrics = Some(context.taskMetrics)
  try {
    func is the function passed in when the task was created, i.e. when DAGScheduler.runJob was
    called; for example the writeToFile function defined in PairRDDFunction.saveAsHadoopDataset.
    rdd.iterator dispatches to the compute function of the concrete RDD implementation, which in
    turn runs the user-defined functions passed to operators such as map.
    func(context, rdd.iterator(split, context))
  } finally {
    context.executeOnCompleteCallbacks()
  }
}


ShuffleMapTask.runTask

override def runTask(context: TaskContext): MapStatus = {
  val numOutputSplits = dep.partitioner.numPartitions
  metrics = Some(context.taskMetrics)

  val blockManager = SparkEnv.get.blockManager
  val shuffleBlockManager = blockManager.shuffleBlockManager
  var shuffle: ShuffleWriterGroup = null
  var success = false

  try {
    Obtain a group of shuffle writers for this map task, keyed by its shuffle ID.
    // Obtain all the block writers for shuffle blocks.
    val ser = SparkEnv.get.serializerManager.get(dep.serializerClass, SparkEnv.get.conf)
    shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)
    Iterate over rdd.iterator, which yields pairs (Product2), and shuffle each record into a bucket
    file according to its key. When all shuffle tasks of the stage finish, the stage is registered
    with the mapOutputTracker and the next stage reads its data from there. Every shuffle task
    produces a MapStatus instance containing the executor/host it ran on and the size of each of
    its outputs. The details of reading shuffle data are covered in the shuffle analysis later.
    // Write the map output to its associated buckets.
    for (elem <- rdd.iterator(split, context)) {
      val pair = elem.asInstanceOf[Product2[Any, Any]]
      val bucketId = dep.partitioner.getPartition(pair._1)
      shuffle.writers(bucketId).write(pair)
    }

    // Commit the writes. Get the size of each bucket block (total block size).
    var totalBytes = 0L
    var totalTime = 0L
    val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter =>
      writer.commit()
      writer.close()
      val size = writer.fileSegment().length
      totalBytes += size
      totalTime += writer.timeWriting()
      MapOutputTracker.compressSize(size)
    }

    // Update shuffle metrics.
    val shuffleMetrics = new ShuffleWriteMetrics
    shuffleMetrics.shuffleBytesWritten = totalBytes
    shuffleMetrics.shuffleWriteTime = totalTime
    metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)

    success = true
    new MapStatus(blockManager.blockManagerId, compressedSizes)
  } catch { case e: Exception =>
    // If there is an exception from running the task, revert the partial writes
    // and throw the exception upstream to Spark.
    if (shuffle != null && shuffle.writers != null) {
      for (writer <- shuffle.writers) {
        writer.revertPartialWrites()
        writer.close()
      }
    }
    throw e
  } finally {
    // Release the writers back to the shuffle block manager.
    if (shuffle != null && shuffle.writers != null) {
      shuffle.releaseWriters(success)
    }
    // Execute the callbacks on task completion.
    context.executeOnCompleteCallbacks()
  }
}
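The core idea of the shuffle-write loop, stripped of Spark's writer machinery, is just hash-partitioning records into buckets. The sketch below models that idea only (the bucket-count array stands in for per-bucket writers and the MapStatus sizes); it is not Spark's ShuffleWriterGroup API.

object ShuffleWriteSketch {
  def bucketOf(key: Any, numPartitions: Int): Int = {
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod        // keep the bucket index non-negative
  }

  def writeMapOutput[K, V](records: Iterator[(K, V)], numPartitions: Int): Array[Long] = {
    val bucketSizes = Array.fill(numPartitions)(0L)
    for ((k, _) <- records) {
      val bucketId = bucketOf(k, numPartitions)
      bucketSizes(bucketId) += 1                     // stand-in for shuffle.writers(bucketId).write(pair)
    }
    bucketSizes                                      // per-bucket sizes, the role MapStatus plays
  }

  def main(args: Array[String]): Unit = {
    val sizes = writeMapOutput(Iterator(("a", 1), ("b", 2), ("a", 3)), 4)
    println(sizes.mkString(","))
  }
}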

