zoukankan      html  css  js  c++  java
  • Netty源码解析 -- PoolChunk实现原理

    本文主要分享Netty中PoolChunk如何管理内存。
    源码分析基于Netty 4.1.52

    内存管理算法

    首先说明PoolChunk内存组织方式。
    PoolChunk的内存大小默认是16M,Netty将它划分为2048个page,每个page为8K。
    PoolChunk上可以分配Normal内存块。
    Normal内存块大小必须是page的倍数。

    PoolChunk通过runsAvail字段管理内存块。
    PoolChunk#runsAvail是PriorityQueue数组,其中PriorityQueue存放的是handle。
    handle可以理解为一个句柄,维护一个内存块的信息,由以下部分组成

    • o: runOffset ,在chunk中page偏移索引,从0开始,15bit
    • s: size,当前位置可分配的page数量,15bit
    • u: isUsed,是否使用?, 1bit
    • e: isSubpage,是否在subpage中, 1bit
    • b: bitmapIdx,内存块在subpage中的索引,不在subpage则为0, 32bit

    前面《内存对齐类SizeClasses》文章说过,SizeClasses将sizeClasses表格中isMultipageSize为1的行取出可以组成一个新表格,这里称为Page表格

    runsAvail数组默认长度为40,每个位置index上放的handle代表了存在一个可用内存块,并且可分配pageSize大于等于(pageIdx=index)上的pageSize,小于(pageIdex=index+1)的pageSize。
    如runsAvail[11]上的handle的size可分配pageSize可能为16 ~ 19,
    假如runsAvail[11]上handle的size为18,如果该handle分配了7个page,剩下的11个page,这时要将handle移动runsAvail[8](当然,handle的信息要调整)。
    这时如果要找分配6个page,就可以从runsAvail[5]开始查找runsAvail数组,如果前面runsAvail[5]~runsAvail[7]都没有handle,就找到了runsAvail[8]。
    分配6个page之后,剩下的5个page,handle移动runsAvail[4]。

    先看一下PoolChunk的构造函数

    PoolChunk(PoolArena<T> arena, T memory, int pageSize, int pageShifts, int chunkSize, int maxPageIdx, int offset) {
        // #1
        unpooled = false;
        this.arena = arena;
        this.memory = memory;
        this.pageSize = pageSize;
        this.pageShifts = pageShifts;
        this.chunkSize = chunkSize;
        this.offset = offset;
        freeBytes = chunkSize;
    
        runsAvail = newRunsAvailqueueArray(maxPageIdx);
        runsAvailMap = new IntObjectHashMap<Long>();
        subpages = new PoolSubpage[chunkSize >> pageShifts];
    
        // #2
        int pages = chunkSize >> pageShifts;
        long initHandle = (long) pages << SIZE_SHIFT;
        insertAvailRun(0, pages, initHandle);
    
        cachedNioBuffers = new ArrayDeque<ByteBuffer>(8);
    }
    

    #1
    unpooled: 是否使用内存池
    arena:该PoolChunk所属的PoolArena
    memory:底层的内存块,对于堆内存,它是一个byte数组,对于直接内存,它是(jvm)ByteBuffer,但无论是哪种形式,其内存大小默认都是16M。
    pageSize:page大小,默认为8K。
    chunkSize:整个PoolChunk的内存大小,默认为16777216,即16M。
    offset:底层内存对齐偏移量,默认为0。
    runsAvail:初始化runsAvail
    runsAvailMap:记录了每个内存块开始位置和结束位置的runOffset和handle映射。

    #2 insertAvailRun方法在runsAvail数组最后位置插入一个handle,该handle代表page偏移位置为0的地方可以分配16M的内存块

    内存分配

    PoolChunk#allocate

    boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache cache) {
        final long handle;
        // #1
        if (sizeIdx <= arena.smallMaxSizeIdx) {
            // small
            handle = allocateSubpage(sizeIdx);
            if (handle < 0) {
                return false;
            }
            assert isSubpage(handle);
        } else {
            // #2
            int runSize = arena.sizeIdx2size(sizeIdx);
            handle = allocateRun(runSize);
            if (handle < 0) {
                return false;
            }
        }
    
        // #3
        ByteBuffer nioBuffer = cachedNioBuffers != null? cachedNioBuffers.pollLast() : null;
        initBuf(buf, nioBuffer, handle, reqCapacity, cache);
        return true;
    }
    

    #1 处理Small内存块申请,调用allocateSubpage方法处理,后续文章解析。
    #2 处理Normal内存块申请
    sizeIdx2size方法根据内存块索引查找对应内存块size。sizeIdx2size是PoolArena父类SizeClasses提供的方法,可参考系列文章《内存对齐类SizeClasses》。
    allocateRun方法负责分配Normal内存块,返回handle存储了分配的内存块大小和偏移量。

    #3 使用handle和底层内存类(ByteBuffer)初始化ByteBuf了。

    private long allocateRun(int runSize) {
        // #1
        int pages = runSize >> pageShifts;
        // #2
        int pageIdx = arena.pages2pageIdx(pages);
    
        synchronized (runsAvail) {
            //find first queue which has at least one big enough run
            // #3
            int queueIdx = runFirstBestFit(pageIdx);
            if (queueIdx == -1) {
                return -1;
            }
    
            //get run with min offset in this queue
            PriorityQueue<Long> queue = runsAvail[queueIdx];
            long handle = queue.poll();
    
            assert !isUsed(handle);
            // #4
            removeAvailRun(queue, handle);
            // #5
            if (handle != -1) {
                handle = splitLargeRun(handle, pages);
            }
            // #6
            freeBytes -= runSize(pageShifts, handle);
            return handle;
        }
    }
    

    #1 计算所需的page数量
    #2 计算对应的pageIdx
    注意,pages2pageIdx方法会将申请内存大小对齐为上述Page表格中的一个size。例如申请172032字节(21个page)的内存块,pages2pageIdx方法计算结果为13,实际分配196608(24个page)的内存块。
    #3 从pageIdx开始遍历runsAvail,找到第一个handle。
    该handle上可以分配所需内存块。
    #4 从runsAvail,runsAvailMap移除该handle信息
    #5#3步骤找到的handle上划分出所要的内存块。
    #6 减少可用内存字节数

    private long splitLargeRun(long handle, int needPages) {
        assert needPages > 0;
    
        // #1
        int totalPages = runPages(handle);
        assert needPages <= totalPages;
    
        int remPages = totalPages - needPages;
    
        // #2 
        if (remPages > 0) {
            int runOffset = runOffset(handle);
    
            // keep track of trailing unused pages for later use
            int availOffset = runOffset + needPages;
            long availRun = toRunHandle(availOffset, remPages, 0);
            insertAvailRun(availOffset, remPages, availRun);
    
            // not avail
            return toRunHandle(runOffset, needPages, 1);
        }
    
        //mark it as used
        handle |= 1L << IS_USED_SHIFT;
        return handle;
    }
    

    #1 totalPages,从handle中获取当前位置可用page数。
    remPages,分配后剩余page数。
    #2 剩余page数大于0
    availOffset,计算剩余page开始偏移量
    生成一个新的handle,availRun
    insertAvailRun将availRun插入到runsAvail,runsAvailMap中

    内存释放

    void free(long handle, int normCapacity, ByteBuffer nioBuffer) {
        ...
    
        // #1
        int pages = runPages(handle);
    
        synchronized (runsAvail) {
            // collapse continuous runs, successfully collapsed runs
            // will be removed from runsAvail and runsAvailMap
            // #2
            long finalRun = collapseRuns(handle);
    
            // #3
            finalRun &= ~(1L << IS_USED_SHIFT);
            //if it is a subpage, set it to run
            finalRun &= ~(1L << IS_SUBPAGE_SHIFT);
            insertAvailRun(runOffset(finalRun), runPages(finalRun), finalRun);
            freeBytes += pages << pageShifts;
        }
    
        if (nioBuffer != null && cachedNioBuffers != null &&
            cachedNioBuffers.size() < PooledByteBufAllocator.DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK) {
            cachedNioBuffers.offer(nioBuffer);
        }
    }
    

    #1 计算释放的page数
    #2 如果可以,将前后的可用内存块进行合并
    #3 插入新的handle

    collapseRuns

    private long collapseRuns(long handle) {
        return collapseNext(collapsePast(handle));
    }
    

    collapsePast方法合并前面的可用内存块
    collapseNext方法合并后面的可用内存块

    private long collapseNext(long handle) {
        for (;;) {
            // #1
            int runOffset = runOffset(handle);
            int runPages = runPages(handle);
    
            Long nextRun = getAvailRunByOffset(runOffset + runPages);
            if (nextRun == null) {
                return handle;
            }
    
            int nextOffset = runOffset(nextRun);
            int nextPages = runPages(nextRun);
    
            //is continuous
            // #2
            if (nextRun != handle && runOffset + runPages == nextOffset) {
                //remove next run
                removeAvailRun(nextRun);
                handle = toRunHandle(runOffset, runPages + nextPages, 0);
            } else {
                return handle;
            }
        }
    }
    

    #1 getAvailRunByOffset方法从runsAvailMap中找到下一个内存块的handle。
    #2 如果是连续的内存块,则移除下一个内存块handle,并将其page合并生成一个新的handle。

    下面来看一个例子

    大家可以结合例子中runsAvail和内存使用情况的变化,理解上面的代码。
    实际上,2个Page的内存块是通过Subpage分配,回收时会放回线程缓存中而不是直接释放存块,但为了展示PoolChunk中内存管理过程,图中不考虑这些场景。

    PoolChunk在Netty 4.1.52版本修改了算法,引入了jemalloc 4的算法 -- https://github.com/netty/netty/commit/0d701d7c3c51263a1eef56d5a549ef2075b9aa9e#diff-6850686cf7ebc7b9ddb873389ded45ebf40e6c1ccf411c44b744e7d3ca2ff774
    Netty 4.1.52之前的版本,PoolChunk引入的是jemalloc 3的算法,使用二叉树管理内存块。有兴趣的同学可以参考我后续的文章《PoolChunk实现(jemalloc 3的算法)》

    如果您觉得本文不错,欢迎关注我的微信公众号,系列文章持续更新中。您的关注是我坚持的动力!

  • 相关阅读:
    Analysis of Hello2 source code
    CORS’s source, Principle and Implementation
    CDI Features(EL(SPEL),Decorator,Interceptor,Producer)
    Java Design Patterns(2)
    Cookie and Session
    Vue错误信息解决
    cdh搭建仓库
    cdh本地源安装-自用
    创建本地repo源
    dockerfile:python-cuda-nvidia-cudnn
  • 原文地址:https://www.cnblogs.com/binecy/p/14092637.html
Copyright © 2011-2022 走看看