Spark Source Code Analysis: TaskScheduler Task Submission and the Task Preferred-Location Algorithm

The previous article, "Spark Source Code Analysis: DAG Division and Submission in DAGScheduler", covered DAGScheduler's stage-division algorithm.

This article continues the analysis: how a Stage is wrapped into a TaskSet, and how that TaskSet is submitted to the cluster's Executors for execution.

In DAGScheduler's submitStage method, the stages have already been divided and the stage topology built; when a stage has no missing parent stage, DAGScheduler calls its submitMissingTasks method to submit the tasks that stage contains (a condensed sketch of this logic is shown below).
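For orientation, here is a condensed sketch of DAGScheduler.submitStage, based on the Spark 1.6-era source with details such as the active-job checks elided; it is a sketch, not the verbatim method:

    // Parents are submitted first; a stage with no missing parents goes straight
    // to submitMissingTasks, everything else waits until its parents finish.
    private def submitStage(stage: Stage) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      if (missing.isEmpty) {
        submitMissingTasks(stage, jobId)   // no missing parents: submit this stage's tasks
      } else {
        missing.foreach(submitStage)       // recursively submit missing parent stages first
        waitingStages += stage             // park this stage until its parents complete
      }
    }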
Let's now walk through DAGScheduler's submitMissingTasks method step by step.

1. Obtain the preferred computation locations for the tasks:
    val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id)) }.toMap
        case s: ResultStage =>
          val job = s.activeJob.get
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    }

The core here is the getPreferredLocs method, which derives each task's preferred computation locations from the RDD's data placement, so that tasks get good data locality. We will skip the fine details here and analyze them in a later article, but a simplified sketch of the idea follows.
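In outline, getPreferredLocs (via its recursive helper getPreferredLocsInternal) checks, in order: whether the partition is cached, whether the RDD itself declares placement preferences (e.g. HDFS block locations for an input RDD), and otherwise recurses into narrow dependencies. A condensed sketch based on the Spark 1.6-era source, with some details elided:

    private def getPreferredLocsInternal(
        rdd: RDD[_],
        partition: Int,
        visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
      if (!visited.add((rdd, partition))) return Nil   // already seen: avoid cycles
      // 1. If the partition is cached, prefer the executors holding the cached blocks
      val cached = getCacheLocs(rdd)(partition)
      if (cached.nonEmpty) return cached
      // 2. If the RDD declares placement preferences (e.g. HDFS input splits), use those
      val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
      if (rddPrefs.nonEmpty) return rddPrefs.map(TaskLocation(_))
      // 3. Otherwise walk narrow dependencies and take the first parent with preferences
      rdd.dependencies.foreach {
        case n: NarrowDependency[_] =>
          for (inPart <- n.getParents(partition)) {
            val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
            if (locs != Nil) return locs
          }
        case _ =>
      }
      Nil
    }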

2. Serialize the task binary and broadcast it; when an Executor runs a task, it deserializes this task binary. The shape of this step is sketched below.
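Roughly, the broadcast payload is the serialized RDD plus, for a ShuffleMapStage, its ShuffleDependency, or, for a ResultStage, the user function. Lightly abridged from the same Spark 1.6-era submitMissingTasks:

    // Serialize once, broadcast once: every task then ships only a handle to the broadcast.
    var taskBinary: Broadcast[Array[Byte]] = null
    val taskBinaryBytes: Array[Byte] = stage match {
      case stage: ShuffleMapStage =>
        closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
      case stage: ResultStage =>
        closureSerializer.serialize((stage.rdd, stage.func): AnyRef).array()
    }
    taskBinary = sc.broadcast(taskBinaryBytes)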

3. Create a task for each partition of the stage, according to the stage's type, and wrap the tasks into a TaskSet. There are two stage types: a ShuffleMapStage generates ShuffleMapTasks, and a ResultStage generates ResultTasks.
    val tasks: Seq[Task[_]] = try {
      stage match {
        case stage: ShuffleMapStage =>
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = stage.rdd.partitions(id)
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, stage.internalAccumulators)
          }
        case stage: ResultStage =>
          val job = stage.activeJob.get
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = stage.rdd.partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, id, stage.internalAccumulators)
          }
      }
    }

4. Call TaskScheduler's submitTasks method to submit the TaskSet:
    logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
    stage.pendingPartitions ++= tasks.map(_.partitionId)
    logDebug("New pending partitions: " + stage.pendingPartitions)
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())

The implementation of submitTasks lives in TaskSchedulerImpl, the implementation class of TaskScheduler.

4.1 TaskSchedulerImpl's submitTasks method first creates a TaskSetManager:
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager

A TaskSetManager manages a single TaskSet within TaskSchedulerImpl: it tracks each task, retries failed tasks until the maximum number of retries is reached, and performs locality-aware scheduling of the TaskSet's tasks via delay scheduling.

    private[spark] class TaskSetManager(
        sched: TaskSchedulerImpl,      // the TaskSchedulerImpl this manager is bound to
        val taskSet: TaskSet,
        val maxTaskFailures: Int,      // maximum number of retries after task failure
        clock: Clock = new SystemClock())
      extends Schedulable with Logging
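The delay scheduling mentioned above works by waiting a short time for a free slot at the best locality level before falling back to a worse one. The waits are configurable; the snippet below uses the real Spark config keys, but the values are purely illustrative:

    import org.apache.spark.SparkConf

    // How long a TaskSetManager waits at each locality level before downgrading.
    val conf = new SparkConf()
      .set("spark.locality.wait", "3s")          // base wait, the default for the levels below
      .set("spark.locality.wait.process", "3s")  // PROCESS_LOCAL -> NODE_LOCAL
      .set("spark.locality.wait.node", "3s")     // NODE_LOCAL -> RACK_LOCAL
      .set("spark.locality.wait.rack", "3s")     // RACK_LOCAL -> ANY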

4.2 Add the TaskSetManager to the schedulableBuilder:
    // Add the TaskSetManager to the rootPool scheduling pool; the schedulableBuilder
    // determines the scheduling order
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

schedulableBuilder is of type SchedulableBuilder, a trait with two implementations, FIFOSchedulableBuilder and FairSchedulableBuilder; FIFO is the default scheduling mode:

    // default scheduler is FIFO
    private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")
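To switch to fair scheduling, you set this property before creating the SparkContext; pools can optionally be defined in an allocation file. A hypothetical example (the property names are real; the app name and file path are made up for illustration):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("fair-scheduling-demo")   // hypothetical app name
      .set("spark.scheduler.mode", "FAIR")  // default is FIFO
      .set("spark.scheduler.allocation.file", "/etc/spark/fairscheduler.xml") // optional pool definitions
    val sc = new SparkContext(conf)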

The schedulableBuilder is created when SparkContext, after creating the SchedulerBackend and the TaskScheduler, calls TaskSchedulerImpl's initialize method:

    def initialize(backend: SchedulerBackend) {
      this.backend = backend
      // temporarily set rootPool name to empty
      rootPool = new Pool("", schedulingMode, 0, 0)
      schedulableBuilder = {
        schedulingMode match {
          case SchedulingMode.FIFO =>
            new FIFOSchedulableBuilder(rootPool)
          case SchedulingMode.FAIR =>
            new FairSchedulableBuilder(rootPool, conf)
        }
      }
      schedulableBuilder.buildPools()
    }

The schedulableBuilder is an important member of the TaskScheduler: together with its scheduling algorithm, it determines the order in which TaskSetManagers are scheduled, via a comparator like the one below.
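For reference, this is the FIFO comparator from the Spark 1.6-era scheduler source (lightly abridged): TaskSetManagers are ordered by job priority, with the stage ID as the tie-breaker:

    // Lower jobId (priority) runs first; ties are broken by stageId.
    private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
      override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
        val priority1 = s1.priority   // for a TaskSetManager, this is the job ID
        val priority2 = s2.priority
        var res = math.signum(priority1 - priority2)
        if (res == 0) {
          res = math.signum(s1.stageId - s2.stageId)
        }
        res < 0
      }
    }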

4.3 Next, SchedulerBackend's reviveOffers method is called to schedule the tasks, deciding which Executor each task will run on.

CoarseGrainedSchedulerBackend's reviveOffers method sends a ReviveOffers message to the driverEndpoint:

    override def reviveOffers() {
      driverEndpoint.send(ReviveOffers)
    }

Upon receiving the ReviveOffers message, the driverEndpoint calls its makeOffers method:
    // Make fake resource offers on all executors
    private def makeOffers() {
      // Filter out the executors that are alive
      val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
      // Wrap each executor into a WorkerOffer object
      val workOffers = activeExecutors.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toSeq
      launchTasks(scheduler.resourceOffers(workOffers))
    }

Note on the executorDataMap in the code above: when the client registers the Application with the Master, the Master allocates and launches Executors for the Application; those Executors then register with CoarseGrainedSchedulerBackend, and their registration information is stored in the executorDataMap structure.
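For reference, a WorkerOffer is just a small value object describing the free resources on one executor (from the Spark scheduler source):

    // An offer of free resources on a single executor.
    private[spark] case class WorkerOffer(executorId: String, host: String, cores: Int)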

With the computing resources prepared, TaskSchedulerImpl assigns Executors to tasks based on those resources.
Let's look at TaskSchedulerImpl's resourceOffers method:

    // Randomly shuffle the offers, to spread load across executors
    val shuffledOffers = Random.shuffle(offers)

    // Build a 2-D structure: one ArrayBuffer per executor, holding the tasks
    // that will be assigned to that executor
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
    val availableCpus = shuffledOffers.map(o => o.cores).toArray

    // Sort the TaskSetManagers according to the SchedulableBuilder's scheduling algorithm
    val sortedTaskSets = rootPool.getSortedTaskSetQueue
    for (taskSet <- sortedTaskSets) {
      logDebug("parentName: %s, name: %s, runningTasks: %s".format(
        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
      if (newExecAvail) {
        taskSet.executorAdded()
      }
    }

    // Double loop: for each TaskSet, in scheduling order, try to launch tasks
    // level by level in order of data locality.
    // Locality level order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
    var launchedTask = false
    for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
      do {
        launchedTask = resourceOfferSingleTaskSet(
          taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
      } while (launchedTask)
    }
    if (tasks.size > 0) {
      hasLaunchedTask = true
    }
    return tasks

Now let's look at the resourceOfferSingleTaskSet method. Using the current data-locality level, it calls TaskSetManager's resourceOffer method to assign tasks on each executor (the catch clause and closing braces, truncated in the original excerpt, are restored here from the Spark 1.6-era source):
    private def resourceOfferSingleTaskSet(
        taskSet: TaskSetManager,
        maxLocality: TaskLocality,
        shuffledOffers: Seq[WorkerOffer],
        availableCpus: Array[Int],
        tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
      var launchedTask = false
      for (i <- 0 until shuffledOffers.size) {
        val execId = shuffledOffers(i).executorId
        val host = shuffledOffers(i).host
        // Only offer if the executor has at least CPUS_PER_TASK (1 by default) free cores
        if (availableCpus(i) >= CPUS_PER_TASK) {
          try {
            // Ask the TaskSetManager for a task to run on this executor at this locality level
            for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
              tasks(i) += task
              val tid = task.taskId
              taskIdToTaskSetManager(tid) = taskSet
              taskIdToExecutorId(tid) = execId
              executorIdToTaskCount(execId) += 1
              executorsByHost(host) += execId
              availableCpus(i) -= CPUS_PER_TASK
              assert(availableCpus(i) >= 0)
              launchedTask = true
            }
          } catch {
            case e: TaskNotSerializableException =>
              logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
              // Do not offer resources for this task set, but don't throw, so that
              // other task sets can still be submitted.
              return launchedTask
          }
        }
      }
      launchedTask
    }

Once resources have been assigned to the tasks, the DriverEndpoint calls the launchTasks method (sketched below) to start the tasks on their Executors. How a task actually runs on the Executor will be analyzed in a later article; stay tuned.
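A lightly abridged sketch of launchTasks, based on the Spark 1.6-era CoarseGrainedSchedulerBackend.DriverEndpoint: each TaskDescription is serialized and sent to its executor's RPC endpoint:

    private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        val serializedTask = ser.serialize(task)
        // (Oversize handling elided here: a task larger than the RPC message size
        // limit aborts its task set instead of being sent.)
        val executorData = executorDataMap(task.executorId)
        executorData.freeCores -= scheduler.CPUS_PER_TASK
        executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
      }
    }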

To summarize the call chain:

TaskSchedulerImpl#submitTasks
CoarseGrainedSchedulerBackend#reviveOffers
CoarseGrainedSchedulerBackend$DriverEndpoint#makeOffers
  |- TaskSchedulerImpl#resourceOffers(offers)   assigns tasks to the offers
    |- TaskSchedulerImpl#resourceOfferSingleTaskSet
CoarseGrainedSchedulerBackend$DriverEndpoint#launchTasks








