前言
RocketMQ的网络通信是基于Netty实现的RPC框架,这些RPC框架实现的功能都具有通用性,如sofa-bolt,分布式服务框架Dubbo,实现的网络通信模型都具有协议定义,同步请求,异步请求,单向请求,负载均衡,流控,心跳,重连等机制。
服务端NettyRemotingServer
通信的服务端继承NettyRemotingAbstract实现RemotingServer
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer
server端实现的功能:
public interface RemotingServer extends RemotingService { // 注册处理器 void registerProcessor(final int requestCode, final NettyRequestProcessor processor, final ExecutorService executor); void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor); int localListenPort(); Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode); RemotingCommand invokeSync(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException; void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException; void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
server端初始化
实现Reactor主从多线程模型,初始化3个eventloop,
private final EventLoopGroup eventLoopGroupSelector; // 用于就绪选择,可以使Epoll或者NIO的模式。默认3个线程数
private final EventLoopGroup eventLoopGroupBoss; //作为acceptor负责与client建立连接 一个线程足够了
private DefaultEventExecutorGroup defaultEventExecutorGroup; // 工作线程池用于处理handler,默认8个线程
server端启动
初始化serverBootstrap,设置TCP参数,定义handler调用链。
@Override public void start() { // 处理handler i/o 线程组 this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( nettyServerConfig.getServerWorkerThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet()); } }); // 对于线程池的配置 可以参考一下 Reactor 主从多线程池模型 ServerBootstrap childHandler = this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector) .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_KEEPALIVE, false) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()) .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()) .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( defaultEventExecutorGroup, new NettyEncoder(), new NettyDecoder(), new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), new NettyConnectManageHandler(), new NettyServerHandler()); } }); // 启用字节缓冲池 if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) { childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); } try { ChannelFuture sync = this.serverBootstrap.bind().sync(); InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress(); this.port = addr.getPort(); } catch (InterruptedException e1) { throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1); } // 启动netty事件监听 if (this.channelEventListener != null) { this.nettyEventExecutor.start(); } // 定时任务处理当前响应 this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { NettyRemotingServer.this.scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception", e); } } }, 1000 * 3, 1000); }
这里面主要关注这几个handler,NettyEncoder,NettyDecoder,IdleStateHandler,NettyConnectManageHandler,NettyServerHandler,将会在下文介绍。
客户端 NettyRemotingClient
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient
client端实现的功能,与server端类似
public interface RemotingClient extends RemotingService { public void updateNameServerAddressList(final List<String> addrs); public List<String> getNameServerAddressList(); public RemotingCommand invokeSync(final String addr, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException; public void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException; public void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException; public void registerProcessor(final int requestCode, final NettyRequestProcessor processor, final ExecutorService executor); public boolean isChannelWriteable(final String addr); }
客户端发起连接,是在发送消息的时候实时建立连接的。
client初始化bootstrap
跟server不同的是,client需要一个eventLoopGroupWorker负责建立连接,并且初始化一个defaultEventExecutorGroup处理handler。
@Override public void start() { this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(// nettyClientConfig.getClientWorkerThreads(), // new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet()); } }); Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)// .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, false) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()) .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()) .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( defaultEventExecutorGroup, new NettyEncoder(), new NettyDecoder(), new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()), new NettyConnectManageHandler(), new NettyClientHandler()); } }); this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { NettyRemotingClient.this.scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception", e); } } }, 1000 * 3, 1000); if (this.channelEventListener != null) { this.nettyEventExecutor.start(); } }
相关的handler将会在下文介绍。
通信协议RemotingCommand
定义了远程调用的数据结构,定义了私有化协议,具体看一下这些属性:
private static final int RPC_TYPE = 0; // 0, REQUEST_COMMAND // 1, RESPONSE_COMMAND private static final int RPC_ONEWAY = 1; // 0, RPC // 1, Oneway /** * 缓存了具体的CommandCustomHeader都有哪些field */ private static final Map<Class<? extends CommandCustomHeader>, Field[]> CLASS_HASH_MAP = new HashMap<Class<? extends CommandCustomHeader>, Field[]>(); /** * 缓存具体的CommandCustomHeader对应的简单类名 */ private static final Map<Class, String> CANONICAL_NAME_CACHE = new HashMap<Class, String>(); /** * 缓存一个field和对应的注解信息,用于后续解码后,数据的非空校验等 */ private static final Map<Field, Annotation> NOT_NULL_ANNOTATION_CACHE = new HashMap<Field, Annotation>(); // 具体的请求编号 private int code; private LanguageCode language = LanguageCode.JAVA; private int version = 0; // 一次rpc的请求ID private int opaque = requestId.getAndIncrement(); private int flag = 0; // 异常信息记录在remark private String remark; //扩展字段,数据序列化前后存储结构 private HashMap<String, String> extFields; // 数据编码之前 把customHeader中的属性转化成extFields 再序列化 // 数据反序列化之后 存在在extFields 再装换成具体的customHeader private transient CommandCustomHeader customHeader; /** * 有rocketmq和json两种序列化方式,默认为json */ private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer; // 消息message,调用接口传入 private transient byte[] body;
编解码
NettyEncoder
实际上就是把请求header与body的数据发送出去
ByteBuffer header = remotingCommand.encodeHeader(); out.writeBytes(header); // body的编码是上面在message中序列化好的 byte[] body = remotingCommand.getBody(); if (body != null) { out.writeBytes(body); }
具体的编码过程:
header返回的缓冲字节结构如下:
|-- int 4byte 总长度(包括body长度)--|--int 4bytes 序列化信息--| -- headerData--|
private ByteBuffer encodeHeader(final int bodyLength) { //计算分配的内存大小 最前面需要四个字节,放一个int整型 // 1> header length size 最前面放一个Int整型 占用四个字节 int length = 4; // 2> header data length 这里面包含很多信息 byte[] headerData; headerData = this.headerEncode(); length += headerData.length; // 3> body data length length += bodyLength; // 放 总长度(4) + 序列化类型(4)+ header的长度 ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength); // 目前为止 length的值包含了 4 + headerData.length + body.length // length 这个值不包含 ProtocolType长度 result.putInt(length); // header length 4个字节 result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); // header data result.put(headerData); result.flip(); return result; }
4个字节的的整型int序列化信息,包含着一个字节的序列化code和3个字节保存一个4字节的headerLength
public static byte[] markProtocolType(int source, SerializeType type) { byte[] result = new byte[4]; result[0] = type.getCode(); result[1] = (byte) ((source >> 16) & 0xFF); result[2] = (byte) ((source >> 8) & 0xFF); result[3] = (byte) (source & 0xFF); return result; }
// 获取headerlength
public static int getHeaderLength(int length) {
return length & 0xFFFFFF;
}
接下来看一下headerData = this.headerEncode(); 具体如何编码header的
private byte[] headerEncode() { // 这个方法实际上就是 把自定义header类里面的字段,封装成map,放在extFields里面 以便后续序列化 this.makeCustomHeaderToNet(); if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) { return RocketMQSerializable.rocketMQProtocolEncode(this); } else { // json序列化很好理解 就是 JSON.toJSONString(obj, prettyFormat); return RemotingSerializable.encode(this); } }
herderData在RocketMQ自定义序列化的条件下,结构是这样的,
private static int calTotalLen(int remark, int ext) { // int code(~32767) int length = 2 // LanguageCode language + 1 // int version(~32767) + 2 // int opaque + 4 // int flag + 4 // String remark + 4 + remark // HashMap<String, String> extFields + 4 + ext; return length;
其中ext的序列化方式是
// 都是长度加数据的方式追加 Iterator<Map.Entry<String, String>> it = map.entrySet().iterator(); while (it.hasNext()) { int kvLength; // keylength(short 2)+ keybyte + valuelength(int 4) + valuebyte Map.Entry<String, String> entry = it.next(); if (entry.getKey() != null && entry.getValue() != null) { kvLength = 2 + entry.getKey().getBytes(CHARSET_UTF8).length + 4 + entry.getValue().getBytes(CHARSET_UTF8).length; totalLength += kvLength; }
NettyDecoder
public class NettyDecoder extends LengthFieldBasedFrameDecoder
public NettyDecoder() { /** * 1.从消息开头偏移lengthFieldOffset长度, 到达A位置 * * 2.再从A位置读取lengthFieldLength长度, 到达B位置, 内容是d * * 3.再从B位置读取(d+lengthAdjustment)长度, 达到D位置 * * 4.从消息开头跳过initialBytesToStrip长度到达C位置 * * 5.将C位置-D位置之间的内容传送给接下来的处理器进行后续处理 * * 根据编码器定义 实际上一个数据包是 4字节的序列化信息 + headerData + body */ super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
主要关注 RemotingCommand.decode(byteBuffer);逻辑,
public static RemotingCommand decode(final ByteBuffer byteBuffer) { int length = byteBuffer.limit(); // 4+ headerlength + bodylength int oriHeaderLen = byteBuffer.getInt(); // 4个字节 序列化方式信息 // HeaderData长度 int headerLength = getHeaderLength(oriHeaderLen); byte[] headerData = new byte[headerLength]; byteBuffer.get(headerData); // 反序列化headerData RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen)); // 获取body字节 int bodyLength = length - 4 - headerLength; byte[] bodyData = null; if (bodyLength > 0) { bodyData = new byte[bodyLength]; byteBuffer.get(bodyData); } cmd.body = bodyData; return cmd; }
private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) { switch (type) { case JSON: // 相对简单 这边不详细介绍了 RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class); resultJson.setSerializeTypeCurrentRPC(type); return resultJson; case ROCKETMQ: // 主要看一下 RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData); resultRMQ.setSerializeTypeCurrentRPC(type); return resultRMQ; default: break; }
public static RemotingCommand rocketMQProtocolDecode(final byte[] headerArray) { RemotingCommand cmd = new RemotingCommand(); ByteBuffer headerBuffer = ByteBuffer.wrap(headerArray); // int code(~32767) cmd.setCode(headerBuffer.getShort()); // LanguageCode language cmd.setLanguage(LanguageCode.valueOf(headerBuffer.get())); // int version(~32767) cmd.setVersion(headerBuffer.getShort()); // int opaque cmd.setOpaque(headerBuffer.getInt()); // int flag cmd.setFlag(headerBuffer.getInt()); // String remark int remarkLength = headerBuffer.getInt(); if (remarkLength > 0) { byte[] remarkContent = new byte[remarkLength]; headerBuffer.get(remarkContent); cmd.setRemark(new String(remarkContent, CHARSET_UTF8)); } // HashMap<String, String> extFields int extFieldsLength = headerBuffer.getInt(); if (extFieldsLength > 0) { byte[] extFieldsBytes = new byte[extFieldsLength]; headerBuffer.get(extFieldsBytes); //mapDeserialize方法会把extmap逐个解码 cmd.setExtFields(mapDeserialize(extFieldsBytes)); } return cmd; }
网络事件处理机制
这里主要介绍IdleStateHandler 和 NettyConnectManageHandler
IdleStateHandler
// IdleStateHandler 是netty提供的心跳机制 其本身不会发送心跳数据
// 用来检测读空闲 写空闲 还是读写空闲,根据设置的时间来触发对应的事件
// 用户自定义的handler的userEventTriggered会捕捉到空闲超时类型,用户可以
//自定义处理逻辑 比如写空闲了 可以发送心跳数据
// 设置0表示不启用设置
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
NettyConnectManageHandler
class NettyConnectManageHandler extends ChannelDuplexHandler { @Override public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { final String local = localAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(localAddress); final String remote = remoteAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(remoteAddress); log.info("NETTY CLIENT PIPELINE: CONNECT {} => {}", local, remote); super.connect(ctx, remoteAddress, localAddress, promise); if (NettyRemotingClient.this.channelEventListener != null) { NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remote, ctx.channel())); } } @Override public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); log.info("NETTY CLIENT PIPELINE: DISCONNECT {}", remoteAddress); closeChannel(ctx.channel()); super.disconnect(ctx, promise); if (NettyRemotingClient.this.channelEventListener != null) { NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel())); } } @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); log.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress); closeChannel(ctx.channel()); super.close(ctx, promise); if (NettyRemotingClient.this.channelEventListener != null) { NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel())); } } /** * 空闲时,客户端会主动断开连接 * @param ctx * @param evt * @throws Exception */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state().equals(IdleState.ALL_IDLE)) { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); log.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", remoteAddress); closeChannel(ctx.channel()); if (NettyRemotingClient.this.channelEventListener != null) { NettyRemotingClient.this .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel())); } } } ctx.fireUserEventTriggered(evt); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); log.warn("NETTY CLIENT PIPELINE: exceptionCaught {}", remoteAddress); log.warn("NETTY CLIENT PIPELINE: exceptionCaught exception.", cause); closeChannel(ctx.channel()); if (NettyRemotingClient.this.channelEventListener != null) { NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel())); } } }
读者可以发现,在每一次网络事件都会去组装一个NettyEvent的事件信息放到一个网络事件处理队列中
NettyRemotingClient.this .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
这边是典型的生产者和消费者线程模型
/** * 这个内部类的作用: * 当nettyclient监听到Netty事件,比如连接 断开 异常等情况 执行客户端自定义的事件处理 */ class NettyEventExecutor extends ServiceThread { private final LinkedBlockingQueue<NettyEvent> eventQueue = new LinkedBlockingQueue<>(); private final int maxSize = 10000; public void putNettyEvent(final NettyEvent event) { if (this.eventQueue.size() <= maxSize) { this.eventQueue.add(event); } else { log.warn("event queue size[{}] enough, so drop this event {}", this.eventQueue.size(), event.toString()); } } @Override public String getServiceName() { return NettyEventExecutor.class.getSimpleName(); } @Override public void run() { log.info(this.getServiceName() + " service started"); final ChannelEventListener listener = NettyRemotingAbstract.this.getChannelEventListener(); while (!this.isStopped()) { try { NettyEvent event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS); if (event != null && listener != null) { switch (event.getType()) { case IDLE: listener.onChannelIdle(event.getRemoteAddr(), event.getChannel()); break; case CLOSE: listener.onChannelClose(event.getRemoteAddr(), event.getChannel()); break; case CONNECT: listener.onChannelConnect(event.getRemoteAddr(), event.getChannel()); break; case EXCEPTION: listener.onChannelException(event.getRemoteAddr(), event.getChannel()); break; default: break; } } } catch (Exception e) { log.warn(this.getServiceName() + " service has exception. ", e); } } } }
这边延伸介绍一下后台线程基类ServiceThread,本身是一个runnable对象,stop方法和shutdown方法 与 waitForRunning方法 实现等待通知的模型。在后面的刷盘,数据同步等都采用这种方式
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.rocketmq.remoting.common; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Base class for background thread */ public abstract class ServiceThread implements Runnable { private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); private static final long JOIN_TIME = 90 * 1000; protected final Thread thread; protected volatile boolean hasNotified = false; protected volatile boolean stopped = false; public ServiceThread() { this.thread = new Thread(this, this.getServiceName()); } public abstract String getServiceName(); public void start() { this.thread.start(); } public void shutdown() { this.shutdown(false); } public void shutdown(final boolean interrupt) { this.stopped = true; log.info("shutdown thread " + this.getServiceName() + " interrupt " + interrupt); synchronized (this) { if (!this.hasNotified) { this.hasNotified = true; this.notify(); } } try { if (interrupt) { this.thread.interrupt(); } long beginTime = System.currentTimeMillis(); this.thread.join(this.getJointime()); long eclipseTime = System.currentTimeMillis() - beginTime; log.info("join thread " + this.getServiceName() + " eclipse time(ms) " + eclipseTime + " " + this.getJointime()); } catch (InterruptedException e) { log.error("Interrupted", e); } } public long getJointime() { return JOIN_TIME; } public void stop() { this.stop(false); } public void stop(final boolean interrupt) { this.stopped = true; log.info("stop thread " + this.getServiceName() + " interrupt " + interrupt); synchronized (this) { if (!this.hasNotified) { this.hasNotified = true; this.notify(); } } if (interrupt) { this.thread.interrupt(); } } public void makeStop() { this.stopped = true; log.info("makestop thread " + this.getServiceName()); } public void wakeup() { synchronized (this) { if (!this.hasNotified) { this.hasNotified = true; this.notify(); } } } protected void waitForRunning(long interval) { synchronized (this) { if (this.hasNotified) { this.hasNotified = false; this.onWaitEnd(); return; } try { this.wait(interval); } catch (InterruptedException e) { log.error("Interrupted", e); } finally { this.hasNotified = false; this.onWaitEnd(); } } } protected void onWaitEnd() { } public boolean isStopped() { return stopped; } }
超时请求定时清除
在client和server端还维护了一个timer的定时任务
this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { NettyRemotingClient.this.scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception", e); } } }, 1000 * 3, 1000);
/** * <p> * This method is periodically invoked to scan and expire deprecated request. * </p> */ public void scanResponseTable() { final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>(); Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator(); while (it.hasNext()) { Entry<Integer, ResponseFuture> next = it.next(); ResponseFuture rep = next.getValue(); // 停止等待超时响应 并删除缓存 if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) { rep.release(); it.remove(); rfList.add(rep); log.warn("remove timeout request, " + rep); } } // 同时对超时的异步请求 执行回调 for (ResponseFuture rf : rfList) { try { executeInvokeCallback(rf); } catch (Throwable e) { log.warn("scanResponseTable, operationComplete Exception", e); } } }
处理接收的远程数据 NettyClientHandler 和 NettyServerHandler
客户端或者服务端对接收的数据可以分两类,一个是远程服务请求操作,另外一个是远程服务响应返回。
REQUEST_COMMAND
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { final RemotingCommand cmd = msg; if (cmd != null) { switch (cmd.getType()) { case REQUEST_COMMAND: processRequestCommand(ctx, cmd); break; case RESPONSE_COMMAND: processResponseCommand(ctx, cmd); break; default: break; } } }
REQUEST_COMMAND
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) { // 根据请求code,获取已经注册的处理器与线程池配对组 final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode()); // 如果找不到 则使用默认处理器 在server端使用adminprocess作为默认处理器 final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched; final int opaque = cmd.getOpaque(); if (pair != null) { Runnable run = new Runnable() { @Override public void run() { try { // rpchook RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook(); if (rpcHook != null) { rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); } // 处理请求的具体逻辑,返回RemotingCommand响应数据 final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd); if (rpcHook != null) { rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); } // 对于远程过来的请求类型不是单向请求的话 则设置opaque ResponseType if (!cmd.isOnewayRPC()) { if (response != null) { response.setOpaque(opaque); response.markResponseType(); try { ctx.writeAndFlush(response); } catch (Throwable e) { log.error("process request over, but response failed", e); log.error(cmd.toString()); log.error(response.toString()); } } else { } } } catch (Throwable e) { log.error("process request exception", e); log.error(cmd.toString()); if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, // RemotingHelper.exceptionSimpleDesc(e)); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } }; if (pair.getObject1().rejectRequest()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[REJECTREQUEST]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); return; } try { // 把任务包装成RequestTask submit给对应的线程池 final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd); pair.getObject2().submit(requestTask); } catch (RejectedExecutionException e) { if ((System.currentTimeMillis() % 10000) == 0) { log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) // + ", too many requests and system thread pool busy, RejectedExecutionException " // + pair.getObject2().toString() // + " request code: " + cmd.getCode()); } if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[OVERLOAD]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } else { String error = " request type " + cmd.getCode() + " not supported"; final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error); response.setOpaque(opaque); ctx.writeAndFlush(response); log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error); } }
RESPONSE_COMMAND
返回响应数据相对简单一些,同步本地请求的话,就直接设置返回值,如果本地是异步请求的话,使用回调线程池处理回调函数,最后在缓存中删除对应的responseFuture
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) { final int opaque = cmd.getOpaque(); final ResponseFuture responseFuture = responseTable.get(opaque); if (responseFuture != null) { responseFuture.setResponseCommand(cmd); responseFuture.release(); responseTable.remove(opaque); if (responseFuture.getInvokeCallback() != null) { executeInvokeCallback(responseFuture); } else { responseFuture.putResponse(cmd); } } else { log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel())); log.warn(cmd.toString()); } }
同步请求
1 客户端主动发起连接,如果没传addr,请求namesrv,否则获取addr对应的channel,如果channel没有在缓存中找到,则创建channel。
2 执行RpcHook
3 同步请求会创建一个ResponseFuture,并缓存起来,等到数据发送成功之后,设置发送请求成功并立刻返回。如果发送失败,则设置异常信息,并返回。
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null); this.responseTable.put(opaque, responseFuture); final SocketAddress addr = channel.remoteAddress(); channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } else { responseFuture.setSendRequestOK(false); } responseTable.remove(opaque); responseFuture.setCause(f.cause()); responseFuture.putResponse(null); log.warn("send a request command to channel <" + addr + "> failed."); } });
那么如何实现同步的呢?
接着调用下面的代码,
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
这个就得益于ResponseFuture中的countDownLatch,实现同步等待,每一个response都有有一个独立的countDownLatch,当前调用线程执行完waitResponse会等待中,等到响应回调的时候(见RESPONSE_COMMAND中的代码),会调用putResponse,将会设置返回结果,同步等待得到唤醒。
public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException { this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS); return this.responseCommand; } public void putResponse(final RemotingCommand responseCommand) { this.responseCommand = responseCommand; this.countDownLatch.countDown(); }
最后在缓存中删除对应的ResponseFuture
异步请求
相对于同步请求,异步请求只有前置的rpchook,同时实现异步流控,并在发送完数据之后,释放信号量。另外调用线程不需要调用waitResponse方法同步等待,
等到响应回调的时候(见RESPONSE_COMMAND中的代码),会调用executeInvokeCallback(responseFuture);
private void executeInvokeCallback(final ResponseFuture responseFuture) { boolean runInThisThread = false; ExecutorService executor = this.getCallbackExecutor(); if (executor != null) { try { executor.submit(new Runnable() { @Override public void run() { try { responseFuture.executeInvokeCallback(); } catch (Throwable e) { log.warn("execute callback in executor exception, and callback throw", e); } } }); } catch (Exception e) { runInThisThread = true; log.warn("execute callback in executor exception, maybe executor busy", e); } } else { runInThisThread = true; } if (runInThisThread) { try { responseFuture.executeInvokeCallback(); } catch (Throwable e) { log.warn("executeInvokeCallback Exception", e); } } }
/** * 包装回调只处理一次 ,可能存在网络回调和定时调度并行处理 使用布尔原子类能保证对个线程只处理一次 */ public void executeInvokeCallback() { if (invokeCallback != null) { if (this.executeCallbackOnlyOnce.compareAndSet(false, true)) { invokeCallback.operationComplete(this); } } }
单向请求
单向请求不需要同步等待,也不需要回调,其他与异步请求调用类似。