zoukankan      html  css  js  c++  java
  • ChannelOutboundBuffer

    ChannelOutboundBuffer

    书接上文,NioSocketChannelUnsafe.write()方法,实际上调用的是AbstractUnsafe的write方法。

            public final void write(Object msg, ChannelPromise promise) {
                assertEventLoop();
    
                ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
                if (outboundBuffer == null) {
                    try {
                        // release message now to prevent resource-leak
                        ReferenceCountUtil.release(msg);
                    } finally {
                        // If the outboundBuffer is null we know the channel was closed and so
                        // need to fail the future right away. If it is not null the handling of the rest
                        // will be done in flush0()
                        // See https://github.com/netty/netty/issues/2362
                        safeSetFailure(promise,
                                newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
                    }
                    return;
                }
    
                int size;
                try {
                    msg = filterOutboundMessage(msg);
                    size = pipeline.estimatorHandle().size(msg);
                    if (size < 0) {
                        size = 0;
                    }
                } catch (Throwable t) {
                    try {
                        ReferenceCountUtil.release(msg);
                    } finally {
                        safeSetFailure(promise, t);
                    }
                    return;
                }
    
                outboundBuffer.addMessage(msg, size, promise);
            }

    它使用了ChannelOutboundBuffer,ChannelOutboundBuffer是一个通道的出站缓冲区,所有要写的数据都会先存在这里,等到要刷新的时候才会真的写出去。每个 ChannelSocket 的 Unsafe 都有一个绑定的 ChannelOutboundBuffer , Netty 向站外输出数据的过程统一通过 ChannelOutboundBuffer 类进行封装,目的是为了提高网络的吞吐量,在外面调用 write 的时候,数据并没有写到 Socket,而是写到了 ChannelOutboundBuffer 这里,当调用 flush 的时候,才真正的向 Socket 写出。每个Unsafe都会绑定一个ChannelOutboundBuffer,也就是说每个客户端连接上服务端都会创建一个ChannelOutboundBuffer绑定客户端Channel。Netty设计ChannelOutboundBuffer是为了减少TCP缓存的压力提高系统的吞吐率。

    看下属性

    消息都是封装成内部的Entry类的,存储结构是一个单链表。其实他是一个对象池,可以复用:

        static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =//出站实体的额外开销96字节
                SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);
        //保存线程对应的缓冲区,默认是1024个ByteBuffer数组,FastThreadLocal比一般的ThreadLocal要快,他是利用数组,内部用的是常量索引的数组,不是hash算法
        private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
            @Override
            protected ByteBuffer[] initialValue() throws Exception {
                return new ByteBuffer[1024];
            }
        };
    // 单链表结构
        // The Entry that is the first in the linked-list structure that was flushed
        private Entry flushedEntry;//第一个要冲刷的实体
        // The Entry which is the first unflushed in the linked-list structure
        private Entry unflushedEntry;//第一个未冲刷的实体
        // The Entry which represents the tail of the buffer
        private Entry tailEntry;//尾结点实体
        // The number of flushed entries that are not written yet
        private int flushed;//要冲刷的数量,但是还没真正冲刷出去,就是出站缓冲区大小
    
         private int nioBufferCount;//可以冲刷的缓冲区个数
        private long nioBufferSize;//可以写出的总的缓冲区数组数据大小
        private boolean inFail;//是否冲刷失败
    
    //原子操作totalPendingSize
        private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
                AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");
    
        @SuppressWarnings("UnusedDeclaration")
        private volatile long totalPendingSize;//待冲刷缓冲区的字节总数
    
    //原子操作unwritable
        private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
                AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable");
    
        @SuppressWarnings("UnusedDeclaration")
        private volatile int unwritable;
    
        private volatile Runnable fireChannelWritabilityChangedTask;//写能力改变的任务

    Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)

    flushedEntry(包括)到unflushedEntry之间的就是待发送数据,unflushedEntry(包括)到tailEntry就是暂存数据,flushed就是待发送数据个数。




     调用 addMessage 方法的时候,创建一个 Entry ,将这个 Entry 追加到 TailEntry 节点后面,调用 addFlush 的时候,将 unflushedEntry 的引用赋给 flushedEntry,表示即将从这里开始刷新。然后将 unflushedEntry 置为 null。当数据被写进 Socket 后,从 flushedEntry(current) 节点开始,循环将每个节点删除。

    ChannelOutboundBuffer的addMessage

     将直接缓冲区添加到出站缓冲区中,不过是会创建一个实体Entry,然后用一个单链表结构来存取的

    public void addMessage(Object msg, int size, ChannelPromise promise) {
            Entry entry = Entry.newInstance(msg, size, total(msg), promise);//创建entry节点
            if (tailEntry == null) {
                flushedEntry = null;
            } else {
                Entry tail = tailEntry;
                tail.next = entry;
            }
            tailEntry = entry;
            if (unflushedEntry == null) {
                unflushedEntry = entry;
            }
    
            // increment pending bytes after adding message to the unflushed arrays.
            // See https://github.com/netty/netty/issues/1619
            incrementPendingOutboundBytes(entry.pendingSize, false);//增加待冲刷的消息
        }
    1. 根据 ByteBuf 相互属性和 promise 创建一个 Entry 节点。
    2. 将新的节点追加到 tailEntry 节点上。如果考虑之前的全部被清空了话,则新节点就是唯一节点,unflushedEntry 属性就是新的节点。可对照上面的图来看。
    3. 使用 CAS 将 totalPendingSize(总的数据大小) 属性增加 Entry 实例的大小(96 字节) + 真实数据的大小。

    内部类Entry

        static final class Entry {
         //Entry对象池
    private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() { @Override public Entry newObject(Handle<Entry> handle) { return new Entry(handle); } }); private final Handle<Entry> handle;//池化操作的处理器 Entry next;//链表的下一个 Object msg;//信息 ByteBuffer[] bufs;//缓存字节缓冲区数组,为了复用提高效率 ByteBuffer buf;//缓存字节缓冲区,为了复用提高效率 ChannelPromise promise;//回调 long progress;//当前进度,即已经传了多少数据 long total;//总共的数据大小 int pendingSize;//待冲刷的评估大小,要加上96 int count = -1; boolean cancelled;//是否被取消了 private Entry(Handle<Entry> handle) { this.handle = handle; }      //用对象池的方式创建实体 static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) { Entry entry = RECYCLER.get();//从池子里获取 entry.msg = msg;//消息 entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;//评估的大小 entry.total = total;//具体大小 entry.promise = promise; return entry; }      //取消了,返回待冲刷的评估大小 int cancel() { if (!cancelled) { cancelled = true;//取消标识 int pSize = pendingSize; // release message and replace with an empty buffer ReferenceCountUtil.safeRelease(msg);//释放 msg = Unpooled.EMPTY_BUFFER; pendingSize = 0; total = 0; progress = 0; bufs = null; buf = null; return pSize; } return 0; }     //用完初始化后再放回收到池子里 void recycle() { next = null; bufs = null; buf = null; msg = null; promise = null; progress = 0; total = 0; pendingSize = 0; count = -1; cancelled = false; handle.recycle(this); }      //回收当前实体并获取下一个实体,为什么要先获取下一个再回收呢,因为回收的时候把next设置null Entry recycleAndGetNext() { Entry next = this.next; recycle(); return next; } }

    创建Entry节点

    static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
                Entry entry = RECYCLER.get();
                entry.msg = msg;
                entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
                entry.total = total;
                entry.promise = promise;
                return entry;
            }

    方法RECYCLER.get();

    private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() {
                @Override
                public Entry newObject(Handle<Entry> handle) {
                    return new Entry(handle);
                }
            });
    public static <T> ObjectPool<T> newPool(final ObjectCreator<T> creator) {
            return new RecyclerObjectPool<T>(ObjectUtil.checkNotNull(creator, "creator"));
        }
    
        private static final class RecyclerObjectPool<T> extends ObjectPool<T> {
            private final Recycler<T> recycler;
    
            RecyclerObjectPool(final ObjectCreator<T> creator) {
                 recycler = new Recycler<T>() {
                    @Override
                    protected T newObject(Handle<T> handle) {
                        return creator.newObject(handle);
                    }
                };
            }
    
            @Override
            public T get() {
                return recycler.get();
            }
        }

     最终调用Recycler.get()

    public final T get() {
            if (maxCapacityPerThread == 0) {
                return newObject((Handle<T>) NOOP_HANDLE);
            }
            Stack<T> stack = threadLocal.get();
            DefaultHandle<T> handle = stack.pop();
            if (handle == null) {
                handle = stack.newHandle();
                handle.value = newObject(handle);
            }
            return (T) handle.value;
        }

    而newObject(handle)调用的即是

     也就是

     这块有点绕,用的是匿名内部类。具体流程就是:

    Netty 将在 ThreadLocalMap 中存储了一个 Stack (栈)对象,存储重复使用的 DefaultHandle 实例,该实例的 value 属性就是 Entry ,所以这个 Entry 也是重复使用的,每次用完所有参数置为 null,再返回到栈中,下次再用,从这个栈中弹出。重复利用。对象池的最佳实践。而且是保存再线程中,速度更快,不会有线程竞争。

    ChannelOutboundBuffer的addFlush

    当 addMessage 成功添加进 ChannelOutboundBuffer 后,就需要 flush 刷新到 Socket 中去。但是这个方法并不是做刷新到 Socket 的操作。而是将暂存数据节点变成待发送节点,需要发送的数据,是flushedEntry指向的节点到unflushedEntry指向的节点(不包含unflushedEntry)的之间的节点数据,unflushedEntry 的引用转移到 flushedEntry 引用中,表示即将刷新这个 flushedEntry,下次发送要将flushedEntry指向unflushedEntry指向的节点作为发送数据的起始节点。 Netty 提供了 promise,这个对象可以做取消操作,例如,不发送这个 ByteBuf 了,所以,在 write 之后,flush 之前需要告诉 promise 不能做取消操作了。

    public void addFlush() {
    
            Entry entry = unflushedEntry;//第一个没冲刷的数据,也是链表的第一个
            if (entry != null) {//有数据才刷了
                if (flushedEntry == null) {
                    // there is no flushedEntry yet, so start with the entry
                    flushedEntry = entry;//设置第一个要冲刷的实体
                }
                do {
                    flushed ++;//冲刷数+1
                    if (!entry.promise.setUncancellable()) {//如果取消的话需要回收内存
                        // Was cancelled so make sure we free up memory and notify about the freed bytes
                        int pending = entry.cancel();
                        decrementPendingOutboundBytes(pending, false, true);
                    }
                    entry = entry.next;
                } while (entry != null);//遍历冲刷是否有取消的
    
                // All flushed so reset unflushedEntry
                unflushedEntry = null;//重置未冲刷的
            }
        }
    1. 首先拿到未刷新的头节点。
    2. 判 null 之后,将这个 unflushedEntry 赋值给 flushedEntry,而这里的判 null 是做什么呢?防止多次调用 flush 。
    3. 循环尝试设置这些节点,告诉他们不能做取消操作了,如果尝试失败了,就将这个节点取消,在调用 nioBuffers 方法的时候,这个节点会被忽略。同时将 totalPendingSize 相应的减小。

    设置之后,promise 调用 cancel 方法就会返回 false。

    在调用完 outboundBuffer.addFlush() 方法后,Channel 会调用 flush0 方法做真正的刷新。

    首先调用自己的flush0 方法

    AbstractNioUnsafe的flush0

    protected final void flush0() {
                // Flush immediately only when there's no pending flush.
                // If there's a pending flush operation, event loop will call forceFlush() later,
                // and thus there's no need to call it now.
                if (!isFlushPending()) {
                    super.flush0();
                }
            }

    AbstractNioUnsafe的isFlushPending

    判断下是否已经有待冲刷存在,也就是有设置OP_WRITE事件

    private boolean isFlushPending() {
                SelectionKey selectionKey = selectionKey();
                return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
            }

    这里的判断是:如果注册了写事件,就暂时不写了,因为缓冲区到了水位线了,所以这次直接返回,等会再写。等到 EventLoop 触发写事件了,就会调用 ch.unsafe().forceFlush() 方法将数据刷新到 TCP 缓冲区。NIO 的写事件大部分时候是不需要注册的,只有当 TCP 缓冲区达到水位线了,不能写入了,才需要注册写事件。当缓冲区有空间了,NIO 就会触发写事件。

    调用了父类的flush0方法,即AbstractUnsafe的flush0

    AbstractUnsafe的flush0

    protected void flush0() {
                if (inFlush0) {
                    // Avoid re-entrance
                    return;
                }
    
                final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
                if (outboundBuffer == null || outboundBuffer.isEmpty()) {
                    return;
                }
    
                inFlush0 = true;
    
                // Mark all pending write requests as failure if the channel is inactive.
                if (!isActive()) {
                    try {
                        // Check if we need to generate the exception at all.
                        if (!outboundBuffer.isEmpty()) {
                            if (isOpen()) {
                                outboundBuffer.failFlushed(new NotYetConnectedException(), true);
                            } else {
                                // Do not trigger channelWritabilityChanged because the channel is closed already.
                                outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false);
                            }
                        }
                    } finally {
                        inFlush0 = false;
                    }
                    return;
                }
    
                try {
                    doWrite(outboundBuffer);
                } catch (Throwable t) {
                    handleWriteError(t);
                } finally {
                    inFlush0 = false;
                }
            }

    主要是doWrite(outboundBuffer);

    关键代码

    // 拿到NIO Socket
    SocketChannel ch = javaChannel();
    // 获取自旋的次数,默认16
    int writeSpinCount = config().getWriteSpinCount();
    // 获取设置的每个 ByteBuf 的最大字节数,这个数字来自操作系统的 so_sndbuf 定义
    int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
    // 调用 ChannelOutboundBuffer 的 nioBuffers 方法获取 ByteBuffer 数组,从flushedEntry开始,循环获取
    ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
    // ByteBuffer 的数量
    int nioBufferCnt = in.nioBufferCount();
    // 使用 NIO 写入 Socket 
    ch.write(buffer);
    // 调整最大字节数
    adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
    // 删除 ChannelOutboundBuffer 中的 Entry
    in.removeBytes(localWrittenBytes);
    // 自旋减一,直到自旋小于0停止循环,当然如果 ChannelOutboundBuffer 空了,也会停止。
    --writeSpinCount;
    // 如果自旋16次还没有完成 flush,则创建一个任务放进mpsc 队列中执行。
    incompleteWrite(writeSpinCount < 0);

    ChannelOutboundBuffer的incrementPendingOutboundBytes

    如果对方 Socket 接收很慢,ChannelOutboundBuffer 就会积累很多的数据。并且这个 ChannelOutboundBuffer 是没有大小限制的链表。可能会导致 OOM,所以在 addMessage 方法的最后一行,incrementPendingOutboundBytes方法,会判断 totalPendingSize 的大小是否超过了高水位阈值(默认64 kb),如果超过,关闭写开关,调用 piepeline 的 fireChannelWritabilityChanged 方法可改变 flush 策略。

    private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
            if (size == 0) {
                return;
            }
    
            long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
            if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {//如果大于配置的16位大小
                setUnwritable(invokeLater);//设置不可写
            }
        }

    ChannelOutboundBuffer的setUnwritable

    将unwritable原子操作改为非0,然后触发fireChannelWritabilityChanged

    private void setUnwritable(boolean invokeLater) {
            for (;;) {
                final int oldValue = unwritable;
                final int newValue = oldValue | 1;
                if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
                    if (oldValue == 0) {
                        fireChannelWritabilityChanged(invokeLater);
                    }
                    break;
                }
            }
        }

    如果是立即改变,就会调用pipeline.fireChannelWritabilityChanged();,就会从头结点开始传递这个事件,否则就给通道的事件循环提交个任务

    private void fireChannelWritabilityChanged(boolean invokeLater) {
            final ChannelPipeline pipeline = channel.pipeline();
            if (invokeLater) {
                Runnable task = fireChannelWritabilityChangedTask;
                if (task == null) {
                    fireChannelWritabilityChangedTask = task = new Runnable() {
                        @Override
                        public void run() {
                            pipeline.fireChannelWritabilityChanged();
                        }
                    };
                }
                channel.eventLoop().execute(task);
            } else {
                pipeline.fireChannelWritabilityChanged();
            }
        }

    当 Channel 的可写状态发生改变时被调用。用户可以确保写操作不会完成的太快(以避免发生 OOM)或者可以在 Channel 变为再次可写时恢复写入。可以通过调用 Channel 的 isWritable 方法来检测 Channel 的可写性。与可写性相关的阈值可以通过 Channel.config().setWriteBufferHighWaterMark 和 Channel.config().setWriteBufferLowWaterMark 方法来设置,默认最小 32 kb,最大 64 kb。

    恢复可写状态,remove 的时候,或者 addFlush 是丢弃了某个节点,会对 totalPendingSize 进行削减,削减之后进行判断。如果 totalPendingSize 小于最低水位了。就恢复写入。

    private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
            if (size == 0) {
                return;
            }
    
            long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
            if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
                setWritable(invokeLater);
            }
        }

    默认的情况下,ChannelOutboundBuffer 缓存区的大小最大是 64 kb,最小是 32 kb

    当不能写的时候,就会调用 ChannelWritabilityChanged 方法,用户可以在代码中,让写操作进行的慢一点。

    setWritable

    原子操作修改成可写状态:

    private void setWritable(boolean invokeLater) {
            for (;;) {
                final int oldValue = unwritable;
                final int newValue = oldValue & ~1;
                if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
                    if (oldValue != 0 && newValue == 0) {
                        fireChannelWritabilityChanged(invokeLater);
                    }
                    break;
                }
            }
        }

    总结

    Netty 的 write 的操作不会立即写入,而是存储在了 ChannelOutboundBuffer 缓冲区里,这个缓冲区内部是 Entry 节点组成的链表结构,通过 addMessage 方法添加进链表,通过 addFlush 方法表示可以开始写入了,最后通过 SocketChannel 的 flush0 方法真正的写入到 JDK 的 Socket 中。同时需要注意如果 TCP 缓冲区到达一个水位线了,不能写入 TCP 缓冲区了,就需要晚点写入,这里的方法判断是 isFlushPending()。

    其中,有一个需要注意的点就是,如果对方接收数据较慢,可能导致缓冲区存在大量的数据无法释放,导致OOM,Netty 通过一个 isWritable 开关尝试解决此问题,但用户需要重写 ChannelWritabilityChanged 方法,因为一旦超过默认的高水位阈值,Netty 就会调用 ChannelWritabilityChanged 方法,执行完毕后,继续进行 flush。用户可以在该方法中尝试慢一点的操作。等到缓冲区的数据小于低水位的值时,开关就关闭了,就不会调用 ChannelWritabilityChanged 方法。因此,合理设置这两个数值也挺重要的。

  • 相关阅读:
    怎样解决:未找到路径“……”的控制器或该控制器未实现 IController?
    错误:org.springframework.jdbc.support.SQLErrorCodesFactory
    springbean的生命周期
    注解到处excel
    nio读取文件,输出文件
    AtomicReference
    唯一id
    hashmap1.7的死锁模拟
    数组模拟stack
    环形队列
  • 原文地址:https://www.cnblogs.com/xiaojiesir/p/15435301.html
Copyright © 2011-2022 走看看