zoukankan      html  css  js  c++  java
  • Netty内存池源码二 (PooledByteBufAllocator)

    Netty-内存池源码二 (PooledByteBufAllocator)

    内存池核心类如下:

    • PooledByteBufAllocator 本期介绍
    • PooledUnsafeDirectByteBuf
    • PooledUnsafeDirectByteBuf
    • PoolThreadCache
    • MemoryRegionCache
    • PoolArena
    • SizeClasses
    • PoolChunk
    • LongPriorityQueue
    • LongLongHashMap
    • PoolSubpage

    一、详情概要

    从上一期文章 我们可以知道 内存分配的入口是 【PooledByteBufAllocator #newDirectBuffer()】。【PooledByteBufAllocator】 该类,也就是内存分配的 起始类, 其内部定义了很多 默认变量值 用于接下来的内存分配整个流程。

    PooledByteBufAllocator】的结构图如下:

    从上图 可看到有三个陌生的类:

    ​ 1.【PoolArena】(DirectArena, HeapArena都继承于该类) : 可以把他想象成一块大内存(直接内存/ 堆内存)

    ​ 2.【PoolThreadCache】: 线程本地内存缓存池

    ​ 3.【InternalThreadLocalMap】: 目前把他当成 ThreadLocalMap 就行

    在讲解源码前,请 思考一个问题 :

    从图中可看到 不管是 DirectArena 还是 HeapArena,他们都有多个,且各自组成了数组 分别是directArenas 和 heapArenas。 前面说了PoolArena , 我们目前可以看成是一块大内存, 当业务来申请内存时,需要从PoolArena 这块大内存中 截取一块来使用就行。 但是 为什么要有多个PoolArena呢

    解答:

    首先我们假设就只有一个PoolArena, 众所周知 现在的 CPU 一般都是多核的, 此时有多个业务(多线程) 同时从Netty中 申请内存(从大内存中偏移出多个小内存)来使用,而本质上是多核CPU来操作这同一块大内存 进行读写。但是 由于 操作系统的读写内存屏障 存在, 会导致多个线程的读写并不能做到真正的并行。 因此Netty用了多个PoolArena 来减轻这种不能并行的行为,从而提升效率

    二、 源码分析

    1.关键成员变量

       	
    	// 默认 HeapArena的个数   cpu*2
        private static final int DEFAULT_NUM_HEAP_ARENA;
    
    	// 默认 DirectArena的个数 cpu*2
        private static final int DEFAULT_NUM_DIRECT_ARENA;
    
     	// 默认一页大小 PageSize  8192 => 8KB
        private static final int DEFAULT_PAGE_SIZE;
    	
    	// 用来计算默认Chunk的大小,在jemalloc3 中同时还表示Chunk内部完全二叉树的最大深度。
        private static final int DEFAULT_MAX_ORDER; // 8192 << 11 = 16 MiB per chunk
    
        // 表示默认poolThreadCache中 一个smallMemoryRegionCache中的队列长度  256
        private static final int DEFAULT_SMALL_CACHE_SIZE;
    
    	// 表示默认poolThreadCache中 一个normalMemoryRegionCache中的队列长度 64
        private static final int DEFAULT_NORMAL_CACHE_SIZE;
    
    	//  表示最大缓存规格 32KB
        static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY;
    
    	// 表示 默认 某个线程从缓存中获取内存的最大次数限制
        private static final int DEFAULT_CACHE_TRIM_INTERVAL;
    
    	// 表示 是否允许所有的线程使用缓存  默认是true
        private static final boolean DEFAULT_USE_CACHE_FOR_ALL_THREADS;
    	
    	// 默认内存缓存对齐填充为0
        private static final int DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT;
         
    	// heapArena数组
        private final PoolArena<byte[]>[] heapArenas;
    
    	// directArena数组
        private final PoolArena<ByteBuffer>[] directArenas;
    
    	// PoolThreadLocalCache 线程本地缓存
        private final PoolThreadLocalCache threadCache;
    	
    	// chunk的大小
        private final int chunkSize;
    
    

    2.构造方法

        /**
         *
         * @param preferDirect          是否申请直接内存                   默认一般都是true
         * @param nHeapArena            heapArena的个数                  假设 cpu*2
         * @param nDirectArena          directArena的个数                假设 cpu*2 
         * @param pageSize              页的大小                          默认8KB (8192)
         * @param maxOrder             chunk中完全平衡二叉树的深度           11
         * @param smallCacheSize     smallMemoryRegionCache的队列长度      256
         * @param normalCacheSize    normalMemoryRegionCache的队列长度     64
         * @param useCacheForAllThreads 是否所有的线程都使用PoolThreadCache  true
         * @param directMemoryCacheAlignment 对齐填充                      0
         */
        public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, int smallCacheSize, int normalCacheSize,
    boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
            
            // directByDefault = true
            super(preferDirect);
    
            // 目前理解成 threadLocal 每个线程中有自己的 PoolThreadLocalCache缓存
            threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
            
            // 赋值 smallCacheSize=256  normalCacheSize=64
            this.smallCacheSize = smallCacheSize;
            this.normalCacheSize = normalCacheSize;
    		
            
            // 计算chunkSize   8KB<<11 = 16mb (16,777,216)
            chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);
    		
    
            // pageSize=8KB(8192)  
            //pageShifts表示 1 左移多少位是 8192  = 13
            int pageShifts = validateAndCalculatePageShifts(pageSize, directMemoryCacheAlignment);
            
            
            
            // 生成 heapArenas 数组
            if (nHeapArena > 0) {
                heapArenas = newArenaArray(nHeapArena);
                List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
                for (int i = 0; i < heapArenas.length; i ++) {
                    PoolArena.HeapArena arena = new PoolArena.HeapArena(this,
                            pageSize, pageShifts, chunkSize,
                            directMemoryCacheAlignment);
                    heapArenas[i] = arena;
                    metrics.add(arena);
                }
                heapArenaMetrics = Collections.unmodifiableList(metrics);
            } else {
                heapArenas = null;
                heapArenaMetrics = Collections.emptyList();
            }
    		
            // netty默认情况下都会使用 直接内存,因此我们在整个Netty中关心直接内存相关就可以了,而且直接内存与堆内存逻辑并无太多差异。
            
            // 生成 directArena数组  
            if (nDirectArena > 0) {
    
                // 假设平台CPU 个数是8, 这里会创建 cpu(8)*2 = 16个长度的 directArenas数组。
                directArenas = newArenaArray(nDirectArena);
    
                // 这是个内存池图表 
                //如果想要监测 内存池详情 可使用该对象(关于内存分配逻辑不用关心Metric相关对象)
                List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
    
                // for循环最终创建了 16个 directArena 对象,并且 将这些DirectArena对象放入到数组内
                for (int i = 0; i < directArenas.length; i ++) {
                    
                    // 参数1:allocator 对象
                    // 参数2:pageSize 8k
                    // 参数3:pageShift 13  1<<13 可推出pageSize的值
                    // 参数4:chunkSize: 16mb
                    // 参数5:directMemoryCacheAlignment 对齐填充 0
                    // 生成 DirectArena对象
                    PoolArena.DirectArena arena = new PoolArena.DirectArena(
                            this, pageSize, pageShifts, chunkSize, directMemoryCacheAlignment);
                    directArenas[i] = arena;
                    metrics.add(arena);
                }
                directArenaMetrics = Collections.unmodifiableList(metrics);
            } else {
                directArenas = null;
                directArenaMetrics = Collections.emptyList();
            }
            
            metric = new PooledByteBufAllocatorMetric(this);
        }
    

    3.申请内存入口

        /**
         * 申请分配直接内存 入口
         * @param initialCapacity  业务需求内存大小
         * @param maxCapacity      内存最大限制
         * @return  ByteBuf netty中内存对象
         */
        @Override
        protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    
            // 若当前线程没有PoolThreadCache 则创建一份 PoolThreadCache (里面包含 Small、Normal MemroyRegionCache数组【该数组中每个元素包含一个队列】)用于缓存内存对象
          	// 若当前线程有 则直接获取
            PoolThreadCache cache = threadCache.get();
    
            // 拿到当前线程cache中绑定的directArena
            PoolArena<ByteBuffer> directArena = cache.directArena;
    
            final ByteBuf buf;
    
            // 这个条件正常逻辑 都会成立
            if (directArena != null) {
                
                //这是咱们的核心入口 ******  
                // 参数1: cache 当前线程相关的PoolThreadCache对象
                // 参数2:initialCapactiy 业务层需要的内存容量
                // 参数3:maxCapacity 最大内存大小
                buf = directArena.allocate(cache, initialCapacity, maxCapacity);
            } else {
                // 一般不会走到这里,不用看
                buf = PlatformDependent.hasUnsafe() ?
                        UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                        new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
            }
    
            return toLeakAwareBuffer(buf);
        }
    

    三、总结

    PooledByteBufAllocator】 类主要是

    1. 设置些默认变量值。 pageSize , chunkSize

    2. 并为每个申请内存的线程 创建一份 【PoolThreadCache】缓存。

    3. 根据当前平台的CPU数量,设置 【PoolArena 】 ,而最终申请内存的工作还是交给了 【PoolArena】。

    万般皆下品,唯有读书高!
  • 相关阅读:
    OAF_OAF控件系列9
    OAF_OAF控件系列8
    OAF_OAF控件系列7
    OAF_OAF控件系列5
    OAF_OAF控件系列4
    OAF_OAF控件系列3
    OAF_OAF控件系列3
    OAF_OAF控件系列2
    OAF_OAF控件系列1
    PLSQL_性能优化索引Index介绍(概念)
  • 原文地址:https://www.cnblogs.com/s686zhou/p/15701801.html
Copyright © 2011-2022 走看看