zoukankan      html  css  js  c++  java
  • Netty writeAndFlush() 流程与异步

    Netty writeAndFlush()方法分为两步, 先 write 再 flush

        @Override
        public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
            DefaultChannelHandlerContext next;
            next = findContextOutbound(MASK_WRITE);
            ReferenceCountUtil.touch(msg, next);
            next.invoker.invokeWrite(next, msg, promise);
            next = findContextOutbound(MASK_FLUSH);
            next.invoker.invokeFlush(next);
            return promise;
        }

    以上是DefaultChannelHandlerContext中的writeAndFlush方法, 可见实际上是先调用了write, 然后调用flush

    1. write

    write方法从TailHandler开始, 穿过中间自定义的各种handler以后到达HeadHandler, 然后调用了HeadHandler的成员变量Unsafe的write

    如下

            @Override
            public void write(Object msg, ChannelPromise promise) {
                ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
                if (outboundBuffer == null) {
                    // If the outboundBuffer is null we know the channel was closed and so
                    // need to fail the future right away. If it is not null the handling of the rest
                    // will be done in flush0()
                    // See https://github.com/netty/netty/issues/2362
                    safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION);
                    // release message now to prevent resource-leak
                    ReferenceCountUtil.release(msg);
                    return;
                }
                outboundBuffer.addMessage(msg, promise);
            }

    最终会把需要write的msg和promise(也就是一个future, 我们拿到手的future, 添加Listener的也是这个)放入到outboundBuffer中, msg和promise在outboundBuffer中的存在形式是一个自定义的结构体Entry.

    也就是说调用write方法实际上并不是真的将消息写出去, 而是将消息和此次操作的promise放入到了一个队列中

    2. flush

    flush也是从Tail开始, 最后到Head, 最终调用的也是Head里的unsafe的flush0()方法, 然后flush0()里再调用doWrite()方法, 如下:

     @Override
        protected void doWrite(ChannelOutboundBuffer in) throws Exception {
            int writeSpinCount = -1;
    
            for (;;) {
                Object msg = in.current();
                if (msg == null) {
                    // Wrote all messages.
                    clearOpWrite();
                    break;
                }
    
                if (msg instanceof ByteBuf) {
                    ByteBuf buf = (ByteBuf) msg;
                    int readableBytes = buf.readableBytes();
                    if (readableBytes == 0) {
                        in.remove();
                        continue;
                    }
    
                    boolean setOpWrite = false;
                    boolean done = false;
                    long flushedAmount = 0;
                    if (writeSpinCount == -1) {
                        writeSpinCount = config().getWriteSpinCount();
                    }
                    for (int i = writeSpinCount - 1; i >= 0; i --) {
                        int localFlushedAmount = doWriteBytes(buf); // 这里才是实际将数据写出去的地方if (localFlushedAmount == 0) {
                            setOpWrite = true;
                            break;
                        }
    
                        flushedAmount += localFlushedAmount;
                        if (!buf.isReadable()) {
                            done = true;
                            break;
                        }
                    }
    
                    in.progress(flushedAmount);
    
                    if (done) {
                        in.remove();
                    } else {
                        incompleteWrite(setOpWrite);
                        break;
                    }
                } else if (msg instanceof FileRegion) {
                    FileRegion region = (FileRegion) msg;
                    boolean setOpWrite = false;
                    boolean done = false;
                    long flushedAmount = 0;
                    if (writeSpinCount == -1) {
                        writeSpinCount = config().getWriteSpinCount();
                    }
                    for (int i = writeSpinCount - 1; i >= 0; i --) {
                        long localFlushedAmount = doWriteFileRegion(region);
                        if (localFlushedAmount == 0) {
                            setOpWrite = true;
                            break;
                        }
    
                        flushedAmount += localFlushedAmount;
                        if (region.transfered() >= region.count()) {
                            done = true;
                            break;
                        }
                    }
    
                    in.progress(flushedAmount);
    
                    if (done) {
                        in.remove(); // 根据写出的数据的数量情况, 来判断操作是否完成, 如果完成则调用 in.remove()
                    } else {
                        incompleteWrite(setOpWrite);
                        break;
                    }
                } else {
                    throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));
                }
            }
        }

    红字部分就是最后将数据写出去的地方, 这里写数据最终调用的是 GatheringByteChannel 的 write() 方法, 这是个原生Java接口, 具体实现依赖于实现这个接口的Java类, 例如会调用 NIO 的 SocketChannel 的write()方法, 至此, 实际写数据的过程出现了, SocketChannel可以运行在non-blocking模式, 也就是非阻塞异步模式, write数据会马上返回写入的数据数量 (并不一定是所有数据都写入成功, 对于是否写入了所有数据, Netty有自己的处理逻辑, 也就是上面代码中的红字的那段for循环, 具体参看下SocketChannel的javadoc和netty源码).

    当所有数据写入SocketChannel成功, 开始调用in.remove(), 这个 in 就是第一步 1. write 里的那个 outboundBuffer, 他的类型是 ChannelOutboundBuffer, 代码如下:

        public final boolean remove() {
            if (isEmpty()) {
                return false;
            }
    
            Entry e = buffer[flushed];
            Object msg = e.msg;
            if (msg == null) {
                return false;
            }
    
            ChannelPromise promise = e.promise;
            int size = e.pendingSize;
    
            e.clear();
    
            flushed = flushed + 1 & buffer.length - 1;
    
            if (!e.cancelled) {
                // only release message, notify and decrement if it was not canceled before.
                safeRelease(msg);
                safeSuccess(promise); // 这里, 调用了promise的trySuccess()方法, 触发Listener
                decrementPendingOutboundBytes(size);
            }
    
            return true;
        }

    最后会调用Promise的notifyListeners()操作, 触发Listener完成整个异步流程

    ---------

    最后, 回到我们应用netty的时候的代码

    @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    ctx.writeAndFlush(new Object()).addListener(new ChannelFutureListener() {
    
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        // do sth
                    } else {
                        // do sth
                    }
                }
            });
        }

    这就是整个流程

    最后提一下, Netty的AbstractNioChannel里封装了selectionKey, 在accept socket的时候, socket会被注册到eventLoop()的Selector, 这个selectionKey就会被赋值,  如下

    selectionKey = javaChannel().register(eventLoop().selector, 0, this);

    在以后Selector的select()的时候,  则会通过这个key来获取到channel, 然后调用 AbstractChannel 里的 DefaultChannelPipeline 来触发 Handler 的 connect, read, write 等等事件...

  • 相关阅读:
    MySQL Partition--分区基础
    MySQL Replication--跳过复制错误
    MySQL--SHOW PROCESSLIST
    MySQL InnoDB Engine--缓冲器数据交换
    MySQL InnoDB Engine--数据预热
    MySQL Profiling--常用命令
    Linux--用户管理
    vi和vim快捷键的使用
    vi和vim
    xftp使用
  • 原文地址:https://www.cnblogs.com/zemliu/p/3667332.html
Copyright © 2011-2022 走看看