  • Spark Source Code Analysis – Deploy

    Reference: Spark源码分析之-deploy模块

     

    Client

    Client is created when SparkDeploySchedulerBackend is started; it represents one application and talks to the Spark cluster on its behalf.
    The logic in Client is simple: it wraps a ClientActor and takes care of starting and stopping that actor.
    The key part of ClientActor is that in preStart it registers the application with the master, and while the application runs it receives the events the master sends back.
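
    For orientation, this is roughly how SparkDeploySchedulerBackend creates and starts the Client in its own start() (a sketch from memory of the same codebase; the exact ApplicationDescription fields may differ between versions):

    // Inside SparkDeploySchedulerBackend.start(), schematically:
    val command = Command("org.apache.spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
    val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome, appUiUrl)
    client = new Client(sc.env.actorSystem, master, appDesc, this) // `this` implements ClientListener
    client.start()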

    /**
     * The main class used to talk to a Spark deploy cluster. Takes a master URL, an app description,
     * and a listener for cluster events, and calls back the listener when various events occur.
     */
    private[spark] class Client(
        actorSystem: ActorSystem,
        masterUrl: String,
        appDescription: ApplicationDescription,
        listener: ClientListener)
      extends Logging {
      var actor: ActorRef = null
      var appId: String = null
    
      class ClientActor extends Actor with Logging {
        var master: ActorRef = null
        var masterAddress: Address = null
        var alreadyDisconnected = false  // To avoid calling listener.disconnected() multiple times
    
        override def preStart() {
          try {
            master = context.actorFor(Master.toAkkaUrl(masterUrl)) // create an ActorRef for talking to the master
            masterAddress = master.path.address
            master ! RegisterApplication(appDescription) // register this application with the master
            context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
            context.watch(master)  // Doesn't work with remote actors, but useful for testing
          } catch {
            case e: Exception =>
              markDisconnected()
              context.stop(self)
          }
        }
    
        override def receive = { // handle the various events sent back by the master
          case RegisteredApplication(appId_) =>
          case ApplicationRemoved(message) =>
          case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
          case ExecutorUpdated(id, state, message, exitStatus) =>
          case Terminated(actor_) if actor_ == master =>
          case RemoteClientDisconnected(transport, address) if address == masterAddress =>
          case RemoteClientShutdown(transport, address) if address == masterAddress =>
          case StopClient =>
        }
    
      }
    
      def start() {
        // Just launch an actor; it will call back into the listener.
        actor = actorSystem.actorOf(Props(new ClientActor))
      }
    
      def stop() {
        if (actor != null) {
          try {
            val future = actor.ask(StopClient)(timeout)
            Await.result(future, timeout)
          } catch {
            case e: TimeoutException =>
              logInfo("Stop request to Master timed out; it may already be shut down.")
          }
          actor = null
        }
      }
    }
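
    For reference, the listener passed in implements the ClientListener callback interface, which looks roughly like this (paraphrased; SparkDeploySchedulerBackend is the concrete implementation):

    private[spark] trait ClientListener {
      def connected(appId: String): Unit
      def disconnected(): Unit
      def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit
      def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit
    }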

    Master

    The client submits an application to the master, and workers also register themselves with the master.
    The Master is thus the front door of a Spark cluster: it accepts application requests from clients and allocates worker resources to each app.
    The key messages it handles are RegisterWorker, RegisterApplication and ExecutorStateChanged, all of which eventually call schedule().
    schedule() is the core function. For now it schedules on worker CPU cores only, ignoring other resources; it can either spread an app across as many workers as possible or pack it onto as few as possible.
    Finally the master sends LaunchExecutor to the worker actor, which actually starts the ExecutorBackend.

    private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging { 
      var nextAppNumber = 0
      val workers = new HashSet[WorkerInfo] // track workers
      val idToWorker = new HashMap[String, WorkerInfo]
      val actorToWorker = new HashMap[ActorRef, WorkerInfo]
      val addressToWorker = new HashMap[Address, WorkerInfo]
    
      val apps = new HashSet[ApplicationInfo] // track apps 
      val idToApp = new HashMap[String, ApplicationInfo]
      val actorToApp = new HashMap[ActorRef, ApplicationInfo]
      val addressToApp = new HashMap[Address, ApplicationInfo]
    
      val waitingApps = new ArrayBuffer[ApplicationInfo] // apps not yet finished; these are what schedule() works on
      val completedApps = new ArrayBuffer[ApplicationInfo]
      override def receive = {
        case RegisterWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress) => {
          logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
            host, workerPort, cores, Utils.megabytesToString(memory)))
          if (idToWorker.contains(id)) {
            sender ! RegisterWorkerFailed("Duplicate worker ID")
          } else {
            addWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress)
            context.watch(sender)  // This doesn't work with remote actors but helps for testing
            sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort.get)
            schedule() // re-run scheduling
          }
        }
    
        case RegisterApplication(description) => {
          logInfo("Registering app " + description.name)
          val app = addApplication(description, sender)
          logInfo("Registered app " + description.name + " with ID " + app.id)
          waitingApps += app
          context.watch(sender)  // This doesn't work with remote actors but helps for testing
          sender ! RegisteredApplication(app.id)
          schedule() // re-run scheduling
        }
    
        // When an executor's state changes; only the failure cases get special handling here
        case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
          val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
          execOption match {
            case Some(exec) => { // the executor is known and legitimate
              exec.state = state
              exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus) // send an ExecutorUpdated event to the driver actor
              if (ExecutorState.isFinished(state)) { // isFinished means KILLED, FAILED or LOST, i.e. the failure cases (the name is misleading)
                val appInfo = idToApp(appId)
                // Remove this executor from the worker and app, freeing up coresLeft, then reschedule
                logInfo("Removing executor " + exec.fullId + " because it is " + state)
                appInfo.removeExecutor(exec)
                exec.worker.removeExecutor(exec)
                // Only retry a certain number of times so we don't go into an infinite loop
                if (appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
                  schedule() // still within the retry limit, so reschedule
                } else { // retry limit exceeded, so the whole application fails
                  logError("Application %s with ID %s failed %d times, removing it".format(
                    appInfo.desc.name, appInfo.id, appInfo.retryCount))
                  removeApplication(appInfo, ApplicationState.FAILED)
                }
              }
            }
            case None =>
              logWarning("Got status update for unknown executor " + appId + "/" + execId)
          }
        }
    
        case Heartbeat(workerId) => { // update the worker's heartbeat timestamp
          idToWorker.get(workerId) match {
            case Some(workerInfo) =>
              workerInfo.lastHeartbeat = System.currentTimeMillis()
            case None =>
              logWarning("Got heartbeat from unregistered worker " + workerId)
          }
        }
    
        case Terminated(actor) =>
        case RemoteClientDisconnected(transport, address) =>
        case RemoteClientShutdown(transport, address) =>
        case RequestMasterState =>
        case CheckForWorkerTimeOut =>
        case RequestWebUIPort =>
      }
      
      /**
       * Schedule the currently available resources among waiting apps. This method will be called
       * every time a new app joins or resource availability changes.
       */
      def schedule() {
        // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
        // in the queue, then the second app, etc.
        if (spreadOutApps) { // spread each app across as many workers as possible
          // Try to spread out each app among all the nodes, until it has all its cores
          for (app <- waitingApps if app.coresLeft > 0) { // coresLeft > 0 means the app still wants more cores, i.e. more parallelism
            // Usable workers must be ALIVE and pass canUse, i.e.
            // worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app);
            // sort them by coresFree, most free cores first
            val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
                                       .filter(canUse(app, _)).sortBy(_.coresFree).reverse
            val numUsable = usableWorkers.length
            val assigned = new Array[Int](numUsable) // Number of cores to give on each node
            // Total cores to hand out: the min of what the app needs and what is available
            var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
            // Round-robin over the usable workers so the cores are taken evenly from each
            var pos = 0
            while (toAssign > 0) {
              if (usableWorkers(pos).coresFree - assigned(pos) > 0) { // never assign more than a worker has free
                toAssign -= 1
                assigned(pos) += 1
              }
              pos = (pos + 1) % numUsable // if one pass is not enough, keep cycling
            }
            // Now that we've decided how many cores to give on each node, let's actually give them
            for (pos <- 0 until numUsable) {
              if (assigned(pos) > 0) {
                val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
                launchExecutor(usableWorkers(pos), exec, app.desc.sparkHome) // launch the executor backend
                app.state = ApplicationState.RUNNING
              }
            }
          }
        } else { // pack each app onto as few workers as possible; the logic is simpler
          // Pack each app into as few nodes as possible until we've assigned all its cores
          for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
            for (app <- waitingApps if app.coresLeft > 0) {
              if (canUse(app, worker)) {
                val coresToUse = math.min(worker.coresFree, app.coresLeft)
                if (coresToUse > 0) {
                  val exec = app.addExecutor(worker, coresToUse)
                  launchExecutor(worker, exec, app.desc.sparkHome)
                  app.state = ApplicationState.RUNNING
                }
              }
            }
          }
        }
      }
     
      def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) {
        logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
        worker.addExecutor(exec)
        worker.actor ! LaunchExecutor( // send a LaunchExecutor event to the worker actor
          exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome)
        exec.application.driver ! ExecutorAdded( // send an ExecutorAdded event to the driver actor
          exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
      }
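
    The canUse predicate used by schedule() is just the memory-and-uniqueness check quoted in the comment above (paraphrased from the same file):

    def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
      worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)
    }

    To make the spread-out branch of schedule() concrete, here is a minimal self-contained sketch of the same round-robin assignment (not Spark code; the worker core counts and the app demand of 7 cores are made up):

    object SpreadOutDemo extends App {
      val coresFree = Array(4, 3, 2)             // free cores on each usable worker (hypothetical)
      val assigned = new Array[Int](coresFree.length)
      var toAssign = math.min(7, coresFree.sum)  // pretend app.coresLeft == 7

      var pos = 0
      while (toAssign > 0) {
        if (coresFree(pos) - assigned(pos) > 0) { // never take more than a worker has free
          toAssign -= 1
          assigned(pos) += 1
        }
        pos = (pos + 1) % coresFree.length       // cycle again if one pass is not enough
      }
      println(assigned.mkString(", "))           // prints "3, 2, 2": cores spread evenly
    }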

     

    Worker

    A Worker is an actor process. On startup it first creates its work directory and registers itself with the master.
    Its main job is handling the LaunchExecutor event, using an ExecutorRunner to run the executor backend.

    private[spark] class Worker(
        host: String,
        port: Int,
        webUiPort: Int,
        cores: Int,
        memory: Int,
        masterUrl: String,
        workDirPath: String = null)
      extends Actor with Logging {
     
      override def preStart() {
        sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
        logInfo("Spark home: " + sparkHome)
        createWorkDir() // create the work directory under the user-configured sparkHome
        connectToMaster() // register this worker with the master
      }
      override def receive = {
        case RegisteredWorker(url) => // registration succeeded: the master's acknowledgement
          masterWebUiUrl = url
          logInfo("Successfully registered with master")
          context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) {
            master ! Heartbeat(workerId) // the timer above reports a heartbeat to the master periodically
          }
    
        case RegisterWorkerFailed(message) =>
          logError("Worker registration failed: " + message)
          System.exit(1)
    
        case LaunchExecutor(appId, execId, appDesc, cores_, memory_, execSparkHome_) =>
          logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
          val manager = new ExecutorRunner( // create an ExecutorRunner and start it
            appId, execId, appDesc, cores_, memory_, self, workerId, host, new File(execSparkHome_), workDir)
          executors(appId + "/" + execId) = manager
          manager.start()
          coresUsed += cores_
          memoryUsed += memory_
          master ! ExecutorStateChanged(appId, execId, ExecutorState.RUNNING, None, None) // report ExecutorState.RUNNING to the master
    
        case ExecutorStateChanged(appId, execId, state, message, exitStatus) => // sent by the ExecutorRunner
          master ! ExecutorStateChanged(appId, execId, state, message, exitStatus) // forward it to the master
          val fullId = appId + "/" + execId
          if (ExecutorState.isFinished(state)) {
            val executor = executors(fullId)
            logInfo("Executor " + fullId + " finished with state " + state +
              message.map(" message " + _).getOrElse("") +
              exitStatus.map(" exitStatus " + _).getOrElse(""))
            finishedExecutors(fullId) = executor
            executors -= fullId
            coresUsed -= executor.cores
            memoryUsed -= executor.memory
          }
    
        case KillExecutor(appId, execId) =>
          val fullId = appId + "/" + execId
          executors.get(fullId) match {
            case Some(executor) =>
              logInfo("Asked to kill executor " + fullId)
              executor.kill()
            case None =>
              logInfo("Asked to kill unknown executor " + fullId)
          }
    
        case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
          masterDisconnected()
    
        case RequestWorkerState => {
          sender ! WorkerStateResponse(host, port, workerId, executors.values.toList,
            finishedExecutors.values.toList, masterUrl, cores, memory,
            coresUsed, memoryUsed, masterWebUiUrl)
        }
      }

     

    ExecutorRunner

    ExecutorRunner spawns a thread that runs fetchAndRunExecutor, and inside that thread it uses a ProcessBuilder to launch the StandaloneExecutorBackend as a child process:

    val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
    val command = Command("org.apache.spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
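
    The {{EXECUTOR_ID}}-style placeholders are filled in when ExecutorRunner builds the command line, via the substituteVariables that buildCommandSeq maps over the arguments; roughly (paraphrased, the exact cases may vary by version):

    def substituteVariables(argument: String): String = argument match {
      case "{{EXECUTOR_ID}}" => execId.toString
      case "{{HOSTNAME}}" => host
      case "{{CORES}}" => cores.toString
      case other => other
    }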

    Why not just fork the child process directly from the Worker actor? Presumably because process.waitFor() blocks: the wait has to happen on a dedicated thread so that the actor stays responsive.

     

    /**
     * Manages the execution of one executor process.
     */
    private[spark] class ExecutorRunner(
        val appId: String,
        val execId: Int,
        val appDesc: ApplicationDescription,
        val cores: Int,
        val memory: Int,
        val worker: ActorRef,
        val workerId: String,
        val host: String,
        val sparkHome: File,
        val workDir: File)
      extends Logging {
     
      val fullId = appId + "/" + execId
      var workerThread: Thread = null
      var process: Process = null
      var shutdownHook: Thread = null
     
      def start() {
        workerThread = new Thread("ExecutorRunner for " + fullId) {
          override def run() { fetchAndRunExecutor() } // the thread body just runs fetchAndRunExecutor
        }
        workerThread.start()
      }
     
      def buildCommandSeq(): Seq[String] = {
        val command = appDesc.command
        val runner = getAppEnv("JAVA_HOME").map(_ + "/bin/java").getOrElse("java")
        // SPARK-698: do not call the run.cmd script, as process.destroy()
        // fails to kill a process tree on Windows
        Seq(runner) ++ buildJavaOpts() ++ Seq(command.mainClass) ++ // run command.mainClass with java
          command.arguments.map(substituteVariables)
      }
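
    buildJavaOpts, called above, assembles the JVM options: the app environment's library path, any user-supplied SPARK_JAVA_OPTS, and the executor's memory limits. A sketch (the real method also deals with the classpath, and details differ across versions):

    def buildJavaOpts(): Seq[String] = {
      val libraryOpts = getAppEnv("SPARK_LIBRARY_PATH")
        .map(p => List("-Djava.library.path=" + p)).getOrElse(Nil)
      val userOpts = getAppEnv("SPARK_JAVA_OPTS").map(Utils.splitCommandString).getOrElse(Nil)
      val memoryOpts = Seq("-Xms" + memory + "M", "-Xmx" + memory + "M")
      libraryOpts ++ userOpts ++ memoryOpts
    }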
     
      /**
       * Download and run the executor described in our ApplicationDescription
       */
      def fetchAndRunExecutor() {
        try {
          // Build the command line and launch it as a child process via ProcessBuilder
          val command = buildCommandSeq()
          val builder = new ProcessBuilder(command: _*).directory(executorDir)
          val env = builder.environment()
          for ((key, value) <- appDesc.command.environment) {
            env.put(key, value)
          }
          // In case we are running this from within the Spark Shell, avoid creating a "scala"
          // parent process for the executor command
          env.put("SPARK_LAUNCH_WITH_SCALA", "0")
          process = builder.start()
    
          // Wait for it to exit; this is actually a bad thing if it happens, because we expect to run
          // long-lived processes only. However, in the future, we might restart the executor a few
          // times on the same machine.
          val exitCode = process.waitFor()
          val message = "Command exited with code " + exitCode
          worker ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(message), Some(exitCode))
        } catch {
          case e: Exception =>
            logError("Error running executor", e)
            val message = e.getClass.toString + ": " + e.getMessage
            worker ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(message), None)
        }
      }
    }
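
    For completeness, the kill() that the Worker calls on KillExecutor interrupts the runner thread and destroys the child process, then reports KILLED back to the worker (a sketch; details vary by version):

    def kill() {
      if (workerThread != null) {
        workerThread.interrupt() // unblocks the process.waitFor() in fetchAndRunExecutor
        workerThread = null
        if (process != null) {
          logInfo("Killing process!")
          process.destroy()
          process.waitFor()
        }
        worker ! ExecutorStateChanged(appId, execId, ExecutorState.KILLED, None, None)
      }
    }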
  • Original post: https://www.cnblogs.com/fxjwind/p/3502285.html