Spark Startup Flow (Standalone): Master Source Code

    Master source code

    package org.apache.spark.deploy.master

    // Companion class
    private[deploy] class Master(
        override val rpcEnv: RpcEnv,
        address: RpcAddress,
        webUiPort: Int,
        val securityMgr: SecurityManager,
        val conf: SparkConf)
      extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {
      ...
    }

    // Companion object
    private[deploy] object Master extends Logging {
      val SYSTEM_NAME = "sparkMaster"
      val ENDPOINT_NAME = "Master"

      // Entry point for starting the Master
      def main(argStrings: Array[String]) {
        Utils.initDaemon(log)
        val conf = new SparkConf
        // Build the instance used for argument parsing, e.g.
        // --host hadoop201 --port 7077 --webui-port 8080
        val args = new MasterArguments(argStrings, conf)
        // Start the RPC environment and the Master endpoint
        // << 1 >>
        val (rpcEnv, _, _): (RpcEnv, Int, Option[Int]) =
          startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
        rpcEnv.awaitTermination()
      }
      ...
    }

    << 1 >> Starting the Master returns a three-tuple

    /**
     * Start the Master and return a three tuple of:
     * (1) The Master RpcEnv
     * (2) The web UI bound port
     * (3) The REST server bound port, if any
     */
    def startRpcEnvAndEndpoint(
        host: String,
        port: Int,
        webUiPort: Int,
        conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
      val securityMgr = new SecurityManager(conf)
      // Create the Master-side RpcEnv and start it.
      // Arguments: sparkMaster hadoop201 7077 conf securityMgr
      // The actual runtime type of the returned value is NettyRpcEnv
      // << 1.1 >>
      val rpcEnv: RpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
      // Create the Master object, which is an RpcEndpoint, and register it with
      // the RpcEnv; the returned reference is used to send and receive messages
      // << 1.2 >>
      val masterEndpoint: RpcEndpointRef = rpcEnv.setupEndpoint(ENDPOINT_NAME,
        new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
      // Ask the Master endpoint for a BoundPortsResponse, a case class with
      // three fields: rpcEndpointPort, webUIPort, restPort
      val portsResponse: BoundPortsResponse =
        masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
      (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
    }
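
    For reference, the two messages exchanged here have roughly this shape (a sketch matching the comment above; in Spark they are defined in MasterMessages.scala):

    case object BoundPortsRequest
    case class BoundPortsResponse(rpcEndpointPort: Int, webUIPort: Int, restPort: Option[Int])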

    << 1.1 >> Creating the RpcEnv

    def create(
        name: String,
        bindAddress: String,
        advertiseAddress: String,
        port: Int,
        conf: SparkConf,
        securityManager: SecurityManager,
        clientMode: Boolean): RpcEnv = {
      // Bundle the RpcEnv configuration
      val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
        clientMode)
      // Create the NettyRpcEnv
      // << 1.1.1 >>
      new NettyRpcEnvFactory().create(config)
    }
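
    Note that the Master calls RpcEnv.create with five arguments, while the method above takes seven. A shorter overload bridges the gap, along these lines (a sketch consistent with the call site; it uses the single host as both bind address and advertise address, with clientMode defaulting to false):

    def create(
        name: String,
        host: String,
        port: Int,
        conf: SparkConf,
        securityManager: SecurityManager,
        clientMode: Boolean = false): RpcEnv = {
      // The single host serves as both the bind address and the advertised address
      create(name, host, host, port, conf, securityManager, clientMode)
    }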

    The actual creation happens in NettyRpcEnvFactory's create method.

    When the NettyRpcEnv is created, it also builds the message dispatcher, the inboxes that queue messages for local endpoints, and a map from remote addresses to outboxes for outgoing messages; these are sketched below.

    (The create method above lives in RpcEnv.scala.)
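
    To make those roles concrete, here is a heavily simplified sketch of the three pieces. The names follow the real org.apache.spark.rpc.netty classes (Dispatcher, Inbox, Outbox), but the bodies below are illustrative stand-ins, not Spark's implementation:

    import java.util.concurrent.ConcurrentHashMap
    import scala.collection.mutable

    final case class RpcAddress(host: String, port: Int)

    // Inbox: queues messages delivered to one local endpoint; the dispatcher's
    // thread pool later drains it and invokes the endpoint's receive method.
    final class Inbox {
      private val messages = mutable.Queue[Any]()
      def post(msg: Any): Unit = synchronized { messages.enqueue(msg) }
      def drain(handler: Any => Unit): Unit = synchronized {
        while (messages.nonEmpty) handler(messages.dequeue())
      }
    }

    // Dispatcher: maps endpoint names to inboxes and routes each incoming
    // message to the inbox of its target endpoint.
    final class Dispatcher {
      private val inboxes = new ConcurrentHashMap[String, Inbox]()
      def registerEndpoint(name: String): Inbox = {
        val inbox = new Inbox
        inboxes.put(name, inbox)
        inbox
      }
      def postMessage(endpointName: String, msg: Any): Unit =
        Option(inboxes.get(endpointName)).foreach(_.post(msg))
    }

    // Outboxes: one per remote address; outgoing messages wait here until the
    // underlying Netty connection to that address is ready.
    final class Outbox(val remote: RpcAddress) {
      private val pending = mutable.Queue[Any]()
      def send(msg: Any): Unit = synchronized { pending.enqueue(msg) }
    }

    final class Outboxes {
      private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()
      def outboxFor(addr: RpcAddress): Outbox = {
        val created = new Outbox(addr)
        val existing = outboxes.putIfAbsent(addr, created)
        if (existing == null) created else existing
      }
    }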

    << 1.1.1 >> NettyRpcEnvFactory (NettyRpcEnv.scala)

    private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {
      /*
       * Create the NettyRpcEnv and start it as a daemon
       */
      def create(config: RpcEnvConfig): RpcEnv = {
        val sparkConf: SparkConf = config.conf
        // Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support
        // KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance
        // Used to serialize objects sent over RPC
        val javaSerializerInstance: JavaSerializerInstance = new JavaSerializer(sparkConf)
          .newInstance()
          .asInstanceOf[JavaSerializerInstance]
        // Instantiate the NettyRpcEnv
        val nettyEnv = new NettyRpcEnv(
          sparkConf,
          javaSerializerInstance,
          config.advertiseAddress,
          config.securityManager)
        if (!config.clientMode) {
          // Define the function that starts the NettyRpcEnv on a given port
          val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
            nettyEnv.startServer(config.bindAddress, actualPort)
            (nettyEnv, nettyEnv.address.port)
          }
          try {
            // Start the NettyRpcEnv
            Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
          } catch {
            case NonFatal(e) =>
              nettyEnv.shutdown()
              throw e
          }
        }
        nettyEnv
      }
    }
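
    Utils.startServiceOnPort is Spark's generic retry helper: it attempts to start the service on the requested port and, if binding fails, retries on subsequent ports. A simplified sketch of the idea (the real helper reads its retry count from spark.port.maxRetries; the 16 below is an assumed stand-in):

    import scala.util.control.NonFatal

    object PortRetrySketch {
      def startServiceOnPortSketch[T](
          startPort: Int,
          startService: Int => (T, Int),   // e.g. startNettyRpcEnv above
          serviceName: String,
          maxRetries: Int = 16): (T, Int) = {
        for (offset <- 0 to maxRetries) {
          // Port 0 asks the OS for an ephemeral port, so it is never incremented
          val tryPort = if (startPort == 0) startPort else startPort + offset
          try {
            return startService(tryPort)
          } catch {
            case NonFatal(e) if offset < maxRetries =>
              println(s"$serviceName could not bind on port $tryPort, retrying")
          }
        }
        throw new java.net.BindException(s"$serviceName failed after $maxRetries retries")
      }
    }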

    << 1.2 >> The Master companion class (starting the Master-side RpcEndpoint)

    Master is an RpcEndpoint.

    Its lifecycle methods run in this order: constructor -> onStart -> receive* -> onStop
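
    A minimal endpoint that makes this lifecycle visible (a sketch against the RpcEndpoint contract; the org.apache.spark.rpc package is private[spark], so code like this can only live inside Spark itself):

    import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}

    class EchoEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
      println("constructor")                              // 1. instance created

      override def onStart(): Unit = println("onStart")   // 2. after setupEndpoint registers it

      // 3. receive*: one invocation per message sent with `send` (fire-and-forget)
      override def receive: PartialFunction[Any, Unit] = {
        case msg: String => println(s"received: $msg")
      }

      // ...and one per message sent with `ask`, answered via context.reply
      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
        case msg: String => context.reply(s"echo: $msg")
      }

      override def onStop(): Unit = println("onStop")     // 4. when the endpoint is stopped
    }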

    Key code fragments from onStart:

    // Create the web UI server
    webUi = new MasterWebUI(this, webUiPort)

    // Schedule a task at a fixed rate to check whether any Worker has timed out.
    // It simply sends the Master itself a CheckForWorkerTimeOut message;
    // by default the check runs once per minute.
    checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        // CheckForWorkerTimeOut is handled in the receive method
        // << 1.2.1 >>
        self.send(CheckForWorkerTimeOut)
      }
    }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)

    private val WORKER_TIMEOUT_MS = conf.getLong("spark.worker.timeout", 60) * 1000
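
    On the receiving side, Master.receive pattern-matches this message and runs the check shown in << 1.2.1 >> (abridged; the other cases are elided):

    override def receive: PartialFunction[Any, Unit] = {
      // ... other message types elided ...
      case CheckForWorkerTimeOut =>
        timeOutDeadWorkers()
    }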

    << 1.2.1 >> Check for and remove timed-out workers

    /** Check for, and remove, any timed-out workers */
    private def timeOutDeadWorkers() {
      // Copy the workers into an array so we don't modify the hashset while iterating through it
      val currentTime = System.currentTimeMillis()
      // Filter out the workers to remove: a worker whose last heartbeat is
      // older than (current time - timeout) has timed out
      val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArray
      for (worker <- toRemove) {
        // If the worker's state is not yet DEAD, remove it
        if (worker.state != WorkerState.DEAD) {
          logWarning("Removing %s because we got no heartbeat in %d seconds".format(
            worker.id, WORKER_TIMEOUT_MS / 1000))
          removeWorker(worker)
        } else {
          if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS)) {
            workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
          }
        }
      }
    }
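
    To make the two thresholds concrete, here is the arithmetic with the default spark.worker.timeout of 60 seconds (REAPER_ITERATIONS is defined elsewhere in Master; the 15 below is its assumed default, used only for illustration):

    object TimeoutThresholds extends App {
      val WORKER_TIMEOUT_MS = 60L * 1000   // default spark.worker.timeout
      val REAPER_ITERATIONS = 15           // assumed default, for illustration only

      // A live worker is removed after one missed heartbeat window:
      println(s"removed after ${WORKER_TIMEOUT_MS / 1000} s without a heartbeat")

      // A worker already marked DEAD stays visible (e.g. in the UI) and is only
      // culled from `workers` after (REAPER_ITERATIONS + 1) windows:
      val cullAfterMs = (REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS
      println(s"DEAD worker culled after ${cullAfterMs / 1000} s = ${cullAfterMs / 60000} min")
    }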
Original post: https://www.cnblogs.com/hyunbar/p/12079466.html