zoukankan      html  css  js  c++  java
  • 创建RpcEnv

    感觉这篇文章不错

    2.1.2、创建RpcEnv

     -  RpcEndpoint

     -  RpcEndpointRef

        val systemName = if (isDriver) driverSystemName else executorSystemName
        val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
          securityManager, clientMode = !isDriver)
    View Code

     进入SparkEnv create() , 实际调用`new NettyRpcEnvFactory().create(config)`

      def create(
          name: String,
          bindAddress: String,
          advertiseAddress: String,
          port: Int,
          conf: SparkConf,
          securityManager: SecurityManager,
          clientMode: Boolean): RpcEnv = {
        val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
          clientMode)
        //
        new NettyRpcEnvFactory().create(config)
      }
    View Code

    看看NettyRpcEnvFactory.create中具体做了什么

    2.1.2.1、创建java序列化器

        val javaSerializerInstance =
          new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
    View Code

    2.1.2.2、创建一个NettyRpcEnv, 如果是个clientMode 就返回这个NettyRpcEnv

        val nettyEnv =
          new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
            config.securityManager)
    View Code

    2.1.2.3、非ClientMode, 调用`Utils.startServiceOnPort`, 传入startNettyRpcEnv, 是一个匿名函数,

        if (!config.clientMode) {
          val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
            nettyEnv.startServer(config.bindAddress, actualPort)
            (nettyEnv, nettyEnv.address.port)
          }
          try {
         Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1 } catch { case NonFatal(e) => nettyEnv.shutdown() throw e } }

     调用`Utils.startServiceOnPort`, 通过startService启动一个服务在指定host,port,  实际就是回调上面的startNetyRpcEnv

    所以我们返回看`nettyEnv.startServer(config.bindAddress, actualPort)`的功能

      /**
       * 创建一个TransportServer
       * @param bindAddress
       * @param port
       */
      def startServer(bindAddress: String, port: Int): Unit = {
        val bootstraps: java.util.List[TransportServerBootstrap] =
          if (securityManager.isAuthenticationEnabled()) {
            java.util.Arrays.asList(new SaslServerBootstrap(transportConf, securityManager))
          } else {
            java.util.Collections.emptyList()
          }
    //创建server, server = transportContext.createServer(bindAddress, port, bootstraps) //dispatcher注册一个RpcEndpoint dispatcher.registerRpcEndpoint( RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher)) }

     待续。。。

     

  • 相关阅读:
    BUAA_OO_2020_Unit3_Overview
    BUAA_OS_2020_Lab2_Code_Review
    BUAA_OO_2020_Unit2_Overview
    BUAA_OS_2020_Lab1_Code_Review
    BUAA_OO_2020_Unit1_Overview
    实验十 团队作业6:团队项目用户验收&Beta冲刺
    【Beta】Scrum meeting 4
    【Beta】Scrum meeting 3
    【Beta】Scrum meeting 2
    【Beta】Scrum meeting 1
  • 原文地址:https://www.cnblogs.com/chengbao/p/10611285.html
Copyright © 2011-2022 走看看