zoukankan      html  css  js  c++  java
  • motan源码分析六:客户端与服务器的通信层分析

    本章将分析motan的序列化和底层通信相关部分的代码。

    1.在上一章中,有一个getrefers的操作,来获取所有服务器的引用,每个服务器的引用都是由DefaultRpcReferer来创建的

            public DefaultRpcReferer(Class<T> clz, URL url, URL serviceUrl) {
                super(clz, url, serviceUrl);
    
                endpointFactory =
                        ExtensionLoader.getExtensionLoader(EndpointFactory.class).getExtension(
                                url.getParameter(URLParamType.endpointFactory.getName(), URLParamType.endpointFactory.getValue()));//通过spi加载NettyEndpointFactory
    
                client = endpointFactory.createClient(url);//创建client
            }

    2.NettyClient的创建过程及源码分析

        public Client createClient(URL url) {
            LoggerUtil.info(this.getClass().getSimpleName() + " create client: url={}", url);
            return createClient(url, heartbeatClientEndpointManager);//创建client
        }
    
        private Client createClient(URL url, EndpointManager endpointManager) {
            Client client = innerCreateClient(url);//调用NettyEndpointFactory的创建client的方法
    
            endpointManager.addEndpoint(client);//添加心跳管理
    
            return client;
        }
    
        protected Client innerCreateClient(URL url) {
            return new NettyClient(url);//返回NettyClient对象
        }
    
        public NettyClient(URL url) {
            super(url);
    
            maxClientConnection = url.getIntParameter(URLParamType.maxClientConnection.getName(),
                    URLParamType.maxClientConnection.getIntValue());
    
            timeMonitorFuture = scheduledExecutor.scheduleWithFixedDelay(
                    new TimeoutMonitor("timeout_monitor_" + url.getHost() + "_" + url.getPort()),
                    MotanConstants.NETTY_TIMEOUT_TIMER_PERIOD, MotanConstants.NETTY_TIMEOUT_TIMER_PERIOD,
                    TimeUnit.MILLISECONDS);
            LoggerUtil.info("client's:"+url.getUri());
        }

    3.Netty相关的连接建立是通过open()方法进行的

    	public synchronized boolean open() {
    		if (isAvailable()) {
    			return true;
    		}
    
    		// 初始化netty client bootstrap
    		initClientBootstrap();
    
    		// 初始化连接池
    		initPool();
    
    		LoggerUtil.info("NettyClient finish Open: url={}", url);
    
    		// 注册统计回调
    		StatsUtil.registryStatisticCallback(this);
    
    		// 设置可用状态
    		state = ChannelState.ALIVE;
    		return state.isAliveState();
    	}
    	private void initClientBootstrap() {
    		bootstrap = new ClientBootstrap(channelFactory);
    		
    		bootstrap.setOption("keepAlive", true);
    		bootstrap.setOption("tcpNoDelay", true);
    
    		// 实际上,极端情况下,connectTimeout会达到500ms,因为netty nio的实现中,是依赖BossThread来控制超时,
    		// 如果为了严格意义的timeout,那么需要应用端进行控制。
    		int timeout = getUrl().getIntParameter(URLParamType.requestTimeout.getName(), URLParamType.requestTimeout.getIntValue());
            if (timeout <= 0) {
                throw new MotanFrameworkException("NettyClient init Error: timeout(" + timeout + ") <= 0 is forbid.",
                        MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
            }
    		bootstrap.setOption("connectTimeoutMillis", timeout);
    
    		// 最大响应包限制
    		final int maxContentLength = url.getIntParameter(URLParamType.maxContentLength.getName(),
    				URLParamType.maxContentLength.getIntValue());
    
    		bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
    			public ChannelPipeline getPipeline() {
    				ChannelPipeline pipeline = Channels.pipeline();
    				pipeline.addLast("decoder", new NettyDecoder(codec, NettyClient.this, maxContentLength));//解码器
    				pipeline.addLast("encoder", new NettyEncoder(codec, NettyClient.this));//编码器
    				pipeline.addLast("handler", new NettyChannelHandler(NettyClient.this, new MessageHandler() {//业务处理的handler
    					@Override
    					public Object handle(Channel channel, Object message) {
    						Response response = (Response) message;
    
    						NettyResponseFuture responseFuture = NettyClient.this.removeCallback(response.getRequestId());//移调异步处理response信息
    
    						if (responseFuture == null) {
    							LoggerUtil.warn(
    									"NettyClient has response from server, but resonseFuture not exist,  requestId={}",
    									response.getRequestId());
    							return null;
    						}
    
    						if (response.getException() != null) {
    							responseFuture.onFailure(response);
    						} else {
    							responseFuture.onSuccess(response);
    						}
    
    						return null;
    					}
    				}));
    				return pipeline;
    			}
    		});
    	}
    

    4.连接池

        protected void initPool() {
            poolConfig = new GenericObjectPool.Config();//使用了GenericObjectPool作为连接池
            poolConfig.minIdle =
                    url.getIntParameter(URLParamType.minClientConnection.getName(), URLParamType.minClientConnection.getIntValue());//最小连接数,配置中为2个
            poolConfig.maxIdle =
                    url.getIntParameter(URLParamType.maxClientConnection.getName(), URLParamType.maxClientConnection.getIntValue());//最大连接数,配置中为10个
            poolConfig.maxActive = poolConfig.maxIdle;
            poolConfig.maxWait = url.getIntParameter(URLParamType.requestTimeout.getName(), URLParamType.requestTimeout.getIntValue());
            poolConfig.lifo = url.getBooleanParameter(URLParamType.poolLifo.getName(), URLParamType.poolLifo.getBooleanValue());
            poolConfig.minEvictableIdleTimeMillis = defaultMinEvictableIdleTimeMillis;
            poolConfig.softMinEvictableIdleTimeMillis = defaultSoftMinEvictableIdleTimeMillis;
            poolConfig.timeBetweenEvictionRunsMillis = defaultTimeBetweenEvictionRunsMillis;
            factory = createChannelFactory();//创建chanalfactory
    
            pool = new GenericObjectPool(factory, poolConfig);
    
            boolean lazyInit = url.getBooleanParameter(URLParamType.lazyInit.getName(), URLParamType.lazyInit.getBooleanValue());
    
            if (!lazyInit) {
                for (int i = 0; i < poolConfig.minIdle; i++) {//初始化2个长连接
                    try {
                        pool.addObject();
                        LoggerUtil.info("init client's connection :"+i);
                    } catch (Exception e) {
                        LoggerUtil.error("NettyClient init pool create connect Error: url=" + url.getUri(), e);
                    }
                }
            }
        }
    

    5.NettyChannelFactory

    public class NettyChannelFactory extends BasePoolableObjectFactory {
    	private String factoryName = "";
    	private NettyClient nettyClient;
    
    	public NettyChannelFactory(NettyClient nettyClient) {
    		super();
    
    		this.nettyClient = nettyClient;
    		this.factoryName = "NettyChannelFactory_" + nettyClient.getUrl().getHost() + "_"
    				+ nettyClient.getUrl().getPort();
    	}
    
    
    	@Override
    	public Object makeObject() throws Exception {//创建连接时会调用
    		NettyChannel nettyChannel = new NettyChannel(nettyClient);//创建channel
    		nettyChannel.open();//打开channel
    
    		return nettyChannel;
    	}
    
    }
    

    6.NettyChannel

    public class NettyChannel implements com.weibo.api.motan.transport.Channel {
    	private volatile ChannelState state = ChannelState.UNINIT;
    
    	private NettyClient nettyClient;
    
    	private org.jboss.netty.channel.Channel channel = null;
    
    	private InetSocketAddress remoteAddress = null;
    	private InetSocketAddress localAddress = null;
    
    	public NettyChannel(NettyClient nettyClient) {
    		this.nettyClient = nettyClient;
    		this.remoteAddress = new InetSocketAddress(nettyClient.getUrl().getHost(), nettyClient.getUrl().getPort());//服务器host和port
    	}
    	public synchronized boolean open() {//打开连接
    		if (isAvailable()) {
    			LoggerUtil.warn("the channel already open, local: " + localAddress + " remote: " + remoteAddress + " url: "
    					+ nettyClient.getUrl().getUri());
    			return true;
    		}
    
    		try {
    			ChannelFuture channleFuture = nettyClient.getBootstrap().connect(
    					new InetSocketAddress(nettyClient.getUrl().getHost(), nettyClient.getUrl().getPort()));//打开连接
    
    			long start = System.currentTimeMillis();
    
    			int timeout = nettyClient.getUrl().getIntParameter(URLParamType.connectTimeout.getName(), URLParamType.connectTimeout.getIntValue());
    			if (timeout <= 0) {
    	            throw new MotanFrameworkException("NettyClient init Error: timeout(" + timeout + ") <= 0 is forbid.",
    	                    MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
    			}
    			// 不去依赖于connectTimeout
    			boolean result = channleFuture.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS);
                boolean success = channleFuture.isSuccess();
    
    			if (result && success) {
    				channel = channleFuture.getChannel();
    				if (channel.getLocalAddress() != null && channel.getLocalAddress() instanceof InetSocketAddress) {
    					localAddress = (InetSocketAddress) channel.getLocalAddress();
    				}
    
    				state = ChannelState.ALIVE;
    				return true;
    			}
                boolean connected = false;
                if(channleFuture.getChannel() != null){
                    connected = channleFuture.getChannel().isConnected();
                }
    
    			if (channleFuture.getCause() != null) {
    				channleFuture.cancel();
    				throw new MotanServiceException("NettyChannel failed to connect to server, url: "
    						+ nettyClient.getUrl().getUri()+ ", result: " + result + ", success: " + success + ", connected: " + connected, channleFuture.getCause());
    			} else {
    				channleFuture.cancel();
                    throw new MotanServiceException("NettyChannel connect to server timeout url: "
                            + nettyClient.getUrl().getUri() + ", cost: " + (System.currentTimeMillis() - start) + ", result: " + result + ", success: " + success + ", connected: " + connected);
                }
    		} catch (MotanServiceException e) {
    			throw e;
    		} catch (Exception e) {
    			throw new MotanServiceException("NettyChannel failed to connect to server, url: "
    					+ nettyClient.getUrl().getUri(), e);
    		} finally {
    			if (!state.isAliveState()) {
    				nettyClient.incrErrorCount();//增加错误次数
    			}
    		}
    	}
    }
    

    本章知识点总结:

    1.使用netty作为底层通讯框架;

    2.每个refer对应一个nettyclient和一个nettychannel,nettychannel是由工厂类创建;

    3.每个client在初始化时,最少创建2个长连接,由配置决定;

    4.使用了GenericObjectPool来作为连接池;

    5.当每个client的连续调用出错数达到阀值时,将自动设置此client为不可用;

    6.心跳操作由客户端发起,只针对不可用状态的client。

  • 相关阅读:
    apicloud图片上传
    APICloud上啦加载下拉刷新模块
    APICloud 获取缓存以及清除缓存(常用第三方方法)
    微信小程序跳转以及跳转的坑
    微信小程序,时间戳和日期格式互相转化
    微信小程序template使用
    APICloud开发小技巧(二)
    javax.persistence.TransactionRequiredException: Executing an update/delete query
    Spring的注解@Qualifier用法
    Spring @Service生成bean名称的规则
  • 原文地址:https://www.cnblogs.com/mantu/p/5886337.html
Copyright © 2011-2022 走看看