zoukankan      html  css  js  c++  java
  • motan源码分析六:客户端与服务器的通信层分析

    本章将分析motan的序列化和底层通信相关部分的代码。

    1.在上一章中,有一个getrefers的操作,来获取所有服务器的引用,每个服务器的引用都是由DefaultRpcReferer来创建的

            public DefaultRpcReferer(Class<T> clz, URL url, URL serviceUrl) {
                super(clz, url, serviceUrl);
    
                endpointFactory =
                        ExtensionLoader.getExtensionLoader(EndpointFactory.class).getExtension(
                                url.getParameter(URLParamType.endpointFactory.getName(), URLParamType.endpointFactory.getValue()));//通过spi加载NettyEndpointFactory
    
                client = endpointFactory.createClient(url);//创建client
            }

    2.NettyClient的创建过程及源码分析

        /**
         * Logs the request and delegates to the two-argument overload, passing
         * {@code heartbeatClientEndpointManager} as the endpoint manager.
         *
         * @param url URL handed to the two-argument overload
         * @return whatever the two-argument overload returns
         */
        public Client createClient(URL url) {
            String factoryLabel = this.getClass().getSimpleName();
            LoggerUtil.info(factoryLabel + " create client: url={}", url);

            Client created = createClient(url, heartbeatClientEndpointManager);
            return created;
        }
    
        /**
         * Creates a client via {@code innerCreateClient} and adds it to the given endpoint manager.
         *
         * @param url             URL passed to {@code innerCreateClient}
         * @param endpointManager manager the new client is added to (the article describes
         *                        this as heartbeat management)
         * @return the newly created client
         */
        private Client createClient(URL url, EndpointManager endpointManager) {
            Client created = innerCreateClient(url);
            endpointManager.addEndpoint(created);
            return created;
        }
    
        /**
         * Creates the concrete client for this factory.
         *
         * @param url URL passed to the {@code NettyClient} constructor
         * @return a new {@code NettyClient}
         */
        protected Client innerCreateClient(URL url) {
            Client nettyBacked = new NettyClient(url);
            return nettyBacked;
        }
    
        public NettyClient(URL url) {
            super(url);
    
            maxClientConnection = url.getIntParameter(URLParamType.maxClientConnection.getName(),
                    URLParamType.maxClientConnection.getIntValue());
    
            timeMonitorFuture = scheduledExecutor.scheduleWithFixedDelay(
                    new TimeoutMonitor("timeout_monitor_" + url.getHost() + "_" + url.getPort()),
                    MotanConstants.NETTY_TIMEOUT_TIMER_PERIOD, MotanConstants.NETTY_TIMEOUT_TIMER_PERIOD,
                    TimeUnit.MILLISECONDS);
            LoggerUtil.info("client's:"+url.getUri());
        }

    3.Netty相关的连接建立是通过open()方法进行的

    	/**
    	 * Opens this client: sets up the Netty bootstrap and the connection pool, registers
    	 * the statistics callback and marks the client as alive.
    	 *
    	 * The method is synchronized and returns early when {@code isAvailable()} is already true.
    	 *
    	 * @return {@code true} if the client was already available; otherwise the result of
    	 *         {@code state.isAliveState()} after the state has been set to {@code ChannelState.ALIVE}
    	 */
    	public synchronized boolean open() {
    		// Already open: nothing to do.
    		if (isAvailable()) {
    			return true;
    		}
    
    		// Initialize the netty client bootstrap
    		initClientBootstrap();
    
    		// Initialize the connection pool
    		initPool();
    
    		LoggerUtil.info("NettyClient finish Open: url={}", url);
    
    		// Register the statistics callback
    		StatsUtil.registryStatisticCallback(this);
    
    		// Mark the client as available
    		state = ChannelState.ALIVE;
    		return state.isAliveState();
    	}
    	/**
    	 * Creates the Netty {@code ClientBootstrap}, applies socket options and installs the
    	 * channel pipeline (decoder, encoder and the response-dispatching handler).
    	 *
    	 * The bootstrap's {@code connectTimeoutMillis} option is taken from the URL's
    	 * connectTimeout parameter, the same parameter {@code NettyChannel.open()} waits on,
    	 * so both connect deadlines agree. It was previously read from requestTimeout.
    	 *
    	 * @throws MotanFrameworkException if the configured connect timeout is not positive
    	 */
    	private void initClientBootstrap() {
    		bootstrap = new ClientBootstrap(channelFactory);

    		bootstrap.setOption("keepAlive", true);
    		bootstrap.setOption("tcpNoDelay", true);

    		// In extreme cases the effective connect timeout can reach 500ms, because the netty
    		// NIO implementation relies on the boss thread to enforce it. A strict timeout has
    		// to be enforced by the application side.
    		int timeout = getUrl().getIntParameter(URLParamType.connectTimeout.getName(), URLParamType.connectTimeout.getIntValue());
    		if (timeout <= 0) {
    			throw new MotanFrameworkException("NettyClient init Error: timeout(" + timeout + ") <= 0 is forbid.",
    					MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
    		}
    		bootstrap.setOption("connectTimeoutMillis", timeout);

    		// Upper bound on the size of a response packet
    		final int maxContentLength = url.getIntParameter(URLParamType.maxContentLength.getName(),
    				URLParamType.maxContentLength.getIntValue());

    		bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
    			public ChannelPipeline getPipeline() {
    				ChannelPipeline pipeline = Channels.pipeline();
    				// Decoder
    				pipeline.addLast("decoder", new NettyDecoder(codec, NettyClient.this, maxContentLength));
    				// Encoder
    				pipeline.addLast("encoder", new NettyEncoder(codec, NettyClient.this));
    				// Business handler: matches each response to its pending future
    				pipeline.addLast("handler", new NettyChannelHandler(NettyClient.this, new MessageHandler() {
    					@Override
    					public Object handle(Channel channel, Object message) {
    						Response response = (Response) message;

    						// Remove the pending callback registered under this request id
    						NettyResponseFuture responseFuture = NettyClient.this.removeCallback(response.getRequestId());

    						// No future registered under this id: log and drop the response
    						if (responseFuture == null) {
    							LoggerUtil.warn(
    									"NettyClient has response from server, but resonseFuture not exist,  requestId={}",
    									response.getRequestId());
    							return null;
    						}

    						// Complete the future as failure or success depending on the carried exception
    						if (response.getException() != null) {
    							responseFuture.onFailure(response);
    						} else {
    							responseFuture.onSuccess(response);
    						}

    						return null;
    					}
    				}));
    				return pipeline;
    			}
    		});
    	}
    

    4.连接池

        /**
         * Configures and creates the connection pool (a commons-pool {@code GenericObjectPool})
         * and, unless lazy initialization is enabled, pre-creates {@code minIdle} connections.
         *
         * A failure while pre-creating a connection is logged and does not abort initialization.
         */
        protected void initPool() {
            poolConfig = new GenericObjectPool.Config();

            // Minimum connection count (the article notes the configured value is 2).
            int minConnections = url.getIntParameter(
                    URLParamType.minClientConnection.getName(), URLParamType.minClientConnection.getIntValue());
            // Maximum connection count (the article notes the configured value is 10).
            int maxConnections = url.getIntParameter(
                    URLParamType.maxClientConnection.getName(), URLParamType.maxClientConnection.getIntValue());

            poolConfig.minIdle = minConnections;
            poolConfig.maxIdle = maxConnections;
            // maxActive mirrors maxIdle.
            poolConfig.maxActive = maxConnections;
            // maxWait is taken from the request timeout parameter.
            poolConfig.maxWait = url.getIntParameter(
                    URLParamType.requestTimeout.getName(), URLParamType.requestTimeout.getIntValue());
            poolConfig.lifo = url.getBooleanParameter(
                    URLParamType.poolLifo.getName(), URLParamType.poolLifo.getBooleanValue());
            poolConfig.minEvictableIdleTimeMillis = defaultMinEvictableIdleTimeMillis;
            poolConfig.softMinEvictableIdleTimeMillis = defaultSoftMinEvictableIdleTimeMillis;
            poolConfig.timeBetweenEvictionRunsMillis = defaultTimeBetweenEvictionRunsMillis;

            // Factory that produces the pooled channels.
            factory = createChannelFactory();
            pool = new GenericObjectPool(factory, poolConfig);

            boolean lazyInit = url.getBooleanParameter(
                    URLParamType.lazyInit.getName(), URLParamType.lazyInit.getBooleanValue());
            if (lazyInit) {
                // Connections will be created later; nothing to pre-create.
                return;
            }

            // Eagerly create the minimum number of long-lived connections.
            for (int created = 0; created < minConnections; created++) {
                try {
                    pool.addObject();
                    LoggerUtil.info("init client's connection :"+created);
                } catch (Exception e) {
                    LoggerUtil.error("NettyClient init pool create connect Error: url=" + url.getUri(), e);
                }
            }
        }
    

    5.NettyChannelFactory

    /**
     * Pool object factory that produces opened {@code NettyChannel} instances for one
     * {@code NettyClient}.
     */
    public class NettyChannelFactory extends BasePoolableObjectFactory {
    	private String factoryName = "";
    	private NettyClient nettyClient;

    	/**
    	 * @param nettyClient client the created channels belong to; its URL's host and port
    	 *                    are used to build this factory's name
    	 */
    	public NettyChannelFactory(NettyClient nettyClient) {
    		super();

    		this.nettyClient = nettyClient;

    		// Name format: NettyChannelFactory_<host>_<port>
    		StringBuilder name = new StringBuilder("NettyChannelFactory_");
    		name.append(nettyClient.getUrl().getHost());
    		name.append("_");
    		name.append(nettyClient.getUrl().getPort());
    		this.factoryName = name.toString();
    	}

    	/**
    	 * Creates a new channel and opens it before returning it. Per the article, this is
    	 * invoked when a connection is created.
    	 *
    	 * @return the opened {@code NettyChannel}
    	 * @throws Exception declared by the overridden method; anything thrown while creating
    	 *                   or opening the channel propagates
    	 */
    	@Override
    	public Object makeObject() throws Exception {
    		NettyChannel freshChannel = new NettyChannel(nettyClient);
    		freshChannel.open();
    		return freshChannel;
    	}

    }
    

    6.NettyChannel

    /**
     * A single connection from a {@code NettyClient} to its server, wrapping a Netty channel.
     * Only the constructor and {@code open()} are shown in this excerpt.
     */
    public class NettyChannel implements com.weibo.api.motan.transport.Channel {
    	// Lifecycle state; starts as UNINIT and becomes ALIVE after a successful open().
    	private volatile ChannelState state = ChannelState.UNINIT;
    
    	// Owning client; supplies the URL, the bootstrap and the error counter.
    	private NettyClient nettyClient;
    
    	// Underlying Netty channel; null until open() succeeds.
    	private org.jboss.netty.channel.Channel channel = null;
    
    	private InetSocketAddress remoteAddress = null;
    	private InetSocketAddress localAddress = null;
    
    	/**
    	 * @param nettyClient owning client; its URL's host and port define the remote address
    	 */
    	public NettyChannel(NettyClient nettyClient) {
    		this.nettyClient = nettyClient;
    		// Server host and port
    		this.remoteAddress = new InetSocketAddress(nettyClient.getUrl().getHost(), nettyClient.getUrl().getPort());
    	}
    	/**
    	 * Opens the connection to the server and waits up to the configured connect timeout.
    	 *
    	 * If {@code isAvailable()} is already true, a warning is logged and {@code true} is
    	 * returned without reconnecting. On every exit path where the state is not alive,
    	 * the owning client's error count is incremented.
    	 *
    	 * @return {@code true} when the channel is (or already was) connected
    	 * @throws MotanServiceException if the connect attempt fails, times out, or any other
    	 *         exception is raised while connecting
    	 */
    	public synchronized boolean open() {
    		if (isAvailable()) {
    			LoggerUtil.warn("the channel already open, local: " + localAddress + " remote: " + remoteAddress + " url: "
    					+ nettyClient.getUrl().getUri());
    			return true;
    		}
    
    		try {
    			// Start the connect attempt via the client's bootstrap
    			ChannelFuture channleFuture = nettyClient.getBootstrap().connect(
    					new InetSocketAddress(nettyClient.getUrl().getHost(), nettyClient.getUrl().getPort()));
    
    			// Start time, used only for the elapsed-time figure in the timeout message
    			long start = System.currentTimeMillis();
    
    			int timeout = nettyClient.getUrl().getIntParameter(URLParamType.connectTimeout.getName(), URLParamType.connectTimeout.getIntValue());
    			// NOTE(review): this is thrown inside the try block; unless MotanFrameworkException
    			// is a MotanServiceException subtype it will be wrapped by the generic catch below — confirm.
    			if (timeout <= 0) {
    	            throw new MotanFrameworkException("NettyClient init Error: timeout(" + timeout + ") <= 0 is forbid.",
    	                    MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
    			}
    			// Wait explicitly here instead of relying on the bootstrap's connect timeout
    			boolean result = channleFuture.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS);
                boolean success = channleFuture.isSuccess();
    
    			// Completed within the timeout and succeeded: record the channel and go ALIVE
    			if (result && success) {
    				channel = channleFuture.getChannel();
    				if (channel.getLocalAddress() != null && channel.getLocalAddress() instanceof InetSocketAddress) {
    					localAddress = (InetSocketAddress) channel.getLocalAddress();
    				}
    
    				state = ChannelState.ALIVE;
    				return true;
    			}
    			// Failure path: capture the connected flag for the diagnostic message
                boolean connected = false;
                if(channleFuture.getChannel() != null){
                    connected = channleFuture.getChannel().isConnected();
                }
    
    			// A cause on the future means the connect failed; otherwise it is reported as a timeout.
    			// The future is cancelled in both cases.
    			if (channleFuture.getCause() != null) {
    				channleFuture.cancel();
    				throw new MotanServiceException("NettyChannel failed to connect to server, url: "
    						+ nettyClient.getUrl().getUri()+ ", result: " + result + ", success: " + success + ", connected: " + connected, channleFuture.getCause());
    			} else {
    				channleFuture.cancel();
                    throw new MotanServiceException("NettyChannel connect to server timeout url: "
                            + nettyClient.getUrl().getUri() + ", cost: " + (System.currentTimeMillis() - start) + ", result: " + result + ", success: " + success + ", connected: " + connected);
                }
    		} catch (MotanServiceException e) {
    			// Already the right exception type: rethrow unchanged
    			throw e;
    		} catch (Exception e) {
    			// Wrap anything else, keeping the original exception as the cause
    			throw new MotanServiceException("NettyChannel failed to connect to server, url: "
    					+ nettyClient.getUrl().getUri(), e);
    		} finally {
    			if (!state.isAliveState()) {
    				// Increment the client's error count
    				nettyClient.incrErrorCount();
    			}
    		}
    	}
    }
    

    本章知识点总结:

    1.使用netty作为底层通讯框架;

    2.每个referer对应一个NettyClient,每个NettyClient通过连接池管理多个NettyChannel,NettyChannel由工厂类NettyChannelFactory创建;

    3.每个client在初始化时,最少创建2个长连接,由配置决定;

    4.使用了GenericObjectPool来作为连接池;

    5.当每个client的连续调用出错数达到阈值时,将自动设置此client为不可用;

    6.心跳操作由客户端发起,只针对不可用状态的client。

  • 相关阅读:
    42. Trapping Rain Water
    223. Rectangle Area
    645. Set Mismatch
    541. Reverse String II
    675. Cut Off Trees for Golf Event
    安装 VsCode 插件安装以及配置
    向上取整 向下取整 四舍五入 产生100以内随机数
    JS 判断是否为数字 数字型特殊值
    移动端初始配置,兼容不同浏览器的渲染内核
    Flex移动布局中单行和双行布局的区别以及使用
  • 原文地址:https://www.cnblogs.com/mantu/p/5886337.html
Copyright © 2011-2022 走看看