zoukankan      html  css  js  c++  java
  • 发送数据:自适应写和连接写同样是为了解决什么问题

    发送数据:自适应写和连接写同样是为了解决什么问题

    Netty 系列目录(https://www.cnblogs.com/binarylei/p/10117436.html)

    发送数据和接收数据比较类似,可以将这两部分结合起来学习。 接收数据:自适应缓冲区和连接读是为了解决什么问题

    1. 主线分析

    1.1 写数据要点

    和读数据一样,写数据我们也会碰到以下问题:

    1. 每次写多少数据合适,这就是自适应写。
    2. 如何处理高并发,保证雨露均沾,这就是连续写。
    3. 不能写了怎么办,这就需要注册 OP_WRITE 事件。

    我们再看一下,Netty 是如何解决这几个问题的。这部分才是本小节内容的核心。

    1. 自适应写(maxBytesPerGatheringWrite)

      Netty 批量写数据时,如果想写的都写进去了,接下来的尝试写更多(调整 maxBytesPerGatheringWrite)。

    2. 连续写(writeSpinCount)

      同连接读一样,每个连接默认最多连续写 16 次,即使还有数据也暂时不处理了,先处理下一个连接。

    3. 注册 OP_WRITE 事件

      如果 socket sendbuf 已经写不动,那就注册 OP_WRITE 事件。当触发 OP_WRITE 事件时,则取消 OP_WRITE 事件,并继续写。

    4. 高低水位线(writeBufferWaterMark)

      Netty 待写数据太多,超过一定的水位线(writeBufferWaterMark.high()),会将可写的标志位改成
      false ,让应用自己做决定要不要发送数据了。

    1.2 主线

    Netty 中将发送数据分为两步,write 只写到缓冲区,只有调用 flush 才会真正发送数据。

    • Write:写数据到 buffer,ChannelOutboundBuffer#addMessage
    • Flush:发送 buffer 里面的数据,AbstractChannel.AbstractUnsafe#flush
      • 准备数据:ChannelOutboundBuffer#addFlush
      • 发送:NioSocketChannel#doWrite

    发送数据核心步骤有三步:addMessage、addFlush、doWrite。如下图所示:

    图1:发送数据分为两步:写缓冲区和刷新

    1.2 知识点

    (1)写的本质

    • Single write: sun.nio.ch.SocketChannelImpl#write(java.nio.ByteBuffer)
    • Gathering write:sun.nio.ch.SocketChannelImpl#write(java.nio.ByteBuffer[], int, int)

    (2)OP_WRITE vs OP_READ

    • OP_WRITE:调用 ch.unsafe().forceFlush() 刷新数据到 socket sendbuf 缓冲区。正常情况下不能注册,否则一直触发该事件。我们只是需要一个机制,在能写时通知程序可以继续写就可以了。
    • OP_READ:一直注册,只要有数据就读,读超过 defaultMaxMessagesPerRead 次则下次继续读。

    (3)写优化

    • 自适应写:批量写数据时,如果尝试写的都写进去了,接下来会尝试写更多(maxBytesPerGatheringWrite)。
    • 连续写:最多写 16 次(writeSpinCount),如果没有写完,就直接 schedule 一个 task 来继续写,而不是用注册写事件来触发,更简洁有力。
    • 水位线:待写数据太多,超过水位线 writeBufferWaterMark.high(),会将可写的标志位改成 false ,让应用端自己做决定要不要继续写。
      • ctx.channel().write() :从 TailContext 开始执行;
      • ctx.write() : 从当前的 Context 开始。

    2. 源码分析

    ctx#write
        -> HeadContext#write
            -> AbstractChannel.AbstractUnsafe#write
                -> ChannelOutboundBuffer#addMessage    # √ 添加到缓冲区
    ctx#flush
        -> HeadContext#flush
            -> AbstractChannel.AbstractUnsafe#flush
                -> ChannelOutboundBuffer#addFlush      # √ 将缓冲区的数据状态改成待发送状态
                -> flush0
                    -> NioSocketChannel#doWrite        # √ 核心逻辑,真正发送数据
    

    ChannelOutboundBuffer 缓冲区本质是一个单向链表,addMessage、addFlush、doWrite 会更新链表的状态。

    private Entry flushedEntry;     // 相当于队头。doWrite时将发送数据后并更新
    private Entry unflushedEntry;   // 指向第一个未刷新的数据。addFlush时更新
    private Entry tailEntry;        // 队尾。addMessage 添加到队尾并更新
    private int flushed;            // 表示flushedEntry~unflushedEntry中未刷新结点的个数
    

    说明: addFlush 更新 unflushedEntry 后,并不表示链表之前的缓冲区数据已经发送。只有调用 doWrite 才真正发送数据,并更新 flushedEntry,将结点从缓冲区中剔除。

    2.1 addMessage

    ctx#write 调用 addMessage 方法将 msg 添加到缓冲区。addMessage 完成了二件事:

    1. 将 msg 包装成 Entry,添加到 outboundBuffer 队尾。
    2. 判断是否超过缓冲区的高水位线,如果超过将可写标志设置为 false。如果应用程序不管三七二十一继续写,可能发生 OOM。
    public void addMessage(Object msg, int size, ChannelPromise promise) {
        // 1. 添加到队尾,更新tailEntry
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        if (tailEntry == null) {
            flushedEntry = null;
        } else {
            Entry tail = tailEntry;
            tail.next = entry;
        }
        tailEntry = entry;
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }
    
        // 2. 如果超过高水位线,将可写标志设置为false 
        incrementPendingOutboundBytes(entry.pendingSize, false);
    }
    

    说明: addMessage 的参数 size 为 msg 的大小,Netty 默认使用 DefaultMessageSizeEstimator,直接调用 ((ByteBuf) msg).readableBytes() 获取数据包的大小。高低水位线最后再来统一分析。

    2.2 addFlush

    ctx#flush 先调用 addFlush 更新 unflushedEntry,然后调用 unsafe.flush0() 将 flushedEntry ~ unflushedEntry 之间的数据刷新出去。addFlush 同样做了两件事:

    1. 更新 unflushedEntry 到队尾。
    2. 如果有结点 cancel 则重新判断水位线,是否将可写标志设置为 true。
    public void addFlush() {
        Entry entry = unflushedEntry;
        if (entry != null) {
            if (flushedEntry == null) 
                flushedEntry = entry;
            }
            do {
                flushed ++;
                if (!entry.promise.setUncancellable()) {
                    int pending = entry.cancel();
                    decrementPendingOutboundBytes(pending, false, true);
                }
                entry = entry.next;
            } while (entry != null);
            unflushedEntry = null;
        }
    }
    

    2.3 doWrite

    unsafe.flush0() 实际上是调用 NioSocketChannel#doWrite 方法,将 flushedEntry ~ unflushedEntry 之间的数据刷新出去,同时更新 flushedEntry。这也是最复杂的一部分。doWrite 方法:

    1. writeSpinCount:连续写的最大次数,这个就再赘述了。
    2. 缓冲区为空:直接返回,同时取消 OP_WRITE 事件的注册。前面已经提到,OP_WRITE 事件其实上是调用 flush 将缓冲区的数据刷新出去,都没有数据了,当然也就不需要了。此时不会调用 incompleteWrite 方法。
    3. 将缓冲区的数据转换成 ByteBuffer(方便后面调用 API):每次最多写 maxBytesPerGatheringWrite,每次刷新时会调整 maxBytesPerGatheringWrite 值。
    4. 缓冲区有数据:按缓冲区 in 转换成 ByteBuffer[] 的数据长度分三种情况。
      • 长度为 0:表示要写的数据不是 ByteBuf,如 FileRegin。直接调用 doWrite0() 方法,本质和 ByteBuf 差不多,这里不会分析这种情况。
      • 长度为 1 或大于 1:其实这两种情况差不多,长度为 1 时调用 ch.write(buffer),长度大于 1 时调用 ch.write(nioBuffers, 0, nioBufferCnt),也就批量写。在这里我们只分析长度大于 1 的情况。
    5. 什么时候结束刷新操作(incompleteWrite)
      • 写次数超过 writeSpinCount:退出循环时 writeSpinCount = 0,即调用 incompleteWrite(false) 方法,注册一个 flushTask,flushTask 其实也是直接调用 unsafe().flush0(),空闲时继续刷新。
      • socket sendbuf 刷不动了:调用 incompleteWrite(true),注册 OP_WRITE 事件,等触发该事件后继续刷新。
      • 数据写完:即从第二步退出循环,同时取消 OP_WRITE 事件注册。此时不会调用 incompleteWrite 方法,incompleteWrite 从方法名称也知道当数据没有刷新完时调用。
    NioSocketChannel#doWrite
        -> ChannelOutboundBuffer#nioBuffers       # 将in中待刷新的数据转换成ByteBuffer[]
        -> SocketChannel#write                    # 真正刷新数据
        -> adjustMaxBytesPerGatheringWrite        # 动态调整下一次刷新的数据
        -> ChannelOutboundBuffer#removeBytes      # 将in中已经刷新的结点移除
        -> AbstractNioByteChannel#incompleteWrite # 处理未全部刷新完成的情况
        -> AbstractNioByteChannel#clearOpWrite    # 清除 OP_WRITE 事件
    

    doWrite 代码如下(有删减)

    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        // 1. 将缓冲区in中的数据转换成ByteBuffer[]
        int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
        ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
        int nioBufferCnt = in.nioBufferCount();
    
        // do...while 刷新缓冲区数据
        long attemptedBytes = in.nioBufferSize();
        // 2. socket api 刷新数据
        final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
        // 3. socket sendbuf 刷不动后,调用incompleteWrite,并结点刷新
        if (localWrittenBytes <= 0) {
            incompleteWrite(true);
            return;
        }
        // 4. 动态调整每次最大刷新的数据量
        adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes, maxBytesPerGatheringWrite);
        // 5. 删除in中已经刷新的数据结点
        in.removeBytes(localWrittenBytes);
    }
    

    说明: in.nioBuffers 和 in.removeBytes 都是 ChannelOutboundBuffer 的操作,先不去管它。我们看一下,adjustMaxBytesPerGatheringWrite 和 incompleteWrite 这两个方法都做了些什么事。

    (1)adjustMaxBytesPerGatheringWrite

    // maxBytesPerGatheringWrite:默认是SO_SNDBUF大小
    // attempted表示尝试刷新的数据大小,而written表示真实刷新的数据大小。
    private void adjustMaxBytesPerGatheringWrite(int attempted, int written, int oldMaxBytesPerGatheringWrite) {
        // 1. 扩大2倍。全部写满了,可能还有更多的数据需要写。
        if (attempted == written) {
            if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
                ((NioSocketChannelConfig) config).setMaxBytesPerGatheringWrite(attempted << 1);
            }
        // 2. 缩小2倍。attempted>4KB且真实写入的数据不到attempted的一半
        } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
            ((NioSocketChannelConfig) config).setMaxBytesPerGatheringWrite(attempted >>> 1);
        }
    }
    

    (2)incompleteWrite

    incompleteWrite 表示数据没有写完,又分两种情况:true 表示 socket sendbuf 写不了;false 表示连续写次数超过16次,提交flushTask,空闲时继续写。

    protected final void incompleteWrite(boolean setOpWrite) {
        // 1. socket sendbuf 写不了
        if (setOpWrite) {
            setOpWrite();
        // 2. 连续写次数超过16次,提交flushTask,空闲时继续写
        } else {
            clearOpWrite();
            eventLoop().execute(flushTask);
        }
    }
    

    2.4 ChannelOutboundBuffer

    • addMessage:将 msg 添加到缓冲区。
    • addFlush:更新 unflushedEntry,flushedEntry ~ unflushedEntry 之间的结点表示需要刷新的数据。
    • nioBuffers:将可刷新缓冲区的数据转换成 ByteBuffer[]。
    • removeBytes:删除已经刷新的结点,更新 flushedEntry。

    2.5 高低水位线

    WriteBufferWaterMark 中设置了默认的高低水位线,高水位线默认为 32KB,低水位线默认为 64KB。

    1. 32KB 是不是太小了。Netty 支持百万连接,如果连接过多,可能导致内存压力很大。
    2. 为什么要设置高低水位线两个阀值。避免写标志位频繁波动。
    private static final int DEFAULT_LOW_WATER_MARK = 32 * 1024;
    private static final int DEFAULT_HIGH_WATER_MARK = 64 * 1024;
    

    高低水位线设置代码如下:

    // 超过高水位线,可写标志设置成false
    private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
        long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
        if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
            setUnwritable(invokeLater);
        }
    }
    
    // 超过低水位线,可写标志设置成true
    private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
        long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
        if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
            setWritable(invokeLater);
        }
    }
    

    3. 总结

    3.1 发送数据默认参数

    • 高水位线(high):64KB
    • 低水位线(low):32KB
    • 一次刷新数据的大小(maxBytesPerGatheringWrite):默认是 SO_SNDBUF 大小
    • 连续写最大次数(writeSpinCount):16

    每天用心记录一点点。内容也许不重要,但习惯很重要!

  • 相关阅读:
    要学习TINY框架要有什么前提条件?
    如何获取最新的代码?
    python 反射的用法
    面试题1
    Process多进程的创建方法
    异常捕捉
    用type动态创建Form
    ModelForm的使用
    git 常见命令
    TVTK库的安装
  • 原文地址:https://www.cnblogs.com/binarylei/p/12642938.html
Copyright © 2011-2022 走看看