2.1.2、Creating the RpcEnv
- RpcEndpoint
- RpcEndpointRef

val systemName = if (isDriver) driverSystemName else executorSystemName
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
  securityManager, clientMode = !isDriver)
From `SparkEnv.create()` this call enters `RpcEnv.create()`, which in turn calls `new NettyRpcEnvFactory().create(config)`:

def create(
    name: String,
    bindAddress: String,
    advertiseAddress: String,
    port: Int,
    conf: SparkConf,
    securityManager: SecurityManager,
    clientMode: Boolean): RpcEnv = {
  val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
    clientMode)
  // Delegate to the Netty-based factory
  new NettyRpcEnvFactory().create(config)
}
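The delegation pattern here (pack the arguments into one config object, then hand it to a factory) can be sketched in isolation. This is only an illustrative sketch: `SketchConfig`, `SketchEnv`, `SketchEnvFactory` and `NettySketchFactory` are hypothetical stand-ins, not Spark classes, and the config carries fewer fields than the real `RpcEnvConfig`.

```scala
// Hypothetical, simplified stand-ins for RpcEnvConfig, RpcEnv and RpcEnvFactory.
object RpcEnvFactorySketch {
  final case class SketchConfig(name: String, bindAddress: String, port: Int, clientMode: Boolean)

  trait SketchEnv { def describe: String }

  trait SketchEnvFactory { def create(config: SketchConfig): SketchEnv }

  // A single concrete factory, mirroring how the entry point always picks the Netty one.
  final class NettySketchFactory extends SketchEnvFactory {
    override def create(config: SketchConfig): SketchEnv = new SketchEnv {
      private val mode = if (config.clientMode) "client" else "server"
      override def describe: String =
        s"${config.name}@${config.bindAddress}:${config.port} ($mode)"
    }
  }

  // Entry point: bundle the loose arguments into a config, then delegate to the factory.
  def create(name: String, bindAddress: String, port: Int, clientMode: Boolean): SketchEnv =
    new NettySketchFactory().create(SketchConfig(name, bindAddress, port, clientMode))
}
```

Bundling the parameters into one case class keeps the factory interface stable even if more settings are added later.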
Let's look at what `NettyRpcEnvFactory.create` actually does.
2.1.2.1、Create the Java serializer

val javaSerializerInstance =
new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
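To get a feel for what a Java serializer instance does with RPC messages, here is a minimal round-trip sketch using only the JDK's `ObjectOutputStream` and `ObjectInputStream`. It assumes plain Java serialization with no Spark involved; `Ping`, `serialize` and `deserialize` are hypothetical names made up for this example, not Spark's API.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object JavaSerializationSketch {
  // Hypothetical message type; Scala case classes are Serializable by default.
  final case class Ping(id: Int, payload: String)

  // Turn an object into bytes with standard Java serialization.
  def serialize(obj: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.toByteArray
  }

  // Rebuild the object from bytes; the caller states the expected type.
  def deserialize[T](bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

A message sent over the wire goes through `serialize` on the sender and `deserialize` on the receiver, so both sides need the message class on their classpath.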
2.1.2.2、Create a NettyRpcEnv; in clientMode this NettyRpcEnv is returned directly

val nettyEnv = new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress, config.securityManager)
2.1.2.3、If not in clientMode, call `Utils.startServiceOnPort`, passing in startNettyRpcEnv, which is an anonymous function

if (!config.clientMode) {
  val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
    nettyEnv.startServer(config.bindAddress, actualPort)
    (nettyEnv, nettyEnv.address.port)
  }
  try {
    Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
  } catch {
    case NonFatal(e) =>
      nettyEnv.shutdown()
      throw e
  }
}
`Utils.startServiceOnPort` uses its startService argument to start a service on the given host and port; in effect, it simply calls back the startNettyRpcEnv function defined above.
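The callback-with-retry idea can be sketched as follows. This is a simplified illustration, not Spark's implementation: `startOnPort` and its `maxRetries` parameter are hypothetical names, and the sketch assumes that a failed bind surfaces as a `java.net.BindException` and that the next candidate is simply the next port number.

```scala
import java.net.BindException

object PortRetrySketch {
  // Calls startService(port) for startPort, startPort + 1, ... until one call succeeds
  // or maxRetries extra attempts are used up. Port 0 means "let the OS pick a port",
  // so it is never incremented.
  def startOnPort[T](startPort: Int, maxRetries: Int)(startService: Int => (T, Int)): (T, Int) = {
    @annotation.tailrec
    def attempt(offset: Int): (T, Int) = {
      val tryPort = if (startPort == 0) 0 else startPort + offset
      val result =
        try Some(startService(tryPort))
        catch {
          // Swallow the failure only while retries remain; otherwise it propagates.
          case _: BindException if offset < maxRetries => None
        }
      result match {
        case Some(started) => started
        case None => attempt(offset + 1)
      }
    }
    attempt(0)
  }
}
```

The callback returns both the started service and the port it actually bound to, which is the same shape as the `Int => (NettyRpcEnv, Int)` function above.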
So let's go back and look at what `nettyEnv.startServer(config.bindAddress, actualPort)` does:
/**
 * Create a TransportServer
 * @param bindAddress address to bind to
 * @param port port to listen on
 */
def startServer(bindAddress: String, port: Int): Unit = {
  val bootstraps: java.util.List[TransportServerBootstrap] =
    if (securityManager.isAuthenticationEnabled()) {
      java.util.Arrays.asList(new SaslServerBootstrap(transportConf, securityManager))
    } else {
      java.util.Collections.emptyList()
    }
  // Create the server
  server = transportContext.createServer(bindAddress, port, bootstraps)
  // Register an RpcEndpoint with the dispatcher
  dispatcher.registerRpcEndpoint(
    RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
}
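The last step, registering an endpoint with the dispatcher under a name, can be pictured as a name-to-endpoint registry. The sketch below is a guess at the general shape only: `Endpoint`, `Registry`, `Verifier` and `CheckExistence` are hypothetical types written for this example, and it assumes the verifier endpoint's job is to answer "does an endpoint with this name exist?".

```scala
import java.util.concurrent.ConcurrentHashMap

object DispatcherSketch {
  // Hypothetical minimal endpoint: takes a message, returns a reply.
  trait Endpoint { def receive(message: Any): Any }

  // Hypothetical registry that maps endpoint names to endpoints.
  final class Registry {
    private val endpoints = new ConcurrentHashMap[String, Endpoint]()

    // Register an endpoint under a unique name; duplicates are rejected.
    def register(name: String, endpoint: Endpoint): Unit = {
      if (endpoints.putIfAbsent(name, endpoint) != null) {
        throw new IllegalArgumentException(s"There is already an endpoint called $name")
      }
    }

    // True if an endpoint is registered under this name.
    def verify(name: String): Boolean = endpoints.containsKey(name)
  }

  // Hypothetical query message handled by the verifier endpoint.
  final case class CheckExistence(name: String)

  // An endpoint that answers existence queries by consulting the registry.
  final class Verifier(registry: Registry) extends Endpoint {
    override def receive(message: Any): Any = message match {
      case CheckExistence(name) => registry.verify(name)
      case other => throw new IllegalArgumentException(s"Unexpected message: $other")
    }
  }
}
```

Registering the verifier first means a remote side can ask whether a named endpoint exists before trying to talk to it.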
To be continued...