zoukankan      html  css  js  c++  java
  • Spark之Yarn Cluster运行机制内核源码解读

    简介

    Spark有3种集群管理器:

    • Standalone
    • Hadoop YARN
      • 又分为yarn client与yarn cluser
    • Apache Mesos

    生产环境中一般使用yarn cluser模式

    个人理解

    yarn主要有两个作用.

    • 一个是创建Container以此来分配计算资源
    • 另外一个是在Container上运行ExecutorBackend建立起除yarn之外的第二套RPC服务, 之后driver基于此RPC分配计算任务

    ApplicationMaster

    • yarn启动的第一个container, 由ResourceManager创建, 之后的container的创建与维护交个Appliction.

    container

    • 一个container容器就是一个java进程, 容器是分配了一定内存和线程数的java进程. 然后可以在这个java进程跑一些代码.

    如何查看源码:

    • 可以借助idea的debug功能, 在运行的时候, 跟随进程的启动, 去查看进程的启动, 线程的启动, 类的加载, 对象的创建(以什么样的参数),方法的调用.

    Yarn cluster模式的主要运行过程

    1. 通过Spark-Submit脚本, 启动SparkSubmit 进程
    2. SparkSubmit进程通过反射的方式调用Client的main方法
    3. Client向ResourceManager发送指令启动ApplicationMaster
    4. ResouceManager选择一个NodeManager, 并在该NM上启动ApplictionMaster

      ApplictionMaster是一个yarn任务运行时第一个由RM启动的container,然后负责整个任务的运行,包括container的申请、启动、kill、状态检查等。ApplicationMaster属于应用程序级,其实现不是由Yarn框架提供(历史原因,yarn提供了MapReduce的ApplicationMaster的实现),需要用户自己实现ApplicationMaster进程的具体实现。

    5. ApplictionMaster进程启动后, 会启动Driver子线程, 执行用户作业
    6. ApplictionMaster进程向RM申请资源, 在NM申请一个container启动ExecutorBackend.

      ExecutorBackend用于进程间的通信

    7. AM发送指令, 在NM上启动ExecutorBackend进程
    8. NM启动ExecutorBackend进程
    9. ExecutorBackendDriver注册自己
    10. Driver注册成功后, ExecutorBackend创建Executor对象
    11. 之后Driver给Executor分配任务

    源码解析

    Yarn cluster模式在执行启动脚本后会依此运行以下3种java进程

    1. SparkSubmit
    2. ApplicationMaster: Driver作为一个线程运行在该进程中.
    3. CoarseGrainedExecutorBackend

    SparkSubmit进程

    1. 通过Spark-Submit脚本, 启动SparkSubmit 进程
      Spark-Submit脚本启动SparkSubmit 进程

      bin/spark-submit 
      --class org.apache.spark.examples.SparkPi 
      --master yarn 
      --deploy-mode cluster 
      ./examples/jars/spark-examples_2.11-2.1.1.jar 
      100

      之后yarn会依次启动以下3个进程:

      1. SparkSubmit
      2. ApplicationMaster
      3. CoarseGrainedExecutorBackend
    2. 启动SparkSubmit的主类是org.apache.spark.deploy.SparkSubmit. 查看改主类
      • mian方法
        def main(args: Array[String]): Unit = {
        /*
        参数
        --master yarn
        --deploy-mode cluster
        --class org.apache.spark.examples.SparkPi
        ./examples/jars/spark-examples_2.11-2.1.1.jar 100
        */
         val appArgs = new SparkSubmitArguments(args)
         appArgs.action match {
             // 如果没有指定 action, 则 action 的默认值是: action = Option(action).getOrElse(SUBMIT)
             case SparkSubmitAction.SUBMIT => submit(appArgs)  // 接下来调用该方法
             case SparkSubmitAction.KILL => kill(appArgs)
             case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
         }
        }
      • Submit`方法
        /**
        * 使用提供的参数提交应用程序
        * 有 2 步:
        * 1. 准备启动环境.
        * 根据集群管理器和部署模式为 child main class 设置正确的 classpath,
        系统属性,应用参数
        * 2. 使用启动环境调用 child main class 的 main 方法
        */
        @tailrec
        private def submit(args: SparkSubmitArguments): Unit = {
         // 准备提交环境 childMainClass = "org.apache.spark.deploy.yarn.Client", 获得Client的主类
         val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
        
         def doRunMain(): Unit = {
             if (args.proxyUser != null) {
        
             } else {
                 runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
             }
          }
        
         if (args.isStandaloneCluster && args.useRest) {
             // 在其他任何模式, 仅仅运行准备好的主类
             } else {
                 doRunMain()
         }
        }
      • prepareSubmitEnvironment 方法
        // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
        if (isYarnCluster) {
         // 在 yarn 集群模式下, 使用 yarn.Client 来封装一下 user class
         childMainClass = "org.apache.spark.deploy.yarn.Client"
        }
      • doRunMain 方法
        def doRunMain(): Unit = {
         if (args.proxyUser != null) {
         } else {
             runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
         }
        }
      • runMain方法: 通过反射的方式调用 "org.apache.spark.deploy.yarn.Client"的main 方法
        /**
        **
        使用给定启动环境运行 child class 的 main 方法
        * 注意: 如果使用了 cluster deploy mode, 主类并不是用户提供
        */
        private def runMain(
         childArgs: Seq[String],
         childClasspath: Seq[String],
         sysProps: Map[String, String],
         childMainClass: String,
         verbose: Boolean): Unit = {
        
             var mainClass: Class[_] = null
        
             try {
                     // 使用反射的方式加载 childMainClass = "org.apache.spark.deploy.yarn.Client"
                     mainClass = Utils.classForName(childMainClass)
                     } catch {
              }//
        
             反射出来 Client 的 main 方法
             val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
        
             if (!Modifier.isStatic(mainMethod.getModifiers)) {
                 throw new IllegalStateException("The main method in the given main class must be static")
             }
             try {
                 // 通过反射的方式调用 main 方法
                 mainMethod.invoke(null, childArgs.toArray)
                 } catch {
             }
        }
    3. org.apache.spark.deploy.yarn.Client 源码分析

      主要关注Cilent是如何通过RM创建Application

      • main 方法
        def main(argStrings: Array[String]) {
         // 设置环境变量 SPARK_YARN_MODE 表示运行在 YARN mode
         // 注意: 任何带有 SPARK_ 前缀的环境变量都会分发到所有的进程, 也包括远程
         进程
         System.setProperty("SPARK_YARN_MODE", "true")
         val sparkConf = new SparkConf
        
         // 对传递来的参数进一步封装
         val args = new ClientArguments(argStrings)
         new Client(args, sparkConf).run()
        }
      • Client.run 方法
        def run(): Unit = {
         // 提交应用, 返回应用的 id
         this.appId = submitApplication()
        }
      • client.submitApplication 方法: 向 ResourceManager 提交运行 ApplicationMaster

        调用org.apache.hadoop.yarn.client.api.YarnClient的两个api方法

        1. createApplication方法通过RPC与ResourceManager进程通信(rmClient.getNewApplication(request)),让其分配一个新的Application,结果存在GetNewApplicationResponse实体中,其中包括ApplicationId、集群最大可分配资源。createApplication的结果存在YarnClientApplication实体中。
        2. 客户端获取到YarnClientApplication后需要设置其中的上下文对象中的信息org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext,包括aplicationName、资源、队列、优先级、ApplicationMaster启动命令(在ContainerLaunchContext实体中,普通Container启动也使用这个实体),最后调用上面提到的第二个方法submitApplication,将ApplicationSubmissionContext实体传到ResourceManger端(rmClient.submitApplication(request);)。
      • /**
        *
        * 向 ResourceManager 提交运行 ApplicationMaster 的应用程序。
        *
        */
        def submitApplication(): ApplicationId = {
         var appId: ApplicationId = null
         try {
             // 初始化 yarn 客户端
             yarnClient.init(yarnConf)
             // 启动 yarn 客户端
             yarnClient.start()
        
             // 从 RM 创建一个应用程序
             val newApp = yarnClient.createApplication()
             val newAppResponse = newApp.getNewApplicationResponse() // 与ResourceManager进程通信, 获得ApplicationId、集群最大可分配资源
             appId = newAppResponse.getApplicationId()
             reportLauncherState(SparkAppHandle.State.SUBMITTED)
             launcherBackend.setAppId(appId.toString)
        
             // Set up the appropriate contexts to launch our AM
             // 设置正确的上下文对象来启动 ApplicationMaster
             val containerContext = createContainerLaunchContext(newAppResponse)
             // 创建应用程序提交任务上下文
             val appContext = createApplicationSubmissionContext(newApp, containerContext)
        
             // 提交应用给 ResourceManager 启动 ApplicationMaster
             // "org.apache.spark.deploy.yarn.ApplicationMaster"
             yarnClient.submitApplication(appContext)
             appId
         } catch {
        
         }
        }
      • 方法: createContainerLaunchContext
        private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
        : ContainerLaunchContext = {
         val amClass =
             if (isClusterMode) {  // 如果是 Cluster 模式
                 Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
             } else { // 如果是 Client 模式
                 Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
             }
         amContainer
        }

        至此, SparkSubmit 进程启动完毕. 主要:

        • 运行了Client的main方法
        • 向RM提交作业, 运行AppMaster

    ApplicationMaster进程

    org.apache.spark.deploy.yarn.ApplicationMaster伴生对象的main方法

    关注点:

    • 如何启动driver线程
    • 如何申请container, 运行executorBackend
    def main(args: Array[String]): Unit = {
        SignalUtils.registerLogger(log)
        // 构建 ApplicationMasterArguments 对象, 对传来的参数做封装
        val amArgs: ApplicationMasterArguments = new ApplicationMasterArguments(args)
    
        SparkHadoopUtil.get.runAsSparkUser { () =>
            // 构建 ApplicationMaster 实例  ApplicationMaster 需要与 RM通讯
            master = new ApplicationMaster(amArgs, new YarnRMClient)
            // 运行 ApplicationMaster 的 run 方法, run 方法结束之后, 结束 ApplicationMaster 进程
            System.exit(master.run())
        }
    }

    ApplicationMaster 伴生类的 run方法

    final def run(): Int = {
        // 关键核心代码
        try {
    
            val fs = FileSystem.get(yarnConf)
    
            if (isClusterMode) {
                runDriver(securityMgr)
            } else {
                runExecutorLauncher(securityMgr)
            }
        } catch {
    
        }
        exitCode
    }

    runDriver 方法

    在该方法中运行了driver线程

    并且appMaster想RM注册了自己

    private def runDriver(securityMgr: SecurityManager): Unit = {
        addAmIpFilter()
        // 开始执行用户类. 启动一个子线程来执行用户类的 main 方法.  返回值就是运行用户类的子线程.
        // 线程名就叫 "Driver"
        userClassThread = startUserApplication()
    
        val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
        try {
            // 注册 ApplicationMaster , 其实就是请求资源
            registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.appUIAddress).getOrElse(""),
                        securityMgr)
            // 线程 join: 把userClassThread线程执行完毕之后再继续执行当前线程.
            userClassThread.join()
        } catch {
    
        }
    }

    startUserApplication 方法

    private def startUserApplication(): Thread = {
        // 得到用户类的 main 方法
        val mainMethod = userClassLoader.loadClass(args.userClass)
            .getMethod("main", classOf[Array[String]])
        // 创建及线程
        val userThread = new Thread {
            override def run() {
                try {
                    // 调用用户类的主函数
                    mainMethod.invoke(null, userArgs.toArray)
                } catch {
    
                } finally {
    
                }
            }
        }
        userThread.setContextClassLoader(userClassLoader)
        userThread.setName("Driver")
        userThread.start()
        userThread
    }

    registerAM 方法

    private def registerAM(
                              _sparkConf: SparkConf,
                              _rpcEnv: RpcEnv,
                              driverRef: RpcEndpointRef,
                              uiAddress: String,
                              securityMgr: SecurityManager) = {
    
        // 向 RM 注册, 得到 YarnAllocator
        // RPC的标准操作, 一旦启动一个进程, 该进程就会去主进程上注册自己
        allocator = client.register(driverUrl,
            driverRef,
            yarnConf,
            _sparkConf,
            uiAddress,
            historyAddress,
            securityMgr,
            localResources)
        // 请求分配资源
        allocator.allocateResources()
    }

    allocator.allocateResources() 方法

    /**
      请求资源,如果 Yarn 满足了我们的所有要求,我们就会得到一些容器(数量: maxExecutors)。
    
    通过在这些容器中启动 Executor 来处理 YARN 授予我们的任何容器。
    
    必须同步,因为在此方法中读取的变量会被其他方法更改。
      */
    def allocateResources(): Unit = synchronized {
    
        if (allocatedContainers.size > 0) {  // allocatedContainers就是一个List[Container]
    
            handleAllocatedContainers(allocatedContainers.asScala)
        }
    }

    handleAllocatedContainers方法

    /**
    处理 RM 授权给我们的容器
    */
    def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
        val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)
        runAllocatedContainers(containersToUse)
    }

    runAllocatedContainers 方法

    /**
      * Launches executors in the allocated containers.
      在已经分配的容器中启动 Executors
      */
    private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
        // 每个容器上启动一个 Executor
        for (container <- containersToUse) {
            if (numExecutorsRunning < targetNumExecutors) {
                if (launchContainers) {
                    launcherPool.execute(new Runnable {
                        override def run(): Unit = {
                            try {
                                new ExecutorRunnable(
                                    Some(container),
                                    conf,
                                    sparkConf,
                                    driverUrl,
                                    executorId,
                                    executorHostname,
                                    executorMemory,
                                    executorCores,
                                    appAttemptId.getApplicationId.toString,
                                    securityMgr,
                                    localResources
                                ).run()  // 启动 executor
                                updateInternalState()
                            } catch {
    
                            }
                        }
                    })
                } else {
    
                }
            } else {
    
            }
        }
    }

    ExecutorRunnable.run方法

    def run(): Unit = {
        logDebug("Starting Executor Container")
        // 创建 NodeManager 客户端
        nmClient = NMClient.createNMClient()
        // 初始化 NodeManager 客户端
        nmClient.init(conf)
        // 启动 NodeManager 客户端
        nmClient.start()
        // 启动容器
        startContainer()
    }

    ExecutorRunnable.startContainer()

    def startContainer(): java.util.Map[String, ByteBuffer] = {
        val ctx = Records.newRecord(classOf[ContainerLaunchContext])
            .asInstanceOf[ContainerLaunchContext]
        // 准备要执行的命令
        val commands = prepareCommand()
    
        ctx.setCommands(commands.asJava)
        // Send the start request to the ContainerManager
        try {
            // 启动容器
            nmClient.startContainer(container.get, ctx)
        } catch {
    
        }
    }

    ExecutorRunnable.prepareCommand 方法

    private def prepareCommand(): List[String] = {
    
        val commands = prefixEnv ++ Seq(
            YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java",
            "-server") ++
            javaOpts ++
            // 要执行的类
            Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend", 
                "--driver-url", masterAddress,
                "--executor-id", executorId,
                "--hostname", hostname,
                "--cores", executorCores.toString,
                "--app-id", appId) ++
            userClassPath ++
            Seq(
                s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
                s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")
    
        commands.map(s => if (s == null) "null" else s).toList
    }

    至此, ApplicationMaster进程启动完毕, 其中主要启动了:

    • driver线程
    • 申请了List[Container]
    • 发出命令启动Container以及在Container上运行CoarseGrainedExecutorBackend

    CoarseGrainedExecutorBackend进程

    org.apache.spark.executor.CoarseGrainedExecutorBackend的伴生对象

    main方法

    def main(args: Array[String]) {
    
      // 启动 CoarseGrainedExecutorBackend
      run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
      // 运行结束之后退出进程
      System.exit(0)
    }

    run 方法

    /**
        准备 RpcEnv
    */
    private def run(
                       driverUrl: String,
                       executorId: String,
                       hostname: String,
                       cores: Int,
                       appId: String,
                       workerUrl: Option[String],
                       userClassPath: Seq[URL]) {
    
        SparkHadoopUtil.get.runAsSparkUser { () =>     
            val env = SparkEnv.createExecutorEnv(
                driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)
    
            env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
                env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
        }
    }

    CoarseGrainedExecutorBackend的伴生对象

    继承自: ThreadSafeRpcEndpoint 是一个RpcEndpoint

    查看生命周期方法

    onStart 方法

    连接到 Driver, 并向 Driver注册Executor

    生命周期方法onStart

    override def onStart() {
        rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
            // This is a very fast action so we can use "ThreadUtils.sameThread"
            driver = Some(ref)
            // 向驱动注册 Executor 关键方法
            ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
        }(ThreadUtils.sameThread).onComplete {
            case Success(msg) =>
            case Failure(e) =>
                // 注册失败, 退出 executor
                exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
        }(ThreadUtils.sameThread)
    }

    Driver端的CoarseGrainedSchedulerBackend.DriverEndPoint 的 receiveAndReply 方法

    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
        // 接收注册 Executor
        case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
            if (executorDataMap.contains(executorId)) {  // 已经注册过了
    
            } else {
                // 给 Executor  发送注册成功的信息
                executorRef.send(RegisteredExecutor)
    
            }
    }

    Eexcutor端的CoarseGrainedExecutorBackend的receive方法

    override def receive: PartialFunction[Any, Unit] = {
        // 向 Driver 注册成功
        case RegisteredExecutor =>
            logInfo("Successfully registered with driver")
            try {
                // 创建 Executor 对象   注意: Executor 其实是一个对象
                executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
            } catch {
    
            }
    }

    查看生命周期方法

    onStart 方法

    连接到 Driver, 并向 Driver注册Executor

    参考

    yarn任务提交过程源码分析

  • 相关阅读:
    前端利用百度开发文档给的web服务接口实现对某个区域周边配套的检索
    libevent源码学习(13):事件主循环event_base_loop
    libevent源码学习(11):超时管理之min_heap
    libevent源码学习(10):min_heap数据结构解析
    libevent源码学习(8):event_signal_map解析
    libevent源码学习(9):事件event
    libevent源码学习(6):事件处理基础——event_base的创建
    libevent源码学习(5):TAILQ_QUEUE解析
    仿Neo4j里的知识图谱,利用d3+vue开发的一个网络拓扑图
    element表格内每一行删除提示el-popover的使用要点
  • 原文地址:https://www.cnblogs.com/bitbitbyte/p/12946181.html
Copyright © 2011-2022 走看看