zoukankan      html  css  js  c++  java
  • 发送数据:自适应写和连接写同样是为了解决什么问题

    发送数据:自适应写和连接写同样是为了解决什么问题

    Netty 系列目录(https://www.cnblogs.com/binarylei/p/10117436.html)

    发送数据和接收数据比较类似,可以将这两部分结合起来学习。 接收数据:自适应缓冲区和连接读是为了解决什么问题

    1. 主线分析

    1.1 写数据要点

    和读数据一样,写数据我们也会碰到以下问题:

    1. 每次写多少数据合适,这就是自适应写。
    2. 如何处理高并发,保证雨露均沾,这就是连续写。
    3. 不能写了怎么办,这就需要注册 OP_WRITE 事件。

    我们再看一下,Netty 是如何解决这几个问题的。这部分才是本小节内容的核心。

    1. 自适应写(maxBytesPerGatheringWrite)

      Netty 批量写数据时,如果想写的都写进去了,接下来的尝试写更多(调整 maxBytesPerGatheringWrite)。

    2. 连续写(writeSpinCount)

      同连接读一样,每个连接默认最多连续写 16 次,即使还有数据也暂时不处理了,先处理下一个连接。

    3. 注册 OP_WRITE 事件

      如果 socket sendbuf 已经写不动,那就注册 OP_WRITE 事件。当触发 OP_WRITE 事件时,则取消 OP_WRITE 事件,并继续写。

    4. 高低水位线(writeBufferWaterMark)

      Netty 待写数据太多,超过一定的水位线(writeBufferWaterMark.high()),会将可写的标志位改成
      false ,让应用自己做决定要不要发送数据了。

    1.2 主线

    Netty 中将发送数据分为两步,write 只写到缓冲区,只有调用 flush 才会真正发送数据。

    • Write:写数据到 buffer,ChannelOutboundBuffer#addMessage
    • Flush:发送 buffer 里面的数据,AbstractChannel.AbstractUnsafe#flush
      • 准备数据:ChannelOutboundBuffer#addFlush
      • 发送:NioSocketChannel#doWrite

    发送数据核心步骤有三步:addMessage、addFlush、doWrite。如下图所示:

    图1:发送数据分为两步:写缓冲区和刷新

    1.2 知识点

    (1)写的本质

    • Single write: sun.nio.ch.SocketChannelImpl#write(java.nio.ByteBuffer)
    • Gathering write:sun.nio.ch.SocketChannelImpl#write(java.nio.ByteBuffer[], int, int)

    (2)OP_WRITE vs OP_READ

    • OP_WRITE:调用 ch.unsafe().forceFlush() 刷新数据到 socket sendbuf 缓冲区。正常情况下不能注册,否则一直触发该事件。我们只是需要一个机制,在能写时通知程序可以继续写就可以了。
    • OP_READ:一直注册,只要有数据就读,读超过 defaultMaxMessagesPerRead 次则下次继续读。

    (3)写优化

    • 自适应写:批量写数据时,如果尝试写的都写进去了,接下来会尝试写更多(maxBytesPerGatheringWrite)。
    • 连续写:最多写 16 次(writeSpinCount),如果没有写完,就直接 schedule 一个 task 来继续写,而不是用注册写事件来触发,更简洁有力。
    • 水位线:待写数据太多,超过水位线 writeBufferWaterMark.high(),会将可写的标志位改成 false ,让应用端自己做决定要不要继续写。
      • ctx.channel().write() :从 TailContext 开始执行;
      • ctx.write() : 从当前的 Context 开始。

    2. 源码分析

    ctx#write
        -> HeadContext#write
            -> AbstractChannel.AbstractUnsafe#write
                -> ChannelOutboundBuffer#addMessage    # √ 添加到缓冲区
    ctx#flush
        -> HeadContext#flush
            -> AbstractChannel.AbstractUnsafe#flush
                -> ChannelOutboundBuffer#addFlush      # √ 将缓冲区的数据状态改成待发送状态
                -> flush0
                    -> NioSocketChannel#doWrite        # √ 核心逻辑,真正发送数据
    

    ChannelOutboundBuffer 缓冲区本质是一个单向链表,addMessage、addFlush、doWrite 会更新链表的状态。

    private Entry flushedEntry;     // 相当于队头。doWrite时将发送数据后并更新
    private Entry unflushedEntry;   // 指向第一个未刷新的数据。addFlush时更新
    private Entry tailEntry;        // 队尾。addMessage 添加到队尾并更新
    private int flushed;            // 表示flushedEntry~unflushedEntry中未刷新结点的个数
    

    说明: addFlush 更新 unflushedEntry 后,并不表示链表之前的缓冲区数据已经发送。只有调用 doWrite 才真正发送数据,并更新 flushedEntry,将结点从缓冲区中剔除。

    2.1 addMessage

    ctx#write 调用 addMessage 方法将 msg 添加到缓冲区。addMessage 完成了二件事:

    1. 将 msg 包装成 Entry,添加到 outboundBuffer 队尾。
    2. 判断是否超过缓冲区的高水位线,如果超过将可写标志设置为 false。如果应用程序不管三七二十一继续写,可能发生 OOM。
    public void addMessage(Object msg, int size, ChannelPromise promise) {
        // 1. 添加到队尾,更新tailEntry
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        if (tailEntry == null) {
            flushedEntry = null;
        } else {
            Entry tail = tailEntry;
            tail.next = entry;
        }
        tailEntry = entry;
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }
    
        // 2. 如果超过高水位线,将可写标志设置为false 
        incrementPendingOutboundBytes(entry.pendingSize, false);
    }
    

    说明: addMessage 的参数 size 为 msg 的大小,Netty 默认使用 DefaultMessageSizeEstimator,直接调用 ((ByteBuf) msg).readableBytes() 获取数据包的大小。高低水位线最后再来统一分析。

    2.2 addFlush

    ctx#flush 先调用 addFlush 更新 unflushedEntry,然后调用 unsafe.flush0() 将 flushedEntry ~ unflushedEntry 之间的数据刷新出去。addFlush 同样做了两件事:

    1. 更新 unflushedEntry 到队尾。
    2. 如果有结点 cancel 则重新判断水位线,是否将可写标志设置为 true。
    public void addFlush() {
        Entry entry = unflushedEntry;
        if (entry != null) {
            if (flushedEntry == null) 
                flushedEntry = entry;
            }
            do {
                flushed ++;
                if (!entry.promise.setUncancellable()) {
                    int pending = entry.cancel();
                    decrementPendingOutboundBytes(pending, false, true);
                }
                entry = entry.next;
            } while (entry != null);
            unflushedEntry = null;
        }
    }
    

    2.3 doWrite

    unsafe.flush0() 实际上是调用 NioSocketChannel#doWrite 方法,将 flushedEntry ~ unflushedEntry 之间的数据刷新出去,同时更新 flushedEntry。这也是最复杂的一部分。doWrite 方法:

    1. writeSpinCount:连续写的最大次数,这个就再赘述了。
    2. 缓冲区为空:直接返回,同时取消 OP_WRITE 事件的注册。前面已经提到,OP_WRITE 事件其实上是调用 flush 将缓冲区的数据刷新出去,都没有数据了,当然也就不需要了。此时不会调用 incompleteWrite 方法。
    3. 将缓冲区的数据转换成 ByteBuffer(方便后面调用 API):每次最多写 maxBytesPerGatheringWrite,每次刷新时会调整 maxBytesPerGatheringWrite 值。
    4. 缓冲区有数据:按缓冲区 in 转换成 ByteBuffer[] 的数据长度分三种情况。
      • 长度为 0:表示要写的数据不是 ByteBuf,如 FileRegin。直接调用 doWrite0() 方法,本质和 ByteBuf 差不多,这里不会分析这种情况。
      • 长度为 1 或大于 1:其实这两种情况差不多,长度为 1 时调用 ch.write(buffer),长度大于 1 时调用 ch.write(nioBuffers, 0, nioBufferCnt),也就批量写。在这里我们只分析长度大于 1 的情况。
    5. 什么时候结束刷新操作(incompleteWrite)
      • 写次数超过 writeSpinCount:退出循环时 writeSpinCount = 0,即调用 incompleteWrite(false) 方法,注册一个 flushTask,flushTask 其实也是直接调用 unsafe().flush0(),空闲时继续刷新。
      • socket sendbuf 刷不动了:调用 incompleteWrite(true),注册 OP_WRITE 事件,等触发该事件后继续刷新。
      • 数据写完:即从第二步退出循环,同时取消 OP_WRITE 事件注册。此时不会调用 incompleteWrite 方法,incompleteWrite 从方法名称也知道当数据没有刷新完时调用。
    NioSocketChannel#doWrite
        -> ChannelOutboundBuffer#nioBuffers       # 将in中待刷新的数据转换成ByteBuffer[]
        -> SocketChannel#write                    # 真正刷新数据
        -> adjustMaxBytesPerGatheringWrite        # 动态调整下一次刷新的数据
        -> ChannelOutboundBuffer#removeBytes      # 将in中已经刷新的结点移除
        -> AbstractNioByteChannel#incompleteWrite # 处理未全部刷新完成的情况
        -> AbstractNioByteChannel#clearOpWrite    # 清除 OP_WRITE 事件
    

    doWrite 代码如下(有删减)

    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        // 1. 将缓冲区in中的数据转换成ByteBuffer[]
        int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
        ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
        int nioBufferCnt = in.nioBufferCount();
    
        // do...while 刷新缓冲区数据
        long attemptedBytes = in.nioBufferSize();
        // 2. socket api 刷新数据
        final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
        // 3. socket sendbuf 刷不动后,调用incompleteWrite,并结点刷新
        if (localWrittenBytes <= 0) {
            incompleteWrite(true);
            return;
        }
        // 4. 动态调整每次最大刷新的数据量
        adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes, maxBytesPerGatheringWrite);
        // 5. 删除in中已经刷新的数据结点
        in.removeBytes(localWrittenBytes);
    }
    

    说明: in.nioBuffers 和 in.removeBytes 都是 ChannelOutboundBuffer 的操作,先不去管它。我们看一下,adjustMaxBytesPerGatheringWrite 和 incompleteWrite 这两个方法都做了些什么事。

    (1)adjustMaxBytesPerGatheringWrite

    // maxBytesPerGatheringWrite:默认是SO_SNDBUF大小
    // attempted表示尝试刷新的数据大小,而written表示真实刷新的数据大小。
    private void adjustMaxBytesPerGatheringWrite(int attempted, int written, int oldMaxBytesPerGatheringWrite) {
        // 1. 扩大2倍。全部写满了,可能还有更多的数据需要写。
        if (attempted == written) {
            if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
                ((NioSocketChannelConfig) config).setMaxBytesPerGatheringWrite(attempted << 1);
            }
        // 2. 缩小2倍。attempted>4KB且真实写入的数据不到attempted的一半
        } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
            ((NioSocketChannelConfig) config).setMaxBytesPerGatheringWrite(attempted >>> 1);
        }
    }
    

    (2)incompleteWrite

    incompleteWrite 表示数据没有写完,又分两种情况:true 表示 socket sendbuf 写不了;false 表示连续写次数超过16次,提交flushTask,空闲时继续写。

    protected final void incompleteWrite(boolean setOpWrite) {
        // 1. socket sendbuf 写不了
        if (setOpWrite) {
            setOpWrite();
        // 2. 连续写次数超过16次,提交flushTask,空闲时继续写
        } else {
            clearOpWrite();
            eventLoop().execute(flushTask);
        }
    }
    

    2.4 ChannelOutboundBuffer

    • addMessage:将 msg 添加到缓冲区。
    • addFlush:更新 unflushedEntry,flushedEntry ~ unflushedEntry 之间的结点表示需要刷新的数据。
    • nioBuffers:将可刷新缓冲区的数据转换成 ByteBuffer[]。
    • removeBytes:删除已经刷新的结点,更新 flushedEntry。

    2.5 高低水位线

    WriteBufferWaterMark 中设置了默认的高低水位线,高水位线默认为 32KB,低水位线默认为 64KB。

    1. 32KB 是不是太小了。Netty 支持百万连接,如果连接过多,可能导致内存压力很大。
    2. 为什么要设置高低水位线两个阀值。避免写标志位频繁波动。
    private static final int DEFAULT_LOW_WATER_MARK = 32 * 1024;
    private static final int DEFAULT_HIGH_WATER_MARK = 64 * 1024;
    

    高低水位线设置代码如下:

    // 超过高水位线,可写标志设置成false
    private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
        long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
        if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
            setUnwritable(invokeLater);
        }
    }
    
    // 超过低水位线,可写标志设置成true
    private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
        long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
        if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
            setWritable(invokeLater);
        }
    }
    

    3. 总结

    3.1 发送数据默认参数

    • 高水位线(high):64KB
    • 低水位线(low):32KB
    • 一次刷新数据的大小(maxBytesPerGatheringWrite):默认是 SO_SNDBUF 大小
    • 连续写最大次数(writeSpinCount):16

    每天用心记录一点点。内容也许不重要,但习惯很重要!

  • 相关阅读:
    zbb20181207 springboot @ConfigurationProperties使用
    zbb20181206 logback,lombok 默认日志logback配置解析
    Spring Boot (8) 全局异常处理
    Spring Boot (7) JdbcTemplate访问数据库
    Spring Boot (6) Spring Data JPA
    Spring Boot (4) 静态页面和Thymeleaf模板
    Spring Boot (3) 热部署devtools
    Spring Boot (2) Restful风格接口
    Spring Boot (1) 构建第一个Spring Boot工程
    idea使用maven搭建ssm框架实现登陆商品增删改查
  • 原文地址:https://www.cnblogs.com/binarylei/p/12642938.html
Copyright © 2011-2022 走看看