zoukankan      html  css  js  c++  java
  • motan源码分析八:涉及到底层的客户端调用

    之前我们分析了客户端调用服务端的源码,但是没有涉及到通讯层和序列化层,本文将之前讲过的内容做一次串联。

    1.上层通过动态代理调用refer的call,每个refer又对应一个nettyclient,下面来看一下nettyclient的调用服务端操作

    	private Response request(Request request, boolean async) throws TransportException {
    		Channel channel = null;
    
    		Response response = null;
    
    		try {
    			// return channel or throw exception(timeout or connection_fail)
    			channel = borrowObject();//向连接池拿连接
    
    			if (channel == null) {
    				LoggerUtil.error("NettyClient borrowObject null: url=" + url.getUri() + " "
    						+ MotanFrameworkUtil.toString(request));
    				return null;
    			}
    
    			// async request
    			response = channel.request(request);//调用channel的request
    			// return channel to pool
    			returnObject(channel);//归还连接
    		} catch (Exception e) {
    			LoggerUtil.error(
    					"NettyClient request Error: url=" + url.getUri() + " " + MotanFrameworkUtil.toString(request), e);
    			//TODO 对特定的异常回收channel
    			invalidateObject(channel);//销毁坏的连接
    
    			if (e instanceof MotanAbstractException) {
    				throw (MotanAbstractException) e;
    			} else {
    				throw new MotanServiceException("NettyClient request Error: url=" + url.getUri() + " "
    						+ MotanFrameworkUtil.toString(request), e);
    			}
    		}
    
    		// aysnc or sync result
    		response = asyncResponse(response, async);//处理response
    
    		return response;
    	}
    

    2.nettychannel的request操作

    	public Response request(Request request) throws TransportException {
    	    int timeout = nettyClient.getUrl().getMethodParameter(request.getMethodName(), request.getParamtersDesc(),
    	            URLParamType.requestTimeout.getName(), URLParamType.requestTimeout.getIntValue());
    		if (timeout <= 0) {
                   throw new MotanFrameworkException("NettyClient init Error: timeout(" + timeout + ") <= 0 is forbid.",
                           MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
               }
    		NettyResponseFuture response = new NettyResponseFuture(request, timeout, this.nettyClient);//创建异步response对象
    		this.nettyClient.registerCallback(request.getRequestId(), response);//将此response存入到map,处理完后,会移出
    		ChannelFuture writeFuture = this.channel.write(request);//向服务端传递request对象,写之前会进行序列化的操作
    
    		boolean result = writeFuture.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS);//标识是否成功
    
    		if (result && writeFuture.isSuccess()) {
    			response.addListener(new FutureListener() {//增加response的监听器
    				@Override
    				public void operationComplete(Future future) throws Exception {
    					if (future.isSuccess() || (future.isDone() && ExceptionUtil.isBizException(future.getException()))) {
    						// 成功的调用 
    						nettyClient.resetErrorCount();//成功
    					} else {
    						// 失败的调用 
    						nettyClient.incrErrorCount();//对失败次数+1,如果同一个client连续失败达到所有的连接次数时,标识此client不可用,由心跳管理器负责恢复此client的可用状态
    					}
    				}
    			});
    			return response;//返回此response,此response为异步的response,由业务线程接手后续接收的过程
    		}
    
    		writeFuture.cancel();
    		response = this.nettyClient.removeCallback(request.getRequestId());//在map中移出此response
    
    		if (response != null) {
    			response.cancel();
    		}
    
    		// 失败的调用 
    		nettyClient.incrErrorCount();
    
    		if (writeFuture.getCause() != null) {
    			throw new MotanServiceException("NettyChannel send request to server Error: url="
    					+ nettyClient.getUrl().getUri() + " local=" + localAddress + " "
    					+ MotanFrameworkUtil.toString(request), writeFuture.getCause());
    		} else {
    			throw new MotanServiceException("NettyChannel send request to server Timeout: url="
    					+ nettyClient.getUrl().getUri() + " local=" + localAddress + " "
    					+ MotanFrameworkUtil.toString(request));
    		}
    	}
    

    3.异步的response NettyResponseFuture

    	public Object getValue() {
    		synchronized (lock) {
    			if (!isDoing()) {
    				return getValueOrThrowable();//返回成功值或失败
    			}
    
    			if (timeout <= 0) {
    				try {
    					lock.wait();//未接收完毕则一直等待
    				} catch (Exception e) {
    					cancel(new MotanServiceException("NettyResponseFuture getValue InterruptedException : "
    							+ MotanFrameworkUtil.toString(request) + " cost="
    							+ (System.currentTimeMillis() - createTime), e));
    				}
    
    				// don't need to notifylisteners, because onSuccess or
    				// onFailure or cancel method already call notifylisteners
    				return getValueOrThrowable();
    			} else {
    				long waitTime = timeout - (System.currentTimeMillis() - createTime);//等待的时间
    
    				if (waitTime > 0) {
    					for (;;) {
    						try {
    							lock.wait(waitTime);//要么被通知,要么超时
    						} catch (InterruptedException e) {
    						}
    
    						if (!isDoing()) {
    							break;
    						} else {
    							waitTime = timeout - (System.currentTimeMillis() - createTime);
    							if (waitTime <= 0) {
    								break;
    							}
    						}
    					}
    				}
    
    				if (isDoing()) {
    					timeoutSoCancel();
    				}
    			}
    			return getValueOrThrowable();
    		}
    	}
    

    本章知识点:

    1.motan通过NettyResponseFuture来实现在框架层面异步处理同一笔业务,提升了框架的性能;

    2.对于连续失败的client,进行下线操作。

      

  • 相关阅读:
    hadoop shell 命令
    java正则提取括号中的关键词
    java使用ac算法实现关键词高亮
    mysql事务级别和spring中应用
    elasticsearch java工具类
    【记录】研究生周练题目清单
    【记录】研究生已阅文献清单
    论文阅读(11)RoBERTa: A Robustly Optimized BERT Pretraining Approach(2019)
    论文阅读(10)Shallow Convolutional Neural Network for Implicit Discourse Relation Recognition
    论文阅读(9)Towards Cross-Domain PDTB-Style Discourse Parsing(2014)
  • 原文地址:https://www.cnblogs.com/mantu/p/5886544.html
Copyright © 2011-2022 走看看