  Spark 2.4.0 Source Code: TaskScheduler

      Overview

      TaskScheduler defines the interface for task scheduling. Spark currently has only one implementation, TaskSchedulerImpl, which receives TaskSets from DAGScheduler, assigns resources to tasks according to the resource scheduling algorithm, and submits the tasks to executors for execution.

      TaskSchedulerImpl implements speculative execution and task locality through TaskSetManager. Speculative execution means that when a slow-running task is detected, a copy of it is launched on another executor and whichever attempt finishes first supplies the result, reducing the impact of slow tasks on overall job progress. The locality algorithm dispatches each task to the executor closest to the data that task will process, reducing network transfer. Spark currently supports five locality levels: PROCESS_LOCAL (same process), NODE_LOCAL (same node), NO_PREF (no preference), RACK_LOCAL (same rack), and ANY.
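      For reference, the five levels form an ordered enumeration, and smaller values are more local, which is what makes the level comparisons in the code later in this post meaningful. Abridged from org.apache.spark.scheduler.TaskLocality in Spark 2.4:

    // Abridged: declaration order defines the ordering used by the scheduler,
    // so PROCESS_LOCAL < NODE_LOCAL < NO_PREF < RACK_LOCAL < ANY.
    object TaskLocality extends Enumeration {
      val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value
    }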

      TaskSchedulerImpl also depends on a backend interface, SchedulerBackend: the actual allocation of resources to tasks is carried out through this backend.
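      For orientation, SchedulerBackend is a small trait; only the members relevant to this post are shown below (abridged from Spark 2.4):

    // Abridged from org.apache.spark.scheduler.SchedulerBackend (Spark 2.4):
    private[spark] trait SchedulerBackend {
      def start(): Unit
      def stop(): Unit
      // Ask the backend to offer its free resources, which may launch tasks
      def reviveOffers(): Unit
      def defaultParallelism(): Int
      // ... remaining members omitted
    }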

      Initialization and Startup of TaskSchedulerImpl

      The SparkContext created by a Spark application internally creates a TaskSchedulerImpl and calls its initialize and start methods.
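      The wiring happens in SparkContext.createTaskScheduler. A simplified sketch for the local[N] case (not the verbatim source; error handling and the other cluster managers are omitted):

    // Simplified sketch of SparkContext.createTaskScheduler for master = "local[N]":
    val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
    val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
    scheduler.initialize(backend)   // picks FIFO or FAIR from spark.scheduler.mode
    // SparkContext later starts the scheduler, which in turn starts the backend:
    scheduler.start()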

      rootPool is one of TaskSchedulerImpl's properties and is created when the TaskSchedulerImpl is constructed. TaskSchedulerImpl's task scheduling depends on the scheduling pool (Pool): every task to be scheduled is placed into the pool, the pool orders TaskSets according to the scheduling algorithm (FIFO or FAIR), and the scheduled TaskSets are handed back to TaskSchedulerImpl for resource scheduling.

    val rootPool: Pool = new Pool("", schedulingMode, 0, 0)

       The initialize and start methods

    def initialize(backend: SchedulerBackend) {
        // Assign the SchedulerBackend created inside SparkContext to the backend field
        this.backend = backend
        // Create the schedulable builder matching the scheduling mode
        schedulableBuilder = {
          schedulingMode match {
            case SchedulingMode.FIFO =>
              new FIFOSchedulableBuilder(rootPool)
            case SchedulingMode.FAIR =>
              new FairSchedulableBuilder(rootPool, conf)
            case _ =>
              throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
                s"$schedulingMode")
          }
        }
        // Build the scheduling pools
        schedulableBuilder.buildPools()
      }
      
    override def start() {
        // Start the SchedulerBackend
        backend.start()
        // If the application is not in local mode and speculation is enabled, set up a timer
        // that checks for speculatable tasks every SPECULATION_INTERVAL_MS (100 ms)
        if (!isLocal && conf.getBoolean("spark.speculation", false)) {
          logInfo("Starting speculative execution thread")
          speculationScheduler.scheduleWithFixedDelay(new Runnable {
            override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
          // Check for speculatable tasks
              checkSpeculatableTasks()
            }
          }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
        }
      }
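      checkSpeculatableTasks itself simply asks the root pool (which forwards the question to every TaskSetManager) whether any task qualifies for speculation, and revives offers if so. Abridged from TaskSchedulerImpl in Spark 2.4:

    // Abridged: delegate the speculation check to the pool hierarchy and
    // trigger new resource offers if a speculatable task was found.
    def checkSpeculatableTasks() {
      var shouldRevive = false
      synchronized {
        shouldRevive = rootPool.checkSpeculatableTasks(MIN_TIME_TO_SPECULATION)
      }
      if (shouldRevive) {
        backend.reviveOffers()
      }
    }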

      Task Submission

      When DAGScheduler submits a TaskSet from submitMissingTasks, it calls taskScheduler.submitTasks, implemented as follows:

    override def submitTasks(taskSet: TaskSet) {
    // Get the tasks of this TaskSet
        val tasks = taskSet.tasks
        logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
        this.synchronized {
          val manager = createTaskSetManager(taskSet, maxTaskFailures)
          val stage = taskSet.stageId
      // Conflict check on the current TaskSet: taskSetsByStageIdAndAttempt must not
      // contain a different, still-active TaskSet for the same stage
          val stageTaskSets =
            taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
          stageTaskSets(taskSet.stageAttemptId) = manager
          val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
            ts.taskSet != taskSet && !ts.isZombie
          }
          if (conflictingTaskSet) {
            throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
              s" ${
                stageTaskSets.toSeq.map {
                  _._2.taskSet.id
                }.mkString(",")
              }")
          }
          
      // Add the newly created TaskSetManager to the pool built by the schedulable builder
          schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
          
      // If not in local mode and no task has been received yet, set up a timer that checks
      // TaskSchedulerImpl for starvation every STARVATION_TIMEOUT_MS
          if (!isLocal && !hasReceivedTask) {
            starvationTimer.scheduleAtFixedRate(new TimerTask() {
              override def run() {
                if (!hasLaunchedTask) {
                  logWarning("Initial job has not accepted any resources; " +
                    "check your cluster UI to ensure that workers are registered " +
                    "and have sufficient resources")
                } else {
                  this.cancel()
                }
              }
            }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
          }
      // Mark that TaskSchedulerImpl has received tasks
          hasReceivedTask = true
        }
    // Offer resources to the tasks and launch them
        backend.reviveOffers()
      }
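      For context, the call site on the DAGScheduler side looks roughly like this (abridged from DAGScheduler.submitMissingTasks in Spark 2.4):

    // Abridged: wrap the stage's tasks in a TaskSet and hand it to the TaskScheduler.
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))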

      reviveOffers() is the SchedulerBackend method that assigns resources to tasks and launches them. This backend is not created by TaskSchedulerImpl; when SparkContext is created, it instantiates the SchedulerBackend implementation that matches the deploy mode and passes it to TaskSchedulerImpl.

      Taking local mode as an example: LocalSchedulerBackend.reviveOffers actually sends a ReviveOffers message to LocalEndpoint, which then calls its own reviveOffers method; that method calls TaskSchedulerImpl.resourceOffers to assign resources to tasks, and finally calls Executor.launchTask to load and run each task. The implementation is as follows:

    def reviveOffers() {
        // Create a sequence containing a single WorkerOffer; in local mode its executorId
        // is "driver", its host is "localhost", and its cores are the backend's freeCores
        val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores,
          Some(rpcEnv.address.hostPort)))
        // Call TaskSchedulerImpl.resourceOffers to assign resources to tasks
        for (task <- scheduler.resourceOffers(offers).flatten) {
          // Decrease the number of free CPU cores by CPUS_PER_TASK
          freeCores -= scheduler.CPUS_PER_TASK
          // Call Executor.launchTask to load and run the task
          executor.launchTask(executorBackend, task)
        }
      }
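      The hop from the backend to the endpoint is a single RPC message. Abridged from LocalSchedulerBackend and LocalEndpoint in Spark 2.4:

    // Abridged: LocalSchedulerBackend.reviveOffers sends a ReviveOffers
    // message to the endpoint...
    override def reviveOffers() {
      localEndpoint.send(ReviveOffers)
    }

    // ...and LocalEndpoint's receive handler invokes the reviveOffers method
    // shown above (StatusUpdate and KillTask cases omitted).
    override def receive: PartialFunction[Any, Unit] = {
      case ReviveOffers =>
        reviveOffers()
    }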

      Resource Allocation

      After TaskSchedulerImpl receives the sequence of WorkerOffer case classes, it preprocesses them: it updates the host-to-executor and rack mappings used for computing task locality, randomly shuffles the WorkerOffers so that tasks are spread evenly across worker nodes, and tallies the resources available on each worker. It then calls its resourceOfferSingleTaskSet method to offer resources to the TaskSet;
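      WorkerOffer itself is just a small case class advertising an executor's free resources (abridged from Spark 2.4):

    // Abridged from org.apache.spark.scheduler.WorkerOffer (Spark 2.4):
    // one free-resource advertisement per executor; `address` is the
    // optional "host:port" string used by barrier tasks.
    private[spark] case class WorkerOffer(
        executorId: String,
        host: String,
        cores: Int,
        address: Option[String] = None)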

      resourceOfferSingleTaskSet reads the information in each WorkerOffer, such as the executor ID and the offer's host, then checks the offer's CPUs; if the available CPUs are at least the number required per task, it performs the following steps (a condensed sketch of the method follows the list):

      1. Call TaskSetManager.resourceOffer to create a TaskDescription for a pending task at the given maximum locality level.

      2. Append the TaskDescription to the tasks array.

      3. Update the cached mappings from the task's ID to its TaskSet and to its executor.

      4. Recompute the CPUs: subtract the CPUs used by the task from the WorkerOffer's available CPUs. Finally, return launchedTask, i.e., whether any task in the TaskSet was assigned resources.
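      A condensed sketch of resourceOfferSingleTaskSet (abridged from Spark 2.4; the TaskNotSerializableException handling and the barrier-scheduling bookkeeping are omitted):

    // Abridged from TaskSchedulerImpl.resourceOfferSingleTaskSet in Spark 2.4.
    private def resourceOfferSingleTaskSet(
        taskSet: TaskSetManager,
        maxLocality: TaskLocality.TaskLocality,
        shuffledOffers: Seq[WorkerOffer],
        availableCpus: Array[Int],
        tasks: IndexedSeq[ArrayBuffer[TaskDescription]]): Boolean = {
      var launchedTask = false
      for (i <- 0 until shuffledOffers.size) {
        val execId = shuffledOffers(i).executorId
        val host = shuffledOffers(i).host
        // Only make an offer if the executor still has enough free cores
        if (availableCpus(i) >= CPUS_PER_TASK) {
          for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
            tasks(i) += task                            // step 2
            val tid = task.taskId
            taskIdToTaskSetManager(tid) = taskSet       // step 3
            taskIdToExecutorId(tid) = execId
            executorIdToRunningTaskIds(execId).add(tid)
            availableCpus(i) -= CPUS_PER_TASK           // step 4
            assert(availableCpus(i) >= 0)
            launchedTask = true
          }
        }
      }
      launchedTask
    }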

      Implementation of TaskSetManager.resourceOffer:

    def resourceOffer(
          execId: String,
          host: String,
          maxLocality: TaskLocality.TaskLocality)
        : Option[TaskDescription] =
      {
    // Determine whether the offered host or executor is blacklisted for this TaskSet
        val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
          blacklist.isNodeBlacklistedForTaskSet(host) ||
            blacklist.isExecutorBlacklistedForTaskSet(execId)
        }
    // If the TaskSetManager is not a zombie and the offered host and executor are not blacklisted, proceed
        if (!isZombie && !offerBlacklisted) {
      // Record the current time and start from the maximum locality level
          val curTime = clock.getTimeMillis()
          var allowedLocality = maxLocality
    
      // Compute the allowed locality level: if the maximum locality level is NO_PREF, the allowed level is NO_PREF;
      // otherwise it is the more local (smaller) of maxLocality and the level returned by getAllowedLocalityLevel
          if (maxLocality != TaskLocality.NO_PREF) {
            allowedLocality = getAllowedLocalityLevel(curTime)
            if (allowedLocality > maxLocality) {
              // We're not allowed to search for farther-away tasks
              allowedLocality = maxLocality
            }
          }
    
      // Call dequeueTask to find, for the given executor, host, and locality level, the index of a task
      // to run, its locality, and whether it is a speculative copy; returns a triple
          dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
            // Found a task; do some bookkeeping and return a task description
        // Look up the task by the returned index
            val task = tasks(index)
        // Generate a new task ID for this attempt
            val taskId = sched.newTaskId()
            // Do various bookkeeping
        // Increment the number of running copies of this task
            copiesRunning(index) += 1
        // Get the attempt number for this task
            val attemptNum = taskAttempts(index).size
        // Create the task attempt info
            val info = new TaskInfo(taskId, index, attemptNum, curTime,
              execId, host, taskLocality, speculative)
        // Update the mappings from task ID to attempt info and from index to attempts
            taskInfos(taskId) = info
            taskAttempts(index) = info :: taskAttempts(index)
            // Update our locality level for delay scheduling
            // NO_PREF will not affect the variables related to delay scheduling
        // If the maximum locality level is not NO_PREF
            if (maxLocality != TaskLocality.NO_PREF) {
          // Record the locality level actually used and set the last launch time to the current time
              currentLocalityIndex = getLocalityIndex(taskLocality)
              lastLaunchTime = curTime
            }
            // Serialize and return the task
        // Serialize the task
            val serializedTask: ByteBuffer = try {
              ser.serialize(task)
            } catch {
              // If the task cannot be serialized, then there's no point to re-attempt the task,
              // as it will always fail. So just abort the whole task-set.
              case NonFatal(e) =>
                val msg = s"Failed to serialize task $taskId, not attempting to retry it."
                logError(msg, e)
                abort(s"$msg Exception during serialization: $e")
                throw new TaskNotSerializableException(e)
            }
            
        // Task size check
            if (serializedTask.limit() > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
              !emittedTaskSizeWarning) {
              emittedTaskSizeWarning = true
              logWarning(s"Stage ${task.stageId} contains a task of very large size " +
                s"(${serializedTask.limit() / 1024} KB). The maximum recommended task size is " +
                s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
            }
            
        // Add the task ID to runningTasksSet and increment the running-task count recorded in the scheduling pool
            addRunningTask(taskId)
    
            // We used to log the time it takes to serialize the task, but task size is already
            // a good proxy to task serialization time.
            // val timeTaken = clock.getTime() - startTime
        // Build the task name
            val taskName = s"task ${info.id} in stage ${taskSet.id}"
            logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
              s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit()} bytes)")
      
            /**
              * def taskStarted(task: Task[_], taskInfo: TaskInfo) {
              *     eventProcessLoop.post(BeginEvent(task, taskInfo))
              * }
          * Post a BeginEvent to DAGSchedulerEventProcessLoop
              */
            sched.dagScheduler.taskStarted(task, info)
        // Create and return the TaskDescription
            new TaskDescription(
              taskId,
              attemptNum,
              execId,
              taskName,
              index,
              task.partitionId,
              addedFiles,
              addedJars,
              task.localProperties,
              serializedTask)
          }
        } else {
          None
        }
      }
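      The getAllowedLocalityLevel call above is where delay scheduling happens: the TaskSetManager stays at its current locality level until that level's configured wait (spark.locality.wait, optionally overridden per level via spark.locality.wait.process/.node/.rack) expires without a launch, then falls back to the next, less local level. A simplified sketch of the core loop (hypothetical helper name; the real Spark 2.4 method additionally skips levels that no longer have pending tasks):

    // Simplified sketch of the core of TaskSetManager.getAllowedLocalityLevel
    // (hypothetical name; the real method also prunes levels without pending tasks).
    private def getAllowedLocalityLevelSketch(curTime: Long): TaskLocality.TaskLocality = {
      while (currentLocalityIndex < myLocalityLevels.length - 1 &&
          curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
        // The wait for this level expired without a task launch: fall back a level
        lastLaunchTime += localityWaits(currentLocalityIndex)
        currentLocalityIndex += 1
      }
      myLocalityLevels(currentLocalityIndex)
    }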
     