zoukankan      html  css  js  c++  java
  • PriorityBlockingQueue 源码分析

    PriorityBlockingQueue

    PriorityBlockingQueue 能解决什么问题?什么时候使用 PriorityBlockingQueue?

    1)PriorityBlockingQueue 是基于优先级堆实现的线程安全的、无界、优先级、阻塞队列。
    2)队列的元素按照其自然顺序进行排序,或者根据构造队列时提供的 Comparator 进行排序。
    take 操作会移除并返回优先级最高的元素,如果队列为空,则当前线程被阻塞。
    

    如何使用 PriorityBlockingQueue?

    1)在并发场景下,需要按照指定的优先级获取元素时。
    

    使用 PriorityBlockingQueue 有什么风险?

    1)PriorityBlockingQueue 是无界的阻塞队列,它并不会阻塞生产者插入元素,
    当生产速率大于消费速率时,时间一长,可能导致内存溢出。
    

    PriorityBlockingQueue 核心操作的实现原理?

    • 创建实例
        /**
         *  默认的数组容量
         */
        private static final int DEFAULT_INITIAL_CAPACITY = 11;
    
        /**
         *  最大的数组容量
         */
        private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    
        /**
         * Priority queue represented as a balanced binary heap: the two
         * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
         * priority queue is ordered by comparator, or by the elements'
         * natural ordering, if comparator is null: For each node n in the
         * heap and each descendant d of n, n <= d.  The element with the
         * lowest value is in queue[0], assuming the queue is nonempty.
         */
        private transient Object[] queue;
    
        /**
         *  优先级队列中的元素总数
         */
        private transient int size;
    
        /**
         *  元素比较器,如果按照自然顺序进行排序,则为 null
         */
        private transient Comparator<? super E> comparator;
    
        /**
         *  控制访问的可重入互斥锁
         */
        private final ReentrantLock lock;
    
        /**
         *  队列为空时,目标线程将在非空条件阻塞等待
         */
        private final Condition notEmpty;
    
        /**
         *  扩容时的锁,通过 CAS 写入
         */
        private transient volatile int allocationSpinLock;
    
        /**
         *  创建一个初始容量为 11,按照自然顺序排序的 PriorityBlockingQueue 实例
         */
        public PriorityBlockingQueue() {
            this(PriorityBlockingQueue.DEFAULT_INITIAL_CAPACITY, null);
        }
    
        /**
         *  创建一个初始容量为 initialCapacity,按照自然顺序排序的 PriorityBlockingQueue 实例
         */
        public PriorityBlockingQueue(int initialCapacity) {
            this(initialCapacity, null);
        }
    
        /**
         *  创建一个初始容量为 initialCapacity,按照自然顺序排序的 comparator 实例
         */
        public PriorityBlockingQueue(int initialCapacity,
                Comparator<? super E> comparator) {
            if (initialCapacity < 1) {
                throw new IllegalArgumentException();
            }
            this.lock = new ReentrantLock();
            this.notEmpty = lock.newCondition();
            this.comparator = comparator;
            this.queue = new Object[initialCapacity];
        }
    
    • 写入元素
        /**
         *  将目标元素插入到队列中,由于是无界的,不会被阻塞
         */
        @Override
        public void put(E e) {
            offer(e); // never need to block
        }
    
        @Override
        public boolean offer(E e) {
            if (e == null) {
                throw new NullPointerException();
            }
            final ReentrantLock lock = this.lock;
            lock.lock();
            /**
             * n:length
             * cap:capacity
             */
            int n, cap;
            Object[] array;
            // 元素个数超出队列长度,则进行扩容
            while ((n = size) >= (cap = (array = queue).length)) {
                tryGrow(array, cap);
            }
            try {
                final Comparator<? super E> cmp = comparator;
                if (cmp == null) {
                    // 自然顺序的插入
                    PriorityBlockingQueue.siftUpComparable(n, e, array);
                } else {
                    // 使用指定比较器的插入
                    PriorityBlockingQueue.siftUpUsingComparator(n, e, array, cmp);
                }
                size = n + 1;
                // 唤醒在非空条件上阻塞等待的线程
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
            return true;
        }
    
        private void tryGrow(Object[] array, int oldCap) {
            lock.unlock(); // must release and then re-acquire main lock
            Object[] newArray = null;
            // 当前没有线程在执行扩容 && 原子更新扩容标识为 1
            if (allocationSpinLock == 0 &&
                    PriorityBlockingQueue.ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) {
                try {
                    /**
                     * 1)旧容量小于 64,执行【双倍+2】扩容
                     * 2)旧容量大于等于 64,执行1.5倍向下取整扩容
                     */
                    int newCap = oldCap + (oldCap < 64 ?
                            oldCap + 2 : // grow faster if small
                                oldCap >> 1);
                    // 新容量超出最大容量
                    if (newCap - PriorityBlockingQueue.MAX_ARRAY_SIZE > 0) {    // possible overflow
                        final int minCap = oldCap + 1;
                        // 如果已经溢出,则抛出 OutOfMemoryError 异常
                        if (minCap < 0 || minCap > PriorityBlockingQueue.MAX_ARRAY_SIZE) {
                            throw new OutOfMemoryError();
                        }
                        // 写入最大容量
                        newCap = PriorityBlockingQueue.MAX_ARRAY_SIZE;
                    }
                    // 创建新的对象数组
                    if (newCap > oldCap && queue == array) {
                        newArray = new Object[newCap];
                    }
                } finally {
                    // 重置扩容标记
                    allocationSpinLock = 0;
                }
            }
            // 说明已经有线程在执行扩容,则等待其扩容完成
            if (newArray == null) {
                Thread.yield();
            }
            lock.lock();
            if (newArray != null && queue == array) {
                // 扩容成功的线程会将元素从旧数组拷贝到新数组中
                queue = newArray;
                System.arraycopy(array, 0, newArray, 0, oldCap);
            }
        }
    
        private static <T> void siftUpComparable(int k, T x, Object[] array) {
            final Comparable<? super T> key = (Comparable<? super T>) x;
            // 插入元素的目标索引
            while (k > 0) {
                // 计算父节点索引
                final int parent = k - 1 >>> 1;
                // 读取父节点值
                final Object e = array[parent];
                // 新增元素已经 >= 当前节点,则无需上移
                if (key.compareTo((T) e) >= 0) {
                    break;
                }
                // 父节点元素下移
                array[k] = e;
                // 递归比较祖父节点
                k = parent;
            }
            // 插入目标元素
            array[k] = key;
        }
    
        /**
         *  实现逻辑和 siftUpComparable 一致
         * created by ZXD at 6 Dec 2018 T 21:36:14
         * @param k
         * @param x
         * @param array
         * @param cmp
         */
        private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
                Comparator<? super T> cmp) {
            while (k > 0) {
                final int parent = k - 1 >>> 1;
                final Object e = array[parent];
                if (cmp.compare(x, (T) e) >= 0) {
                    break;
                }
                array[k] = e;
                k = parent;
            }
            array[k] = x;
        }
    
    • 读取元素
    • 如果队列为空,则阻塞等待有可用元素后重试,否则移除并返回优先级最高的元素
        /**
         *  如果队列为空,则阻塞等待有可用元素后重试,否则移除并返回优先级最高的元素
         */
        @Override
        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            E result;
            try {
                // 尝试移除并返回优先级最高的元素
                while ( (result = dequeue()) == null) {
                    // 当前线程在非空条件上阻塞等待,被唤醒后重试
                    notEmpty.await();
                }
            } finally {
                lock.unlock();
            }
            return result;
        }
    
        private E dequeue() {
            // 计算尾部元素索引
            final int n = size - 1;
            // 队列为空,则返回 null
            if (n < 0) {
                return null;
            } else {
                final Object[] array = queue;
                // 读取优先级最高的元素
                final E result = (E) array[0];
                // 读取尾部元素
                final E x = (E) array[n];
                // 清空尾部元素
                array[n] = null;
                final Comparator<? super E> cmp = comparator;
                if (cmp == null) {
                    PriorityBlockingQueue.siftDownComparable(0, x, array, n);
                } else {
                    PriorityBlockingQueue.siftDownUsingComparator(0, x, array, n, cmp);
                }
                size = n;
                return result;
            }
        }
    
        /**
         * Inserts item x at position k, maintaining heap invariant by
         * demoting x down the tree repeatedly until it is less than or
         * equal to its children or is a leaf.
         *
         * @param k 需要填充的目标索引
         * @param x 需要插入的目标元素
         * @param array 持有对象的数组
         * @param n 堆大小
         */
        private static <T> void siftDownComparable(int k, T x, Object[] array,
                int n) {
            if (n > 0) {
                final Comparable<? super T> key = (Comparable<? super T>)x;
                // 计算二分索引
                final int half = n >>> 1;           // loop while a non-leaf
                while (k < half) {
                    // 计算左子节点索引
                    int child = (k << 1) + 1; // assume left child is least
                    // 读取节点值
                    Object c = array[child];
                    // 计算右子节点索引
                    final int right = child + 1;
                    /**
                     *  右子节点索引小于目标堆大小 &&
                     *  左子节点值 > 右子节点值
                     */
                    if (right < n &&
                            ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) {
                        // 读取右子节点值,更新查找节点索引
                        c = array[child = right];
                    }
                    // 目标键已经小于查找节点,则可以直接插入
                    if (key.compareTo((T) c) <= 0) {
                        break;
                    }
                    // 否则,提升子节点为父节点
                    array[k] = c;
                    // 迭代处理子节点
                    k = child;
                }
                // 插入目标值
                array[k] = key;
            }
        }
    
        private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
                int n,
                Comparator<? super T> cmp) {
            if (n > 0) {
                final int half = n >>> 1;
                    while (k < half) {
                        int child = (k << 1) + 1;
                        Object c = array[child];
                        final int right = child + 1;
                        if (right < n && cmp.compare((T) c, (T) array[right]) > 0) {
                            c = array[child = right];
                        }
                        if (cmp.compare(x, (T) c) <= 0) {
                            break;
                        }
                        array[k] = c;
                        k = child;
                    }
                    array[k] = x;
            }
        }
    
    • 如果队列为空,则立即返回 null;否则移除并返回优先级最高的元素
        /**
         *  如果队列为空,则立即返回 null;否则移除并返回优先级最高的元素。
         * created by ZXD at 6 Dec 2018 T 21:38:57
         * @return
         */
        @Override
        public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return dequeue();
            } finally {
                lock.unlock();
            }
        }
    
    • 在指定的超时时间内尝试移除并返回优先级最高的元素,如果已经超时,则返回 null
        /**
         *  在指定的超时时间内尝试移除并返回优先级最高的元素,如果已经超时,则返回 null.
         * created by ZXD at 6 Dec 2018 T 21:40:03
         * @param timeout
         * @param unit
         * @return
         * @throws InterruptedException
         */
        @Override
        public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            E result;
            try {
                // 尝试移除并返回优先级最高的元素 && 未超时
                while ( (result = dequeue()) == null && nanos > 0) {
                    // 当前线程在非空条件上阻塞等待,被唤醒后重试
                    nanos = notEmpty.awaitNanos(nanos);
                }
            } finally {
                lock.unlock();
            }
            return result;
        }
    
  • 相关阅读:
    4种方法帮你解决IntelliJ IDEA控制台中文乱码问题
    万字长文:解读区块链7类共识算法
    CoralCache:一个提高微服务可用性的中间件
    探究Python源码,终于弄懂了字符串驻留技术
    OAuth:每次授权暗中保护你的那个“MAN”
    厉害了!这群95后正在用三维成像技术让科幻变成现实
    华为云FusionInsight MRS在金融行业存算分离的实践
    【新春特辑】发压岁钱、看贺岁片、AI写春联……华为云社区给大家拜年了
    Java实现 蓝桥杯 算法训练 天数计算
    WebRTC框架中的硬件加速
  • 原文地址:https://www.cnblogs.com/zhuxudong/p/10079766.html
Copyright © 2011-2022 走看看