zoukankan      html  css  js  c++  java
  • Spark Scheduler 模块(下)

    Scheduler 模块中最重要的两个类是 DAGScheduler 和 TaskScheduler。上篇讲了 DAGScheduler,这篇讲 TaskScheduler。

    TaskScheduler

    前面提到,在 SparkContext 初始化的过程中,根据 master 的类型分别创建不同的 TaskScheduler 的实现。当 master 为 local, spark, mesos 时创建 TaskSchedulerImpl,当 master 为 YARN 时,创建其他的实现,读者可以自行研究。

    master match {
      case "local" =>
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalBackend(sc.getConf, scheduler, 1)
        scheduler.initialize(backend)
        (backend, scheduler)
    
      case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)
    
      case mesosUrl @ MESOS_REGEX(_) =>
        MesosNativeLibrary.load()
        val scheduler = new TaskSchedulerImpl(sc)
        val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", false)
        val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
        val backend = if (coarseGrained) {
          new CoarseMesosSchedulerBackend(scheduler, sc, url, sc.env.securityManager)
        } else {
          new MesosSchedulerBackend(scheduler, sc, url)
        }
        scheduler.initialize(backend)
        (backend, scheduler)
    
      ....
    }

    此时细心的读者就会有疑问了, TaskScheduler 需要将任务调度在不同的资源管理平台上(local, spark, mesos),怎么就能使用同一个 TaskSchedulerImpl 呢?注意这里有个很重要的成员 backend。每种 master 对应的 backend 都不一样,而正是这个 backend 负责与资源管理平台通信。

    因为这个层面的调度,需要跟资源管理器通信了,所以也会部分的涉及到 deploy 模块和 executor 模块的内容。因为 Local 模式的过于简单(本地启动多线程处理 task),而 YARN 和 Mesos 需要编程接口相关的背景知识,这里我们选择 SparkDeploySchedulerBackend 着重分析。这是 Spark 自身实现的资源管理系统,有些读者可能已经搭建和使用过。

    TaskSchedulerImpl 的启动

    在 SparkContext 中(上面代码),首先创建了 TaskSchedulerImpl 和 SparkDeploySchedulerBackend,并将 backend 传入 TaskSchedulerImpl。之后启动了 TaskSchedulerImpl。

    // Create and start the scheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
    
    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
    // constructor
    _taskScheduler.start()

    TaskSchedulerImpl.start 方法主要是调用了 backend.start()。SparkDeploySchedulerBackend 的启动首先调用父类 CoarseGrainedSchedulerBackend 的 start 方法,其中创建了一个 driverEndpoint,它是一个本地的 driver,以 RPC 的方式与其他 executor 通信。

    // CoarseGrainedSchedulerBackend.scala
    override
    def start() { driverEndpoint = rpcEnv.setupEndpoint( CoarseGrainedSchedulerBackend.ENDPOINT_NAME, new DriverEndpoint(rpcEnv, properties)) }
    // SparkDeploySchedulerBackend.scala
    override def start() {
      super.start()
    
      // The endpoint for executors to talk to us
      val driverUrl = rpcEnv.uriOf(SparkEnv.driverActorSystemName,
          RpcAddress(sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port").toInt),
          CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
      val args = Seq(
        "--driver-url", driverUrl,
        "--executor-id", "{{EXECUTOR_ID}}",
        "--hostname", "{{HOSTNAME}}",
        "--cores", "{{CORES}}",
        "--app-id", "{{APP_ID}}",
        "--worker-url", "{{WORKER_URL}}")  
      ....
      val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
          args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
      val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
          command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
      client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
      client.start()
      waitForRegistration()
    }

    这里的创建了一个 client:AppClient,它会连接到 masters(spark://master:7077) 上,附带一条命令 command,用来启动 executor,改命令还有 driver-url 的参数。如此 executor 启动时就能自动连接上 driver。

    至此,TaskSchedulerImpl 和 SparkDeploySchedulerBackend 的启动过程已经完成。主要做了两件事情,启动 local driver,通知 masters 启动 executors。并且 driver 和 executors 使用了 RPC 通信。

    注:至于 executor 如何启动,等待分析 deploy 和 executor 模块的时候再仔细分析。

    TaskSchedulerImpl 提交任务

    在上一篇中,我们说到 DAGScheduler 最后调用了 taskScheduler.submitTasks 提交任务。下面继续上篇的分析:

    override def submitTasks(taskSet: TaskSet) {
      val tasks = taskSet.tasks
      this.synchronized {
        val manager = createTaskSetManager(taskSet, maxTaskFailures)
        val stage = taskSet.stageId
        val stageTaskSets =
          taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
        stageTaskSets(taskSet.stageAttemptId) = manager
        schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) // 将任务加入到变量 rootPool 中
      }
      backend.reviveOffers()
    }

    这里将 TaskSet 再包装成 TaskSetManager,加入到 schedulableBuilder 中。顺便提一下,schedulableBuilder 是 Spark 的调度策略实现,有 FIFO 和 FAIR 两种,默认的是 FIFO。它们最终都是把 TaskSetManager 放到了 rootPool 中。

    然后调用了 backend.reviveOffers,这里有个较转折的调用关系。因为 SparkDeploySchedulerBackend 没有方法 reviveOffers,所以是调用了其父类 CoarseGrainedSchedulerBackend 的同名方法。而 CoarseGrainedSchedulerBackend.reviveOffers 实现中只有一行,即

    override def reviveOffers() {
      driverEndpoint.send(ReviveOffers)
    }

    而 DriverEndpoint 接收到 ReviceOffers,调用了 makeOffers 方法:

    private def makeOffers() {
      // Filter out executors under killing
      val activeExecutors = executorDataMap.filterKeys(!executorsPendingToRemove.contains(_))
      val workOffers = activeExecutors.map { case (id, executorData) =>
            new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toSeq
      launchTasks(scheduler.resourceOffers(workOffers))
    }

    整个调用关系为 TaskScheduler.submitTasks() -> CoarseGrainedSchedulerBackend.reviveOffers() -> RpcEndpointRef.send(ReviveOffers) ->DriverEndpoint.ReviceOffers -> DriverEndpoint.makeOffers

    这里调用 TaskSchedulerImpl 的方法 resourceOffers,该方法给任务分配计算资源。接着调用了 CoarseGrainedSchedulerBackend.launchTasks,真正向 executor 发送计算任务。

    // Launch tasks returned by a set of resource offers
    private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        val serializedTask = ser.serialize(task) // 序列化 task
        if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
          // 序列化结果超过上限,报警
        }
        else {
          val executorData = executorDataMap(task.executorId) // 选择一个 executor 执行 task
          executorData.freeCores -= scheduler.CPUS_PER_TASK // 标记 executor 的 CPU 资源被占用了一部分
          executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask))) // 向 executor 的 RPC 服务器发送执行 task 信息
        }
      }
    }

    上文提到,TaskSchedulerImpl 启动时,masters 也启动了 executor,具体的启动方法是 org.apache.spark.executor.CoarseGrainedExecutorBackend。所以 executor 接受消息的方法也在该类中:

    // org.apache.spark.executor.CoarseGrainedExecutorBackend
    override def receive: PartialFunction[Any, Unit] = {
      case LaunchTask(data) =>
        if (executor == null) {
          logError("Received LaunchTask command but executor was null")
          System.exit(1)
        } else {
          val taskDesc = ser.deserialize[TaskDescription](data.value) // 发序列化
          logInfo("Got assigned task " + taskDesc.taskId)
          executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
            taskDesc.name, taskDesc.serializedTask)
        }
      ...
    }

    executor 执行 task:

    // org.apache.spark.executor.Executor
    def launchTask(
        context: ExecutorBackend,
        taskId: Long,
        attemptNumber: Int,
        taskName: String,
        serializedTask: ByteBuffer): Unit = {
      val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName, serializedTask) // 创建线程执行 task
      runningTasks.put(taskId, tr)
      threadPool.execute(tr)
    }

    TaskRunner 使用 ClassLoader 中从字节中加载 Task,并执行得到结果,把结果序列化使用 RPC 返回。

    // org.apache.spark.executor.Executor.TaskRunner
    override def run(): Unit = {
      execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER) // executor 通知 driver,正在执行 task
      try {
        val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask) // 反序列化出依赖文件,jar包,任务自身
        updateDependencies(taskFiles, taskJars)
        task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
        task.setTaskMemoryManager(taskMemoryManager)
    
        val (value, accumUpdates) = try {
          val res = task.run(taskAttemptId = taskId, attemptNumber = attemptNumber, metricsSystem = env.metricsSystem) // 执行 task
          res
        } finally {
          ...
        }
    
        val resultSer = env.serializer.newInstance()
        val valueBytes = resultSer.serialize(value) // 序列化结果
    
        execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult) // 以 RPC 的方式返回结果给 driver
    } catch {
      ...
    }

    至此 task 在 TaskSchedulerImpl 内运行的流程有了一个大致介绍。其中略过了很多分支,但不影响读者对整体流程的理解。

    总结

    Spark Scheduler 模块上下两篇对 Spark 的调度逻辑按照执行的顺序有了一个概括的介绍。

    Scheduler 模块的代码架构充分体现了分层和隔离的设计哲学。首先 DAGScheduler 是 Spark 独有的逻辑,而 TaskScheduler 则因资源调度器而各不相同,所以把调度部分割裂成这两部分,前者只需一种实现,而后者可以在不同平台各自实现。即便是 TaskScheduler,在多种平台上也有共性,所以 TaskSchedulerImpl 也是一个较通用的实现,只是与资源调度器的通信部分使用了不同的 backend。

  • 相关阅读:
    VSCode添加git bash作为默认终端
    Git无法提交branch is currently checked out
    Excel创建下拉列表限制数据有效性
    Windows添加管理员用户
    从Windows10中彻底删除【3D对象】文件夹
    异常处理机制
    泛型
    Java集合
    String、StringBuffer、StringBulider
    System类与Runtime类
  • 原文地址:https://www.cnblogs.com/keepthinking/p/4854947.html
Copyright © 2011-2022 走看看