ChannelOutboundBuffer
书接上文,NioSocketChannelUnsafe.write()方法,实际上调用的是AbstractUnsafe的write方法。
public final void write(Object msg, ChannelPromise promise) { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { try { // release message now to prevent resource-leak ReferenceCountUtil.release(msg); } finally { // If the outboundBuffer is null we know the channel was closed and so // need to fail the future right away. If it is not null the handling of the rest // will be done in flush0() // See https://github.com/netty/netty/issues/2362 safeSetFailure(promise, newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)")); } return; } int size; try { msg = filterOutboundMessage(msg); size = pipeline.estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { try { ReferenceCountUtil.release(msg); } finally { safeSetFailure(promise, t); } return; } outboundBuffer.addMessage(msg, size, promise); }
它使用了ChannelOutboundBuffer,ChannelOutboundBuffer是一个通道的出站缓冲区,所有要写的数据都会先存在这里,等到要刷新的时候才会真的写出去。每个 ChannelSocket 的 Unsafe 都有一个绑定的 ChannelOutboundBuffer , Netty 向站外输出数据的过程统一通过 ChannelOutboundBuffer 类进行封装,目的是为了提高网络的吞吐量,在外面调用 write 的时候,数据并没有写到 Socket,而是写到了 ChannelOutboundBuffer 这里,当调用 flush 的时候,才真正的向 Socket 写出。每个Unsafe都会绑定一个ChannelOutboundBuffer,也就是说每个客户端连接上服务端都会创建一个ChannelOutboundBuffer绑定客户端Channel。Netty设计ChannelOutboundBuffer是为了减少TCP缓存的压力提高系统的吞吐率。
看下属性
消息都是封装成内部的Entry类的,存储结构是一个单链表。其实他是一个对象池,可以复用:
static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =//出站实体的额外开销96字节 SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96); //保存线程对应的缓冲区,默认是1024个ByteBuffer数组,FastThreadLocal比一般的ThreadLocal要快,他是利用数组,内部用的是常量索引的数组,不是hash算法 private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() { @Override protected ByteBuffer[] initialValue() throws Exception { return new ByteBuffer[1024]; } }; // 单链表结构 // The Entry that is the first in the linked-list structure that was flushed private Entry flushedEntry;//第一个要冲刷的实体 // The Entry which is the first unflushed in the linked-list structure private Entry unflushedEntry;//第一个未冲刷的实体 // The Entry which represents the tail of the buffer private Entry tailEntry;//尾结点实体 // The number of flushed entries that are not written yet private int flushed;//要冲刷的数量,但是还没真正冲刷出去,就是出站缓冲区大小 private int nioBufferCount;//可以冲刷的缓冲区个数 private long nioBufferSize;//可以写出的总的缓冲区数组数据大小 private boolean inFail;//是否冲刷失败 //原子操作totalPendingSize private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER = AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize"); @SuppressWarnings("UnusedDeclaration") private volatile long totalPendingSize;//待冲刷缓冲区的字节总数 //原子操作unwritable private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable"); @SuppressWarnings("UnusedDeclaration") private volatile int unwritable; private volatile Runnable fireChannelWritabilityChangedTask;//写能力改变的任务
Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
flushedEntry(包括)到unflushedEntry之间的就是待发送数据,unflushedEntry(包括)到tailEntry就是暂存数据,flushed就是待发送数据个数。
调用 addMessage 方法的时候,创建一个 Entry ,将这个 Entry 追加到 TailEntry 节点后面,调用 addFlush 的时候,将 unflushedEntry 的引用赋给 flushedEntry,表示即将从这里开始刷新。然后将 unflushedEntry 置为 null。当数据被写进 Socket 后,从 flushedEntry(current) 节点开始,循环将每个节点删除。
ChannelOutboundBuffer的addMessage
将直接缓冲区添加到出站缓冲区中,不过是会创建一个实体Entry,然后用一个单链表结构来存取的
public void addMessage(Object msg, int size, ChannelPromise promise) { Entry entry = Entry.newInstance(msg, size, total(msg), promise);//创建entry节点 if (tailEntry == null) { flushedEntry = null; } else { Entry tail = tailEntry; tail.next = entry; } tailEntry = entry; if (unflushedEntry == null) { unflushedEntry = entry; } // increment pending bytes after adding message to the unflushed arrays. // See https://github.com/netty/netty/issues/1619 incrementPendingOutboundBytes(entry.pendingSize, false);//增加待冲刷的消息 }
- 根据 ByteBuf 相互属性和 promise 创建一个 Entry 节点。
- 将新的节点追加到 tailEntry 节点上。如果考虑之前的全部被清空了话,则新节点就是唯一节点,unflushedEntry 属性就是新的节点。可对照上面的图来看。
- 使用 CAS 将 totalPendingSize(总的数据大小) 属性增加 Entry 实例的大小(96 字节) + 真实数据的大小。
内部类Entry
static final class Entry {
//Entry对象池 private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() { @Override public Entry newObject(Handle<Entry> handle) { return new Entry(handle); } }); private final Handle<Entry> handle;//池化操作的处理器 Entry next;//链表的下一个 Object msg;//信息 ByteBuffer[] bufs;//缓存字节缓冲区数组,为了复用提高效率 ByteBuffer buf;//缓存字节缓冲区,为了复用提高效率 ChannelPromise promise;//回调 long progress;//当前进度,即已经传了多少数据 long total;//总共的数据大小 int pendingSize;//待冲刷的评估大小,要加上96 int count = -1; boolean cancelled;//是否被取消了 private Entry(Handle<Entry> handle) { this.handle = handle; } //用对象池的方式创建实体 static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) { Entry entry = RECYCLER.get();//从池子里获取 entry.msg = msg;//消息 entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;//评估的大小 entry.total = total;//具体大小 entry.promise = promise; return entry; } //取消了,返回待冲刷的评估大小 int cancel() { if (!cancelled) { cancelled = true;//取消标识 int pSize = pendingSize; // release message and replace with an empty buffer ReferenceCountUtil.safeRelease(msg);//释放 msg = Unpooled.EMPTY_BUFFER; pendingSize = 0; total = 0; progress = 0; bufs = null; buf = null; return pSize; } return 0; } //用完初始化后再放回收到池子里 void recycle() { next = null; bufs = null; buf = null; msg = null; promise = null; progress = 0; total = 0; pendingSize = 0; count = -1; cancelled = false; handle.recycle(this); } //回收当前实体并获取下一个实体,为什么要先获取下一个再回收呢,因为回收的时候把next设置null Entry recycleAndGetNext() { Entry next = this.next; recycle(); return next; } }
创建Entry节点
static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) { Entry entry = RECYCLER.get(); entry.msg = msg; entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD; entry.total = total; entry.promise = promise; return entry; }
方法RECYCLER.get();
private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() { @Override public Entry newObject(Handle<Entry> handle) { return new Entry(handle); } });
public static <T> ObjectPool<T> newPool(final ObjectCreator<T> creator) { return new RecyclerObjectPool<T>(ObjectUtil.checkNotNull(creator, "creator")); } private static final class RecyclerObjectPool<T> extends ObjectPool<T> { private final Recycler<T> recycler; RecyclerObjectPool(final ObjectCreator<T> creator) { recycler = new Recycler<T>() { @Override protected T newObject(Handle<T> handle) { return creator.newObject(handle); } }; } @Override public T get() { return recycler.get(); } }
最终调用Recycler.get()
public final T get() { if (maxCapacityPerThread == 0) { return newObject((Handle<T>) NOOP_HANDLE); } Stack<T> stack = threadLocal.get(); DefaultHandle<T> handle = stack.pop(); if (handle == null) { handle = stack.newHandle(); handle.value = newObject(handle); } return (T) handle.value; }
而newObject(handle)调用的即是
也就是
这块有点绕,用的是匿名内部类。具体流程就是:
Netty 将在 ThreadLocalMap 中存储了一个 Stack (栈)对象,存储重复使用的 DefaultHandle 实例,该实例的 value 属性就是 Entry ,所以这个 Entry 也是重复使用的,每次用完所有参数置为 null,再返回到栈中,下次再用,从这个栈中弹出。重复利用。对象池的最佳实践。而且是保存再线程中,速度更快,不会有线程竞争。
ChannelOutboundBuffer的addFlush
当 addMessage 成功添加进 ChannelOutboundBuffer 后,就需要 flush 刷新到 Socket 中去。但是这个方法并不是做刷新到 Socket 的操作。而是将暂存数据节点变成待发送节点,需要发送的数据,是flushedEntry指向的节点到unflushedEntry指向的节点(不包含unflushedEntry)的之间的节点数据,unflushedEntry 的引用转移到 flushedEntry 引用中,表示即将刷新这个 flushedEntry,下次发送要将flushedEntry指向unflushedEntry指向的节点作为发送数据的起始节点。 Netty 提供了 promise,这个对象可以做取消操作,例如,不发送这个 ByteBuf 了,所以,在 write 之后,flush 之前需要告诉 promise 不能做取消操作了。
public void addFlush() { Entry entry = unflushedEntry;//第一个没冲刷的数据,也是链表的第一个 if (entry != null) {//有数据才刷了 if (flushedEntry == null) { // there is no flushedEntry yet, so start with the entry flushedEntry = entry;//设置第一个要冲刷的实体 } do { flushed ++;//冲刷数+1 if (!entry.promise.setUncancellable()) {//如果取消的话需要回收内存 // Was cancelled so make sure we free up memory and notify about the freed bytes int pending = entry.cancel(); decrementPendingOutboundBytes(pending, false, true); } entry = entry.next; } while (entry != null);//遍历冲刷是否有取消的 // All flushed so reset unflushedEntry unflushedEntry = null;//重置未冲刷的 } }
- 首先拿到未刷新的头节点。
- 判 null 之后,将这个 unflushedEntry 赋值给 flushedEntry,而这里的判 null 是做什么呢?防止多次调用 flush 。
- 循环尝试设置这些节点,告诉他们不能做取消操作了,如果尝试失败了,就将这个节点取消,在调用 nioBuffers 方法的时候,这个节点会被忽略。同时将 totalPendingSize 相应的减小。
设置之后,promise 调用 cancel 方法就会返回 false。
在调用完 outboundBuffer.addFlush() 方法后,Channel 会调用 flush0 方法做真正的刷新。
首先调用自己的flush0 方法
AbstractNioUnsafe的flush0
protected final void flush0() { // Flush immediately only when there's no pending flush. // If there's a pending flush operation, event loop will call forceFlush() later, // and thus there's no need to call it now. if (!isFlushPending()) { super.flush0(); } }
AbstractNioUnsafe的isFlushPending
判断下是否已经有待冲刷存在,也就是有设置OP_WRITE事件
private boolean isFlushPending() { SelectionKey selectionKey = selectionKey(); return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0; }
这里的判断是:如果注册了写事件,就暂时不写了,因为缓冲区到了水位线了,所以这次直接返回,等会再写。等到 EventLoop 触发写事件了,就会调用 ch.unsafe().forceFlush()
方法将数据刷新到 TCP 缓冲区。NIO 的写事件大部分时候是不需要注册的,只有当 TCP 缓冲区达到水位线了,不能写入了,才需要注册写事件。当缓冲区有空间了,NIO 就会触发写事件。
调用了父类的flush0方法,即AbstractUnsafe的flush0
AbstractUnsafe的flush0
protected void flush0() { if (inFlush0) { // Avoid re-entrance return; } final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null || outboundBuffer.isEmpty()) { return; } inFlush0 = true; // Mark all pending write requests as failure if the channel is inactive. if (!isActive()) { try { // Check if we need to generate the exception at all. if (!outboundBuffer.isEmpty()) { if (isOpen()) { outboundBuffer.failFlushed(new NotYetConnectedException(), true); } else { // Do not trigger channelWritabilityChanged because the channel is closed already. outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false); } } } finally { inFlush0 = false; } return; } try { doWrite(outboundBuffer); } catch (Throwable t) { handleWriteError(t); } finally { inFlush0 = false; } }
主要是doWrite(outboundBuffer);
关键代码
// 拿到NIO Socket SocketChannel ch = javaChannel(); // 获取自旋的次数,默认16 int writeSpinCount = config().getWriteSpinCount(); // 获取设置的每个 ByteBuf 的最大字节数,这个数字来自操作系统的 so_sndbuf 定义 int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite(); // 调用 ChannelOutboundBuffer 的 nioBuffers 方法获取 ByteBuffer 数组,从flushedEntry开始,循环获取 ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite); // ByteBuffer 的数量 int nioBufferCnt = in.nioBufferCount(); // 使用 NIO 写入 Socket ch.write(buffer); // 调整最大字节数 adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite); // 删除 ChannelOutboundBuffer 中的 Entry in.removeBytes(localWrittenBytes); // 自旋减一,直到自旋小于0停止循环,当然如果 ChannelOutboundBuffer 空了,也会停止。 --writeSpinCount; // 如果自旋16次还没有完成 flush,则创建一个任务放进mpsc 队列中执行。 incompleteWrite(writeSpinCount < 0);
ChannelOutboundBuffer的incrementPendingOutboundBytes
如果对方 Socket 接收很慢,ChannelOutboundBuffer 就会积累很多的数据。并且这个 ChannelOutboundBuffer 是没有大小限制的链表。可能会导致 OOM,所以在 addMessage 方法的最后一行,incrementPendingOutboundBytes方法,会判断 totalPendingSize 的大小是否超过了高水位阈值(默认64 kb),如果超过,关闭写开关,调用 piepeline 的 fireChannelWritabilityChanged 方法可改变 flush 策略。
private void incrementPendingOutboundBytes(long size, boolean invokeLater) { if (size == 0) { return; } long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size); if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {//如果大于配置的16位大小 setUnwritable(invokeLater);//设置不可写 } }
ChannelOutboundBuffer的setUnwritable
将unwritable原子操作改为非0
,然后触发fireChannelWritabilityChanged
private void setUnwritable(boolean invokeLater) { for (;;) { final int oldValue = unwritable; final int newValue = oldValue | 1; if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) { if (oldValue == 0) { fireChannelWritabilityChanged(invokeLater); } break; } } }
如果是立即改变,就会调用pipeline.fireChannelWritabilityChanged();
,就会从头结点开始传递这个事件,否则就给通道的事件循环提交个任务
private void fireChannelWritabilityChanged(boolean invokeLater) { final ChannelPipeline pipeline = channel.pipeline(); if (invokeLater) { Runnable task = fireChannelWritabilityChangedTask; if (task == null) { fireChannelWritabilityChangedTask = task = new Runnable() { @Override public void run() { pipeline.fireChannelWritabilityChanged(); } }; } channel.eventLoop().execute(task); } else { pipeline.fireChannelWritabilityChanged(); } }
当 Channel 的可写状态发生改变时被调用。用户可以确保写操作不会完成的太快(以避免发生 OOM)或者可以在 Channel 变为再次可写时恢复写入。可以通过调用 Channel 的 isWritable 方法来检测 Channel 的可写性。与可写性相关的阈值可以通过 Channel.config().setWriteBufferHighWaterMark 和 Channel.config().setWriteBufferLowWaterMark 方法来设置,默认最小 32 kb,最大 64 kb。
恢复可写状态,remove 的时候,或者 addFlush 是丢弃了某个节点,会对 totalPendingSize 进行削减,削减之后进行判断。如果 totalPendingSize 小于最低水位了。就恢复写入。
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) { if (size == 0) { return; } long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size); if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) { setWritable(invokeLater); } }
默认的情况下,ChannelOutboundBuffer 缓存区的大小最大是 64 kb,最小是 32 kb
当不能写的时候,就会调用 ChannelWritabilityChanged 方法,用户可以在代码中,让写操作进行的慢一点。
setWritable
原子操作修改成可写状态:
private void setWritable(boolean invokeLater) { for (;;) { final int oldValue = unwritable; final int newValue = oldValue & ~1; if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) { if (oldValue != 0 && newValue == 0) { fireChannelWritabilityChanged(invokeLater); } break; } } }
总结
Netty 的 write 的操作不会立即写入,而是存储在了 ChannelOutboundBuffer 缓冲区里,这个缓冲区内部是 Entry 节点组成的链表结构,通过 addMessage 方法添加进链表,通过 addFlush 方法表示可以开始写入了,最后通过 SocketChannel 的 flush0 方法真正的写入到 JDK 的 Socket 中。同时需要注意如果 TCP 缓冲区到达一个水位线了,不能写入 TCP 缓冲区了,就需要晚点写入,这里的方法判断是 isFlushPending()。
其中,有一个需要注意的点就是,如果对方接收数据较慢,可能导致缓冲区存在大量的数据无法释放,导致OOM,Netty 通过一个 isWritable 开关尝试解决此问题,但用户需要重写 ChannelWritabilityChanged 方法,因为一旦超过默认的高水位阈值,Netty 就会调用 ChannelWritabilityChanged 方法,执行完毕后,继续进行 flush。用户可以在该方法中尝试慢一点的操作。等到缓冲区的数据小于低水位的值时,开关就关闭了,就不会调用 ChannelWritabilityChanged 方法。因此,合理设置这两个数值也挺重要的。