Spark Source Code Analysis – SchedulerBackend

    SchedulerBackend has two jobs: acquiring resources, and executing and managing tasks.

    For SparkDeploySchedulerBackend, which is built on the actor model, the work boils down to starting and managing two actors:
    Deploy.Client Actor, responsible for resource acquisition. It is created when SparkDeploySchedulerBackend is initialized; the Client then registers with the Master, which ultimately results in ExecutorBackends being created on the Workers (see: Spark源码分析 – Deploy), and all of these ExecutorBackends register themselves with the Driver Actor.
    Driver Actor, responsible for task execution.
    Since Spark was originally built on Mesos and standalone mode was added later for compatibility, the interfaces in the Driver Actor are Mesos-style. Under Mesos, resources would presumably be requested dynamically and tasks then executed against those offers (a guess; I have not read that code yet).
    For coarse-grained Mesos mode and Spark's standalone deploy mode, however, this step is simplified: resources are allocated up front when the TaskScheduler is initialized, and the Driver Actor merely schedules tasks onto those pre-allocated executors.
    That is why the comment on makeOffers says "Make fake resource offers": no real resource offering happens there.
    The key to how the Driver Actor dispatches tasks to executors is scheduler.resourceOffers.
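
    To make that contract concrete, here is a minimal sketch (simplified; the real definitions live in org.apache.spark.scheduler.cluster): the backend describes each executor's free capacity as a WorkerOffer, and scheduler.resourceOffers returns, per offer, the tasks to launch on that executor.

    // Sketch only, simplified from org.apache.spark.scheduler.cluster.
    // An offer advertises one executor's free cores to the scheduler.
    class WorkerOffer(val executorId: String, val host: String, val cores: Int)

    // The scheduler matches its pending tasks against the offers and returns,
    // for each offer, the TaskDescriptions to launch on that executor:
    //   def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]]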

    SchedulerBackend

    package org.apache.spark.scheduler.cluster
    /**
     * A backend interface for cluster scheduling systems that allows plugging in different ones under
     * ClusterScheduler. We assume a Mesos-like model where the application gets resource offers as
     * machines become available and can launch tasks on them.
     */
    private[spark] trait SchedulerBackend {
      def start(): Unit
      def stop(): Unit
      def reviveOffers(): Unit
      def defaultParallelism(): Int
    
      // Memory used by each executor (in megabytes)
      protected val executorMemory: Int = SparkContext.executorMemoryRequested
    
      // TODO: Probably want to add a killTask too
    }
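
    As a purely hypothetical illustration of how a backend plugs in under ClusterScheduler (this is not an actual Spark class), a do-nothing implementation only needs to provide these four methods:

    // Hypothetical minimal backend, for illustration only – not part of Spark.
    private[spark] class NoopSchedulerBackend extends SchedulerBackend {
      override def start() {}                    // acquire resources here
      override def stop() {}                     // release them here
      override def reviveOffers() {}             // trigger a new round of (fake) offers
      override def defaultParallelism(): Int = 1 // fallback when spark.default.parallelism is unset
    }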

     

    StandaloneSchedulerBackend

    Used for both coarse-grained Mesos mode and Spark's standalone deploy mode.
    As you can see, its main purpose is to create and maintain the driverActor;
    most of the logic lives in the DriverActor itself.

    /**
     * A standalone scheduler backend, which waits for standalone executors to connect to it through
     * Akka. These may be executed in a variety of ways, such as Mesos tasks for the coarse-grained
     * Mesos mode or standalone processes for Spark's standalone deploy mode (spark.deploy.*).
     */
    private[spark]
    class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
      extends SchedulerBackend with Logging
    {
      // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
      var totalCoreCount = new AtomicInteger(0)
    
      class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
      // …… analyzed in the DriverActor section below
      }
    
      var driverActor: ActorRef = null
      val taskIdsOnSlave = new HashMap[String, HashSet[String]]
    
      override def start() {
        val properties = new ArrayBuffer[(String, String)]
        val iterator = System.getProperties.entrySet.iterator
        while (iterator.hasNext) {
          val entry = iterator.next
          val (key, value) = (entry.getKey.toString, entry.getValue.toString)
          if (key.startsWith("spark.") && !key.equals("spark.hostPort")) {
            properties += ((key, value))
          }
        }
        driverActor = actorSystem.actorOf( // the key step: create the driverActor
          Props(new DriverActor(properties)), name = StandaloneSchedulerBackend.ACTOR_NAME)
      }
    
      private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
    
      override def stop() {
        try {
          if (driverActor != null) {
            val future = driverActor.ask(StopDriver)(timeout) // shut down the driverActor
            Await.result(future, timeout)
          }
        } catch {
          case e: Exception =>
            throw new SparkException("Error stopping standalone scheduler's driver actor", e)
        }
      }
    
      override def reviveOffers() {
        driverActor ! ReviveOffers  // send a ReviveOffers event to the driverActor
      }
    
      override def defaultParallelism() = Option(System.getProperty("spark.default.parallelism"))
          .map(_.toInt).getOrElse(math.max(totalCoreCount.get(), 2))
    
      // Called by subclasses when notified of a lost worker
      def removeExecutor(executorId: String, reason: String) {
        try {
          val future = driverActor.ask(RemoveExecutor(executorId, reason))(timeout)
          Await.result(future, timeout)
        } catch {
          case e: Exception =>
            throw new SparkException("Error notifying standalone scheduler's driver actor", e)
        }
      }
    }


    DriverActor

    The key function is makeOffers, which launches tasks on the executors. When is it called?
    When a RegisterExecutor arrives,
    when a task StatusUpdate arrives,
    and when a ReviveOffers event is received, which happens when new tasks are submitted and whenever delay scheduling fires (once per second by default).
    As for delay scheduling, it is presumably there to maintain liveness: even when no state change occurs, the actor must keep trying to launch pending tasks.
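
    The messages handled by DriverActor's receive below are plain case classes and objects. A simplified sketch of the protocol (the real definitions live alongside StandaloneSchedulerBackend in org.apache.spark.scheduler.cluster; exact fields may differ by version):

    // Sketch of the driver/executor message protocol, simplified.
    // Executor -> Driver: announce itself and report task progress.
    case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)
    case class StatusUpdate(executorId: String, taskId: Long, state: TaskState.TaskState,
        data: SerializableBuffer)

    // Driver -> Executor: acknowledge registration and hand out work.
    case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
    case class RegisterExecutorFailed(message: String)
    case class LaunchTask(task: TaskDescription)

    // Backend -> Driver: control messages.
    case object ReviveOffers
    case object StopDriver
    case class RemoveExecutor(executorId: String, reason: String)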

      class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
        private val executorActor = new HashMap[String, ActorRef] // track every executor's ActorRef
        private val executorAddress = new HashMap[String, Address]
        private val executorHost = new HashMap[String, String]
        private val freeCores = new HashMap[String, Int]
        private val actorToExecutorId = new HashMap[ActorRef, String]
        private val addressToExecutorId = new HashMap[Address, String]
    
        override def preStart() {
          // Listen for remote client disconnection events, since they don't go through Akka's watch()
          context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
    
          // Periodically revive offers to allow delay scheduling to work
          val reviveInterval = System.getProperty("spark.scheduler.revive.interval", "1000").toLong
          context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)
        }
    
        def receive = {
          case RegisterExecutor(executorId, hostPort, cores) =>  // RegisterExecutor is sent by StandaloneExecutorBackend
            Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
            if (executorActor.contains(executorId)) {
              sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
            } else {
              logInfo("Registered executor: " + sender + " with ID " + executorId)
              sender ! RegisteredExecutor(sparkProperties)
              context.watch(sender) // watch executor actor
              executorActor(executorId) = sender
              executorHost(executorId) = Utils.parseHostPort(hostPort)._1
              freeCores(executorId) = cores
              executorAddress(executorId) = sender.path.address
              actorToExecutorId(sender) = executorId
              addressToExecutorId(sender.path.address) = executorId
              totalCoreCount.addAndGet(cores)
              makeOffers()
            }
    
          case StatusUpdate(executorId, taskId, state, data) =>
            scheduler.statusUpdate(taskId, state, data.value)
            if (TaskState.isFinished(state)) {
              freeCores(executorId) += 1
              makeOffers(executorId)
            }
    
          case ReviveOffers => // ReviveOffers is sent by StandaloneSchedulerBackend
            makeOffers()
    
          case StopDriver =>
            sender ! true
            context.stop(self)
    
          case RemoveExecutor(executorId, reason) =>
            removeExecutor(executorId, reason)
            sender ! true
    
          case Terminated(actor) =>
            actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
    
          case RemoteClientDisconnected(transport, address) =>
            addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disconnected"))
    
          case RemoteClientShutdown(transport, address) =>
            addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client shutdown"))
        }
    
        // Make fake resource offers on all executors
        def makeOffers() {
          launchTasks(scheduler.resourceOffers(
            executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
        }
    
        // Make fake resource offers on just one executor
        // Note that the WorkerOffer passed to scheduler.resourceOffers here is generated
        // statically from the executors that were allocated earlier, not obtained dynamically.
        // Under Mesos, this would instead fetch live offers and pass those to scheduler.resourceOffers.
        def makeOffers(executorId: String) {
          launchTasks(scheduler.resourceOffers(
            Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
        }

        // Launch tasks returned by a set of resource offers
        def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
          for (task <- tasks.flatten) {
            freeCores(task.executorId) -= 1
            executorActor(task.executorId) ! LaunchTask(task) // "launching" just sends a LaunchTask event to the executorActor
          }
        }

        // Remove a disconnected slave from the cluster
        def removeExecutor(executorId: String, reason: String) {
          if (executorActor.contains(executorId)) {
            logInfo("Executor " + executorId + " disconnected, so removing it")
            val numCores = freeCores(executorId)
            actorToExecutorId -= executorActor(executorId)
            addressToExecutorId -= executorAddress(executorId)
            executorActor -= executorId
            executorHost -= executorId
            freeCores -= executorId
            totalCoreCount.addAndGet(-numCores)
            scheduler.executorLost(executorId, SlaveLost(reason))
          }
        }
      }
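
    On the receiving end, launching a task is just as lightweight. A simplified sketch of how the StandaloneExecutorBackend actor reacts to LaunchTask (abbreviated; the real class also handles registration and failure cases):

    // Simplified sketch of the executor side of LaunchTask.
    // The actor unpacks the TaskDescription and hands it to its local Executor.
    override def receive = {
      case LaunchTask(taskDesc) =>
        logInfo("Got assigned task " + taskDesc.taskId)
        executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
    }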

     

    SparkDeploySchedulerBackend

    The key here is creating and managing the Driver and Client actors.

    private[spark] class SparkDeploySchedulerBackend(
        scheduler: ClusterScheduler,
        sc: SparkContext,
        master: String,
        appName: String)
      extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)
      with ClientListener
      with Logging {
      var client: Client = null
      var stopping = false  // referenced in stop() below
      var shutdownCallback: (SparkDeploySchedulerBackend) => Unit = _  // referenced in stop() below
      val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt  // used in the ApplicationDescription below
    
      override def start() {
        super.start() // call StandaloneSchedulerBackend.start, which creates the DriverActor
    
        // The endpoint for executors to talk to us
        val driverUrl = "akka://spark@%s:%s/user/%s".format(
          System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
          StandaloneSchedulerBackend.ACTOR_NAME)
        val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
        val command = Command(  // the command ExecutorRunner will execute on the worker: it runs StandaloneExecutorBackend
          "org.apache.spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
        val sparkHome = sc.getSparkHome().getOrElse(null)
        val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome,
            "http://" + sc.ui.appUIAddress) // 生成application description
    
        client = new Client(sc.env.actorSystem, master, appDesc, this) // create the Client actor and start it
        client.start()
      }
    
      override def stop() {
        stopping = true
        super.stop()
        client.stop()
        if (shutdownCallback != null) {
          shutdownCallback(this)
        }
      }
    }
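
    SparkDeploySchedulerBackend also implements the ClientListener callbacks, which is how the Client actor reports cluster events back to it. A simplified sketch of those callbacks (abbreviated from the real class; exact signatures may differ across versions):

    // Simplified sketch of the ClientListener callbacks.
    override def connected(appId: String) {
      logInfo("Connected to Spark cluster with app ID " + appId)
    }

    override def disconnected() {
      if (!stopping) logError("Disconnected from Spark cluster!")
    }

    // A worker granted us an executor: just log it; the executor itself
    // will register with the DriverActor once its backend starts.
    override def executorAdded(fullId: String, workerId: String, hostPort: String,
        cores: Int, memory: Int) {
      logInfo("Granted executor ID %s on hostPort %s with %d cores, %d MB RAM".format(
        fullId, hostPort, cores, memory))
    }

    // An executor went away: translate the exit status into a loss reason
    // and let StandaloneSchedulerBackend clean up.
    override def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]) {
      val reason: ExecutorLossReason = exitStatus match {
        case Some(code) => ExecutorExited(code)
        case None => SlaveLost(message)
      }
      removeExecutor(fullId.split("/")(1), reason.toString) // fullId is "appId/execId"
    }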
    Original post: https://www.cnblogs.com/fxjwind/p/3504052.html