上文已经初步探讨了如何实现一个具体的transport,本文就来讨论一个具体的transport,本文讨论netty4的的相关实现。老规矩,看看motan-transport的目录结构。
其中最重要的类是啥,大声说出来,对,就是Netty4Client和Netty4Server。
图1-1motan-transport-netty4的源码结构
Netty4Client
首先看看NettyClinet的代码,最重要的就是open和request方法,可以看到open主要完成初始化工作。request主要请求和响应功能,heart表示心跳请求。
public class Netty4Client extends AbstractPoolClient implements StatisticCallback { // 回收过期任务 private static ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(4); // 异步的request,需要注册callback future // 触发remove的操作有: 1) service的返回结果处理。 2) timeout thread cancel protected ConcurrentMap<Long, NettyResponseFuture> callbackMap = new ConcurrentHashMap<Long, NettyResponseFuture>(); private ScheduledFuture<?> timeMonitorFuture = null; // 连续失败次数 private AtomicLong errorCount = new AtomicLong(0); // 最大连接数 private int maxClientConnection = 0; private Bootstrap bootstrap; public Netty4Client(URL url) { super(url); maxClientConnection = url.getIntParameter(URLParamType.maxClientConnection.getName(), URLParamType.maxClientConnection.getIntValue()); timeMonitorFuture = scheduledExecutor.scheduleWithFixedDelay( new TimeoutMonitor("timeout_monitor_" + url.getHost() + "_" + url.getPort()), MotanConstants.NETTY_TIMEOUT_TIMER_PERIOD, MotanConstants.NETTY_TIMEOUT_TIMER_PERIOD, TimeUnit.MILLISECONDS); } @Override public Response request(Request request) throws TransportException { if (!isAvailable()) { throw new MotanServiceException("NettyChannel is unavaliable: url=" + url.getUri() + MotanFrameworkUtil.toString(request)); } boolean async = url.getMethodParameter(request.getMethodName(), request.getParamtersDesc() , URLParamType.async.getName(), URLParamType.async.getBooleanValue()); return request(request, async); } @Override public void heartbeat(Request request) { // 如果节点还没有初始化或者节点已经被close掉了,那么heartbeat也不需要进行了 if (state.isUnInitState() || state.isCloseState()) { LoggerUtil.warn("NettyClient heartbeat Error: state={} url={}", state.name(), url.getUri()); return; } LoggerUtil.info("NettyClient heartbeat request: url={}", url.getUri()); try { // async request后,如果service is // available,那么将会自动把该client设置成可用 request(request, true); } catch (Exception e) { LoggerUtil.error("NettyClient heartbeat Error: url=" + url.getUri(), e); } } /** * 请求remote service * <p> * <pre> * 1) get connection from pool * 2) async requset * 3) return connection to pool * 4) check if async return response, true: return ResponseFuture; false: return result * </pre> * * @param request * @param async * @return * @throws TransportException */ private Response request(Request request, boolean async) throws TransportException { Channel channel = null; Response response = null; try { // return channel or throw exception(timeout or connection_fail) channel = borrowObject(); if (channel == null) { LoggerUtil.error("NettyClient borrowObject null: url=" + url.getUri() + " " + MotanFrameworkUtil.toString(request)); return null; } // async request response = channel.request(request); // return channel to pool returnObject(channel); } catch (Exception e) { LoggerUtil.error( "NettyClient request Error: url=" + url.getUri() + " " + MotanFrameworkUtil.toString(request), e); //TODO 对特定的异常回收channel invalidateObject(channel); if (e instanceof MotanAbstractException) { throw (MotanAbstractException) e; } else { throw new MotanServiceException("NettyClient request Error: url=" + url.getUri() + " " + MotanFrameworkUtil.toString(request), e); } } // aysnc or sync result response = asyncResponse(response, async); return response; } /** * 如果async是false,那么同步获取response的数据 * * @param response * @param async * @return */ private Response asyncResponse(Response response, boolean async) { if (async || !(response instanceof NettyResponseFuture)) { return response; } return new DefaultResponse(response); } @Override public synchronized boolean open() { if (isAvailable()) { LoggerUtil.warn("NettyServer ServerChannel already Opened: url=" + url); return true; } // 初始化netty client bootstrap initClientBootstrap(); // 初始化连接池 initPool(); LoggerUtil.info("NettyClient finish Open: url={}", url); // 注册统计回调 StatsUtil.registryStatisticCallback(this); // 设置可用状态 state = ChannelState.ALIVE; return state.isAliveState(); } /** * 初始化 netty clientBootstrap */ private void initClientBootstrap() { bootstrap = new Bootstrap(); NioEventLoopGroup group = new NioEventLoopGroup(); bootstrap.group(group).channel(NioSocketChannel.class).handler(new Netty4ClientInitializer(url, codec, this)); bootstrap.option(ChannelOption.TCP_NODELAY, true); bootstrap.option(ChannelOption.SO_KEEPALIVE, true); /* 实际上,极端情况下,connectTimeout会达到500ms,因为netty nio的实现中,是依赖BossThread来控制超时, 如果为了严格意义的timeout,那么需要应用端进行控制。 */ int timeout = getUrl().getIntParameter(URLParamType.requestTimeout.getName(), URLParamType.requestTimeout.getIntValue()); if (timeout <= 0) { throw new MotanFrameworkException("NettyClient init Error: timeout(" + timeout + ") <= 0 is forbid.", MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR); } bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout); } @Override public synchronized void close() { close(0); } /** * 目前close不支持timeout的概念 */ @Override public synchronized void close(int timeout) { if (state.isCloseState()) { LoggerUtil.info("NettyClient close fail: already close, url={}", url.getUri()); return; } // 如果当前nettyClient还没有初始化,那么就没有close的理由。 if (state.isUnInitState()) { LoggerUtil.info("NettyClient close Fail: don't need to close because node is unInit state: url={}", url.getUri()); return; } try { // 取消定期的回收任务 timeMonitorFuture.cancel(true); // 关闭连接池 pool.close(); // 清空callback callbackMap.clear(); // 设置close状态 state = ChannelState.CLOSE; // 解除统计回调的注册 StatsUtil.unRegistryStatisticCallback(this); LoggerUtil.info("NettyClient close Success: url={}", url.getUri()); } catch (Exception e) { LoggerUtil.error("NettyClient close Error: url=" + url.getUri(), e); } } @Override public boolean isClosed() { return state.isCloseState(); } @Override public boolean isAvailable() { return state.isAliveState(); } @Override public URL getUrl() { return url; } /** * connection factory */ @Override protected BasePoolableObjectFactory createChannelFactory() { return new NettyChannelFactory(this); } /** * 增加调用失败的次数: * <p> * <pre> * 如果连续失败的次数 >= maxClientConnection, 那么把client设置成不可用状态 * </pre> */ void incrErrorCount() { long count = errorCount.incrementAndGet(); // 如果节点是可用状态,同时当前连续失败的次数超过限制maxClientConnection次,那么把该节点标示为不可用 if (count >= maxClientConnection && state.isAliveState()) { synchronized (this) { count = errorCount.longValue(); if (count >= maxClientConnection && state.isAliveState()) { LoggerUtil.error("NettyClient unavailable Error: url=" + url.getIdentity() + " " + url.getServerPortStr()); state = ChannelState.UNALIVE; } } } } /** * 重置调用失败的计数 : * <p> * <pre> * 把节点设置成可用 * </pre> */ void resetErrorCount() { errorCount.set(0); if (state.isAliveState()) { return; } synchronized (this) { if (state.isAliveState()) { return; } // 如果节点是unalive才进行设置,而如果是 close 或者 uninit,那么直接忽略 if (state.isUnAliveState()) { long count = errorCount.longValue(); // 过程中有其他并发更新errorCount的,因此这里需要进行一次判断 if (count < maxClientConnection) { state = ChannelState.ALIVE; LoggerUtil.info("NettyClient recover available: url=" + url.getIdentity() + " " + url.getServerPortStr()); } } } } /** * 注册回调的resposne * <p> * <pre> * * 进行最大的请求并发数的控制,如果超过NETTY_CLIENT_MAX_REQUEST的话,那么throw reject exception * * </pre> * * @param requestId * @param nettyResponseFuture * @throws MotanServiceException */ public void registerCallback(long requestId, NettyResponseFuture nettyResponseFuture) { if (this.callbackMap.size() >= MotanConstants.NETTY_CLIENT_MAX_REQUEST) { // reject request, prevent from OutOfMemoryError throw new MotanServiceException("NettyClient over of max concurrent request, drop request, url: " + url.getUri() + " requestId=" + requestId, MotanErrorMsgConstant.SERVICE_REJECT); } this.callbackMap.put(requestId, nettyResponseFuture); } /** * 统计回调接口 */ @Override public String statisticCallback() { //避免消息泛滥,如果节点是可用状态,并且堆积的请求不超过100的话,那么就不记录log了 if (isAvailable() && callbackMap.size() < 100) { return null; } return String.format("identity: %s available: %s concurrent_count: %s", url.getIdentity(), isAvailable(), callbackMap.size()); } /** * 移除回调的response * * @param requestId * @return */ public NettyResponseFuture removeCallback(long requestId) { return callbackMap.remove(requestId); } public Bootstrap getBootstrap() { return bootstrap; } /** * 回收超时任务 * * @author maijunsheng */ class TimeoutMonitor implements Runnable { private String name; public TimeoutMonitor(String name) { this.name = name; } public void run() { long currentTime = System.currentTimeMillis(); for (Map.Entry<Long, NettyResponseFuture> entry : callbackMap.entrySet()) { try { NettyResponseFuture future = entry.getValue(); if (future.getCreateTime() + future.getTimeout() <currentTime) { // timeout: remove from callback list, and then cancel removeCallback(entry.getKey()); future.cancel(); } } catch (Exception e) { LoggerUtil.error( name + " clear timeout future Error: uri=" + url.getUri() + " requestId=" + entry.getKey(), e); } } } } }
open的最核心功就是各种设置。
initClientBootstrap流程中
bootstrap.group(group).channel(NioSocketChannel.class).handler(new Netty4ClientInitializer(url, codec, this));
这句中的Netty4ClientInitializer泪中,指定如下3个handler.
Netty4Decoder:协议的解码器
Netty4Encoder:协议的编码器
Netty4ClientHandler:对服务器的返回结果的处理。
1 public class Netty4ClientInitializer extends ChannelInitializer<SocketChannel> { 2 3 private URL url; 4 5 private Codec codec; 6 7 private Netty4Client client; 8 9 public Netty4ClientInitializer(URL url, Codec codec, Netty4Client client) { 10 this.url = url; 11 this.codec = codec; 12 this.client = client; 13 } 14 15 @Override 16 protected void initChannel(SocketChannel ch) throws Exception { 17 ChannelPipeline p = ch.pipeline(); 18 // 最大响应包限制 19 int maxContentLength = url.getIntParameter(URLParamType.maxContentLength.getName(), 20 URLParamType.maxContentLength.getIntValue()); 21 p.addLast("decoder", new Netty4Decoder(codec, client, maxContentLength)); 22 p.addLast("encoder", new Netty4Encoder(codec, client)); 23 p.addLast("handler", new Netty4ClientHandler(client)); 24 25 } 26 }
编码和解码是非常重要的过程,有个模块专门负责这个过程,现在不表。
Netty4ClientHandler主要是request()中对响应的处理,最终执行就是以下FutureListener中的operationComplete相关代码。
1 if (result && writeFuture.isSuccess()) { 2 response.addListener(new FutureListener() { 3 @Override 4 public void operationComplete(Future future) throws Exception { 5 if (future.isSuccess() || (future.isDone() && ExceptionUtil.isBizException(future.getException()))) { 6 // 成功的调用 7 nettyClient.resetErrorCount(); 8 } else { 9 // 失败的调用 10 nettyClient.incrErrorCount(); 11 } 12 } 13 }); 14 return response; 15 }