zoukankan      html  css  js  c++  java
  • Netty源码分析--内存模型(下)(十二)

         这一节我们一起看下分配过程

    1     PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
    2         PooledByteBuf<T> buf = newByteBuf(maxCapacity); // 初始化一块容量为 2^31 - 1的ByteBuf
    3         allocate(cache, buf, reqCapacity); // reqCapacity = 1024  进入分配逻辑
    4         return buf;
    5     }
     1     private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
     2         final int normCapacity = normalizeCapacity(reqCapacity); // 对请求的大小进行校准,一会具体看
     3         if (isTinyOrSmall(normCapacity)) { // capacity < pageSize 判断校准之后的请求大小 是否小于 8k
     4             int tableIdx;
     5             PoolSubpage<T>[] table;
     6             boolean tiny = isTiny(normCapacity); // 判断请求大小是否小于512B
     7             if (tiny) { // < 512
     8                 if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) { // 从缓存中进行分配,如果是第一次这里肯定是没有对应大小缓存的,除非之前有过释放,然后内存空间被缓存起来了。
     9                     // was able to allocate out of the cache so move on
    10                     return;
    11                 }
    12                 tableIdx = tinyIdx(normCapacity); // tiny数组中获取对应的下标 
    13                 table = tinySubpagePools; // tiny subPage数组
    14             } else { // 512 <=  normCapacity  < 8K
    15                 if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) { // 一样从缓存中进行分配,这个上一节详细的介绍过
    16                     // was able to allocate out of the cache so move on
    17                     return;
    18                 }
    19                 tableIdx = smallIdx(normCapacity); // 获取small subPage数组下标  
    20                 table = smallSubpagePools; // subPage 数组
    21             }
    22 
    23             final PoolSubpage<T> head = table[tableIdx];// 取得对应下标的subPage
    24 
    25             /**
    26              * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
    27              * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
    28              */
    29             synchronized (head) { // 锁定防止其他操作修改head结点
    30                 final PoolSubpage<T> s = head.next;
    31                 if (s != head) {// 如果双向链表已经初始化,那么从subpage中进行分配
    32                     assert s.doNotDestroy && s.elemSize == normCapacity; // 断言确保 当前的这个subPage中的elemSize大小必须和校对后的请求大小一样
    33                     long handle = s.allocate(); // 从subPage中进行分配
    34                     assert handle >= 0;
    35                     s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
    36                     incTinySmallAllocation(tiny);
    37                     return;
    38                 }
    39             }
    40             synchronized (this) {
    41                 allocateNormal(buf, reqCapacity, normCapacity); // 双向循环链表还没初始化,使用normal分配
    42             }
    43 
    44             incTinySmallAllocation(tiny);
    45             return;
    46         }
    47         if (normCapacity <= chunkSize) {
    48             if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
    49                 // was able to allocate out of the cache so move on
    50                 return;
    51             }
    52             synchronized (this) {
    53                 allocateNormal(buf, reqCapacity, normCapacity);
    54                 ++allocationsNormal;
    55             }
    56         } else {
    57             // Huge allocations are never served via the cache so just call allocateHuge
    58             allocateHuge(buf, reqCapacity);
    59         }
    60     }
        int normalizeCapacity(int reqCapacity) {
            if (reqCapacity < 0) {
                throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)");
            }
    
            if (reqCapacity >= chunkSize) {
                return directMemoryCacheAlignment == 0 ? reqCapacity : alignCapacity(reqCapacity);
            }
    
            if (!isTiny(reqCapacity)) { //  如果是 >=512 的内存申请,那么就规范到2的次方大小 ,比如 1000 就会 变成 1024,自己可以把这段位操作,自己main方法试一下。
                // Doubled
    
                int normalizedCapacity = reqCapacity;
                normalizedCapacity --;
                normalizedCapacity |= normalizedCapacity >>>  1;
                normalizedCapacity |= normalizedCapacity >>>  2;
                normalizedCapacity |= normalizedCapacity >>>  4;
                normalizedCapacity |= normalizedCapacity >>>  8;
                normalizedCapacity |= normalizedCapacity >>> 16;
                normalizedCapacity ++;
    
                if (normalizedCapacity < 0) {
                    normalizedCapacity >>>= 1;
                }
                assert directMemoryCacheAlignment == 0 || (normalizedCapacity & directMemoryCacheAlignmentMask) == 0;
    
                return normalizedCapacity;
            }
    
            if (directMemoryCacheAlignment > 0) {
                return alignCapacity(reqCapacity);
            }
    
            // Quantum-spaced
            if ((reqCapacity & 15) == 0) { // 如果 < 512 判断如果是16的倍数,那么直接返回
                return reqCapacity;
            }
    
            return (reqCapacity & ~15) + 16; // 如果不是则对齐成16的倍数
        }
     private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
         if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
             q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
             q075.allocate(buf, reqCapacity, normCapacity)) {
             return;
         }  // 先尝试从chunk链表中进行分配, 这里有一个需要注意的地方
         // 分配顺序 先分配 q50 也就是 50% ~ 100% 的,然后是 q25 也就是 25% ~ 75% 的, 然后是 q000 0%~50% 、 qInit 0%~25% 、 q75 75% ~ 100%
    
         // Add a new chunk.
         PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize); // 新建一个Chunk 
         long handle = c.allocate(normCapacity); // 内存分配
         assert handle > 0;
         c.initBuf(buf, handle, reqCapacity);
         qInit.add(c); // 添加到初始化的chunk链表
     }

    上面的分配顺序,大家想一下为什么不是从q000开始分配呢?我找了一段分析的很好的。

    在分析PoolChunkList的时候,我们知道一个chunk随着内存的不停释放,它本身会不停的往其所在的chunk list的prev list移动,直到其完全释放后被回收。 如果这里是从q000开始尝试分配,虽然分配的速度可能更快了(因为分配成功的几率更大),但一个chunk在使用率为25%以内时有更大几率再分配,也就是一个chunk被回收的几率大大降低了。这样就带来了一个问题,我们的应用在实际运行过程中会存在一个访问高峰期,这个时候内存的占用量会是平时的几倍,因此会多分配几倍的chunk出来,而等高峰期过去以后,由于chunk被回收的几率降低,内存回收的进度就会很慢(因为没被完全释放,所以无法回收),内存就存在很大的浪费。

    为什么是从q050开始尝试分配呢,q050是内存占用50%~100%的chunk,猜测是希望能够提高整个应用的内存使用率,因为这样大部分情况下会使用q050的内存,这样在内存使用不是很多的情况下一些利用率低(<50%)的chunk慢慢就会淘汰出去,最终被回收。然而为什么不是从qinit中开始呢,这里的chunk利用率低,但又不会被回收,岂不是浪费?q075,q100由于使用率高,分配成功的几率也会更小,因此放到最后。

    上面也有提到新建一个chunk,这里我想特殊说明两个byte数组,数组的大小是4096

            memoryMap = new byte[maxSubpageAllocs << 1];
            depthMap = new byte[memoryMap.length];
            int memoryMapIndex = 1;
            for (int d = 0; d <= maxOrder; ++ d) { // move down the tree one level at a time
                int depth = 1 << d;
                for (int p = 0; p < depth; ++ p) {
                    // in each level traverse left to right and set value to the depth of subtree
                    memoryMap[memoryMapIndex] = (byte) d;
                    depthMap[memoryMapIndex] = (byte) d;
                    memoryMapIndex ++;
                }
            }

    循环之后的数组是怎么样子的呢?

    MemoryMap存放分配信息,depthMap存放节点的高度信息 ;初始状态时,memoryMap和depthMap相等,depthMap的值初始化后不再改变,memoryMap的值则随着节点分配而改变。

    下标代表了平衡二叉树的节点序号,值代表该节点的高度值。

     比如MemoryMap[1024] = 10, MemoryMap[2048] = 11. 好了,这个说完了,我们继续看:

    1     long allocate(int normCapacity) {
    2         if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize  >= 8K
    3             return allocateRun(normCapacity);
    4         } else {
    5             return allocateSubpage(normCapacity); // < 8K
    6         }
    7     }
     1     private long allocateSubpage(int normCapacity) {
     2         // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
     3         // This is need as we may add it back and so alter the linked-list structure.
     4         PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity); // 根据请求内存大小,获取下标,然后获取对应的subPage
     5         synchronized (head) {
     6             int d = maxOrder; // 最大层数 11
     7             int id = allocateNode(d);  // 分配节点,修改涉及节点层数, 具体看下面的分析
     8             if (id < 0) { // 无可分配节点
     9                 return id;
    10             }
    11 
    12             final PoolSubpage<T>[] subpages = this.subpages;
    13             final int pageSize = this.pageSize; 
    14 
    15             freeBytes -= pageSize;// 16M(初始化值,后面会逐渐减少) - 8K
    16 
    17             int subpageIdx = subpageIdx(id); // 2048 对应的偏移量是 0 , 2049 对应的偏移量是 1 。。。4095 对应偏移量是 2047 
    18             PoolSubpage<T> subpage = subpages[subpageIdx]; // 取对应下标的 subpage
    19             if (subpage == null) {
    20                 subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity); // 创建PoolSubpage
    21                 subpages[subpageIdx] = subpage;
    // 新建或初始化subpage并加入到chunk的subpages数组,同时将subpage加入到arena的subpage双向链表中,最后完成分配请求的内存。
    // 代码中,subpage != null的情况产生的原因是:subpage初始化后分配了内存,但一段时间后该subpage分配的内存释放并从arena的双向链表中删除,
    // 此时subpage不为null,当再次请求分配时,只需要调用init()将其加入到areana的双向链表中即可。
    22 } else { 23 subpage.init(head, normCapacity); 24 } 25 return subpage.allocate(); // 最后结果是一个long整数,其中低32位表示二叉树中的分配的节点,高32位表示subPage中分配的具体位置 26 } 27 }

     修改节点对应的层数,底下这个方法涉及了很多的位运算,这里大家要仔细琢磨。

     1     private int allocateNode(int d) { // d = 11
     2         int id = 1;
     3         int initial = - (1 << d); // -2048
     4         byte val = value(id); // memoryMap[id] , 一般1对应的层数是 0 ,当第一个节点被分配完成后,这个节点的值会变成12
     5         if (val > d) { // unusable // 如果分配完成,那么无法进行分配,那么就是 12 > 11 ,直接返回 -1 
     6             return -1;
     7         }
     8         while (val < d || (id & initial) == 0) { // id & initial == 1 << d for all ids at depth d, for < d it is 0
     9             id <<= 1; // 左移1位
    10             val = value(id); // 取到节点对应的层数
    11             if (val > d) { // 如果当前的值大于层数,也就是说左子节点用完了
    12                 id ^= 1; // 当前节点值 +1 ,取右节点
    13                 val = value(id); // 取右子节点
    14             }
    15         }
    16         byte value = value(id); 
    17         assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d",
    18                 value, id & initial, d);
    19         setValue(id, unusable); // mark as unusable //将该节点的值设置为12,不可用
    20         updateParentsAlloc(id); // 将该节点对应的所有父节点,层数全部 + 1
    21         return id; 
    22     }
     1     private void updateParentsAlloc(int id) {
     2         while (id > 1) {
     3             int parentId = id >>> 1; // 取父级节点
     4             byte val1 = value(id); // 
     5             byte val2 = value(id ^ 1); // 如果 id = 2048 那么 id ^ 1 = 2049 ,  如果 id = 2049 这里  id ^ 1 = 2048 这里一定要注意!!!
     6             byte val = val1 < val2 ? val1 : val2; 
     7             setValue(parentId, val);  // 修改父级对应层数
     8             id = parentId; 
     9         }
    10     }

    上面这个方法很有意思,大家要仔细体会,我举个例子

     最开始是这样的,我们开始第一次分配,那么就是2048号节点,分配完成后会变成

     

     这时候由于2048变成了12,不可用状态,那么取右节点进行分配

        下一次再进行分配的时候, 发现 1024 是 12 ,那么取右子节点, 是 1025 = 10 ,因为 10 < 11 所以再次进入循环,找到2050节点,对应层数是 11 < 12 ,则分配 2050节点分配后变成

      

     然后依次类推。

    1     private long allocateRun(int normCapacity) {
    2         int d = maxOrder - (log2(normCapacity) - pageShifts); // 计算满足需求的节点的高度 比如 16K , 那么 就是 d= 11 - (14 - 13) = 10
    3         int id = allocateNode(d); // 从第十层找空闲节点, 原理跟上面相同,就不说了
    4         if (id < 0) {
    5             return id;
    6         }
    7         freeBytes -= runLength(id); // 分配后剩余的字节数
    8         return id;
    9     }

    说到这里,内存模型的重点部分算是说完了。如果整个Netty系列有任何不对的地方,欢迎大家指出,我会虚心改正。

          后面呢,我将打算将Spring的源码的关键部分进行分析,期间也会穿插着对Redis的底层数据结构等重要内容进行分析。有需要的童鞋们,记得关注我,随时可以获取最新的消息。

    谢谢。

        

  • 相关阅读:
    spring boot 配置rabbitmq启动遇到的问题
    linux 启动rabbitmq 报错:
    spring boot 读取自定义properties文件
    jvm笔记-02-垃圾收集器与内存分配策略
    Jvm笔记-01-Java运行时数据区域与对象
    C++ 14 重载操作符与转换
    C++ 13 复制控制
    C++ 12 类
    C++ 11 泛型算法、迭代器
    android studio 问题及处理记录
  • 原文地址:https://www.cnblogs.com/huxipeng/p/11357150.html
Copyright © 2011-2022 走看看