zoukankan      html  css  js  c++  java
  • Spark技术内幕:Stage划分及提交源码分析

    当触发一个RDD的action后,以count为例,调用关系如下:

    1. org.apache.spark.rdd.RDD#count

    2. org.apache.spark.SparkContext#runJob

    3. org.apache.spark.scheduler.DAGScheduler#runJob

    4. org.apache.spark.scheduler.DAGScheduler#submitJob

    5. org.apache.spark.scheduler.DAGSchedulerEventProcessActor#receive(JobSubmitted)

    6. org.apache.spark.scheduler.DAGScheduler#handleJobSubmitted

    其 中步骤五的DAGSchedulerEventProcessActor是DAGScheduler 的与外部交互的接口代理,DAGScheduler在创建时会创建名字为eventProcessActor的actor。这个actor的作用看它的实 现就一目了然了:

    [java] view plaincopy

    在CODE上查看代码片

    派生到我的代码片

    1. /** 

    2.  * The main event loop of the DAG scheduler. 

    3.  */  

    4. def receive = {  

    5.   case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>  

    6.     dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,  

    7.       listener, properties) // 提交job,来自与RDD->SparkContext->DAGScheduler的消息。之所以在这需要在这里中转一下,是为了模块功能的一致性。  

    8.   

    9.   case StageCancelled(stageId) => // 消息源org.apache.spark.ui.jobs.JobProgressTab,在GUI上显示一个SparkContext的Job的执行状态。  

    10.     // 用户可以cancel一个Stage,会通过SparkContext->DAGScheduler 传递到这里。  

    11.     dagScheduler.handleStageCancellation(stageId)  

    12.   

    13.   case JobCancelled(jobId) => // 来自于org.apache.spark.scheduler.JobWaiter的消息。取消一个Job  

    14.     dagScheduler.handleJobCancellation(jobId)  

    15.   

    16.   case JobGroupCancelled(groupId) => // 取消整个Job Group  

    17.     dagScheduler.handleJobGroupCancelled(groupId)  

    18.   

    19.   case AllJobsCancelled => //取消所有Job  

    20.     dagScheduler.doCancelAllJobs()  

    21.   

    22.   case ExecutorAdded(execId, host) =& gt; // TaskScheduler得到一个Executor被添加的消息。具体来自 org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers  

    23.     dagScheduler.handleExecutorAdded(execId, host)  

    24.   

    25.   case ExecutorLost(execId) => //来自TaskScheduler  

    26.     dagScheduler.handleExecutorLost(execId)  

    27.   

    28.   case BeginEvent(task, taskInfo) => // 来自TaskScheduler  

    29.     dagScheduler.handleBeginEvent(task, taskInfo)  

    30.   

    31.   case GettingResultEvent(taskInfo) => //处理获得TaskResult信息的消息  

    32.     dagScheduler.handleGetTaskResult(taskInfo)  

    33.   

    34.   case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) => //来自TaskScheduler,报告task是完成或者失败  

    35.     dagScheduler.handleTaskCompletion(completion)  

    36.   

    37.   case TaskSetFailed(taskSet, reason) => //来自TaskScheduler,要么TaskSet失败次数超过阈值或者由于Job Cancel。  

    38.     dagScheduler.handleTaskSetFailed(taskSet, reason)  

    39.   

    40.   case ResubmitFailedStages => //当一个Stage处理失败时,重试。来自org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion  

    41.     dagScheduler.resubmitFailedStages()  

    42. }  

    总 结一下org.apache.spark.scheduler.DAGSchedulerEventProcessActor的作用:可以把他理解成 DAGScheduler的对外的功能接口。它对外隐藏了自己内部实现的细节,也更易于理解其逻辑;也降低了维护成本,将DAGScheduler的比较 复杂功能接口化。

    handleJobSubmitted

    org.apache.spark.scheduler.DAGScheduler#handleJobSubmitted 首先会根据RDD创建finalStage。finalStage,顾名思义,就是最后的那个Stage。然后创建job,最后提交。提交的job如果满 足一下条件,那么它将以本地模式运行:

    1)spark.localExecution.enabled设置为true  并且 2)用户程序显式指定可以本地运行 并且 3)finalStage的没有父Stage 并且 4)仅有一个partition

    3)和 4)的话主要为了任务可以快速执行;如果有多个stage或者多个partition的话,本地运行可能会因为本机的计算资源的问题而影响任务的计算速度。

    要 理解什么是Stage,首先要搞明白什么是Task。Task是在集群上运行的基本单位。一个Task负责处理RDD的一个partition。RDD的 多个patition会分别由不同的Task去处理。当然了这些Task的处理逻辑完全是一致的。这一组Task就组成了一个Stage。有两种 Task:

    1.  org.apache.spark.scheduler.ShuffleMapTask

    2.  org.apache.spark.scheduler.ResultTask

    ShuffleMapTask 根据Task的partitioner将计算结果放到不同的bucket中。而ResultTask将计算结果发送回Driver Application。一个Job包含了多个Stage,而Stage是由一组完全相同的Task组成的。最后的Stage包含了一组 ResultTask。

    在用户触发了一个action后, 比如count,collect,SparkContext会通过runJob的函数开始进行任务提交。最后会通过DAG的event processor 传递到DAGScheduler本身的handleJobSubmitted,它首先会划分Stage,提交Stage,提交Task。至此,Task就 开始在运行在集群上了。

    一个Stage的开始就是从外部存储或者shuffle结果中读取数据;一个Stage的结束就是由于发生shuffle或者生成结果时。

    创建finalStage

    handleJobSubmitted 通过调用newStage来创建finalStage:

    [java] view plaincopy

    在CODE上查看代码片

    派生到我的代码片

    1. finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)  


    创 建一个result stage,或者说finalStage,是通过调用 org.apache.spark.scheduler.DAGScheduler#newStage完成的;而创建一个shuffle stage,需要通过调用org.apache.spark.scheduler.DAGScheduler#newOrUsedStage。 

    [java] view plaincopy

    在CODE上查看代码片

    派生到我的代码片

    1. private def newStage(  

    2.       rdd: RDD[_],  

    3.       numTasks: Int,  

    4.       shuffleDep: Option[ShuffleDependency[_, _, _]],  

    5.       jobId: Int,  

    6.       callSite: CallSite)  

    7.     : Stage =  

    8.   {  

    9.     val id = nextStageId.getAndIncrement()  

    10.     val stage =  

    11.       new Stage(id, rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)  

    12.     stageIdToStage(id) = stage  

    13.     updateJobIdStageIdMaps(jobId, stage)  

    14.     stage  

    15.   }  


    对于result 的final stage来说,传入的shuffleDep是None。

    我们知道,RDD通过org.apache.spark.rdd.RDD#getDependencies可以获得它依赖的parent RDD。而Stage也可能会有parent Stage。看一个RDD论文的Stage划分吧:

     

    一个stage的边界,输入是外部的存储或者一个stage shuffle的结果;输入则是Job的结果(result task对应的stage)或者shuffle的结果。

    上图的话stage3的输入则是RDD A和RDD F shuffle的结果。而A和F由于到B和G需要shuffle,因此需要划分到不同的stage。

    从源码实现的角度来看,通过触发action也就是最后一个RDD创建final stage(上图的stage 3),我们注意到new Stage的第五个参数就是该Stage的parent Stage:通过rdd和job id获取:

    [java] view plaincopy

    在CODE上查看代码片

    派生到我的代码片

    1. // 生成rdd的parent Stage。没遇到一个ShuffleDependency,就会生成一个Stage  

    2.   private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {  

    3.     val parents = new HashSet[Stage] //存储parent stage  

    4.     val visited = new HashSet[RDD[_]] //存储已经被访问到得RDD  

    5.     // We are manually maintaining a stack here to prevent StackOverflowError  

    6.     // caused by recursively visiting // 存储需要被处理的RDD。Stack中得RDD都需要被处理。  

    7.     val waitingForVisit = new Stack[RDD[_]]  

    8.     def visit(r: RDD[_]) {  

    9.       if (!visited(r)) {  

    10.         visited += r  

    11.         // Kind of ugly: need to register RDDs with the cache here since  

    12.         // we can't do it in its constructor because # of partitions is unknown  

    13.         for (dep <- r.dependencies) {  

    14.           dep match {  

    15.             case shufDep: ShuffleDependency[_, _, _] => // 在ShuffleDependency时需要生成新的stage  

    16.               parents += getShuffleMapStage(shufDep, jobId)  

    17.             case _ =>  

    18.               waitingForVisit.push(dep.rdd) //不是ShuffleDependency,那么就属于同一个Stage  

    19.           }  

    20.         }  

    21.       }  

    22.     }  

    23.     waitingForVisit.push(rdd) // 输入的rdd作为第一个需要处理的RDD。然后从该rdd开始,顺序访问其parent rdd  

    24.     while (!waitingForVisit.isEmpty) { //只要stack不为空,则一直处理。  

    25.       visit(waitingForVisit.pop()) //每次visit如果遇到了ShuffleDependency,那么就会形成一个Stage,否则这些RDD属于同一个Stage  

    26.     }  

    27.     parents.toList  

    28.   }  

    生成了finalStage后,就需要提交Stage了。

    [java] view plaincopy

    在CODE上查看代码片

    派生到我的代码片

    1. // 提交Stage,如果有parent Stage没有提交,那么递归提交它。  

    2. private def submitStage(stage: Stage) {  

    3.   val jobId = activeJobForStage(stage)  

    4.   if (jobId.isDefined) {  

    5.     logDebug("submitStage(" + stage + ")")  

    6.     // 如果当前stage不在等待其parent stage的返回,并且 不在运行的状态, 并且 没有已经失败(失败会有重试机制,不会通过这里再次提交)  

    7.     if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {  

    8.       val missing = getMissingParentStages(stage).sortBy(_.id)  

    9.       logDebug("missing: " + missing)  

    10.       if (missing == Nil) { // 如果所有的parent stage都已经完成,那么提交该stage所包含的task  

    11.         logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")  

    12.         submitMissingTasks(stage, jobId.get)  

    13.       } else {  

    14.         for (parent <- missing) { // 有parent stage为完成,则递归提交它  

    15.           submitStage(parent)  

    16.         }  

    17.         waitingStages += stage  

    18.       }  

    19.     }  

    20.   } else {  

    21.     abortStage(stage, "No active job for stage " + stage.id)  

    22.   }  

    23. }  

    DAGScheduler将Stage划分完成后,提交实际上是通过把Stage转换为TaskSet,然后通过TaskScheduler将计算任务最终提交到集群。其所在的位置如下图所示。

    接下来,将分析Stage是如何转换为TaskSet,并最终提交到Executor去运行的。

  • 相关阅读:
    自然语言交流系统 phxnet团队 创新实训 项目博客 (十一)
    install ubuntu on Android mobile phone
    Mac OS, Mac OSX 与Darwin
    About darwin OS
    自然语言交流系统 phxnet团队 创新实训 项目博客 (十)
    Linux下编译安装qemu和libvirt
    libvirt(virsh命令总结)
    深入浅出 kvm qemu libvirt
    自然语言交流系统 phxnet团队 创新实训 项目博客 (九)
    自然语言交流系统 phxnet团队 创新实训 项目博客 (八)
  • 原文地址:https://www.cnblogs.com/wzyxidian/p/4853618.html
Copyright © 2011-2022 走看看