多个Job如何分配计算资源:
调度模块分为:DAGScheduler和TaskScheduler
计算任务按DAG分阶段提交集群
用户只需要关心如何创建RDD来操作,无需关注底层的调度,除非要优化
DAGScheduler负责分析应用,建立DAG,划分Stage,Stage由并发Task(Task逻辑相同,作用于不同数据Partition)构成,DAG的实现在不同资源管理框架(L、S、M、Y)下一样
Task提交到TaskScheduler,TaskScheduler通过Cluster Manager分配Worker的Executor启动Task。Executor先看当前任务有无缓存,没有则开始计算。计算结果会回传到Driver或保存在本地。不同资源管理框架实现方式不同,主要是实现org.apache.spark.scheduler.TaskSchedulerImpl
调度模块的三个类:org.apache.spark.scheduler下的
DAGScheduler、SchedulerBackend、TaskScheduler
这三个类构成了任务调度逻辑图:
SchedulerBackend负责向当前计算的Task分配Executor,并启动Task,完成计算调度,使用reviveOffers实现,不同运行模式加入不同的逻辑
TaskScheduler为SparkContext调度任务,也就是从DAGScheduler接收Task,并提交,执行慢的会备份任务。其中的TaskSchedulerImpl会调用Backend的reviveOffers:提交新任务、任务执行失败、Executor不可用、任务执行慢重新分配资源
TaskScheduler和Backend的关系图:
每个Backend对应唯一TaskScheduler,都由SparkContext创建和持有
其中Mesos有两种Backend,粗coarsegrained和细fine-grained,将每个Spark Task映射成Mesos Task 从而让Mesos来调度和执行Task
DAGScheduler:
DAG划分Stage,Stage包含相同逻辑的Task,在不同资源框架实现相同。
SparkContext的runJob调用了DAGScheduler的runJob,然后开始划分、生成工作。
- DAGS的创建:
SparkContext调用DAGS的构造函数,在DAGS创建前需要先创建TaskS,因为DAGS要引用它
在DAGS构造函数源码中:
MapOutputTrackerMaster运行在Driver中管理Shuffle Map Task,用来获取Shuffle输出的位置
BlockManagerMaster也运行在Driver中,管理Job的Block信息
在DAGS创建过程中还会生成eventProcessActor,主要用于处理消息
用DAGSchedulerActorSupervisor来创建eventProcessActor,利用了Actor监督策略(错误处理),如果Actor出现错误,就会取消DAGS的Job,停止SparkContext
- 提交Job:
DAGS的runJob调用submitJob,具体的调用过程是:
RDD -> SparkContext#runJob -> DAGS#runJob -> DAGS#submitJob -> DAGS EventProcessActor#receive -> DAGS#handleJobSubmitted
submitJob为Job生成Job ID,并生成一个JobWaiter实例监听Job执行情况,Job的所有Task才会标记为成功,任一Task失败都会使Job失败(JobWaiter#jobFailed)。
近似估计的Job,DAGS不使用JobWaiter而是ApproximateActionListener
eventProcessActor最终调用handleJobSubmitted提交Job
- Stage划分:
handleJobSubmitted根据RDD创建finalStage,然后创建ActiveJob提交计算任务
不同Stage不能并行计算(后面用到前面的结果),因为要进行shuffle。Stage由一系列相同的独立的Task(一个task对应一个partition)组成
根据RDD之间的依赖种类(宽依赖和窄依赖),其中窄依赖的task能并行执行,可以分配到同一个Stage,而宽依赖需要shuffle,所以Spark根据宽依赖将Job划分Stage
Stage划分从最后一个RDD(执行Action)开始,此RDD从SparkContext的rubJob一直调用到handleJobSubmitted,逐层网上划分
如何通过Shuffle来划分Stage是难点,通过shuffleMapTask实现
划分实现细节:
handleJobSubmitted调用newStage创建finalStage,首先会调用getParentStages获取当前的parent Stage,每遇到一个ShuffleDependency(宽依赖)就会生成一个parent Stage,用广度优先搜索来执行
getShuffleMapStage获取shuffleDependency依赖的Stage,没有就创建。源码中这个方法里包括了registerShuffleDependencies确认是否生成,若没有则生成,newOrUsedStage也是生成,但可避免重复计算
- 生成任务:
handleJobSubmitted生成finalStage后为这个Job生成ActiveJob,并计算finalStage
除了本地运行,handleJobSubmitted调用submitStage,先递归提交parent Stage,再提交finalStage
最后,submitMissingTasks完成DAGS的工作,判断哪些partition需要计算,判断task是否已经结束,把task封装到TaskSet中,并向TaskScheduler提交Task,就完成了DAGS的工作。
任务调度(TaskScheduler):
每个TaskScheduler对应一个SchedulerBackend,TaskScheduler负责Application不同Job之间的调度,执行失败启动重试机制,并为执行慢的Task备份。SchedulerBackend与Cluster Manager交互,取得应用的资源,并传给TaskScheduler,并最终分配计算所取到的资源。
- 创建TaskScheduler:
SparkContext创建时,执行了createTaskScheduler创建,根据传入master的url来判断资源管理框架,生成不同的TaskScheduler和SchedulerBackend
设置spark.speculation为true打开任务推测,这样在创建之后会周期性调用TaskSetManager的checkSpeculatableTasks检查,周期间隔通过spark.speculation.interval设置
- 提交Task:
DAGS按照顺序提交Stage到TaskScheduler,用到TaskSchedulerImpl#submitTasks
接下来进行Task级别的资源调度,task分配到Executor最终完成
首先是scheduler层的调用类,位于Driver端:
1 scheduler.TaskSchedulerImpl#submitTasks ->
2 SchedulableBuilder#addTaskSetManager ->
3 cluster.CoarseGrainedSchedulerBackend#reviveOffers ->
4 cluster.CoarseGrainedSchedulerBackend.DriverActor#makeOffers ->
5 TaskSchedulerImpl@resourceOffers ->
6 cluster.CoarseGrainedSchedulerBackend.DriverActor#launchTasks ->
最终到executor
executor.CoarseGrainedSchedulerBackend.receiveWithLogging#launchTask
executor.Executor#launchTask
Driver端的调度处理(上面位于Scheduler的操作,对源码分析):
调用栈1将TaskSet放进TaskSetManager,根据就近原则(locality aware)分配Task计算资源,监控执行,并有失败重试、慢任务推测性执行功能
调用栈2中,schedulableBuilder是Application级别的调度器,会确定TaskSetManager调度顺序,再由TaskSetManager根据就近原则分配。有两种调度策略FIFO(先进先出)、FAIR(公平),可通过scheduler.mode设置,默认为FIFO。
调用栈5响应cluster.CoarseGrainedSchedulerBackend的资源调度请求,为Task具体分配资源,输入是一个Executor列表,输出是scheduler.TaskDescription二维数组,包含了Task ID、Executor ID和Task执行环境依赖等。此步骤源码中包括避免Task集中分配采取的随机打散、获取按照调度顺序排序好的TaskSetManager、计算就近原则、调用TaskSetManager为Executor分配Task
调用栈6将以上得到的Tasks发送给Executor
Executor接收到信息开始执行Task
- 具体调度实现:
schedulableBuilder.addTakSetManager根据调度方式不同,分别有FIFOSchedulableBuilder和FairSchedulableBuilder实现。构造方法中有rootPool这个输入,里面包含了TaskSetManager
FIFO调度:
保证Job ID较小的先被调度,同一个Job就Stage ID较小先调度,源码中也正是先比较Job ID再比较Stage ID
FAIR调度:
需要在rootPool的基础上根据配置文件来构建调度树,包含多个pool
- 处理Task运算结果:
Driver接收Executor执行结果:
Task在Executor执行完后向Driver发送StatusUpdate消息,Driver状态更新为TaskState.FINISHED,Driver把状态更新通知TaskScheduler,Executor重新分配新Task
TaskSchedulerImpl#statusUpdate中实现判断并标记Task是否执行成功。处理结果由Daemon线程池负责,默认为4个线程,通过spark.resultGetter.threads设置。
结果传回Driver后,spark.driver.maxResultSize设置最大结果大小,大于就直接丢弃。在此范围内,较大的结果存入storage.BlockManager,较小就直接回传给Driver,这个是通过spark.akka.frameSize设置界线,默认为10M,回传机制就是AKKA消息传递机制。
处理任务成功执行:
Driver端对结果的处理,首先确定大小符不符合要求,向远端worker获取结果,判断错误,最后将远端结果删除
Scheduler.handleSuccessfulTask负责处理获取到的计算结果
1 scheduler.TaskSchedulerImpl#handleSuccessfulTask ->
2 TaskSetManager#handleSuccessfulTask ->
3 DAGScheduler#taskEnded ->
4 DAGScheduler#eventProcessActor ->
5 DAGScheduler#handleTaskCompletion
调用栈5完成了计算结果处理,ResultTask调用scheduler.JobWaiter告知调用者任务结束。
当所有Task都结束时,把Stage标记为结束,通知JobWaiter任务能够结束
对于ShuffleMapTask,要将结果保存到Stage,将结果注册到MapOutputTrackerMaster,下一个Stage的Task可以获取Shuffle结果
如果所有任务都返回,Stage还有部分数据为空,证明某部分Task失败需要重新提交Stage
如果ResultTask对应的Stage执行完成,就证明Job执行完成。
处理任务失败的容错机制:
TaskSchedulerImpl#statusUpdate调用TaskResultGetter#enqueueFailedTask处理。
核心实现在于handleFailedTask,它调用TaskSetManager处理任务失败,当失败次数没有超过阈值(默认为4,通过spark.task.maxFailures设置)重新提交任务,如果超过阈值,标记为失败
总结:
Task是集群上运行的基本单位,一个Task负责处理RDD的一个Partition,RDD的多个Partition由不同Task处理,所以这些Task逻辑都一样,只不过处理的数据不同。这些Task组成了Stage
Task分类:ShuffleMapTask、ResultTask
ShuffleMapTask根据Task的partitioner将计算结果放到不同bucket中,而ResultTask将计算结果发回Driver
一个Job有多个Stage,最后的Stage包含的是ResultTask
在用户触发action指令,SparkContext通过runJob进行任务提交,先由DAG的handleJobSubmitted处理,有划分Stage、提交Stage、Task等,让Task运行在集群上
Stage的开始是从外部存储或Shuffle中读取数据;结束于发生Shuffle或者生成结果。
划分完Stage后,TaskScheduler为Stage的Task分配计算资源,而具体的分配其实由Cluster Manager负责。