zoukankan      html  css  js  c++  java
  • Spark源码分析 -- TaskScheduler

    Spark在设计上将DAGScheduler和TaskScheduler完全解耦合, 所以在资源管理和task调度上可以有更多的方案

    现在支持, LocalSheduler, ClusterScheduler, MesosScheduler, YarnClusterScheduler

    先分析ClusterScheduler, 即standalone的Spark集群上, 因为比较单纯不涉及其他的系统, 看看Spark的任务是如何被执行的

     

      private var taskScheduler: TaskScheduler = {
          case SPARK_REGEX(sparkUrl) =>
            val scheduler = new ClusterScheduler(this)  // 创建ClusterScheduler
            val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName) // 创建SparkDeploySchedulerBackend
            scheduler.initialize(backend)
            scheduler
      } 


    TaskScheduler接口, 注释写的非常清楚

    /**
     * Low-level task scheduler interface, implemented by both ClusterScheduler and LocalScheduler.
     * These schedulers get sets of tasks submitted to them from the DAGScheduler for each stage,
     * and are responsible for sending the tasks to the cluster, running them, retrying if there
     * are failures, and mitigating stragglers. They return events to the DAGScheduler through
     * the TaskSchedulerListener interface.
     */
    private[spark] trait TaskScheduler {
      def rootPool: Pool
      def schedulingMode: SchedulingMode
      def start(): Unit // 启动
      def postStartHook() { }
      def stop(): Unit
      // Submit a sequence of tasks to run.
      def submitTasks(taskSet: TaskSet): Unit // 核心, 提交taskset的接口
      // Set a listener for upcalls. This is guaranteed to be set before submitTasks is called.
      def setListener(listener: TaskSchedulerListener): Unit // TaskScheduler会使用这个listener来汇报当前task的运行状况,会注册DAGScheduler
      // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.
      def defaultParallelism(): Int
    }

     

    ClusterScheduler

    对于集群的TaskScheduler实现, 相对于LocalScheduler
    主要就是创建和管理schedulable tree, 参考Spark源码分析 – SchedulableBuilder
    当然最终和cluster的executor通信还是需要依赖SparkDeploySchedulerBackend, 参考Spark源码分析 – SchedulerBackend

     

    对于submitTasks,
    首先将tasksetmanager放入schedulable tree等待schedule (delay schedule, 不一定会马上被调度到)
    然后给SchedulerBackend发送reviveOffers event, 请求分配资源并launch tasks (launch的并一定是刚提交的tasks)
    SchedulerBackend会向cluster申请workOffers(对于standalonebackend, 这步省略了), 然后再调用ClusterScheduler.resourceOffers来根据可用的workOffers分配tasks
    最终给executors发送LaunchTask, 启动tasks

     

    resourceOffers是核心函数, 当得到可用的workerOffer后, 用于从schedulable tree中schedule合适的被执行的tasks
    resourceOffers的逻辑有点小复杂
    1. 首先依次遍历sortedTaskSets, 并对于每个Taskset, 遍历TaskLocality
    2. 越local越优先, 找不到(launchedTask为false)才会到下个locality级别
    3. 在多次遍历offer list, 因为一次taskSet.resourceOffer只会占用一个core, 而不是一次用光所有的core, 这样有助于一个taskset中的task比较均匀的分布在workers上
    4. 只有在该taskset, 该locality下, 对所有worker offer都找不到合适的task时, 才跳到下个locality级别

     

    private[spark] class ClusterScheduler(val sc: SparkContext) extends TaskScheduler with Logging
    {
      var listener: TaskSchedulerListener = null
      var backend: SchedulerBackend = null
      val mapOutputTracker = SparkEnv.get.mapOutputTracker
      var schedulableBuilder: SchedulableBuilder = null
      var rootPool: Pool = null
      // default scheduler is FIFO
      val schedulingMode: SchedulingMode = SchedulingMode.withName(
        System.getProperty("spark.scheduler.mode", "FIFO"))
      def initialize(context: SchedulerBackend) {
        backend = context    // 初始化SchedulerBackend 
        // temporarily set rootPool name to empty
        rootPool = new Pool("", schedulingMode, 0, 0) // 创建Schedulable tree的root pool
        schedulableBuilder = { // 用schedulableBuilder初始化Schedulable tree
          schedulingMode match {
            case SchedulingMode.FIFO =>
              new FIFOSchedulableBuilder(rootPool)
            case SchedulingMode.FAIR =>
              new FairSchedulableBuilder(rootPool)
          }
        }
        schedulableBuilder.buildPools()
      }

      override def start() {
        backend.start() // 启动SchedulerBackend 
      }
    
      override def submitTasks(taskSet: TaskSet) {
        val tasks = taskSet.tasks
        logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
        this.synchronized {
          val manager = new ClusterTaskSetManager(this, taskSet)
          activeTaskSets(taskSet.id) = manager
          schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) // 将TaskSetManager加到Schedulable tree等待被调度执行
          taskSetTaskIds(taskSet.id) = new util.HashSet[Long]()
          backend.reviveOffers() // 调用SchedulerBackend的reviveOffers, 其实就是往DriverActor发送reviveOffers事件
      }
     
      /**
       * Called by cluster manager to offer resources on slaves. We respond by asking our active task
       * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
       * that tasks are balanced across the cluster.
       */
      // 根据当前可用的worker offers, 分配tasks
      def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
        SparkEnv.set(sc.env)
    
        // Build a list of tasks to assign to each worker
        val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores)) // 每个core可以分配一个task,所以对每个offer生成length为cores数目的ArrayBuffer
        val availableCpus = offers.map(o => o.cores).toArray  // 每个work可用的core数目的array
        val sortedTaskSets = rootPool.getSortedTaskSetQueue() // 得到根据schedule算法排序后的TaskSetManager列表
        // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
        // of locality levels so that it gets a chance to launch local tasks on all of them.
        var launchedTask = false
        for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) { // 嵌套, 遍历sortedTaskSets, 并对每个taskSet遍历所有TaskLocality
          do {
            launchedTask = false
            for (i <- 0 until offers.size) { // 遍历每个offer, 试图在当前的taskset和当前的locality上找到合适的task
              val execId = offers(i).executorId
              val host = offers(i).host
              for (task <- taskSet.resourceOffer(execId, host, availableCpus(i), maxLocality)) { // 每次只会返回最多一个task
                tasks(i) += task
                val tid = task.taskId
                taskIdToTaskSetId(tid) = taskSet.taskSet.id
                taskSetTaskIds(taskSet.taskSet.id) += tid
                taskIdToExecutorId(tid) = execId
                activeExecutorIds += execId
                executorsByHost(host) += execId
                availableCpus(i) –= 1 // 分配一个task, 所以availableCpus - 1
                launchedTask = true
              }
            }
          } while (launchedTask) // 找到,就继续在这个locality上找task, 否则放宽到下个locality,或下个taskset
        }
    
        if (tasks.size > 0) {
          hasLaunchedTask = true
        }
        return tasks
      }
    }
  • 相关阅读:
    理解SVG坐标系统和变换: transform属性
    在svg文间画图过程中放大缩小图片后,坐标偏移问题
    理解SVG的缩放 偏移的计算公式
    svg 实践之屏幕坐标与svg元素坐标转换
    Winform 程序打包及安装
    使用bootstrap table小记(表格组件)
    MVC实现多级联动
    微信公众号开发之网页中及时获取当前用户Openid及注意事项
    微信公众号开发之网页授权获取用户基本信息
    微信公众号开发之自动消息回复和自定义菜单
  • 原文地址:https://www.cnblogs.com/fxjwind/p/3504159.html
Copyright © 2011-2022 走看看