  • Spark Executor Task Execution and Count

    Basic Principles (YARN mode)

    Each stage consists of multiple partitions, and each partition is processed by one Task running on an Executor.

    The default partition count of a stage is controlled by the spark.default.parallelism parameter; when it is not set, the count is inherited from the parent stage.
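
    For intuition, here is a minimal sketch (the app name, master URL, and parameter values are illustrative assumptions, not from the original post) showing how the partition count follows spark.default.parallelism:

    // PartitionCountDemo.scala -- illustrative sketch; run via spark-submit or paste into spark-shell
    import org.apache.spark.{SparkConf, SparkContext}

    object PartitionCountDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("partition-count-demo")
          .setMaster("local[4]")
          .set("spark.default.parallelism", "8") // assumed value, for demonstration only
        val sc = new SparkContext(conf)

        val words = sc.parallelize(Seq("a", "b", "a", "c"))
        println(words.getNumPartitions) // 8: parallelize falls back to the default parallelism

        // reduceByKey without an explicit numPartitions argument lets the default
        // partitioner decide, which here resolves to spark.default.parallelism as well
        val counts = words.map((_, 1)).reduceByKey(_ + _)
        println(counts.getNumPartitions) // 8

        sc.stop()
      }
    }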

    The maximum number of Tasks that can run at the same time is determined by three parameters:

    • The number of Executors, set by spark.executor.instances or --num-executors (the default on YARN is 2)
    • The number of cores per Executor, set by spark.executor.cores or --executor-cores (default 1 in YARN mode)
    • The number of cores each Task needs, set by spark.task.cpus (default 1)

    The Executor count and the cores per Executor should be chosen with the actual number of machines and CPU cores in mind, but both may be configured higher than the physical counts, in which case the hardware is simply oversubscribed.
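
    Putting the three parameters together, the cluster-wide concurrency cap is a simple product, as in this back-of-the-envelope sketch (plain Scala arithmetic, not a Spark API; the figures are made up):

    // ConcurrencyCap.scala -- illustrative arithmetic only
    object ConcurrencyCap extends App {
      val numExecutors  = 4 // spark.executor.instances / --num-executors
      val executorCores = 3 // spark.executor.cores / --executor-cores
      val taskCpus      = 1 // spark.task.cpus

      // each executor offers executorCores CPUs; each task occupies taskCpus of them
      val slotsPerExecutor   = executorCores / taskCpus
      val maxConcurrentTasks = numExecutors * slotsPerExecutor
      println(s"at most $maxConcurrentTasks tasks run at the same time") // 12
    }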

    Detailed Flow (YARN mode)

    In DAGScheduler

    • When a Job is submitted, the handleJobSubmitted function is executed, and handleJobSubmitted calls submitStage
    • submitStage is recursive: starting from the final stage (the one produced by the action), it keeps walking backward looking for parent stages, adds every stage that still has missing parents to waitingStages, and calls submitMissingTasks on the first stage that has no missing parents (see the sketch after this list)
    • submitMissingTasks finds the stage's missing partitions, creates one task per partition, and calls taskScheduler.submitTasks to submit all of the stage's tasks as a TaskSet
    • Once the stage finishes, submitStage is invoked again for the corresponding stages in waitingStages (via submitWaitingChildStages)
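
    For intuition, here is a hypothetical two-stage job (assuming a spark-shell session where sc is available; the example is not from the original post):

    // the shuffle introduced by reduceByKey splits this job into two stages:
    //   stage 0: parallelize + map   (a ShuffleMapStage)
    //   stage 1: reduceByKey + count (the ResultStage created by the action)
    // submitStage(stage 1) finds stage 0 missing, parks stage 1 in waitingStages,
    // and submits stage 0 first via submitMissingTasks
    val counts = sc.parallelize(Seq("a", "b", "a"))
      .map((_, 1))
      .reduceByKey(_ + _)
    counts.count() // the action whose submission triggers handleJobSubmitted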

    In TaskSchedulerImpl

    • submitTasks registers all the tasks of the TaskSet and then calls backend.reviveOffers()

    In CoarseGrainedSchedulerBackend

    • reviveOffers calls driverEndpoint.send(ReviveOffers)
    • The backend's inner class DriverEndpoint receives the message and calls makeOffers()
    • makeOffers calls scheduler.resourceOffers(workOffers) to allocate resources
    • For the tasks that obtained resources, it calls launchTasks(taskDescs)
    • launchTasks sends a LaunchTask message to the corresponding CoarseGrainedExecutorBackend
    • CoarseGrainedExecutorBackend is the actual Executor JVM process

    In TaskSchedulerImpl

    • resourceOffers calls resourceOfferSingleTaskSet
    • resourceOfferSingleTaskSet checks whether enough CPU resources are available for each task

    In CoarseGrainedExecutorBackend

    • On receiving LaunchTask, it calls executor.launchTask to start the task
    • Executor.launchTask hands the task to a thread pool for execution

    The maximum number of concurrent Tasks on each Executor is therefore determined by the executor's CPU count (spark.executor.cores, default 1) and the CPUs each Task requires (spark.task.cpus, default 1), i.e. executor cores divided by CPUs per task; in the code below this is enforced through the freeCores and CPUS_PER_TASK bookkeeping.

    Code (excerpts from the Spark source; "......" marks elided lines)

    // DAGScheduler.scala
    
      private[scheduler] def handleJobSubmitted(......) {
        ......
        submitStage(finalStage)
        ......
      }
    
      private def submitStage(stage: Stage) {
        ......
            val missing = getMissingParentStages(stage).sortBy(_.id)
            if (missing.isEmpty) {
              logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
              submitMissingTasks(stage, jobId.get)
            } else {
              for (parent <- missing) {
                submitStage(parent)
              }
              waitingStages += stage
            }
        ......
      }
    
      private def submitMissingTasks(stage: Stage, jobId: Int) {
        ......
        val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
    
        val tasks: Seq[Task[_]] = try {
            ......
        }
    
        if (tasks.size > 0) {
          logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
            s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
          taskScheduler.submitTasks(new TaskSet(
            tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
        } else {
          ......
          submitWaitingChildStages(stage)
        }
      }
    
    
    // TaskSchedulerImpl.scala
    
      override def submitTasks(taskSet: TaskSet) {
        ......
        backend.reviveOffers()
      }
    
    // CoarseGrainedSchedulerBackend.scala
    
      override def reviveOffers() {
        driverEndpoint.send(ReviveOffers)
      }
    
      class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) {
        private def makeOffers(executorId: String) {
          // Make sure no executor is killed while some task is launching on it
          val taskDescs = withLock {
            // Filter out executors under killing
            if (executorIsAlive(executorId)) {
              val executorData = executorDataMap(executorId)
              val workOffers = IndexedSeq(
                new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores,
                  Some(executorData.executorAddress.hostPort)))
              scheduler.resourceOffers(workOffers)
            } else {
              Seq.empty
            }
          }
          if (!taskDescs.isEmpty) {
            launchTasks(taskDescs)
          }
        }
      }
    
    // TaskSchedulerImpl.scala
    
      def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
        ......
                launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet,
                  currentMaxLocality, shuffledOffers, availableCpus, tasks, addressesWithDescs)
        ......
      }
    
      private def resourceOfferSingleTaskSet(
        ......
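          // offer a task only if this executor still has at least CPUS_PER_TASK free cores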
          if (availableCpus(i) >= CPUS_PER_TASK) {
            try {
              for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
                tasks(i) += task
                val tid = task.taskId
                taskIdToTaskSetManager.put(tid, taskSet)
                taskIdToExecutorId(tid) = execId
                executorIdToRunningTaskIds(execId).add(tid)
                availableCpus(i) -= CPUS_PER_TASK
                assert(availableCpus(i) >= 0)
                // Only update hosts for a barrier task.
                if (taskSet.isBarrier) {
                  // The executor address is expected to be non empty.
                  addressesWithDescs += (shuffledOffers(i).address.get -> task)
                }
                launchedTask = true
              }
            }
          }
        ......
      }
    
    
    // CoarseGrainedSchedulerBackend.scala
    
        private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
          ......
              val executorData = executorDataMap(task.executorId)
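              // decrementing freeCores on the driver is what caps concurrent tasks per executor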
              executorData.freeCores -= scheduler.CPUS_PER_TASK
    
              logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
                s"${executorData.executorHost}.")
    
              executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
          ......
        }
    
    // CoarseGrainedExecutorBackend.scala
    
      override def receive: PartialFunction[Any, Unit] = {
        ......
        case LaunchTask(data) =>
          if (executor == null) {
            exitExecutor(1, "Received LaunchTask command but executor was null")
          } else {
            val taskDesc = TaskDescription.decode(data.value)
            logInfo("Got assigned task " + taskDesc.taskId)
            executor.launchTask(this, taskDesc)
          }
        ......
    
    
    // Executor.scala
    
      def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
        val tr = new TaskRunner(context, taskDescription)
        runningTasks.put(taskDescription.taskId, tr)
        threadPool.execute(tr)
      }
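
    Note that threadPool in Executor.scala is a cached, effectively unbounded daemon thread pool in the Spark source, so the pool itself does not limit concurrency; the per-executor cap comes entirely from the driver-side core accounting shown above (freeCores and CPUS_PER_TASK).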
    
    


    Original post: https://www.cnblogs.com/moonlight-lin/p/13941440.html