zoukankan      html  css  js  c++  java
  • Netty内存池ByteBuf 内存回收

    内存池ByteBuf 内存回收:

      在前面的章节中我们有提到, 堆外内存是不受JVM 垃圾回收机制控制的, 所以我们分配一块堆外内存进行ByteBuf 操作时, 使用完毕要对对象进行回收, 本节就以PooledUnsafeDirectByteBuf 为例讲解有关内存分配的相关逻辑。PooledUnsafeDirectByteBuf 中内存释放的入口方法是其父类AbstractReferenceCountedByteBuf 中的release()方法:

    public boolean release() {
       return release0(1);
    }
    private boolean release0(int decrement) {
    for (;;) {
    int refCnt = this.refCnt;
    if (refCnt < decrement) {
    throw new IllegalReferenceCountException(refCnt, -decrement);
    }

    if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
    if (refCnt == decrement) {
    deallocate();
    return true;
    }
    return false;
    }
    }
    }

      if (refCnt == decrement) 中判断当前byteBuf 是否没有被引用了, 如果没有被引用, 则通过deallocate()方法进行释放。因为我们是以PooledUnsafeDirectByteBuf 为例, 所以这里会调用其父类PooledByteBuf 的deallocate()方法:

    protected final void deallocate() {
        if (handle >= 0) {
            final long handle = this.handle;
         //表示当前的ByteBuf 不再指向任何一块内存
    this.handle = -1;
         //这里将memory 也设置为null memory
    = null;
         //这一步是将ByteBuf 的内存进行释放 chunk.arena.free(chunk, handle, maxLength, cache);
         //将对象放入的对象回收站, 循环利用 recycle(); } }

      跟进 free 方法:

    void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {
      //是否为unpooled
      if (chunk.unpooled) {
        int size = chunk.chunkSize();
        destroyChunk(chunk);
        activeBytesHuge.add(-size);
        deallocationsHuge.increment();
      } else {
        //那种级别的Size
        SizeClass sizeClass = sizeClass(normCapacity);
        //加到缓存里
        if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {
          return;
        }
        //将缓存对象标记为未使用     freeChunk(chunk, handle, sizeClass);   } }

      首先判断是不是unpooled, 我们这里是Pooled, 所以会走到else 块中:sizeClass(normCapacity)计算是哪种级别的size, 我们按照tiny 级别进行分析;cache.add(this, chunk, handle, normCapacity, sizeClass)是将当前当前ByteBuf 进行缓存;我们之前讲过, 再分配ByteBuf 时首先在缓存上分配, 而这步, 就是将其缓存的过程, 继续跟进去:

    boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {
      //拿到MemoryRegionCache 节点
      MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
      if (cache == null) {
        return false;
      }
      //将chunk, 和handle 封装成实体加到queue 里面   return cache.add(chunk, handle); }

      首先根据根据类型拿到相关类型缓存节点, 这里会根据不同的内存规格去找不同的对象, 我们简单回顾一下, 每个缓存对象都包含一个queue, queue 中每个节点是entry, 每一个entry 中包含一个chunk 和handle, 可以指向唯一的连续的内存,我们跟到cache 中:

    private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
      switch (sizeClass) {
        case Normal:
            return cacheForNormal(area, normCapacity);
        case Small:
            return cacheForSmall(area, normCapacity);
        case Tiny:
            return cacheForTiny(area, normCapacity);
        default:
            throw new Error();
      }
    }

      假设我们是tiny 类型, 这里就会走到cacheForTiny(area, normCapacity)方法中, 跟进去:

    private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
       int idx = PoolArena.tinyIdx(normCapacity);
       if (area.isDirect()) {
           return cache(tinySubPageDirectCaches, idx);
        }
        return cache(tinySubPageHeapCaches, idx);
    }

      这个方法我们之前剖析过, 就是根据大小找到第几个缓存中的第几个缓存, 拿到下标之后, 通过cache 去超相对应的缓存对象:

    private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
        if (cache == null || idx > cache.length - 1) {
            return null;
        }
        return cache[idx];
    }

      我们这里看到, 是直接通过下标拿的缓存对象,回到add()方法中:

    boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {
      //拿到MemoryRegionCache 节点
      MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
      if (cache == null) {
        return false;
      }
      //将chunk, 和handle 封装成实体加到queue 里面
      return cache.add(chunk, handle);
    }

      这里的cache 对象调用了一个add 方法, 这个方法就是将chunk 和handle 封装成一个entry 加到queue 里面,我们跟到add()方法中:

    public final boolean add(PoolChunk<T> chunk, long handle) {
           Entry<T> entry = newEntry(chunk, handle);
           boolean queued = queue.offer(entry);
           if (!queued) {
                // If it was not possible to cache the chunk, immediately recycle the entry
                entry.recycle();
            }
            return queued;
    }

      我们之前介绍过, 从在缓存中分配的时候从queue 弹出一个entry, 会放到一个对象池里面, 而这里Entry<T> entry =newEntry(chunk, handle) 就是从对象池里去取一个entry 对象, 然后将chunk 和handle 进行赋值, 然后通过queue.offer(entry)加到queue,我们回到free()方法中:

    void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {
      //是否为unpooled
      if (chunk.unpooled) {
        int size = chunk.chunkSize();
        destroyChunk(chunk);
        activeBytesHuge.add(-size);
        deallocationsHuge.increment();
      } else {
        //那种级别的Size
        SizeClass sizeClass = sizeClass(normCapacity);
        //加到缓存里
        if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {
          return;
        }
        //将缓存对象标记为未使用
        freeChunk(chunk, handle, sizeClass);
      }
    }

      这里加到缓存之后, 如果成功, 就会return, 如果不成功, 就会调用freeChunk(chunk, handle, sizeClass)方法, 这个方法的意义是, 将原先给ByteBuf 分配的内存区段标记为未使用,跟进freeChunk()简单分析下:

    void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass) {
            final boolean destroyChunk;//销毁标志
            synchronized (this) {
                switch (sizeClass) {//区分大小类型
                case Normal:
                    ++deallocationsNormal;
                    break;
                case Small:
                    ++deallocationsSmall;
                    break;
                case Tiny:
                    ++deallocationsTiny;
                    break;
                default:
                    throw new Error();
                }//执行销毁
                destroyChunk = !chunk.parent.free(chunk, handle);
            }
            if (destroyChunk) {
                // destroyChunk not need to be called while holding the synchronized lock.
                destroyChunk(chunk);
            }
    }

      我们再跟到free()方法中:

    boolean free(PoolChunk<T> chunk, long handle) {
        chunk.free(handle);
        if (chunk.usage() < minUsage) {
            remove(chunk);
            // Move the PoolChunk down the PoolChunkList linked-list.
            return move0(chunk);
        }
        return true;
    }

      chunk.free(handle)的意思是通过chunk 释放一段连续的内存,再跟到free()方法中:

    void free(long handle) {
        int memoryMapIdx = memoryMapIdx(handle);
        int bitmapIdx = bitmapIdx(handle);
    
        if (bitmapIdx != 0) { // free a subpage
            PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
            assert subpage != null && subpage.doNotDestroy;
    
            // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
            // This is need as we may add it back and so alter the linked-list structure.
            PoolSubpage<T> head = arena.findSubpagePoolHead(subpage.elemSize);
            synchronized (head) {
                if (subpage.free(head, bitmapIdx & 0x3FFFFFFF)) {
                    return;
                }
            }
        }
        freeBytes += runLength(memoryMapIdx);
        setValue(memoryMapIdx, depth(memoryMapIdx));
        updateParentsFree(memoryMapIdx);
    }

      if (bitmapIdx != 0)这里判断是当前缓冲区分配的级别是Page 还是Subpage, 如果是Subpage, 则会找到相关的Subpage 将其位图标记为0,如果不是subpage, 这里通过分配内存的反向标记, 将该内存标记为未使用。这段逻辑大家可以自行分析, 如果之前分配相关的知识掌握扎实的话, 这里的逻辑也不是很难。回到PooledByteBuf 的deallocate方法中:

    protected final void deallocate() {
        if (handle >= 0) {
            final long handle = this.handle;
         //表示当前的ByteBuf 不再指向任何一块内存
            this.handle = -1;
         //这里将memory 也设置为null
            memory = null;
         //这一步是将ByteBuf 的内存进行释放
            chunk.arena.free(chunk, handle, maxLength, cache);
         //将对象放入的对象回收站, 循环利用
            recycle();
        }
    }

      最后, 通过recycle()将释放的ByteBuf 放入对象回收站,以上就是内存回收的大概逻辑。

  • 相关阅读:
    【BZOJ1053】[HAOI2007]反素数
    【BZOJ1052】[HAOI2007]覆盖问题
    【BZOJ1051】[HAOI2006]受欢迎的牛
    【BZOJ1050】[HAOI2006]旅行
    laravel 操作多数据库总结
    微服务浅述---架构演进
    分布式锁
    laravel自动生成model
    springboot集成quartz实现任务调度
    laravel 队列服务使用总结
  • 原文地址:https://www.cnblogs.com/wuzhenzhao/p/11314201.html
Copyright © 2011-2022 走看看