zoukankan      html  css  js  c++  java
  • Netty源码分析之ByteBuf引用计数

    引用计数是一种常用的内存管理机制,是指将资源的被引用次数保存起来,当被引用次数变为零时就将其释放的过程。Netty在4.x版本开始使用引用计数机制进行部分对象的管理,其实现思路并不是特别复杂,它主要涉及跟踪某个对象被引用的次数。在Netty具体代码中需要通过引用计数进行内存管理的对象,会基于ReferenceCounted接口实现,其中引用计数大于0时则代表该对象被引用不会释放,当引用计数减少到0时,该对象就会被释放。通过引用计数机制,Netty可以很好的实现内存管理,引用计数减少到0时要么直接释放内存,要么放回内存池中重复利用。

    1、基本示例

    下面先通过一个简单示例看下Netty中引用计数机制的使用

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            
               ByteBuf recvBuffer = (ByteBuf) msg;// 申请ByteBuf 需要主动释放
            if(recvBuffer.isDirect()){
                System.err.println(true);
            }
            PooledByteBufAllocator allocator = new PooledByteBufAllocator(true);
            ByteBuf sendBuffer = allocator.buffer();//申请池化直接内存
            System.err.println("sendBuffer的引用计数:"+sendBuffer.refCnt());
            sendBuffer.retain();
            System.err.println("sendBuffer的引用计数:"+sendBuffer.refCnt());
            sendBuffer.release();
            System.err.println("sendBuffer的引用计数:"+sendBuffer.refCnt());
    try { byte[] bytesReady = new byte[recvBuffer.readableBytes()]; recvBuffer.readBytes(bytesReady); System.out.println("channelRead收到数据:"+ BytesUtils.toHexString(bytesReady)); byte[] sendBytes = new byte[] {0x7E,0x01,0x02,0x7e}; sendBuffer.writeBytes(sendBytes); ctx.writeAndFlush(sendBuffer); System.err.println("sendBuffer的引用计数:"+sendBuffer.refCnt()); }catch (Exception e) { // TODO: handle exception System.err.println(e.getMessage()); }finally { System.err.println("recvBuffer的引用计数:"+recvBuffer.refCnt()); recvBuffer.release(); //此处需要释放 System.err.println("recvBuffer的引用计数:"+recvBuffer.refCnt()); } }

    输出结果如下,通过示例可以看出retain方法会增加计数引用,release方法会减少计数引用

    true
    sendBuffer的引用计数:1
    sendBuffer的引用计数:2
    sendBuffer的引用计数:1
    sendBuffer的引用计数:0
    recvBuffer的引用计数:1
    recvBuffer的引用计数:0

    AbstractReferenceCountedByteBuf实现了对ByteBuf的内存管理,以实现内存的回收、释放或者重复利用 ,AbstractReferenceCountedByteBuf的继承实现关系如下图所示

    2、ReferenceCounted接口定义

    首先是ReferenceCounted接口的定义

    public interface ReferenceCounted {
        /**
         * Returns the reference count of this object.  If {@code 0}, it means this object has been deallocated.
         * 返回对象的引用计数
         */
        int refCnt();
    
        /**
         * Increases the reference count by {@code 1}.
         * 增加引用计数
         */
        ReferenceCounted retain();
    
        /**
         * Increases the reference count by the specified {@code increment}.
         * 引用计数增加指定值
         */
        ReferenceCounted retain(int increment);
    
        /**
         * Records the current access location of this object for debugging purposes.
         * If this object is determined to be leaked, the information recorded by this operation will be provided to you
         * via {@link ResourceLeakDetector}.  This method is a shortcut to {@link #touch(Object) touch(null)}.
         * 记录该对象的当前访问位置,用于调试。
         * 如果确定该对象被泄露,将提供此操作记录的信息给您
         */
        ReferenceCounted touch();
    
        /**
         * Records the current access location of this object with an additional arbitrary information for debugging
         * purposes.  If this object is determined to be leaked, the information recorded by this operation will be
         * provided to you via {@link ResourceLeakDetector}.
         * 记录该对象的当前访问位置,附加信息用于调试。
         * 如果确定该对象被泄露,将提供此操作记录的信息给您
         */
        ReferenceCounted touch(Object hint);
    
        /**
         * Decreases the reference count by {@code 1} and deallocates this object if the reference count reaches at
         * {@code 0}.
         *
         * @return {@code true} if and only if the reference count became {@code 0} and this object has been deallocated
         * 引用计数减少,如果计数变为了0,则释放对象资源
         * 如果对象资源被释放,则返回true,否则返回false
         */
        boolean release();
    
        /**
         * Decreases the reference count by the specified {@code decrement} and deallocates this object if the reference
         * count reaches at {@code 0}.
         *
         * @return {@code true} if and only if the reference count became {@code 0} and this object has been deallocated
         * 引用计数-指定值,如果计数变为了0,则释放对象资源或交回到对象池
         * 如果对象资源被释放,则返回true,否则返回false
         */
        boolean release(int decrement);
    }

    3、AbstractReferenceCountedByteBuf源码分析

    AbstractReferenceCountedByteBuf对ReferenceCounted进行了具体实现,retain与release两个方法通过CAS方式对引用计数refcnt进行操作,下面对其源码进行简单分析

    初始化

    引用计数初始值refCnt 使用关键字volatile修饰,保证线程的可见性,同时使用偶数,引用增加通过位移操作实现,提高运算效率。

    采用 AtomicIntegerFieldUpdater 对象,通过CAS方式更新refCnt,以实现线程安全,避免加锁,提高效率。

        private static final long REFCNT_FIELD_OFFSET;
        //采用 AtomicIntegerFieldUpdater 对象,CAS方式更新refCnt
        private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater =
                AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
    
        //refCnt 实际值为偶数,采用位移操作提高效率
        // even => "real" refcount is (refCnt >>> 1); odd => "real" refcount is 0
        @SuppressWarnings("unused")
        private volatile int refCnt = 2;

    retain操作

    上面示例中每调用一次retain方法,引用计数就会累加一次,我们看下源码中retain的具体实现

        public ByteBuf retain() {
            return retain0(1);
        }
    
        @Override
        public ByteBuf retain(int increment) {
            return retain0(checkPositive(increment, "increment"));
        }
    
        //计数器增值操作
        private ByteBuf retain0(final int increment) {
            // all changes to the raw count are 2x the "real" change
            int adjustedIncrement = increment << 1; // overflow OK here  真正的计数都是2倍递增
            int oldRef = refCntUpdater.getAndAdd(this, adjustedIncrement); //通过CAS方式递增并获取原值
            if ((oldRef & 1) != 0) {//判断奇偶,正常情况这里应该都是偶数
                throw new IllegalReferenceCountException(0, increment);
            }
            // don't pass 0!  如果计数小于等于0,以及整型范围越界(0x7fffffff+1)抛出异常
            if ((oldRef <= 0 && oldRef + adjustedIncrement >= 0)
                    || (oldRef >= 0 && oldRef + adjustedIncrement < oldRef)) {
                // overflow case
                refCntUpdater.getAndAdd(this, -adjustedIncrement);
                throw new IllegalReferenceCountException(realRefCnt(oldRef), increment);
            }
            return this;
        }

    release操作

    通过调用release方法,对引用计数做减值操作,源码中release的具体实现要注意的是由于引用计数以2倍递增,所以引用次数= 引用计数/2,当decrement=refcnt/2 也就是引用次数=释放次数时,代表ByteBuf不再被引用,执行内存释放或放回内存池的操作。

        //计数器减值操作
        private boolean release0(int decrement) {
            int rawCnt = nonVolatileRawCnt(), realCnt = toLiveRealCnt(rawCnt, decrement); //对计数器进行除以2操作,也就是引用次数
            /**
             * /这里如注意 你传入的减值参数decrement  = realCnt 时 等同于 引用次数=释放次数,直接进行释放操作
             */
            if (decrement == realCnt) {
                if (refCntUpdater.compareAndSet(this, rawCnt, 1)) { //CAS方式置为1
                    deallocate();//内存释放或放回内存池
                    return true;
                }
                return retryRelease0(decrement);//进入具体操作
            }
            return releaseNonFinal0(decrement, rawCnt, realCnt);
        }
    
        private boolean releaseNonFinal0(int decrement, int rawCnt, int realCnt) {
            //如果decrement 小于 realCnt,通过CAS方式减去decrement*2
            if (decrement < realCnt
                    // all changes to the raw count are 2x the "real" change
                    && refCntUpdater.compareAndSet(this, rawCnt, rawCnt - (decrement << 1))) {
                return false;
            }
            return retryRelease0(decrement);
        }
    
        private boolean retryRelease0(int decrement) {
            for (;;) {
                int rawCnt = refCntUpdater.get(this), realCnt = toLiveRealCnt(rawCnt, decrement);
                if (decrement == realCnt) {
                    if (refCntUpdater.compareAndSet(this, rawCnt, 1)) {
                        deallocate();
                        return true;
                    }
                } else if (decrement < realCnt) {//如果decrement 小于 realCnt,通过CAS方式减去decrement*2
                    // all changes to the raw count are 2x the "real" change
                    if (refCntUpdater.compareAndSet(this, rawCnt, rawCnt - (decrement << 1))) {
                        return false;
                    }
                } else {
                    throw new IllegalReferenceCountException(realCnt, -decrement);
                }
                Thread.yield(); // this benefits throughput under high contention
            }
        }
    
        /**
         * Like {@link #realRefCnt(int)} but throws if refCnt == 0
         */
        private static int toLiveRealCnt(int rawCnt, int decrement) {
            if ((rawCnt & 1) == 0) {
                return rawCnt >>> 1;
            }
            // odd rawCnt => already deallocated
            throw new IllegalReferenceCountException(0, -decrement);
        }

    4、总结 

    以上我们围绕AbstractReferenceCountedByteBuf对Netty引用计数的具体实现进行了分析,可以看到Netty在实现引用计数的同时,结合CAS、位移计算等方式,保证了运算效率和线程安全,在实际项目中我们遇到类似应用场景也都可以借鉴参考,如数据发送次数,商品剩余数量等计数场景的实现。希望本文对大家能有所帮助,其中如有不足与不正确的地方还望指正与海涵,十分感谢。

    关注微信公众号,查看更多技术文章。

     

  • 相关阅读:
    Qt 无边框窗体改变大小 完美实现
    深入Windows窗体原理及控件重绘技巧
    EF里Guid类型数据的自增长、时间戳和复杂类型的用法
    Entity Framework中的Identity map和Unit of Work模式
    使用SQLite数据库和Access数据库的一些经验总结
    实现Avl平衡树
    使用Ajax
    接口和类 反射的差异性
    Guacamole 介绍
    依赖注入(DI)和Ninject
  • 原文地址:https://www.cnblogs.com/dafanjoy/p/15611042.html
Copyright © 2011-2022 走看看