  • Spark Source Code Analysis – SchedulerBackend

    SchedulerBackend has two responsibilities: requesting resources, and executing and managing tasks.

    For SparkDeploySchedulerBackend, which is built on the actor model, the main job is to start and manage two actors:
    Deploy.Client Actor, responsible for requesting resources. It is created when SparkDeploySchedulerBackend is initialized; the Client then registers with the Master, which ultimately gets ExecutorBackends created on the Workers (see "Spark Source Code Analysis – Deploy"), and each of those ExecutorBackends registers itself with the Driver Actor.
    Driver Actor, responsible for task execution.
    Because Spark was originally built on Mesos, with Standalone mode added later for compatibility, the interfaces in the Driver Actor are Mesos-style. Under Mesos, resources should be requested dynamically and tasks then executed on them (a guess; I have not read that source yet).
    But in coarse-grained Mesos mode and Spark's standalone deploy mode, this step is simplified: the resources are allocated up front when the TaskScheduler is initialized, and the Driver Actor merely schedules tasks onto those executors.
    That is why the comment on makeOffers reads "Make fake resource offers": no resources are actually being offered there.
    The key to how the Driver Actor gets tasks executed is scheduler.resourceOffers; the sketch below shows the shapes involved.
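    To make that concrete, here is a minimal sketch of the offer model, using hypothetical simplified types (the real WorkerOffer and TaskDescription live in org.apache.spark.scheduler.cluster; the field names below are inferred from how DriverActor.makeOffers uses them, not copied from the Spark tree):

    // Hypothetical, simplified types illustrating the Mesos-like offer model.
    // One offer per executor: who it is, where it runs, how many cores are free.
    class WorkerOffer(val executorId: String, val host: String, val cores: Int)

    // A task the scheduler has decided to run, bound to a concrete executor.
    case class TaskDescription(taskId: Long, executorId: String, name: String,
        serializedTask: java.nio.ByteBuffer)

    trait OfferBasedScheduler {
      // Given the currently free slots, return, per offer, the tasks to launch there.
      def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]]
    }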

    SchedulerBackend

    package org.apache.spark.scheduler.cluster
    /**
     * A backend interface for cluster scheduling systems that allows plugging in different ones under
     * ClusterScheduler. We assume a Mesos-like model where the application gets resource offers as
     * machines become available and can launch tasks on them.
     */
    private[spark] trait SchedulerBackend {
      def start(): Unit
      def stop(): Unit
      def reviveOffers(): Unit
      def defaultParallelism(): Int
    
      // Memory used by each executor (in megabytes)
      protected val executorMemory: Int = SparkContext.executorMemoryRequested
    
      // TODO: Probably want to add a killTask too
    }
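    For context, a hedged sketch of how the scheduler side drives this trait: in the 0.8-era code, ClusterScheduler.submitTasks ends by asking the backend to revive offers, which for the standalone backend just means sending ReviveOffers to the Driver Actor. Roughly (MiniScheduler is an illustrative stand-in, not the real class):

    // Illustrative only: the real submitTasks also queues the TaskSet into
    // the scheduling pool before poking the backend.
    class MiniScheduler(backend: SchedulerBackend) {
      def submitTasks(/* taskSet: TaskSet */) {
        // ...enqueue the task set...
        backend.reviveOffers() // eventually triggers makeOffers() in the DriverActor
      }
    }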

     

    StandaloneSchedulerBackend

    Used for coarse-grained Mesos mode and for Spark's standalone deploy mode.
    As you can see, its main purpose is to create and maintain the driverActor;
    the main logic all lives in the driverActor.

    /**
     * A standalone scheduler backend, which waits for standalone executors to connect to it through
     * Akka. These may be executed in a variety of ways, such as Mesos tasks for the coarse-grained
     * Mesos mode or standalone processes for Spark's standalone deploy mode (spark.deploy.*).
     */
    private[spark]
    class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
      extends SchedulerBackend with Logging
    {
      // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
      var totalCoreCount = new AtomicInteger(0)
    
      class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
      // …… analyzed in the DriverActor section below
      }
    
      var driverActor: ActorRef = null
      val taskIdsOnSlave = new HashMap[String, HashSet[String]]
    
      override def start() {
        val properties = new ArrayBuffer[(String, String)]
        val iterator = System.getProperties.entrySet.iterator
        while (iterator.hasNext) {
          val entry = iterator.next
          val (key, value) = (entry.getKey.toString, entry.getValue.toString)
          if (key.startsWith("spark.") && !key.equals("spark.hostPort")) {
            properties += ((key, value))
          }
        }
        driverActor = actorSystem.actorOf( // the key step: create the driverActor
          Props(new DriverActor(properties)), name = StandaloneSchedulerBackend.ACTOR_NAME)
      }
    
      private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
    
      override def stop() {
        try {
          if (driverActor != null) {
            val future = driverActor.ask(StopDriver)(timeout) // shut down the driverActor
            Await.result(future, timeout)
          }
        } catch {
          case e: Exception =>
            throw new SparkException("Error stopping standalone scheduler's driver actor", e)
        }
      }
    
      override def reviveOffers() {
        driverActor ! ReviveOffers  // send a ReviveOffers event to the driverActor
      }
    
      override def defaultParallelism() = Option(System.getProperty("spark.default.parallelism"))
          .map(_.toInt).getOrElse(math.max(totalCoreCount.get(), 2))
    
      // Called by subclasses when notified of a lost worker
      def removeExecutor(executorId: String, reason: String) {
        try {
          val future = driverActor.ask(RemoveExecutor(executorId, reason))(timeout)
          Await.result(future, timeout)
        } catch {
          case e: Exception =>
            throw new SparkException("Error notifying standalone scheduler's driver actor", e)
        }
      }
    }
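    A note on the ask pattern used by stop() and removeExecutor(): driverActor.ask(msg)(timeout) returns a Future that completes only when the actor replies, which is why the DriverActor's StopDriver and RemoveExecutor cases below end with sender ! true. A minimal self-contained demo (plain Akka, not Spark code; imports are for a 2.1+ Akka, while the Akka 2.0.x of this era spelled the duration import slightly differently):

    import akka.actor.{Actor, ActorSystem, Props}
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.Await
    import scala.concurrent.duration._

    case object Ping

    class Ponger extends Actor {
      def receive = {
        case Ping => sender ! true // without this reply, the asker's Await times out
      }
    }

    object AskDemo extends App {
      val system = ActorSystem("demo")
      val ponger = system.actorOf(Props[Ponger], name = "ponger")
      implicit val timeout = Timeout(5.seconds)
      val ok = Await.result(ponger ? Ping, 5.seconds) // `?` is the ask operator
      println("got reply: " + ok)
      system.shutdown()
    }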


    DriverActor

    The key function is makeOffers, which launches tasks on the executors. When is it called?
    When a RegisterExecutor message arrives,
    when a task StatusUpdate arrives,
    and when a ReviveOffers event arrives, i.e. when a new task is submitted, or when delay scheduling fires (once per second by default).
    As for delay scheduling, it is presumably there for liveness: even when no state has changed, the actor must keep trying to launch tasks. The messages these cases match on are sketched below.
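    A hedged reconstruction of those messages, with field types inferred from how receive uses them (the real definitions form a sealed hierarchy in org.apache.spark.scheduler.cluster; StatusUpdate's data field is really a serializable-buffer wrapper, hence the data.value call below; TaskDescription is as sketched earlier):

    // Inferred from the receive block below; types simplified for illustration.
    sealed trait StandaloneClusterMessage
    case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)
      extends StandaloneClusterMessage
    case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
      extends StandaloneClusterMessage
    case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage
    case class StatusUpdate(executorId: String, taskId: Long, state: Int /* TaskState */,
        data: AnyRef /* wrapper exposing .value: ByteBuffer */) extends StandaloneClusterMessage
    case object ReviveOffers extends StandaloneClusterMessage
    case object StopDriver extends StandaloneClusterMessage
    case class RemoveExecutor(executorId: String, reason: String) extends StandaloneClusterMessage
    case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage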

      class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
        private val executorActor = new HashMap[String, ActorRef] // track every executor's ActorRef
        private val executorAddress = new HashMap[String, Address]
        private val executorHost = new HashMap[String, String]
        private val freeCores = new HashMap[String, Int]
        private val actorToExecutorId = new HashMap[ActorRef, String]
        private val addressToExecutorId = new HashMap[Address, String]
    
        override def preStart() {
          // Listen for remote client disconnection events, since they don't go through Akka's watch()
          context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
    
          // Periodically revive offers to allow delay scheduling to work
          val reviveInterval = System.getProperty("spark.scheduler.revive.interval", "1000").toLong
          context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)
        }
    
        def receive = {
          case RegisterExecutor(executorId, hostPort, cores) =>  // RegisterExecutor sent by StandaloneExecutorBackend
            Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
            if (executorActor.contains(executorId)) {
              sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
            } else {
              logInfo("Registered executor: " + sender + " with ID " + executorId)
              sender ! RegisteredExecutor(sparkProperties)
              context.watch(sender) // watch executor actor
              executorActor(executorId) = sender
              executorHost(executorId) = Utils.parseHostPort(hostPort)._1
              freeCores(executorId) = cores
              executorAddress(executorId) = sender.path.address
              actorToExecutorId(sender) = executorId
              addressToExecutorId(sender.path.address) = executorId
              totalCoreCount.addAndGet(cores)
              makeOffers()
            }
    
          case StatusUpdate(executorId, taskId, state, data) =>
            scheduler.statusUpdate(taskId, state, data.value)
            if (TaskState.isFinished(state)) {
              freeCores(executorId) += 1
              makeOffers(executorId)
            }
    
          case ReviveOffers => // ReviveOffers, sent by StandaloneSchedulerBackend or by the periodic timer set up in preStart
            makeOffers()
    
          case StopDriver =>
            sender ! true
            context.stop(self)
    
          case RemoveExecutor(executorId, reason) =>
            removeExecutor(executorId, reason)
            sender ! true
    
          case Terminated(actor) =>
            actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
    
          case RemoteClientDisconnected(transport, address) =>
            addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disconnected"))
    
          case RemoteClientShutdown(transport, address) =>
            addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client shutdown"))
        }
    
        // Make fake resource offers on all executors
        def makeOffers() {
          launchTasks(scheduler.resourceOffers(
            executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
        }
    
        // Make fake resource offers on just one executor
    // The WorkerOffers handed to scheduler.resourceOffers here are generated
    // statically from the executors that were allocated earlier, not obtained
    // dynamically. Under Mesos, WorkerOffers would instead be obtained
    // dynamically and then passed to scheduler.resourceOffers.
    def makeOffers(executorId: String) {
      launchTasks(scheduler.resourceOffers(
        Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
    }

    // Launch tasks returned by a set of resource offers
    def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        freeCores(task.executorId) -= 1
        executorActor(task.executorId) ! LaunchTask(task) // "launching" just means sending a LaunchTask event to the executor actor
      }
    }

    // Remove a disconnected slave from the cluster
    def removeExecutor(executorId: String, reason: String) {
      if (executorActor.contains(executorId)) {
        logInfo("Executor " + executorId + " disconnected, so removing it")
        val numCores = freeCores(executorId)
        actorToExecutorId -= executorActor(executorId)
        addressToExecutorId -= executorAddress(executorId)
        executorActor -= executorId
        executorHost -= executorId
        freeCores -= executorId
        totalCoreCount.addAndGet(-numCores)
        scheduler.executorLost(executorId, SlaveLost(reason))
      }
    }
  }
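    For the receiving end of LaunchTask, a hedged, heavily simplified sketch of what the executor-side actor does with it (MiniExecutorBackend and runTask are illustrative stand-ins; the real StandaloneExecutorBackend also handles registration replies and remoting lifecycle events):

    import akka.actor.Actor

    // Illustrative stand-in for StandaloneExecutorBackend.
    class MiniExecutorBackend(runTask: (Long, java.nio.ByteBuffer) => Unit) extends Actor {
      def receive = {
        case LaunchTask(taskDesc) =>
          // Hand the serialized task to the executor; when it finishes, the real
          // backend sends a StatusUpdate back to the driver, which frees the core
          // and triggers makeOffers(executorId) again.
          runTask(taskDesc.taskId, taskDesc.serializedTask)
      }
    }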

     

    SparkDeploySchedulerBackend

    The key is creating and managing the Driver Actor and the Client Actor.

    private[spark] class SparkDeploySchedulerBackend(
        scheduler: ClusterScheduler,
        sc: SparkContext,
        master: String,
        appName: String)
      extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)
      with ClientListener
      with Logging {
      var client: Client = null
      var stopping = false // set once stop() begins; consulted by the listener callbacks
      var shutdownCallback: (SparkDeploySchedulerBackend) => Unit = _ // optional hook run in stop()
      val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt // cap on cores for this app
    
      override def start() {
        super.start() // StandaloneSchedulerBackend.start() creates the DriverActor
    
        // The endpoint for executors to talk to us
        val driverUrl = "akka://spark@%s:%s/user/%s".format(
          System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
          StandaloneSchedulerBackend.ACTOR_NAME)
        val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
        val command = Command(  // the command ExecutorRunner runs on the worker: it simply launches StandaloneExecutorBackend
          "org.apache.spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
        val sparkHome = sc.getSparkHome().getOrElse(null)
        val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome,
            "http://" + sc.ui.appUIAddress) // 生成application description
    
        client = new Client(sc.env.actorSystem, master, appDesc, this) // create the Client Actor and start it
        client.start()
      }
    
      override def stop() {
        stopping = true
        super.stop()
        client.stop()
        if (shutdownCallback != null) {
          shutdownCallback(this)
        }
      }
    }
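    SparkDeploySchedulerBackend also mixes in ClientListener, the callback interface the Client Actor invokes as the Master reports application and executor lifecycle events. A hedged paraphrase of that trait (signatures recalled from the 0.8-era sources and may differ in detail; the backend's implementations mostly log, and executorRemoved forwards to removeExecutor):

    // Paraphrased; see org.apache.spark.deploy.client.ClientListener.
    private[spark] trait ClientListener {
      def connected(appId: String): Unit // application registered with the Master
      def disconnected(): Unit           // connection to the Master was lost
      // The Master granted this application an executor on some worker.
      def executorAdded(fullId: String, workerId: String, hostPort: String,
          cores: Int, memory: Int): Unit
      // An executor exited or was removed.
      def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit
    }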
  • Original article: https://www.cnblogs.com/fxjwind/p/3504052.html