zoukankan      html  css  js  c++  java
  • 创建RpcEnv

    感觉这篇文章不错

    2.1.2、创建RpcEnv

     -  RpcEndpoint

     -  RpcEndpointRef

        val systemName = if (isDriver) driverSystemName else executorSystemName
        val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
          securityManager, clientMode = !isDriver)
    View Code

     进入SparkEnv create() , 实际调用`new NettyRpcEnvFactory().create(config)`

      def create(
          name: String,
          bindAddress: String,
          advertiseAddress: String,
          port: Int,
          conf: SparkConf,
          securityManager: SecurityManager,
          clientMode: Boolean): RpcEnv = {
        val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
          clientMode)
        //
        new NettyRpcEnvFactory().create(config)
      }
    View Code

    看看NettyRpcEnvFactory.create中具体做了什么

    2.1.2.1、创建java序列化器

        val javaSerializerInstance =
          new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
    View Code

    2.1.2.2、创建一个NettyRpcEnv, 如果是个clientMode 就返回这个NettyRpcEnv

        val nettyEnv =
          new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
            config.securityManager)
    View Code

    2.1.2.3、非ClientMode, 调用`Utils.startServiceOnPort`, 传入startNettyRpcEnv, 是一个匿名函数,

        if (!config.clientMode) {
          val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
            nettyEnv.startServer(config.bindAddress, actualPort)
            (nettyEnv, nettyEnv.address.port)
          }
          try {
         Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1 } catch { case NonFatal(e) => nettyEnv.shutdown() throw e } }

     调用`Utils.startServiceOnPort`, 通过startService启动一个服务在指定host,port,  实际就是回调上面的startNetyRpcEnv

    所以我们返回看`nettyEnv.startServer(config.bindAddress, actualPort)`的功能

      /**
       * 创建一个TransportServer
       * @param bindAddress
       * @param port
       */
      def startServer(bindAddress: String, port: Int): Unit = {
        val bootstraps: java.util.List[TransportServerBootstrap] =
          if (securityManager.isAuthenticationEnabled()) {
            java.util.Arrays.asList(new SaslServerBootstrap(transportConf, securityManager))
          } else {
            java.util.Collections.emptyList()
          }
    //创建server, server = transportContext.createServer(bindAddress, port, bootstraps) //dispatcher注册一个RpcEndpoint dispatcher.registerRpcEndpoint( RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher)) }

     待续。。。

     

  • 相关阅读:
    卷积神经网络
    舍弃—Dropout
    池化—Pooling
    Python基础知识点——简单 函数
    同事将excel数据转化为pdf,提前下班了,而我还在苦逼地做表
    怎么才能隐藏的IP?打造超强IP池项目,让你自己都忘记原本的IP
    Python爬取抖音视频(没有水印的哟)
    Python可视化:matplotlib 制作雷达图进行对比分析
    用于GIS(地理信息系统)和三维可视化制图的Python库
    关于如何在文件中调用命令窗口执行代码(以python为例)
  • 原文地址:https://www.cnblogs.com/chengbao/p/10611285.html
Copyright © 2011-2022 走看看