  • ByteBuf & AbstractReferenceCountedByteBuf & UnpooledHeapByteBuf

     <pre>
          +-------------------+------------------+------------------+
          | discardable bytes |  readable bytes  |  writable bytes  |
          |                   |     (CONTENT)    |                  |
          +-------------------+------------------+------------------+
          |                   |                  |                  |
          0      <=      readerIndex   <=   writerIndex    <=    capacity
     </pre>
    

    AbstractByteBuf


    Main member variables

    // Logger
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractByteBuf.class);
        private static final String PROP_MODE = "io.netty.buffer.bytebuf.checkAccessible";
        private static final boolean checkAccessible;
    
        static {
            checkAccessible = SystemPropertyUtil.getBoolean(PROP_MODE, true);
            if (logger.isDebugEnabled()) {
                logger.debug("-D{}: {}", PROP_MODE, checkAccessible);
            }
        }
    // A single static instance shared by all subclasses
        static final ResourceLeakDetector<ByteBuf> leakDetector =
                ResourceLeakDetectorFactory.instance().newResourceLeakDetector(ByteBuf.class);
    
        int readerIndex;
        int writerIndex;
        private int markedReaderIndex;
        private int markedWriterIndex;
        private int maxCapacity;
    

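    Clearly, the backing storage itself is left to the subclasses to define.

    Read operations

    Flow: [ensureAccessible() verifies that the buffer is still a valid (unreleased) memory reference -> bounds check] --> fetch the bytes --> advance readerIndex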
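The read flow can be sketched with a small stand-alone class. This is not Netty code: `MiniReadBuf`, its fields and the exception types are hypothetical names chosen for illustration, and a plain `int` stands in for the real reference count.

```java
// Hypothetical miniature buffer illustrating the read flow:
// accessibility check -> bounds check -> fetch -> advance readerIndex.
final class MiniReadBuf {
    private final byte[] array;
    private int readerIndex;
    private final int writerIndex;
    private int refCnt = 1; // 0 means "released" in this sketch

    MiniReadBuf(byte[] content) {
        this.array = content.clone();
        this.writerIndex = content.length; // all content is readable
    }

    byte readByte() {
        // 1. Is the buffer still a valid memory reference?
        if (refCnt == 0) {
            throw new IllegalStateException("refCnt: 0");
        }
        // 2. Bounds check against the readable region
        if (readerIndex + 1 > writerIndex) {
            throw new IndexOutOfBoundsException(
                    "readerIndex(" + readerIndex + ") + 1 exceeds writerIndex(" + writerIndex + ")");
        }
        // 3. Fetch the byte, 4. advance readerIndex
        return array[readerIndex++];
    }

    void release() {
        refCnt = 0;
    }

    int readerIndex() {
        return readerIndex;
    }
}
```

Reading past `writerIndex` or reading after `release()` fails before any byte is touched, which is the point of doing both checks first.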

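    Write operations

    Flow: [validate the arguments themselves -> ensureAccessible() verifies that the buffer is still a valid memory reference -> check the arguments against the buffer bounds -> call calculateNewCapacity on the alloc object to decide how far to expand -> adjust capacity] --> write the bytes --> advance writerIndex

    calculateNewCapacity takes the minimum capacity currently required and the maximum capacity allowed, and returns the adjusted capacity: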

    public int calculateNewCapacity(int minNewCapacity, int maxCapacity)

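    • Check that minNewCapacity itself is >= 0
    • Check minNewCapacity against maxCapacity
    • threshold = 4 MiB
      • If minNewCapacity equals the threshold, just return it.
      • Not over threshold: double up to 4 MiB, starting from 64.
      • If over threshold, do not double but just increase by threshold.

    The expansion strategy is built around a threshold, which is an empirical value chosen by Netty. If the required capacity is below the threshold, the capacity doubles starting from 64 B; at these sizes doubling wastes little memory. Above the threshold doubling is no longer appropriate, because each doubling of an already large buffer is likely to leave much of it unused, so the capacity instead grows smoothly in multiples of the threshold.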
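The policy described above can be written out as a stand-alone function. This is a sketch that follows the listed steps, not a copy of Netty's source; the class name `CapacityPolicy` and the constant name `THRESHOLD` are hypothetical.

```java
// Sketch of the growth policy: double from 64 below the threshold,
// step by the threshold above it, never exceed maxCapacity.
final class CapacityPolicy {
    static final int THRESHOLD = 4 * 1024 * 1024; // 4 MiB

    static int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
        if (minNewCapacity < 0) {
            throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expected: >= 0)");
        }
        if (minNewCapacity > maxCapacity) {
            throw new IllegalArgumentException(
                    "minNewCapacity: " + minNewCapacity + " (expected: <= maxCapacity " + maxCapacity + ")");
        }
        if (minNewCapacity == THRESHOLD) {
            return THRESHOLD;
        }
        if (minNewCapacity > THRESHOLD) {
            // Round down to a multiple of the threshold, then add one more step
            int newCapacity = minNewCapacity / THRESHOLD * THRESHOLD;
            if (newCapacity > maxCapacity - THRESHOLD) {
                newCapacity = maxCapacity; // one more step would exceed the limit
            } else {
                newCapacity += THRESHOLD;
            }
            return newCapacity;
        }
        // Below the threshold: double, starting from 64
        int newCapacity = 64;
        while (newCapacity < minNewCapacity) {
            newCapacity <<= 1;
        }
        return Math.min(newCapacity, maxCapacity);
    }
}
```

For example, a request for 65 bytes yields 128, while a request for 4 MiB + 1 yields 8 MiB rather than the next power of two.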

    Index operations

    mark/reset and the corresponding index operations are straightforward.

    Reusing the buffer

    The discard family of functions

     <pre>
      BEFORE discardReadBytes()
    
          +-------------------+------------------+------------------+
          | discardable bytes |  readable bytes  |  writable bytes  |
          +-------------------+------------------+------------------+
          |                   |                  |                  |
          0      <=      readerIndex   <=   writerIndex    <=    capacity
    
    
      AFTER discardReadBytes()
    
          +------------------+--------------------------------------+
          |  readable bytes  |    writable bytes (got more space)   |
          +------------------+--------------------------------------+
          |                  |                                      |
     readerIndex (0) <= writerIndex (decreased)        <=        capacity
     </pre>
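The effect shown in the diagram can be reproduced on a plain byte array. The sketch below is hypothetical (`DiscardSketch` is not a Netty class) and leaves out mark handling; it only shows the compaction and the index adjustment.

```java
// Hypothetical sketch of discardReadBytes(): move the readable bytes to the
// front of the array and shift both indices down by readerIndex.
final class DiscardSketch {
    final byte[] array;
    int readerIndex;
    int writerIndex;

    DiscardSketch(int capacity) {
        this.array = new byte[capacity];
    }

    void discardReadBytes() {
        if (readerIndex == 0) {
            return; // nothing has been read, so nothing can be reclaimed
        }
        int readable = writerIndex - readerIndex;
        // System.arraycopy handles overlapping source and destination ranges
        System.arraycopy(array, readerIndex, array, 0, readable);
        writerIndex = readable; // decreased by the old readerIndex
        readerIndex = 0;
    }

    int writableBytes() {
        return array.length - writerIndex;
    }
}
```

The capacity is unchanged; the writable region grows only because the already-read prefix is overwritten.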
    

    AbstractReferenceCountedByteBuf


    Main member variables

    // As the name suggests, performs atomic operations on a specific volatile field
    private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater =
            AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
    
    private volatile int refCnt;
    

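    Core operations

    retain and release increment and decrement the reference count, respectively.

    How the implementation evolved through PRs

    Original
    • volatile variable refCnt
    • CAS + loop to make increments and decrements of the reference count atomic

    Characteristics

    • Simple to implement; guarantees the count cannot overflow, the object cannot be freed twice, and a released object cannot be revived
    • Noticeably slow and prone to contention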
    private volatile int refCnt;
    
    private ByteBuf retain0(int increment) {
            for (;;) {
                int refCnt = this.refCnt;
                final int nextCnt = refCnt + increment;
    
                // Ensure we not resurrect (which means the refCnt was 0) and also that we encountered an overflow.
                if (nextCnt <= increment) {
                    throw new IllegalReferenceCountException(refCnt, increment);
                }
                if (refCntUpdater.compareAndSet(this, refCnt, nextCnt)) {
                    break;
                }
            }
            return this;
        }
    
     private boolean release0(int decrement) {
            for (;;) {
                int refCnt = this.refCnt;
                if (refCnt < decrement) {
                    throw new IllegalReferenceCountException(refCnt, -decrement);
                }
    
                if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
                    if (refCnt == decrement) {
                        deallocate();
                        return true;
                    }
                    return false;
                }
            }
     }
    
    Optimistically update ref counts
    Motivation

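    Rather than updating pessimistically, update optimistically. If an update turns out to be invalid, it can be rolled back; in the vast majority of cases the increment is valid.

    The change replaces compareAndSet with getAndAdd, which results in a different CPU instruction (CMPXCHG to XADD). Because the CPU knows up front that it will modify the memory, it can avoid contention.

    Under high contention this is roughly twice as fast.

    The new approach has one downside: the reference count can temporarily be in an invalid state, and under concurrency that can cause another thread's call to fail. For example:

    Time 1 Thread 1: obj.retain(INT_MAX - 1)
    Time 2 Thread 1: obj.retain(2)
    Time 2 Thread 2: obj.retain(1)

    With the old approach, thread 1 would always fail and thread 2 would always succeed. With the new one, thread 2 may run before thread 1 has rolled back its overflowing increment, and fail as well.

    This is acceptable because:

    • Buggy code is going to have bugs; this only makes them surface earlier.
    • If the count really needs to be that large, it would be better to use a long.
    • Contention drops sharply, so the likelihood of this scenario drops with it.
    • Even when an update is invalid, it is rolled back, so the count does not stay in a wrong state.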
    Modification

    compareAndSet to getAndAdd

    Result

    Faster refcounting

    private ByteBuf retain0(final int increment) {
        int oldRef = refCntUpdater.getAndAdd(this, increment);
        if (oldRef <= 0 || oldRef + increment < oldRef) {
            // Ensure we don't resurrect (which means the refCnt was 0) and also that we encountered an overflow.
            refCntUpdater.getAndAdd(this, -increment);
            throw new IllegalReferenceCountException(oldRef, increment);
        }
        return this;
    }
    
      private boolean release0(int decrement) {
            int oldRef = refCntUpdater.getAndAdd(this, -decrement);
            if (oldRef == decrement) {
                deallocate();
                return true;
            } else if (oldRef < decrement || oldRef - decrement > oldRef) {
                // Ensure we don't over-release, and avoid underflow.
                refCntUpdater.getAndAdd(this, decrement);
                throw new IllegalReferenceCountException(oldRef, decrement);
            }
            return false;
        }
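To try the optimistic scheme outside Netty, the same two checks can be applied to a `java.util.concurrent.atomic.AtomicInteger`. This is a hedged sketch: `OptimisticRefCnt` is a hypothetical class, `IllegalStateException` stands in for `IllegalReferenceCountException`, and the place where `deallocate()` would run is only marked with a comment.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone counter: update first with getAndAdd,
// validate afterwards, roll back if the update was invalid.
final class OptimisticRefCnt {
    private final AtomicInteger refCnt = new AtomicInteger(1);

    void retain(int increment) {
        int oldRef = refCnt.getAndAdd(increment);
        if (oldRef <= 0 || oldRef + increment < oldRef) {
            // Resurrection (old count was 0) or overflow: undo the update
            refCnt.getAndAdd(-increment);
            throw new IllegalStateException("refCnt: " + oldRef + ", increment: " + increment);
        }
    }

    boolean release(int decrement) {
        int oldRef = refCnt.getAndAdd(-decrement);
        if (oldRef == decrement) {
            // deallocate() would run here
            return true;
        }
        if (oldRef < decrement || oldRef - decrement > oldRef) {
            // Over-release or underflow: undo the update
            refCnt.getAndAdd(decrement);
            throw new IllegalStateException("refCnt: " + oldRef + ", decrement: " + decrement);
        }
        return false;
    }

    int refCnt() {
        return refCnt.get();
    }
}
```

Between `getAndAdd` and the rollback the counter briefly holds the invalid value, which is exactly the window the retain example with `INT_MAX - 1` describes.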
    
    Use a non-volatile read for ensureAccessible()
    Motivation

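    ensureAccessible() is a hot method, called on nearly every buffer operation. It used to read refCnt with a volatile read, which is comparatively expensive: the JIT cannot hoist or merge repeated volatile reads, so the cost is paid on every call. Switching to a non-volatile read removes that overhead and helps inlining.

    The key observation is that this check is only best-effort: even if it passes, another thread can still release the buffer right afterwards, and that cannot be prevented. So it is better to favour efficiency, especially since most buffers are only ever used from a single thread.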

    Modification
    • try to do a non-volatile read using sun.misc.Unsafe if we can use it
    • Add a benchmark
    Result

    Big performance win when multiple ByteBuf methods are called from a method.

    // internalRefCnt() reads the field through its offset within the class
    protected final void ensureAccessible() {
            if (checkAccessible && internalRefCnt() == 0) {
                throw new IllegalReferenceCountException(0);
            }
        }
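The PR reads the field through `sun.misc.Unsafe`. As a rough stand-in that runs without Unsafe, the sketch below uses `java.lang.invoke.VarHandle` (Java 9 or later), whose `get` performs a plain, non-volatile read even on a field declared `volatile`. This is a swapped-in technique, not what Netty does, and `PlainReadGuard` is a hypothetical class.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Hypothetical sketch: best-effort accessibility check using a plain read
// of a volatile field. Assumes Java 9+ for VarHandle.
final class PlainReadGuard {
    private static final VarHandle REF_CNT;

    static {
        try {
            REF_CNT = MethodHandles.lookup()
                    .findVarHandle(PlainReadGuard.class, "refCnt", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    private volatile int refCnt = 1;

    void ensureAccessible() {
        // VarHandle.get uses plain (non-volatile) access semantics
        if ((int) REF_CNT.get(this) == 0) {
            throw new IllegalStateException("refCnt: 0");
        }
    }

    void release() {
        refCnt = 0; // ordinary volatile write
    }
}
```

Within a single thread the plain read always sees that thread's own earlier writes; across threads it may observe a stale value, which is the best-effort trade-off described above.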
    
    Harden ref-counting concurrency semantics

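    This change started from the concurrency edge cases that became possible after the switch to optimistic updates.

    Possible race condition on AbstractReferenceCounted

    The problem is that every update is now applied first and validated afterwards. There is therefore a window in which refCnt is 0 (already released) and a retain call has bumped it to 1 before rolling back. During that window it is hard to tell whether the object has been released or whether the value falls under the overflow check.

    So the next change sets aside the lowest bit as a "released" flag, which guarantees that the object is released only once.

    Motivation

    This PR provides stronger concurrency semantics. Races still exist but are very unlikely, for example a retain overflowing while a release is in progress, and certain guarantees hold regardless: once a release has returned true, any subsequent retain or release call throws, and deallocate is executed only once.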

    Modification
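    • Internally, an even value represents a valid refcount
    • The "final" release sets the refcount to an odd value, which means released (equivalent to 0)
    • retain uses getAndAdd; release uses a CAS loop
    • The first CAS attempt uses a non-volatile read
    • Thread.yield() after a failed CAS provides a net gain
    Result

    Stronger concurrency semantics. There is some added latency under high contention, but it is still about twice as fast as the original logic.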

        // even => "real" refcount is (refCnt >>> 1); odd => "real" refcount is 0
        @SuppressWarnings("unused")
        private volatile int refCnt = 2;

        // Returns the real count, since even values are used internally
        private static int realRefCnt(int refCnt) {
            return (refCnt & 1) != 0 ? 0 : refCnt >>> 1;
        }
    

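    Using 0 as the "released" marker does not work well under races: without a CAS, it is hard to tell from a 0 whether the object has actually been released.

    This scheme uses the lowest bit to mark whether the object has been released, so that bit is not available for counting and every count is multiplied by two.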
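The encoding can be checked in isolation. In this sketch `RawCount` and `toRawDelta` are hypothetical names; `realRefCnt` uses the same expression as the helper in the PR.

```java
// Sketch of the even/odd encoding: raw = 2 * real while the object is live,
// any odd raw value means "released".
final class RawCount {
    // Decode: odd => 0 (released), even => raw / 2
    static int realRefCnt(int rawCnt) {
        return (rawCnt & 1) != 0 ? 0 : rawCnt >>> 1;
    }

    // Every change to the raw count is twice the "real" change
    static int toRawDelta(int realDelta) {
        return realDelta << 1;
    }
}
```

Because every delta is even, adding one to an odd raw value leaves it odd: a late `retain` on a released object changes the raw value from 1 to 3, and both decode to a real count of 0. This is what lets the released state survive optimistic updates.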

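    This change also splits many of the conditions out into private methods, which helps inlining, and pays attention to consistent naming.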

    private ByteBuf retain0(final int increment) {
            // all changes to the raw count are 2x the "real" change
            int adjustedIncrement = increment << 1; // overflow OK here, because the sign bit can also be used for storage
            int oldRef = refCntUpdater.getAndAdd(this, adjustedIncrement);
            if ((oldRef & 1) != 0) {
                throw new IllegalReferenceCountException(0, increment);
            }
            // don't pass 0!
            if ((oldRef <= 0 && oldRef + adjustedIncrement >= 0)
                    || (oldRef >= 0 && oldRef + adjustedIncrement < oldRef)) {
                // overflow case
                refCntUpdater.getAndAdd(this, -adjustedIncrement);
                throw new IllegalReferenceCountException(realRefCnt(oldRef), increment);
            }
            return this;
        }
    
    
    private boolean release0(int decrement) {
            // toLiveRealCnt returns the externally visible count and throws if rawCnt is odd
            int rawCnt = nonVolatileRawCnt(), realCnt = toLiveRealCnt(rawCnt, decrement);
            // the first CAS attempt works from a non-volatile read
            if (decrement == realCnt) {
                if (refCntUpdater.compareAndSet(this, rawCnt, 1)) {
                    deallocate();
                    return true;
                }
                return retryRelease0(decrement);
            }
            return releaseNonFinal0(decrement, rawCnt, realCnt);
     }
    
        private boolean releaseNonFinal0(int decrement, int rawCnt, int realCnt) {
            if (decrement < realCnt
                    // all changes to the raw count are 2x the "real" change
                    && refCntUpdater.compareAndSet(this, rawCnt, rawCnt - (decrement << 1))) {
                return false;
            }
            return retryRelease0(decrement);
        }
    
        private boolean retryRelease0(int decrement) {
            for (;;) {
                int rawCnt = refCntUpdater.get(this), realCnt = toLiveRealCnt(rawCnt, decrement);
                if (decrement == realCnt) {
                    if (refCntUpdater.compareAndSet(this, rawCnt, 1)) {
                        deallocate();
                        return true;
                    }
                } else if (decrement < realCnt) {
                    // all changes to the raw count are 2x the "real" change
                    if (refCntUpdater.compareAndSet(this, rawCnt, rawCnt - (decrement << 1))) {
                        return false;
                    }
                } else {
                    throw new IllegalReferenceCountException(realCnt, -decrement);
                }
                Thread.yield(); // this benefits throughput under high contention
            }
        }
    
    /**
         * Like {@link #realRefCnt(int)} but throws if refCnt == 0
         */
        private static int toLiveRealCnt(int rawCnt, int decrement) {
            if ((rawCnt & 1) == 0) {
                return rawCnt >>> 1;
            }
            // odd rawCnt => already deallocated
            throw new IllegalReferenceCountException(0, -decrement);
        }
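Put together, the hardened scheme fits in a short stand-alone class. This is a simplified sketch, not Netty's implementation: `HardenedRefCnt` is hypothetical, an `AtomicInteger` replaces the field updater, the non-volatile first attempt is omitted, and a counter stands in for `deallocate()`.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: retain via getAndAdd, release via CAS loop,
// raw count = 2 * real count, odd raw count = released.
final class HardenedRefCnt {
    private final AtomicInteger raw = new AtomicInteger(2); // real count 1
    private final AtomicInteger deallocations = new AtomicInteger();

    void retain(int increment) {
        int adjusted = increment << 1;
        int old = raw.getAndAdd(adjusted);
        if ((old & 1) != 0) {
            // Already released; the raw value stays odd, so no rollback is needed
            throw new IllegalStateException("already released");
        }
        if ((old <= 0 && old + adjusted >= 0) || (old >= 0 && old + adjusted < old)) {
            raw.getAndAdd(-adjusted); // overflow: undo
            throw new IllegalStateException("overflow");
        }
    }

    boolean release(int decrement) {
        for (;;) {
            int rawCnt = raw.get();
            if ((rawCnt & 1) != 0) {
                throw new IllegalStateException("already released");
            }
            int realCnt = rawCnt >>> 1;
            if (decrement == realCnt) {
                // Final release: only one thread can win this CAS
                if (raw.compareAndSet(rawCnt, 1)) {
                    deallocations.incrementAndGet(); // deallocate() would run here
                    return true;
                }
            } else if (decrement < realCnt) {
                if (raw.compareAndSet(rawCnt, rawCnt - (decrement << 1))) {
                    return false;
                }
            } else {
                throw new IllegalStateException("over-release");
            }
            Thread.yield(); // back off after a failed CAS
        }
    }

    int deallocations() {
        return deallocations.get();
    }
}
```

Once `release` has returned true, the raw value is odd and stays odd, so every later `retain` or `release` throws and the deallocation counter never exceeds one.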
    

    UnpooledHeapByteBuf


    Big endian Java heap buffer implementation. It is recommended to use UnpooledByteBufAllocator.heapBuffer(int, int), Unpooled.buffer(int) and Unpooled.wrappedBuffer(byte[]) instead of calling the constructor explicitly.

    Main member variables

    private final ByteBufAllocator alloc;
    byte[] array;
    // Internal ByteBuffer, created with ByteBuffer.wrap(array)
    private ByteBuffer tmpNioBuf;
    

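    Function walkthrough

    • Constructors
    // Checks the arguments for null, then validates them
    protected UnpooledHeapByteBuf(ByteBufAllocator alloc, byte[] initialArray, int maxCapacity) {
        // Call the superclass constructor; the reference count starts at 1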
        super(maxCapacity);
    
        checkNotNull(alloc, "alloc");
        checkNotNull(initialArray, "initialArray");
    
        if (initialArray.length > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "initialCapacity(%d) > maxCapacity(%d)", initialArray.length, maxCapacity));
        }
    
        this.alloc = alloc;
        // Store the passed-in array reference directly (no copy)
        setArray(initialArray);
        setIndex(0, initialArray.length);
    }
    
    public UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        
            ...
    
            this.alloc = alloc;
            // Assigns array; tmpNioBuf is still null at this point
            setArray(allocateArray(initialCapacity));
            setIndex(0, 0);
        }
    
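    • Array-related functions: allocateArray, freeArray and setArray. Because the buffer simply lives on the Java heap, they are respectively a plain new, a no-op, and a direct assignment.

    • Basic information getters: order, isDirect, capacity, hasArray, array, arrayOffset and hasMemoryAddress. They return, respectively, big-endian, false, the length of array, true, the array field, 0, and false.

    • // Resizes the buffer to newCapacity
      // If the new capacity is larger, the whole old array is copied
      // If the new capacity is smaller, only the first newCapacity bytes are kept, and of those only the unread part is copied; already-read bytes are not copied, and the indices are updated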
      @Override
      public ByteBuf capacity(int newCapacity) {
          checkNewCapacity(newCapacity);
      
          int oldCapacity = array.length;
          byte[] oldArray = array;
          // If the new capacity is larger than the old one, allocate a new array and copy the old contents into it
          if (newCapacity > oldCapacity) {
              byte[] newArray = allocateArray(newCapacity);
              System.arraycopy(oldArray, 0, newArray, 0, oldArray.length);
              setArray(newArray);
              freeArray(oldArray);
          } else if (newCapacity < oldCapacity) {
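              // If the new capacity is smaller than the old one, only the range [0, newCapacity) is kept; unread bytes within that range are copied back at their original readerIndex position
              // If everything in that range has already been read, nothing is copied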
              byte[] newArray = allocateArray(newCapacity);
              int readerIndex = readerIndex();
              if (readerIndex < newCapacity) {
                  int writerIndex = writerIndex();
                  if (writerIndex > newCapacity) {
                      writerIndex(writerIndex = newCapacity);
                  }
                  System.arraycopy(oldArray, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
              } else {
                  setIndex(newCapacity, newCapacity);
              }
              setArray(newArray);
              freeArray(oldArray);
          }
          return this;
      }
      
    • The getBytes, setBytes, nioBuffer and get<Type> families of functions. The following example shows how big-endian and little-endian reads differ.

      • @Override
        public short getShort(int index) {
            ensureAccessible();
            return _getShort(index);
        }
        
        @Override
        protected short _getShort(int index) {
            return HeapByteBufUtil.getShort(array, index);
        }
        
        @Override
        public short getShortLE(int index) {
            ensureAccessible();
            return _getShortLE(index);
        }
        
            static short getShort(byte[] memory, int index) {
                return (short) (memory[index] << 8 | memory[index + 1] & 0xFF);
            }
        
            static short getShortLE(byte[] memory, int index) {
                return (short) (memory[index] & 0xff | memory[index + 1] << 8);
            }
        
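    • Supporting functions: copy roughly copies the array and then calls the constructor; internalNioBuffer returns the tmpNioBuf field (initializing it with ByteBuffer.wrap(array) if needed); deallocate just clears the array reference; unwrap returns null.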
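Two behaviours from the list above, resizing with capacity(int) and the big-endian versus little-endian short reads, can be tried on a miniature heap buffer. `MiniHeapBuf` is a hypothetical class written for this illustration; it mirrors the logic of the quoted listings but drops the allocator, reference counting and argument checks beyond a simple range test.

```java
// Hypothetical miniature heap buffer: byte[] storage, resize, short reads.
final class MiniHeapBuf {
    byte[] array;
    int readerIndex;
    int writerIndex;

    MiniHeapBuf(byte[] initialArray) {
        this.array = initialArray; // reference stored directly, no copy
        this.writerIndex = initialArray.length;
    }

    void capacity(int newCapacity) {
        if (newCapacity < 0) {
            throw new IllegalArgumentException("newCapacity: " + newCapacity);
        }
        byte[] oldArray = array;
        if (newCapacity > oldArray.length) {
            // Grow: copy everything
            byte[] newArray = new byte[newCapacity];
            System.arraycopy(oldArray, 0, newArray, 0, oldArray.length);
            array = newArray;
        } else if (newCapacity < oldArray.length) {
            // Shrink: keep only unread bytes that still fit
            byte[] newArray = new byte[newCapacity];
            if (readerIndex < newCapacity) {
                if (writerIndex > newCapacity) {
                    writerIndex = newCapacity;
                }
                System.arraycopy(oldArray, readerIndex, newArray, readerIndex,
                        writerIndex - readerIndex);
            } else {
                readerIndex = newCapacity;
                writerIndex = newCapacity;
            }
            array = newArray;
        }
    }

    // Big-endian: first byte is the high byte
    short getShort(int index) {
        return (short) (array[index] << 8 | array[index + 1] & 0xFF);
    }

    // Little-endian: first byte is the low byte
    short getShortLE(int index) {
        return (short) (array[index] & 0xFF | array[index + 1] << 8);
    }
}
```

With the bytes 0x12 0x34, `getShort` gives 0x1234 and `getShortLE` gives 0x3412. The `& 0xFF` mask matters for the low byte: without it, a byte such as 0x80 would be sign-extended and overwrite the high byte.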

    Other

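    • Inlining: a private method is effectively final, and final methods are hinted to the JVM as candidates for inlining, which reduces method-call overhead
    • if .. else if ... chains can be rewritten as if .. if .. for better readability
    • JMH benchmarking framework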
  • Original article: https://www.cnblogs.com/GrimReaper/p/10386412.html