zoukankan      html  css  js  c++  java
  • netty源码解析(4.0)-26 ByteBuf内存池:PoolArena-PoolSubpage

      PoolChunk用来分配大于或等于一个page的内存,如果需要小于一个page的内存,需要先从PoolChunk中分配一个page,然后再把一个page切割成多个子页-subpage,最后把内存以subpage为单位分配出去。PoolSubpage就是用来管理subpage的。

      一个page会被分割成若干个大小相同的subpage,subpage的的大小是elemSize。elemSize必须是16的整数倍,即必须满足elemSize & 15 == 0。elemSize的取值范围是(16, pageSize-16)。多个elemSize相等的PoolSubpage组成一个双向链表,用于分配特定大小的subpage。

      Tiny和Small类型的内存,都是用subpage来分配。Tiny内存大小范围是[16, 512),如果把大小不同的subpage按顺序排列,除最后一个外,任意一个subpage的elemSize+16等于下一个subpage的elemSize,可以用于分配Tiny内存的subpage有512>>4=32种。Small内存大小范围是[512, pageSize),subpage的elemSize=512 * 2n = 2(9+n), 可以用于分配Small内存的subpage有n种,n的最小值是0, 最大值由pageSize决定。

      已知:

        elemSize < pageSize

        pageSize可以表示为2k

        elemSize = 29+n

      => 29+n < 2k

      => 9+n < k

      => n < k - 9

      => n的取值范围是(0, k - 9)

      上一章中分析过pageShifts, 它就是上面推导过程中使用的变量k。

    PoolArena中的PoolSubpage数组

      PoolArena维护了两个PoolSubpage表,都是以PoolSubpage<T>[]数组的形式保存:

    • tinySubpagePools:用于分配Tiny内存,数组长度是521 >> 4 = 32。
    • smallSubpagePools: 用于分配Small内存,数组长度是pageShifts - 9。

      PoolArean在构造方法中初始化这两个数组:

     1         tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
     2         for (int i = 0; i < tinySubpagePools.length; i ++) {
     3             tinySubpagePools[i] = newSubpagePoolHead(pageSize);
     4         }
     5 
     6         numSmallSubpagePools = pageShifts - 9;
     7         smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
     8         for (int i = 0; i < smallSubpagePools.length; i ++) {
     9             smallSubpagePools[i] = newSubpagePoolHead(pageSize);
    10         }

      代码中的numTinySubpagePools=512>>4和numSmallSubpagePools=pageShifts - 9,分别是两个数组的长度。这两个数组保存的都是PoolSubpage双向链表的头节点,头节点不能用来分配内存。

      findSubpagePoolHead方法可以根据elemSize找到对应的PoolSubpage链表的头节点:

     1     PoolSubpage<T> findSubpagePoolHead(int elemSize) {
     2         int tableIdx;
     3         PoolSubpage<T>[] table;
     4         if (isTiny(elemSize)) { // < 512
     5             tableIdx = elemSize >>> 4;
     6             table = tinySubpagePools;
     7         } else {
     8             tableIdx = 0;
     9             elemSize >>>= 10;
    10             while (elemSize != 0) {
    11                 elemSize >>>= 1;
    12                 tableIdx ++;
    13             }
    14             table = smallSubpagePools;
    15         }
    16 
    17         return table[tableIdx];
    18     }

      4-6行,如果是Tiny内存,计算elemSize在tinySubpagePools中的偏移量tableIdx。

      8-14行,如果是Normal内存,计算elemSize在smallSubpagePools中的偏移量tabIeIdx。计算tableIdx的算法是把elemSize无符号右移10位之后,找非0的最高位,在找的过程中累加tableIdx,找到之后及得到了正确的偏移量。这个算法还可以简化成log2(elemSize) - 9。

      17行,取出一个PoolSubpage链表头。

      

    PoolSubpage初始化

      在PoolChunk的allocateSubpage方法中,调用findSubpagePoolHead得到一个head,然后使用分配到的二叉树内存节点初始化一个PoolSubpage节点。一个能用来分配内存的PooSubpage节点可以调用构造方法或init方法进行初始化。

     1     PoolSubpage(PoolSubpage<T> head, PoolChunk<T> chunk, int memoryMapIdx, int runOffset, int pageSize, int elemSize) {
     2         this.chunk = chunk;
     3         this.memoryMapIdx = memoryMapIdx;
     4         this.runOffset = runOffset;
     5         this.pageSize = pageSize;
     6         bitmap = new long[pageSize >>> 10]; // pageSize / 16 / 64
     7         init(head, elemSize);
     8     }

      这个构造方法只是做了一些简单的属性初始化工作。第6行初始bitmap,它用bit位来记录每个subpage的使用情况,每个bit对应一个subpage,0表示subpage空闲,1表示subpage已经被分配出去。一个subpage的大小是elemSize,前面已经讲过,最小的elemSize=16,  那么一个page最多可分割成subpage的数量maxSubpageCount=pageSize/16=pageSize >> 4。bitmap是个long型的数字,每个long数据有64位,因此bitmap的最大长度只需要maxBitmapLength = maxSubpageCount / 64 = pageSize / 16 / 64 = pageSize >> 10就够用了。

      init方法的作用是根据elemSize计算出有效的bitmap长度bitmapLength,然后把bitmapLength范围内存的bit值都初始化为0。

     1     void init(PoolSubpage<T> head, int elemSize) {
     2         doNotDestroy = true;
     3         this.elemSize = elemSize;
     4         if (elemSize != 0) {
     5             maxNumElems = numAvail = pageSize / elemSize;
     6             nextAvail = 0;
     7             bitmapLength = maxNumElems >>> 6;
     8             if ((maxNumElems & 63) != 0) {
     9                 bitmapLength ++;
    10             }
    11 
    12             for (int i = 0; i < bitmapLength; i ++) {
    13                 bitmap[i] = 0;
    14             }
    15         }
    16         addToPool(head);
    17     }

      第5行,初始subpage的最大数量maxNumElems和可用数量numAvail。

      第6行,初始化下一个可用subpage的索引。

      第7行, 计算bitmap的有效数量,bitmapLength = maxNumElems >>> 6 = maxNumElems / 64。

      第8,9行,如果maxNumElems不是64的整数倍,bitmapLength需要额外加1。

      第12-14行,有效长度的bitmap值都设置成0。

      第16行, 把当前PoolSubpage节点添加到head后面。

      bitmap位的索引范围是[0, maxNumElems)。

      

    分配一个subpage

      PoolSubpage初始化完成之后,调用allocate可以分配一个subpage,返回的是一个long型的的handle,这个handle代表一块内存。handle的低32位memoryMapIdx,是PoolChunk中二叉树节点的索引;高32位bitmapIdx,是subpage在bitmap中对应位的索引。

      分配一个subpage有两个步骤:

    1. 找到一个可用subpage的索引bitmapIdx
    2. 把这个bitmapIdx在bitmap中对应的bit为置为1

      findNextAvail方法负责到底一个可用的subpage并返回它的bitmapIdx。

     1     private int findNextAvail() {
     2         final long[] bitmap = this.bitmap;
     3         final int bitmapLength = this.bitmapLength;
     4         for (int i = 0; i < bitmapLength; i ++) {
     5             long bits = bitmap[i];
     6             if (~bits != 0) {
     7                 return findNextAvail0(i, bits);
     8             }
     9         }
    10         return -1;
    11     }
    12 
    13     private int findNextAvail0(int i, long bits) {
    14         final int maxNumElems = this.maxNumElems;
    15         final int baseVal = i << 6;
    16 
    17         for (int j = 0; j < 64; j ++) {
    18             if ((bits & 1) == 0) {
    19                 int val = baseVal | j;
    20                 if (val < maxNumElems) {
    21                     return val;
    22                 } else {
    23                     break;
    24                 }
    25             }
    26             bits >>>= 1;
    27         }
    28         return -1;
    29     }

      第1-11行,变量bitmap数组,找到一个至少有一位是0的long数据。~bits != 0 说明bits中至少有一位是0。然后调用findNextAvail0找到bits中为0的最低位。

      第15行,计算bitmap数组中的索引i对应的bit索引baseVal = i << 6 = i * 64。

      第17-27行,遍历bits的每个bit位,遇到为0的bit后在19计算回bitmpaIdx = baseVal | j,j表示bit位在long数据中bit索引。如果满足bitmapIdx < maxNumElems在21返回。

      第28行,如果没找到可用的subpage, 返回-1。当maxNumElems不是64的整数倍时,bitmap数组中最后一个bits在~bits != 0的情况下可能已经没有subpage可用。

      

      allcate方法是分配supage的入口,它调用getNextAvail得到一个supage的bitmapIdx,  getNextAvail在nextAvail属性为-1的时候,调用findNexAvail。然后把bitmapIdx对应的bit为置为1,最后返回handle。

     1     long allocate() {
     2         if (elemSize == 0) {
     3             return toHandle(0);
     4         }
     5 
     6         if (numAvail == 0 || !doNotDestroy) {
     7             return -1;
     8         }
     9 
    10         final int bitmapIdx = getNextAvail();
    11         int q = bitmapIdx >>> 6;
    12         int r = bitmapIdx & 63;
    13         assert (bitmap[q] >>> r & 1) == 0;
    14         bitmap[q] |= 1L << r;
    15 
    16         if (-- numAvail == 0) {
    17             removeFromPool();
    18         }
    19 
    20         return toHandle(bitmapIdx);
    21     }

      第10行,得到下一个可用的subpage在bitmap中的索引bitmapIdx。

      第11行,计算bitmapIdx在bitpmap数组中索引,q = bitmapIdx >>> 6 = (int)(bitmapIdx/64)。

      第12行,计算bitmapIdx对应的bit在long数据中的位索引r,表示q对应的long数据的第r位就是。

      第14行,把bitmapIdx对应的bit为设置为1。

      第16,17行,把可用subpage数numAvail减1,如果numAvail==0表示当前PoolSubpage节点已经没有可用的subpage了,调用removeFromPool把它从链表中删除。

      第20行,把bitmapIdx转换成表示内存的handle,算法是:  handle = 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;

      

    释放一个subpage

      free方法实现了subpage释放的功能,和allocate相比要简单的多,它的主要工作是把bitmapIdx对应的bit为设置为0,还顺便做了一下清理善后工作。

     1     boolean free(PoolSubpage<T> head, int bitmapIdx) {
     2         if (elemSize == 0) {
     3             return true;
     4         }
     5         int q = bitmapIdx >>> 6;
     6         int r = bitmapIdx & 63;
     7         assert (bitmap[q] >>> r & 1) != 0;
     8         bitmap[q] ^= 1L << r;
     9 
    10         setNextAvail(bitmapIdx);
    11 
    12         if (numAvail ++ == 0) {
    13             addToPool(head);
    14             return true;
    15         }
    16 
    17         if (numAvail != maxNumElems) {
    18             return true;
    19         } else {
    20             // Subpage not in use (numAvail == maxNumElems)
    21             if (prev == next) {
    22                 // Do not remove if this subpage is the only one left in the pool.
    23                 return true;
    24             }
    25 
    26             // Remove this subpage from the pool if there are other subpages left in the pool.
    27             doNotDestroy = false;
    28             removeFromPool();
    29             return false;
    30         }
    31     }

      第5,6行,和allocate中解释过。

      第8行,把bitmapIdx对应的bit为置为0。

      第10行,把这个bitmapIdx赋值给nextAvail属性,这样在一次或多次调用free之后的第一次allocate调用就不会调用findNextAvail方法,可以提升allocate的性能。

      第12,13行,当前PoolSubpage节点中至少有一个可用的subpage,把当前节点添加到链表中。

      第27-29行,当前PooSubpage节点中所有分配出去的节点都全部还会来了,换言之,当前节点有回到bitmap初始化状态,把当前节点从链表中删除。

      

  • 相关阅读:
    JDK Base64编解码1.7和1.8的坑
    nacos部署注意点
    详解CurrentHashMap之预习篇
    SpringBoot爬坑系列
    开发之缓存与数据库优化
    jreble备注
    Unable to open debugger port (127.0.0.1:55119): java.net.SocketException "Socket closed"
    ConcurrentHashMap源码分析
    为什么要先高16位异或低16位再取模运算
    HashMap(三)之源码分析
  • 原文地址:https://www.cnblogs.com/brandonli/p/11642434.html
Copyright © 2011-2022 走看看