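Netty's master/worker multi-threaded model: the main thread (a pool holding a single thread) is mainly responsible for accepting client connections, then hands them to the worker threads (a pool of 8 threads here) for the actual business processing.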
MainReactor = NioServerBoss
SubReactor = NioWorker
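In the class diagram, NioWorker and NioServerBoss both inherit from AbstractNioSelector, and the main logic is handled there.
During initialization, both the boss and the worker thread pools are set up, and both end up running AbstractNioSelector's run method.
The boss and the worker differ in what processTaskQueue and process do: one implementation belongs to the boss, the other to the worker.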
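The split described above is essentially a template method: a shared loop drains the task queue and then calls a subclass-specific process step. A minimal sketch of that shape, assuming simplified stand-in classes (SelectorLoopSketch, BossSketch and WorkerSketch are hypothetical names, not Netty classes) and no real Selector:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical, simplified stand-in for the shared selector loop.
abstract class SelectorLoopSketch {
    final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
    final List<String> log = new ArrayList<>();

    // One iteration of the loop: run queued tasks first, then handle ready I/O.
    final void runOnce() {
        processTaskQueue();
        process();
    }

    // Drain every task that was queued from other threads.
    void processTaskQueue() {
        Runnable task;
        while ((task = taskQueue.poll()) != null) {
            task.run();
        }
    }

    // Subclass-specific handling of ready keys.
    abstract void process();
}

final class BossSketch extends SelectorLoopSketch {
    @Override
    void process() {
        log.add("boss: accept new connections");
    }
}

final class WorkerSketch extends SelectorLoopSketch {
    @Override
    void process() {
        log.add("worker: read/write ready channels");
    }
}
```

In this sketch the queued task stands in for the register task, so a boss iteration first registers and then accepts, while a worker iteration first registers and then reads or writes.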
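The worker's processTaskQueue calls the run method of the RegisterTask class inside NioWorker. It does three main things: sets the channel to non-blocking, registers it with the selector, and fires the client-connected event (an upstream connected event in the pipeline).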
public void run() {
    SocketAddress localAddress = channel.getLocalAddress();
    SocketAddress remoteAddress = channel.getRemoteAddress();

    if (localAddress == null || remoteAddress == null) {
        if (future != null) {
            future.setFailure(new ClosedChannelException());
        }
        close(channel, succeededFuture(channel));
        return;
    }

    try {
        if (server) {
            channel.channel.configureBlocking(false);
        }

        channel.channel.register(
                selector, channel.getInternalInterestOps(), channel);

        if (future != null) {
            channel.setConnected();
            future.setSuccess();
        }

        if (server || !((NioClientSocketChannel) channel).boundManually) {
            fireChannelBound(channel, localAddress);
        }
        fireChannelConnected(channel, remoteAddress);
    } catch (IOException e) {
        if (future != null) {
            future.setFailure(e);
        }
        close(channel, succeededFuture(channel));
        if (!(e instanceof ClosedChannelException)) {
            throw new ChannelException(
                    "Failed to register a socket to the selector.", e);
        }
    }
}
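The process method here is the one in NioWorker. It iterates over the selector's ready keys, reads from the channel into a ByteBuffer, copies that into a ChannelBuffer, and fires the message-received event (fireMessageReceived), which invokes the handlers we added to the pipeline.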
protected void process(Selector selector) throws IOException {
    Set<SelectionKey> selectedKeys = selector.selectedKeys();
    // check if the set is empty and if so just return to not create garbage by
    // creating a new Iterator every time even if there is nothing to process.
    // See https://github.com/netty/netty/issues/597
    if (selectedKeys.isEmpty()) {
        return;
    }
    for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
        SelectionKey k = i.next();
        i.remove();
        try {
            int readyOps = k.readyOps();
            if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
                if (!read(k)) {
                    // Connection already closed - no need to handle write.
                    continue;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                writeFromSelectorLoop(k);
            }
        } catch (CancelledKeyException e) {
            close(k);
        }

        if (cleanUpCancelledKeys()) {
            break; // break the loop to avoid ConcurrentModificationException
        }
    }
}

protected boolean read(SelectionKey k) {
    final SocketChannel ch = (SocketChannel) k.channel();
    final NioSocketChannel channel = (NioSocketChannel) k.attachment();

    final ReceiveBufferSizePredictor predictor =
            channel.getConfig().getReceiveBufferSizePredictor();
    final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
    final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();

    int ret = 0;
    int readBytes = 0;
    boolean failure = true;

    ByteBuffer bb = recvBufferPool.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());
    try {
        while ((ret = ch.read(bb)) > 0) {
            readBytes += ret;
            if (!bb.hasRemaining()) {
                break;
            }
        }
        failure = false;
    } catch (ClosedChannelException e) {
        // Can happen, and does not need a user attention.
    } catch (Throwable t) {
        fireExceptionCaught(channel, t);
    }

    if (readBytes > 0) {
        bb.flip();

        final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
        buffer.setBytes(0, bb);
        buffer.writerIndex(readBytes);

        // Update the predictor.
        predictor.previousReceiveBufferSize(readBytes);

        // Fire the event.
        fireMessageReceived(channel, buffer);
    }

    if (ret < 0 || failure) {
        k.cancel(); // Some JDK implementations run into an infinite loop without this.
        close(channel, succeededFuture(channel));
        return false;
    }

    return true;
}
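The core of read is a common NIO pattern: fill a buffer of a predicted size until it is full or no more data is available, flip it, then copy exactly the bytes that were read into a right-sized buffer. The sketch below shows only that pattern with plain JDK types. ReadSketch and readOnce are hypothetical names, a byte[] stands in for Netty's ChannelBuffer, and the buffer pool and size predictor are left out:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

final class ReadSketch {
    private ReadSketch() { }

    // Reads at most predictedSize bytes and returns exactly what was read.
    static byte[] readOnce(ReadableByteChannel ch, int predictedSize) {
        ByteBuffer bb = ByteBuffer.allocate(predictedSize);
        int readBytes = 0;
        try {
            int ret;
            // Keep reading while data arrives; stop once the buffer is full.
            while ((ret = ch.read(bb)) > 0) {
                readBytes += ret;
                if (!bb.hasRemaining()) {
                    break;
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Switch the buffer from writing to reading, then copy out the payload.
        bb.flip();
        byte[] out = new byte[readBytes];
        bb.get(out);
        return out;
    }
}
```

With a 10-byte source and a predicted size of 4, three successive calls would return 4, 4 and 2 bytes, which is why the real code feeds the observed size back into the predictor.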
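Now for the boss side. It certainly accepts client connections, since it is the entry point. The other questions are how an accepted connection is handed over to a NioWorker and how the workers are chosen.
The work done in processTaskQueue comes from the run method of the RegisterTask class in NioServerBoss. When this task gets put on the queue is shown further below: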
public void run() {
    boolean bound = false;
    boolean registered = false;
    try {
        // Bind the address and port
        channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
        bound = true;

        future.setSuccess();
        fireChannelBound(channel, channel.getLocalAddress());
        // Register for the accept event
        channel.socket.register(selector, SelectionKey.OP_ACCEPT, channel);

        registered = true;
    } catch (Throwable t) {
        future.setFailure(t);
        fireExceptionCaught(channel, t);
    } finally {
        if (!registered && bound) {
            close(channel, future);
        }
    }
}
The process method of NioServerBoss
protected void process(Selector selector) {
    Set<SelectionKey> selectedKeys = selector.selectedKeys();
    if (selectedKeys.isEmpty()) {
        return;
    }
    // Iterate over the keys that became ready
    for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
        SelectionKey k = i.next();
        i.remove();
        NioServerSocketChannel channel = (NioServerSocketChannel) k.attachment();

        try {
            // accept connections in a for loop until no new connection is ready
            for (;;) {
                SocketChannel acceptedSocket = channel.socket.accept();
                if (acceptedSocket == null) {
                    break;
                }
                registerAcceptedChannel(channel, acceptedSocket, thread);
            }
        } catch (CancelledKeyException e) {
            // Raised by accept() when the server socket was closed.
            k.cancel();
            channel.close();
        } catch (SocketTimeoutException e) {
            // Thrown every second to get ClosedChannelException
            // raised.
        } catch (ClosedChannelException e) {
            // Closed as requested.
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to accept a connection.", t);
            }

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e1) {
                // Ignore
            }
        }
    }
}
private static void registerAcceptedChannel(NioServerSocketChannel parent, SocketChannel acceptedSocket,
                                            Thread currentThread) {
    try {
        ChannelSink sink = parent.getPipeline().getSink();
        ChannelPipeline pipeline =
                parent.getConfig().getPipelineFactory().getPipeline();
        // Round-robin over the NioWorkers: workers[Math.abs(workerIndex.getAndIncrement() % workers.length)]
        NioWorker worker = parent.workerPool.nextWorker();
        // This is where the client socket gets bound to a NioWorker
        worker.register(new NioAcceptedSocketChannel(
                parent.getFactory(), pipeline, parent, sink
                , acceptedSocket,
                worker, currentThread), null);
    } catch (Exception e) {
        if (logger.isWarnEnabled()) {
            logger.warn(
                    "Failed to initialize an accepted socket.", e);
        }

        try {
            acceptedSocket.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially accepted socket.",
                        e2);
            }
        }
    }
}
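The worker selection quoted in the comment above is a plain round-robin over an array, driven by an atomic counter. A minimal sketch of just that selection rule, assuming a hypothetical RoundRobinPoolSketch class with worker names as strings in place of real NioWorker instances:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the worker pool; only the selection rule is modeled.
final class RoundRobinPoolSketch {
    private final String[] workers;
    private final AtomicInteger workerIndex = new AtomicInteger();

    RoundRobinPoolSketch(String... workers) {
        if (workers.length == 0) {
            throw new IllegalArgumentException("at least one worker is required");
        }
        this.workers = workers.clone();
    }

    // Same expression as in the comment: the remainder keeps the index in range,
    // Math.abs covers the negative remainders that appear after the counter overflows.
    String nextWorker() {
        return workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];
    }
}
```

Because the counter is atomic, several boss threads could call nextWorker concurrently without a lock, and each accepted connection stays with the worker it was assigned to.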
The register method of AbstractNioSelector
public void register(Channel channel, ChannelFuture future) {
    // createRegisterTask is implemented by each subclass (NioServerBoss / NioWorker),
    // so the queued task is that class's RegisterTask, whose run method was shown earlier
    Runnable task = createRegisterTask(channel, future);
    registerTask(task);
}

protected final void registerTask(Runnable task) {
    // Add the task to the queue
    taskQueue.add(task);
    Selector selector = this.selector;
    if (selector != null) {
        // CAS so that only one caller wakes up the selector
        if (wakenUp.compareAndSet(false, true)) {
            selector.wakeup();
        }
    } else {
        if (taskQueue.remove(task)) {
            // the selector was null this means the Worker has already been shutdown.
            throw new RejectedExecutionException("Worker has already been shutdown");
        }
    }
}
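The queue-then-wakeup step can be tried in isolation with a real java.nio Selector. The sketch below is an assumption-laden simplification (WakeupSketch and runOnce are hypothetical names, and the shutdown branch is omitted): it shows that a wakeup issued before select makes the next select return immediately, so the queued tasks run without waiting for the timeout.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

final class WakeupSketch implements AutoCloseable {
    final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
    final AtomicBoolean wakenUp = new AtomicBoolean();
    private final Selector selector;

    WakeupSketch() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Called from any thread: queue the task, then wake the selector at most once.
    void registerTask(Runnable task) {
        taskQueue.add(task);
        if (wakenUp.compareAndSet(false, true)) {
            selector.wakeup();
        }
    }

    // One loop iteration on the selector thread; returns how many tasks ran.
    int runOnce() {
        wakenUp.set(false);
        try {
            // Returns at once if wakeup() was called since the last select.
            selector.select(1000);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        int ran = 0;
        Runnable task;
        while ((task = taskQueue.poll()) != null) {
            task.run();
            ran++;
        }
        return ran;
    }

    @Override
    public void close() {
        try {
            selector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The compareAndSet guard matters because wakeup is comparatively expensive: when many tasks are queued between two loop iterations, only the first caller pays for it.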
Now take a look at NioAcceptedSocketChannel
final class NioAcceptedSocketChannel extends NioSocketChannel {

    final Thread bossThread;

    NioAcceptedSocketChannel(
            ChannelFactory factory, ChannelPipeline pipeline,
            Channel parent, ChannelSink sink,
            SocketChannel socket, NioWorker worker, Thread bossThread) {

        super(parent, factory, pipeline, sink, socket, worker);

        this.bossThread = bossThread;

        setConnected();
        // Fire the channel-open event; the connection is now handed to the NioWorker
        fireChannelOpen(this);
    }
}
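Which RegisterTask runs after register depends on the selector it was called on. For an accepted channel, register is called on a NioWorker, so the queued task is the NioWorker's RegisterTask shown at the beginning; the RegisterTask in NioServerBoss only runs when the server channel itself is registered.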