zoukankan      html  css  js  c++  java
  • Spark Scheduler 模块(下)

    Scheduler 模块中最重要的两个类是 DAGScheduler 和 TaskScheduler。上篇讲了 DAGScheduler,这篇讲 TaskScheduler。

    TaskScheduler

    前面提到,在 SparkContext 初始化的过程中,根据 master 的类型分别创建不同的 TaskScheduler 的实现。当 master 为 local, spark, mesos 时创建 TaskSchedulerImpl,当 master 为 YARN 时,创建其他的实现,读者可以自行研究。

    master match {
      case "local" =>
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalBackend(sc.getConf, scheduler, 1)
        scheduler.initialize(backend)
        (backend, scheduler)
    
      case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)
    
      case mesosUrl @ MESOS_REGEX(_) =>
        MesosNativeLibrary.load()
        val scheduler = new TaskSchedulerImpl(sc)
        val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", false)
        val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
        val backend = if (coarseGrained) {
          new CoarseMesosSchedulerBackend(scheduler, sc, url, sc.env.securityManager)
        } else {
          new MesosSchedulerBackend(scheduler, sc, url)
        }
        scheduler.initialize(backend)
        (backend, scheduler)
    
      ....
    }

    此时细心的读者就会有疑问了, TaskScheduler 需要将任务调度在不同的资源管理平台上(local, spark, mesos),怎么就能使用同一个 TaskSchedulerImpl 呢?注意这里有个很重要的成员 backend。每种 master 对应的 backend 都不一样,而正是这个 backend 负责与资源管理平台通信。

    因为这个层面的调度,需要跟资源管理器通信了,所以也会部分的涉及到 deploy 模块和 executor 模块的内容。因为 Local 模式的过于简单(本地启动多线程处理 task),而 YARN 和 Mesos 需要编程接口相关的背景知识,这里我们选择 SparkDeploySchedulerBackend 着重分析。这是 Spark 自身实现的资源管理系统,有些读者可能已经搭建和使用过。

    TaskSchedulerImpl 的启动

    在 SparkContext 中(上面代码),首先创建了 TaskSchedulerImpl 和 SparkDeploySchedulerBackend,并将 backend 传入 TaskSchedulerImpl。之后启动了 TaskSchedulerImpl。

    // Create and start the scheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
    
    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
    // constructor
    _taskScheduler.start()

    TaskSchedulerImpl.start 方法主要是调用了 backend.start()。SparkDeploySchedulerBackend 的启动首先调用父类 CoarseGrainedSchedulerBackend 的 start 方法,其中创建了一个 driverEndpoint,它是一个本地的 driver,以 RPC 的方式与其他 executor 通信。

    // CoarseGrainedSchedulerBackend.scala
    override
    def start() { driverEndpoint = rpcEnv.setupEndpoint( CoarseGrainedSchedulerBackend.ENDPOINT_NAME, new DriverEndpoint(rpcEnv, properties)) }
    // SparkDeploySchedulerBackend.scala
    override def start() {
      super.start()
    
      // The endpoint for executors to talk to us
      val driverUrl = rpcEnv.uriOf(SparkEnv.driverActorSystemName,
          RpcAddress(sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port").toInt),
          CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
      val args = Seq(
        "--driver-url", driverUrl,
        "--executor-id", "{{EXECUTOR_ID}}",
        "--hostname", "{{HOSTNAME}}",
        "--cores", "{{CORES}}",
        "--app-id", "{{APP_ID}}",
        "--worker-url", "{{WORKER_URL}}")  
      ....
      val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
          args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
      val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
          command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
      client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
      client.start()
      waitForRegistration()
    }

    这里的创建了一个 client:AppClient,它会连接到 masters(spark://master:7077) 上,附带一条命令 command,用来启动 executor,改命令还有 driver-url 的参数。如此 executor 启动时就能自动连接上 driver。

    至此,TaskSchedulerImpl 和 SparkDeploySchedulerBackend 的启动过程已经完成。主要做了两件事情,启动 local driver,通知 masters 启动 executors。并且 driver 和 executors 使用了 RPC 通信。

    注:至于 executor 如何启动,等待分析 deploy 和 executor 模块的时候再仔细分析。

    TaskSchedulerImpl 提交任务

    在上一篇中,我们说到 DAGScheduler 最后调用了 taskScheduler.submitTasks 提交任务。下面继续上篇的分析:

    override def submitTasks(taskSet: TaskSet) {
      val tasks = taskSet.tasks
      this.synchronized {
        val manager = createTaskSetManager(taskSet, maxTaskFailures)
        val stage = taskSet.stageId
        val stageTaskSets =
          taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
        stageTaskSets(taskSet.stageAttemptId) = manager
        schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) // 将任务加入到变量 rootPool 中
      }
      backend.reviveOffers()
    }

    这里将 TaskSet 再包装成 TaskSetManager,加入到 schedulableBuilder 中。顺便提一下,schedulableBuilder 是 Spark 的调度策略实现,有 FIFO 和 FAIR 两种,默认的是 FIFO。它们最终都是把 TaskSetManager 放到了 rootPool 中。

    然后调用了 backend.reviveOffers,这里有个较转折的调用关系。因为 SparkDeploySchedulerBackend 没有方法 reviveOffers,所以是调用了其父类 CoarseGrainedSchedulerBackend 的同名方法。而 CoarseGrainedSchedulerBackend.reviveOffers 实现中只有一行,即

    override def reviveOffers() {
      driverEndpoint.send(ReviveOffers)
    }

    而 DriverEndpoint 接收到 ReviceOffers,调用了 makeOffers 方法:

    private def makeOffers() {
      // Filter out executors under killing
      val activeExecutors = executorDataMap.filterKeys(!executorsPendingToRemove.contains(_))
      val workOffers = activeExecutors.map { case (id, executorData) =>
            new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toSeq
      launchTasks(scheduler.resourceOffers(workOffers))
    }

    整个调用关系为 TaskScheduler.submitTasks() -> CoarseGrainedSchedulerBackend.reviveOffers() -> RpcEndpointRef.send(ReviveOffers) ->DriverEndpoint.ReviceOffers -> DriverEndpoint.makeOffers

    这里调用 TaskSchedulerImpl 的方法 resourceOffers,该方法给任务分配计算资源。接着调用了 CoarseGrainedSchedulerBackend.launchTasks,真正向 executor 发送计算任务。

    // Launch tasks returned by a set of resource offers
    private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        val serializedTask = ser.serialize(task) // 序列化 task
        if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
          // 序列化结果超过上限,报警
        }
        else {
          val executorData = executorDataMap(task.executorId) // 选择一个 executor 执行 task
          executorData.freeCores -= scheduler.CPUS_PER_TASK // 标记 executor 的 CPU 资源被占用了一部分
          executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask))) // 向 executor 的 RPC 服务器发送执行 task 信息
        }
      }
    }

    上文提到,TaskSchedulerImpl 启动时,masters 也启动了 executor,具体的启动方法是 org.apache.spark.executor.CoarseGrainedExecutorBackend。所以 executor 接受消息的方法也在该类中:

    // org.apache.spark.executor.CoarseGrainedExecutorBackend
    override def receive: PartialFunction[Any, Unit] = {
      case LaunchTask(data) =>
        if (executor == null) {
          logError("Received LaunchTask command but executor was null")
          System.exit(1)
        } else {
          val taskDesc = ser.deserialize[TaskDescription](data.value) // 发序列化
          logInfo("Got assigned task " + taskDesc.taskId)
          executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
            taskDesc.name, taskDesc.serializedTask)
        }
      ...
    }

    executor 执行 task:

    // org.apache.spark.executor.Executor
    def launchTask(
        context: ExecutorBackend,
        taskId: Long,
        attemptNumber: Int,
        taskName: String,
        serializedTask: ByteBuffer): Unit = {
      val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName, serializedTask) // 创建线程执行 task
      runningTasks.put(taskId, tr)
      threadPool.execute(tr)
    }

    TaskRunner 使用 ClassLoader 中从字节中加载 Task,并执行得到结果,把结果序列化使用 RPC 返回。

    // org.apache.spark.executor.Executor.TaskRunner
    override def run(): Unit = {
      execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER) // executor 通知 driver,正在执行 task
      try {
        val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask) // 反序列化出依赖文件,jar包,任务自身
        updateDependencies(taskFiles, taskJars)
        task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
        task.setTaskMemoryManager(taskMemoryManager)
    
        val (value, accumUpdates) = try {
          val res = task.run(taskAttemptId = taskId, attemptNumber = attemptNumber, metricsSystem = env.metricsSystem) // 执行 task
          res
        } finally {
          ...
        }
    
        val resultSer = env.serializer.newInstance()
        val valueBytes = resultSer.serialize(value) // 序列化结果
    
        execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult) // 以 RPC 的方式返回结果给 driver
    } catch {
      ...
    }

    至此 task 在 TaskSchedulerImpl 内运行的流程有了一个大致介绍。其中略过了很多分支,但不影响读者对整体流程的理解。

    总结

    Spark Scheduler 模块上下两篇对 Spark 的调度逻辑按照执行的顺序有了一个概括的介绍。

    Scheduler 模块的代码架构充分体现了分层和隔离的设计哲学。首先 DAGScheduler 是 Spark 独有的逻辑,而 TaskScheduler 则因资源调度器而各不相同,所以把调度部分割裂成这两部分,前者只需一种实现,而后者可以在不同平台各自实现。即便是 TaskScheduler,在多种平台上也有共性,所以 TaskSchedulerImpl 也是一个较通用的实现,只是与资源调度器的通信部分使用了不同的 backend。

  • 相关阅读:
    Codeforces 992C(数学)
    Codeforces 990C (思维)
    Codeforces 989C (构造)
    POJ 1511 Invitation Cards(链式前向星,dij,反向建边)
    Codeforces 1335E2 Three Blocks Palindrome (hard version)(暴力)
    POJ 3273 Monthly Expense(二分)
    POJ 2566 Bound Found(尺取前缀和)
    POJ 1321 棋盘问题(dfs)
    HDU 1506 Largest Rectangle in a Histogram(单调栈)
    POJ 2823 Sliding Window(单调队列)
  • 原文地址:https://www.cnblogs.com/keepthinking/p/4854947.html
Copyright © 2011-2022 走看看