Spark Deploy Module

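    The article on the Spark Scheduler module explained that Spark separates low-level resource management from high-level task scheduling. The low-level resource management is usually delegated to a third-party platform such as YARN or Mesos. To make testing and everyday use easier, Spark also ships its own simple resource manager, which is the Deploy module covered in this article.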

    Some experienced readers will already have used this feature (Spark's standalone mode).

    Reference: http://jerryshao.me/architecture/2013/04/30/Spark%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90%E4%B9%8B-deploy%E6%A8%A1%E5%9D%97/

    Spark's RPC implementation

    Attentive readers of the Scheduler code will have noticed that many components communicate via RPC, for example when the driver and the executors exchange messages.

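    Older versions of Spark used akka.Actor directly as the foundation for concurrent communication, and many modules extended akka.Actor. To remove the dependency on Akka, Spark factored out a separate module, org.apache.spark.rpc, which defines RpcEndpoint and RpcEndpointRef. These play exactly the same roles as Actor and ActorRef. At the time of writing, the RPC module has only one implementation, org.apache.spark.rpc.akka, so communication still goes over Akka. The benefit is that the interface is now abstracted, so Akka can be replaced by another implementation at any time.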
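    To make the endpoint/reference split concrete, here is a minimal, single-threaded toy in plain Scala. ToyEndpoint, ToyEndpointRef and ToyRpcEnv are hypothetical stand-ins invented for illustration, not Spark's RpcEndpoint, RpcEndpointRef or RpcEnv. The sketch only assumes the shape described in this article: an endpoint registered under a name, an onStart hook run on setup, a receive partial function, and a reference whose send delivers a one-way message.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for RpcEndpoint / RpcEndpointRef / RpcEnv.
// NOT Spark's API: messages are delivered synchronously on the caller's thread.
trait ToyEndpoint {
  def onStart(): Unit = ()                 // lifecycle hook, called once on setup
  def receive: PartialFunction[Any, Unit]  // handles one-way messages
}

final class ToyEndpointRef(val name: String, env: ToyRpcEnv) {
  // Fire-and-forget delivery, in the spirit of RpcEndpointRef.send
  def send(msg: Any): Unit = env.deliver(name, msg)
}

final class ToyRpcEnv {
  private val endpoints = mutable.Map.empty[String, ToyEndpoint]

  // Register an endpoint under a name, run its onStart, hand back a reference
  def setupEndpoint(name: String, endpoint: ToyEndpoint): ToyEndpointRef = {
    endpoints(name) = endpoint
    endpoint.onStart()
    new ToyEndpointRef(name, this)
  }

  // Look up the endpoint and let its receive handle the message
  def deliver(name: String, msg: Any): Unit = {
    val endpoint = endpoints.getOrElse(name, throw new IllegalArgumentException(s"no endpoint named $name"))
    val handler = endpoint.receive
    if (handler.isDefinedAt(msg)) handler(msg)
    else throw new IllegalArgumentException(s"$name cannot handle $msg")
  }
}

// Usage: an endpoint that counts Ping messages
case object Ping

final class PingCounter extends ToyEndpoint {
  var started = false
  var pings = 0
  override def onStart(): Unit = started = true
  def receive: PartialFunction[Any, Unit] = { case Ping => pings += 1 }
}
```

    Callers only ever hold a reference, never the endpoint object itself, which is what lets the transport behind send be swapped without touching the endpoints.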

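    This seems to be Spark's style: it likes to implement everything itself, including scheduling, storage, shuffle, and the recently launched Project Tungsten (which manages memory itself rather than leaving it to the JVM).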

    Overall architecture of the Deploy module

    The deploy module consists of three main components: master, worker, and client.

    Master: the cluster manager. It accepts worker registrations and applications submitted by clients, and schedules all applications.

    Worker: a worker hosts several ExecutorRunners, and the executors they launch are where tasks actually run. When a worker starts, it registers itself with the master.

    Client: submits applications to the master and monitors them.
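    The three roles interact purely through messages. The sketch below lists the main messages that appear in this article and records who sends each one to whom. RegisterWorker and Heartbeat are the worker-to-master messages, which the excerpts below do not show. All field lists are simplified assumptions (plain strings and ints instead of endpoint references) and do not match Spark's real message definitions.

```scala
// Simplified sketch of the standalone deploy protocol.
// Field lists are reduced for illustration and are NOT Spark's definitions.
sealed trait DeployMessage
final case class RegisterWorker(workerId: String, host: String, cores: Int, memoryMb: Int) extends DeployMessage
final case class RegisteredWorker(masterUrl: String) extends DeployMessage
final case class Heartbeat(workerId: String) extends DeployMessage
final case class RegisterApplication(appName: String) extends DeployMessage
final case class RegisteredApplication(appId: String) extends DeployMessage
final case class LaunchExecutor(appId: String, execId: Int, cores: Int, memoryMb: Int) extends DeployMessage
final case class ExecutorAdded(execId: Int, workerId: String, cores: Int) extends DeployMessage
final case class ExecutorStateChanged(appId: String, execId: Int, state: String) extends DeployMessage

sealed trait Role
case object MasterRole extends Role
case object WorkerRole extends Role
case object ClientRole extends Role

object DeployProtocol {
  // (sender, receiver) for each message; the match is exhaustive over the sealed trait
  def route(msg: DeployMessage): (Role, Role) = msg match {
    case _: RegisterWorker        => (WorkerRole, MasterRole)
    case _: RegisteredWorker      => (MasterRole, WorkerRole)
    case _: Heartbeat             => (WorkerRole, MasterRole)
    case _: RegisterApplication   => (ClientRole, MasterRole)
    case _: RegisteredApplication => (MasterRole, ClientRole)
    case _: LaunchExecutor        => (MasterRole, WorkerRole)
    case _: ExecutorAdded         => (MasterRole, ClientRole)
    case _: ExecutorStateChanged  => (WorkerRole, MasterRole)
  }
}
```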

    Code walkthrough

    Starting the master and the worker

    The main function that starts the master lives in object org.apache.spark.deploy.master.Master:

    private[deploy] object Master extends Logging {
      val SYSTEM_NAME = "sparkMaster"
      val ENDPOINT_NAME = "Master"
    
      def main(argStrings: Array[String]) {
        SignalLogger.register(log)
        val conf = new SparkConf
        val args = new MasterArguments(argStrings, conf)
        val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
        rpcEnv.awaitTermination()
      }
    
      def startRpcEnvAndEndpoint(
          host: String,
          port: Int,
          webUiPort: Int,
          conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
        val securityMgr = new SecurityManager(conf)
        val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
        val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
          new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf)) // start the Master and its RPC endpoint
        val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
        (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
      }
    }

    The key line here is:

        val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
          new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf)) // start the Master's RPC endpoint

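    Because Master extends RpcEndpoint, its startup work is done in Master.onStart, which runs when the endpoint is set up. It mainly starts the HTTP services: the web UI that displays the cluster's status and, if enabled, the REST submission server.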

    The main function that starts the worker lives in object org.apache.spark.deploy.worker.Worker:

    private[deploy] object Worker extends Logging {
      val SYSTEM_NAME = "sparkWorker"
      val ENDPOINT_NAME = "Worker"
    
      // the master URL(s) must be passed in
      def main(argStrings: Array[String]) {
        SignalLogger.register(log)
        val conf = new SparkConf
        val args = new WorkerArguments(argStrings, conf)
        val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
          args.memory, args.masters, args.workDir)
        rpcEnv.awaitTermination()
      }
    
      def startRpcEnvAndEndpoint(
          host: String,
          port: Int,
          webUiPort: Int,
          cores: Int,
          memory: Int,
          masterUrls: Array[String],
          workDir: String,
          workerNumber: Option[Int] = None,
          conf: SparkConf = new SparkConf): RpcEnv = {
    
        // The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
        val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
        val securityMgr = new SecurityManager(conf)
        val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
        val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
        rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
          masterAddresses, systemName, ENDPOINT_NAME, workDir, conf, securityMgr)) // start the Worker
        rpcEnv
      }
      ...
    }

    The worker starts up in much the same way as the master. Its onStart then does the following:

    override def onStart() {
      assert(!registered)
      createWorkDir()  // create the work directory
      shuffleService.startIfEnabled() // start the shuffle service (if enabled)
      webUi = new WorkerWebUI(this, workDir, webUiPort) // start the web UI
      webUi.bind()
      registerWithMaster()  // register with the master
    
      metricsSystem.registerSource(workerSource) // register the worker's metrics source
      metricsSystem.start()
      metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
    }
    private def registerWithMaster() {
      registrationRetryTimer match {
        case None =>
          registered = false
          registerMasterFutures = tryRegisterAllMasters()
          connectionAttemptCount = 0
          registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate( // keep retrying registration with the master until it succeeds
            new Runnable {
              override def run(): Unit = Utils.tryLogNonFatalError {
                self.send(ReregisterWithMaster)
              }
            },
            INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
            INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
            TimeUnit.SECONDS))
        ...
      }
    }
    
    
    override def receive: PartialFunction[Any, Unit] = {
      case RegisteredWorker(masterRef, masterWebUiUrl) =>  // the master confirms that the worker is registered
        logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
        registered = true
        changeMaster(masterRef, masterWebUiUrl)
        forwordMessageScheduler.scheduleAtFixedRate(new Runnable { // the worker keeps sending heartbeats to the master
          override def run(): Unit = Utils.tryLogNonFatalError {
            self.send(SendHeartbeat)
          }
        }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
      ...
    }

    In this way, the master and the worker keep their connection alive through heartbeats.
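    The register-then-heartbeat pattern can be sketched with a plain ScheduledExecutorService. This is a hypothetical model, not Worker's code. The master is faked by a counter that accepts the N-th registration attempt, and the intervals are tiny so the example finishes quickly. As in the excerpt, a fixed-rate task retries registration until it succeeds, and the success path starts a second fixed-rate task that sends heartbeats.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical model of the worker's "retry registration, then heartbeat" loop.
// The fake master accepts the registration on attempt number `acceptOnAttempt`.
final class RegistrationLoop(acceptOnAttempt: Int, retryMs: Long, heartbeatMs: Long) {
  val attempts = new AtomicInteger(0)
  val heartbeats = new AtomicInteger(0)
  @volatile private var registered = false
  // One thread, so the retry and heartbeat tasks never run concurrently
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  def runUntilHeartbeats(expected: Int): Unit = {
    val enoughHeartbeats = new CountDownLatch(expected)
    val heartbeat = new Runnable {
      def run(): Unit = { heartbeats.incrementAndGet(); enoughHeartbeats.countDown() }
    }
    val retry = new Runnable {
      def run(): Unit = if (!registered) {
        // Stand-in for sending RegisterWorker and receiving RegisteredWorker
        if (attempts.incrementAndGet() >= acceptOnAttempt) {
          registered = true
          // Like the RegisteredWorker handler: start heartbeating at a fixed rate
          scheduler.scheduleAtFixedRate(heartbeat, 0L, heartbeatMs, TimeUnit.MILLISECONDS)
        }
      }
    }
    val retryTask = scheduler.scheduleAtFixedRate(retry, 0L, retryMs, TimeUnit.MILLISECONDS)
    try {
      if (!enoughHeartbeats.await(5, TimeUnit.SECONDS))
        throw new IllegalStateException("no heartbeats within 5 seconds")
      retryTask.cancel(false) // registration is done; stop the retry timer
    } finally {
      scheduler.shutdownNow()
    }
  }
}
```

    Running both tasks on a single-threaded scheduler keeps them serialized, so once the registered flag is set the retry task becomes a no-op. This is a simplification of the real worker, which routes these ticks through its own message loop via self.send.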

    There are two clients here. One is org.apache.spark.deploy.Client, the client used when submitting with spark-submit. The other is org.apache.spark.deploy.client.AppClient, which is started inside the user application and is the client this article covers.

    Submitting an application from the client

    As mentioned in the Spark Scheduler article, the AppClient is created in SparkDeploySchedulerBackend, which in turn is created in SparkContext.

    // SparkDeploySchedulerBackend.scala
    override def start() {
      super.start()
    
      // The endpoint for executors to talk to us
      val driverUrl = rpcEnv.uriOf(SparkEnv.driverActorSystemName,
          RpcAddress(sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port").toInt),
          CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
      val args = Seq(
        "--driver-url", driverUrl,
        "--executor-id", "{{EXECUTOR_ID}}",
        "--hostname", "{{HOSTNAME}}",
        "--cores", "{{CORES}}",
        "--app-id", "{{APP_ID}}",
        "--worker-url", "{{WORKER_URL}}")  
      ....
      val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
          args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
      val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
          command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
      client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
      client.start()
      waitForRegistration()
    }

    This creates a client, AppClient, which connects to the masters (e.g. spark://master:7077). The connection is set up in AppClient.start:

    def start() {
      // Just launch an rpcEndpoint; it will call back into the listener.
      endpoint = rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv))
    }

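    ClientEndpoint is a subclass of RpcEndpoint, so its onStart method is called when it is created. onStart registers with the master and submits the new application; the messages themselves are sent in tryRegisterAllMasters: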

    private def tryRegisterAllMasters(): Array[JFuture[_]] = {
      for (masterAddress <- masterRpcAddresses) yield {
        registerMasterThreadPool.submit(new Runnable {
          override def run(): Unit = try {
            if (registered) {
              return
            }
            val masterRef = rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
            masterRef.send(RegisterApplication(appDescription, self)) // send the application registration to the master; appDescription includes the command for launching executors
        ...
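    When several masters are configured (for example, one active and one standby), the client sends RegisterApplication to all of them, and only the active master answers. Below is a minimal sketch of that fan-out using Scala Futures. FakeMaster and MultiMasterRegistration are hypothetical names, and a synchronous return value stands in for the asynchronous RegisteredApplication reply.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical master stand-in: only the ALIVE master answers a registration.
final case class FakeMaster(url: String, alive: Boolean) {
  def registerApplication(appName: String): Option[String] =
    if (alive) Some(url) // the reply identifies the master that accepted the app
    else None            // a STANDBY master stays silent
}

object MultiMasterRegistration {
  private implicit val ec: ExecutionContext = ExecutionContext.global

  // Send the request to every master in parallel and keep the first real reply.
  def register(masters: Seq[FakeMaster], appName: String): Option[String] = {
    val attempts: List[Future[Option[String]]] =
      masters.toList.map(m => Future(m.registerApplication(appName)))
    val firstReply: Future[Option[String]] =
      Future.find(attempts)(_.isDefined).map(_.flatten)
    Await.result(firstReply, 5.seconds)
  }
}
```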

    When the Master receives this message:

    case RegisterApplication(description, driver) => {
      if (state == RecoveryState.STANDBY) {
        // a standby master ignores the request and sends no reply
      } else {
        logInfo("Registering app " + description.name)
        val app = createApplication(description, driver)
        registerApplication(app) // add the app to the waiting list
        logInfo("Registered app " + description.name + " with ID " + app.id)
        persistenceEngine.addApplication(app)
        driver.send(RegisteredApplication(app.id, self)) // reply that registration succeeded
        schedule() // schedule resources for the waiting applications
      }
    }
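    On the master side, registration is mostly bookkeeping. The hypothetical MiniMaster below keeps only what the excerpt above touches. A standby master stays silent, while an active master assigns an ID, appends the app to a waiting list for schedule(), and replies. The ID format mirrors the app-<yyyyMMddHHmmss>-<counter> IDs the standalone master generates. The injectable clock and the UTC time zone are assumptions added so that the output is deterministic.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale, TimeZone}
import scala.collection.mutable.ArrayBuffer

final case class AppInfo(id: String, name: String)

// Hypothetical, trimmed-down master state for RegisterApplication handling.
final class MiniMaster(standby: Boolean, now: () => Date = () => new Date()) {
  private val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
  dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")) // assumption: UTC timestamps
  private var nextAppNumber = 0
  val waitingApps = ArrayBuffer.empty[AppInfo]

  // Returns the app ID to send back in RegisteredApplication, or None to stay silent.
  def onRegisterApplication(name: String): Option[String] =
    if (standby) None // a STANDBY master ignores the request
    else {
      val id = "app-%s-%04d".format(dateFormat.format(now()), nextAppNumber)
      nextAppNumber += 1
      waitingApps += AppInfo(id, name) // waits here until schedule() places executors
      Some(id)
    }
}
```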

    schedule is the master's core method: it schedules and allocates resources, where "resources" means the number of CPU cores and the amount of memory.

    First, it launches as many waiting drivers as possible:

    private def schedule(): Unit = {
      if (state != RecoveryState.ALIVE) { return }
      val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
      for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
        for (driver <- waitingDrivers.toList) { // iterate over a copy: waitingDrivers is modified inside the loop
          if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
            launchDriver(worker, driver) // launch waiting drivers first
            waitingDrivers -= driver
          }
        }
      }
      startExecutorsOnWorkers()
    }
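    The driver-placement loop is a first-fit search over shuffled workers. Here it is as a self-contained sketch with hypothetical WorkerSlot and DriverReq types. It assumes every worker is ALIVE and charges each worker for the drivers it accepts (in the excerpt, launchDriver takes care of that). It iterates over a snapshot (waiting.toList) because the waiting list shrinks inside the loop.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

// Hypothetical types: a worker's free resources and a waiting driver's needs.
final class WorkerSlot(val id: String, var memoryFree: Int, var coresFree: Int)
final case class DriverReq(id: String, mem: Int, cores: Int)

object DriverPlacement {
  // First-fit over shuffled workers. Returns driverId -> workerId for every
  // driver that was placed, and removes placed drivers from `waiting`.
  def place(workers: Seq[WorkerSlot], waiting: ArrayBuffer[DriverReq], rnd: Random): Map[String, String] = {
    var placed = Map.empty[String, String]
    for (worker <- rnd.shuffle(workers)) {   // randomization spreads drivers out
      for (driver <- waiting.toList) {       // snapshot: `waiting` shrinks below
        if (worker.memoryFree >= driver.mem && worker.coresFree >= driver.cores) {
          worker.memoryFree -= driver.mem     // charge the worker for the driver
          worker.coresFree -= driver.cores
          placed += driver.id -> worker.id
          waiting -= driver
        }
      }
    }
    placed
  }
}
```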

    Then it allocates executors to each application:

    private def startExecutorsOnWorkers(): Unit = {
      // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
      // in the queue, then the second app, etc.
      for (app <- waitingApps if app.coresLeft > 0) {
        val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
        // Filter out workers that don't have enough resources to launch an executor
        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
          .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
            worker.coresFree >= coresPerExecutor.getOrElse(1))
          .sortBy(_.coresFree).reverse
        // among workers with enough memory and cores, decide how many cores to assign on each
        val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
    
        // Now that we've decided how many cores to allocate on each worker, let's allocate them
        for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
          allocateWorkerResourceToExecutors(
            app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
        }
      }
    }
    
    // launch one or more executors for the application on a single worker
    private def allocateWorkerResourceToExecutors(
        app: ApplicationInfo,
        assignedCores: Int,
        coresPerExecutor: Option[Int],
        worker: WorkerInfo): Unit = {
      val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
      val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
      for (i <- 1 to numExecutors) {
        val exec = app.addExecutor(worker, coresToAssign)
        launchExecutor(worker, exec)
        app.state = ApplicationState.RUNNING
      }
    }
    
    // tell the worker to launch the executor and notify the driver
    private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
      worker.addExecutor(exec) // record the executor in the master's view of the worker
      worker.endpoint.send(LaunchExecutor(masterUrl,
        exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)) // ask the worker (via RPC) to launch the executor
      exec.application.driver.send(ExecutorAdded(
        exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)) // tell the driver (via RPC) that an executor was added
    }
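    scheduleExecutorsOnWorkers itself is not shown above; its central idea is how spreadOutApps changes placement. Spreading out hands out cores round-robin across the usable workers, while the alternative fills one worker before moving on to the next. The sketch below is a simplified, hypothetical version that only looks at free cores (memory checks are omitted), followed by the executors-per-worker arithmetic from allocateWorkerResourceToExecutors.

```scala
// Hypothetical, simplified version of the spread-out vs. consolidate decision.
// Only free cores are considered; memory-per-executor checks are omitted.
object ExecutorCoreAssignment {
  def assign(coresFree: Array[Int], coresWanted: Int,
             coresPerExecutor: Option[Int], spreadOut: Boolean): Array[Int] = {
    val unit = coresPerExecutor.getOrElse(1) // cores handed out per step
    val assigned = Array.fill(coresFree.length)(0)
    var left = coresWanted
    def canTake(pos: Int): Boolean = left >= unit && coresFree(pos) - assigned(pos) >= unit

    var usable = coresFree.indices.filter(canTake)
    while (usable.nonEmpty) {
      for (pos <- usable) {
        var keepGoing = true
        while (keepGoing && canTake(pos)) {
          assigned(pos) += unit
          left -= unit
          keepGoing = !spreadOut // spread out: one step per worker per round
        }
      }
      usable = usable.filter(canTake)
    }
    assigned
  }

  // Same arithmetic as allocateWorkerResourceToExecutors: with a fixed
  // coresPerExecutor a worker gets assigned / coresPerExecutor executors,
  // otherwise one executor takes all cores assigned on that worker.
  def executorsPerWorker(assigned: Array[Int], coresPerExecutor: Option[Int]): Array[Int] =
    assigned.map(c => if (c == 0) 0 else coresPerExecutor.map(c / _).getOrElse(1))
}
```

    With three 4-core workers and 6 requested cores, spreading out assigns 2 cores on each worker, while consolidating assigns 4, 2 and 0.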

    When the Worker receives this message, it creates an ExecutorRunner and reports the executor's state to the master:

    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
      val manager = new ExecutorRunner(
        appId,
        execId,
        appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
        cores_,
        memory_,
        self,
        workerId,
        host,
        webUi.boundPort,
        publicAddress,
        sparkHome,
        executorDir,
        workerUri,
        conf,
        appLocalDirs, ExecutorState.LOADING)
      executors(appId + "/" + execId) = manager
      manager.start()
      coresUsed += cores_
      memoryUsed += memory_
      sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
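    The coresUsed and memoryUsed updates above are one half of the worker's accounting; the other half runs when an executor finishes and its resources are returned. Here is a hypothetical sketch of both halves, keyed by "appId/execId" like the executors map. WorkerAccounting and its methods are illustrative names, not Spark's.

```scala
import scala.collection.mutable

// Hypothetical worker-side bookkeeping: charge resources on LaunchExecutor,
// release them when the executor reaches a finished state.
final class WorkerAccounting(val totalCores: Int, val totalMemoryMb: Int) {
  private val executors = mutable.Map.empty[String, (Int, Int)] // "appId/execId" -> (cores, memoryMb)
  var coresUsed = 0
  var memoryUsed = 0
  def coresFree: Int = totalCores - coresUsed
  def memoryFree: Int = totalMemoryMb - memoryUsed

  def launch(appId: String, execId: Int, cores: Int, memoryMb: Int): Unit = {
    require(cores <= coresFree && memoryMb <= memoryFree, "worker would be over-committed")
    executors(s"$appId/$execId") = (cores, memoryMb)
    coresUsed += cores
    memoryUsed += memoryMb
  }

  // Called when the executor exits, is killed or fails; repeated reports are no-ops
  def finished(appId: String, execId: Int): Unit =
    executors.remove(s"$appId/$execId").foreach { case (cores, mem) =>
      coresUsed -= cores
      memoryUsed -= mem
    }
}
```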

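    ExecutorRunner.start launches a separate thread whose body is fetchAndRunExecutor. That method starts the executor (the CoarseGrainedExecutorBackend command built earlier) as a new process and waits for it to exit: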

    private def fetchAndRunExecutor() {
      try {
        val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
          memory, sparkHome.getAbsolutePath, substituteVariables) // prepare the command for the new process
        val command = builder.command()
        logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
    
        builder.directory(executorDir)
        builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
        builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
    
        // Add webUI log urls
        val baseUrl =
          s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
        builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
        builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
    
        process = builder.start() // launch a new process that runs the application's tasks
        val header = "Spark Executor Command: %s\n%s\n\n".format(
          command.mkString("\"", "\" \"", "\""), "=" * 40)
    
        // Redirect its stdout and stderr to files
        val stdout = new File(executorDir, "stdout")
        stdoutAppender = FileAppender(process.getInputStream, stdout, conf) // redirect the process's stdout to the file
    
        val stderr = new File(executorDir, "stderr")
        Files.write(header, stderr, UTF_8)
        stderrAppender = FileAppender(process.getErrorStream, stderr, conf) // redirect the process's stderr to the file
    
        // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
        // or with nonzero exit code
        val exitCode = process.waitFor() // wait for the process to exit
        state = ExecutorState.EXITED
        val message = "Command exited with code " + exitCode
        worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode))) // tell the worker the executor finished; the worker reclaims its resources and notifies the master
      } catch {
        case interrupted: InterruptedException => {
          logInfo("Runner thread for executor " + fullId + " interrupted")
          state = ExecutorState.KILLED
          killProcess(None)
        }
        case e: Exception => {
          logError("Error running executor", e)
          state = ExecutorState.FAILED
          killProcess(Some(e.toString))
        }
      }
    }
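    The {{EXECUTOR_ID}}-style placeholders in the args built by SparkDeploySchedulerBackend are filled in on the worker: substituteVariables is handed to CommandUtils.buildProcessBuilder, which uses it to rewrite the command's arguments. The sketch below shows one way such a substitution function could look. ExecutorContext and CommandTemplate are hypothetical, and the mapping is modeled on the placeholders listed in the backend's args.

```scala
// Hypothetical per-executor values; in the article they come from the
// LaunchExecutor message and the worker's own address.
final case class ExecutorContext(appId: String, execId: Int, host: String, cores: Int, workerUrl: String)

object CommandTemplate {
  // Replace whole-argument placeholders and leave every other argument untouched.
  def substitute(ctx: ExecutorContext)(arg: String): String = arg match {
    case "{{EXECUTOR_ID}}" => ctx.execId.toString
    case "{{HOSTNAME}}"    => ctx.host
    case "{{CORES}}"       => ctx.cores.toString
    case "{{APP_ID}}"      => ctx.appId
    case "{{WORKER_URL}}"  => ctx.workerUrl
    case other             => other
  }

  def render(args: Seq[String], ctx: ExecutorContext): Seq[String] =
    args.map(a => substitute(ctx)(a))
}
```

    Keeping placeholders in the template lets the driver build one ApplicationDescription for the whole application, while each worker fills in the values that are only known when a specific executor is launched.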

    Application termination

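    If an application is killed abnormally, the master calls handleKillExecutors. The master tells the workers to kill the executors, each ExecutorRunner interrupts its runner thread and kills the executor process, and every component reclaims its own resources. This step is exactly as described in http://jerryshao.me/architecture/2013/04/30/Spark%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90%E4%B9%8B-deploy%E6%A8%A1%E5%9D%97/ .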
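    The kill path relies on the InterruptedException branch of fetchAndRunExecutor: the runner thread is blocked in process.waitFor(), and interrupting it moves the executor to KILLED. The toy below reproduces just that mechanism. ToyExecutorRunner is hypothetical, and a latch that never opens stands in for the child process, so no real process is started or killed.

```scala
import java.util.concurrent.CountDownLatch

// Hypothetical runner mirroring the catch blocks of fetchAndRunExecutor.
final class ToyExecutorRunner {
  @volatile var state: String = "LOADING"
  private val started = new CountDownLatch(1)
  private val neverExits = new CountDownLatch(1)
  private val thread = new Thread(new Runnable {
    def run(): Unit = try {
      state = "RUNNING"
      started.countDown()
      neverExits.await() // stands in for process.waitFor()
      state = "EXITED"
    } catch {
      // The real runner would also kill the child process here
      case _: InterruptedException => state = "KILLED"
    }
  })

  def start(): Unit = { thread.start(); started.await() }

  // What happens on a kill request: interrupt the runner thread and wait for it
  def kill(): Unit = { thread.interrupt(); thread.join() }
}
```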

    Summary

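    That completes the introduction to Spark's own Deploy module. The module is relatively simple because it is only a basic resource manager, and it is probably not used in real production environments. Still, for readers unfamiliar with YARN and Mesos, understanding Spark's resource manager is quite instructive.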

    Original article: https://www.cnblogs.com/keepthinking/p/4857015.html