zoukankan      html  css  js  c++  java
  • ChannelOutboundBuffer

    ChannelOutboundBuffer

    书接上文,NioSocketChannelUnsafe.write()方法,实际上调用的是AbstractUnsafe的write方法。

            public final void write(Object msg, ChannelPromise promise) {
                assertEventLoop();
    
                ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
                if (outboundBuffer == null) {
                    try {
                        // release message now to prevent resource-leak
                        ReferenceCountUtil.release(msg);
                    } finally {
                        // If the outboundBuffer is null we know the channel was closed and so
                        // need to fail the future right away. If it is not null the handling of the rest
                        // will be done in flush0()
                        // See https://github.com/netty/netty/issues/2362
                        safeSetFailure(promise,
                                newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
                    }
                    return;
                }
    
                int size;
                try {
                    msg = filterOutboundMessage(msg);
                    size = pipeline.estimatorHandle().size(msg);
                    if (size < 0) {
                        size = 0;
                    }
                } catch (Throwable t) {
                    try {
                        ReferenceCountUtil.release(msg);
                    } finally {
                        safeSetFailure(promise, t);
                    }
                    return;
                }
    
                outboundBuffer.addMessage(msg, size, promise);
            }

    它使用了ChannelOutboundBuffer,ChannelOutboundBuffer是一个通道的出站缓冲区,所有要写的数据都会先存在这里,等到要刷新的时候才会真的写出去。每个 ChannelSocket 的 Unsafe 都有一个绑定的 ChannelOutboundBuffer , Netty 向站外输出数据的过程统一通过 ChannelOutboundBuffer 类进行封装,目的是为了提高网络的吞吐量,在外面调用 write 的时候,数据并没有写到 Socket,而是写到了 ChannelOutboundBuffer 这里,当调用 flush 的时候,才真正的向 Socket 写出。每个Unsafe都会绑定一个ChannelOutboundBuffer,也就是说每个客户端连接上服务端都会创建一个ChannelOutboundBuffer绑定客户端Channel。Netty设计ChannelOutboundBuffer是为了减少TCP缓存的压力提高系统的吞吐率。

    看下属性

    消息都是封装成内部的Entry类的,存储结构是一个单链表。其实他是一个对象池,可以复用:

        static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =//出站实体的额外开销96字节
                SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);
        //保存线程对应的缓冲区,默认是1024个ByteBuffer数组,FastThreadLocal比一般的ThreadLocal要快,他是利用数组,内部用的是常量索引的数组,不是hash算法
        private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
            @Override
            protected ByteBuffer[] initialValue() throws Exception {
                return new ByteBuffer[1024];
            }
        };
    // 单链表结构
        // The Entry that is the first in the linked-list structure that was flushed
        private Entry flushedEntry;//第一个要冲刷的实体
        // The Entry which is the first unflushed in the linked-list structure
        private Entry unflushedEntry;//第一个未冲刷的实体
        // The Entry which represents the tail of the buffer
        private Entry tailEntry;//尾结点实体
        // The number of flushed entries that are not written yet
        private int flushed;//要冲刷的数量,但是还没真正冲刷出去,就是出站缓冲区大小
    
         private int nioBufferCount;//可以冲刷的缓冲区个数
        private long nioBufferSize;//可以写出的总的缓冲区数组数据大小
        private boolean inFail;//是否冲刷失败
    
    //原子操作totalPendingSize
        private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
                AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");
    
        @SuppressWarnings("UnusedDeclaration")
        private volatile long totalPendingSize;//待冲刷缓冲区的字节总数
    
    //原子操作unwritable
        private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
                AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable");
    
        @SuppressWarnings("UnusedDeclaration")
        private volatile int unwritable;
    
        private volatile Runnable fireChannelWritabilityChangedTask;//写能力改变的任务

    Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)

    flushedEntry(包括)到unflushedEntry之间的就是待发送数据,unflushedEntry(包括)到tailEntry就是暂存数据,flushed就是待发送数据个数。




     调用 addMessage 方法的时候,创建一个 Entry ,将这个 Entry 追加到 TailEntry 节点后面,调用 addFlush 的时候,将 unflushedEntry 的引用赋给 flushedEntry,表示即将从这里开始刷新。然后将 unflushedEntry 置为 null。当数据被写进 Socket 后,从 flushedEntry(current) 节点开始,循环将每个节点删除。

    ChannelOutboundBuffer的addMessage

     将直接缓冲区添加到出站缓冲区中,不过是会创建一个实体Entry,然后用一个单链表结构来存取的

    public void addMessage(Object msg, int size, ChannelPromise promise) {
            Entry entry = Entry.newInstance(msg, size, total(msg), promise);//创建entry节点
            if (tailEntry == null) {
                flushedEntry = null;
            } else {
                Entry tail = tailEntry;
                tail.next = entry;
            }
            tailEntry = entry;
            if (unflushedEntry == null) {
                unflushedEntry = entry;
            }
    
            // increment pending bytes after adding message to the unflushed arrays.
            // See https://github.com/netty/netty/issues/1619
            incrementPendingOutboundBytes(entry.pendingSize, false);//增加待冲刷的消息
        }
    1. 根据 ByteBuf 相互属性和 promise 创建一个 Entry 节点。
    2. 将新的节点追加到 tailEntry 节点上。如果考虑之前的全部被清空了话,则新节点就是唯一节点,unflushedEntry 属性就是新的节点。可对照上面的图来看。
    3. 使用 CAS 将 totalPendingSize(总的数据大小) 属性增加 Entry 实例的大小(96 字节) + 真实数据的大小。

    内部类Entry

        static final class Entry {
         //Entry对象池
    private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() { @Override public Entry newObject(Handle<Entry> handle) { return new Entry(handle); } }); private final Handle<Entry> handle;//池化操作的处理器 Entry next;//链表的下一个 Object msg;//信息 ByteBuffer[] bufs;//缓存字节缓冲区数组,为了复用提高效率 ByteBuffer buf;//缓存字节缓冲区,为了复用提高效率 ChannelPromise promise;//回调 long progress;//当前进度,即已经传了多少数据 long total;//总共的数据大小 int pendingSize;//待冲刷的评估大小,要加上96 int count = -1; boolean cancelled;//是否被取消了 private Entry(Handle<Entry> handle) { this.handle = handle; }      //用对象池的方式创建实体 static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) { Entry entry = RECYCLER.get();//从池子里获取 entry.msg = msg;//消息 entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;//评估的大小 entry.total = total;//具体大小 entry.promise = promise; return entry; }      //取消了,返回待冲刷的评估大小 int cancel() { if (!cancelled) { cancelled = true;//取消标识 int pSize = pendingSize; // release message and replace with an empty buffer ReferenceCountUtil.safeRelease(msg);//释放 msg = Unpooled.EMPTY_BUFFER; pendingSize = 0; total = 0; progress = 0; bufs = null; buf = null; return pSize; } return 0; }     //用完初始化后再放回收到池子里 void recycle() { next = null; bufs = null; buf = null; msg = null; promise = null; progress = 0; total = 0; pendingSize = 0; count = -1; cancelled = false; handle.recycle(this); }      //回收当前实体并获取下一个实体,为什么要先获取下一个再回收呢,因为回收的时候把next设置null Entry recycleAndGetNext() { Entry next = this.next; recycle(); return next; } }

    创建Entry节点

    static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
                Entry entry = RECYCLER.get();
                entry.msg = msg;
                entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
                entry.total = total;
                entry.promise = promise;
                return entry;
            }

    方法RECYCLER.get();

    private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() {
                @Override
                public Entry newObject(Handle<Entry> handle) {
                    return new Entry(handle);
                }
            });
    public static <T> ObjectPool<T> newPool(final ObjectCreator<T> creator) {
            return new RecyclerObjectPool<T>(ObjectUtil.checkNotNull(creator, "creator"));
        }
    
        private static final class RecyclerObjectPool<T> extends ObjectPool<T> {
            private final Recycler<T> recycler;
    
            RecyclerObjectPool(final ObjectCreator<T> creator) {
                 recycler = new Recycler<T>() {
                    @Override
                    protected T newObject(Handle<T> handle) {
                        return creator.newObject(handle);
                    }
                };
            }
    
            @Override
            public T get() {
                return recycler.get();
            }
        }

     最终调用Recycler.get()

    public final T get() {
            if (maxCapacityPerThread == 0) {
                return newObject((Handle<T>) NOOP_HANDLE);
            }
            Stack<T> stack = threadLocal.get();
            DefaultHandle<T> handle = stack.pop();
            if (handle == null) {
                handle = stack.newHandle();
                handle.value = newObject(handle);
            }
            return (T) handle.value;
        }

    而newObject(handle)调用的即是

     也就是

     这块有点绕,用的是匿名内部类。具体流程就是:

    Netty 将在 ThreadLocalMap 中存储了一个 Stack (栈)对象,存储重复使用的 DefaultHandle 实例,该实例的 value 属性就是 Entry ,所以这个 Entry 也是重复使用的,每次用完所有参数置为 null,再返回到栈中,下次再用,从这个栈中弹出。重复利用。对象池的最佳实践。而且是保存再线程中,速度更快,不会有线程竞争。

    ChannelOutboundBuffer的addFlush

    当 addMessage 成功添加进 ChannelOutboundBuffer 后,就需要 flush 刷新到 Socket 中去。但是这个方法并不是做刷新到 Socket 的操作。而是将暂存数据节点变成待发送节点,需要发送的数据,是flushedEntry指向的节点到unflushedEntry指向的节点(不包含unflushedEntry)的之间的节点数据,unflushedEntry 的引用转移到 flushedEntry 引用中,表示即将刷新这个 flushedEntry,下次发送要将flushedEntry指向unflushedEntry指向的节点作为发送数据的起始节点。 Netty 提供了 promise,这个对象可以做取消操作,例如,不发送这个 ByteBuf 了,所以,在 write 之后,flush 之前需要告诉 promise 不能做取消操作了。

    public void addFlush() {
    
            Entry entry = unflushedEntry;//第一个没冲刷的数据,也是链表的第一个
            if (entry != null) {//有数据才刷了
                if (flushedEntry == null) {
                    // there is no flushedEntry yet, so start with the entry
                    flushedEntry = entry;//设置第一个要冲刷的实体
                }
                do {
                    flushed ++;//冲刷数+1
                    if (!entry.promise.setUncancellable()) {//如果取消的话需要回收内存
                        // Was cancelled so make sure we free up memory and notify about the freed bytes
                        int pending = entry.cancel();
                        decrementPendingOutboundBytes(pending, false, true);
                    }
                    entry = entry.next;
                } while (entry != null);//遍历冲刷是否有取消的
    
                // All flushed so reset unflushedEntry
                unflushedEntry = null;//重置未冲刷的
            }
        }
    1. 首先拿到未刷新的头节点。
    2. 判 null 之后,将这个 unflushedEntry 赋值给 flushedEntry,而这里的判 null 是做什么呢?防止多次调用 flush 。
    3. 循环尝试设置这些节点,告诉他们不能做取消操作了,如果尝试失败了,就将这个节点取消,在调用 nioBuffers 方法的时候,这个节点会被忽略。同时将 totalPendingSize 相应的减小。

    设置之后,promise 调用 cancel 方法就会返回 false。

    在调用完 outboundBuffer.addFlush() 方法后,Channel 会调用 flush0 方法做真正的刷新。

    首先调用自己的flush0 方法

    AbstractNioUnsafe的flush0

    protected final void flush0() {
                // Flush immediately only when there's no pending flush.
                // If there's a pending flush operation, event loop will call forceFlush() later,
                // and thus there's no need to call it now.
                if (!isFlushPending()) {
                    super.flush0();
                }
            }

    AbstractNioUnsafe的isFlushPending

    判断下是否已经有待冲刷存在,也就是有设置OP_WRITE事件

    private boolean isFlushPending() {
                SelectionKey selectionKey = selectionKey();
                return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
            }

    这里的判断是:如果注册了写事件,就暂时不写了,因为缓冲区到了水位线了,所以这次直接返回,等会再写。等到 EventLoop 触发写事件了,就会调用 ch.unsafe().forceFlush() 方法将数据刷新到 TCP 缓冲区。NIO 的写事件大部分时候是不需要注册的,只有当 TCP 缓冲区达到水位线了,不能写入了,才需要注册写事件。当缓冲区有空间了,NIO 就会触发写事件。

    调用了父类的flush0方法,即AbstractUnsafe的flush0

    AbstractUnsafe的flush0

    protected void flush0() {
                if (inFlush0) {
                    // Avoid re-entrance
                    return;
                }
    
                final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
                if (outboundBuffer == null || outboundBuffer.isEmpty()) {
                    return;
                }
    
                inFlush0 = true;
    
                // Mark all pending write requests as failure if the channel is inactive.
                if (!isActive()) {
                    try {
                        // Check if we need to generate the exception at all.
                        if (!outboundBuffer.isEmpty()) {
                            if (isOpen()) {
                                outboundBuffer.failFlushed(new NotYetConnectedException(), true);
                            } else {
                                // Do not trigger channelWritabilityChanged because the channel is closed already.
                                outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false);
                            }
                        }
                    } finally {
                        inFlush0 = false;
                    }
                    return;
                }
    
                try {
                    doWrite(outboundBuffer);
                } catch (Throwable t) {
                    handleWriteError(t);
                } finally {
                    inFlush0 = false;
                }
            }

    主要是doWrite(outboundBuffer);

    关键代码

    // 拿到NIO Socket
    SocketChannel ch = javaChannel();
    // 获取自旋的次数,默认16
    int writeSpinCount = config().getWriteSpinCount();
    // 获取设置的每个 ByteBuf 的最大字节数,这个数字来自操作系统的 so_sndbuf 定义
    int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
    // 调用 ChannelOutboundBuffer 的 nioBuffers 方法获取 ByteBuffer 数组,从flushedEntry开始,循环获取
    ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
    // ByteBuffer 的数量
    int nioBufferCnt = in.nioBufferCount();
    // 使用 NIO 写入 Socket 
    ch.write(buffer);
    // 调整最大字节数
    adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
    // 删除 ChannelOutboundBuffer 中的 Entry
    in.removeBytes(localWrittenBytes);
    // 自旋减一,直到自旋小于0停止循环,当然如果 ChannelOutboundBuffer 空了,也会停止。
    --writeSpinCount;
    // 如果自旋16次还没有完成 flush,则创建一个任务放进mpsc 队列中执行。
    incompleteWrite(writeSpinCount < 0);

    ChannelOutboundBuffer的incrementPendingOutboundBytes

    如果对方 Socket 接收很慢,ChannelOutboundBuffer 就会积累很多的数据。并且这个 ChannelOutboundBuffer 是没有大小限制的链表。可能会导致 OOM,所以在 addMessage 方法的最后一行,incrementPendingOutboundBytes方法,会判断 totalPendingSize 的大小是否超过了高水位阈值(默认64 kb),如果超过,关闭写开关,调用 piepeline 的 fireChannelWritabilityChanged 方法可改变 flush 策略。

    private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
            if (size == 0) {
                return;
            }
    
            long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
            if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {//如果大于配置的16位大小
                setUnwritable(invokeLater);//设置不可写
            }
        }

    ChannelOutboundBuffer的setUnwritable

    将unwritable原子操作改为非0,然后触发fireChannelWritabilityChanged

    private void setUnwritable(boolean invokeLater) {
            for (;;) {
                final int oldValue = unwritable;
                final int newValue = oldValue | 1;
                if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
                    if (oldValue == 0) {
                        fireChannelWritabilityChanged(invokeLater);
                    }
                    break;
                }
            }
        }

    如果是立即改变,就会调用pipeline.fireChannelWritabilityChanged();,就会从头结点开始传递这个事件,否则就给通道的事件循环提交个任务

    private void fireChannelWritabilityChanged(boolean invokeLater) {
            final ChannelPipeline pipeline = channel.pipeline();
            if (invokeLater) {
                Runnable task = fireChannelWritabilityChangedTask;
                if (task == null) {
                    fireChannelWritabilityChangedTask = task = new Runnable() {
                        @Override
                        public void run() {
                            pipeline.fireChannelWritabilityChanged();
                        }
                    };
                }
                channel.eventLoop().execute(task);
            } else {
                pipeline.fireChannelWritabilityChanged();
            }
        }

    当 Channel 的可写状态发生改变时被调用。用户可以确保写操作不会完成的太快(以避免发生 OOM)或者可以在 Channel 变为再次可写时恢复写入。可以通过调用 Channel 的 isWritable 方法来检测 Channel 的可写性。与可写性相关的阈值可以通过 Channel.config().setWriteBufferHighWaterMark 和 Channel.config().setWriteBufferLowWaterMark 方法来设置,默认最小 32 kb,最大 64 kb。

    恢复可写状态,remove 的时候,或者 addFlush 是丢弃了某个节点,会对 totalPendingSize 进行削减,削减之后进行判断。如果 totalPendingSize 小于最低水位了。就恢复写入。

    private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
            if (size == 0) {
                return;
            }
    
            long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
            if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
                setWritable(invokeLater);
            }
        }

    默认的情况下,ChannelOutboundBuffer 缓存区的大小最大是 64 kb,最小是 32 kb

    当不能写的时候,就会调用 ChannelWritabilityChanged 方法,用户可以在代码中,让写操作进行的慢一点。

    setWritable

    原子操作修改成可写状态:

    private void setWritable(boolean invokeLater) {
            for (;;) {
                final int oldValue = unwritable;
                final int newValue = oldValue & ~1;
                if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
                    if (oldValue != 0 && newValue == 0) {
                        fireChannelWritabilityChanged(invokeLater);
                    }
                    break;
                }
            }
        }

    总结

    Netty 的 write 的操作不会立即写入,而是存储在了 ChannelOutboundBuffer 缓冲区里,这个缓冲区内部是 Entry 节点组成的链表结构,通过 addMessage 方法添加进链表,通过 addFlush 方法表示可以开始写入了,最后通过 SocketChannel 的 flush0 方法真正的写入到 JDK 的 Socket 中。同时需要注意如果 TCP 缓冲区到达一个水位线了,不能写入 TCP 缓冲区了,就需要晚点写入,这里的方法判断是 isFlushPending()。

    其中,有一个需要注意的点就是,如果对方接收数据较慢,可能导致缓冲区存在大量的数据无法释放,导致OOM,Netty 通过一个 isWritable 开关尝试解决此问题,但用户需要重写 ChannelWritabilityChanged 方法,因为一旦超过默认的高水位阈值,Netty 就会调用 ChannelWritabilityChanged 方法,执行完毕后,继续进行 flush。用户可以在该方法中尝试慢一点的操作。等到缓冲区的数据小于低水位的值时,开关就关闭了,就不会调用 ChannelWritabilityChanged 方法。因此,合理设置这两个数值也挺重要的。

  • 相关阅读:
    SPOJ 694 (后缀数组) Distinct Substrings
    POJ 2774 (后缀数组 最长公共字串) Long Long Message
    POJ 3693 (后缀数组) Maximum repetition substring
    POJ 3261 (后缀数组 二分) Milk Patterns
    UVa 1149 (贪心) Bin Packing
    UVa 12206 (字符串哈希) Stammering Aliens
    UVa 11210 (DFS) Chinese Mahjong
    UVa (BFS) The Monocycle
    UVa 11624 (BFS) Fire!
    HDU 3032 (Nim博弈变形) Nim or not Nim?
  • 原文地址:https://www.cnblogs.com/xiaojiesir/p/15435301.html
Copyright © 2011-2022 走看看