zoukankan      html  css  js  c++  java
  • netty源码解析(4.0)-24 ByteBuf基于内存池的内存管理

     io.netty.buffer.PooledByteBuf<T>使用内存池中的一块内存作为自己的数据内存,这个块内存是PoolChunk<T>的一部分。PooledByteBuf<T>是一个抽象类型,它有4个派生类:

    • PooledHeapByteBuf, PooledUnsafeHeapByteBuf 使用堆内存的PooledByteBuffer<byte[]>。
    • PooledDirectByteBuf, PooledUnsafeDirectByteBuf 使用直接内存的PooledByteBuf<ByteBuffer>。

    初始化

      PooledByteBuf的初始化过程分为两个步骤:创建实例;初始化内存。这两个步骤的代码如下:

        protected PooledByteBuf(Recycler.Handle recyclerHandle, int maxCapacity) {
            super(maxCapacity);
            this.recyclerHandle = recyclerHandle;
        }
    
        void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
            init0(chunk, handle, offset, length, maxLength, cache);
        }
    
        private void init0(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
            assert handle >= 0;
            assert chunk != null;
    
            this.chunk = chunk;
            memory = chunk.memory;
            allocator = chunk.arena.parent;
            this.cache = cache;
            this.handle = handle;
            this.offset = offset;
            this.length = length;
            this.maxLength = maxLength;
            tmpNioBuf = null;
        }

      创建实例时调用的构造方法只是为maxCapacity和recyclerHandler属性赋值,构造方法是protected,不打算暴露到外面。派生类都提供了newInstance方法创建实例,以PooledHeapByteBuf为例,它的newInstance方法实现如下:

     1     private static final Recycler<PooledHeapByteBuf> RECYCLER = new Recycler<PooledHeapByteBuf>() {
     2         @Override
     3         protected PooledHeapByteBuf newObject(Handle handle) {
     4             return new PooledHeapByteBuf(handle, 0);
     5         }
     6     };
     7 
     8     static PooledHeapByteBuf newInstance(int maxCapacity) {
     9         PooledHeapByteBuf buf = RECYCLER.get();
    10         buf.reuse(maxCapacity);
    11         return buf;
    12     }

      这里的newInstance使用RECYCLER创建实例对象。Recycler<T>是一个轻量级的,支持循环使用的对象池。当对象池中没有可用对象时,会在第4行使用构造方法创建一个新的对象。

      init调用init0初始化数据内存,init0方法为几个内存相关的关键属性赋值:

    • chunk:  PoolChunk<T>对象,这个PooledByteBuf使用的内存就是它的一部分。
    • memory: 内存对象。更准确地说,PooledByteBuf使用的内存是它的一部分。
    • allocator: 创建这个PooledByteBuf的PooledByteBufAllocator对象。
    • cache:  线程专用的内存缓存。分配内存时会优先从这个缓存中寻找合适的内存块。
    • handle:  内存在chunk中node的句柄。chunk使用handle可以计算出它对应内存的起始位置offset。
    • offset:  分配内存的起始位置。
    • length: 分配内存的长度,也是这个PooledByteBuf的capacity。
    • maxLength: 这块内存node的最大长度。当调用capacity(int newCapacity)方法增加capacity时,只要newCapacity不大于这个值,就不用从新分配内存。

      内存初始化完成之后,这个PooledByteBuf可使用的内存范围是memory内存中[offset, offset+length)。idx方法可以把ByteBuf的索引转换成memory的索引:

    1     protected final int idx(int index) {
    2         return offset + index;
    3     }

    重新分配内存

      和前面讲过的ByteBuf实现一样,PooledByteBuf也需要使用capacity(int newCapacity)改变内存大小,也会涉及到把数据从旧内存中复制到新内存的问题。也就是说,要解决的问题是一样的,只是具体实现的差异。

     1     @Override
     2     public final ByteBuf capacity(int newCapacity) {
     3         checkNewCapacity(newCapacity);
     4 
     5         // If the request capacity does not require reallocation, just update the length of the memory.
     6         if (chunk.unpooled) {
     7             if (newCapacity == length) {
     8                 return this;
     9             }
    10         } else {
    11             if (newCapacity > length) {
    12                 if (newCapacity <= maxLength) {
    13                     length = newCapacity;
    14                     return this;
    15                 }
    16             } else if (newCapacity < length) {
    17                 if (newCapacity > maxLength >>> 1) {
    18                     if (maxLength <= 512) {
    19                         if (newCapacity > maxLength - 16) {
    20                             length = newCapacity;
    21                             setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
    22                             return this;
    23                         }
    24                     } else { // > 512 (i.e. >= 1024)
    25                         length = newCapacity;
    26                         setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
    27                         return this;
    28                     }
    29                 }
    30             } else {
    31                 return this;
    32             }
    33         }
    34 
    35         // Reallocation required.
    36         chunk.arena.reallocate(this, newCapacity, true);
    37         return this;
    38     }

      这个方法处理两大类情况: 不重新分配内存;重新分配内存并复制ByteBuf中的数据和状态。

      不重新分配内存: 

      8行: chunk不需要回收到内存池中,且newCapacity没有变化。

      11-32行: chunk需要回收到内存池中。

        13-14行:内存增大,且newcapacity不大于maxLength。把容量修改成newCapacity即可。

        20-22行: 内存减小,  newCapacity 大于maxLength的一半,maxLength<=512, newCapacity >maxLength - 16。 把容量修改成newCapacity, 调整readerIndex, writerIndex。

        25-27行: 内存减小,newCapacity大于maxLength的一半,  maxLength > 512。把容量修改成newCapacity, 调整readerIndex, writerIndex。

        31行: 内存不变,不做任何操作。

      需要重新分配内存:

      36行: 任何不满足以上情况的都要重新分配内存。这里使用Arena的reallocate方法重新分配内存,并把旧内存释放调,代码如下:

     1     //io.netty.buffer.PoolArena#reallocate, 
     2     void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) {
     3         if (newCapacity < 0 || newCapacity > buf.maxCapacity()) {
     4             throw new IllegalArgumentException("newCapacity: " + newCapacity);
     5         }
     6 
     7         int oldCapacity = buf.length;
     8         if (oldCapacity == newCapacity) {
     9             return;
    10         }
    11 
    12         PoolChunk<T> oldChunk = buf.chunk;
    13         long oldHandle = buf.handle;
    14         T oldMemory = buf.memory;
    15         int oldOffset = buf.offset;
    16         int oldMaxLength = buf.maxLength;
    17         int readerIndex = buf.readerIndex();
    18         int writerIndex = buf.writerIndex();
    19 
    20         allocate(parent.threadCache(), buf, newCapacity);
    21         if (newCapacity > oldCapacity) {
    22             memoryCopy(
    23                     oldMemory, oldOffset,
    24                     buf.memory, buf.offset, oldCapacity);
    25         } else if (newCapacity < oldCapacity) {
    26             if (readerIndex < newCapacity) {
    27                 if (writerIndex > newCapacity) {
    28                     writerIndex = newCapacity;
    29                 }
    30                 memoryCopy(
    31                         oldMemory, oldOffset + readerIndex,
    32                         buf.memory, buf.offset + readerIndex, writerIndex - readerIndex);
    33             } else {
    34                 readerIndex = writerIndex = newCapacity;
    35             }
    36         }
    37 
    38         buf.setIndex(readerIndex, writerIndex);
    39 
    40         if (freeOldMemory) {
    41             free(oldChunk, oldHandle, oldMaxLength, buf.cache);
    42         }
    43     }

      7-9行: 内存大小没变化,返回。

      12-18行: 记录下旧内存的信息,readerIndex, writerIndex。

      20行: 为PooledByteBuf分配新内存。

      21-38行: 把旧内存中数据复制到新内存,并把readerIndex,writerIndex设置在正确。

      41行: 释放就内存。

    释放内存

      内存释放代码在deallocate中:

     1     @Override
     2     protected final void deallocate() {
     3         if (handle >= 0) {
     4             final long handle = this.handle;
     5             this.handle = -1;
     6             memory = null;
     7             tmpNioBuf = null;
     8             chunk.arena.free(chunk, handle, maxLength, cache);
     9             chunk = null;
    10             recycle();
    11         }
    12     }

      关键是第8行代码,使用PoolArena的free方法释放内存。然后是recycle把当前PooledByteBuf对象放到RECYCLER中循环使用。

    PooledByteBufAllocator创建内存管理模块

      在前面分析PooledByteBuf内存初始化,重新分配及释放时,看到了内存管理的三个核心模块: PoolArena(chunk.arena),  PoolChunk(chunk),  PoolThreadCache(cache)。PooledByteBuf的内存管理能力都是使用这三模块实现的,它本身没有实现内存管理算法。当需要为PooledByteBuf分配一块内存时,先从一个线程专用的PoolThreadCache中得到一个PoolArena,  使用PoolArena的allocate找到一个满足要求内存块PoolChunk,  从这个内存块中分配一块连续的内存handle,计算出这块内存起始位置的偏移量offset, 最后调用PooledByteBuf的init方法初始化内存完成内存分配。 释放内存调用PoolArena的free方法。在内存分配时,会优先从PoolThreadCache中寻找合适的内存块。在内存释放时会把内存块暂时放在PoolThreadCache中,等使用频率过低时才会还给PoolChunk。这三个模块中PoolArena,  PoolThreadCache由PooledByteBufAllocator创建,PoolChunk由PoolArean维护。

      PooledByteBufAllocator维护了相关的几个属性:

      PoolArena<byte[]>[] heapArenas

      PoolArena<ByteBuffer>[] directArenas

      PoolThreadLocalCache threadCache

      headArenas和directArenas分别维护了多个PoolArena, 他们分别用来分配堆内存和直接内存。 如果使用得当,可以让每个线程持有一个专用的PoolArena,  避免线程间数据同步的开销。PoolThreadLocalCache会为每个线程创建一个专用的PoolThreadCache实例,这个实例分别持有一个heapArena和directArena。

      接下来的的几个章节会详细分析PoolArena和PoolThreadCache的实现代码。

  • 相关阅读:
    C#学习记录二:高级数据存储方式
    SharePoint 2010 匿名用户调用Client Object Model访问列表项
    Android 在闹钟开机时,如何解决开机动画没有播完就进入Launcher M
    Getting in Line UVA 216
    Android 如何关闭Navigation Bar M
    google protocol buffer 简介 版本 安装 使用 实例
    Android [VP]视频播放器播放本地视频时收到短信/彩信,需要界面提示 M
    Maven教程初级篇02:pom.xml配置初步
    当Ruby的model名字出错时,在现实view时显示错误的提示
    VS Code 安装 C++ 调试环境
  • 原文地址:https://www.cnblogs.com/brandonli/p/11562906.html
Copyright © 2011-2022 走看看