zoukankan      html  css  js  c++  java
  • TaskScheduler

    一初始化

    在SparkContext初始化的时候,同时初始化三个对象。DAGScheduler,TaskScheduler,SchedulerBackend。DAGScheduler,前面已经讲到,做stage的划分及每个stage对应的task划分及任务提交,SchedulerBackend用于资源划分。本节讲TaskScheduler。

      private[spark] def schedulerBackend: SchedulerBackend = _schedulerBackend
    
      private[spark] def taskScheduler: TaskScheduler = _taskScheduler
      private[spark] def taskScheduler_=(ts: TaskScheduler): Unit = {
        _taskScheduler = ts
      }
    
      private[spark] def dagScheduler: DAGScheduler = _dagScheduler
      private[spark] def dagScheduler_=(ds: DAGScheduler): Unit = {
        _dagScheduler = ds
      }

    具体到TaskScheduler, 是在createTaskScheduler函数中,使用模式匹配分别对各种情况下创建三个对象。 

     master match {
          case "local" =>case LOCAL_N_REGEX(threads) =>case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>case SPARK_REGEX(sparkUrl) =>
            val scheduler = new TaskSchedulerImpl(sc)
            val masterUrls = sparkUrl.split(",").map("spark://" + _)
            val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
            scheduler.initialize(backend)
            (backend, scheduler)
    
          case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
            case masterUrl =>
        }

    在cluster集群模式下,TaskScheduler得到了TaskSchedulerImpl实例,SchedulerBackend得到了StandaloneSchedulerBackend实例。

    2构建调度池

    def initialize(backend: SchedulerBackend) {
        this.backend = backend
        // temporarily set rootPool name to empty
        rootPool = new Pool("", schedulingMode, 0, 0)
        schedulableBuilder = {
          schedulingMode match {
            case SchedulingMode.FIFO =>
              new FIFOSchedulableBuilder(rootPool)
            case SchedulingMode.FAIR =>
              new FairSchedulableBuilder(rootPool, conf)
            case _ =>
              throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
          }
        }
        schedulableBuilder.buildPools()
      }

    在生成这两个对象后,接下来将backend传入scheduler的初始化方法中进行初始化,TaskSchedulerImpl.initialize方法中主要是根据调度模式初始化调度池。

    spark中的调度模式主要有两种:FIFO和FAIR。默认情况下Spark的调度模式是FIFO(先进先出),谁先提交谁先执行,后面的任务需要等待前面的任务执行。而FAIR(公平调度)模式支持在调度池中为任务进行分组,不同的调度池权重不同,任务可以按照权重来决定执行顺序。

    3启动

    初始化成功,资源调度初始化成功,启动TaskScheduler.

    override def submitTasks(taskSet: TaskSet) {
        val tasks = taskSet.tasks
        logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
        this.synchronized {
          // 生成一个TaskSetManager类型对象,
          // task最大重试次数,由参数spark.task.maxFailures设置,默认为4
          val manager = createTaskSetManager(taskSet, maxTaskFailures)
          val stage = taskSet.stageId
          // key为stageId,value为一个HashMap,这个HashMap中的key为stageAttemptId,value为TaskSetManager对象
          val stageTaskSets =
            taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
          stageTaskSets(taskSet.stageAttemptId) = manager
          // 如果当前这个stageId对应的HashMap[Int, TaskSetManager]中存在某个taskSet
          // 使得当前的taskSet和这个taskSet不是同一个,并且当前这个TaskSetManager不是zombie进程
          // 即对于同一个stageId,如果当前这个TaskSetManager不是zombie进程,即其中的tasks需要运行,
          // 并且对当前stageId,有两个不同的taskSet在运行
          // 那么就应该抛出异常,确保同一个Stage在正常运行情况下不能有两个taskSet在运行
          val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
            ts.taskSet != taskSet && !ts.isZombie
          }
          if (conflictingTaskSet) {
            throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
              s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
          }
          // 根据调度模式生成FIFOSchedulableBuilder或者FairSchedulableBuilder,将当前的TaskSetManager提交到调度池中
          schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    
          if (!isLocal && !hasReceivedTask) {
            starvationTimer.scheduleAtFixedRate(new TimerTask() {
              override def run() {
                if (!hasLaunchedTask) {
                  logWarning("Initial job has not accepted any resources; " +
                    "check your cluster UI to ensure that workers are registered " +
                    "and have sufficient resources")
                } else {
                  this.cancel()
                }
              }
            }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
          }
          hasReceivedTask = true
        }
        // 向schedulerBackend申请资源
        backend.reviveOffers()
      }

    TaskSetManager

    每个taskset都会创建一个对应的TaskManager来管理其生命周期。通过参数指定如果任务失败后的重试次数。

  • 相关阅读:
    Java多线程总结
    Linux命令总结
    Java笔记
    JDK7和JDK8一些重要新特性
    第八周(11.04-11.10)----每周报告
    第八周(11.04-11.10)----结对项目----逆波兰
    第八周(11.04-11.10)----个人作业----历年学生作品点评
    第七周PSP(10.27-11.03)
    第七周(10.27-11.03)----补交第六周(10.20-26)每周例行报告
    个人项目----词频统计WEB(部分功能)
  • 原文地址:https://www.cnblogs.com/eryuan/p/7229562.html
Copyright © 2011-2022 走看看