zoukankan      html  css  js  c++  java
  • Spark技术内幕:Stage划分及提交源码分析

    http://blog.csdn.net/anzhsoft/article/details/39859463

    当触发一个RDD的action后,以count为例,调用关系如下:

    1. org.apache.spark.rdd.RDD#count
    2. org.apache.spark.SparkContext#runJob
    3. org.apache.spark.scheduler.DAGScheduler#runJob
    4. org.apache.spark.scheduler.DAGScheduler#submitJob
    5. org.apache.spark.scheduler.DAGSchedulerEventProcessActor#receive(JobSubmitted)
    6. org.apache.spark.scheduler.DAGScheduler#handleJobSubmitted

    其中步骤五的DAGSchedulerEventProcessActor是DAGScheduler 的与外部交互的接口代理,DAGScheduler在创建时会创建名字为eventProcessActor的actor。这个actor的作用看它的实现就一目了然了:

    [java] view plaincopy在CODE上查看代码片派生到我的代码片
    1. /** 
    2.  * The main event loop of the DAG scheduler. 
    3.  */  
    4. def receive = {  
    5.   case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>  
    6.     dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,  
    7.       listener, properties) // 提交job,来自与RDD->SparkContext->DAGScheduler的消息。之所以在这需要在这里中转一下,是为了模块功能的一致性。  
    8.   
    9.   case StageCancelled(stageId) => // 消息源org.apache.spark.ui.jobs.JobProgressTab,在GUI上显示一个SparkContext的Job的执行状态。  
    10.     // 用户可以cancel一个Stage,会通过SparkContext->DAGScheduler 传递到这里。  
    11.     dagScheduler.handleStageCancellation(stageId)  
    12.   
    13.   case JobCancelled(jobId) => // 来自于org.apache.spark.scheduler.JobWaiter的消息。取消一个Job  
    14.     dagScheduler.handleJobCancellation(jobId)  
    15.   
    16.   case JobGroupCancelled(groupId) => // 取消整个Job Group  
    17.     dagScheduler.handleJobGroupCancelled(groupId)  
    18.   
    19.   case AllJobsCancelled => //取消所有Job  
    20.     dagScheduler.doCancelAllJobs()  
    21.   
    22.   case ExecutorAdded(execId, host) => // TaskScheduler得到一个Executor被添加的消息。具体来自org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers  
    23.     dagScheduler.handleExecutorAdded(execId, host)  
    24.   
    25.   case ExecutorLost(execId) => //来自TaskScheduler  
    26.     dagScheduler.handleExecutorLost(execId)  
    27.   
    28.   case BeginEvent(task, taskInfo) => // 来自TaskScheduler  
    29.     dagScheduler.handleBeginEvent(task, taskInfo)  
    30.   
    31.   case GettingResultEvent(taskInfo) => //处理获得TaskResult信息的消息  
    32.     dagScheduler.handleGetTaskResult(taskInfo)  
    33.   
    34.   case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) => //来自TaskScheduler,报告task是完成或者失败  
    35.     dagScheduler.handleTaskCompletion(completion)  
    36.   
    37.   case TaskSetFailed(taskSet, reason) => //来自TaskScheduler,要么TaskSet失败次数超过阈值或者由于Job Cancel。  
    38.     dagScheduler.handleTaskSetFailed(taskSet, reason)  
    39.   
    40.   case ResubmitFailedStages => //当一个Stage处理失败时,重试。来自org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion  
    41.     dagScheduler.resubmitFailedStages()  
    42. }  

    总结一下org.apache.spark.scheduler.DAGSchedulerEventProcessActor的作用:可以把他理解成DAGScheduler的对外的功能接口。它对外隐藏了自己内部实现的细节,也更易于理解其逻辑;也降低了维护成本,将DAGScheduler的比较复杂功能接口化。


    handleJobSubmitted

    org.apache.spark.scheduler.DAGScheduler#handleJobSubmitted首先会根据RDD创建finalStage。finalStage,顾名思义,就是最后的那个Stage。然后创建job,最后提交。提交的job如果满足一下条件,那么它将以本地模式运行:

    1)spark.localExecution.enabled设置为true  并且 2)用户程序显式指定可以本地运行 并且 3)finalStage的没有父Stage 并且 4)仅有一个partition

    3)和 4)的话主要为了任务可以快速执行;如果有多个stage或者多个partition的话,本地运行可能会因为本机的计算资源的问题而影响任务的计算速度。

    要理解什么是Stage,首先要搞明白什么是Task。Task是在集群上运行的基本单位。一个Task负责处理RDD的一个partition。RDD的多个patition会分别由不同的Task去处理。当然了这些Task的处理逻辑完全是一致的。这一组Task就组成了一个Stage。有两种Task:

    1.  org.apache.spark.scheduler.ShuffleMapTask
    2.  org.apache.spark.scheduler.ResultTask

    ShuffleMapTask根据Task的partitioner将计算结果放到不同的bucket中。而ResultTask将计算结果发送回Driver Application。一个Job包含了多个Stage,而Stage是由一组完全相同的Task组成的。最后的Stage包含了一组ResultTask。

    在用户触发了一个action后,比如count,collect,SparkContext会通过runJob的函数开始进行任务提交。最后会通过DAG的event processor 传递到DAGScheduler本身的handleJobSubmitted,它首先会划分Stage,提交Stage,提交Task。至此,Task就开始在运行在集群上了。

    一个Stage的开始就是从外部存储或者shuffle结果中读取数据;一个Stage的结束就是由于发生shuffle或者生成结果时。


    创建finalStage

    handleJobSubmitted 通过调用newStage来创建finalStage:

    [java] view plaincopy在CODE上查看代码片派生到我的代码片
    1. finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)  

    创建一个result stage,或者说finalStage,是通过调用org.apache.spark.scheduler.DAGScheduler#newStage完成的;而创建一个shuffle stage,需要通过调用org.apache.spark.scheduler.DAGScheduler#newOrUsedStage。 

    [java] view plaincopy在CODE上查看代码片派生到我的代码片
    1. private def newStage(  
    2.       rdd: RDD[_],  
    3.       numTasks: Int,  
    4.       shuffleDep: Option[ShuffleDependency[_, _, _]],  
    5.       jobId: Int,  
    6.       callSite: CallSite)  
    7.     : Stage =  
    8.   {  
    9.     val id = nextStageId.getAndIncrement()  
    10.     val stage =  
    11.       new Stage(id, rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)  
    12.     stageIdToStage(id) = stage  
    13.     updateJobIdStageIdMaps(jobId, stage)  
    14.     stage  
    15.   }  

    对于result 的final stage来说,传入的shuffleDep是None。

    我们知道,RDD通过org.apache.spark.rdd.RDD#getDependencies可以获得它依赖的parent RDD。而Stage也可能会有parent Stage。看一个RDD论文的Stage划分吧:


    一个stage的边界,输入是外部的存储或者一个stage shuffle的结果;输入则是Job的结果(result task对应的stage)或者shuffle的结果。

    上图的话stage3的输入则是RDD A和RDD F shuffle的结果。而A和F由于到B和G需要shuffle,因此需要划分到不同的stage。

    从源码实现的角度来看,通过触发action也就是最后一个RDD创建final stage(上图的stage 3),我们注意到new Stage的第五个参数就是该Stage的parent Stage:通过rdd和job id获取:

    [java] view plaincopy在CODE上查看代码片派生到我的代码片
    1. // 生成rdd的parent Stage。没遇到一个ShuffleDependency,就会生成一个Stage  
    2.   private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {  
    3.     val parents = new HashSet[Stage] //存储parent stage  
    4.     val visited = new HashSet[RDD[_]] //存储已经被访问到得RDD  
    5.     // We are manually maintaining a stack here to prevent StackOverflowError  
    6.     // caused by recursively visiting // 存储需要被处理的RDD。Stack中得RDD都需要被处理。  
    7.     val waitingForVisit = new Stack[RDD[_]]  
    8.     def visit(r: RDD[_]) {  
    9.       if (!visited(r)) {  
    10.         visited += r  
    11.         // Kind of ugly: need to register RDDs with the cache here since  
    12.         // we can't do it in its constructor because # of partitions is unknown  
    13.         for (dep <- r.dependencies) {  
    14.           dep match {  
    15.             case shufDep: ShuffleDependency[_, _, _] => // 在ShuffleDependency时需要生成新的stage  
    16.               parents += getShuffleMapStage(shufDep, jobId)  
    17.             case _ =>  
    18.               waitingForVisit.push(dep.rdd) //不是ShuffleDependency,那么就属于同一个Stage  
    19.           }  
    20.         }  
    21.       }  
    22.     }  
    23.     waitingForVisit.push(rdd) // 输入的rdd作为第一个需要处理的RDD。然后从该rdd开始,顺序访问其parent rdd  
    24.     while (!waitingForVisit.isEmpty) { //只要stack不为空,则一直处理。  
    25.       visit(waitingForVisit.pop()) //每次visit如果遇到了ShuffleDependency,那么就会形成一个Stage,否则这些RDD属于同一个Stage  
    26.     }  
    27.     parents.toList  
    28.   }  

    生成了finalStage后,就需要提交Stage了。

    [java] view plaincopy在CODE上查看代码片派生到我的代码片
    1. // 提交Stage,如果有parent Stage没有提交,那么递归提交它。  
    2. private def submitStage(stage: Stage) {  
    3.   val jobId = activeJobForStage(stage)  
    4.   if (jobId.isDefined) {  
    5.     logDebug("submitStage(" + stage + ")")  
    6.     // 如果当前stage不在等待其parent stage的返回,并且 不在运行的状态, 并且 没有已经失败(失败会有重试机制,不会通过这里再次提交)  
    7.     if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {  
    8.       val missing = getMissingParentStages(stage).sortBy(_.id)  
    9.       logDebug("missing: " + missing)  
    10.       if (missing == Nil) { // 如果所有的parent stage都已经完成,那么提交该stage所包含的task  
    11.         logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")  
    12.         submitMissingTasks(stage, jobId.get)  
    13.       } else {  
    14.         for (parent <- missing) { // 有parent stage为完成,则递归提交它  
    15.           submitStage(parent)  
    16.         }  
    17.         waitingStages += stage  
    18.       }  
    19.     }  
    20.   } else {  
    21.     abortStage(stage, "No active job for stage " + stage.id)  
    22.   }  
    23. }  


    DAGScheduler将Stage划分完成后,提交实际上是通过把Stage转换为TaskSet,然后通过TaskScheduler将计算任务最终提交到集群。其所在的位置如下图所示。


    接下来,将分析Stage是如何转换为TaskSet,并最终提交到Executor去运行的。

    在上文《Spark技术内幕:Stage划分及提交源码分析》中,我们分析了Stage的生成和提交。但是Stage的提交,只是DAGScheduler完成了对DAG的划分,生成了一个计算拓扑,即需要按照顺序计算的Stage,Stage中包含了可以以partition为单位并行计算的Task。我们并没有分析Stage中得Task是如何生成并且最终提交到Executor中去的。

    这就是本文的主题。

    从org.apache.spark.scheduler.DAGScheduler#submitMissingTasks开始,分析Stage是如何生成TaskSet的。

    如果一个Stage的所有的parent stage都已经计算完成或者存在于cache中,那么他会调用submitMissingTasks来提交该Stage所包含的Tasks。

    org.apache.spark.scheduler.DAGScheduler#submitMissingTasks的计算流程如下:

    1. 首先得到RDD中需要计算的partition,对于Shuffle类型的stage,需要判断stage中是否缓存了该结果;对于Result类型的Final Stage,则判断计算Job中该partition是否已经计算完成。
    2. 序列化task的binary。Executor可以通过广播变量得到它。每个task运行的时候首先会反序列化。这样在不同的executor上运行的task是隔离的,不会相互影响。
    3. 为每个需要计算的partition生成一个task:对于Shuffle类型依赖的Stage,生成ShuffleMapTask类型的task;对于Result类型的Stage,生成一个ResultTask类型的task
    4. 确保Task是可以被序列化的。因为不同的cluster有不同的taskScheduler,在这里判断可以简化逻辑;保证TaskSet的task都是可以序列化的
    5. 通过TaskScheduler提交TaskSet。
    TaskSet就是可以做pipeline的一组完全相同的task,每个task的处理逻辑完全相同,不同的是处理数据,每个task负责处理一个partition。pipeline,可以称为大数据处理的基石,只有数据进行pipeline处理,才能将其放到集群中去运行。对于一个task来说,它从数据源获得逻辑,然后按照拓扑顺序,顺序执行(实际上是调用rdd的compute)。
    TaskSet是一个数据结构,存储了这一组task:
    [java] view plaincopy在CODE上查看代码片派生到我的代码片
    1. private[spark] class TaskSet(  
    2.     val tasks: Array[Task[_]],  
    3.     val stageId: Int,  
    4.     val attempt: Int,  
    5.     val priority: Int,  
    6.     val properties: Properties) {  
    7.     val id: String = stageId + "." + attempt  
    8.   
    9.   override def toString: String = "TaskSet " + id  
    10. }  


    管理调度这个TaskSet的时org.apache.spark.scheduler.TaskSetManager,TaskSetManager会负责task的失败重试;跟踪每个task的执行状态;处理locality-aware的调用。
    详细的调用堆栈如下:
    1. org.apache.spark.scheduler.TaskSchedulerImpl#submitTasks
    2. org.apache.spark.scheduler.SchedulableBuilder#addTaskSetManager
    3. org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#reviveOffers
    4. org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverActor#makeOffers
    5. org.apache.spark.scheduler.TaskSchedulerImpl#resourceOffers
    6. org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverActor#launchTasks
    7. org.apache.spark.executor.CoarseGrainedExecutorBackend.receiveWithLogging#launchTask
    8. org.apache.spark.executor.Executor#launchTask

    首先看一下org.apache.spark.executor.Executor#launchTask:
    [java] view plaincopy在CODE上查看代码片派生到我的代码片
    1. def launchTask(  
    2.     context: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer) {  
    3.   val tr = new TaskRunner(context, taskId, taskName, serializedTask)  
    4.   runningTasks.put(taskId, tr)  
    5.   threadPool.execute(tr) // 开始在executor中运行  
    6. }  


    TaskRunner会从序列化的task中反序列化得到task,这个需要看 org.apache.spark.executor.Executor.TaskRunner#run 的实现:task.run(taskId.toInt)。而task.run的实现是:
    [java] view plaincopy在CODE上查看代码片派生到我的代码片
    1. final def run(attemptId: Long): T = {  
    2.    context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false)  
    3.    context.taskMetrics.hostname = Utils.localHostName()  
    4.    taskThread = Thread.currentThread()  
    5.    if (_killed) {  
    6.      kill(interruptThread = false)  
    7.    }  
    8.    runTask(context)  
    9.  }  

    对于原来提到的两种Task,即
    1.  org.apache.spark.scheduler.ShuffleMapTask
    2.  org.apache.spark.scheduler.ResultTask
    分别实现了不同的runTask:
    org.apache.spark.scheduler.ResultTask#runTask即顺序调用rdd的compute,通过rdd的拓扑顺序依次对partition进行计算:
    [java] view plaincopy在CODE上查看代码片派生到我的代码片
    1. override def runTask(context: TaskContext): U = {  
    2.   // Deserialize the RDD and the func using the broadcast variables.  
    3.   val ser = SparkEnv.get.closureSerializer.newInstance()  
    4.   val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](  
    5.     ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)  
    6.   
    7.   metrics = Some(context.taskMetrics)  
    8.   try {  
    9.     func(context, rdd.iterator(partition, context))  
    10.   } finally {  
    11.     context.markTaskCompleted()  
    12.   }  
    13. }  


    而org.apache.spark.scheduler.ShuffleMapTask#runTask则是写shuffle的结果,

    [java] view plaincopy在CODE上查看代码片派生到我的代码片
    1. override def runTask(context: TaskContext): MapStatus = {  
    2.   // Deserialize the RDD using the broadcast variable.  
    3.   val ser = SparkEnv.get.closureSerializer.newInstance()  
    4.   val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](  
    5.     ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)  
    6.     //此处的taskBinary即为在org.apache.spark.scheduler.DAGScheduler#submitMissingTasks序列化的task的广播变量取得的  
    7.   
    8.   metrics = Some(context.taskMetrics)  
    9.   var writer: ShuffleWriter[Any, Any] = null  
    10.   try {  
    11.     val manager = SparkEnv.get.shuffleManager  
    12.     writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)  
    13.     writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) // 将rdd计算的结果写入memory或者disk  
    14.     return writer.stop(success = true).get  
    15.   } catch {  
    16.     case e: Exception =>  
    17.       if (writer != null) {  
    18.         writer.stop(success = false)  
    19.       }  
    20.       throw e  
    21.   } finally {  
    22.     context.markTaskCompleted()  
    23.   }  
    24. }  


    这两个task都不要按照拓扑顺序调用rdd的compute来完成对partition的计算,不同的是ShuffleMapTask需要shuffle write,以供child stage读取shuffle的结果。 对于这两个task都用到的taskBinary,即为在org.apache.spark.scheduler.DAGScheduler#submitMissingTasks序列化的task的广播变量取得的。

    通过上述几篇博文,实际上我们已经粗略的分析了从用户定义SparkContext开始,集群是如果为每个Application分配Executor的,回顾一下这个序列图:

    还有就是用户触发某个action,集群是如何生成DAG,如果将DAG划分为可以成Stage,已经Stage是如何将这些可以pipeline执行的task提交到Executor去执行的。

  • 相关阅读:
    (转帖) oracle是否归档模式及修改模式
    (转帖) Oracle实例恢复(Oracle instance recovery)
    实习第一周总结
    UML类图几种关系的总结
    表格排序
    利用js查找页面中的内链,外链
    事件机制(事件冒泡与事件捕获)
    谈 CSS 模块化
    初学后台框架总结篇二——快速了解CI框架
    初学后台框架总结篇一——学习过程
  • 原文地址:https://www.cnblogs.com/bluejoe/p/5115906.html
Copyright © 2011-2022 走看看