zoukankan      html  css  js  c++  java
  • netty源码解析(4.0)-28 ByteBuf内存池:PooledByteBufAllocator-把一切组装起来

      PooledByteBufAllocator负责初始化PoolArena(PA)和PoolThreadCache(PTC)。它提供了一系列的接口,用来创建使用堆内存或直接内存的PooledByteBuf对象,这些接口只是一张皮,内部完全使用了PA和PTC的能力。初始化过程分两个步骤,首先初始化一系列的默认参数,然后初始化PTC对象和PA数组。

    默认参数和它们的值

      DEFAULT_PAGE_SIZE: PoolChunk中的page的大小-pageSize,  使用-Dio.netty.allocator.pageSize设置, 默认值:8192。

      DEFAULT_MAX_ORDER: PoolChunk中二叉树的高度: maxOrder, 使用-Dio.netty.allocator.maxOrder设置,默认值:11。

      DEFAULT_NUM_HEAP_ARENA: 使用堆内存的PA数组的长度,使用-Dio.netty.allocator.numHeapArenas设置,默认值: CPU核心数 * 2。

      DEFAULT_NUM_DIRECT_ARENA: 使用直接内存的PA数组的长度,使用-Dio.netty.allocator.numHeapArenas设置,默认值: CPU核心数 * 2。

      DEFAULT_TINY_CACHE_SIZE:  PTC对象中每个用来缓存Tiny内存的MemoryRegionCache对象中queue的长度,使用-Dio.netty.allocator.tinyCacheSize设置,默认值:512。

      DEFAULT_SMALL_CACHE_SIZE: PTC对象中每个用来缓存Small内存的MemoryRegionCache对象中queue的长度,使用-Dio.netty.allocator.smallCacheSize设置,默认值:256。

      DEFAULT_NORMAL_CACHE_SIZE: PTC对象中每个用来缓存Normal内存的MemoryRegionCache对象中queue的长度,使用-Dio.netty.allocator.normalCacheSize设置,默认值:64。

      DEFAULT_MAX_CACHED_BUFFER_CAPACITY: PTC对象中缓存Normal内存的大小上限。使用-Dio.netty.allocator.maxCachedBufferCapacity设置,默认值32 * 1024。

      DEFAULT_CACHE_TRIM_INTERVAL:  PTC对象中释放缓存的内存阈值。当PTC分配内存次数大于这个值时会释放缓存的内存。使用-Dio.netty.allocator.cacheTrimInterval设置,默认值:8192。

      DEFAULT_USE_CACHE_FOR_ALL_THREADS: 是否对所有的线程使用缓存。使用-Dio.netty.allocator.useCacheForAllThreads设置,默认值:true。

      DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT: 直接内存的对齐参数,分配直接内存的大小必须是它的整数倍。使用-Dio.netty.allocator.directMemoryCacheAlignment设置,默认值:0, 表示不对齐。

    初始化PoolArena数组

      PooledByteBufAllocator维护了两个数组:

    PoolArena<byte[]>[] heapArenas; 
    PoolArena<ByteBuffer>[] directArenas;

      heapArenas用来管理堆内存,directArenas用来管理直接内存。这两个数组在构造方法中初始化,构造方法的定义是:

        public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
                                      int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                                      boolean useCacheForAllThreads, int directMemoryCacheAlignment)

      prefreDirect: 创建PooledByteBuf时,是否优先使用直接内存。

      nHeapArena: 默认使用DEFAULT_NUM_HEAP_ARENA

      nDirectArena: 默认使用DEFAULT_NUM_DIRECT_ARENA

      pageSize: 默认使用的DEFAULT_PAGE_SIZE

      maxOrder: 默认使用DEFAULT_MAX_ORDER

      tinyCacheSize:  默认使用DEFAULT_TINY_CACHE_SIZE

      smallCacheSize: 默认使用DEFAULT_SMALL_CACHE_SIZE

      normalCacheSize: 默认使用DEFAULT_NORMAL_CACHE_SIZE。

      useCacheForAllThreads: 默认使用DEFAULT_USE_CACHE_FOR_ALL_THREADS

      directMemoryCacheAlignment: 默认使用DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT

      

      这两数组的初始化代码如下:

     1       int pageShifts = validateAndCalculatePageShifts(pageSize);
     2 
     3         if (nHeapArena > 0) {
     4             heapArenas = newArenaArray(nHeapArena);
     5             List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
     6             for (int i = 0; i < heapArenas.length; i ++) {
     7                 PoolArena.HeapArena arena = new PoolArena.HeapArena(this,
     8                         pageSize, maxOrder, pageShifts, chunkSize,
     9                         directMemoryCacheAlignment);
    10                 heapArenas[i] = arena;
    11                 metrics.add(arena);
    12             }
    13             heapArenaMetrics = Collections.unmodifiableList(metrics);
    14         } else {
    15             heapArenas = null;
    16             heapArenaMetrics = Collections.emptyList();
    17         }
    18 
    19         if (nDirectArena > 0) {
    20             directArenas = newArenaArray(nDirectArena);
    21             List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
    22             for (int i = 0; i < directArenas.length; i ++) {
    23                 PoolArena.DirectArena arena = new PoolArena.DirectArena(
    24                         this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment);
    25                 directArenas[i] = arena;
    26                 metrics.add(arena);
    27             }
    28             directArenaMetrics = Collections.unmodifiableList(metrics);
    29         } else {
    30             directArenas = null;
    31             directArenaMetrics = Collections.emptyList();
    32         }

      1行,计算pageShifts,算法是pageShifts = Integer.SIZE - 1 - Integer.numberOfLeadingZeros(pageSize) = 31 - Integer.numberOfLeadingZeros(pageSize)。 Integer.numberOfLeadingZeros(pageSize)是pageSize(32位整数)从最高位起连续是0的位数,因此pageShifts可以简化为pageShifts = log2(pageSize)。

      4,20行,创建数组,new PoolArena[size]。  

      6-12,22-17行, 初始化数组中的PoolArena对象,分别使用PooArena的两个内部类: HeapArena, DirectArena。

    初始化PoolThreadCache

       PoolThreadCache使用PoolThreadLocalCache(PTLC)间接初始化,PTLC是PooledByteBufAllocator的内部内,它的定义如下:

    final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache>

      这个类派生自io.netty.util.concurrent.FastThreadLocal<T>,  和java.lang.ThreadLocal<T>功能一样,实现了线程本地存储(TLS)的功能,不同的是FastThreadLocal<T>优化了访问性能。PTLC覆盖了父类的initialValue方法,这个方法负责初始化线程本地的PoolThreadCache对象。当第一次调用PTLC对象的get方法时,这个方法会被调用。

     1         @Override
     2         protected synchronized PoolThreadCache initialValue() {
     3             final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
     4             final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
     5 
     6             if (useCacheForAllThreads || Thread.currentThread() instanceof FastThreadLocalThread) {
     7                 return new PoolThreadCache(
     8                         heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
     9                         DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
    10             }
    11             // No caching for non FastThreadLocalThreads.
    12             return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
    13         }

      3,4行,分别从headArenas,directArenas中取出一个使用次数最少的PoolArena对象。PoolArena有一个numThreadCaches属性,这个属性是AtomicInteger类型的原子变量。它的作用是在用来记录被PoolThreadCache对象使用的次数。PoolThreadCache对象创建时会在构造方法中会调用它的getAndIncrement方法,释放时在free0方法中调用他的getAndDecrement方法。

      6行,  如果运行每个线程都使用缓存(userCacheForAllThreads==true),或者当成线程对象是FastThreadLocalThread时, 在第8行创建一个线程专用的PTC对象。

    PoolChunkList(PCKL)

    关键属性

      PoolChunkList<T> nextList

      PoolChunkList<T> prevList

      这两个属性表明PCKL对象是一个双向链表的节点。

      PoolChunk<T> head

      这个属性表明PCKL对象还维护的一个PCK类型的链表,head指向这个链表的头。

      int minUsage;

      int maxUsage;

      int maxCapacity;

      minUsage是PCK链表中每个PCK对象内存的最小使用率,maxUseage是PCK的最大使用率。这两个值是百分比,例如:minUsage=10, maxUse=50,表示PCK链表中只能保存使用率在[10%,50%)的PCK对象。 maxCapacity表示PCK最大可分配的内存数,算法是: maxCapacity = (int)(chunkSize * (100L - minUseage) / 100L)。

    初始化PCKL链表

      PCKL链表有PoolArena负责维护,在PoolArena的构造方法中初始化:

     1 // io.netty.buffer.PoolArena#PoolArena(PooledByteBufAllocator parent, int pageSize,
     2 //          int maxOrder, int pageShifts, int chunkSize, int cacheAlignment)
     3 
     4         q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, chunkSize);
     5         q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize);
     6         q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize);
     7         q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize);
     8         q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize);
     9         qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize);
    10 
    11         q100.prevList(q075);
    12         q075.prevList(q050);
    13         q050.prevList(q025);
    14         q025.prevList(q000);
    15         q000.prevList(null);
    16         qInit.prevList(qInit); 

      4-9行,初始化PCKL节点。每个节点的名字q{num},其中num表示这个节点的最小使用率minUsage,如q075节点的minUsage=%75。

      11-16行,把PCKL节点组装成一个链表。

      使用q(minUsage, maxUsage)表示一个节点,那么:

      qInit = q(Integer.MIN_VALUE, 25%)

      q000 = q(1%, 50%)

      q025 = q(25%, 75%)

      q075 = q(75%, 100%)

      q100 = q(100%, Integer.MAX_VALUE)

      这个链表的结构如下图所示:

      

    PoolChunk(PCK)在PoolChunkList(PCKL)中移动

      一个新创建的PCK对象,它的内存使用率是usage=%0,被放进qInit节节点。每次从这个PCK对象中分配内存,都会导致它的使用率增加,当usage>=25%,即大于等于qInit的maxUsage时,会把它移动到q000中。继续从PCK对象中分配内存,它的usage继续增加,当usage大于等于它所属PCKL的maxUsage时,把它移动到PKCL链表中的下一个节点,直到q100为止。下面是内存分配导致PCK移动的代码:

     1     //io.netty.buffer.PoolChunkList#allocate
     2     boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
     3         if (head == null || normCapacity > maxCapacity) {
     4             // Either this PoolChunkList is empty or the requested capacity is larger then the capacity which can
     5             // be handled by the PoolChunks that are contained in this PoolChunkList.
     6             return false;
     7         }
     8 
     9         for (PoolChunk<T> cur = head;;) {
    10             long handle = cur.allocate(normCapacity);
    11             if (handle < 0) {
    12                 cur = cur.next;
    13                 if (cur == null) {
    14                     return false;
    15                 }
    16             } else {
    17                 cur.initBuf(buf, handle, reqCapacity);
    18                 if (cur.usage() >= maxUsage) {
    19                     remove(cur);
    20                     nextList.add(cur);
    21                 }
    22                 return true;
    23             }
    24         }
    25     }

      9-12行,尝试从PCK链表中的所有PCK节点分配所需的内存。

      14行,没有找到能分配内存的PCK节点。

      17行,从cur节点分配到所需的内存,并初始化PooledByteBuf对象。

      18-21行,如cur节点的使用率大于等于当前PCKL节点maxUsage,调用remove方法把cur从head链表中删除,然后调用PCKL链表中的下一个节点的add方法,把cur移动到下一个节点中。

      如果持续地释放内存,把内存还给PCK对象,会导致usage持续减小,当usage小于它所属的PCKL的minUsage时,把它移动到PCKL链表中的前一个节点,直到q000位为止。当释放内存导致PCK对象的usage等于%0,会销毁这个PCK对象,释放整个chunk的内存。下面是释放内存导致PCK对象移动的代码:

     1     //io.netty.buffer.PoolChunkList#free
     2     boolean free(PoolChunk<T> chunk, long handle) {
     3         chunk.free(handle);
     4         if (chunk.usage() < minUsage) {
     5             remove(chunk);
     6             // Move the PoolChunk down the PoolChunkList linked-list.
     7             return move0(chunk);
     8         }
     9         return true;
    10     }
    11 
    12     //io.netty.buffer.PoolChunkList#move0
    13     private boolean move0(PoolChunk<T> chunk) {
    14         if (prevList == null) {
    15             // There is no previous PoolChunkList so return false which result in having the PoolChunk destroyed and
    16             // all memory associated with the PoolChunk will be released.
    17             assert chunk.usage() == 0;
    18             return false;
    19         }
    20         return prevList.move(chunk);
    21     }

      第3行,释放内存,把内存返还给PCK对象。

      4-7行,如PCK的使用率小于当前PCKL的minUsage,调用remove方法把PCK对象从当前PCKL对象中删除,然后调用move0方法把它移动到前一个PCKL节点。

      13-31行,移动PCK到前一个PCKL。

    完整的内存分配释放流程

    内存分配

      入口方法:

      io.netty.buffer.AbstractByteBufAllocator#heapBuffer(int, int),创建使用堆内存的ByteBuf, 调用newHeapBuffer方法。

      io.netty.buffer.AbstractByteBufAllocator#directBuffer(int, int), 创建使用直接内存的ByteBuf,  调用newDirectBuffer方法。

      具体实现:

      io.netty.buffer.PooledByteBufAllocator#newHeapBuffer(int initialCapacity, int maxCapacity)。

      io.netty.buffer.PooledByteBufAllocator#newDirectBuffer(int initialCapacity, int maxCapacity)。 

      这两个方法都是从PoolThreadCache对象中得到线程专用的PoolArena对象,然后调用PoolArena的allocate方法创建PoolByteBuf对象。

      PoolArena入口方法:

      io.netty.buffer.PoolArena#allocate(io.netty.buffer.PoolThreadCache, int, int),这个方法是PoolArena分配内存,创建PoolByteBuf对象的入口方法。它先调用子类实现的newByteBuf创建一个PoolByteBuf对象,这个方法有两个实现:

      io.netty.buffer.PoolArena.HeapArena#newByteBuf(int maxCapacity),创建使用堆内存的PooledByteBuf对象。

      io.netty.buffer.PoolArena.DirectArena#newByteBuf(int maxCapacity),创建使用直接内存PooledByteBuf对象。

      然后调用io.netty.buffer.PoolArena#allocate(io.netty.buffer.PoolThreadCache, io.netty.buffer.PooledByteBuf<T>, int)方法为PoolByteBuf对象分配内存,这个方法是分配内存的核心方法,下面来重点分析一下它的代码:

     1      private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
     2         final int normCapacity = normalizeCapacity(reqCapacity);
     3         if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
     4             int tableIdx;
     5             PoolSubpage<T>[] table;
     6             boolean tiny = isTiny(normCapacity);
     7             if (tiny) { // < 512
     8                 if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
     9                     // was able to allocate out of the cache so move on
    10                     return;
    11                 }
    12                 tableIdx = tinyIdx(normCapacity);
    13                 table = tinySubpagePools;
    14             } else {
    15                 if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
    16                     // was able to allocate out of the cache so move on
    17                     return;
    18                 }
    19                 tableIdx = smallIdx(normCapacity);
    20                 table = smallSubpagePools;
    21             }
    22 
    23             final PoolSubpage<T> head = table[tableIdx];
    24 
    25             /**
    26              * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
    27              * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
    28              */
    29             synchronized (head) {
    30                 final PoolSubpage<T> s = head.next;
    31                 if (s != head) {
    32                     assert s.doNotDestroy && s.elemSize == normCapacity;
    33                     long handle = s.allocate();
    34                     assert handle >= 0;
    35                     s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
    36                     incTinySmallAllocation(tiny);
    37                     return;
    38                 }
    39             }
    40             synchronized (this) {
    41                 allocateNormal(buf, reqCapacity, normCapacity);
    42             }
    43 
    44             incTinySmallAllocation(tiny);
    45             return;
    46         }
    47         if (normCapacity <= chunkSize) {
    48             if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
    49                 // was able to allocate out of the cache so move on
    50                 return;
    51             }
    52             synchronized (this) {
    53                 allocateNormal(buf, reqCapacity, normCapacity);
    54                 ++allocationsNormal;
    55             }
    56         } else {
    57             // Huge allocations are never served via the cache so just call allocateHuge
    58             allocateHuge(buf, reqCapacity);
    59         }
    60     }

      第2行,根据需要的内存大小reqCapacity,计算可以分配的标准内存大小normCapacity。必须满足(1)normCapacity>=reqCapacity, (2)normCapacity是directMemoryCacheAlignment的整数倍,此外,还要根据reqCapacity的大小分3中情况:

        reqCapacity>=chunkSize:normCapacity取同时满足(1),(2)的最小值。

        reqCapacity>=512且reqCapacity<chunkSize: (3)normCapacity>=512*2k, (4)normCapacity<=chunkSize,normCapacit取同时满足(1),(2),(3),(4)的最小值。

        reqCapacity<412: (5)normCapacity<512, (6)normCapacity是16的整数倍,normCapacity取同时满足(1),(2),(5),(6)的最小值。

      8-13行,分配Tiny类型的内存(<512)。 8-10行,如果PoolThreadCache缓存对象中分配到内存,分配内流程结束。12-13行,如果缓存中没有,就从Tiny内存池中分配一块内存。

      15-20行,分配Small类型的内存(>=512且<pageSize)。和分配Tiny内存的逻辑相同。

      29-27行,  使用从前两个步骤中得到的Tiny或Small内存的索引,从子页面池中分配一块内存。33行,从子页面中分配内存。35行,使用分配到的内存初始化PoolByteBuf对象,如果能到这里,分配内存流程结束。

      41行,如果子页面池中还没有内存可用,调用allocateNormal方法从PoolChunk对象中分配一个子页面,再从子页面中分配所需的内存。

      47-55行,分配Normal类型的内存(>=pageSize且<chunkSize)。48,49行,从缓存中分配内存,如果成功,分配内存流程结束。53行,缓存中没有可用的内存,调用allocateNormal方法从PoolChunk中分配内存。

      58行,如果分配的是>chunkSize的内存。这块内存不会进入PCKL链表中。

      

      上面代码中的allocateNormal方法封装了创建PCK对象,从PCK对象中分配内存,再把PCK对象放入到PCKL链表中的逻辑,也是十分重要的代码。

     1     private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
     2         if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
     3             q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
     4             q075.allocate(buf, reqCapacity, normCapacity)) {
     5             return;
     6         }
     7 
     8         // Add a new chunk.
     9         PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
    10         long handle = c.allocate(normCapacity);
    11         assert handle > 0;
    12         c.initBuf(buf, handle, reqCapacity);
    13         qInit.add(c);
    14     }

      2-5行,依次尝试从每个PCKL节点中分配内存,如果成功,分配内存流程结束。

      9-13行,先创建一个新的PCK对象,然后从中分配内存,使用内存初始化PooledByteBuf对象,最后把PCK对象添加PCKL链表头节点qInit中。PKCL对象的add方法会和allocate一样,根据PCK对象的内存使用率,把它移动到链表中合适的位置。

      

    内存释放

      io.netty.buffer.PooledByteBuf#deallocate方法调用io.netty.buffer.PoolArena#free方法,这个free方法负责整个内存释放过程。

     1     void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {
     2         if (chunk.unpooled) {
     3             int size = chunk.chunkSize();
     4             destroyChunk(chunk);
     5             activeBytesHuge.add(-size);
     6             deallocationsHuge.increment();
     7         } else {
     8             SizeClass sizeClass = sizeClass(normCapacity);
     9             if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {
    10                 // cached so not free it.
    11                 return;
    12             }
    13 
    14             freeChunk(chunk, handle, sizeClass);
    15         }
    16     }

      这段代码重点在8-14行。第8,9行,优先把内存放到缓存中,这样下次就能快速地从缓存中直接取用。第14行,在不能放进缓存的情况下把内存返回给PCK对象。

     1     void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass) {
     2         final boolean destroyChunk;
     3         synchronized (this) {
     4             switch (sizeClass) {
     5             case Normal:
     6                 ++deallocationsNormal;
     7                 break;
     8             case Small:
     9                 ++deallocationsSmall;
    10                 break;
    11             case Tiny:
    12                 ++deallocationsTiny;
    13                 break;
    14             default:
    15                 throw new Error();
    16             }
    17             destroyChunk = !chunk.parent.free(chunk, handle);
    18         }
    19         if (destroyChunk) {
    20             // destroyChunk not need to be called while holding the synchronized lock.
    21             destroyChunk(chunk);
    22         }
    23     }

      第17行,掉用PCKL对象的free方法把内存还给PCK对象,移动PCK对象在PCKL链表中位置。如果此时这个PCK对象的使用率变成0,destroyChunk=true。

      第21行,调用destroyChunk方法销毁掉PCK对象。

       

      

      

  • 相关阅读:
    VMware虚拟机下实现NAT方式上网的小方法
    文件附件上传
    [转] 深入了解 HTML 5
    用JS获取图片尺寸
    Stream与byte的转换
    递归算法
    搞懂java中的synchronized关键字
    初学UML之用例图
    浅析tomcat nio 配置
    面向对象:单一任务原则(SRP)
  • 原文地址:https://www.cnblogs.com/brandonli/p/11649263.html
Copyright © 2011-2022 走看看