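[netty4][netty-transport] Netty's NIO transport layer
Basic NIO processing logic
Selector handling
Building the Selector instance
NioEventLoop.openSelector() first builds a Selector through the JDK API, then uses reflection to replace its selectedKeys and publicSelectedKeys fields with Netty's optimized SelectedSelectionKeySet instance.
Code that builds the Selector through the JDK API: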
// NioEventLoop
private final SelectorProvider provider; // SelectorProvider.provider(): the provider supplied by the JDK is used as-is. On macOS it returned sun.nio.ch.KQueueSelectorProvider@1b26f7b2, and the matching selector implementation was sun.nio.ch.KQueueSelectorImpl@23fe1d71
unwrappedSelector = provider.openSelector();
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
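unwrappedSelector is the Selector we would normally build with the JDK API. What Netty actually uses is the SelectorTuple, which carries its own optimized selector. SelectorTuple is analyzed below.
Replacement through reflection:
io.netty.channel.nio.NioEventLoop.openSelector() uses reflection to assign Netty's SelectedSelectionKeySet instance selectedKeySet (Netty's SelectedSelectionKeySet class extends java.util.AbstractSet) to the selectedKeys and publicSelectedKeys fields that selectorImplClass declares, on the unwrapped selector instance. The reason is that SelectedSelectionKeySet is faster; see the SelectorTuple analysis below.
What is selectorImplClass? It is the common base class of the platform-specific Selector implementations. How does Netty get hold of it?
It simply loads "sun.nio.ch.SelectorImpl".
Once the replacement is done, after every select Netty only has to process (iterate over the keys in) this SelectedSelectionKeySet instance, selectedKeys (selectedKeySet is assigned to selectedKeys), which is also an instance field of NioEventLoop. Put simply, after each select the keys that have pending events are already sitting in the selectedKeys field of NioEventLoop.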
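The reflective replacement can be tried out without touching JDK internals. The following is only a sketch: `ToySelector` and `ArrayKeySet` are hypothetical stand-ins for sun.nio.ch.SelectorImpl and SelectedSelectionKeySet, and Netty's real code additionally deals with access checks and falls back to the plain selector when reflection fails.

```java
import java.lang.reflect.Field;
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a selector implementation that keeps its ready keys in a HashSet.
class ToySelector {
    private Set<String> selectedKeys = new HashSet<>();

    // Simulates the selector reporting a ready key during select().
    void markReady(String key) {
        selectedKeys.add(key);
    }
}

// Hypothetical stand-in for SelectedSelectionKeySet: backed by an ArrayList, no hashing.
class ArrayKeySet extends AbstractSet<String> {
    final List<String> keys = new ArrayList<>();

    @Override public boolean add(String key) { return keys.add(key); }
    @Override public Iterator<String> iterator() { return keys.iterator(); }
    @Override public int size() { return keys.size(); }
}

class ReflectiveSwapDemo {
    // Replaces the private selectedKeys field of the selector with our own set.
    static ArrayKeySet install(ToySelector selector) {
        try {
            ArrayKeySet replacement = new ArrayKeySet();
            Field field = ToySelector.class.getDeclaredField("selectedKeys");
            field.setAccessible(true);
            field.set(selector, replacement);
            return replacement;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("could not replace selectedKeys", e);
        }
    }

    public static void main(String[] args) {
        ToySelector selector = new ToySelector();
        ArrayKeySet mine = install(selector);
        selector.markReady("key-1");
        System.out.println(mine.keys); // the ready key lands in our own set
    }
}
```

Because the caller keeps a reference to the replacement set, it never has to ask the selector for its selected keys again, which is the same effect the text describes for NioEventLoop.selectedKeys.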
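SelectorTuple analysis
SelectorTuple relies on SelectedSelectionKeySetSelector and SelectedSelectionKeySet for the optimization. Internally, SelectedSelectionKeySet uses an array instead of the HashSet of the JDK implementation to hold the SelectionKeys, which makes add, reset, and iteration cheaper. Impressive work!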
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
SelectionKey[] keys;
int size;
// ......
@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}
keys[size++] = o;
if (size == keys.length) {
increaseCapacity();
}
return true;
}
void reset(int start) {
Arrays.fill(keys, start, size, null);
size = 0;
}
}
// ...
final class SelectedSelectionKeySetSelector extends Selector {
private final SelectedSelectionKeySet selectionKeys;
private final Selector delegate;
SelectedSelectionKeySetSelector(Selector delegate, SelectedSelectionKeySet selectionKeys) {
this.delegate = delegate;
this.selectionKeys = selectionKeys;
}
}
private static final class SelectorTuple {
final Selector unwrappedSelector;
final Selector selector;
SelectorTuple(Selector unwrappedSelector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = unwrappedSelector;
}
SelectorTuple(Selector unwrappedSelector, Selector selector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = selector;
}
}
Creating and initializing the ServerSocketChannel
How the ServerSocketChannel is created
During bind, a ServerSocketChannel is created and stored in the ch field of the NioServerSocketChannel instance.
Call stack:
Thread [main] (Suspended (modification of field ch in AbstractNioChannel))
io.netty.channel.socket.nio.NioServerSocketChannel(io.netty.channel.nio.AbstractNioChannel).<init>(io.netty.channel.Channel, java.nio.channels.SelectableChannel, int) line: 85
io.netty.channel.socket.nio.NioServerSocketChannel(io.netty.channel.nio.AbstractNioMessageChannel).<init>(io.netty.channel.Channel, java.nio.channels.SelectableChannel, int) line: 42
io.netty.channel.socket.nio.NioServerSocketChannel.<init>(java.nio.channels.ServerSocketChannel) line: 88
io.netty.channel.socket.nio.NioServerSocketChannel.<init>() line: 74
sun.reflect.NativeConstructorAccessorImpl.newInstance0(java.lang.reflect.Constructor<?>, java.lang.Object[]) line: not available [native method]
sun.reflect.NativeConstructorAccessorImpl.newInstance(java.lang.Object[]) line: 62
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(java.lang.Object[]) line: 45
java.lang.reflect.Constructor<T>.newInstance(java.lang.Object...) line: 423
io.netty.channel.ReflectiveChannelFactory<T>.newChannel() line: 44
io.netty.bootstrap.ServerBootstrap(io.netty.bootstrap.AbstractBootstrap<B,C>).initAndRegister() line: 320
io.netty.bootstrap.ServerBootstrap(io.netty.bootstrap.AbstractBootstrap<B,C>).doBind(java.net.SocketAddress) line: 282
io.netty.bootstrap.ServerBootstrap(io.netty.bootstrap.AbstractBootstrap<B,C>).bind(java.net.SocketAddress) line: 278
org.restexpress.RestExpress.bind(java.net.InetSocketAddress) line: 709
org.restexpress.RestExpress.bind(java.lang.String, int) line: 686
com.code260.ss.resetexpress.RestExpressApp1.main(java.lang.String[]) line: 26
Handling bind
AbstractBootstrap doBind logic
NioServerSocketChannel class hierarchy:
// AttributeMap hierarchy
251 AttributeMap
--251.2 DefaultAttributeMap
----251.2.1 AbstractChannel
------251.2.1.7 AbstractNioChannel
--------251.2.1.7.1 AbstractNioMessageChannel
----------251.2.1.7.1.5 NioServerSocketChannel
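// Channel hierarchy
--463.1 Channel
----463.1.2 AbstractChannel // Bound to a pipeline. The pipeline is created at construction time through newChannelPipeline.
------463.1.2.7 AbstractNioChannel // Supports different SelectableChannel implementations; ServerSocketChannel and SocketChannel are both SelectableChannels. The constructor takes the interest op (readInterestOp), stores it in the instance field readInterestOp, and switches the channel to non-blocking mode. In other words, this class wraps a selectable channel together with the event it is interested in.
--------463.1.2.7.1 AbstractNioMessageChannel // Defines the interface for reading and writing messages (a message is just an Object, or a group of Objects) and implements part of doWrite.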
----------463.1.2.7.1.5 NioServerSocketChannel
- initAndRegister returns a ChannelFuture instance; see the next subsection for the details
- doBind0 hands the actual bind over to the boss thread
io.netty.bootstrap.AbstractBootstrap.initAndRegister() logic
- Create the Channel, that is, the ServerSocketChannel instance, wrapped in Netty's own NioServerSocketChannel: channel = channelFactory.newChannel();
- Initialize the channel with init(channel);, which applies the settings in io.netty.bootstrap.AbstractBootstrap.options to the Channel. The server-side channel options here were {SO_BACKLOG=1024, SO_REUSEADDR=true, CONNECT_TIMEOUT_MILLIS=10000, SO_RCVBUF=262140, RCVBUF_ALLOCATOR=io.netty.channel.AdaptiveRecvByteBufAllocator@17cdf2d0}. It also adds a ChannelInitializer to the Channel's pipeline.
- Register the Channel. First an executor, i.e. a NioEventLoop, is chosen; the selection logic lives in io.netty.util.concurrent.DefaultEventExecutorChooserFactory$PowerOfTwoEventExecutorChooser@15cafec7: executors[idx.getAndIncrement() & executors.length - 1]. Then io.netty.channel.SingleThreadEventLoop.register(Channel) registers the Channel with that executor (the NioEventLoop). Registration first checks whether the caller is already on that EventLoop (for a single-threaded executor, whether it is the loop's own thread). If so, it registers directly; if not, it wraps the registration logic in a new Runnable and hands it to the eventLoop to execute.
The registration itself: io.netty.channel.AbstractChannel.AbstractUnsafe.register(EventLoop, ChannelPromise)
The execution itself: io.netty.util.concurrent.SingleThreadEventExecutor.execute(Runnable)
Internally the task is put on the io.netty.util.concurrent.SingleThreadEventExecutor.taskQueue queue, currently implemented by io.netty.util.internal.shaded.org.jctools.queues.MpscUnboundedArrayQueue. At this point it also checks whether the caller is the loop's own thread; if not, it starts that thread.
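The chooser expression executors[idx.getAndIncrement() & executors.length - 1] is a round-robin that replaces the modulo with a bit mask. A minimal sketch, assuming plain strings in place of real event loops (`PowerOfTwoChooser` here is a simplified, hypothetical class, not Netty's):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified, hypothetical chooser; the real one picks among EventExecutor instances.
class PowerOfTwoChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final String[] executors;

    PowerOfTwoChooser(String... executors) {
        // The bit mask only works when the length is a power of two.
        if (executors.length == 0 || (executors.length & (executors.length - 1)) != 0) {
            throw new IllegalArgumentException("length must be a power of two");
        }
        this.executors = executors;
    }

    String next() {
        // '-' binds tighter than '&', so this is idx & (length - 1), i.e. idx mod length.
        return executors[idx.getAndIncrement() & executors.length - 1];
    }
}
```

The mask also keeps the index non-negative after the counter overflows, which a plain % on a negative int would not.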
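io.netty.channel.AbstractChannel.AbstractUnsafe.register0(ChannelPromise) is the follow-up work of register:
- First mark the ChannelPromise as succeeded
- One of the important steps: it fires the channelRegistered event of ChannelInboundHandler, which application code can customize; it runs on the boss thread
- If the channel has been bound successfully, it then fires the active event. Whether the bind succeeded is decided by: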
public boolean isActive() {
return javaChannel().socket().isBound();
}
This simply relies on the JDK API isBound.
register0 is submitted with code like the following. Netty has many snippets of this shape; they could be wrapped in a helper:
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
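As an illustration of what such a wrapper could look like, here is a sketch built on a plain JDK single-thread executor. `ToyEventLoop` and `runInLoop` are hypothetical names, and the error handling of the original snippet (force-closing the channel when the task is rejected) is left out.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical single-threaded loop that knows which thread is its own.
class ToyEventLoop {
    private volatile Thread loopThread;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "toy-loop");
        loopThread = t;
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // The wrapped pattern: run inline on the loop thread, otherwise hand the task over.
    void runInLoop(Runnable task) {
        if (inEventLoop()) {
            task.run();
        } else {
            executor.execute(task);
        }
    }

    void shutdown() {
        executor.shutdown();
    }
}
```

A call made from outside the loop is queued, while a nested call made from a task that is already running on the loop thread executes immediately.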
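Because the logic above is asynchronous, it may not have finished yet, so the next step has to handle both cases:
// regFuture is the Future returned by the init-and-register step above; first check whether it has completed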
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise); // the real bind, on the boss thread. regFuture has already completed here, so no callback is needed and doBind0 is called directly.
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() { // register a listener on regFuture that is called back once it completes. Neat.
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise); // the real bind, on the boss thread
}
}
});
return promise;
}
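The same "bind now if registration is done, otherwise bind from a callback" shape can be reproduced with the JDK's CompletableFuture. This is a sketch only: `BindAfterRegister` and its string-based `doBind0` are hypothetical stand-ins, and whenComplete plays the role of addListener.

```java
import java.util.concurrent.CompletableFuture;

class BindAfterRegister {
    // Hypothetical stand-in for doBind0: completes the bind promise.
    static void doBind0(CompletableFuture<String> bindPromise, String address) {
        bindPromise.complete("bound to " + address);
    }

    // Bind directly when registration already succeeded, else bind from a callback.
    static CompletableFuture<String> doBind(CompletableFuture<Void> regFuture, String address) {
        CompletableFuture<String> bindPromise = new CompletableFuture<>();
        if (regFuture.isDone() && !regFuture.isCompletedExceptionally()) {
            doBind0(bindPromise, address);
        } else {
            regFuture.whenComplete((ignored, cause) -> {
                if (cause != null) {
                    bindPromise.completeExceptionally(cause); // registration failed
                } else {
                    doBind0(bindPromise, address);
                }
            });
        }
        return bindPromise;
    }
}
```

Either way the caller gets a promise back immediately and never blocks on the registration.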
The way io.netty.util.concurrent.SingleThreadEventExecutor.startThread() starts the thread is worth studying: it guarantees the thread is started exactly once.
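A common way to get "exactly once" is a state check followed by a compare-and-set. The sketch below shows that idea with an AtomicInteger; the class `StartOnce` and its counter are hypothetical, and the state handling in Netty itself has more states than the two used here.

```java
import java.util.concurrent.atomic.AtomicInteger;

class StartOnce {
    private static final int NOT_STARTED = 0;
    private static final int STARTED = 1;

    private final AtomicInteger state = new AtomicInteger(NOT_STARTED);
    private final AtomicInteger startCount = new AtomicInteger();

    // Safe to call from any thread, any number of times.
    void startThread() {
        // Cheap read first, then a CAS that only one caller can win.
        if (state.get() == NOT_STARTED && state.compareAndSet(NOT_STARTED, STARTED)) {
            doStartThread();
        }
    }

    private void doStartThread() {
        startCount.incrementAndGet(); // a real loop would launch its worker thread here
    }

    int startCount() {
        return startCount.get();
    }
}
```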
addTaskWakesUp??
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
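- Create the PendingRegistrationPromise and add a listener to the ChannelFuture returned by init-and-register; ChannelFutureListener.operationComplete is called back once it completes. This is needed because init-and-register is asynchronous and may not have finished at this point.
// TODO: still need to look at the wakeup in step 3, and at step 4
None of the above touches the place where the port is actually bound.
The real bind
Call stack: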
// Thread [boss-0] (Suspended)
io.netty.channel.socket.nio.NioServerSocketChannel.doBind(java.net.SocketAddress) line: 130
io.netty.channel.nio.AbstractNioMessageChannel$NioMessageUnsafe(io.netty.channel.AbstractChannel$AbstractUnsafe).bind(java.net.SocketAddress, io.netty.channel.ChannelPromise) line: 562
io.netty.channel.DefaultChannelPipeline$HeadContext.bind(io.netty.channel.ChannelHandlerContext, java.net.SocketAddress, io.netty.channel.ChannelPromise) line: 1332
io.netty.channel.DefaultChannelPipeline$HeadContext(io.netty.channel.AbstractChannelHandlerContext).invokeBind(java.net.SocketAddress, io.netty.channel.ChannelPromise) line: 501
io.netty.channel.DefaultChannelPipeline$TailContext(io.netty.channel.AbstractChannelHandlerContext).bind(java.net.SocketAddress, io.netty.channel.ChannelPromise) line: 486
io.netty.channel.DefaultChannelPipeline.bind(java.net.SocketAddress, io.netty.channel.ChannelPromise) line: 984
io.netty.channel.socket.nio.NioServerSocketChannel(io.netty.channel.AbstractChannel).bind(java.net.SocketAddress, io.netty.channel.ChannelPromise) line: 258
io.netty.bootstrap.AbstractBootstrap$2.run() line: 366
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(java.lang.Runnable) line: 163
io.netty.channel.nio.NioEventLoop(io.netty.util.concurrent.SingleThreadEventExecutor).runAllTasks(long) line: 404
io.netty.channel.nio.NioEventLoop.run() line: 495
io.netty.util.concurrent.SingleThreadEventExecutor$5.run() line: 905
java.lang.Thread.run() line: 748
NioServerSocketChannel.doBind on the boss thread is where the port is actually bound. The call stack is shown above; the code is:
// NioServerSocketChannel
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
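Stripped of Netty, the Java 7+ branch is a plain ServerSocketChannel.bind(SocketAddress, int) call, and the isActive() check shown earlier is socket().isBound(). A small sketch, assuming an ephemeral loopback port (port 0) so that it does not clash with anything; `PlainBind` is a hypothetical name:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

class PlainBind {
    // Binds to an ephemeral loopback port and reports whether the socket
    // went from "not bound" to "bound", the same check isActive() relies on.
    static boolean bindAndCheck(int backlog) {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            boolean boundBefore = ch.socket().isBound();
            ch.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), backlog);
            return !boundBefore && ch.socket().isBound();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```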
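Abstraction and organization of select and business processing
Main logic of NioEventLoop
- Handle select according to the select strategy (the possible strategy values are listed further below).
- Depending on ioRatio, split the time between processing the selected keys (IO events) and running all tasks (business logic, custom events, and so on).
About the processing of selected keys: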
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
sun.nio.ch.KQueueSelectorImpl@153cf15c
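- Handle exceptions. Note that Throwable is caught here, which keeps the loop thread from running away. The current handling logs a warning and sleeps for 1s so that the CPU is not eaten up completely.
Registering the OP_ACCEPT event
Where the accept event is actually registered:
First the channel is registered with interest ops 0, which produces the key: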
Thread [boss-0] (Suspended (breakpoint at line 386 in io.netty.channel.nio.AbstractNioChannel))
io.netty.channel.socket.nio.NioServerSocketChannel(io.netty.channel.nio.AbstractNioChannel).doRegister() line: 386
io.netty.channel.nio.AbstractNioMessageChannel$NioMessageUnsafe(io.netty.channel.AbstractChannel$AbstractUnsafe).register0(io.netty.channel.ChannelPromise) line: 508
io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(io.netty.channel.AbstractChannel$AbstractUnsafe, io.netty.channel.ChannelPromise) line: 427
io.netty.channel.AbstractChannel$AbstractUnsafe$1.run() line: 486
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(java.lang.Runnable) line: 163
io.netty.channel.nio.NioEventLoop(io.netty.util.concurrent.SingleThreadEventExecutor).runAllTasks(long) line: 404
io.netty.channel.nio.NioEventLoop.run() line: 495
io.netty.util.concurrent.SingleThreadEventExecutor$5.run() line: 905
java.lang.Thread.run() line: 748
Code:
// io.netty.channel.nio.AbstractNioChannel.doRegister()
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
Then doBeginRead sets interest op 16 (OP_ACCEPT), which is the real registration:
Boss call stack:
Thread [boss-0] (Suspended (breakpoint at line 420 in io.netty.channel.nio.AbstractNioChannel))
io.netty.channel.socket.nio.NioServerSocketChannel(io.netty.channel.nio.AbstractNioChannel).doBeginRead() line: 420
io.netty.channel.socket.nio.NioServerSocketChannel(io.netty.channel.nio.AbstractNioMessageChannel).doBeginRead() line: 55
io.netty.channel.nio.AbstractNioMessageChannel$NioMessageUnsafe(io.netty.channel.AbstractChannel$AbstractUnsafe).beginRead() line: 851
io.netty.channel.DefaultChannelPipeline$HeadContext.read(io.netty.channel.ChannelHandlerContext) line: 1360
io.netty.channel.DefaultChannelPipeline$HeadContext(io.netty.channel.AbstractChannelHandlerContext).invokeRead() line: 693
io.netty.channel.DefaultChannelPipeline$TailContext(io.netty.channel.AbstractChannelHandlerContext).read() line: 673
io.netty.channel.DefaultChannelPipeline.read() line: 1015
io.netty.channel.socket.nio.NioServerSocketChannel(io.netty.channel.AbstractChannel).read() line: 288
io.netty.channel.DefaultChannelPipeline$HeadContext.readIfIsAutoRead() line: 1420
io.netty.channel.DefaultChannelPipeline$HeadContext.channelActive(io.netty.channel.ChannelHandlerContext) line: 1398
io.netty.channel.DefaultChannelPipeline$HeadContext(io.netty.channel.AbstractChannelHandlerContext).invokeChannelActive() line: 213
io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(io.netty.channel.AbstractChannelHandlerContext) line: 199
io.netty.channel.DefaultChannelPipeline.fireChannelActive() line: 906
io.netty.channel.AbstractChannel$AbstractUnsafe$2.run() line: 573
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(java.lang.Runnable) line: 163
io.netty.channel.nio.NioEventLoop(io.netty.util.concurrent.SingleThreadEventExecutor).runAllTasks(long) line: 404
io.netty.channel.nio.NioEventLoop.run() line: 495
io.netty.util.concurrent.SingleThreadEventExecutor$5.run() line: 905
java.lang.Thread.run() line: 748
Code:
// io.netty.channel.nio.AbstractNioChannel.doBeginRead()
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp); // for accept, readInterestOp is 16
}
}
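The two steps (register with 0 to obtain the key, then OR in the real interest op) can be reproduced with the JDK API alone. This is a sketch under the assumption that a string stands in for the AbstractNioChannel attachment; `TwoStepRegistration` is a hypothetical name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

class TwoStepRegistration {
    // Returns {interest ops right after register, interest ops after the "beginRead" step}.
    static int[] registerThenEnableAccept() {
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false); // a channel must be non-blocking to be registered
            Object attachment = "my-channel-wrapper"; // Netty attaches the AbstractNioChannel here

            // Step 1: obtain the key without listening for anything yet.
            SelectionKey key = ch.register(selector, 0, attachment);
            int before = key.interestOps();

            // Step 2: add OP_ACCEPT only if it is not set already.
            int readInterestOp = SelectionKey.OP_ACCEPT;
            int interestOps = key.interestOps();
            if ((interestOps & readInterestOp) == 0) {
                key.interestOps(interestOps | readInterestOp);
            }
            return new int[] {before, key.interestOps()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Splitting the steps lets the key exist (and carry its attachment) before the channel is active, so that no accept event can fire before the pipeline is ready.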
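Handling a selected OP_ACCEPT and creating the SocketChannel
This is the select logic in the run method:
During strategy calculation, if there are tasks, the get method of io.netty.channel.nio.NioEventLoop.selectNowSupplier is called, and that method performs a selectNow.
During strategy calculation, if there are no tasks, the strategy returns -1, i.e. SelectStrategy.SELECT, and a select with a timeout is performed: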
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
That goes through the following call stack:
SelectedSelectionKeySetSelector.select(long) line: 62
NioEventLoop.select(boolean) line: 786
NioEventLoop.run() line: 434
SingleThreadEventExecutor$5.run() line: 905
Thread.run() line: 748
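The return values of io.netty.channel.SelectStrategy.calculateStrategy(IntSupplier, boolean) are:
① -2 (SelectStrategy.CONTINUE): skip this iteration; I have not yet constructed this scenario.
② -3 (SelectStrategy.BUSY_WAIT): busy-wait, which is also skipped; I have not yet constructed this scenario.
③ -1 (SelectStrategy.SELECT): with no tasks a select is needed, followed by a check on whether to wake up; this is the no-task scenario.
④ Any other value >= 0, i.e. the default branch: do nothing and fall through to processing the selected keys (IO events) or all tasks (business logic and so on).
Analysis of io.netty.channel.DefaultSelectStrategy.calculateStrategy(IntSupplier, boolean):
If there are no tasks, it returns SelectStrategy.SELECT.
If there are tasks, it calls get() on the anonymous IntSupplier defined in io.netty.channel.nio.NioEventLoop, which performs one selectNow. Note that this selectNow is NioEventLoop's own method, but it ultimately maps to selectNow on the JDK Selector.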
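The calculation described above fits in one line. The sketch below uses the JDK's java.util.function.IntSupplier (method getAsInt) in place of Netty's own IntSupplier, and `ToySelectStrategy` is a hypothetical class; the constant values are the ones listed above.

```java
import java.util.function.IntSupplier;

class ToySelectStrategy {
    static final int SELECT = -1;
    static final int CONTINUE = -2;
    static final int BUSY_WAIT = -3;

    // With pending tasks do a non-blocking selectNow; otherwise ask for a blocking select.
    static int calculateStrategy(IntSupplier selectNowSupplier, boolean hasTasks) {
        return hasTasks ? selectNowSupplier.getAsInt() : SELECT;
    }
}
```

Since selectNow returns the number of ready keys, the result with pending tasks is always >= 0, which is why the event loop can tell the two cases apart by sign.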
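Analysis of io.netty.channel.nio.NioEventLoop.select(boolean)
How the select timeout is determined
How is the timeout of the timed select determined?
It is the deadline minus the current time (both measured in nanoseconds), rounded to millisecond granularity.
And how is the deadline calculated?
By default it is 1s from now. If scheduledTaskQueue holds scheduled tasks, the deadline is derived from the scheduled time of the task at the head of the priority queue together with the current time.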
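A worked version of this arithmetic, as a sketch: rounding is done by adding half a millisecond before the integer division. The class `SelectTimeout` and the convention that a negative delay means "no scheduled task" are assumptions of this example.

```java
import java.util.concurrent.TimeUnit;

class SelectTimeout {
    // Default delay when no scheduled task is pending: one second.
    static final long DEFAULT_DELAY_NANOS = TimeUnit.SECONDS.toNanos(1);

    // delayNanos: time until the next scheduled task, or a negative value for "none"
    // (this convention is an assumption of the sketch).
    static long deadlineNanos(long currentTimeNanos, long delayNanos) {
        return currentTimeNanos + (delayNanos >= 0 ? delayNanos : DEFAULT_DELAY_NANOS);
    }

    // Adds half a millisecond before dividing so the result is rounded, not truncated.
    static long timeoutMillis(long deadlineNanos, long currentTimeNanos) {
        return (deadlineNanos - currentTimeNanos + 500_000L) / 1_000_000L;
    }
}
```

For example, 1.5 ms until the deadline yields a timeout of 2 ms, while 0.4 ms yields 0, in which case no blocking select is worthwhile.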
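Logic of select(boolean) and the workaround for the epoll empty-spin bug
The whole thing is wrapped in a spin loop.
If the deadline has been reached, it calls selector.selectNow(); and leaves the spin loop.
If there are tasks and wakenUp is false (in which case it is set to true), it calls selector.selectNow(); and leaves the spin loop.
If neither case applies, it calls selector.select(timeoutMillis); and increments a counter. It leaves the spin loop if keys were selected, or oldWakenUp is true, or the wakenUp field is true, or there are tasks, or there are scheduled tasks.
If the number of selects in this spin loop reaches SELECTOR_AUTO_REBUILD_THRESHOLD (default 512), it rebuilds the Selector held in the method's local variable and re-registers the keys of the old selector on the new one. This works around the epoll bug on Linux: select may return immediately even though the timeout has not elapsed and no IO event has arrived. This is the well-known epoll bug, and a fairly serious one, because it traps the thread in a busy loop and drives the CPU to 100%.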
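The counting part of this workaround can be isolated from the selector itself. The sketch below is a simplification: `SelectSpinGuard` is a hypothetical class, and it assumes the caller invokes it only after a select that returned no keys, with no wakeup and no pending tasks.

```java
class SelectSpinGuard {
    // Default of SELECTOR_AUTO_REBUILD_THRESHOLD according to the text above.
    static final int REBUILD_THRESHOLD = 512;

    private int selectCnt;

    // Call after each select(timeoutMillis). Returns true when the selector should be rebuilt.
    boolean afterSelect(long elapsedMillis, long timeoutMillis) {
        selectCnt++;
        if (elapsedMillis >= timeoutMillis) {
            // The select really waited for the timeout: normal behavior, restart the count.
            selectCnt = 1;
            return false;
        }
        if (selectCnt >= REBUILD_THRESHOLD) {
            // Too many premature returns in a row: assume the selector is broken.
            selectCnt = 0;
            return true;
        }
        return false;
    }
}
```

The key point is that only consecutive premature returns count; a single select that honestly waits out its timeout resets the suspicion.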
Related code:
io.netty.channel.nio.NioEventLoop.rebuildSelector0()
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
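What re-registering on a new selector amounts to can be shown with the JDK API alone. This is a minimal sketch, not Netty's rebuildSelector0: `SelectorRebuild` is a hypothetical class, and per-key error handling is omitted.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class SelectorRebuild {
    // Moves every valid registration from oldSelector to a fresh Selector and returns the new one.
    static Selector rebuild(Selector oldSelector) {
        try {
            Selector newSelector = Selector.open();
            for (SelectionKey key : oldSelector.keys()) {
                if (!key.isValid()) {
                    continue; // already cancelled or channel closed
                }
                int interestOps = key.interestOps();
                Object attachment = key.attachment();
                key.cancel(); // drop the registration on the old selector
                key.channel().register(newSelector, interestOps, attachment);
            }
            oldSelector.close();
            return newSelector;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Interest ops and attachment are read before the key is cancelled, because interestOps() throws on a cancelled key.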
Processing after keys have been selected
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
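The task budget in the else branch is simple proportional arithmetic: if IO took ioTime and IO is supposed to get ioRatio percent of the loop, tasks get the remaining share. A sketch with a hypothetical helper class:

```java
class IoRatioBudget {
    // Time allowed for tasks, given how long IO processing took and the IO share in percent (1..99).
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio must be between 1 and 99 for this formula");
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }
}
```

With ioRatio = 50 tasks get as much time as IO just used; with ioRatio = 80 and 8 ms of IO, tasks get 2 ms. The value 100 is excluded because that case takes the first branch, which runs all tasks without a time limit.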
processSelectedKeys
Call stack:
NioServerSocketChannel.doReadMessages(List<Object>) line: 143
AbstractNioMessageChannel$NioMessageUnsafe.read() line: 75
NioEventLoop.processSelectedKey(SelectionKey, AbstractNioChannel) line: 677
NioEventLoop.processSelectedKeysOptimized() line: 612
NioEventLoop.processSelectedKeys() line: 529
NioEventLoop.run() line: 491
SingleThreadEventExecutor$5.run() line: 905
Thread.run() line: 748
Part of the code involved:
The key part, where the SocketChannel is accepted:
// NioEventLoop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read(); // unsafe is io.netty.channel.nio.AbstractNioMessageChannel$NioMessageUnsafe
}
// NioServerSocketChannel
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
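Underneath, doReadMessages is a non-blocking accept that may return null. The following sketch exercises that with the JDK API over the loopback interface; `AcceptOnce` is a hypothetical class, the 5 second select timeout is an arbitrary choice, and the accepted channel is simply collected into a list instead of being wrapped in a NioSocketChannel.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

class AcceptOnce {
    // Accepts one loopback connection through a selector; returns how many channels were produced.
    static int acceptOne() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.register(selector, SelectionKey.OP_ACCEPT);

            List<SocketChannel> buf = new ArrayList<>();
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                selector.select(5000); // wait until the pending connection is reported
                SocketChannel accepted = server.accept(); // null when nothing is pending
                if (accepted != null) {
                    buf.add(accepted);
                }
            }
            int count = buf.size();
            for (SocketChannel ch : buf) {
                ch.close();
            }
            return count;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```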
Registering the OP_READ event
Worker call stack:
Registering to produce the key:
Thread [worker-0] (Suspended (breakpoint at line 386 in io.netty.channel.nio.AbstractNioChannel))
io.netty.channel.socket.nio.NioSocketChannel(io.netty.channel.nio.AbstractNioChannel).doRegister() line: 386
io.netty.channel.socket.nio.NioSocketChannel$NioSocketChannelUnsafe(io.netty.channel.AbstractChannel$AbstractUnsafe).register0(io.netty.channel.ChannelPromise) line: 508
io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(io.netty.channel.AbstractChannel$AbstractUnsafe, io.netty.channel.ChannelPromise) line: 427
io.netty.channel.AbstractChannel$AbstractUnsafe$1.run() line: 486
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(java.lang.Runnable) line: 163
io.netty.channel.nio.NioEventLoop(io.netty.util.concurrent.SingleThreadEventExecutor).runAllTasks(long) line: 404
io.netty.channel.nio.NioEventLoop.run() line: 495
io.netty.util.concurrent.SingleThreadEventExecutor$5.run() line: 905
java.lang.Thread.run() line: 748
The real registration:
Thread [worker-0] (Suspended (breakpoint at line 420 in io.netty.channel.nio.AbstractNioChannel))
io.netty.channel.socket.nio.NioSocketChannel(io.netty.channel.nio.AbstractNioChannel).doBeginRead() line: 420
io.netty.channel.socket.nio.NioSocketChannel$NioSocketChannelUnsafe(io.netty.channel.AbstractChannel$AbstractUnsafe).beginRead() line: 851
io.netty.channel.DefaultChannelPipeline$HeadContext.read(io.netty.channel.ChannelHandlerContext) line: 1360
io.netty.channel.DefaultChannelPipeline$HeadContext(io.netty.channel.AbstractChannelHandlerContext).invokeRead() line: 693
io.netty.channel.DefaultChannelHandlerContext(io.netty.channel.AbstractChannelHandlerContext).read() line: 673
io.netty.handler.timeout.ReadTimeoutHandler(io.netty.channel.ChannelDuplexHandler).read(io.netty.channel.ChannelHandlerContext) line: 95
io.netty.channel.DefaultChannelHandlerContext(io.netty.channel.AbstractChannelHandlerContext).invokeRead() line: 693
io.netty.channel.DefaultChannelHandlerContext(io.netty.channel.AbstractChannelHandlerContext).read() line: 673
io.netty.handler.codec.http.HttpResponseEncoder(io.netty.channel.ChannelOutboundHandlerAdapter).read(io.netty.channel.ChannelHandlerContext) line: 93
io.netty.channel.DefaultChannelHandlerContext(io.netty.channel.AbstractChannelHandlerContext).invokeRead() line: 693
io.netty.channel.DefaultChannelHandlerContext(io.netty.channel.AbstractChannelHandlerContext).read() line: 673
io.netty.handler.stream.ChunkedWriteHandler(io.netty.channel.ChannelDuplexHandler).read(io.netty.channel.ChannelHandlerContext) line: 95
io.netty.channel.DefaultChannelHandlerContext(io.netty.channel.AbstractChannelHandlerContext).invokeRead() line: 693
io.netty.channel.DefaultChannelHandlerContext(io.netty.channel.AbstractChannelHandlerContext).read() line: 673
io.netty.handler.codec.http.HttpContentCompressor(io.netty.channel.ChannelDuplexHandler).read(io.netty.channel.ChannelHandlerContext) line: 95
io.netty.channel.DefaultChannelHandlerContext(io.netty.channel.AbstractChannelHandlerContext).invokeRead() line: 693
io.netty.channel.DefaultChannelPipeline$TailContext(io.netty.channel.AbstractChannelHandlerContext).read() line: 673
io.netty.channel.DefaultChannelPipeline.read() line: 1015
io.netty.channel.socket.nio.NioSocketChannel(io.netty.channel.AbstractChannel).read() line: 288
io.netty.channel.DefaultChannelPipeline$HeadContext.readIfIsAutoRead() line: 1420
io.netty.channel.DefaultChannelPipeline$HeadContext.channelActive(io.netty.channel.ChannelHandlerContext) line: 1398
io.netty.channel.DefaultChannelPipeline$HeadContext(io.netty.channel.AbstractChannelHandlerContext).invokeChannelActive() line: 213
io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(io.netty.channel.AbstractChannelHandlerContext) line: 199
io.netty.channel.DefaultChannelPipeline.fireChannelActive() line: 906
io.netty.channel.socket.nio.NioSocketChannel$NioSocketChannelUnsafe(io.netty.channel.AbstractChannel$AbstractUnsafe).register0(io.netty.channel.ChannelPromise) line: 522
io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(io.netty.channel.AbstractChannel$AbstractUnsafe, io.netty.channel.ChannelPromise) line: 427
io.netty.channel.AbstractChannel$AbstractUnsafe$1.run() line: 486
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(java.lang.Runnable) line: 163
io.netty.channel.nio.NioEventLoop(io.netty.util.concurrent.SingleThreadEventExecutor).runAllTasks(long) line: 404
io.netty.channel.nio.NioEventLoop.run() line: 495
io.netty.util.concurrent.SingleThreadEventExecutor$5.run() line: 905
java.lang.Thread.run() line: 748
Handling a selected OP_READ and reading into the buf
Thread stack while reading data:
Thread [worker-2] (Suspended (breakpoint at line 57 in HttpContentDecoder))
HttpContentDecompressor(HttpContentDecoder).decode(ChannelHandlerContext, HttpObject, List<Object>) line: 57
HttpContentDecompressor(HttpContentDecoder).decode(ChannelHandlerContext, Object, List) line: 47
HttpContentDecompressor(MessageToMessageDecoder<I>).channelRead(ChannelHandlerContext, Object) line: 88
DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelRead(Object) line: 362
AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext, Object) line: 348
DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelRead(Object) line: 340
ByteToMessageDecoder.fireChannelRead(ChannelHandlerContext, CodecOutputList, int) line: 323
ByteToMessageDecoder.fireChannelRead(ChannelHandlerContext, List<Object>, int) line: 310
HttpRequestDecoder(ByteToMessageDecoder).callDecode(ChannelHandlerContext, ByteBuf, List<Object>) line: 426
HttpRequestDecoder(ByteToMessageDecoder).channelRead(ChannelHandlerContext, Object) line: 278
DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelRead(Object) line: 362
AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext, Object) line: 348
DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelRead(Object) line: 340
PipelineInitializer$ChannelActiveTester(ChannelInboundHandlerAdapter).channelRead(ChannelHandlerContext, Object) line: 86
DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelRead(Object) line: 362
AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext, Object) line: 348
DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelRead(Object) line: 340
ReadTimeoutHandler(IdleStateHandler).channelRead(ChannelHandlerContext, Object) line: 286
DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelRead(Object) line: 362
AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext, Object) line: 348
DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).fireChannelRead(Object) line: 340
DefaultChannelPipeline$HeadContext.channelRead(ChannelHandlerContext, Object) line: 1408
DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).invokeChannelRead(Object) line: 362
AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext, Object) line: 348
DefaultChannelPipeline.fireChannelRead(Object) line: 930
NioSocketChannel$NioSocketChannelUnsafe(AbstractNioByteChannel$NioByteUnsafe).read() line: 163
NioEventLoop.processSelectedKey(SelectionKey, AbstractNioChannel) line: 677
NioEventLoop.processSelectedKeysOptimized() line: 612
NioEventLoop.processSelectedKeys() line: 529
NioEventLoop.run() line: 491
SingleThreadEventExecutor$5.run() line: 905
Thread.run() line: 748
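After processSelectedKey, NioSocketChannelUnsafe reads the data into a Buf and then hands it to the Pipeline, which triggers each of the following handlers.
Note what is stored in the SelectionKey's attachment at read time: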
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
final Object a = k.attachment(); // the attachment is the AbstractNioChannel; the AbstractNioChannel holds the unsafe object, which leads to the buf used for reading.
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
// ...
}
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); // the AbstractNioChannel holds the unsafe object
// ...
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
Handling outbound writes
Writing can be observed by setting a method breakpoint at SocketChannelImpl [entry] - write(ByteBuffer).
The call stack is:
Thread [worker-3] (Suspended (entry into method write in SocketChannelImpl))
SocketChannelImpl.write(ByteBuffer) line: 458
NioSocketChannel.doWrite(ChannelOutboundBuffer) line: 405
NioSocketChannel$NioSocketChannelUnsafe(AbstractChannel$AbstractUnsafe).flush0() line: 938
NioSocketChannel$NioSocketChannelUnsafe(AbstractNioChannel$AbstractNioUnsafe).flush0() line: 360
NioSocketChannel$NioSocketChannelUnsafe(AbstractChannel$AbstractUnsafe).flush() line: 905
DefaultChannelPipeline$HeadContext.flush(ChannelHandlerContext) line: 1370
DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).invokeFlush0() line: 776
DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).invokeFlush() line: 768
DefaultChannelHandlerContext(AbstractChannelHandlerContext).flush() line: 749
ReadTimeoutHandler(ChannelDuplexHandler).flush(ChannelHandlerContext) line: 117
DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeFlush0() line: 776
DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeFlush() line: 768
DefaultChannelHandlerContext(AbstractChannelHandlerContext).flush() line: 749
HttpResponseEncoder(ChannelOutboundHandlerAdapter).flush(ChannelHandlerContext) line: 115
DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeFlush0() line: 776
DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeFlush() line: 768
DefaultChannelHandlerContext(AbstractChannelHandlerContext).flush() line: 749
ChunkedWriteHandler.doFlush(ChannelHandlerContext) line: 335
ChunkedWriteHandler.channelWritabilityChanged(ChannelHandlerContext) line: 148
DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelWritabilityChanged() line: 434
AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext) line: 416
DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelWritabilityChanged() line: 409
HttpContentDecompressor(ChannelInboundHandlerAdapter).channelWritabilityChanged(ChannelHandlerContext) line: 119
DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelWritabilityChanged() line: 434
AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext) line: 416
DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelWritabilityChanged() line: 409
HttpRequestDecoder(ChannelInboundHandlerAdapter).channelWritabilityChanged(ChannelHandlerContext) line: 119
DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelWritabilityChanged() line: 434
AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext) line: 416
DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelWritabilityChanged() line: 409
PipelineInitializer$ChannelActiveTester(ChannelInboundHandlerAdapter).channelWritabilityChanged(ChannelHandlerContext) line: 119
DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelWritabilityChanged() line: 434
AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext) line: 416
DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelWritabilityChanged() line: 409
ReadTimeoutHandler(ChannelInboundHandlerAdapter).channelWritabilityChanged(ChannelHandlerContext) line: 119
DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelWritabilityChanged() line: 434
AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext) line: 416
DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).fireChannelWritabilityChanged() line: 409
DefaultChannelPipeline$HeadContext.channelWritabilityChanged(ChannelHandlerContext) line: 1431
DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).invokeChannelWritabilityChanged() line: 434
AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext) line: 416
DefaultChannelPipeline.fireChannelWritabilityChanged() line: 942
ChannelOutboundBuffer$2.run() line: 608
AbstractEventExecutor.safeExecute(Runnable) line: 163
NioEventLoop(SingleThreadEventExecutor).runAllTasks(long) line: 404
NioEventLoop.run() line: 495
SingleThreadEventExecutor$5.run() line: 905
Thread.run() line: 748
The core logic is in:
io.netty.channel.socket.nio.NioSocketChannel.doWrite(ChannelOutboundBuffer)
case 1: {
// Only one ByteBuf so use non-gathering write
// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
// to check if the total size of all the buffers is non-zero.
ByteBuffer buffer = nioBuffers[0];
int attemptedBytes = buffer.remaining();
final int localWrittenBytes = ch.write(buffer);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
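writeSpinCount defaults to 16, which means the writing thread tries to write up to 16 times. Only when ch.write can no longer write anything does it call incompleteWrite, which registers the OP_WRITE event. This is an optimization: rather than registering the event up front, Netty tries to write first and registers only when it cannot.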
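The "try first, register OP_WRITE only when the socket refuses" idea can be sketched without a real socket. Everything here is hypothetical scaffolding: `WriteSpin`, its `Result` type, and the `socketWrite` function, which stands in for ch.write by taking the number of pending bytes and returning how many were accepted (0 meaning the send buffer is full). What happens when the spin budget runs out with data still pending is left outside this sketch.

```java
import java.util.function.IntUnaryOperator;

class WriteSpin {
    static final int WRITE_SPIN_COUNT = 16; // default mentioned in the text

    // Outcome of one flush attempt.
    static final class Result {
        final int remainingBytes;
        final boolean opWriteRequested;

        Result(int remainingBytes, boolean opWriteRequested) {
            this.remainingBytes = remainingBytes;
            this.opWriteRequested = opWriteRequested;
        }
    }

    static Result flush(int pendingBytes, IntUnaryOperator socketWrite) {
        int spins = WRITE_SPIN_COUNT;
        while (pendingBytes > 0 && spins > 0) {
            int written = socketWrite.applyAsInt(pendingBytes);
            if (written <= 0) {
                // Nothing accepted: stop spinning and ask to be notified when writable.
                return new Result(pendingBytes, true);
            }
            pendingBytes -= written;
            spins--;
        }
        // Everything was written, or the spin budget ran out; no OP_WRITE requested here.
        return new Result(pendingBytes, false);
    }
}
```

In the common case the data goes out within the first few attempts and the selector is never involved, which is the saving the optimization is after.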
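Registering the OP_WRITE event
This can only be observed locally with a fairly large response. I had to build one of nearly 2 million characters to get there.
The registration can be observed with a breakpoint at SelectionKeyImpl [entry] - interestOps(int).
The stack is: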
Thread [worker-3] (Suspended (entry into method interestOps in SelectionKeyImpl))
SelectionKeyImpl.interestOps(int) line: 82
NioSocketChannel(AbstractNioByteChannel).setOpWrite() line: 332
NioSocketChannel(AbstractNioByteChannel).incompleteWrite(boolean) line: 289
NioSocketChannel.doWrite(ChannelOutboundBuffer) line: 407
NioSocketChannel$NioSocketChannelUnsafe(AbstractChannel$AbstractUnsafe).flush0() line: 938
NioSocketChannel$NioSocketChannelUnsafe(AbstractNioChannel$AbstractNioUnsafe).flush0() line: 360
NioSocketChannel$NioSocketChannelUnsafe(AbstractChannel$AbstractUnsafe).flush() line: 905
DefaultChannelPipeline$HeadContext.flush(ChannelHandlerContext) line: 1370
DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).invokeFlush0() line: 776
DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).invokeFlush() line: 768
DefaultChannelHandlerContext(AbstractChannelHandlerContext).flush() line: 749
ReadTimeoutHandler(ChannelDuplexHandler).flush(ChannelHandlerContext) line: 117
DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeFlush0() line: 776
DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeFlush() line: 768
DefaultChannelHandlerContext(AbstractChannelHandlerContext).flush() line: 749
HttpResponseEncoder(ChannelOutboundHandlerAdapter).flush(ChannelHandlerContext) line: 115
DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeFlush0() line: 776
DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeFlush() line: 768
DefaultChannelHandlerContext(AbstractChannelHandlerContext).flush() line: 749
ChunkedWriteHandler.doFlush(ChannelHandlerContext) line: 335
ChunkedWriteHandler.channelWritabilityChanged(ChannelHandlerContext) line: 148
DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelWritabilityChanged() line: 434
AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext) line: 416
DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelWritabilityChanged() line: 409
HttpContentDecompressor(ChannelInboundHandlerAdapter).channelWritabilityChanged(ChannelHandlerContext) line: 119
DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelWritabilityChanged() line: 434
AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext) line: 416
DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelWritabilityChanged() line: 409
HttpRequestDecoder(ChannelInboundHandlerAdapter).channelWritabilityChanged(ChannelHandlerContext) line: 119
DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelWritabilityChanged() line: 434
AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext) line: 416
DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelWritabilityChanged() line: 409
PipelineInitializer$ChannelActiveTester(ChannelInboundHandlerAdapter).channelWritabilityChanged(ChannelHandlerContext) line: 119
DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelWritabilityChanged() line: 434
AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext) line: 416
DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelWritabilityChanged() line: 409
ReadTimeoutHandler(ChannelInboundHandlerAdapter).channelWritabilityChanged(ChannelHandlerContext) line: 119
DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelWritabilityChanged() line: 434
AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext) line: 416
DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).fireChannelWritabilityChanged() line: 409
DefaultChannelPipeline$HeadContext.channelWritabilityChanged(ChannelHandlerContext) line: 1431
DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).invokeChannelWritabilityChanged() line: 434
AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext) line: 416
DefaultChannelPipeline.fireChannelWritabilityChanged() line: 942
ChannelOutboundBuffer$2.run() line: 608
AbstractEventExecutor.safeExecute(Runnable) line: 163
NioEventLoop(SingleThreadEventExecutor).runAllTasks(long) line: 404
NioEventLoop.run() line: 495
SingleThreadEventExecutor$5.run() line: 905
Thread.run() line: 748
The core code in netty is:
// io.netty.channel.nio.AbstractNioByteChannel.setOpWrite()
protected final void setOpWrite() {
    final SelectionKey key = selectionKey();
    // Check first if the key is still valid as it may be canceled as part of the deregistration
    // from the EventLoop
    // See https://github.com/netty/netty/issues/2104
    if (!key.isValid()) {
        return;
    }
    final int interestOps = key.interestOps();
    if ((interestOps & SelectionKey.OP_WRITE) == 0) {
        key.interestOps(interestOps | SelectionKey.OP_WRITE);
    }
}
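The bit manipulation in setOpWrite() can be tried against a real JDK SelectionKey. The sketch below is an illustration only: InterestOpsSketch and demo() are hypothetical names, a Pipe sink stands in for the socket channel, and clearOpWrite here is simply the mirror operation written for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

/** Hypothetical helper showing how OP_WRITE is switched on and off in the interest set. */
final class InterestOpsSketch {
    static void setOpWrite(SelectionKey key) {
        if (!key.isValid()) {
            return; // the key may already have been cancelled
        }
        int ops = key.interestOps();
        if ((ops & SelectionKey.OP_WRITE) == 0) {
            key.interestOps(ops | SelectionKey.OP_WRITE);
        }
    }

    static void clearOpWrite(SelectionKey key) {
        if (!key.isValid()) {
            return;
        }
        int ops = key.interestOps();
        if ((ops & SelectionKey.OP_WRITE) != 0) {
            key.interestOps(ops & ~SelectionKey.OP_WRITE);
        }
    }

    /** Returns the interest ops before registration, after set, and after clear. */
    static int[] demo() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                pipe.sink().configureBlocking(false);
                // Register with an empty interest set, as a channel that has nothing pending.
                SelectionKey key = pipe.sink().register(selector, 0);
                int before = key.interestOps();
                setOpWrite(key);
                int afterSet = key.interestOps();
                clearOpWrite(key);
                int afterClear = key.interestOps();
                return new int[] {before, afterSet, afterClear};
            } finally {
                pipe.sink().close();
                pipe.source().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Reading the current interest set first and only then OR-ing in the bit keeps any other registered interest (for example OP_READ) untouched.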
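Handling a selected OP_WRITE and writing out the buffer
Once OP_WRITE has been registered and is later selected, the write path reuses io.netty.channel.socket.nio.NioSocketChannel.doWrite(ChannelOutboundBuffer).
The call stack is as follows: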
Thread [worker-3] (Suspended (entry into method write in SocketChannelImpl))
SocketChannelImpl.write(ByteBuffer) line: 458
NioSocketChannel.doWrite(ChannelOutboundBuffer) line: 405
NioSocketChannel$NioSocketChannelUnsafe(AbstractChannel$AbstractUnsafe).flush0() line: 938
NioSocketChannel$NioSocketChannelUnsafe(AbstractNioChannel$AbstractNioUnsafe).forceFlush() line: 367
NioEventLoop.processSelectedKey(SelectionKey, AbstractNioChannel) line: 671
NioEventLoop.processSelectedKeysOptimized() line: 612
NioEventLoop.processSelectedKeys() line: 529
NioEventLoop.run() line: 491
SingleThreadEventExecutor$5.run() line: 905
Thread.run() line: 748
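The stack above enters forceFlush() from NioEventLoop.processSelectedKey. How the ready-ops bits map to actions can be sketched as below. This is a simplified model under my reading of netty 4.1.x, not a copy of its code: ReadyOpsDispatchSketch is a hypothetical class, the action names are plain strings, and special cases in the real method are left out.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model: which action each ready bit leads to, in processing order. */
final class ReadyOpsDispatchSketch {
    static List<String> dispatch(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // The socket can take more data: flush what is still pending,
            // which ends up in doWrite(ChannelOutboundBuffer) again.
            actions.add("forceFlush");
        }
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0) {
            actions.add("read");
        }
        return actions;
    }
}
```

For example, dispatch(SelectionKey.OP_READ | SelectionKey.OP_WRITE) yields forceFlush followed by read, so pending output is flushed before new input is read.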
Handoff from the main thread to the boss thread, and from the boss thread to the worker threads
When is the spin run loop of the boss and worker NioEventLoop started?
main starts boss
Thread [main] (Suspended (breakpoint at line 707 in java.lang.Thread))
owns: java.lang.Thread (id=80)
java.lang.Thread.start() line: 707
io.netty.util.concurrent.ThreadPerTaskExecutor.execute(java.lang.Runnable) line: 33
io.netty.channel.nio.NioEventLoop(io.netty.util.concurrent.SingleThreadEventExecutor).doStartThread() line: 894
io.netty.channel.nio.NioEventLoop(io.netty.util.concurrent.SingleThreadEventExecutor).startThread() line: 865
io.netty.channel.nio.NioEventLoop(io.netty.util.concurrent.SingleThreadEventExecutor).execute(java.lang.Runnable) line: 758
io.netty.channel.nio.AbstractNioMessageChannel$NioMessageUnsafe(io.netty.channel.AbstractChannel$AbstractUnsafe).register(io.netty.channel.EventLoop, io.netty.channel.ChannelPromise) line: 483
io.netty.channel.nio.NioEventLoop(io.netty.channel.SingleThreadEventLoop).register(io.netty.channel.ChannelPromise) line: 80
io.netty.channel.nio.NioEventLoop(io.netty.channel.SingleThreadEventLoop).register(io.netty.channel.Channel) line: 74
io.netty.channel.nio.NioEventLoopGroup(io.netty.channel.MultithreadEventLoopGroup).register(io.netty.channel.Channel) line: 86
io.netty.bootstrap.ServerBootstrap(io.netty.bootstrap.AbstractBootstrap<B,C>).initAndRegister() line: 333
io.netty.bootstrap.ServerBootstrap(io.netty.bootstrap.AbstractBootstrap<B,C>).doBind(java.net.SocketAddress) line: 282
io.netty.bootstrap.ServerBootstrap(io.netty.bootstrap.AbstractBootstrap<B,C>).bind(java.net.SocketAddress) line: 278
org.restexpress.RestExpress.bind(java.net.InetSocketAddress) line: 709
org.restexpress.RestExpress.bind(java.lang.String, int) line: 686
com.code260.ss.resetexpress.RestExpressApp1.main(java.lang.String[]) line: 26
boss starts worker
Thread [boss-0] (Suspended (breakpoint at line 707 in java.lang.Thread))
owns: java.lang.Thread (id=102)
java.lang.Thread.start() line: 707
io.netty.util.concurrent.ThreadPerTaskExecutor.execute(java.lang.Runnable) line: 33
io.netty.channel.nio.NioEventLoop(io.netty.util.concurrent.SingleThreadEventExecutor).doStartThread() line: 894
io.netty.channel.nio.NioEventLoop(io.netty.util.concurrent.SingleThreadEventExecutor).startThread() line: 865
io.netty.channel.nio.NioEventLoop(io.netty.util.concurrent.SingleThreadEventExecutor).execute(java.lang.Runnable) line: 758
io.netty.channel.socket.nio.NioSocketChannel$NioSocketChannelUnsafe(io.netty.channel.AbstractChannel$AbstractUnsafe).register(io.netty.channel.EventLoop, io.netty.channel.ChannelPromise) line: 483
io.netty.channel.nio.NioEventLoop(io.netty.channel.SingleThreadEventLoop).register(io.netty.channel.ChannelPromise) line: 80
io.netty.channel.nio.NioEventLoop(io.netty.channel.SingleThreadEventLoop).register(io.netty.channel.Channel) line: 74
io.netty.channel.nio.NioEventLoopGroup(io.netty.channel.MultithreadEventLoopGroup).register(io.netty.channel.Channel) line: 86
io.netty.bootstrap.ServerBootstrap$ServerBootstrapAcceptor.channelRead(io.netty.channel.ChannelHandlerContext, java.lang.Object) line: 255
io.netty.channel.DefaultChannelHandlerContext(io.netty.channel.AbstractChannelHandlerContext).invokeChannelRead(java.lang.Object) line: 362
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(io.netty.channel.AbstractChannelHandlerContext, java.lang.Object) line: 348
io.netty.channel.DefaultChannelPipeline$HeadContext(io.netty.channel.AbstractChannelHandlerContext).fireChannelRead(java.lang.Object) line: 340
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(io.netty.channel.ChannelHandlerContext, java.lang.Object) line: 1408
io.netty.channel.DefaultChannelPipeline$HeadContext(io.netty.channel.AbstractChannelHandlerContext).invokeChannelRead(java.lang.Object) line: 362
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(io.netty.channel.AbstractChannelHandlerContext, java.lang.Object) line: 348
io.netty.channel.DefaultChannelPipeline.fireChannelRead(java.lang.Object) line: 930
io.netty.channel.nio.AbstractNioMessageChannel$NioMessageUnsafe.read() line: 93
io.netty.channel.nio.NioEventLoop.processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel) line: 677
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized() line: 612
io.netty.channel.nio.NioEventLoop.processSelectedKeys() line: 529
io.netty.channel.nio.NioEventLoop.run() line: 491
io.netty.util.concurrent.SingleThreadEventExecutor$5.run() line: 905
java.lang.Thread.run() line: 748
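Both traces pass through SingleThreadEventExecutor.execute(Runnable) -> startThread() -> doStartThread() -> Thread.start(). In other words, the loop thread is created lazily when the first task (here, the channel registration) is submitted from another thread. A minimal sketch of that pattern follows. LazyLoopSketch and its members are hypothetical names, and details such as shutdown and wakeup are deliberately left out.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical single-thread loop whose thread starts on the first execute() call. */
final class LazyLoopSketch {
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private final String name;
    private volatile Thread thread;

    LazyLoopSketch(String name) {
        this.name = name;
    }

    boolean isStarted() {
        return started.get();
    }

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        tasks.add(task);
        // Only a caller outside the loop can be the one that starts it,
        // and the compare-and-set makes sure it is started exactly once.
        if (!inEventLoop() && started.compareAndSet(false, true)) {
            Thread t = new Thread(this::run, name);
            t.setDaemon(true);
            thread = t;
            t.start();
        }
    }

    private void run() {
        try {
            while (true) {
                tasks.take().run(); // stand-in for the select + runAllTasks cycle
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Creating a LazyLoopSketch starts no thread. The first execute(...) from main (or from boss, for a worker loop) does, and every task then runs on that one named thread.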
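Binding between selector and thread
Each channel is registered with exactly one selector, and each selector is bound to one fixed thread, so repeated reads on the same channel never drift between threads.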
Of course, one thread can serve multiple channels. Because a selector is bound to one fixed thread, all of those channels share the same selector.
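The binding described above can be illustrated with plain JDK executors: each "loop" is a single-thread executor, a channel is assigned to one loop on first use, and all later work for that channel goes to the same thread. This is a sketch only. PinnedLoopSketch, the string channel ids, and the round-robin assignment are assumptions made for the example, not netty types.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of "one channel -> one loop -> one thread". */
final class PinnedLoopSketch {
    private final ExecutorService[] loops;
    private final AtomicInteger next = new AtomicInteger();
    private final Map<String, ExecutorService> channelToLoop = new ConcurrentHashMap<>();

    PinnedLoopSketch(int loopCount) {
        loops = new ExecutorService[loopCount];
        for (int i = 0; i < loopCount; i++) {
            final String threadName = "worker-" + i;
            loops[i] = Executors.newSingleThreadExecutor(r -> {
                Thread t = new Thread(r, threadName);
                t.setDaemon(true);
                return t;
            });
        }
    }

    /** Runs a "read" for the channel and returns the name of the thread that handled it. */
    String handleRead(String channelId) {
        // Assumed policy: round-robin on first registration, then never changed.
        ExecutorService loop = channelToLoop.computeIfAbsent(channelId,
                id -> loops[Math.floorMod(next.getAndIncrement(), loops.length)]);
        Callable<String> read = () -> Thread.currentThread().getName();
        try {
            return loop.submit(read).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

With two loops and three channels, two channels end up sharing one thread, but any given channel always reports the same thread name however many times it is read.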
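Multiple bosses, i.e. multiple acceptors
When should multiple acceptors be used?
Multiple accept (boss) threads are generally only meaningful when binding several ports, or when binding the same port on different IP addresses / NIC planes. Otherwise a single port normally yields only one ServerSocketChannel from bind, that channel belongs to one Selector, and that Selector is driven by one thread that spins on select, so extra threads do nothing for a single bind. Sharing one selector across several threads is not a way out either, because the selector's key sets are not safe for use by multiple concurrent threads.
That said, later JDK versions and Linux do support having the same port + IP bound multiple times by multiple processes, and in that case multiple acceptors become meaningful. See the section "Off-topic: SO_REUSEPORT" of the slide deck 《NIO trick and trap》. The use case given there is "suitable for web servers with a large number of short-lived connections".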
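On the JDK side, the option is exposed as StandardSocketOptions.SO_REUSEPORT (Java 9 and later), and whether it is usable depends on the operating system. The sketch below binds two listeners to the same loopback address and port inside one process to show the mechanics. ReusePortSketch and bindTwice() are hypothetical names, and a real deployment would more likely bind from separate processes or separate acceptor threads.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.ServerSocketChannel;

/** Hypothetical demo: two listeners on the same ip:port via SO_REUSEPORT. */
final class ReusePortSketch {
    /** Returns 2 when both listeners were bound, or 0 when the platform lacks the option. */
    static int bindTwice() {
        try (ServerSocketChannel first = ServerSocketChannel.open();
             ServerSocketChannel second = ServerSocketChannel.open()) {
            if (!first.supportedOptions().contains(StandardSocketOptions.SO_REUSEPORT)) {
                return 0; // option not available on this platform
            }
            // The option has to be set on every socket before it is bound.
            first.setOption(StandardSocketOptions.SO_REUSEPORT, true);
            second.setOption(StandardSocketOptions.SO_REUSEPORT, true);
            // Port 0 lets the OS pick a free port; the second bind reuses it.
            first.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            second.bind(first.getLocalAddress());
            return 2;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Without the option, the second bind would fail with an "address already in use" error. With it, the kernel distributes incoming connections across the listeners, which is what makes several acceptors on one port useful.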