18. TaskScheduler: Internals and Source Code Analysis

    1. Source Code Analysis

    ###Entry point
    ###org.apache.spark.scheduler/DAGScheduler.scala
    
    // Finally, create a TaskSet object for this stage's tasks and submit it via taskScheduler's submitTasks() method
    // In standalone mode, the implementation used by default is TaskSchedulerImpl; TaskScheduler itself is only a trait
    taskScheduler.submitTasks(
      new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
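
    To make the trait/implementation split concrete, here is a minimal, self-contained sketch (toy types, not Spark's API): the caller codes against a scheduler trait, and a concrete implementation, analogous to TaskSchedulerImpl in standalone mode, is plugged in behind it:

    // Toy stand-ins for TaskScheduler / TaskSchedulerImpl; the real Spark
    // types take a TaskSet, not a Seq[String]
    trait ToyTaskScheduler {
      def submitTasks(taskSet: Seq[String]): Unit
    }

    class ToyTaskSchedulerImpl extends ToyTaskScheduler {
      override def submitTasks(taskSet: Seq[String]): Unit =
        println(s"submitting ${taskSet.length} tasks")
    }

    object ToySubmit extends App {
      // the caller only sees the trait, as DAGScheduler does
      val scheduler: ToyTaskScheduler = new ToyTaskSchedulerImpl
      scheduler.submitTasks(Seq("task-0", "task-1"))
    }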
    
    
    
    
    
    ###org.apache.spark.scheduler/TaskSchedulerImpl.scala
    ###taskScheduler.submitTasks() resolves to TaskSchedulerImpl's submitTasks() method
    
    /**
      * The entry point through which the TaskScheduler accepts tasks
      * @param taskSet
      */
    override def submitTasks(taskSet: TaskSet) {
      val tasks = taskSet.tasks
      logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
      this.synchronized {
        // For every TaskSet, create one TaskSetManager
        // The TaskSetManager is what later monitors and manages the execution status of its TaskSet's tasks
        val manager = createTaskSetManager(taskSet, maxTaskFailures)
        // Put it into the in-memory cache
        activeTaskSets(taskSet.id) = manager
        schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

        if (!isLocal && !hasReceivedTask) {
          starvationTimer.scheduleAtFixedRate(new TimerTask() {
            override def run() {
              if (!hasLaunchedTask) {
                logWarning("Initial job has not accepted any resources; " +
                  "check your cluster UI to ensure that workers are registered " +
                  "and have sufficient resources")
              } else {
                this.cancel()
              }
            }
          }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
        }
        hasReceivedTask = true
      }
      // As covered in the SparkContext analysis, one crucial step when the TaskScheduler is created is creating a
      // SparkDeploySchedulerBackend for TaskSchedulerImpl. The backend here is exactly that SparkDeploySchedulerBackend,
      // which is also responsible for creating the AppClient and registering the Application with the Master
      backend.reviveOffers()
    }
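
    The starvation warning above is a small, self-contained pattern worth seeing on its own. Below is a runnable sketch of it; the names and the timeout value are illustrative, not Spark's defaults. A repeating TimerTask warns until some work has launched, then cancels itself:

    import java.util.{Timer, TimerTask}

    object StarvationTimerSketch extends App {
      @volatile var hasLaunchedTask = false
      val starvationTimer = new Timer("starvation-timer", true) // daemon timer
      val STARVATION_TIMEOUT = 1000L                            // pretend 1s, not Spark's default

      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run(): Unit = {
          if (!hasLaunchedTask) println("WARN: initial job has not accepted any resources")
          else this.cancel() // the first successful launch silences the warning
        }
      }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)

      Thread.sleep(2500)     // let the warning fire a couple of times
      hasLaunchedTask = true // simulate the first task launching
      Thread.sleep(1500)     // the next tick cancels the TimerTask
    }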
    
    
    
    
    
    
    ###org.apache.spark.scheduler/TaskSetManager.scala
    
    /**
      * Schedules the tasks of a single TaskSet within TaskSchedulerImpl. This class tracks each task and,
      * if a task fails, retries it until the retry limit is exceeded. It also handles locality-aware
      * scheduling for this TaskSet via delay scheduling. Its main interface is resourceOffer, through
      * which the TaskSet is asked whether it wants to run a task on a given node; it also receives
      * status-update messages so it knows when the state of one of its tasks has changed
      */
    private[spark] class TaskSetManager(
        sched: TaskSchedulerImpl,
        val taskSet: TaskSet,
        val maxTaskFailures: Int,
        clock: Clock = new SystemClock())
      extends Schedulable with Logging {
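
    As a toy illustration of the retry bound described in the comment above (not Spark's actual code), the loop below retries a failing task until maxTaskFailures attempts have been used up, then gives up:

    object RetrySketch extends App {
      val maxTaskFailures = 4
      // pretend the task fails on its first two attempts and succeeds on the third
      def runTask(attempt: Int): Boolean = attempt >= 3

      var attempt = 0
      var succeeded = false
      while (!succeeded && attempt < maxTaskFailures) {
        attempt += 1
        succeeded = runTask(attempt)
        println(s"attempt $attempt -> ${if (succeeded) "success" else "failure"}")
      }
      if (!succeeded) println("too many failures, aborting the task set")
    }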
    
    
    
    
    
    ###org.apache.spark.scheduler.cluster/CoarseGrainedSchedulerBackend.scala
    ###backend.reviveOffers() resolves to CoarseGrainedSchedulerBackend's reviveOffers() method
    
      override def reviveOffers() {
        driverActor ! ReviveOffers
      }
    
    
    
    
    
    ###The ReviveOffers case handled by CoarseGrainedSchedulerBackend's DriverActor
    
          case ReviveOffers =>
            makeOffers()
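
    For readers unfamiliar with the actor pattern used here, the following minimal sketch (assuming classic Akka on the classpath; all names are toy stand-ins) shows the same fire-and-forget message flow: reviveOffers() just sends a message, and the driver actor reacts to it:

    import akka.actor.{Actor, ActorSystem, Props}

    case object ToyReviveOffers

    class ToyDriverActor extends Actor {
      override def receive: Receive = {
        case ToyReviveOffers => println("makeOffers() would run here")
      }
    }

    object ReviveSketch extends App {
      val system = ActorSystem("sketch")
      val driverActor = system.actorOf(Props(new ToyDriverActor), "driver")
      driverActor ! ToyReviveOffers // fire-and-forget, just like reviveOffers()
      Thread.sleep(500)
      system.terminate()
    }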
    
    
    
    
    
    ###org.apache.spark.scheduler.cluster/CoarseGrainedSchedulerBackend.scala
    ###makeOffers()
    // Make fake resource offers on all executors
    def makeOffers() {
      // Step 1: call TaskSchedulerImpl's resourceOffers() method, which runs the task-assignment algorithm
      // to assign each task to an executor
      // Step 2: once tasks are assigned to executors, call this class's own launchTasks() method, which sends a
      // LaunchTask message for each assigned task to its executor, where the task is started and executed
      // resourceOffers() is passed all of this Application's available executors, each wrapped in a WorkerOffer
      // that represents the number of cpu cores that executor has free
      launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toSeq))
    }
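
    A toy reconstruction of the offer-building step (simplified stand-in types, not Spark's) makes the shape of the data clear: one WorkerOffer per executor, carrying its free cpu count:

    case class ToyExecutorData(executorHost: String, freeCores: Int)
    case class ToyWorkerOffer(executorId: String, host: String, cores: Int)

    object MakeOffersSketch extends App {
      val executorDataMap = Map(
        "exec-1" -> ToyExecutorData("host-a", 4),
        "exec-2" -> ToyExecutorData("host-b", 2))

      // one WorkerOffer per executor, as in makeOffers() above
      val offers = executorDataMap.map { case (id, data) =>
        ToyWorkerOffer(id, data.executorHost, data.freeCores)
      }.toSeq

      offers.foreach(println)
    }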
    
    
    
    
    
    
    ###org.apache.spark.scheduler/TaskSchedulerImpl.scala
    ###resourceOffers()
    
      def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
        // Mark each slave as alive and remember its hostname
        // Also track if new executor is added
        var newExecAvail = false
        for (o <- offers) {
          executorIdToHost(o.executorId) = o.host
          activeExecutorIds += o.executorId
          if (!executorsByHost.contains(o.host)) {
            executorsByHost(o.host) = new HashSet[String]()
            executorAdded(o.executorId, o.host)
            newExecAvail = true
          }
          for (rack <- getRackForHost(o.host)) {
            hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
          }
        }

        // First, shuffle the available executors, i.e. scatter them, to spread load as evenly as possible
        // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
        val shuffledOffers = Random.shuffle(offers)
        // Build a list of tasks to assign to each worker.
        // Then, based on the WorkerOffers, build the structures we will need
        // For example tasks, which can be thought of as a two-dimensional array: an ArrayBuffer whose elements
        // are themselves ArrayBuffers, each sized to the number of cpu cores its executor has available
        val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
        val availableCpus = shuffledOffers.map(o => o.cores).toArray
        // This is important: take the sorted TaskSets out of rootPool. As discussed for TaskScheduler initialization,
        // after TaskSchedulerImpl and SparkDeploySchedulerBackend are created, initialize() is called, which creates a
        // scheduling pool. Every submitted TaskSet first goes into that pool; when the task-assignment algorithm runs,
        // the TaskSets are taken back out of the pool in sorted order
        val sortedTaskSets = rootPool.getSortedTaskSetQueue
        for (taskSet <- sortedTaskSets) {
          logDebug("parentName: %s, name: %s, runningTasks: %s".format(
            taskSet.parent.name, taskSet.name, taskSet.runningTasks))
          if (newExecAvail) {
            taskSet.executorAdded()
          }
        }

        // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
        // of locality levels so that it gets a chance to launch local tasks on all of them.
        // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY

        // This double for loop is the core of the task-assignment algorithm: iterate over every TaskSet and
        // over every locality level
        // The locality levels are:
        // PROCESS_LOCAL: process-local; the rdd partition and the task are in the same Executor, which is of course fastest
        // NODE_LOCAL: the rdd partition and the task are not in the same Executor (not the same process), but are on the same worker node
        // NO_PREF: no locality preference
        // RACK_LOCAL: rack-local; the rdd partition and the task are at least on the same rack
        // ANY: any locality level at all
        // These locality levels are ordered from best to worst

        var launchedTask = false
        // For each TaskSet, start from the best locality level and work downwards
        for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
          do {
            // For the current TaskSet, first try to launch its tasks on executors at the tightest locality level
            // If nothing can be launched, break out of this do-while loop and move on to the next, looser,
            // locality level
            // This continues until the TaskSet's tasks have all been launched on executors at some locality level
            launchedTask = resourceOfferSingleTaskSet(
                taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
          } while (launchedTask)
        }

        if (tasks.size > 0) {
          hasLaunchedTask = true
        }
        return tasks
      }
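
    The do-while escalation logic is easier to see in isolation. The following toy model (made-up task counts, not Spark's data structures) drains each locality level before relaxing to the next one:

    object LocalitySketch extends App {
      // locality levels from best to worst, as in the NOTE above
      val levels = Seq("PROCESS_LOCAL", "NODE_LOCAL", "NO_PREF", "RACK_LOCAL", "ANY")

      // pretend counts of tasks that could still launch at each level
      val launchable = scala.collection.mutable.Map(
        "PROCESS_LOCAL" -> 2, "NODE_LOCAL" -> 1, "ANY" -> 3).withDefaultValue(0)

      for (maxLocality <- levels) {
        var launchedTask = false
        do {
          // keep offering at this level until nothing more launches here
          launchedTask = launchable(maxLocality) > 0
          if (launchedTask) {
            launchable(maxLocality) -= 1
            println(s"launched one task at $maxLocality")
          }
        } while (launchedTask)
      }
    }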
    
    
    
    
    
    
    
    ###org.apache.spark.scheduler/TaskSchedulerImpl.scala
    ###resourceOfferSingleTaskSet()
    
      private def resourceOfferSingleTaskSet(
          taskSet: TaskSetManager,
          maxLocality: TaskLocality,
          shuffledOffers: Seq[WorkerOffer],
          availableCpus: Array[Int],
          tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
        var launchedTask = false
        // Iterate over all executors
        for (i <- 0 until shuffledOffers.size) {
          val execId = shuffledOffers(i).executorId
          val host = shuffledOffers(i).host
          // Only if this executor's free cpu count is at least the cpus each task needs (1 by default)
          if (availableCpus(i) >= CPUS_PER_TASK) {
            try {
              // Call the TaskSetManager's resourceOffer method to find which of the TaskSet's tasks can be
              // launched on this executor at this locality level
              // resourceOffer() checks how long the task has been waiting at this locality level; if the wait
              // is still within the allowed window, the task is considered launchable on this executor at this level
              for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
                tasks(i) += task
                val tid = task.taskId
                taskIdToTaskSetId(tid) = taskSet.taskSet.id
                taskIdToExecutorId(tid) = execId
                executorsByHost(host) += execId
                availableCpus(i) -= CPUS_PER_TASK
                assert(availableCpus(i) >= 0)
                launchedTask = true
              }
            } catch {
              case e: TaskNotSerializableException =>
                logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
                // Do not offer resources for this task, but don't throw an error to allow other
                // task sets to be submitted.
                return launchedTask
            }
          }
        }
        return launchedTask
      }
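
    The per-executor bookkeeping above boils down to: walk the shuffled offers, and whenever an executor still has at least CPUS_PER_TASK free cores, assign a task and debit the cores. A runnable toy version (all names illustrative, one task per executor per pass):

    import scala.collection.mutable.ArrayBuffer

    object SingleTaskSetSketch extends App {
      val CPUS_PER_TASK = 1
      case class Offer(executorId: String, cores: Int)

      val shuffledOffers = Seq(Offer("exec-1", 2), Offer("exec-2", 1))
      val availableCpus = shuffledOffers.map(_.cores).toArray
      val tasks = shuffledOffers.map(_ => new ArrayBuffer[String]())
      var pending = List("task-0", "task-1", "task-2")

      // one pass over the executors: an executor with enough free cores
      // gets one pending task, and its core count is debited
      for (i <- shuffledOffers.indices if availableCpus(i) >= CPUS_PER_TASK) {
        pending match {
          case next :: rest =>
            tasks(i) += next
            availableCpus(i) -= CPUS_PER_TASK
            pending = rest
          case Nil => // nothing left to assign
        }
      }
      shuffledOffers.zip(tasks).foreach { case (o, t) => println(s"${o.executorId} -> $t") }
    }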
    
    
    
    
    
    
    ###org.apache.spark.scheduler.cluster/CoarseGrainedSchedulerBackend.scala
    ###launchTasks()
    
    // Launch tasks returned by a set of resource offers
    // Based on the assignments, launch the corresponding tasks on the executors
    def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        // First, serialize each task's information
        val ser = SparkEnv.get.closureSerializer.newInstance()
        val serializedTask = ser.serialize(task)
        if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
          val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
          scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
            try {
              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
                "spark.akka.frameSize or using broadcast variables for large values."
              msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
                AkkaUtils.reservedSizeBytes)
              taskSet.abort(msg)
            } catch {
              case e: Exception => logError("Exception in error callback", e)
            }
          }
        }
        else {
          // Find the executor this task was assigned to
          val executorData = executorDataMap(task.executorId)
          // Debit the cpu cores the task will use from the executor's free cores
          executorData.freeCores -= scheduler.CPUS_PER_TASK
          // Send a LaunchTask message to the executor so that it starts the task
          executorData.executorActor ! LaunchTask(new SerializableBuffer(serializedTask))
        }
      }
    }
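
    The frame-size guard is a general pattern: serialize first, then refuse to send anything that would not fit within the transport frame's budget. A self-contained sketch with made-up numbers (not Spark's defaults):

    import java.nio.ByteBuffer

    object LaunchTasksSketch extends App {
      val frameSize = 128 * 1024  // pretend transport frame limit, in bytes
      val reservedSizeBytes = 200 // pretend per-frame overhead

      def serialize(taskBytes: Array[Byte]): ByteBuffer = ByteBuffer.wrap(taskBytes)

      val serializedTask = serialize(Array.fill[Byte](300 * 1024)(0))
      if (serializedTask.limit() >= frameSize - reservedSizeBytes)
        println(s"task of ${serializedTask.limit()} bytes exceeds the frame budget; abort the task set")
      else
        println("within budget: send LaunchTask(serializedTask) to the executor")
    }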
Original post: https://www.cnblogs.com/weiyiming007/p/11233559.html