zoukankan      html  css  js  c++  java
  • Netty之写成功后是怎么收到通知的

      有时候使用Netty要发送的两个消息有依赖关系,第一个发送成功才能发送第二个,代码里是可以这么写的

    ChannelFuture channelFuture = ch.writeAndFlush(line + "
    ");
                    channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
                        @Override
                        public void operationComplete(Future<? super Void> future) throws Exception {
                            if (future.isSuccess()) {
                                System.out.print("first message send success" + "
    ");
                                ch.writeAndFlush("first message send success" + "
    ");
                            }
                        }
                    });

      上面一定要加" ",因为在客户端的初始化channel里配上了  new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()) 

      本文就通过源码看看这个listener是怎么实现的

    二 源码剖析

      版本为 

    <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.9.Final</version>
            </dependency>

      上一篇我们分析了客户端是怎么写消息出去的 https://www.cnblogs.com/juniorMa/p/14301756.html

      写消息分成两步

      1 把消息包成一个Entry放到缓存队列里 ChannelOutboundBuffer 

      2 执行flush,通过调用JDK中的channel.write完成真正的写消息

      在执行把消息加到消息缓存队列中时,包成Entry,其实是把Promise作为参数的。也就是说一个Entry有一个成员变量有提交时的Promise的引用。

      Entry entry = Entry.newInstance(msg, size, total(msg), promise);

      

      当执行flush之后,会调用各种channel的doWrite方法

       NioSocketChannel.doWrite() 

    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
            for (;;) {
                int size = in.size();
                if (size == 0) {
                    // All written so clear OP_WRITE
                    clearOpWrite();
                    break;
                }
                long writtenBytes = 0;
                boolean done = false;
                boolean setOpWrite = false;
    
                // Ensure the pending writes are made of ByteBufs only.
                ByteBuffer[] nioBuffers = in.nioBuffers();
                int nioBufferCnt = in.nioBufferCount();
                long expectedWrittenBytes = in.nioBufferSize();
                SocketChannel ch = javaChannel();
    
                // Always us nioBuffers() to workaround data-corruption.
                // See https://github.com/netty/netty/issues/2761
                switch (nioBufferCnt) {
                    case 0:
                        // We have something else beside ByteBuffers to write so fallback to normal writes.
                        super.doWrite(in);
                        return;
                    case 1:
                        // Only one ByteBuf so use non-gathering write
                        ByteBuffer nioBuffer = nioBuffers[0];
                        for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                            final int localWrittenBytes = ch.write(nioBuffer);
                            if (localWrittenBytes == 0) {
                                setOpWrite = true;
                                break;
                            }
                            expectedWrittenBytes -= localWrittenBytes;
                            writtenBytes += localWrittenBytes;
                            if (expectedWrittenBytes == 0) {
                                done = true;
                                break;
                            }
                        }
                        break;
                    default:
                        for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                            final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                            if (localWrittenBytes == 0) {
                                setOpWrite = true;
                                break;
                            }
                            expectedWrittenBytes -= localWrittenBytes;
                            writtenBytes += localWrittenBytes;
                            if (expectedWrittenBytes == 0) {
                                done = true;
                                break;
                            }
                        }
                        break;
                }
    
                // Release the fully written buffers, and update the indexes of the partially written buffer.
                in.removeBytes(writtenBytes);

      注意看最后的一行代码

    // Release the fully written buffers, and update the indexes of the partially written buffer.
    in.removeBytes(writtenBytes);

      依次处理每一个Entry

    public void removeBytes(long writtenBytes) {
            for (;;) {
                Object msg = current();
                if (!(msg instanceof ByteBuf)) {
                    assert writtenBytes == 0;
                    break;
                }
    
                final ByteBuf buf = (ByteBuf) msg;
                final int readerIndex = buf.readerIndex();
                final int readableBytes = buf.writerIndex() - readerIndex;
    
                if (readableBytes <= writtenBytes) {
                    if (writtenBytes != 0) {
                        progress(readableBytes);
                        writtenBytes -= readableBytes;
                    }
                    remove();

      

    public boolean remove() {
            Entry e = flushedEntry;
            if (e == null) {
                clearNioBuffers();
                return false;
            }
            Object msg = e.msg;
    
            ChannelPromise promise = e.promise;
            int size = e.pendingSize;
    
            removeEntry(e);
    
            if (!e.cancelled) {
                // only release message, notify and decrement if it was not canceled before.
                ReferenceCountUtil.safeRelease(msg);
                safeSuccess(promise);
                decrementPendingOutboundBytes(size, false, true);
            }
    safeSuccess就是执行通知每个listener逻辑的方法,通过这里就会执行我们添加在listener里的逻辑

      

  • 相关阅读:
    Effective C++ 学习笔记(12)
    Effective C++ 学习笔记(6)
    Effective C++ 学习笔记(13)
    Effective C++ 学习笔记(11)
    Effective C++ 学习笔记(10)
    (转)C++函数后加const的意义
    Effective C++ 学习笔记(14)
    Effective C++ 学习笔记(7)
    Effective C++ 学习笔记(9)
    Effective C++ 学习笔记(8)
  • 原文地址:https://www.cnblogs.com/juniorMa/p/14308000.html
Copyright © 2011-2022 走看看