zoukankan      html  css  js  c++  java
  • java并发:阻塞队列之PriorityBlockingQueue

    优先级阻塞队列

    PriorityBlockingQueue是一个支持优先级排序无界阻塞队列。

    Note:

    PriorityBlockingQueue并不会阻塞生产者,而只是在没有可消费的数据时阻塞消费者;因此使用的时候需要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,最终会耗尽所有可用的内存空间。

    PriorityBlockingQueue的类图如下:

    PriorityBlockingQueue的定义如下:

    public class PriorityBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
        private static final long serialVersionUID = 5595510919245408276L;
    
        /*
         * The implementation uses an array-based binary heap, with public
         * operations protected with a single lock. However, allocation
         * during resizing uses a simple spinlock (used only while not
         * holding main lock) in order to allow takes to operate
         * concurrently with allocation.  This avoids repeated
         * postponement of waiting consumers and consequent element
         * build-up. The need to back away from lock during allocation
         * makes it impossible to simply wrap delegated
         * java.util.PriorityQueue operations within a lock, as was done
         * in a previous version of this class. To maintain
         * interoperability, a plain PriorityQueue is still used during
         * serialization, which maintains compatibility at the expense of
         * transiently doubling overhead.
         */
    
        /**
         * Default array capacity.
         */
        private static final int DEFAULT_INITIAL_CAPACITY = 11;
    
        /**
         * The maximum size of array to allocate.
         * Some VMs reserve some header words in an array.
         * Attempts to allocate larger arrays may result in
         * OutOfMemoryError: Requested array size exceeds VM limit
         */
        private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    
        /**
         * Priority queue represented as a balanced binary heap: the two
         * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
         * priority queue is ordered by comparator, or by the elements'
         * natural ordering, if comparator is null: For each node n in the
         * heap and each descendant d of n, n <= d.  The element with the
         * lowest value is in queue[0], assuming the queue is nonempty.
         */
        private transient Object[] queue;
    
        /**
         * The number of elements in the priority queue.
         */
        private transient int size;
    
        /**
         * The comparator, or null if priority queue uses elements'
         * natural ordering.
         */
        private transient Comparator<? super E> comparator;
    
        /**
         * Lock used for all public operations.
         */
        private final ReentrantLock lock = new ReentrantLock();
    
        /**
         * Condition for blocking when empty.
         */
        private final Condition notEmpty = lock.newCondition();
    
        /**
         * Spinlock for allocation, acquired via CAS.
         */
        private transient volatile int allocationSpinLock;
    
        /**
         * A plain PriorityQueue used only for serialization,
         * to maintain compatibility with previous versions
         * of this class. Non-null only during serialization/deserialization.
         */
        private PriorityQueue<E> q;

    其构造函数如下:

        /**
         * Creates a {@code PriorityBlockingQueue} with the default
         * initial capacity (11) that orders its elements according to
         * their {@linkplain Comparable natural ordering}.
         */
        public PriorityBlockingQueue() {
            this(DEFAULT_INITIAL_CAPACITY, null);
        }
    
        /**
         * Creates a {@code PriorityBlockingQueue} with the specified
         * initial capacity that orders its elements according to their
         * {@linkplain Comparable natural ordering}.
         *
         * @param initialCapacity the initial capacity for this priority queue
         * @throws IllegalArgumentException if {@code initialCapacity} is less
         *         than 1
         */
        public PriorityBlockingQueue(int initialCapacity) {
            this(initialCapacity, null);
        }
    
        /**
         * Creates a {@code PriorityBlockingQueue} with the specified initial
         * capacity that orders its elements according to the specified
         * comparator.
         *
         * @param initialCapacity the initial capacity for this priority queue
         * @param  comparator the comparator that will be used to order this
         *         priority queue.  If {@code null}, the {@linkplain Comparable
         *         natural ordering} of the elements will be used.
         * @throws IllegalArgumentException if {@code initialCapacity} is less
         *         than 1
         */
        public PriorityBlockingQueue(int initialCapacity,
                                     Comparator<? super E> comparator) {
            if (initialCapacity < 1)
                throw new IllegalArgumentException();
            this.comparator = comparator;
            this.queue = new Object[Math.max(1, initialCapacity)];
        }
    
        /**
         * Creates a {@code PriorityBlockingQueue} containing the elements
         * in the specified collection.  If the specified collection is a
         * {@link SortedSet} or a {@link PriorityQueue}, this
         * priority queue will be ordered according to the same ordering.
         * Otherwise, this priority queue will be ordered according to the
         * {@linkplain Comparable natural ordering} of its elements.
         *
         * @param  c the collection whose elements are to be placed
         *         into this priority queue
         * @throws ClassCastException if elements of the specified collection
         *         cannot be compared to one another according to the priority
         *         queue's ordering
         * @throws NullPointerException if the specified collection or any
         *         of its elements are null
         */
        public PriorityBlockingQueue(Collection<? extends E> c) {
            boolean heapify = true; // true if not known to be in heap order
            boolean screen = true;  // true if must screen for nulls
            if (c instanceof SortedSet<?>) {
                SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
                this.comparator = (Comparator<? super E>) ss.comparator();
                heapify = false;
            }
            else if (c instanceof PriorityBlockingQueue<?>) {
                PriorityBlockingQueue<? extends E> pq =
                    (PriorityBlockingQueue<? extends E>) c;
                this.comparator = (Comparator<? super E>) pq.comparator();
                screen = false;
                if (pq.getClass() == PriorityBlockingQueue.class) // exact match
                    heapify = false;
            }
            Object[] es = c.toArray();
            int n = es.length;
            if (c.getClass() != java.util.ArrayList.class)
                es = Arrays.copyOf(es, n, Object[].class);
            if (screen && (n == 1 || this.comparator != null)) {
                for (Object e : es)
                    if (e == null)
                        throw new NullPointerException();
            }
            this.queue = ensureNonEmpty(es);
            this.size = n;
            if (heapify)
                heapify();
        }

    解读:

    PriorityBlockingQueue 内部有一个数组 queue,用来存放队列元素,size用来存放队列元素个数。

    独占锁对象lock 用来控制某个时间只能有一个线程可以进行入队、出队操作;notEmpty 条件变量用来实现 take 方法阻塞模式(跟其它阻塞队列相比,这里没有 notFull 条件变量,这是因为PriorityBlockingQueue是无界队列,其put 方法是非阻塞的)。

    每次出队都返回优先级最高或者最低的元素,默认使用对象的 compareTo 方法提供比较规则,这意味着队列元素必须实现了 Comparable 接口;如果需要自定义比较规则则可以通过构造函数自定义 comparator。

    Note:

    allocationspinLock是个自旋锁,它使用 CAS操作来保证某个时间只有一个线程可以扩容队列,状态为 0或者 1,0表示当前没有进行扩容,1表示当前正在扩容。

    PriorityBlockingQueue内部是使用平衡二叉树堆实现的,所以直接遍历队列元素不保证元素有序。

    添加元素

    offer的方法如下:

        /**
         * Inserts the specified element into this priority queue.
         * As the queue is unbounded, this method will never return {@code false}.
         *
         * @param e the element to add
         * @return {@code true} (as specified by {@link Queue#offer})
         * @throws ClassCastException if the specified element cannot be compared
         *         with elements currently in the priority queue according to the
         *         priority queue's ordering
         * @throws NullPointerException if the specified element is null
         */
        public boolean offer(E e) {
            if (e == null)
                throw new NullPointerException();
            final ReentrantLock lock = this.lock;
            lock.lock();
            int n, cap;
            Object[] es;
            while ((n = size) >= (cap = (es = queue).length))
                tryGrow(es, cap);
            try {
                final Comparator<? super E> cmp;
                if ((cmp = comparator) == null)
                    siftUpComparable(n, e, es);
                else
                    siftUpUsingComparator(n, e, es, cmp);
                size = n + 1;
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
            return true;
        }

    解读:

    由于是无界队列,所以一直返回 true。

    put方法的代码如下:

        /**
         * Inserts the specified element into this priority queue.
         * As the queue is unbounded, this method will never block.
         *
         * @param e the element to add
         * @throws ClassCastException if the specified element cannot be compared
         *         with elements currently in the priority queue according to the
         *         priority queue's ordering
         * @throws NullPointerException if the specified element is null
         */
        public void put(E e) {
            offer(e); // never need to block
        }

    解读:

    put方法是直接调用offer方法来实现的

    获取元素

    poll方法的定义如下:

        public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return dequeue();
            } finally {
                lock.unlock();
            }
        }

    take方法的定义如下:

        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            E result;
            try {
                while ( (result = dequeue()) == null)
                    notEmpty.await();
            } finally {
                lock.unlock();
            }
            return result;
        }

    扩容

    此处重点研究一下扩容方法的实现,代码如下:

        /**
         * Tries to grow array to accommodate at least one more element
         * (but normally expand by about 50%), giving up (allowing retry)
         * on contention (which we expect to be rare). Call only while
         * holding lock.
         *
         * @param array the heap array
         * @param oldCap the length of the array
         */
        private void tryGrow(Object[] array, int oldCap) {
            lock.unlock(); // must release and then re-acquire main lock
            Object[] newArray = null;
            if (allocationSpinLock == 0 &&
                ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) {
                try {
                    int newCap = oldCap + ((oldCap < 64) ?
                                           (oldCap + 2) : // grow faster if small
                                           (oldCap >> 1));
                    if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                        int minCap = oldCap + 1;
                        if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                            throw new OutOfMemoryError();
                        newCap = MAX_ARRAY_SIZE;
                    }
                    if (newCap > oldCap && queue == array)
                        newArray = new Object[newCap];
                } finally {
                    allocationSpinLock = 0;
                }
            }
            if (newArray == null) // back off if another thread is allocating
                Thread.yield();
            lock.lock();
            if (newArray != null && queue == array) {
                queue = newArray;
                System.arraycopy(array, 0, newArray, 0, oldCap);
            }
        }

    解读:

    此方法在offer方法中被调用。

    此方法在扩容前释放锁。

    问题:为什么在扩容前要先释放锁,然后使用 CAS 控制只有一个线程可以扩容成功?

    扩容需要花费一定时间,如果在整个扩容期间一直持有锁,则扩容期间其他线程不能进行入队、出队操作,这降低了并发性。

    扩容线程扩容时,其他线程原地自旋(会进入tryGrow方法,通过Thread.yield方法让出CPU,让扩容线程在扩容完毕后优先调用 lock.lock()重新获取锁,但这得不到保证),当扩容线程扩容完毕后才退出offer方法中的while循环。

    扩容线程扩容完毕后会重置自旋锁变量 allocationSpinLock 为 0,这里没有使用 Unsafe 的 CAS 进行设置是因为某个时间只有一个线程获取到该锁,并且 allocationSpinLock 被修饰为 volatile。

    扩容线程扩容完毕后在获取锁后复制当前 queue 里面的元素到新数组。

    小结:

  • 相关阅读:
    用例图设计
    第二次结队作业
    第一次结对作业
    第二次个人编程作业
    简易的中文语言翻译器
    第一次博客作业
    个人总结
    第三次个人作业——用例图设计
    第二次结对作业
    第一次结对作业
  • 原文地址:https://www.cnblogs.com/studyLog-share/p/15140331.html
Copyright © 2011-2022 走看看