zoukankan      html  css  js  c++  java
  • 深入探究Spark -- 调度器Scheduler

    多个Job如何分配计算资源:
     
     
    调度模块分为:DAGScheduler和TaskScheduler
    计算任务按DAG分阶段提交集群
    用户只需要关心如何创建RDD来操作,无需关注底层的调度,除非要优化
     
    DAGScheduler负责分析应用,建立DAG,划分Stage,Stage由并发Task(Task逻辑相同,作用于不同数据Partition)构成,DAG的实现在不同资源管理框架(L、S、M、Y)下一样
     
    Task提交到TaskScheduler,TaskScheduler通过Cluster Manager分配Worker的Executor启动Task。Executor先看当前任务有无缓存,没有则开始计算。计算结果会回传到Driver或保存在本地。不同资源管理框架实现方式不同,主要是实现org.apache.spark.scheduler.TaskSchedulerImpl
     
    调度模块的三个类:org.apache.spark.scheduler下的
    DAGScheduler、SchedulerBackend、TaskScheduler
     
    这三个类构成了任务调度逻辑图:
     
     
    SchedulerBackend负责向当前计算的Task分配Executor,并启动Task,完成计算调度,使用reviveOffers实现,不同运行模式加入不同的逻辑
     
    TaskScheduler为SparkContext调度任务,也就是从DAGScheduler接收Task,并提交,执行慢的会备份任务。其中的TaskSchedulerImpl会调用Backend的reviveOffers:提交新任务、任务执行失败、Executor不可用、任务执行慢重新分配资源
     
    TaskScheduler和Backend的关系图:
     
    每个Backend对应唯一TaskScheduler,都由SparkContext创建和持有
     
    其中Mesos有两种Backend,粗coarsegrained和细fine-grained,将每个Spark Task映射成Mesos Task 从而让Mesos来调度和执行Task
     
     
     
     
     
    DAGScheduler:
     
    DAG划分Stage,Stage包含相同逻辑的Task,在不同资源框架实现相同。
     
    SparkContext的runJob调用了DAGScheduler的runJob,然后开始划分、生成工作。
     
     
    1. DAGS的创建:
     
    SparkContext调用DAGS的构造函数,在DAGS创建前需要先创建TaskS,因为DAGS要引用它
     
    在DAGS构造函数源码中:
    MapOutputTrackerMaster运行在Driver中管理Shuffle Map Task,用来获取Shuffle输出的位置
    BlockManagerMaster也运行在Driver中,管理Job的Block信息
     
    在DAGS创建过程中还会生成eventProcessActor,主要用于处理消息
     
    用DAGSchedulerActorSupervisor来创建eventProcessActor,利用了Actor监督策略(错误处理),如果Actor出现错误,就会取消DAGS的Job,停止SparkContext
     
     
    1. 提交Job:
     
    DAGS的runJob调用submitJob,具体的调用过程是:
    RDD -> SparkContext#runJob -> DAGS#runJob -> DAGS#submitJob -> DAGS EventProcessActor#receive -> DAGS#handleJobSubmitted
     
    submitJob为Job生成Job ID,并生成一个JobWaiter实例监听Job执行情况,Job的所有Task才会标记为成功,任一Task失败都会使Job失败(JobWaiter#jobFailed)。
     
    近似估计的Job,DAGS不使用JobWaiter而是ApproximateActionListener
     
    eventProcessActor最终调用handleJobSubmitted提交Job
     
     
     
    1. Stage划分:
     
    handleJobSubmitted根据RDD创建finalStage,然后创建ActiveJob提交计算任务
     
    不同Stage不能并行计算(后面用到前面的结果),因为要进行shuffle。Stage由一系列相同的独立的Task(一个task对应一个partition)组成
     
    根据RDD之间的依赖种类(宽依赖和窄依赖),其中窄依赖的task能并行执行,可以分配到同一个Stage,而宽依赖需要shuffle,所以Spark根据宽依赖将Job划分Stage
     
    Stage划分从最后一个RDD(执行Action)开始,此RDD从SparkContext的rubJob一直调用到handleJobSubmitted,逐层网上划分
     
    如何通过Shuffle来划分Stage是难点,通过shuffleMapTask实现
     
    划分实现细节:
    handleJobSubmitted调用newStage创建finalStage,首先会调用getParentStages获取当前的parent Stage,每遇到一个ShuffleDependency(宽依赖)就会生成一个parent Stage,用广度优先搜索来执行
     
    getShuffleMapStage获取shuffleDependency依赖的Stage,没有就创建。源码中这个方法里包括了registerShuffleDependencies确认是否生成,若没有则生成,newOrUsedStage也是生成,但可避免重复计算
     
     
    1. 生成任务:
     
    handleJobSubmitted生成finalStage后为这个Job生成ActiveJob,并计算finalStage
     
    除了本地运行,handleJobSubmitted调用submitStage,先递归提交parent Stage,再提交finalStage
     
    最后,submitMissingTasks完成DAGS的工作,判断哪些partition需要计算,判断task是否已经结束,把task封装到TaskSet中,并向TaskScheduler提交Task,就完成了DAGS的工作。
     
     
     
     
     
    任务调度(TaskScheduler):
     
    每个TaskScheduler对应一个SchedulerBackend,TaskScheduler负责Application不同Job之间的调度,执行失败启动重试机制,并为执行慢的Task备份。SchedulerBackend与Cluster Manager交互,取得应用的资源,并传给TaskScheduler,并最终分配计算所取到的资源。
     
    1. 创建TaskScheduler:
     
    SparkContext创建时,执行了createTaskScheduler创建,根据传入master的url来判断资源管理框架,生成不同的TaskScheduler和SchedulerBackend
     
    设置spark.speculation为true打开任务推测,这样在创建之后会周期性调用TaskSetManager的checkSpeculatableTasks检查,周期间隔通过spark.speculation.interval设置
     
     
    1. 提交Task:
     
    DAGS按照顺序提交Stage到TaskScheduler,用到TaskSchedulerImpl#submitTasks
     
    接下来进行Task级别的资源调度,task分配到Executor最终完成
    首先是scheduler层的调用类,位于Driver端:
    1 scheduler.TaskSchedulerImpl#submitTasks -> 
    2 SchedulableBuilder#addTaskSetManager ->
    3 cluster.CoarseGrainedSchedulerBackend#reviveOffers -> 
    4 cluster.CoarseGrainedSchedulerBackend.DriverActor#makeOffers -> 
    5 TaskSchedulerImpl@resourceOffers -> 
    6 cluster.CoarseGrainedSchedulerBackend.DriverActor#launchTasks -> 
     
    最终到executor
    executor.CoarseGrainedSchedulerBackend.receiveWithLogging#launchTask
    executor.Executor#launchTask
     
    Driver端的调度处理(上面位于Scheduler的操作,对源码分析):
     
    调用栈1将TaskSet放进TaskSetManager,根据就近原则(locality aware)分配Task计算资源,监控执行,并有失败重试、慢任务推测性执行功能
     
    调用栈2中,schedulableBuilder是Application级别的调度器,会确定TaskSetManager调度顺序,再由TaskSetManager根据就近原则分配。有两种调度策略FIFO(先进先出)、FAIR(公平),可通过scheduler.mode设置,默认为FIFO。
     
    调用栈5响应cluster.CoarseGrainedSchedulerBackend的资源调度请求,为Task具体分配资源,输入是一个Executor列表,输出是scheduler.TaskDescription二维数组,包含了Task ID、Executor ID和Task执行环境依赖等。此步骤源码中包括避免Task集中分配采取的随机打散、获取按照调度顺序排序好的TaskSetManager、计算就近原则、调用TaskSetManager为Executor分配Task
     
    调用栈6将以上得到的Tasks发送给Executor
     
    Executor接收到信息开始执行Task
     
     
    1. 具体调度实现:
     
    schedulableBuilder.addTakSetManager根据调度方式不同,分别有FIFOSchedulableBuilder和FairSchedulableBuilder实现。构造方法中有rootPool这个输入,里面包含了TaskSetManager
     
    FIFO调度:
     
    保证Job ID较小的先被调度,同一个Job就Stage ID较小先调度,源码中也正是先比较Job ID再比较Stage ID
     
    FAIR调度:
     
    需要在rootPool的基础上根据配置文件来构建调度树,包含多个pool
     
     
     
     
    1. 处理Task运算结果:
     
    Driver接收Executor执行结果:
     
    Task在Executor执行完后向Driver发送StatusUpdate消息,Driver状态更新为TaskState.FINISHED,Driver把状态更新通知TaskScheduler,Executor重新分配新Task
     
    TaskSchedulerImpl#statusUpdate中实现判断并标记Task是否执行成功。处理结果由Daemon线程池负责,默认为4个线程,通过spark.resultGetter.threads设置。
     
    结果传回Driver后,spark.driver.maxResultSize设置最大结果大小,大于就直接丢弃。在此范围内,较大的结果存入storage.BlockManager,较小就直接回传给Driver,这个是通过spark.akka.frameSize设置界线,默认为10M,回传机制就是AKKA消息传递机制。
     
    处理任务成功执行:
     
    Driver端对结果的处理,首先确定大小符不符合要求,向远端worker获取结果,判断错误,最后将远端结果删除
     
    Scheduler.handleSuccessfulTask负责处理获取到的计算结果
     
    1 scheduler.TaskSchedulerImpl#handleSuccessfulTask ->
    2 TaskSetManager#handleSuccessfulTask ->
    3 DAGScheduler#taskEnded ->
    4 DAGScheduler#eventProcessActor ->
    5 DAGScheduler#handleTaskCompletion
     
    调用栈5完成了计算结果处理,ResultTask调用scheduler.JobWaiter告知调用者任务结束。
    当所有Task都结束时,把Stage标记为结束,通知JobWaiter任务能够结束
    对于ShuffleMapTask,要将结果保存到Stage,将结果注册到MapOutputTrackerMaster,下一个Stage的Task可以获取Shuffle结果
    如果所有任务都返回,Stage还有部分数据为空,证明某部分Task失败需要重新提交Stage
    如果ResultTask对应的Stage执行完成,就证明Job执行完成。
     
    处理任务失败的容错机制:
     
    TaskSchedulerImpl#statusUpdate调用TaskResultGetter#enqueueFailedTask处理。
    核心实现在于handleFailedTask,它调用TaskSetManager处理任务失败,当失败次数没有超过阈值(默认为4,通过spark.task.maxFailures设置)重新提交任务,如果超过阈值,标记为失败
     
     
     
     
     
    总结:
     
    Task是集群上运行的基本单位,一个Task负责处理RDD的一个Partition,RDD的多个Partition由不同Task处理,所以这些Task逻辑都一样,只不过处理的数据不同。这些Task组成了Stage
     
    Task分类:ShuffleMapTask、ResultTask
     
    ShuffleMapTask根据Task的partitioner将计算结果放到不同bucket中,而ResultTask将计算结果发回Driver
     
    一个Job有多个Stage,最后的Stage包含的是ResultTask
     
    在用户触发action指令,SparkContext通过runJob进行任务提交,先由DAG的handleJobSubmitted处理,有划分Stage、提交Stage、Task等,让Task运行在集群上
     
    Stage的开始是从外部存储或Shuffle中读取数据;结束于发生Shuffle或者生成结果。
     
    划分完Stage后,TaskScheduler为Stage的Task分配计算资源,而具体的分配其实由Cluster Manager负责。
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
  • 相关阅读:
    JS数组及操作方法,堆和栈的概念
    JS变量作用域,浏览器预解析
    JS函数的基本概念
    JS循环语句
    JS程序三大结构及语法语句
    src与href
    JS概念及基本语法
    图片整合技术
    hdu6395 Sequence(分段矩阵快速幂)
    hdu6396 Swordsman(贪心)
  • 原文地址:https://www.cnblogs.com/kinghey-java-ljx/p/8521380.html
Copyright © 2011-2022 走看看