
    [netty4][netty-transport] Netty's NIO transport layer

    Basic NIO processing logic

    See the separate post: Basic NIO processing logic

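    Selector handling

    Building the Selector instance

    NioEventLoop.openSelector() first builds a Selector through the JDK API, then uses reflection to replace its selectedKeys and publicSelectedKeys fields with Netty's own optimized SelectedSelectionKeySet instance.
    Code that builds the Selector through the JDK API: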

    // NioEventLoop
    private final SelectorProvider provider; // obtained from the JDK via SelectorProvider.provider(); on a Mac this returns sun.nio.ch.KQueueSelectorProvider@1b26f7b2, and the corresponding selector implementation is sun.nio.ch.KQueueSelectorImpl@23fe1d71
    unwrappedSelector = provider.openSelector();
    return new SelectorTuple(unwrappedSelector,
                                     new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
    

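    unwrappedSelector is the Selector we would normally build through the JDK API; what Netty actually uses is its own optimized SelectorTuple, which is analyzed below.

    Replacement via reflection:
    io.netty.channel.nio.NioEventLoop.openSelector() uses reflection to assign the SelectedSelectionKeySet instance selectedKeySet declared by Netty (Netty's SelectedSelectionKeySet class extends java.util.AbstractSet) to the selectedKeys and publicSelectedKeys fields that selectorImplClass declares, on the unwrapped Selector instance. It does so because SelectedSelectionKeySet is faster; see the SelectorTuple analysis below.
    What is selectorImplClass? It is the class that the per-platform Selector implementations derive from. How does Netty obtain it?
    It simply loads "sun.nio.ch.SelectorImpl".
    After the replacement, each select only requires iterating over the keys in this SelectedSelectionKeySet instance, selectedKeys (selectedKeySet is assigned to selectedKeys), which is also an instance field of NioEventLoop. In short, after every select the keys that have events are already sitting in NioEventLoop's selectedKeys field.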
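The field swap can be illustrated without touching JDK internals. The following is a minimal sketch, not Netty's code: FakeSelector and RecordingSet are hypothetical stand-ins for sun.nio.ch.SelectorImpl and SelectedSelectionKeySet, and the point is only that the owner of the replacement set sees every key the "selector" adds.

```java
import java.lang.reflect.Field;
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

final class ReflectiveFieldSwapDemo {

    /** Hypothetical stand-in for the JDK selector: it owns a private HashSet. */
    static final class FakeSelector {
        private Set<String> selectedKeys = new HashSet<>();

        void select() {
            // The selector only ever talks to whatever object the field holds.
            selectedKeys.add("key-1");
            selectedKeys.add("key-2");
        }
    }

    /** Hypothetical replacement set that the caller keeps a direct reference to. */
    static final class RecordingSet extends AbstractSet<String> {
        final List<String> items = new ArrayList<>();

        @Override public boolean add(String s) { return items.add(s); }
        @Override public Iterator<String> iterator() { return items.iterator(); }
        @Override public int size() { return items.size(); }
    }

    static List<String> swapAndSelect() {
        try {
            FakeSelector selector = new FakeSelector();
            RecordingSet mySet = new RecordingSet();

            Field f = FakeSelector.class.getDeclaredField("selectedKeys");
            f.setAccessible(true);   // the field is private
            f.set(selector, mySet);  // replace the HashSet with our own set

            selector.select();
            return mySet.items;      // the selected keys landed in our set
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(swapAndSelect());
    }
}
```

On recent JDKs the real sun.nio.ch classes are encapsulated by the module system, so doing this against an actual Selector may need extra handling that this sketch does not attempt to show.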

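    SelectorTuple analysis

    SelectorTuple relies on SelectedSelectionKeySetSelector and SelectedSelectionKeySet for the optimization. SelectedSelectionKeySet uses an array internally in place of the HashSet in the JDK implementation to hold the SelectionKeys, which makes add, reset, and iteration more efficient. Impressive.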

    final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
    
        SelectionKey[] keys;
        int size;
    // ......
    
        @Override
        public boolean add(SelectionKey o) {
            if (o == null) {
                return false;
            }
    
            keys[size++] = o;
            if (size == keys.length) {
                increaseCapacity();
            }
    
            return true;
        }
    
        void reset(int start) {
            Arrays.fill(keys, start, size, null);
            size = 0;
        }
    }
    // ...
    final class SelectedSelectionKeySetSelector extends Selector {
        private final SelectedSelectionKeySet selectionKeys;
        private final Selector delegate;
    
        SelectedSelectionKeySetSelector(Selector delegate, SelectedSelectionKeySet selectionKeys) {
            this.delegate = delegate;
            this.selectionKeys = selectionKeys;
        }
    }
    
    private static final class SelectorTuple {
        final Selector unwrappedSelector;
        final Selector selector;
    
        SelectorTuple(Selector unwrappedSelector) {
            this.unwrappedSelector = unwrappedSelector;
            this.selector = unwrappedSelector;
        }
    
        SelectorTuple(Selector unwrappedSelector, Selector selector) {
            this.unwrappedSelector = unwrappedSelector;
            this.selector = selector;
        }
    }
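To make the array idea concrete, here is a simplified sketch of an array-backed set in the same spirit. ArrayBackedSet is a hypothetical name, the initial capacity of 4 is arbitrary, and like the excerpt above it does not de-duplicate on add, which is acceptable only when the caller never adds the same element twice between resets.

```java
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

final class ArrayBackedSetDemo {

    /** Simplified, hypothetical analogue of SelectedSelectionKeySet. */
    static final class ArrayBackedSet<T> extends AbstractSet<T> {
        Object[] items = new Object[4];
        int size;

        @Override
        public boolean add(T o) {
            if (o == null) {
                return false;
            }
            items[size++] = o;                 // O(1) append, no hashing
            if (size == items.length) {
                items = Arrays.copyOf(items, items.length << 1);
            }
            return true;
        }

        void reset() {
            Arrays.fill(items, 0, size, null); // drop references so they can be collected
            size = 0;
        }

        @Override public int size() { return size; }

        @Override
        public Iterator<T> iterator() {
            return new Iterator<T>() {
                private int idx;

                @Override public boolean hasNext() { return idx < size; }

                @SuppressWarnings("unchecked")
                @Override public T next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    return (T) items[idx++];
                }
            };
        }
    }

    /** Returns {size before reset, sum of elements, size after reset}. */
    static int[] fillAndReset(int n) {
        ArrayBackedSet<Integer> set = new ArrayBackedSet<>();
        for (int i = 0; i < n; i++) {
            set.add(i);
        }
        int sizeBefore = set.size();
        int sum = 0;
        for (int v : set) {
            sum += v;
        }
        set.reset();
        return new int[] {sizeBefore, sum, set.size()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(fillAndReset(10)));
    }
}
```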
    

    Creating and initializing the ServerSocketChannel

    ServerSocketChannel creation

    During bind, a ServerSocketChannel is created and stored in the ch field of the NioServerSocketChannel instance.
    Call stack:

    Thread [main] (Suspended (modification of field ch in AbstractNioChannel))	
    io.netty.channel.socket.nio.NioServerSocketChannel(io.netty.channel.nio.AbstractNioChannel).<init>(io.netty.channel.Channel, java.nio.channels.SelectableChannel, int) line: 85	
    io.netty.channel.socket.nio.NioServerSocketChannel(io.netty.channel.nio.AbstractNioMessageChannel).<init>(io.netty.channel.Channel, java.nio.channels.SelectableChannel, int) line: 42	
    io.netty.channel.socket.nio.NioServerSocketChannel.<init>(java.nio.channels.ServerSocketChannel) line: 88	
    io.netty.channel.socket.nio.NioServerSocketChannel.<init>() line: 74	
    sun.reflect.NativeConstructorAccessorImpl.newInstance0(java.lang.reflect.Constructor<?>, java.lang.Object[]) line: not available [native method]	
    sun.reflect.NativeConstructorAccessorImpl.newInstance(java.lang.Object[]) line: 62	
    sun.reflect.DelegatingConstructorAccessorImpl.newInstance(java.lang.Object[]) line: 45	
    java.lang.reflect.Constructor<T>.newInstance(java.lang.Object...) line: 423	
    io.netty.channel.ReflectiveChannelFactory<T>.newChannel() line: 44	
    io.netty.bootstrap.ServerBootstrap(io.netty.bootstrap.AbstractBootstrap<B,C>).initAndRegister() line: 320	
    io.netty.bootstrap.ServerBootstrap(io.netty.bootstrap.AbstractBootstrap<B,C>).doBind(java.net.SocketAddress) line: 282	
    io.netty.bootstrap.ServerBootstrap(io.netty.bootstrap.AbstractBootstrap<B,C>).bind(java.net.SocketAddress) line: 278	
    org.restexpress.RestExpress.bind(java.net.InetSocketAddress) line: 709	
    org.restexpress.RestExpress.bind(java.lang.String, int) line: 686	
    com.code260.ss.resetexpress.RestExpressApp1.main(java.lang.String[]) line: 26	
    

    Handling bind

    AbstractBootstrap doBind logic

    NioServerSocketChannel class hierarchy:

    // AttributeMap hierarchy
    251 AttributeMap
    --251.2 DefaultAttributeMap
    ----251.2.1 AbstractChannel
    ------251.2.1.7 AbstractNioChannel
    --------251.2.1.7.1 AbstractNioMessageChannel
    ----------251.2.1.7.1.5 NioServerSocketChannel
    
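    // Channel hierarchy
    --463.1 Channel
    ----463.1.2 AbstractChannel  // Bound to a pipeline; the pipeline is created at construction time via newChannelPipeline.
    ------463.1.2.7 AbstractNioChannel  // Supports the different SelectableChannel implementations; ServerSocketChannel and SocketChannel are both SelectableChannels. The constructor takes the event of interest (readInterestOp), stores it in the instance field readInterestOp, and sets the channel to non-blocking. Its purpose is evidently to encapsulate a selectable channel together with the event it is interested in.
    --------463.1.2.7.1 AbstractNioMessageChannel // Defines the interface for reading and writing messages (a message object, i.e. one Object or a group of Objects), and implements part of doWrite.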
    ----------463.1.2.7.1.5 NioServerSocketChannel
    
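    1. initAndRegister, which returns a ChannelFuture instance; see the next subsection for the details
    2. doBind0, which hands the actual bind to the boss thread

    io.netty.bootstrap.AbstractBootstrap.initAndRegister() logic

    1. Create the Channel: a ServerSocketChannel instance wrapped in Netty's own NioServerSocketChannel. channel = channelFactory.newChannel();
    2. Initialize the channel with init(channel); this configures the Channel from io.netty.bootstrap.AbstractBootstrap.options. The server-side channel options here are {SO_BACKLOG=1024, SO_REUSEADDR=true, CONNECT_TIMEOUT_MILLIS=10000, SO_RCVBUF=262140, RCVBUF_ALLOCATOR=io.netty.channel.AdaptiveRecvByteBufAllocator@17cdf2d0}. A ChannelInitializer is also added to the Channel's pipeline.
    3. Register the Channel. First pick an Executor, i.e. a NioEventLoop; the selection logic lives in io.netty.util.concurrent.DefaultEventExecutorChooserFactory$PowerOfTwoEventExecutorChooser@15cafec7: executors[idx.getAndIncrement() & executors.length - 1]. Then io.netty.channel.SingleThreadEventLoop.register(Channel) registers the Channel with that Executor (the NioEventLoop). Registration first checks whether the caller is in the current EventLoop (for a single-threaded executor, whether it is the loop's own thread). If so, it registers directly; if not, it wraps the registration logic in a new Runnable and hands it to the eventLoop to execute.
      Registration details: io.netty.channel.AbstractChannel.AbstractUnsafe.register(EventLoop, ChannelPromise)
      Execution details: io.netty.util.concurrent.SingleThreadEventExecutor.execute(Runnable)
      Internally the task is put on the io.netty.util.concurrent.SingleThreadEventExecutor.taskQueue queue, which is currently implemented by io.netty.util.internal.shaded.org.jctools.queues.MpscUnboundedArrayQueue. At this point it also checks whether the caller is the loop's own thread; if not, it starts the corresponding thread.
      io.netty.channel.AbstractChannel.AbstractUnsafe.register0(ChannelPromise) does the post-processing of register:
    • First mark the ChannelPromise as successful
    • One of the important steps: fire the channelRegistered event on the ChannelInboundHandlers. Application code can customize this, and it runs on the boss thread
    • If the channel is already bound, fire the active event. Whether it is bound is determined by: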
     public boolean isActive() {
            return javaChannel().socket().isBound();
        }
    

    This simply relies on the JDK API isBound.
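As an aside on the chooser expression quoted in step 3, executors[idx.getAndIncrement() & executors.length - 1], the sketch below shows why it yields a round-robin sequence when the array length is a power of two. The class name and the string "executors" are hypothetical; only the indexing expression is taken from the text.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class PowerOfTwoChooserDemo {
    private final AtomicInteger idx = new AtomicInteger();
    private final String[] executors;

    PowerOfTwoChooserDemo(String... executors) {
        if (Integer.bitCount(executors.length) != 1) {
            throw new IllegalArgumentException("length must be a power of two");
        }
        this.executors = executors;
    }

    String next() {
        // '-' binds tighter than '&', so this is idx & (length - 1),
        // which equals idx % length whenever length is a power of two.
        return executors[idx.getAndIncrement() & executors.length - 1];
    }

    static String pickSequence(int picks) {
        PowerOfTwoChooserDemo chooser =
                new PowerOfTwoChooserDemo("loop-0", "loop-1", "loop-2", "loop-3");
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < picks; i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append(chooser.next());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(pickSequence(6));
    }
}
```

The mask also keeps the index non-negative after the counter overflows, which a plain % would not.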
    register0 is submitted with code like the following. Netty contains a lot of code in this shape, which could arguably be wrapped in a helper:

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
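One way such a wrapper could look is sketched below. MiniLoop and runInLoop are hypothetical names built on a plain JDK single-thread executor, not Netty API, and the error handling from the snippet above is left out for brevity.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class RunInLoopDemo {

    /** Hypothetical miniature event loop: one worker thread plus an inEventLoop() check. */
    static final class MiniLoop {
        private volatile Thread loopThread;
        private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "mini-loop");
            loopThread = t; // remember the loop thread before it starts running tasks
            return t;
        });

        boolean inEventLoop() {
            return Thread.currentThread() == loopThread;
        }

        /** The wrapper: run inline when already on the loop thread, otherwise enqueue. */
        void runInLoop(Runnable task) {
            if (inEventLoop()) {
                task.run();
            } else {
                executor.execute(task);
            }
        }
    }

    /** Submits from the caller's thread, then re-submits from inside the loop. */
    static String whereDidItRun() {
        MiniLoop loop = new MiniLoop();
        AtomicReference<String> ranOn = new AtomicReference<>();
        Runnable inner = () -> ranOn.set(Thread.currentThread().getName());
        // Outer call is enqueued; the nested call runs inline on the loop thread.
        loop.runInLoop(() -> loop.runInLoop(inner));
        loop.executor.shutdown();
        try {
            loop.executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println(whereDidItRun());
    }
}
```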
    

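    Because the logic above is asynchronous, it may not have finished yet, so the next step handles both cases:

    // regFuture is the Future returned by the init-and-register step above; first check whether it has completed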
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise); // the actual bind, on the boss thread; regFuture is already complete here, so call doBind0 directly instead of using a callback
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() { // register a listener on regFuture so this is called back once it completes; neat
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();
    
                    doBind0(regFuture, channel, localAddress, promise); // the actual bind, on the boss thread
                }
            }
        });
        return promise;
    }
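The same "call directly if already done, otherwise chain a listener" shape can be tried out with the JDK's CompletableFuture. This is only an analogy under that assumption: bindAfter and the log entries are hypothetical, and CompletableFuture stands in for Netty's ChannelFuture and ChannelPromise.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class BindAfterRegisterDemo {

    static CompletableFuture<String> bindAfter(CompletableFuture<Void> regFuture, List<String> log) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (regFuture.isDone() && !regFuture.isCompletedExceptionally()) {
            // Registration already succeeded: no callback needed.
            log.add("direct");
            promise.complete("bound");
        } else {
            // Not finished yet (or failed): decide when the future completes.
            regFuture.whenComplete((ignored, cause) -> {
                if (cause != null) {
                    promise.completeExceptionally(cause);
                } else {
                    log.add("callback");
                    promise.complete("bound");
                }
            });
        }
        return promise;
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();

        // Case 1: registration finished before we looked.
        bindAfter(CompletableFuture.completedFuture(null), log);

        // Case 2: registration finishes later; the listener fires on completion.
        CompletableFuture<Void> pending = new CompletableFuture<>();
        CompletableFuture<String> promise = bindAfter(pending, log);
        pending.complete(null);
        promise.join();

        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```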
    

    io.netty.util.concurrent.SingleThreadEventExecutor.startThread(): the way the thread is started is worth studying; it guarantees that the thread is started exactly once.
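A common way to get the "exactly once" guarantee is a compare-and-set on a state field. The sketch below assumes that technique and uses a plain AtomicInteger with hypothetical state constants; it counts starts instead of actually launching a worker thread, so it is not a copy of Netty's implementation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

final class StartOnceDemo {
    private static final int NOT_STARTED = 1; // hypothetical state values
    private static final int STARTED = 2;

    private final AtomicInteger state = new AtomicInteger(NOT_STARTED);
    final AtomicInteger starts = new AtomicInteger();

    void startThread() {
        // Cheap read first; only one caller can win the compareAndSet.
        if (state.get() == NOT_STARTED && state.compareAndSet(NOT_STARTED, STARTED)) {
            starts.incrementAndGet(); // stands in for really starting the worker thread
        }
    }

    /** Lets several threads race on startThread() and reports how many starts happened. */
    static int raceToStart(int callers) {
        StartOnceDemo demo = new StartOnceDemo();
        CountDownLatch go = new CountDownLatch(1);
        Thread[] threads = new Thread[callers];
        for (int i = 0; i < callers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    go.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                demo.startThread();
            });
            threads[i].start();
        }
        go.countDown();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return demo.starts.get();
    }

    public static void main(String[] args) {
        System.out.println(raceToStart(8));
    }
}
```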
    Still to be understood: addTaskWakesUp

     if (!addTaskWakesUp && wakesUpForTask(task)) {
                wakeup(inEventLoop);
            }
    
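    1. Create a PendingRegistrationPromise and add a listener to the ChannelFuture returned by init-and-register; ChannelFutureListener.operationComplete is called back on completion. This is needed because init-and-register is asynchronous and may not have finished by this point.
      // TODO: look into the wakeup in step 3, and into step 4
      None of the above reaches the place where the port is actually bound.

    The actual bind

    Call stack: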

    // Thread [boss-0] (Suspended)	
    io.netty.channel.socket.nio.NioServerSocketChannel.doBind(java.net.SocketAddress) line: 130	
    io.netty.channel.nio.AbstractNioMessageChannel$NioMessageUnsafe(io.netty.channel.AbstractChannel$AbstractUnsafe).bind(java.net.SocketAddress, io.netty.channel.ChannelPromise) line: 562	
    io.netty.channel.DefaultChannelPipeline$HeadContext.bind(io.netty.channel.ChannelHandlerContext, java.net.SocketAddress, io.netty.channel.ChannelPromise) line: 1332	
    io.netty.channel.DefaultChannelPipeline$HeadContext(io.netty.channel.AbstractChannelHandlerContext).invokeBind(java.net.SocketAddress, io.netty.channel.ChannelPromise) line: 501	
    io.netty.channel.DefaultChannelPipeline$TailContext(io.netty.channel.AbstractChannelHandlerContext).bind(java.net.SocketAddress, io.netty.channel.ChannelPromise) line: 486	
    io.netty.channel.DefaultChannelPipeline.bind(java.net.SocketAddress, io.netty.channel.ChannelPromise) line: 984	
    io.netty.channel.socket.nio.NioServerSocketChannel(io.netty.channel.AbstractChannel).bind(java.net.SocketAddress, io.netty.channel.ChannelPromise) line: 258	
    io.netty.bootstrap.AbstractBootstrap$2.run() line: 366	
    io.netty.util.concurrent.AbstractEventExecutor.safeExecute(java.lang.Runnable) line: 163	
    io.netty.channel.nio.NioEventLoop(io.netty.util.concurrent.SingleThreadEventExecutor).runAllTasks(long) line: 404	
    io.netty.channel.nio.NioEventLoop.run() line: 495	
    io.netty.util.concurrent.SingleThreadEventExecutor$5.run() line: 905	
    java.lang.Thread.run() line: 748	
    

    NioServerSocketChannel.doBind on the boss thread is where the port is actually bound. The call stack is shown above; the code is:

    // NioServerSocketChannel
        @Override
        protected void doBind(SocketAddress localAddress) throws Exception {
            if (PlatformDependent.javaVersion() >= 7) {
                javaChannel().bind(localAddress, config.getBacklog());
            } else {
                javaChannel().socket().bind(localAddress, config.getBacklog());
            }
        }
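The JDK call that doBind ends up in on Java 7+ can be exercised on its own. The sketch below is an assumption-laden illustration: it binds to the loopback address on an ephemeral port (port 0) with an arbitrary backlog of 128, and reports socket().isBound() before and after, which is the same check isActive() uses above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

final class JdkBindDemo {

    /** Returns {isBound before bind, isBound after bind}. */
    static boolean[] boundBeforeAndAfter() {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            boolean before = ch.socket().isBound();
            // Port 0 asks the OS for any free port; 128 is an arbitrary backlog.
            ch.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 128);
            boolean after = ch.socket().isBound();
            return new boolean[] {before, after};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = boundBeforeAndAfter();
        System.out.println(r[0] + " -> " + r[1]);
    }
}
```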
    

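    Abstraction and organization of select and business processing

    Main logic of NioEventLoop

    1. Handle select according to the select strategy; the possible cases are summarized in the calculateStrategy notes further below.
    2. Depending on the ioRatio setting, split time between processing the selected keys (IO events) and running all tasks (business logic, custom events, and so on).
      On processing the selected keys: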
      Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
      Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

    sun.nio.ch.KQueueSelectorImpl@153cf15c
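    3. Handle exceptions. Note that Throwable is caught here, so an unexpected error cannot kill the loop thread. The current handling logs a warning and sleeps for 1s so that the CPU is not completely consumed.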

    Registering the OP_ACCEPT event

    Where the accept event is actually registered:
    First register with an interest set of 0 to obtain the key:

    Thread [boss-0] (Suspended (breakpoint at line 386 in io.netty.channel.nio.AbstractNioChannel))	
    	io.netty.channel.socket.nio.NioServerSocketChannel(io.netty.channel.nio.AbstractNioChannel).doRegister() line: 386	
    	io.netty.channel.nio.AbstractNioMessageChannel$NioMessageUnsafe(io.netty.channel.AbstractChannel$AbstractUnsafe).register0(io.netty.channel.ChannelPromise) line: 508	
    	io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(io.netty.channel.AbstractChannel$AbstractUnsafe, io.netty.channel.ChannelPromise) line: 427	
    	io.netty.channel.AbstractChannel$AbstractUnsafe$1.run() line: 486	
    	io.netty.util.concurrent.AbstractEventExecutor.safeExecute(java.lang.Runnable) line: 163	
    	io.netty.channel.nio.NioEventLoop(io.netty.util.concurrent.SingleThreadEventExecutor).runAllTasks(long) line: 404	
    	io.netty.channel.nio.NioEventLoop.run() line: 495	
    	io.netty.util.concurrent.SingleThreadEventExecutor$5.run() line: 905	
    	java.lang.Thread.run() line: 748	
    	
    

    Code:

    // io.netty.channel.nio.AbstractNioChannel.doRegister()
    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }
    

    Then doBeginRead registers 16 (OP_ACCEPT), which is the real registration:
    Boss call stack:

    Thread [boss-0] (Suspended (breakpoint at line 420 in io.netty.channel.nio.AbstractNioChannel))	
    	io.netty.channel.socket.nio.NioServerSocketChannel(io.netty.channel.nio.AbstractNioChannel).doBeginRead() line: 420	
    	io.netty.channel.socket.nio.NioServerSocketChannel(io.netty.channel.nio.AbstractNioMessageChannel).doBeginRead() line: 55	
    	io.netty.channel.nio.AbstractNioMessageChannel$NioMessageUnsafe(io.netty.channel.AbstractChannel$AbstractUnsafe).beginRead() line: 851	
    	io.netty.channel.DefaultChannelPipeline$HeadContext.read(io.netty.channel.ChannelHandlerContext) line: 1360	
    	io.netty.channel.DefaultChannelPipeline$HeadContext(io.netty.channel.AbstractChannelHandlerContext).invokeRead() line: 693	
    	io.netty.channel.DefaultChannelPipeline$TailContext(io.netty.channel.AbstractChannelHandlerContext).read() line: 673	
    	io.netty.channel.DefaultChannelPipeline.read() line: 1015	
    	io.netty.channel.socket.nio.NioServerSocketChannel(io.netty.channel.AbstractChannel).read() line: 288	
    	io.netty.channel.DefaultChannelPipeline$HeadContext.readIfIsAutoRead() line: 1420	
    	io.netty.channel.DefaultChannelPipeline$HeadContext.channelActive(io.netty.channel.ChannelHandlerContext) line: 1398	
    	io.netty.channel.DefaultChannelPipeline$HeadContext(io.netty.channel.AbstractChannelHandlerContext).invokeChannelActive() line: 213	
    	io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(io.netty.channel.AbstractChannelHandlerContext) line: 199	
    	io.netty.channel.DefaultChannelPipeline.fireChannelActive() line: 906	
    	io.netty.channel.AbstractChannel$AbstractUnsafe$2.run() line: 573	
    	io.netty.util.concurrent.AbstractEventExecutor.safeExecute(java.lang.Runnable) line: 163	
    	io.netty.channel.nio.NioEventLoop(io.netty.util.concurrent.SingleThreadEventExecutor).runAllTasks(long) line: 404	
    	io.netty.channel.nio.NioEventLoop.run() line: 495	
    	io.netty.util.concurrent.SingleThreadEventExecutor$5.run() line: 905	
    	java.lang.Thread.run() line: 748	
    

    Code:

    // io.netty.channel.nio.AbstractNioChannel.doBeginRead()
    @Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }
    
        readPending = true;
    
        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp); // for accept, readInterestOp is 16 (OP_ACCEPT)
        }
    }
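The two-step registration (interest set 0 first, then OR in the real interest op) can be reproduced with the plain JDK API. In this sketch the class name and the string attachment are hypothetical; the JDK calls are the same ones doRegister and doBeginRead use.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

final class TwoStepInterestDemo {

    /** Returns {interestOps right after register, interestOps after enabling accept}. */
    static int[] interestOpsBeforeAndAfter() {
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false); // required before registering with a selector

            // Step 1: register with interest set 0 just to obtain the key.
            SelectionKey key = ch.register(selector, 0, "attachment");
            int before = key.interestOps();

            // Step 2: later, switch on the real interest op without losing other bits.
            int readInterestOp = SelectionKey.OP_ACCEPT; // 16
            int ops = key.interestOps();
            if ((ops & readInterestOp) == 0) {
                key.interestOps(ops | readInterestOp);
            }
            return new int[] {before, key.interestOps()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = interestOpsBeforeAndAfter();
        System.out.println(r[0] + " -> " + r[1]);
    }
}
```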
    

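    Handling a selected OP_ACCEPT and creating the SocketChannel

    This is the select logic in the run method:
    During the strategy calculation, if there are tasks, the get method of io.netty.channel.nio.NioEventLoop.selectNowSupplier is called, and that method performs a selectNow.
    If there are no tasks, the strategy calculation returns -1, i.e. SelectStrategy.SELECT, and the select that takes a timeout is executed: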

     case SelectStrategy.SELECT:
                            select(wakenUp.getAndSet(false));
    

    This goes through the following call stack:

    SelectedSelectionKeySetSelector.select(long) line: 62	
    NioEventLoop.select(boolean) line: 786	
    NioEventLoop.run() line: 434	
    SingleThreadEventExecutor$5.run() line: 905	
    Thread.run() line: 748	
    

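    The return values of io.netty.channel.SelectStrategy.calculateStrategy(IntSupplier, boolean) are as follows:
    ① -2 (SelectStrategy.CONTINUE): skip this iteration; I have not managed to construct this scenario yet.
    ② -3 (SelectStrategy.BUSY_WAIT): busy-wait, which also skips; I have not managed to construct this scenario yet.
    ③ -1 (SelectStrategy.SELECT): a select is needed because there are no tasks, and whether to wake up is checked; this is the case whenever there are no tasks.
    ④ Any other value >= 0, i.e. the default branch: do nothing and fall through to processing the selected keys (IO events) or all tasks (business logic, etc.).
    Analysis of io.netty.channel.DefaultSelectStrategy.calculateStrategy(IntSupplier, boolean):
    If there are no tasks, return SelectStrategy.SELECT.
    If there are tasks, call get() on the anonymous IntSupplier in io.netty.channel.nio.NioEventLoop (new IntSupplier() {...}), which does one selectNow. Note that this is NioEventLoop's selectNow method, which ultimately maps to selectNow on the JDK Selector.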
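The decision described above fits in a few lines. The sketch below is an approximation, not Netty's source: it uses java.util.function.IntSupplier in place of Netty's own IntSupplier, the describe method is a hypothetical helper, and treating both negative skip values the same way simply follows the summary in this post.

```java
import java.util.function.IntSupplier;

final class SelectStrategyDemo {
    static final int SELECT = -1;
    static final int CONTINUE = -2;
    static final int BUSY_WAIT = -3;

    /** No tasks: ask for a blocking select. Tasks pending: do a non-blocking selectNow. */
    static int calculateStrategy(IntSupplier selectNow, boolean hasTasks) {
        return hasTasks ? selectNow.getAsInt() : SELECT;
    }

    /** Hypothetical helper that names the branch the run loop would take. */
    static String describe(int strategy) {
        switch (strategy) {
            case CONTINUE:
            case BUSY_WAIT:
                return "skip";
            case SELECT:
                return "blocking select with timeout";
            default:
                return "process " + strategy + " ready key(s), then tasks";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(calculateStrategy(() -> 3, true)));
        System.out.println(describe(calculateStrategy(() -> 3, false)));
    }
}
```

Because selectNow never returns a negative number, the "tasks pending" path always ends up in the default branch.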

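    Analysis of io.netty.channel.nio.NioEventLoop.select(boolean)

    1. How the select timeout is determined
      How is the timeout for the timed select chosen?
      It is the deadline minus the current time (both measured in nanoseconds), rounded to millisecond granularity.
      And how is the deadline computed?
      The default delay is 1s; if scheduledTaskQueue contains scheduled tasks, the deadline is derived from the scheduled time of the task at the head of the priority queue together with the current time.

    2. The logic of select(boolean) and how it works around the epoll empty-spin bug
      The whole thing is wrapped in a spin loop.
      If the deadline has been reached, call selector.selectNow() and leave the spin loop.
      If there are tasks and wakenUp is false (in which case it is set to true), call selector.selectNow() and leave the spin loop.
      If neither case applies, call selector.select(timeoutMillis) and count it. If keys were selected, or oldWakenUp is true, or the wakenUp field is true, or there are tasks, or there are scheduled tasks, leave the spin loop.
      If the number of selects in this spin loop reaches SELECTOR_AUTO_REBUILD_THRESHOLD (default 512), rebuild the Selector and re-register the channels and interest ops from the old selector on the new one. This works around the epoll bug on Linux: select may return immediately even though the timeout has not elapsed and no IO event has arrived. This is the well-known epoll bug, and a fairly serious one, because it traps the thread in a busy loop and drives the CPU to 100%.
      Related code: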

    io.netty.channel.nio.NioEventLoop.rebuildSelector0()
    SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
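The timeout arithmetic from item 1 above (nanosecond deadline minus nanosecond now, rounded to milliseconds) can be sketched as follows. The method names are hypothetical and the half-millisecond constant is the usual way to round to nearest with integer division; treat it as an illustration of the rounding, not a quotation of Netty's source.

```java
import java.util.concurrent.TimeUnit;

final class SelectTimeoutDemo {
    static final long DEFAULT_DELAY_NANOS = TimeUnit.SECONDS.toNanos(1);

    /** Deadline: head scheduled task's time if there is one, otherwise now + 1s. */
    static long deadlineNanos(long nowNanos, Long nextTaskDeadlineNanos) {
        if (nextTaskDeadlineNanos == null) {
            return nowNanos + DEFAULT_DELAY_NANOS;
        }
        return nowNanos + Math.max(0L, nextTaskDeadlineNanos - nowNanos);
    }

    /** Adds half a millisecond before dividing, so the result rounds to nearest. */
    static long timeoutMillis(long nowNanos, long deadlineNanos) {
        return (deadlineNanos - nowNanos + 500_000L) / 1_000_000L;
    }

    public static void main(String[] args) {
        long now = System.nanoTime();
        System.out.println(timeoutMillis(now, deadlineNanos(now, null)));
    }
}
```

A result of 0 or less corresponds to the "deadline has been reached" branch, where the loop falls back to selectNow.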
    

    Processing after the Selector has selected keys

    final int ioRatio = this.ioRatio;
    if (ioRatio == 100) {
        try {
            processSelectedKeys();
        } finally {
            // Ensure we always run tasks.
            runAllTasks();
        }
    } else {
        final long ioStartTime = System.nanoTime();
        try {
            processSelectedKeys();
        } finally {
            // Ensure we always run tasks.
            final long ioTime = System.nanoTime() - ioStartTime;
            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
        }
    }
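A small worked example of the budget formula in the else branch, ioTime * (100 - ioRatio) / ioRatio, may help. The helper name is hypothetical; the formula is the one in the code above.

```java
final class IoRatioDemo {

    /** Time allowed for tasks, given how long IO processing took and the IO share in percent. */
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            // ioRatio == 100 is handled by a separate branch that runs all tasks unbounded.
            throw new IllegalArgumentException("ioRatio must be in (0, 100): " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        // 8 ms of IO at ioRatio 80 leaves 2 ms for tasks (an 80:20 split).
        System.out.println(taskBudgetNanos(8_000_000L, 80));
        // At ioRatio 50, tasks get as long as IO took.
        System.out.println(taskBudgetNanos(5_000_000L, 50));
    }
}
```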
    

    processSelectedKeys

    Call stack:

    NioServerSocketChannel.doReadMessages(List<Object>) line: 143	
    AbstractNioMessageChannel$NioMessageUnsafe.read() line: 75	
    NioEventLoop.processSelectedKey(SelectionKey, AbstractNioChannel) line: 677	
    NioEventLoop.processSelectedKeysOptimized() line: 612	
    NioEventLoop.processSelectedKeys() line: 529	
    NioEventLoop.run() line: 491	
    SingleThreadEventExecutor$5.run() line: 905	
    Thread.run() line: 748	
    

    Part of the code involved:
    The key part, where the SocketChannel is accepted:

    // NioEventLoop
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        unsafe.read(); // unsafe is io.netty.channel.nio.AbstractNioMessageChannel$NioMessageUnsafe
    }
    //  NioServerSocketChannel
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = SocketUtils.accept(javaChannel());
    
        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);
    
            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }
    
        return 0;
    }
    

    Registering the OP_READ event

    Worker call stack:
    Registering to obtain the key:

    Thread [worker-0] (Suspended (breakpoint at line 386 in io.netty.channel.nio.AbstractNioChannel))	
    	io.netty.channel.socket.nio.NioSocketChannel(io.netty.channel.nio.AbstractNioChannel).doRegister() line: 386	
    	io.netty.channel.socket.nio.NioSocketChannel$NioSocketChannelUnsafe(io.netty.channel.AbstractChannel$AbstractUnsafe).register0(io.netty.channel.ChannelPromise) line: 508	
    	io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(io.netty.channel.AbstractChannel$AbstractUnsafe, io.netty.channel.ChannelPromise) line: 427	
    	io.netty.channel.AbstractChannel$AbstractUnsafe$1.run() line: 486	
    	io.netty.util.concurrent.AbstractEventExecutor.safeExecute(java.lang.Runnable) line: 163	
    	io.netty.channel.nio.NioEventLoop(io.netty.util.concurrent.SingleThreadEventExecutor).runAllTasks(long) line: 404	
    	io.netty.channel.nio.NioEventLoop.run() line: 495	
    	io.netty.util.concurrent.SingleThreadEventExecutor$5.run() line: 905	
    	java.lang.Thread.run() line: 748	
    

    The real registration:

    Thread [worker-0] (Suspended (breakpoint at line 420 in io.netty.channel.nio.AbstractNioChannel))	
    	io.netty.channel.socket.nio.NioSocketChannel(io.netty.channel.nio.AbstractNioChannel).doBeginRead() line: 420	
    	io.netty.channel.socket.nio.NioSocketChannel$NioSocketChannelUnsafe(io.netty.channel.AbstractChannel$AbstractUnsafe).beginRead() line: 851	
    	io.netty.channel.DefaultChannelPipeline$HeadContext.read(io.netty.channel.ChannelHandlerContext) line: 1360	
    	io.netty.channel.DefaultChannelPipeline$HeadContext(io.netty.channel.AbstractChannelHandlerContext).invokeRead() line: 693	
    	io.netty.channel.DefaultChannelHandlerContext(io.netty.channel.AbstractChannelHandlerContext).read() line: 673	
    	io.netty.handler.timeout.ReadTimeoutHandler(io.netty.channel.ChannelDuplexHandler).read(io.netty.channel.ChannelHandlerContext) line: 95	
    	io.netty.channel.DefaultChannelHandlerContext(io.netty.channel.AbstractChannelHandlerContext).invokeRead() line: 693	
    	io.netty.channel.DefaultChannelHandlerContext(io.netty.channel.AbstractChannelHandlerContext).read() line: 673	
    	io.netty.handler.codec.http.HttpResponseEncoder(io.netty.channel.ChannelOutboundHandlerAdapter).read(io.netty.channel.ChannelHandlerContext) line: 93	
    	io.netty.channel.DefaultChannelHandlerContext(io.netty.channel.AbstractChannelHandlerContext).invokeRead() line: 693	
    	io.netty.channel.DefaultChannelHandlerContext(io.netty.channel.AbstractChannelHandlerContext).read() line: 673	
    	io.netty.handler.stream.ChunkedWriteHandler(io.netty.channel.ChannelDuplexHandler).read(io.netty.channel.ChannelHandlerContext) line: 95	
    	io.netty.channel.DefaultChannelHandlerContext(io.netty.channel.AbstractChannelHandlerContext).invokeRead() line: 693	
    	io.netty.channel.DefaultChannelHandlerContext(io.netty.channel.AbstractChannelHandlerContext).read() line: 673	
    	io.netty.handler.codec.http.HttpContentCompressor(io.netty.channel.ChannelDuplexHandler).read(io.netty.channel.ChannelHandlerContext) line: 95	
    	io.netty.channel.DefaultChannelHandlerContext(io.netty.channel.AbstractChannelHandlerContext).invokeRead() line: 693	
    	io.netty.channel.DefaultChannelPipeline$TailContext(io.netty.channel.AbstractChannelHandlerContext).read() line: 673	
    	io.netty.channel.DefaultChannelPipeline.read() line: 1015	
    	io.netty.channel.socket.nio.NioSocketChannel(io.netty.channel.AbstractChannel).read() line: 288	
    	io.netty.channel.DefaultChannelPipeline$HeadContext.readIfIsAutoRead() line: 1420	
    	io.netty.channel.DefaultChannelPipeline$HeadContext.channelActive(io.netty.channel.ChannelHandlerContext) line: 1398	
    	io.netty.channel.DefaultChannelPipeline$HeadContext(io.netty.channel.AbstractChannelHandlerContext).invokeChannelActive() line: 213	
    	io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(io.netty.channel.AbstractChannelHandlerContext) line: 199	
    	io.netty.channel.DefaultChannelPipeline.fireChannelActive() line: 906	
    	io.netty.channel.socket.nio.NioSocketChannel$NioSocketChannelUnsafe(io.netty.channel.AbstractChannel$AbstractUnsafe).register0(io.netty.channel.ChannelPromise) line: 522	
    	io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(io.netty.channel.AbstractChannel$AbstractUnsafe, io.netty.channel.ChannelPromise) line: 427	
    	io.netty.channel.AbstractChannel$AbstractUnsafe$1.run() line: 486	
    	io.netty.util.concurrent.AbstractEventExecutor.safeExecute(java.lang.Runnable) line: 163	
    	io.netty.channel.nio.NioEventLoop(io.netty.util.concurrent.SingleThreadEventExecutor).runAllTasks(long) line: 404	
    	io.netty.channel.nio.NioEventLoop.run() line: 495	
    	io.netty.util.concurrent.SingleThreadEventExecutor$5.run() line: 905	
    	java.lang.Thread.run() line: 748	
    
    

    Handling a selected OP_READ and reading into the buffer

    Thread stack while reading data:

    Thread [worker-2] (Suspended (breakpoint at line 57 in HttpContentDecoder))	
    	HttpContentDecompressor(HttpContentDecoder).decode(ChannelHandlerContext, HttpObject, List<Object>) line: 57	
    	HttpContentDecompressor(HttpContentDecoder).decode(ChannelHandlerContext, Object, List) line: 47	
    	HttpContentDecompressor(MessageToMessageDecoder<I>).channelRead(ChannelHandlerContext, Object) line: 88	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelRead(Object) line: 362	
    	AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext, Object) line: 348	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelRead(Object) line: 340	
    	ByteToMessageDecoder.fireChannelRead(ChannelHandlerContext, CodecOutputList, int) line: 323	
    	ByteToMessageDecoder.fireChannelRead(ChannelHandlerContext, List<Object>, int) line: 310	
    	HttpRequestDecoder(ByteToMessageDecoder).callDecode(ChannelHandlerContext, ByteBuf, List<Object>) line: 426	
    	HttpRequestDecoder(ByteToMessageDecoder).channelRead(ChannelHandlerContext, Object) line: 278	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelRead(Object) line: 362	
    	AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext, Object) line: 348	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelRead(Object) line: 340	
    	PipelineInitializer$ChannelActiveTester(ChannelInboundHandlerAdapter).channelRead(ChannelHandlerContext, Object) line: 86	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelRead(Object) line: 362	
    	AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext, Object) line: 348	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelRead(Object) line: 340	
    	ReadTimeoutHandler(IdleStateHandler).channelRead(ChannelHandlerContext, Object) line: 286	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelRead(Object) line: 362	
    	AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext, Object) line: 348	
    	DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).fireChannelRead(Object) line: 340	
    	DefaultChannelPipeline$HeadContext.channelRead(ChannelHandlerContext, Object) line: 1408	
    	DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).invokeChannelRead(Object) line: 362	
    	AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext, Object) line: 348	
    	DefaultChannelPipeline.fireChannelRead(Object) line: 930	
    	NioSocketChannel$NioSocketChannelUnsafe(AbstractNioByteChannel$NioByteUnsafe).read() line: 163	
    	NioEventLoop.processSelectedKey(SelectionKey, AbstractNioChannel) line: 677	
    	NioEventLoop.processSelectedKeysOptimized() line: 612	
    	NioEventLoop.processSelectedKeys() line: 529	
    	NioEventLoop.run() line: 491	
    	SingleThreadEventExecutor$5.run() line: 905	
    	Thread.run() line: 748	
    

    After processSelectedKey, NioSocketChannelUnsafe reads the data into a buffer and then hands it to the Pipeline, which triggers each subsequent handler.
    Note what is stored in the SelectionKey's attachment at read time:

    private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {
            final SelectionKey k = selectedKeys.keys[i];
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.keys[i] = null;
    
            final Object a = k.attachment(); // the attachment is the AbstractNioChannel; it holds the unsafe object, through which the read buffer can be reached
    
            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
    
                // ...
            }
            
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); // AbstractNioChannel holds the unsafe object
        // ...
         if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
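The attachment-based dispatch can be tried with the JDK alone. In the sketch below a java.nio.channels.Pipe replaces the socket so that no network is needed, and ReadHandler is a hypothetical stand-in for AbstractNioChannel: the object attached at registration time is what the loop gets back from the selected key and reads through.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

final class AttachmentDispatchDemo {

    /** Hypothetical handler type standing in for AbstractNioChannel. */
    static final class ReadHandler {
        final Pipe.SourceChannel source;

        ReadHandler(Pipe.SourceChannel source) {
            this.source = source;
        }

        String read() throws IOException {
            ByteBuffer buf = ByteBuffer.allocate(64);
            source.read(buf);
            buf.flip();
            return StandardCharsets.UTF_8.decode(buf).toString();
        }
    }

    /** Writes a short message into a pipe, selects it, and reads it via the attachment. */
    static String selectAndDispatch(String message) {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                pipe.source().configureBlocking(false);
                // The handler travels with the key as its attachment.
                pipe.source().register(selector, SelectionKey.OP_READ,
                        new ReadHandler(pipe.source()));
                pipe.sink().write(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)));

                String result = null;
                if (selector.select(1000) > 0) {
                    for (SelectionKey k : selector.selectedKeys()) {
                        Object a = k.attachment();
                        if (a instanceof ReadHandler && k.isReadable()) {
                            result = ((ReadHandler) a).read();
                        }
                    }
                    selector.selectedKeys().clear();
                }
                return result;
            } finally {
                pipe.sink().close();
                pipe.source().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(selectAndDispatch("ping"));
    }
}
```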
    

    Handling outbound writes

    Writes can be observed by setting a method breakpoint at SocketChannelImpl [entry] - write(ByteBuffer).
    The call stack is as follows:

    Thread [worker-3] (Suspended (entry into method write in SocketChannelImpl))	
    	SocketChannelImpl.write(ByteBuffer) line: 458	
    	NioSocketChannel.doWrite(ChannelOutboundBuffer) line: 405	
    	NioSocketChannel$NioSocketChannelUnsafe(AbstractChannel$AbstractUnsafe).flush0() line: 938	
    	NioSocketChannel$NioSocketChannelUnsafe(AbstractNioChannel$AbstractNioUnsafe).flush0() line: 360	
    	NioSocketChannel$NioSocketChannelUnsafe(AbstractChannel$AbstractUnsafe).flush() line: 905	
    	DefaultChannelPipeline$HeadContext.flush(ChannelHandlerContext) line: 1370	
    	DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).invokeFlush0() line: 776	
    	DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).invokeFlush() line: 768	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).flush() line: 749	
    	ReadTimeoutHandler(ChannelDuplexHandler).flush(ChannelHandlerContext) line: 117	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeFlush0() line: 776	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeFlush() line: 768	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).flush() line: 749	
    	HttpResponseEncoder(ChannelOutboundHandlerAdapter).flush(ChannelHandlerContext) line: 115	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeFlush0() line: 776	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeFlush() line: 768	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).flush() line: 749	
    	ChunkedWriteHandler.doFlush(ChannelHandlerContext) line: 335	
    	ChunkedWriteHandler.channelWritabilityChanged(ChannelHandlerContext) line: 148	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelWritabilityChanged() line: 434	
    	AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext) line: 416	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelWritabilityChanged() line: 409	
    	HttpContentDecompressor(ChannelInboundHandlerAdapter).channelWritabilityChanged(ChannelHandlerContext) line: 119	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelWritabilityChanged() line: 434	
    	AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext) line: 416	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelWritabilityChanged() line: 409	
    	HttpRequestDecoder(ChannelInboundHandlerAdapter).channelWritabilityChanged(ChannelHandlerContext) line: 119	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelWritabilityChanged() line: 434	
    	AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext) line: 416	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelWritabilityChanged() line: 409	
    	PipelineInitializer$ChannelActiveTester(ChannelInboundHandlerAdapter).channelWritabilityChanged(ChannelHandlerContext) line: 119	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelWritabilityChanged() line: 434	
    	AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext) line: 416	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelWritabilityChanged() line: 409	
    	ReadTimeoutHandler(ChannelInboundHandlerAdapter).channelWritabilityChanged(ChannelHandlerContext) line: 119	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelWritabilityChanged() line: 434	
    	AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext) line: 416	
    	DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).fireChannelWritabilityChanged() line: 409	
    	DefaultChannelPipeline$HeadContext.channelWritabilityChanged(ChannelHandlerContext) line: 1431	
    	DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).invokeChannelWritabilityChanged() line: 434	
    	AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext) line: 416	
    	DefaultChannelPipeline.fireChannelWritabilityChanged() line: 942	
    	ChannelOutboundBuffer$2.run() line: 608	
    	AbstractEventExecutor.safeExecute(Runnable) line: 163	
    	NioEventLoop(SingleThreadEventExecutor).runAllTasks(long) line: 404	
    	NioEventLoop.run() line: 495	
    	SingleThreadEventExecutor$5.run() line: 905	
    	Thread.run() line: 748	
    

    The core logic is in:
    io.netty.channel.socket.nio.NioSocketChannel.doWrite(ChannelOutboundBuffer)

     case 1: {
        // Only one ByteBuf so use non-gathering write
        // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
        // to check if the total size of all the buffers is non-zero.
        ByteBuffer buffer = nioBuffers[0];
        int attemptedBytes = buffer.remaining();
        final int localWrittenBytes = ch.write(buffer);
        if (localWrittenBytes <= 0) {
            incompleteWrite(true);
            return;
        }
        adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
        in.removeBytes(localWrittenBytes);
        --writeSpinCount;
        break;
    }
    

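    writeSpinCount defaults to 16, meaning the writing thread makes up to 16 write attempts. When ch.write can no longer write anything (it returns a value <= 0), incompleteWrite is called, and incompleteWrite registers the OP_WRITE event. This is an optimization: instead of registering the event up front, Netty tries to write first and registers only when the write cannot proceed.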
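
    The "write first, register OP_WRITE only when stuck" idea can be shown as a small simulation. This is a sketch under stated assumptions, not Netty's implementation: WriteSpinSketch, Outcome and the socketWrite function are hypothetical names, and the socket is modelled as a function that takes the number of bytes offered and returns the number accepted. The text above does not say what happens when the spin budget runs out while writes still succeed, so the sketch only labels that case.

```java
import java.util.function.IntUnaryOperator;

final class WriteSpinSketch {

    enum Outcome { ALL_WRITTEN, REGISTER_OP_WRITE, SPIN_EXHAUSTED }

    /**
     * @param pendingBytes   bytes waiting to be flushed
     * @param writeSpinCount maximum number of write attempts for this flush
     * @param socketWrite    hypothetical socket: bytes offered -> bytes accepted (0 when full)
     */
    static Outcome flush(int pendingBytes, int writeSpinCount, IntUnaryOperator socketWrite) {
        while (pendingBytes > 0) {
            if (writeSpinCount <= 0) {
                // Budget used up although the socket kept accepting data.
                return Outcome.SPIN_EXHAUSTED;
            }
            int written = socketWrite.applyAsInt(pendingBytes);
            if (written <= 0) {
                // The socket took nothing: only now ask the selector for OP_WRITE.
                return Outcome.REGISTER_OP_WRITE;
            }
            pendingBytes -= written;
            --writeSpinCount;
        }
        return Outcome.ALL_WRITTEN;
    }

    public static void main(String[] args) {
        // A socket whose send buffer has room for 60 more bytes in total.
        int[] room = {60};
        IntUnaryOperator limited = offered -> {
            int accepted = Math.min(offered, room[0]);
            room[0] -= accepted;
            return accepted;
        };
        System.out.println(flush(100, 16, limited));
    }
}
```

    In the common case the first attempt writes everything and the selector is never involved, which is the point of the optimization.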

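    Registering the OP_WRITE event

    A fairly large response is needed to observe this locally. I had to build a response of nearly 2 million characters to trigger it.
    Set a breakpoint at SelectionKeyImpl [entry] - interestOps(int) to observe the registration.
    The stack is as follows: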

    Thread [worker-3] (Suspended (entry into method interestOps in SelectionKeyImpl))	
    	SelectionKeyImpl.interestOps(int) line: 82	
    	NioSocketChannel(AbstractNioByteChannel).setOpWrite() line: 332	
    	NioSocketChannel(AbstractNioByteChannel).incompleteWrite(boolean) line: 289	
    	NioSocketChannel.doWrite(ChannelOutboundBuffer) line: 407	
    	NioSocketChannel$NioSocketChannelUnsafe(AbstractChannel$AbstractUnsafe).flush0() line: 938	
    	NioSocketChannel$NioSocketChannelUnsafe(AbstractNioChannel$AbstractNioUnsafe).flush0() line: 360	
    	NioSocketChannel$NioSocketChannelUnsafe(AbstractChannel$AbstractUnsafe).flush() line: 905	
    	DefaultChannelPipeline$HeadContext.flush(ChannelHandlerContext) line: 1370	
    	DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).invokeFlush0() line: 776	
    	DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).invokeFlush() line: 768	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).flush() line: 749	
    	ReadTimeoutHandler(ChannelDuplexHandler).flush(ChannelHandlerContext) line: 117	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeFlush0() line: 776	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeFlush() line: 768	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).flush() line: 749	
    	HttpResponseEncoder(ChannelOutboundHandlerAdapter).flush(ChannelHandlerContext) line: 115	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeFlush0() line: 776	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeFlush() line: 768	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).flush() line: 749	
    	ChunkedWriteHandler.doFlush(ChannelHandlerContext) line: 335	
    	ChunkedWriteHandler.channelWritabilityChanged(ChannelHandlerContext) line: 148	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelWritabilityChanged() line: 434	
    	AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext) line: 416	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelWritabilityChanged() line: 409	
    	HttpContentDecompressor(ChannelInboundHandlerAdapter).channelWritabilityChanged(ChannelHandlerContext) line: 119	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelWritabilityChanged() line: 434	
    	AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext) line: 416	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelWritabilityChanged() line: 409	
    	HttpRequestDecoder(ChannelInboundHandlerAdapter).channelWritabilityChanged(ChannelHandlerContext) line: 119	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelWritabilityChanged() line: 434	
    	AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext) line: 416	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelWritabilityChanged() line: 409	
    	PipelineInitializer$ChannelActiveTester(ChannelInboundHandlerAdapter).channelWritabilityChanged(ChannelHandlerContext) line: 119	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelWritabilityChanged() line: 434	
    	AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext) line: 416	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelWritabilityChanged() line: 409	
    	ReadTimeoutHandler(ChannelInboundHandlerAdapter).channelWritabilityChanged(ChannelHandlerContext) line: 119	
    	DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeChannelWritabilityChanged() line: 434	
    	AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext) line: 416	
    	DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).fireChannelWritabilityChanged() line: 409	
    	DefaultChannelPipeline$HeadContext.channelWritabilityChanged(ChannelHandlerContext) line: 1431	
    	DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).invokeChannelWritabilityChanged() line: 434	
    	AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext) line: 416	
    	DefaultChannelPipeline.fireChannelWritabilityChanged() line: 942	
    	ChannelOutboundBuffer$2.run() line: 608	
    	AbstractEventExecutor.safeExecute(Runnable) line: 163	
    	NioEventLoop(SingleThreadEventExecutor).runAllTasks(long) line: 404	
    	NioEventLoop.run() line: 495	
    	SingleThreadEventExecutor$5.run() line: 905	
    	Thread.run() line: 748	
    

    The core code in Netty is:

    // io.netty.channel.nio.AbstractNioByteChannel.setOpWrite()
    protected final void setOpWrite() {
        final SelectionKey key = selectionKey();
        // Check first if the key is still valid as it may be canceled as part of the deregistration
        // from the EventLoop
        // See https://github.com/netty/netty/issues/2104
        if (!key.isValid()) {
            return;
        }
        final int interestOps = key.interestOps();
        if ((interestOps & SelectionKey.OP_WRITE) == 0) {
            key.interestOps(interestOps | SelectionKey.OP_WRITE);
        }
    }
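
    The interestOps value is a bitmask, so setOpWrite only ORs one bit into it and leaves the other interests untouched. The sketch below isolates that bit arithmetic using the real SelectionKey constants. InterestOpsSketch and its helper names are hypothetical, and the clearing helper is an addition of this sketch that the text above does not show. Clearing matters in general NIO code because a socket is writable most of the time, so a key that keeps OP_WRITE set would be selected over and over.

```java
import java.nio.channels.SelectionKey;

final class InterestOpsSketch {

    // Add OP_WRITE while preserving every other interest bit.
    static int withOpWrite(int interestOps) {
        return interestOps | SelectionKey.OP_WRITE;
    }

    // Remove OP_WRITE while preserving every other interest bit.
    static int withoutOpWrite(int interestOps) {
        return interestOps & ~SelectionKey.OP_WRITE;
    }

    static boolean hasOpWrite(int interestOps) {
        return (interestOps & SelectionKey.OP_WRITE) != 0;
    }

    public static void main(String[] args) {
        int ops = SelectionKey.OP_READ;
        ops = withOpWrite(ops);
        System.out.println("OP_WRITE set: " + hasOpWrite(ops));
        ops = withoutOpWrite(ops);
        System.out.println("back to OP_READ only: " + (ops == SelectionKey.OP_READ));
    }
}
```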
    

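    Handling a selected OP_WRITE and writing out the buffer

    After OP_WRITE has been registered and is later selected, the write path reuses io.netty.channel.socket.nio.NioSocketChannel.doWrite(ChannelOutboundBuffer).
    The call stack is as follows: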

    Thread [worker-3] (Suspended (entry into method write in SocketChannelImpl))	
    	SocketChannelImpl.write(ByteBuffer) line: 458	
    	NioSocketChannel.doWrite(ChannelOutboundBuffer) line: 405	
    	NioSocketChannel$NioSocketChannelUnsafe(AbstractChannel$AbstractUnsafe).flush0() line: 938	
    	NioSocketChannel$NioSocketChannelUnsafe(AbstractNioChannel$AbstractNioUnsafe).forceFlush() line: 367	
    	NioEventLoop.processSelectedKey(SelectionKey, AbstractNioChannel) line: 671	
    	NioEventLoop.processSelectedKeysOptimized() line: 612	
    	NioEventLoop.processSelectedKeys() line: 529	
    	NioEventLoop.run() line: 491	
    	SingleThreadEventExecutor$5.run() line: 905	
    	Thread.run() line: 748	
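
    The whole cycle (write until the channel is full, register OP_WRITE, wait for the selector, resume writing) can be tried out with the JDK alone. The sketch below is an assumption-laden stand-in, not Netty code: OpWriteCycleSketch is a hypothetical name, a non-blocking java.nio.channels.Pipe replaces the socket, and the same thread also plays the peer by draining the pipe. How much data is needed before the pipe fills up depends on the operating system.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

final class OpWriteCycleSketch {

    /** Returns {bytes received by the reader, number of times OP_WRITE was registered}. */
    static long[] transfer(int payloadSize) {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            Pipe.SinkChannel sink = pipe.sink();
            Pipe.SourceChannel source = pipe.source();
            sink.configureBlocking(false);
            source.configureBlocking(false);
            SelectionKey key = sink.register(selector, 0); // no interest registered yet

            ByteBuffer out = ByteBuffer.allocate(payloadSize);
            ByteBuffer in = ByteBuffer.allocate(8192);
            long received = 0;
            long registrations = 0;

            while (out.hasRemaining()) {
                if (sink.write(out) > 0) {
                    continue; // optimistic path: keep writing without the selector
                }
                // Nothing was accepted: the pipe is full, so ask for OP_WRITE now.
                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
                registrations++;
                received += drain(source, in); // play the peer and free up space
                selector.select(1000);         // wait until the sink is reported ready
                selector.selectedKeys().clear();
                // Drop the interest again before going back to plain writes.
                key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
            }
            sink.close();
            received += drain(source, in);
            source.close();
            return new long[] {received, registrations};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads everything currently available from the source and returns the byte count.
    private static long drain(Pipe.SourceChannel source, ByteBuffer in) throws IOException {
        long total = 0;
        int n;
        in.clear();
        while ((n = source.read(in)) > 0) {
            total += n;
            in.clear();
        }
        return total;
    }

    public static void main(String[] args) {
        long[] result = transfer(4 << 20);
        System.out.println("received=" + result[0] + " opWriteRegistrations=" + result[1]);
    }
}
```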
    
    

    Handoff from the main thread to the boss thread, and from the boss thread to the worker thread

    When is the spin loop (run) of the boss and worker NioEventLoop started?
    main starts boss

    Thread [main] (Suspended (breakpoint at line 707 in java.lang.Thread))	
    	owns: java.lang.Thread  (id=80)	
    	java.lang.Thread.start() line: 707	
    	io.netty.util.concurrent.ThreadPerTaskExecutor.execute(java.lang.Runnable) line: 33	
    	io.netty.channel.nio.NioEventLoop(io.netty.util.concurrent.SingleThreadEventExecutor).doStartThread() line: 894	
    	io.netty.channel.nio.NioEventLoop(io.netty.util.concurrent.SingleThreadEventExecutor).startThread() line: 865	
    	io.netty.channel.nio.NioEventLoop(io.netty.util.concurrent.SingleThreadEventExecutor).execute(java.lang.Runnable) line: 758	
    	io.netty.channel.nio.AbstractNioMessageChannel$NioMessageUnsafe(io.netty.channel.AbstractChannel$AbstractUnsafe).register(io.netty.channel.EventLoop, io.netty.channel.ChannelPromise) line: 483	
    	io.netty.channel.nio.NioEventLoop(io.netty.channel.SingleThreadEventLoop).register(io.netty.channel.ChannelPromise) line: 80	
    	io.netty.channel.nio.NioEventLoop(io.netty.channel.SingleThreadEventLoop).register(io.netty.channel.Channel) line: 74	
    	io.netty.channel.nio.NioEventLoopGroup(io.netty.channel.MultithreadEventLoopGroup).register(io.netty.channel.Channel) line: 86	
    	io.netty.bootstrap.ServerBootstrap(io.netty.bootstrap.AbstractBootstrap<B,C>).initAndRegister() line: 333	
    	io.netty.bootstrap.ServerBootstrap(io.netty.bootstrap.AbstractBootstrap<B,C>).doBind(java.net.SocketAddress) line: 282	
    	io.netty.bootstrap.ServerBootstrap(io.netty.bootstrap.AbstractBootstrap<B,C>).bind(java.net.SocketAddress) line: 278	
    	org.restexpress.RestExpress.bind(java.net.InetSocketAddress) line: 709	
    	org.restexpress.RestExpress.bind(java.lang.String, int) line: 686	
    	com.code260.ss.resetexpress.RestExpressApp1.main(java.lang.String[]) line: 26	
    

    boss starts worker

    Thread [boss-0] (Suspended (breakpoint at line 707 in java.lang.Thread))	
    	owns: java.lang.Thread  (id=102)	
    	java.lang.Thread.start() line: 707	
    	io.netty.util.concurrent.ThreadPerTaskExecutor.execute(java.lang.Runnable) line: 33	
    	io.netty.channel.nio.NioEventLoop(io.netty.util.concurrent.SingleThreadEventExecutor).doStartThread() line: 894	
    	io.netty.channel.nio.NioEventLoop(io.netty.util.concurrent.SingleThreadEventExecutor).startThread() line: 865	
    	io.netty.channel.nio.NioEventLoop(io.netty.util.concurrent.SingleThreadEventExecutor).execute(java.lang.Runnable) line: 758	
    	io.netty.channel.socket.nio.NioSocketChannel$NioSocketChannelUnsafe(io.netty.channel.AbstractChannel$AbstractUnsafe).register(io.netty.channel.EventLoop, io.netty.channel.ChannelPromise) line: 483	
    	io.netty.channel.nio.NioEventLoop(io.netty.channel.SingleThreadEventLoop).register(io.netty.channel.ChannelPromise) line: 80	
    	io.netty.channel.nio.NioEventLoop(io.netty.channel.SingleThreadEventLoop).register(io.netty.channel.Channel) line: 74	
    	io.netty.channel.nio.NioEventLoopGroup(io.netty.channel.MultithreadEventLoopGroup).register(io.netty.channel.Channel) line: 86	
    	io.netty.bootstrap.ServerBootstrap$ServerBootstrapAcceptor.channelRead(io.netty.channel.ChannelHandlerContext, java.lang.Object) line: 255	
    	io.netty.channel.DefaultChannelHandlerContext(io.netty.channel.AbstractChannelHandlerContext).invokeChannelRead(java.lang.Object) line: 362	
    	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(io.netty.channel.AbstractChannelHandlerContext, java.lang.Object) line: 348	
    	io.netty.channel.DefaultChannelPipeline$HeadContext(io.netty.channel.AbstractChannelHandlerContext).fireChannelRead(java.lang.Object) line: 340	
    	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(io.netty.channel.ChannelHandlerContext, java.lang.Object) line: 1408	
    	io.netty.channel.DefaultChannelPipeline$HeadContext(io.netty.channel.AbstractChannelHandlerContext).invokeChannelRead(java.lang.Object) line: 362	
    	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(io.netty.channel.AbstractChannelHandlerContext, java.lang.Object) line: 348	
    	io.netty.channel.DefaultChannelPipeline.fireChannelRead(java.lang.Object) line: 930	
    	io.netty.channel.nio.AbstractNioMessageChannel$NioMessageUnsafe.read() line: 93	
    	io.netty.channel.nio.NioEventLoop.processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel) line: 677	
    	io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized() line: 612	
    	io.netty.channel.nio.NioEventLoop.processSelectedKeys() line: 529	
    	io.netty.channel.nio.NioEventLoop.run() line: 491	
    	io.netty.util.concurrent.SingleThreadEventExecutor$5.run() line: 905	
    	java.lang.Thread.run() line: 748	
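
    Both stacks show the same pattern: a loop's thread is not created up front but on the first execute(...) call, which is issued by whichever thread registers the first channel. The following is a minimal sketch of that lazy start, assuming a plain task queue in place of the selector loop. LazyLoopSketch and its members are hypothetical names, and the thread names "boss-0" and "worker-0" are chosen only for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class LazyLoopSketch {
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private final String name;

    LazyLoopSketch(String name) {
        this.name = name;
    }

    boolean isStarted() {
        return started.get();
    }

    // The first caller of execute() starts the loop thread; later calls only enqueue.
    void execute(Runnable task) {
        tasks.add(task);
        if (started.compareAndSet(false, true)) {
            Thread t = new Thread(this::run, name);
            t.setDaemon(true);
            t.start();
        }
    }

    // Simplified loop: only runs queued tasks, no selector involved.
    private void run() {
        try {
            while (true) {
                tasks.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Returns the names of the threads that ran the boss task and the worker task. */
    static String[] demo() {
        LazyLoopSketch boss = new LazyLoopSketch("boss-0");
        LazyLoopSketch worker = new LazyLoopSketch("worker-0");
        String[] seen = new String[2];
        CountDownLatch done = new CountDownLatch(1);
        // The calling thread starts the boss loop ...
        boss.execute(() -> {
            seen[0] = Thread.currentThread().getName();
            // ... and the boss loop, handling an "accept", starts the worker loop.
            worker.execute(() -> {
                seen[1] = Thread.currentThread().getName();
                done.countDown();
            });
        });
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("worker task did not run in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        String[] seen = demo();
        System.out.println(seen[0] + " -> " + seen[1]);
    }
}
```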
    

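    Selector-to-thread binding

    Each channel is registered with exactly one selector, and each selector belongs to one fixed thread, so repeated reads on the same channel never drift between threads.
    A single thread can of course serve multiple channels, but because a selector belongs to one fixed thread, all of those channels share the same selector.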
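
    The resulting relationship is many channels to one loop, and never one channel to many loops. A minimal sketch of such an assignment follows. It assumes channels are handed out to loops round-robin at registration time and then stay pinned. ChannelPinningSketch and its methods are hypothetical names, and loops are represented by plain indexes rather than threads or selectors.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ChannelPinningSketch {
    private final int loopCount;
    private int next;
    // channel id -> index of the loop (selector + thread) that owns it
    private final Map<String, Integer> loopOfChannel = new LinkedHashMap<>();

    ChannelPinningSketch(int loopCount) {
        this.loopCount = loopCount;
    }

    // Assigns the channel to a loop once; repeated registration keeps the first assignment.
    int register(String channelId) {
        Integer existing = loopOfChannel.get(channelId);
        if (existing != null) {
            return existing;
        }
        int loop = next;
        next = (next + 1) % loopCount;
        loopOfChannel.put(channelId, loop);
        return loop;
    }

    // All channels served by one loop, in registration order.
    List<String> channelsOn(int loop) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Integer> e : loopOfChannel.entrySet()) {
            if (e.getValue() == loop) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        ChannelPinningSketch group = new ChannelPinningSketch(2);
        for (String id : new String[] {"a", "b", "c"}) {
            System.out.println(id + " -> loop " + group.register(id));
        }
        System.out.println("loop 0 serves " + group.channelsOn(0));
    }
}
```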

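    Multiple bosses, i.e. multiple acceptors

    When should multiple acceptors be used?
    Multiple acceptor (boss) threads generally only make sense when binding several ports, or when binding one port on several IPs (network interfaces). Otherwise a port can usually be bound by only one ServerSocketChannel, that channel is registered with one Selector, and that Selector is driven by one thread spinning on select, so additional threads do nothing for a single bind. Sharing one selector among several threads does not help either: the selector's selected-key set is not thread-safe, so processing it from several threads causes problems.
    That said, later JDK versions, as well as Linux itself, began to allow the same port+IP to be bound multiple times by multiple processes, which makes multiple acceptors meaningful. See the section "题外:SO_REUSEPORT" (Aside: SO_REUSEPORT) in the slide deck 《NIO trick and trap》. The use case given there is "suited to web servers with a large number of short-lived connections".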
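
    With the plain JDK, binding the same address twice can be tried through StandardSocketOptions.SO_REUSEPORT, which is available from Java 9 on. The sketch below assumes a platform that supports the option and checks for it first. ReusePortSketch is a hypothetical name, and both channels live in one process here only to keep the example self-contained.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.ServerSocketChannel;

final class ReusePortSketch {

    // True if this JDK and OS expose SO_REUSEPORT for server socket channels.
    static boolean supported() {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            return ch.supportedOptions().contains(StandardSocketOptions.SO_REUSEPORT);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Binds two listening channels to the same loopback port; false if the option is unsupported. */
    static boolean bindSamePortTwice() {
        if (!supported()) {
            return false;
        }
        try (ServerSocketChannel first = ServerSocketChannel.open();
             ServerSocketChannel second = ServerSocketChannel.open()) {
            // The option has to be set on every socket before it is bound.
            first.setOption(StandardSocketOptions.SO_REUSEPORT, true);
            second.setOption(StandardSocketOptions.SO_REUSEPORT, true);

            InetAddress loopback = InetAddress.getLoopbackAddress();
            first.bind(new InetSocketAddress(loopback, 0)); // let the OS pick a free port
            int port = ((InetSocketAddress) first.getLocalAddress()).getPort();
            second.bind(new InetSocketAddress(loopback, port));
            return ((InetSocketAddress) second.getLocalAddress()).getPort() == port;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("SO_REUSEPORT supported: " + supported());
        System.out.println("same port bound twice: " + bindSamePortTwice());
    }
}
```

    Each of the two channels could then be registered with its own selector and thread, which is the situation in which more than one acceptor has work to do.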

  • Original article: https://www.cnblogs.com/simoncook/p/11953311.html