本章将分析motan的序列化和底层通信相关部分的代码。
1.在上一章中,有一个getrefers的操作,来获取所有服务器的引用,每个服务器的引用都是由DefaultRpcReferer来创建的
public DefaultRpcReferer(Class<T> clz, URL url, URL serviceUrl) { super(clz, url, serviceUrl); endpointFactory = ExtensionLoader.getExtensionLoader(EndpointFactory.class).getExtension( url.getParameter(URLParamType.endpointFactory.getName(), URLParamType.endpointFactory.getValue()));//通过spi加载NettyEndpointFactory client = endpointFactory.createClient(url);//创建client }
2.NettyClient的创建过程及源码分析
public Client createClient(URL url) { LoggerUtil.info(this.getClass().getSimpleName() + " create client: url={}", url); return createClient(url, heartbeatClientEndpointManager);//创建client } private Client createClient(URL url, EndpointManager endpointManager) { Client client = innerCreateClient(url);//调用NettyEndpointFactory的创建client的方法 endpointManager.addEndpoint(client);//添加心跳管理 return client; } protected Client innerCreateClient(URL url) { return new NettyClient(url);//返回NettyClient对象 } public NettyClient(URL url) { super(url); maxClientConnection = url.getIntParameter(URLParamType.maxClientConnection.getName(), URLParamType.maxClientConnection.getIntValue()); timeMonitorFuture = scheduledExecutor.scheduleWithFixedDelay( new TimeoutMonitor("timeout_monitor_" + url.getHost() + "_" + url.getPort()), MotanConstants.NETTY_TIMEOUT_TIMER_PERIOD, MotanConstants.NETTY_TIMEOUT_TIMER_PERIOD, TimeUnit.MILLISECONDS); LoggerUtil.info("client's:"+url.getUri()); }
3.Netty相关的连接建立是通过open()方法进行的
public synchronized boolean open() { if (isAvailable()) { return true; } // 初始化netty client bootstrap initClientBootstrap(); // 初始化连接池 initPool(); LoggerUtil.info("NettyClient finish Open: url={}", url); // 注册统计回调 StatsUtil.registryStatisticCallback(this); // 设置可用状态 state = ChannelState.ALIVE; return state.isAliveState(); } private void initClientBootstrap() { bootstrap = new ClientBootstrap(channelFactory); bootstrap.setOption("keepAlive", true); bootstrap.setOption("tcpNoDelay", true); // 实际上,极端情况下,connectTimeout会达到500ms,因为netty nio的实现中,是依赖BossThread来控制超时, // 如果为了严格意义的timeout,那么需要应用端进行控制。 int timeout = getUrl().getIntParameter(URLParamType.requestTimeout.getName(), URLParamType.requestTimeout.getIntValue()); if (timeout <= 0) { throw new MotanFrameworkException("NettyClient init Error: timeout(" + timeout + ") <= 0 is forbid.", MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR); } bootstrap.setOption("connectTimeoutMillis", timeout); // 最大响应包限制 final int maxContentLength = url.getIntParameter(URLParamType.maxContentLength.getName(), URLParamType.maxContentLength.getIntValue()); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder", new NettyDecoder(codec, NettyClient.this, maxContentLength));//解码器 pipeline.addLast("encoder", new NettyEncoder(codec, NettyClient.this));//编码器 pipeline.addLast("handler", new NettyChannelHandler(NettyClient.this, new MessageHandler() {//业务处理的handler @Override public Object handle(Channel channel, Object message) { Response response = (Response) message; NettyResponseFuture responseFuture = NettyClient.this.removeCallback(response.getRequestId());//移调异步处理response信息 if (responseFuture == null) { LoggerUtil.warn( "NettyClient has response from server, but resonseFuture not exist, requestId={}", response.getRequestId()); return null; } if (response.getException() != null) { responseFuture.onFailure(response); } else { 
responseFuture.onSuccess(response); } return null; } })); return pipeline; } }); }
4.连接池
protected void initPool() { poolConfig = new GenericObjectPool.Config();//使用了GenericObjectPool作为连接池 poolConfig.minIdle = url.getIntParameter(URLParamType.minClientConnection.getName(), URLParamType.minClientConnection.getIntValue());//最小连接数,配置中为2个 poolConfig.maxIdle = url.getIntParameter(URLParamType.maxClientConnection.getName(), URLParamType.maxClientConnection.getIntValue());//最大连接数,配置中为10个 poolConfig.maxActive = poolConfig.maxIdle; poolConfig.maxWait = url.getIntParameter(URLParamType.requestTimeout.getName(), URLParamType.requestTimeout.getIntValue()); poolConfig.lifo = url.getBooleanParameter(URLParamType.poolLifo.getName(), URLParamType.poolLifo.getBooleanValue()); poolConfig.minEvictableIdleTimeMillis = defaultMinEvictableIdleTimeMillis; poolConfig.softMinEvictableIdleTimeMillis = defaultSoftMinEvictableIdleTimeMillis; poolConfig.timeBetweenEvictionRunsMillis = defaultTimeBetweenEvictionRunsMillis; factory = createChannelFactory();//创建chanalfactory pool = new GenericObjectPool(factory, poolConfig); boolean lazyInit = url.getBooleanParameter(URLParamType.lazyInit.getName(), URLParamType.lazyInit.getBooleanValue()); if (!lazyInit) { for (int i = 0; i < poolConfig.minIdle; i++) {//初始化2个长连接 try { pool.addObject(); LoggerUtil.info("init client's connection :"+i); } catch (Exception e) { LoggerUtil.error("NettyClient init pool create connect Error: url=" + url.getUri(), e); } } } }
5.NettyChannelFactory
public class NettyChannelFactory extends BasePoolableObjectFactory { private String factoryName = ""; private NettyClient nettyClient; public NettyChannelFactory(NettyClient nettyClient) { super(); this.nettyClient = nettyClient; this.factoryName = "NettyChannelFactory_" + nettyClient.getUrl().getHost() + "_" + nettyClient.getUrl().getPort(); } @Override public Object makeObject() throws Exception {//创建连接时会调用 NettyChannel nettyChannel = new NettyChannel(nettyClient);//创建channel nettyChannel.open();//打开channel return nettyChannel; } }
6.NettyChannel
public class NettyChannel implements com.weibo.api.motan.transport.Channel { private volatile ChannelState state = ChannelState.UNINIT; private NettyClient nettyClient; private org.jboss.netty.channel.Channel channel = null; private InetSocketAddress remoteAddress = null; private InetSocketAddress localAddress = null; public NettyChannel(NettyClient nettyClient) { this.nettyClient = nettyClient; this.remoteAddress = new InetSocketAddress(nettyClient.getUrl().getHost(), nettyClient.getUrl().getPort());//服务器host和port } public synchronized boolean open() {//打开连接 if (isAvailable()) { LoggerUtil.warn("the channel already open, local: " + localAddress + " remote: " + remoteAddress + " url: " + nettyClient.getUrl().getUri()); return true; } try { ChannelFuture channleFuture = nettyClient.getBootstrap().connect( new InetSocketAddress(nettyClient.getUrl().getHost(), nettyClient.getUrl().getPort()));//打开连接 long start = System.currentTimeMillis(); int timeout = nettyClient.getUrl().getIntParameter(URLParamType.connectTimeout.getName(), URLParamType.connectTimeout.getIntValue()); if (timeout <= 0) { throw new MotanFrameworkException("NettyClient init Error: timeout(" + timeout + ") <= 0 is forbid.", MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR); } // 不去依赖于connectTimeout boolean result = channleFuture.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS); boolean success = channleFuture.isSuccess(); if (result && success) { channel = channleFuture.getChannel(); if (channel.getLocalAddress() != null && channel.getLocalAddress() instanceof InetSocketAddress) { localAddress = (InetSocketAddress) channel.getLocalAddress(); } state = ChannelState.ALIVE; return true; } boolean connected = false; if(channleFuture.getChannel() != null){ connected = channleFuture.getChannel().isConnected(); } if (channleFuture.getCause() != null) { channleFuture.cancel(); throw new MotanServiceException("NettyChannel failed to connect to server, url: " + nettyClient.getUrl().getUri()+ ", result: " + result 
+ ", success: " + success + ", connected: " + connected, channleFuture.getCause()); } else { channleFuture.cancel(); throw new MotanServiceException("NettyChannel connect to server timeout url: " + nettyClient.getUrl().getUri() + ", cost: " + (System.currentTimeMillis() - start) + ", result: " + result + ", success: " + success + ", connected: " + connected); } } catch (MotanServiceException e) { throw e; } catch (Exception e) { throw new MotanServiceException("NettyChannel failed to connect to server, url: " + nettyClient.getUrl().getUri(), e); } finally { if (!state.isAliveState()) { nettyClient.incrErrorCount();//增加错误次数 } } } }
本章知识点总结:
1.使用netty作为底层通讯框架;
2.每个referer对应一个NettyClient,NettyClient通过连接池管理多个NettyChannel,NettyChannel由工厂类NettyChannelFactory创建;
3.每个client在初始化时(未开启lazyInit的情况下),会按minClientConnection配置预先创建长连接,示例配置中为2个;
4.使用了GenericObjectPool来作为连接池;
5.当每个client的连续调用出错数达到阈值时,将自动设置此client为不可用;
6.心跳操作由客户端发起,只针对不可用状态的client。