zoukankan      html  css  js  c++  java
  • 【Spark2.0源码学习】-10.Task执行与回馈

         通过上一节内容,DriverEndpoint最终生成多个可执行的TaskDescription对象,并向各个ExecutorEndpoint发送LaunchTask指令,本节内容将关注ExecutorEndpoint如何处理LaunchTask指令,处理完成后如何回馈给DriverEndpoint,以及整个job最终如何多次调度直至结束。
     
    一、Task的执行流程
         承接上一节内容,Executor接受LaunchTask指令后,开启一个新线程TaskRunner解析RDD,并调用RDD的compute方法,归并函数得到最终任务执行结果
         
    • ExecutorEndpoint接受到LaunchTask指令后,解码出TaskDescription,调用Executor的launchTask方法
    • Executor创建一个TaskRunner线程,并启动线程,同时将改线程添加到Executor的成员对象中,代码如下:
    private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
    runningTasks.put(taskDescription.taskId, taskRunner)
    • TaskRunner
      • 首先向DriverEndpoint发送任务最新状态为RUNNING
      • 从TaskDescription解析出Task,并调用Task的run方法
    • Task
      • 创建TaskContext以及CallerContext(与HDFS交互的上下文对象)
      • 执行Task的runTask方法
        • 如果Task实例为ShuffleMapTask:解析出RDD以及ShuffleDependency信息,调用RDD的compute()方法将结果写Writer中(Writer这里不介绍,可以作为黑盒理解,比如写入一个文件中),返回MapStatus对象
        • 如果Task实例为ResultTask:解析出RDD以及合并函数信息,调用函数将调用后的结果返回
    • TaskRunner将Task执行的结果序列化,再次向DriverEndpoint发送任务最新状态为FINISHED
     
    二、Task的回馈流程
         TaskRunner执行结束后,都将执行状态发送至DriverEndpoint,DriverEndpoint最终反馈指令CompletionEvent至DAGSchedulerEventProcessLoop中
         
    • DriverEndpoint接受到StatusUpdate消息后,调用TaskScheduler的statusUpdate(taskId, state, result)方法
    • TaskScheduler如果任务结果是完成,那么清除该任务处理中的状态,并调动TaskResultGetter相关方法,关键代码如下:
    val taskSet = taskIdToTaskSetManager.get(tid)
     
    taskIdToTaskSetManager.remove(tid)
    taskIdToExecutorId.remove(tid).foreach { executorId =>
      executorIdToRunningTaskIds.get(executorId).foreach { _.remove(tid) }
    }
    taskSet.removeRunningTask(tid)
     
    if (state == TaskState.FINISHED) {
      taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
    } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
      taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
    }
    • TaskResultGetter启动线程启动线程【task-result-getter】进行相关处理
      • 通过解析或者远程获取得到Task的TaskResult对象
      • 调用TaskSet的handleSuccessfulTask方法,TaskSet的handleSuccessfulTask方法直接调用TaskSetManager的handleSuccessfulTask方法
    • TaskSetManager
      • 更新内部TaskInfo对象状态,并将该Task从运行中Task的集合删除,代码如下:
    val info = taskInfos(tid)
    info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
    removeRunningTask(tid)
      • 调用DAGScheduler的taskEnded方法,关键代码如下:
    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
    • DAGScheduler向DAGSchedulerEventProcessLoop存入CompletionEvent指令,CompletionEvent对象定义如下
    private[scheduler] case class CompletionEvent(
        task: Task[_],
        reason: TaskEndReason,
        result: Any,
        accumUpdates: Seq[AccumulatorV2[_, _]],
        taskInfo: TaskInfo)
      extends DAGSchedulerEvent
     
    三、Task的迭代流程
         DAGSchedulerEventProcessLoop中针对于CompletionEvent指令,调用DAGScheduler进行处理,DAGScheduler更新Stage与该Task的关系状态,如果Stage下Task都返回,则做下一层Stage的任务拆解与运算工作,直至Job被执行完毕
      
    • DAGSchedulerEventProcessLoop接收到CompletionEvent指令后,调用DAGScheduler的handleTaskCompletion方法
    • DAGScheduler根据Task的类型分别处理
    • 如果Task为ShuffleMapTask
      • 待回馈的Partitions减取当前partitionId
      • 如果所有task都返回,则markStageAsFinished(shuffleStage),同时向MapOutputTrackerMaster注册MapOutputs信息,且markMapStageJobAsFinished
      • 调用submitWaitingChildStages(shuffleStage)进行下层Stages的处理,从而迭代处理最终处理到ResultTask,job结束,关键代码如下:
    private def submitWaitingChildStages(parent: Stage) {
       ...
      val childStages = waitingStages.filter(_.parents.contains(parent)).toArray
      waitingStages --= childStages
      for (stage <- childStages.sortBy(_.firstJobId)) {
        submitStage(stage)
      }
    }
    • 如果Task为ResultTask
      • 改job的partitions都已返回,则markStageAsFinished(resultStage),并cleanupStateForJobAndIndependentStages(job),关键代码如下
    for (stage <- stageIdToStage.get(stageId)) {
      if (runningStages.contains(stage)) {
        logDebug("Removing running stage %d".format(stageId))
        runningStages -= stage
      }
      for ((k, v) <- shuffleIdToMapStage.find(_._2 == stage)) {
        shuffleIdToMapStage.remove(k)
      }
      if (waitingStages.contains(stage)) {
        logDebug("Removing stage %d from waiting set.".format(stageId))
        waitingStages -= stage
      }
      if (failedStages.contains(stage)) {
        logDebug("Removing stage %d from failed set.".format(stageId))
        failedStages -= stage
      }
    }
    // data structures based on StageId
    stageIdToStage -= stageId
    jobIdToStageIds -= job.jobId
    jobIdToActiveJob -= job.jobId
    activeJobs -= job
         至此,用户编写的代码最终调用Spark分布式计算完毕。
  • 相关阅读:
    图上两点之间的第k最短路径的长度 ACM-ICPC 2018 沈阳赛区网络预赛 D. Made In Heaven
    ACM-ICPC 2018 徐州赛区网络预赛 B. BE, GE or NE
    poj 1986
    ACM-ICPC 2018 徐州赛区网络预赛 A. Hard to prepare
    ACM-ICPC 2018 徐州赛区网络预赛 G. Trace
    hdu 5533
    ACM Changchun 2015 L . House Building
    ACM Changchun 2015 J. Chip Factory
    一些小程序
    ACM-ICPC 2018 徐州赛区网络预赛 H. Ryuji doesn't want to study
  • 原文地址:https://www.cnblogs.com/hframe/p/6970818.html
Copyright © 2011-2022 走看看