zoukankan      html  css  js  c++  java
  • netty 的 Recycler

    netty 是用 Recycler 实现对象池。

    每个线程有一个 ThreadLocalMap 变量,ThreadLocalMap 本质是一个哈希表,用 index + 1 来避免槽冲突,键是 ThreadLocal 变量,值是尖括号里的对象。netty 里面大量使用 ThreadLocal,目的是减少多线程之间锁竞争。

    // 每个线程一个 Stack,用作对象池
    private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
        @Override
        protected Stack<T> initialValue() {
            return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,
                    interval, maxDelayedQueuesPerThread);
        }
    
        @Override
        protected void onRemoval(Stack<T> value) {
            // Let us remove the WeakOrderQueue from the WeakHashMap directly if its safe to remove some overhead
            if (value.threadRef.get() == Thread.currentThread()) {
               if (DELAYED_RECYCLED.isSet()) {
                   DELAYED_RECYCLED.get().remove(value);
               }
            }
        }
    };
    // 每个线程有一个 Map,存储其他线程的 Stack -> WeakOrderQueue 对
    private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED = new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() {
        @Override
        protected Map<Stack<?>, WeakOrderQueue> initialValue() {
            return new WeakHashMap<Stack<?>, WeakOrderQueue>();
        }
    };

    DefaultHandle 封装了对象,是直接存放在 Stack 中的元素

    private static final class DefaultHandle<T> implements Handle<T> {
        Stack<?> stack;
        Object value;
    
        DefaultHandle(Stack<?> stack) {
            this.stack = stack;
        }
    }

    Stack 是最直接的对象池,用数组实现的栈,存放 DefaultHandle

    private static final class Stack<T> {
        // stack 属于某一个线程,用弱引用
        final WeakReference<Thread> threadRef;
    
        DefaultHandle<?>[] elements;
    
    }

    如果我们获取一个对象,需要调用 

    public final T get() {
        // 没有启用对象池
        if (maxCapacityPerThread == 0) {
            return newObject((Handle<T>) NOOP_HANDLE);
        }
        // 获取当前对象的 Stack
        Stack<T> stack = threadLocal.get();
        // 从栈顶推出一个元素
        DefaultHandle<T> handle = stack.pop();
        if (handle == null) {
            // 栈中没有元素,则需要创建对象
            handle = stack.newHandle();
            handle.value = newObject(handle);
        }
        return (T) handle.value;
    }

    用完对象之后,归还

    public void recycle(Object object) {
        if (object != value) {
            throw new IllegalArgumentException("object does not belong to handle");
        }
    
        // DefaultHandle 绑定的 stack
        Stack<?> stack = this.stack;
        if (lastRecycledId != recycleId || stack == null) {
            throw new IllegalStateException("recycled already");
        }
    
        stack.push(this);
    }
    void push(DefaultHandle<?> item) {
        Thread currentThread = Thread.currentThread();
        // 当前线程持有该 stack
        if (threadRef.get() == currentThread) {
            // 直接入栈
            pushNow(item);
        } else {
            // 归还的对象不是由当前线程获取的
            pushLater(item, currentThread);
        }
    }
    private void pushLater(DefaultHandle<?> item, Thread thread) {
        if (maxDelayedQueues == 0) {
            // We don't support recycling across threads and should just drop the item on the floor.
            return;
        }
    
        // 当前线程的 Map
        Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
        WeakOrderQueue queue = delayedRecycled.get(this);
        if (queue == null) {
            if (delayedRecycled.size() >= maxDelayedQueues) {
                // Add a dummy queue so we know we should drop the object
                delayedRecycled.put(this, WeakOrderQueue.DUMMY);
                return;
            }
            // 创建 WeakOrderQueue
            if ((queue = newWeakOrderQueue(thread)) == null) {
                // drop object
                return;
            }
            delayedRecycled.put(this, queue);
        } else if (queue == WeakOrderQueue.DUMMY) {
            // drop object
            return;
        }
        
        // DefaultHandle 放入 queue
        queue.add(item);
    }

    那么,归还到 WeakOrderQueue 中的元素,啥时候用到呢

    DefaultHandle<T> pop() {
        int size = this.size;
        if (size == 0) {
            if (!scavenge()) {
                return null;
            }
            size = this.size;
            if (size <= 0) {
                // double check, avoid races
                return null;
            }
        }
        size --;
        DefaultHandle ret = elements[size];
        elements[size] = null;
        // As we already set the element[size] to null we also need to store the updated size before we do
        // any validation. Otherwise we may see a null value when later try to pop again without a new element
        // added before.
        this.size = size;
    
        if (ret.lastRecycledId != ret.recycleId) {
            throw new IllegalStateException("recycled multiple times");
        }
        ret.recycleId = 0;
        ret.lastRecycledId = 0;
        return ret;
    }

    把 WeakOrderQueue 中的元素移到 Stack 中

    boolean transfer(Stack<?> dst) {
        Link head = this.head.link;
        if (head == null) {
            return false;
        }
    
        if (head.readIndex == LINK_CAPACITY) {
            if (head.next == null) {
                return false;
            }
            head = head.next;
            this.head.relink(head);
        }
    
        final int srcStart = head.readIndex;
        int srcEnd = head.get();
        final int srcSize = srcEnd - srcStart;
        if (srcSize == 0) {
            return false;
        }
    
        final int dstSize = dst.size;
        final int expectedCapacity = dstSize + srcSize;
    
        if (expectedCapacity > dst.elements.length) {
            final int actualCapacity = dst.increaseCapacity(expectedCapacity);
            srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
        }
    
        if (srcStart != srcEnd) {
            final DefaultHandle[] srcElems = head.elements;
            final DefaultHandle[] dstElems = dst.elements;
            int newDstSize = dstSize;
            for (int i = srcStart; i < srcEnd; i++) {
                DefaultHandle<?> element = srcElems[i];
                if (element.recycleId == 0) {
                    element.recycleId = element.lastRecycledId;
                } else if (element.recycleId != element.lastRecycledId) {
                    throw new IllegalStateException("recycled already");
                }
                srcElems[i] = null;
    
                if (dst.dropHandle(element)) {
                    // Drop the object.
                    continue;
                }
                element.stack = dst;
                dstElems[newDstSize ++] = element;
            }
    
            if (srcEnd == LINK_CAPACITY && head.next != null) {
                // Add capacity back as the Link is GCed.
                this.head.relink(head.next);
            }
    
            head.readIndex = srcEnd;
            if (dst.size == newDstSize) {
                return false;
            }
            dst.size = newDstSize;
            return true;
        } else {
            // The destination stack is full already.
            return false;
        }
    }
  • 相关阅读:
    Qt Installer Framework翻译(3-4)
    Qt Installer Framework翻译(3-3)
    Qt Installer Framework翻译(3-2)
    Qt Installer Framework翻译(3-1)
    Qt Installer Framework翻译(3-0)
    Qt Installer Framework翻译(0)
    Qt Installer Framework翻译(2)
    Qt Installer Framework翻译(1)
    野生前端的数据结构基础练习(8)——图
    野生前端的数据结构基础练习(7)——二叉树
  • 原文地址:https://www.cnblogs.com/allenwas3/p/12229532.html
Copyright © 2011-2022 走看看