zoukankan      html  css  js  c++  java
  • [netty4][netty-buffer]netty之池化buffer

    PooledByteBufAllocator buffer分配

    buffer分配的入口:
    io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(int, int)
    netty实际应用时分配调用栈:

    CLASS_NAMEMETHOD_NAMELINE_NUM
    io/netty/buffer/PooledByteBufAllocatornewDirectBuffer339
    io/netty/buffer/AbstractByteBufAllocatordirectBuffer185
    io/netty/buffer/AbstractByteBufAllocatordirectBuffer176
    io/netty/buffer/AbstractByteBufAllocatorioBuffer139
    io/netty/channel/DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandleallocate114
    io/netty/channel/nio/AbstractNioByteChannel$NioByteUnsaferead186
    io/netty/channel/nio/NioEventLoopprocessSelectedKey682
    io/netty/channel/nio/NioEventLoopprocessSelectedKeysOptimized628
    io/netty/channel/nio/NioEventLoopprocessSelectedKeys533
    io/netty/channel/nio/NioEventLooprun511
    io/netty/util/concurrent/SingleThreadEventExecutor$5run956

    测试case代码

    package io.netty.buffer;
    
    import org.junit.Assert;
    public class PooledByteBufTest {
    
    	public static void main(String[] args) {
    		  final PooledByteBufAllocator allocator = new PooledByteBufAllocator(
    	                false,   // preferDirect
    	                0,      // nHeapArena
    	                1,      // nDirectArena
    	                8192,   // pageSize
    	                11,     // maxOrder
    	                3,      // tinyCacheSize
    	                3,      // smallCacheSize
    	                3,      // normalCacheSize
    	                true    // useCacheForAllThreads
    	                );
    
    	        // create tiny buffer
    	        final ByteBuf b1 = allocator.directBuffer(24);
    	        // create small buffer
    	        final ByteBuf b2 = allocator.directBuffer(800);
    	        // create normal buffer
    	        final ByteBuf b3 = allocator.directBuffer(8192 * 2);
    
    	        Assert.assertNotNull(b1);
    	        Assert.assertNotNull(b2);
    	        Assert.assertNotNull(b3);
    
    	        // then release buffer to deallocated memory while threadlocal cache has been disabled
    	        // allocations counter value must equals deallocations counter value
    	        Assert.assertTrue(b1.release());
    	        Assert.assertTrue(b2.release());
    	        Assert.assertTrue(b3.release());
    	}
    }
    

    PoolChunk

    PoolChunk本身数据结构与设计思路参见PoolChunk注释:

    /**
     * Description of algorithm for PageRun/PoolSubpage allocation from PoolChunk
     *
     * Notation: The following terms are important to understand the code
     * > page  - a page is the smallest unit of memory chunk that can be allocated
     * page是chunk中能分配的最小单元  
     * > chunk - a chunk is a collection of pages
     * 一个chunk中有一组page  1对多  
     * > in this code chunkSize = 2^{maxOrder} * pageSize
     * 代码中  chunksize大小计算如上  maxOrder 是啥?
     *
     * To begin we allocate a byte array of size = chunkSize
     * Whenever a ByteBuf of given size needs to be created we search for the first position
     * in the byte array that has enough empty space to accommodate the requested size and
     * return a (long) handle that encodes this offset information, (this memory segment is then
     * marked as reserved so it is always used by exactly one ByteBuf and no more)
     * 首先,当需要创建给定大小的ByteBuf时,我们分配一个size=chunkSize的字节数组,
     * 在字节数组中搜索第一个有足够的空空间来容纳请求的大小的位置,
     * 并返回一个(长)句柄来编码该偏移量信息(然后将该内存段标记为保留,因此它总是仅由一个ByteBuf使用,不再使用)
     *
     * For simplicity all sizes are normalized according to PoolArena#normalizeCapacity method
     * This ensures that when we request for memory segments of size >= pageSize the normalizedCapacity
     * equals the next nearest power of 2
     * 为了简单起见,所有大小都按照PoolArena#normalizeCapacity方法进行规范化
     * 这确保当我们请求大小大于等于pageSize的内存段时,normalized容量等于下一个最接近的2的幂
     *
     * To search for the first offset in chunk that has at least requested size available we construct a
     * complete balanced binary tree and store it in an array (just like heaps) - memoryMap
     * 为了搜索块中至少有请求大小可用的第一个偏移量,我们构造了一个完整的平衡二叉树,并将其存储在一个数组(就像堆一样)-内存映射中
     *
     * The tree looks like this (the size of each node being mentioned in the parenthesis)
     * 树看起来是这样的(括号中提到的每个节点的大小)
     *
     * depth=0        1 node (chunkSize)
     * depth=1        2 nodes (chunkSize/2)
     * ..
     * ..
     * depth=d        2^d nodes (chunkSize/2^d)
     * ..
     * depth=maxOrder 2^maxOrder nodes (chunkSize/2^{maxOrder} = pageSize)  pageSize 在最下一层  最顶层是chunksize 从上往下走,每过一层除以2  
     *
     * depth=maxOrder is the last level and the leafs consist of pages
     *
     * With this tree available searching in chunkArray translates like this:
     * To allocate a memory segment of size chunkSize/2^k we search for the first node (from left) at height k
     * which is unused 要分配大小为chunkSize/2^k的内存段,我们在高度k处搜索第一个未使用的节点(从左开始)。 嗯嗯
     *
     * Algorithm:
     * ----------
     * Encode the tree in memoryMap with the notation  用符号将树编码在内存中
     *   memoryMap[id] = x => in the subtree rooted at id, the first node that is free to be allocated
     *   is at depth x (counted from depth=0) i.e., at depths [depth_of_id, x), there is no node that is free
     * 在以id为根的子树中,可自由分配的第一个节点在深度x(从深度=0开始计算),即在深度[深度id,x的深度]处,没有可自由分配的节点
     *
     *  As we allocate & free nodes, we update values stored in memoryMap so that the property is maintained
     * 当我们分配空闲节点时,我们更新存储在memoryMap中的值,以便维护属性
     *
     * Initialization -
     *   In the beginning we construct the memoryMap array by storing the depth of a node at each node
     * 首先,我们通过在每个节点上存储一个节点的深度来构造memoryMap数组
     *     i.e., memoryMap[id] = depth_of_id
     *
     * Observations:
     * -------------
     * 1) memoryMap[id] = depth_of_id  => it is free / unallocated
     * 2) memoryMap[id] > depth_of_id  => at least one of its child nodes is allocated, so we cannot allocate it, but
     *                                    some of its children can still be allocated based on their availability
     * 3) memoryMap[id] = maxOrder + 1 => the node is fully allocated & thus none of its children can be allocated, it
     *                                    is thus marked as unusable
     *
     * Algorithm: [allocateNode(d) => we want to find the first node (from left) at height h that can be allocated]
     * ----------
     * 1) start at root (i.e., depth = 0 or id = 1)
     * 2) if memoryMap[1] > d => cannot be allocated from this chunk
     * 3) if left node value <= h; we can allocate from left subtree so move to left and repeat until found
     * 4) else try in right subtree
     *
     * Algorithm: [allocateRun(size)]
     * ----------
     * 1) Compute d = log_2(chunkSize/size)
     * 2) Return allocateNode(d)
     *
     * Algorithm: [allocateSubpage(size)]
     * ----------
     * 1) use allocateNode(maxOrder) to find an empty (i.e., unused) leaf (i.e., page)
     * 2) use this handle to construct the PoolSubpage object or if it already exists just call init(normCapacity)
     *    note that this PoolSubpage object is added to subpagesPool in the PoolArena when we init() it
     *
     * Note:
     * -----
     * In the implementation for improving cache coherence,
     * we store 2 pieces of information depth_of_id and x as two byte values in memoryMap and depthMap respectively
     *
     * memoryMap[id]= depth_of_id  is defined above
     * depthMap[id]= x  indicates that the first node which is free to be allocated is at depth x (from root)
     */
    final class PoolChunk<T> implements PoolChunkMetric {
    

    io.netty.buffer.PoolArena.findSubpagePoolHead(int) 算出page header在page table中的index,小的page在前面

    // trace 库地址 jdbc:h2:/Users/simon/twice-cooked-pork/trace-data/基于netty4做的resetserver的一次http请求trace/tracer.data.h2db

    PoolChunk要解决的问题有:

    1. 快速查找未分配的地方并分配
    2. 尽量不要有碎片,可以理解成尽量挨着紧凑的分配

    整个chunk的结构如下:

                                                        +------+   chunksize 当L=11时,是16M
    L=0                                                 |   0  |
                                       +----------------+------+------------------+
                                       |                                          |
                                       |                                          |
                                       |                                          |
                                   +---v--+                                   +---v--+
    L=1                            |   1  |                                   |   2  |
                            +------+------+------+                     +------+------+-------+
                            |                    |                     |                     |
                            |                    |                     |                     |
                            |                    |                     |                     |
                        +---v--+             +---v--+              +---v--+              +---v--+
    L=2                 |   3  |             |   4  |              |   5  |              |   6  |
                     +--+------+-+         +-+------+--+        +--+------+--+         +-+------+--+
                     |           |         |           |        |            |         |           |
                     |           |         |           |        |            |         |           |
                     |           |         |           |        |            |         |           |
                  +--v---+   +---v--+   +--v---+   +---v--+   +-v----+   +---v--+   +--v---+   +---v--+
    L=3           |  7   |   |   8  |   |  9   |   |  10  |   |  11  |   |  12  |   |  13  |   |  14  |
                  +------+   +------+   +------+   +------+   +------+   +------+   +------+   +------+
                   8K大小即page size
    

    是一个完全二叉树,树的层高可以自定义,目前限制在14层内,默认是11层。
    最底层是真正的chunk描述,最底层每个叶子是一个paage,大小为8K。那么当层数是11层时,chunk的size是16M。因为11层的话,最下面一层叶子是2的11次方,再乘以8K正好是16MB。
    这棵树中每个节点还对对应其相应的大小是否被分配。什么叫其相应的大小?是这样的,每一层代表需要分配的大小的档次。暂且用档次这个词吧。最上面是16MB档次,最下面是8K档次,从最上面开始往下走一层,档次就除以2。
    每次申请内存时,netty会先对其做规格化,所谓规格化就是最接近申请内存值的2de整数次幂。比如我申请900byte,那么规格化后就是1K。在规格化后,netty会在树上标志 0 1 3 7被使用了。下次要再申请8K内存时就要避开这个路径了,只能是 0 1 3 8 了,因为7那边已经不够了。其他大小同理。所以树上的节点是为了标志是否被使用过,以使得内存碎片减少尽量靠左紧凑分配。 对于单page内的内存使用浪费问题,netty又做了一层位图结构使其得以利用。对于chunk对象的查找,netty还做了缓存机制,下面有讲。

    真正数据存放在 io.netty.buffer.PoolChunk.memory 这个字段中,调试时为:java.nio.DirectByteBuffer[pos=0 lim=16777216 cap=16777216]
    16777216是16M

    作业

    仔细调试 1K 2k 3K 8K 11K 内存的多次分配与回收。

    分配24byte过程

    PooledUnsafeDirectByteBuf是用了对象池特性io.netty.buffer.PooledUnsafeDirectByteBuf.RECYCLER

    PoolArena

    PoolArena 这一层负责创建与维护PoolChunk,维护的方式是将用到的正在分配中的PoolChunk放到PoolChunkList这个列表中。
    PoolChunkList是一个链是结构。
    而且,PoolArena还按PoolChunk的使用量分别维护到相对应的PoolChunkList中。

    // abstract class PoolArena<T> implements PoolArenaMetric {
    private final PoolChunkList<T> q050;
    private final PoolChunkList<T> q025;
    private final PoolChunkList<T> q000;
    private final PoolChunkList<T> qInit;
    private final PoolChunkList<T> q075;
    private final PoolChunkList<T> q100;
    

    这些PoolChunkList也是按使用量大小有序的链式的串在一起(参见PoolArena构造方法中初始化这些list字段的代码),当使用量达到本级别时,会加入到下一级别的list中,比如达到25%了,那么就会加到50%列表中了。(参见io.netty.buffer.PoolChunkList.add(PoolChunk))

    void add(PoolChunk<T> chunk) {
        if (chunk.usage() >= maxUsage) {
            nextList.add(chunk);
            return;
        }
        add0(chunk);
    }
    

    PoolArena中还维护了两个PoolSubpage数组,每个数组里面的实例在PoolArena构造时初始化,刚初始化后每个PoolSubpage元素的前继与后继元素都是指向自己(PoolSubpage是支持链表式的一个结构)
    在io.netty.buffer.PoolSubpage.addToPool(PoolSubpage)时 会将io.netty.buffer.PoolChunk.allocateSubpage(int)过程中新构建出来的PoolSubpage实例加到head的next节点上(即后继节点)。 具体代码如下:

        private long allocateSubpage(int normCapacity) {
            // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
            // This is need as we may add it back and so alter the linked-list structure.
            PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity); // 这个是查找PoolArena的PoolSubpage数组
            int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
            synchronized (head) {
                int id = allocateNode(d);
                if (id < 0) {
                    return id;
                }
    
                final PoolSubpage<T>[] subpages = this.subpages;
                final int pageSize = this.pageSize;
    
                freeBytes -= pageSize;
    
                int subpageIdx = subpageIdx(id);
                PoolSubpage<T> subpage = subpages[subpageIdx];
                if (subpage == null) {
                    subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity); // 此处会将新新构建出来的PoolSubpage实例加到head的next节点
                    subpages[subpageIdx] = subpage;
                } else {
                    subpage.init(head, normCapacity);
                }
                return subpage.allocate();
            }
        }
    

    PoolArenad的cache与Recycler对象池

    PooledByteBuf依赖PoolThreadCache做了一层对PoolChunk的缓存,PoolThreadCache靠MemoryRegionCache实现缓存。MemoryRegionCache靠队列来实现对PoolChunk的缓存(参见下面代码1),MemoryRegionCache在buf释放时会调用其add接口将释放的PoolChunk对象和nioBuffer对象通过io.netty.buffer.PoolThreadCache.MemoryRegionCache.Entry对象包装后加入(offer)到队列(参见下面堆栈1)。在io.netty.buffer.PoolThreadCache.MemoryRegionCache.allocate(PooledByteBuf, int)时再从队列中直接poll出来,达成cache的目的。优化还没有结束,包装PoolChunk用的Entry对象是通过Recycler对象池完成分配(获取)已释放的。对象是本质上一个通过FastThreadLocal的Stack的数据结构,分配对应出栈,释放对象入栈。具体参见下面代码2。
    Recycler
    是一个基于ThreadLocal结合stack玩起来的一个对象池数据结构,像上述这种就是PooledUnsafeDirectByteBuf的对象pool。回收的时候压栈,要用的时候出栈。
    获取对象 io.netty.util.Recycler.get()
    回收对象 io.netty.util.Recycler.DefaultHandle.recycle(Object)

    代码1: 队列初始化

    Queue<Entry<T>> queue = PlatformDependent.newFixedMpscQueue(this.size);
    

    堆栈1:buf释放时会调用MemoryRegionCache add接口将释放的PoolChunk对象包装后入队:

    Thread [main] (Suspended (breakpoint at line 393 in PoolThreadCache$MemoryRegionCache))	
    	PoolThreadCache$SubPageMemoryRegionCache<T>(PoolThreadCache$MemoryRegionCache<T>).add(PoolChunk<T>, ByteBuffer, long) line: 393	
    	PoolThreadCache.add(PoolArena<?>, PoolChunk, ByteBuffer, long, int, SizeClass) line: 209	
    	PoolArena$DirectArena(PoolArena<T>).free(PoolChunk<T>, ByteBuffer, long, int, PoolThreadCache) line: 273	
    	PooledUnsafeDirectByteBuf(PooledByteBuf<T>).deallocate() line: 171	
    	PooledUnsafeDirectByteBuf(AbstractReferenceCountedByteBuf).release0(int) line: 136	
    	PooledUnsafeDirectByteBuf(AbstractReferenceCountedByteBuf).release() line: 124	
    	PooledByteBufTest.main(String[]) line: 43	
    

    代码2:Entry对象使用对象池

    private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
        @SuppressWarnings("unchecked")
        @Override
        protected Entry newObject(Handle<Entry> handle) {
            return new Entry(handle);
        }
    };
    
    private static Entry newEntry(PoolChunk<?> chunk, ByteBuffer nioBuffer, long handle) {
        Entry entry = RECYCLER.get();
        entry.chunk = chunk;
        entry.nioBuffer = nioBuffer;
        entry.handle = handle;
        return entry;
    }
    
    @Override
    public void recycle(Object object) {
        if (object != value) {
            throw new IllegalArgumentException("object does not belong to handle");
        }
    
        Stack<?> stack = this.stack;
        if (lastRecycledId != recycleId || stack == null) {
            throw new IllegalStateException("recycled already");
        }
    
        stack.push(this);
    }
    

    PooledByteBufAllocator创建及其关联细节

    1. PooledByteBufAllocator validateAndCalculateChunkSize 校验树高度不能超过14,且根据pageSize(可以外部指定)和树高计算出chunksize
    2. PooledByteBufAllocator validateAndCalculatePageShifts 校验pageSize最小不能小于4K,且pageSize必须是2的整数次方((pageSize & pageSize - 1) != 0) (为什么(pageSize & pageSize - 1) != 0能判断?因为2的n次方的二进制形式一定是第一位1后面接n个0,减1后就变成第一位0后面接n个1,相与之后一定是0;如果不是2的n次方的数的二进制形式一定是第一位是1,且,这个数减去1后,第一位一定还是1,因为第一位是1且后面全接0的数一定是2的整数次方的,那么不是2的整数次方的数后面一定不全是0,所以减去1后第一位肯定还是1,所以不管后面接的这些数相与是怎样的结果,第一位两个1相与出来肯定是1,肯定不为0,所以能用这个办法判断)
    3. 创建tinySubpagePools数组并初始化里面的元素,默认数组大小32个,里面的元素是PoolSubpage,PoolSubpage还支持链式形式连接(他有前继和后继)

    PoolChunk 分配与释放小于pagesize的buf

    io.netty.buffer.PoolArena.free(PoolChunk, ByteBuffer, long, int, PoolThreadCache)
    位图相关:

    // long64位 取 高32位转成整数
    private static int bitmapIdx(long handle) {
            return (int) (handle >>> Integer.SIZE);
        }
    

    PoolSubpage 支持位图
    一个page 8192大小 一个块(element)大小32,那么一个page可以拆成256个,每申请一次numAvail减去1。
    long型位图数组中有个8个元素,8192/16/64=8, 64是long的位数,。

    分配时bitmap中元素,以第一个元素为例子,按1 3 7 15 31 63 127网上涨,释放的时候按对应数据往下减,并且在释放时记录nextAvail值,便于下次申请时优先使用。
    bitmap中的4个(bitmapLength)long来维护256个(maxNumElems=pageSize/elemSize)块是否使用的情况。

    final class PoolSubpage<T> implements PoolSubpageMetric {
    
        final PoolChunk<T> chunk;
        private final int memoryMapIdx;
        private final int runOffset;
        private final int pageSize;
        private final long[] bitmap;  // 位图...,默认有8个元素 个数= pagesize >>> 10 (pagesize / 16 / 64)64应该是long的位数,16是啥?一个element算256。 实际这个数组默认只用4个元素
    
        PoolSubpage<T> prev;
        PoolSubpage<T> next;
    
        boolean doNotDestroy;
        int elemSize;
        private int maxNumElems; // 一个page再分maxNumElems分  默认是256
        private int bitmapLength; // 默认是4  256 >>> 6 = 4
        private int nextAvail; // 在有buf释放时会设置这个值,以使得他们在下次分配时优先使用这个
        private int numAvail;
        
        long allocate() {
            if (elemSize == 0) {
                return toHandle(0);
            }
    
            if (numAvail == 0 || !doNotDestroy) {
                return -1;
            }
    
            final int bitmapIdx = getNextAvail();
            int q = bitmapIdx >>> 6;
            int r = bitmapIdx & 63;
            assert (bitmap[q] >>> r & 1) == 0;
            bitmap[q] |= 1L << r;  // 按1 3 7 15 31 63 127往上涨
    
            if (-- numAvail == 0) {
                removeFromPool();
            }
    
            return toHandle(bitmapIdx);
        }
        
        
        private int findNextAvail() {
            final long[] bitmap = this.bitmap;
            final int bitmapLength = this.bitmapLength;
            for (int i = 0; i < bitmapLength; i ++) {
                long bits = bitmap[i];
                if (~bits != 0) { // 这个表示这个long上是否所有的位都用完了。。
                    return findNextAvail0(i, bits);
                }
            }
            return -1;
        }
        
        private int findNextAvail0(int i, long bits) {
            final int maxNumElems = this.maxNumElems;
            final int baseVal = i << 6;
    
            for (int j = 0; j < 64; j ++) {
                if ((bits & 1) == 0) { // 判断是否是偶数
                    int val = baseVal | j;
                    if (val < maxNumElems) {
                        return val;
                    } else {
                        break;
                    }
                }
                bits >>>= 1; // 除以2 并向靠近的2的整数次幂对齐
            }
            return -1;
        }
    

    free时不是每次都会真正释放,在下面会先加入到MemoryRegionCache的queue中cache起来,当queue中放不下时才真正free,代码如下:

    // PoolArena.class
    void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {
        if (chunk.unpooled) {
            int size = chunk.chunkSize();
            destroyChunk(chunk);
            activeBytesHuge.add(-size);
            deallocationsHuge.increment();
        } else {
            SizeClass sizeClass = sizeClass(normCapacity);
            if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
                // cached so not free it.
                return;
            }
    
            freeChunk(chunk, handle, sizeClass, nioBuffer);
        }
    }
    
  • 相关阅读:
    [TimLinux] CSS 纯CSS实现动画展开/收起功能
    [TimLinux] CSS pre超长自动换行
    j2ee之struts2表单细节处理
    j2ee之struts2的国际化数据方式
    j2ee之struts2拦截器()
    j2ee之struts2文件下载
    j2ee之struts2文件上传
    j2ee国际化数据方式
    j2ee监听器的实现及配置方法
    j2ee过滤器实现的主要代码
  • 原文地址:https://www.cnblogs.com/simoncook/p/11967186.html
Copyright © 2011-2022 走看看