zoukankan      html  css  js  c++  java
  • JUC回顾之-ArrayBlockingQueue底层实现和原理

    ArrayBlockingQueue的原理和底层实现的数据结构 :

     ArrayBlockingQueue是数组实现的线程安全的有界的阻塞队列,可以按照 FIFO(先进先出)原则对元素进行排序。

    线程安全是指,ArrayBlockingQueue内部通过“互斥锁”保护竞争资源,实现了多线程对竞争资源的互斥访问。而有界,则是指ArrayBlockingQueue对应的数组是有界限的。 阻塞队列,是指多线程访问竞争资源时,当竞争资源已被某线程获取时,其它要获取该资源的线程需要阻塞等待;所谓公平的访问队列是指阻塞的线程,可以按照阻塞的先后顺序访问队列,先阻塞的线程先访问ArrayBlockingQueue队列。非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才能够访问队列。然而为了保证公平性,通常会降低吞吐量。

     1. ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。 并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。

    2.ArrayBlockingQueue内部通过Object[]数组保存数据的,也就是说ArrayBlockingQueue本质上是通过数组实现的。ArrayBlockingQueue的大小,即数组的容量是在创建创建ArrayBlockingQueue时候指定的。

    3.如下图所示,ArrayBlockingQueue和ReentrantLock是组合关系,ArrayBlockingQueue中包含一个ReentrantLock对象。ReentrantLock是可重入的互斥锁。ArrayBlockingQueue就是根据ReentrantLock互斥锁实现"多线程对共享资源的访问"。ReentrantLock分为公平锁和非公平锁,关于具体使用公平锁还是非公平锁,在创建ArrayBlockingQueue时可以指定;而且,ArrayBlockingQueue默认会使用非公平锁。

    4.ArrayBlockingQueue和Condition是组合关系,ArrayBlockingQueue中包含两个Condition对象(notEmpty和notFull)。使用通知模式实现:所谓通知模式,当生产者往满的队列里面添加元素的时候,会阻塞生产者(调用Condition notFull.await()进行等待);当消费者消费了一个队列中的元素后,会通知(调用Condition notFull.signal()唤醒生产者)生产者当前队列可用。反之,当消费者消费的时候,发现队列是空的,则消费者会被阻塞(通过Condition的 notEmpty.await()进行等待),当生产者插入了队列中的一个元素后,则会调用notEmpty.signal()唤醒消费者继续消费。

     

    ArrayBlockingQueue的数据结构如下: 

    ArrayBlockingQueue方法列表:

    // 创建一个带有给定的(固定)容量和默认访问策略的 ArrayBlockingQueue。
    ArrayBlockingQueue(int capacity)
    // 创建一个具有给定的(固定)容量和指定访问策略的 ArrayBlockingQueue。
    ArrayBlockingQueue(int capacity, boolean fair)
    // 创建一个具有给定的(固定)容量和指定访问策略的 ArrayBlockingQueue,它最初包含给定 collection 的元素,并以 collection 迭代器的遍历顺序添加元素。
    ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)
    
    // 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则抛出 IllegalStateException。
    boolean add(E e)
    // 自动移除此队列中的所有元素。
    void clear()
    // 如果此队列包含指定的元素,则返回 true。
    boolean contains(Object o)
    // 移除此队列中所有可用的元素,并将它们添加到给定 collection 中。
    int drainTo(Collection<? super E> c)
    // 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中。
    int drainTo(Collection<? super E> c, int maxElements)
    // 返回在此队列中的元素上按适当顺序进行迭代的迭代器。
    Iterator<E> iterator()
    // 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则返回 false。
    boolean offer(E e)
    // 将指定的元素插入此队列的尾部,如果该队列已满,则在到达指定的等待时间之前等待可用的空间。
    boolean offer(E e, long timeout, TimeUnit unit)
    // 获取但不移除此队列的头;如果此队列为空,则返回 null。
    E peek()
    // 获取并移除此队列的头,如果此队列为空,则返回 null。
    E poll()
    // 获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。
    E poll(long timeout, TimeUnit unit)
    // 将指定的元素插入此队列的尾部,如果该队列已满,则等待可用的空间。
    void put(E e)
    // 返回在无阻塞的理想情况下(不存在内存或资源约束)此队列能接受的其他元素数量。
    int remainingCapacity()
    // 从此队列中移除指定元素的单个实例(如果存在)。
    boolean remove(Object o)
    // 返回此队列中元素的数量。
    int size()
    // 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。
    E take()
    // 返回一个按适当顺序包含此队列中所有元素的数组。
    Object[] toArray()
    // 返回一个按适当顺序包含此队列中所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。
    <T> T[] toArray(T[] a)
    // 返回此 collection 的字符串表示形式。
    String toString()

    总结下上面的方法:

     *非阻塞队列中的方法:
     * 
     *  抛出异常的方法 Exception in thread "main" java.lang.IllegalStateException: Queue full
     
     *1. add(e) throw exception,将元素e插入到队列的末尾,如果插入成功,则返回true,如果插入失败 (队列已经满) 抛出异常
     *2. remove(e) throw exception,移除队首元素,若移除成功,则返回true;若移除失败(队列为空)则抛出异常
     *3. element() throw exception 获取队列首元素,若获取成功,则返回首元素,否则抛出异常 java.util.NoSuchElementException
     * 
     *  返回特定值的方法
     *
     * 1.offer(E e),将元素e插入到队列末尾,如果插入成功,则返回true,如果插入失败(队列已满),返回false
     * 2.poll(E e),移除并获取队首元素,若成功,则返回队首元素,否则返回null
     * 3.peek(E e),获取队首元素,若成功,则返回队首元素,否则则返回null
     *  可以指定TimeOut:
     * 
     * 3.offer(E e,long timeout, TimeUnit unit):向队列尾部存入元素e,如果队列满,则等待一定的时间,当达到timeout时候,则返回false,否则返回true
      4.poll(long timeout, TimeUnit unit):从队首获取元素,如果队列为空,则等待一定时间,当达到timeout时,如果没有取到,则返回null,如果取到则返回取到的元素
     阻塞队列中的几个重要方法:
    * 1.put(E e) :用于队列尾部存入元素e,如果对满,则等待。 * 2.take():用于从对列首取出元素e,如果队列为空,则等待

     注意:阻塞队列包括了非阻塞队列中的大部分方法,上面列举的5个方法在阻塞队列中都存在,但是要注意这5个方法在阻塞队列中都进行了同步措施,都是线程安全的。
    
    

    ArrayBlockingQueue源码分析(JDK1.7)

     

    /*
     * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     */
    
    /*
     *
     *
     *
     *
     *
     * Written by Doug Lea with assistance from members of JCP JSR-166
     * Expert Group and released to the public domain, as explained at
     * http://creativecommons.org/publicdomain/zero/1.0/
     */
    
    package java.util.concurrent;
    import java.util.concurrent.locks.*;
    import java.util.*;
    
    /**
     * A bounded {@linkplain BlockingQueue blocking queue} backed by an
     * array.  This queue orders elements FIFO (first-in-first-out).  The
     * <em>head</em> of the queue is that element that has been on the
     * queue the longest time.  The <em>tail</em> of the queue is that
     * element that has been on the queue the shortest time. New elements
     * are inserted at the tail of the queue, and the queue retrieval
     * operations obtain elements at the head of the queue.
     *
     * <p>This is a classic &quot;bounded buffer&quot;, in which a
     * fixed-sized array holds elements inserted by producers and
     * extracted by consumers.  Once created, the capacity cannot be
     * changed.  Attempts to {@code put} an element into a full queue
     * will result in the operation blocking; attempts to {@code take} an
     * element from an empty queue will similarly block.
     *
     * <p>This class supports an optional fairness policy for ordering
     * waiting producer and consumer threads.  By default, this ordering
     * is not guaranteed. However, a queue constructed with fairness set
     * to {@code true} grants threads access in FIFO order. Fairness
     * generally decreases throughput but reduces variability and avoids
     * starvation.
     *
     * <p>This class and its iterator implement all of the
     * <em>optional</em> methods of the {@link Collection} and {@link
     * Iterator} interfaces.
     *
     * <p>This class is a member of the
     * <a href="{@docRoot}/../technotes/guides/collections/index.html">
     * Java Collections Framework</a>.
     *
     * @since 1.5
     * @author Doug Lea
     * @param <E> the type of elements held in this collection
     */
    public class ArrayBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
    
        /**
         * Serialization ID. This class relies on default serialization
         * even for the items array, which is default-serialized, even if
         * it is empty. Otherwise it could not be declared final, which is
         * necessary here.
         */
        private static final long serialVersionUID = -817911632652898426L;
    
        /** The queued items */
        final Object[] items;
    
        /** items index for next take, poll, peek or remove */
        int takeIndex;
    
        /** items index for next put, offer, or add */
        int putIndex;
    
        /** Number of elements in the queue */
        int count;
    
        /*
         * Concurrency control uses the classic two-condition algorithm
         * found in any textbook.
         */
    
        /** Main lock guarding all access */
        final ReentrantLock lock;
        /** Condition for waiting takes */
        private final Condition notEmpty;
        /** Condition for waiting puts */
        private final Condition notFull;
    
        // Internal helper methods
    
        /**
         * Circularly increment i.
         */
        final int inc(int i) {
            return (++i == items.length) ? 0 : i;
        }
    
        /**
         * Circularly decrement i.
         */
        final int dec(int i) {
            return ((i == 0) ? items.length : i) - 1;
        }
    
        @SuppressWarnings("unchecked")
        static <E> E cast(Object item) {
            return (E) item;
        }
    
        /**
         * Returns item at index i.
         */
        final E itemAt(int i) {
            return this.<E>cast(items[i]);
        }
    
        /**
         * Throws NullPointerException if argument is null.
         *
         * @param v the element
         */
        private static void checkNotNull(Object v) {
            if (v == null)
                throw new NullPointerException();
        }
    
        /**
         * Inserts element at current put position, advances, and signals.
         * Call only when holding lock.
         */
        private void insert(E x) {
            items[putIndex] = x;
            putIndex = inc(putIndex);
            ++count;
            notEmpty.signal();
        }
    
        /**
         * Extracts element at current take position, advances, and signals.
         * Call only when holding lock.
         */
        private E extract() {
            final Object[] items = this.items;
            E x = this.<E>cast(items[takeIndex]);
            items[takeIndex] = null;
            takeIndex = inc(takeIndex);
            --count;
            notFull.signal();
            return x;
        }
    
        /**
         * Deletes item at position i.
         * Utility for remove and iterator.remove.
         * Call only when holding lock.
         */
        void removeAt(int i) {
            final Object[] items = this.items;
            // if removing front item, just advance
            if (i == takeIndex) {
                items[takeIndex] = null;
                takeIndex = inc(takeIndex);
            } else {
                // slide over all others up through putIndex.
                for (;;) {
                    int nexti = inc(i);
                    if (nexti != putIndex) {
                        items[i] = items[nexti];
                        i = nexti;
                    } else {
                        items[i] = null;
                        putIndex = i;
                        break;
                    }
                }
            }
            --count;
            notFull.signal();
        }
    
        /**
         * Creates an {@code ArrayBlockingQueue} with the given (fixed)
         * capacity and default access policy.
         *
         * @param capacity the capacity of this queue
         * @throws IllegalArgumentException if {@code capacity < 1}
         */
        public ArrayBlockingQueue(int capacity) {
            this(capacity, false);
        }
    
        /**
         * Creates an {@code ArrayBlockingQueue} with the given (fixed)
         * capacity and the specified access policy.
         *
         * @param capacity the capacity of this queue
         * @param fair if {@code true} then queue accesses for threads blocked
         *        on insertion or removal, are processed in FIFO order;
         *        if {@code false} the access order is unspecified.
         * @throws IllegalArgumentException if {@code capacity < 1}
         */
        public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];
            lock = new ReentrantLock(fair);
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }
    
        /**
         * Creates an {@code ArrayBlockingQueue} with the given (fixed)
         * capacity, the specified access policy and initially containing the
         * elements of the given collection,
         * added in traversal order of the collection's iterator.
         *
         * @param capacity the capacity of this queue
         * @param fair if {@code true} then queue accesses for threads blocked
         *        on insertion or removal, are processed in FIFO order;
         *        if {@code false} the access order is unspecified.
         * @param c the collection of elements to initially contain
         * @throws IllegalArgumentException if {@code capacity} is less than
         *         {@code c.size()}, or less than 1.
         * @throws NullPointerException if the specified collection or any
         *         of its elements are null
         */
        public ArrayBlockingQueue(int capacity, boolean fair,
                                  Collection<? extends E> c) {
            this(capacity, fair);
    
            final ReentrantLock lock = this.lock;
            lock.lock(); // Lock only for visibility, not mutual exclusion
            try {
                int i = 0;
                try {
                    for (E e : c) {
                        checkNotNull(e);
                        items[i++] = e;
                    }
                } catch (ArrayIndexOutOfBoundsException ex) {
                    throw new IllegalArgumentException();
                }
                count = i;
                putIndex = (i == capacity) ? 0 : i;
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * Inserts the specified element at the tail of this queue if it is
         * possible to do so immediately without exceeding the queue's capacity,
         * returning {@code true} upon success and throwing an
         * {@code IllegalStateException} if this queue is full.
         *
         * @param e the element to add
         * @return {@code true} (as specified by {@link Collection#add})
         * @throws IllegalStateException if this queue is full
         * @throws NullPointerException if the specified element is null
         */
        public boolean add(E e) {
            return super.add(e);
        }
    
        /**
         * Inserts the specified element at the tail of this queue if it is
         * possible to do so immediately without exceeding the queue's capacity,
         * returning {@code true} upon success and {@code false} if this queue
         * is full.  This method is generally preferable to method {@link #add},
         * which can fail to insert an element only by throwing an exception.
         *
         * @throws NullPointerException if the specified element is null
         */
        public boolean offer(E e) {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (count == items.length)
                    return false;
                else {
                    insert(e);
                    return true;
                }
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * Inserts the specified element at the tail of this queue, waiting
         * for space to become available if the queue is full.
         *
         * @throws InterruptedException {@inheritDoc}
         * @throws NullPointerException {@inheritDoc}
         */
        public void put(E e) throws InterruptedException {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == items.length)
                    notFull.await();
                insert(e);
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * Inserts the specified element at the tail of this queue, waiting
         * up to the specified wait time for space to become available if
         * the queue is full.
         *
         * @throws InterruptedException {@inheritDoc}
         * @throws NullPointerException {@inheritDoc}
         */
        public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
    
            checkNotNull(e);
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == items.length) {
                    if (nanos <= 0)
                        return false;
                    nanos = notFull.awaitNanos(nanos);
                }
                insert(e);
                return true;
            } finally {
                lock.unlock();
            }
        }
    
        public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return (count == 0) ? null : extract();
            } finally {
                lock.unlock();
            }
        }
    
        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0)
                    notEmpty.await();
                return extract();
            } finally {
                lock.unlock();
            }
        }
    
        public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0) {
                    if (nanos <= 0)
                        return null;
                    nanos = notEmpty.awaitNanos(nanos);
                }
                return extract();
            } finally {
                lock.unlock();
            }
        }
    
        public E peek() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return (count == 0) ? null : itemAt(takeIndex);
            } finally {
                lock.unlock();
            }
        }
    
        // this doc comment is overridden to remove the reference to collections
        // greater in size than Integer.MAX_VALUE
        /**
         * Returns the number of elements in this queue.
         *
         * @return the number of elements in this queue
         */
        public int size() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return count;
            } finally {
                lock.unlock();
            }
        }
    
        // this doc comment is a modified copy of the inherited doc comment,
        // without the reference to unlimited queues.
        /**
         * Returns the number of additional elements that this queue can ideally
         * (in the absence of memory or resource constraints) accept without
         * blocking. This is always equal to the initial capacity of this queue
         * less the current {@code size} of this queue.
         *
         * <p>Note that you <em>cannot</em> always tell if an attempt to insert
         * an element will succeed by inspecting {@code remainingCapacity}
         * because it may be the case that another thread is about to
         * insert or remove an element.
         */
        public int remainingCapacity() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return items.length - count;
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * Removes a single instance of the specified element from this queue,
         * if it is present.  More formally, removes an element {@code e} such
         * that {@code o.equals(e)}, if this queue contains one or more such
         * elements.
         * Returns {@code true} if this queue contained the specified element
         * (or equivalently, if this queue changed as a result of the call).
         *
         * <p>Removal of interior elements in circular array based queues
         * is an intrinsically slow and disruptive operation, so should
         * be undertaken only in exceptional circumstances, ideally
         * only when the queue is known not to be accessible by other
         * threads.
         *
         * @param o element to be removed from this queue, if present
         * @return {@code true} if this queue changed as a result of the call
         */
        public boolean remove(Object o) {
            if (o == null) return false;
            final Object[] items = this.items;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) {
                    if (o.equals(items[i])) {
                        removeAt(i);
                        return true;
                    }
                }
                return false;
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * Returns {@code true} if this queue contains the specified element.
         * More formally, returns {@code true} if and only if this queue contains
         * at least one element {@code e} such that {@code o.equals(e)}.
         *
         * @param o object to be checked for containment in this queue
         * @return {@code true} if this queue contains the specified element
         */
        public boolean contains(Object o) {
            if (o == null) return false;
            final Object[] items = this.items;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                for (int i = takeIndex, k = count; k > 0; i = inc(i), k--)
                    if (o.equals(items[i]))
                        return true;
                return false;
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * Returns an array containing all of the elements in this queue, in
         * proper sequence.
         *
         * <p>The returned array will be "safe" in that no references to it are
         * maintained by this queue.  (In other words, this method must allocate
         * a new array).  The caller is thus free to modify the returned array.
         *
         * <p>This method acts as bridge between array-based and collection-based
         * APIs.
         *
         * @return an array containing all of the elements in this queue
         */
        public Object[] toArray() {
            final Object[] items = this.items;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                final int count = this.count;
                Object[] a = new Object[count];
                for (int i = takeIndex, k = 0; k < count; i = inc(i), k++)
                    a[k] = items[i];
                return a;
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * Returns an array containing all of the elements in this queue, in
         * proper sequence; the runtime type of the returned array is that of
         * the specified array.  If the queue fits in the specified array, it
         * is returned therein.  Otherwise, a new array is allocated with the
         * runtime type of the specified array and the size of this queue.
         *
         * <p>If this queue fits in the specified array with room to spare
         * (i.e., the array has more elements than this queue), the element in
         * the array immediately following the end of the queue is set to
         * {@code null}.
         *
         * <p>Like the {@link #toArray()} method, this method acts as bridge between
         * array-based and collection-based APIs.  Further, this method allows
         * precise control over the runtime type of the output array, and may,
         * under certain circumstances, be used to save allocation costs.
         *
         * <p>Suppose {@code x} is a queue known to contain only strings.
         * The following code can be used to dump the queue into a newly
         * allocated array of {@code String}:
         *
         * <pre>
         *     String[] y = x.toArray(new String[0]);</pre>
         *
         * Note that {@code toArray(new Object[0])} is identical in function to
         * {@code toArray()}.
         *
         * @param a the array into which the elements of the queue are to
         *          be stored, if it is big enough; otherwise, a new array of the
         *          same runtime type is allocated for this purpose
         * @return an array containing all of the elements in this queue
         * @throws ArrayStoreException if the runtime type of the specified array
         *         is not a supertype of the runtime type of every element in
         *         this queue
         * @throws NullPointerException if the specified array is null
         */
        @SuppressWarnings("unchecked")
        public <T> T[] toArray(T[] a) {
            final Object[] items = this.items;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                final int count = this.count;
                final int len = a.length;
                if (len < count)
                    a = (T[])java.lang.reflect.Array.newInstance(
                        a.getClass().getComponentType(), count);
                for (int i = takeIndex, k = 0; k < count; i = inc(i), k++)
                    a[k] = (T) items[i];
                if (len > count)
                    a[count] = null;
                return a;
            } finally {
                lock.unlock();
            }
        }
    
        public String toString() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int k = count;
                if (k == 0)
                    return "[]";
    
                StringBuilder sb = new StringBuilder();
                sb.append('[');
                for (int i = takeIndex; ; i = inc(i)) {
                    Object e = items[i];
                    sb.append(e == this ? "(this Collection)" : e);
                    if (--k == 0)
                        return sb.append(']').toString();
                    sb.append(',').append(' ');
                }
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * Atomically removes all of the elements from this queue.
         * The queue will be empty after this call returns.
         */
        public void clear() {
            final Object[] items = this.items;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                for (int i = takeIndex, k = count; k > 0; i = inc(i), k--)
                    items[i] = null;
                count = 0;
                putIndex = 0;
                takeIndex = 0;
                notFull.signalAll();
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * @throws UnsupportedOperationException {@inheritDoc}
         * @throws ClassCastException            {@inheritDoc}
         * @throws NullPointerException          {@inheritDoc}
         * @throws IllegalArgumentException      {@inheritDoc}
         */
        public int drainTo(Collection<? super E> c) {
            checkNotNull(c);
            if (c == this)
                throw new IllegalArgumentException();
            final Object[] items = this.items;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = takeIndex;
                int n = 0;
                int max = count;
                while (n < max) {
                    c.add(this.<E>cast(items[i]));
                    items[i] = null;
                    i = inc(i);
                    ++n;
                }
                if (n > 0) {
                    count = 0;
                    putIndex = 0;
                    takeIndex = 0;
                    notFull.signalAll();
                }
                return n;
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * @throws UnsupportedOperationException {@inheritDoc}
         * @throws ClassCastException            {@inheritDoc}
         * @throws NullPointerException          {@inheritDoc}
         * @throws IllegalArgumentException      {@inheritDoc}
         */
        public int drainTo(Collection<? super E> c, int maxElements) {
            checkNotNull(c);
            if (c == this)
                throw new IllegalArgumentException();
            if (maxElements <= 0)
                return 0;
            final Object[] items = this.items;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = takeIndex;
                int n = 0;
                int max = (maxElements < count) ? maxElements : count;
                while (n < max) {
                    c.add(this.<E>cast(items[i]));
                    items[i] = null;
                    i = inc(i);
                    ++n;
                }
                if (n > 0) {
                    count -= n;
                    takeIndex = i;
                    notFull.signalAll();
                }
                return n;
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * Returns an iterator over the elements in this queue in proper sequence.
         * The elements will be returned in order from first (head) to last (tail).
         *
         * <p>The returned {@code Iterator} is a "weakly consistent" iterator that
         * will never throw {@link java.util.ConcurrentModificationException
         * ConcurrentModificationException},
         * and guarantees to traverse elements as they existed upon
         * construction of the iterator, and may (but is not guaranteed to)
         * reflect any modifications subsequent to construction.
         *
         * @return an iterator over the elements in this queue in proper sequence
         */
        public Iterator<E> iterator() {
            return new Itr();
        }
    
        /**
         * Iterator for ArrayBlockingQueue. To maintain weak consistency
         * with respect to puts and takes, we (1) read ahead one slot, so
         * as to not report hasNext true but then not have an element to
         * return -- however we later recheck this slot to use the most
         * current value; (2) ensure that each array slot is traversed at
         * most once (by tracking "remaining" elements); (3) skip over
         * null slots, which can occur if takes race ahead of iterators.
         * However, for circular array-based queues, we cannot rely on any
         * well established definition of what it means to be weakly
         * consistent with respect to interior removes since these may
         * require slot overwrites in the process of sliding elements to
         * cover gaps. So we settle for resiliency, operating on
         * established apparent nexts, which may miss some elements that
         * have moved between calls to next.
         */
        private class Itr implements Iterator<E> {
            private int remaining; // Number of elements yet to be returned
            private int nextIndex; // Index of element to be returned by next
            private E nextItem;    // Element to be returned by next call to next
            private E lastItem;    // Element returned by last call to next
            private int lastRet;   // Index of last element returned, or -1 if none
    
            Itr() {
                final ReentrantLock lock = ArrayBlockingQueue.this.lock;
                lock.lock();
                try {
                    lastRet = -1;
                    if ((remaining = count) > 0)
                        nextItem = itemAt(nextIndex = takeIndex);
                } finally {
                    lock.unlock();
                }
            }
    
            public boolean hasNext() {
                return remaining > 0;
            }
    
            public E next() {
                final ReentrantLock lock = ArrayBlockingQueue.this.lock;
                lock.lock();
                try {
                    if (remaining <= 0)
                        throw new NoSuchElementException();
                    lastRet = nextIndex;
                    E x = itemAt(nextIndex);  // check for fresher value
                    if (x == null) {
                        x = nextItem;         // we are forced to report old value
                        lastItem = null;      // but ensure remove fails
                    }
                    else
                        lastItem = x;
                    while (--remaining > 0 && // skip over nulls
                           (nextItem = itemAt(nextIndex = inc(nextIndex))) == null)
                        ;
                    return x;
                } finally {
                    lock.unlock();
                }
            }
    
            public void remove() {
                final ReentrantLock lock = ArrayBlockingQueue.this.lock;
                lock.lock();
                try {
                    int i = lastRet;
                    if (i == -1)
                        throw new IllegalStateException();
                    lastRet = -1;
                    E x = lastItem;
                    lastItem = null;
                    // only remove if item still at index
                    if (x != null && x == items[i]) {
                        boolean removingHead = (i == takeIndex);
                        removeAt(i);
                        if (!removingHead)
                            nextIndex = dec(nextIndex);
                    }
                } finally {
                    lock.unlock();
                }
            }
        }
    
    }
    View Code

    下面从ArrayBlockingQueue的创建,添加,取出,遍历这几个方面对ArrayBlockingQueue进行分析。

    1.创建:

    /**
         * Creates an {@code ArrayBlockingQueue} with the given (fixed)
         * capacity and default access policy.
         *
         * @param capacity the capacity of this queue
         * @throws IllegalArgumentException if {@code capacity < 1}
    只是指定
    ArrayBlockingQueue的容量,默认采用非公平互斥锁
    */ public ArrayBlockingQueue(int capacity) { this(capacity, false); }
      /**
         * Creates an {@code ArrayBlockingQueue} with the given (fixed)
         * capacity and the specified access policy.
         *
         * @param capacity the capacity of this queue
         * @param fair if {@code true} then queue accesses for threads blocked
         *        on insertion or removal, are processed in FIFO order;
         *        if {@code false} the access order is unspecified.
         * @throws IllegalArgumentException if {@code capacity < 1}
    指定容量和ReetrantLock的类型是否为公平锁创建阻塞队列
    */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }

       上面源码进行说明:

         items是保存“阻塞队列”数据的数组。它的定义如下:

     /** The queued items */
        final Object[] items;

        fair是“可重入的独占锁(ReentrantLock)”的类型。fair为true,表示是公平锁;fair为false,表示是非公平锁。notEmpty和notFull是锁的两个Condition条件。它们的定义如下:

        /** Main lock guarding all access */
        final ReentrantLock lock;
        /** Condition for waiting takes */
        private final Condition notEmpty;
        /** Condition for waiting puts */
        private final Condition notFull;

     说明Lock的作用是提供独占锁机制,来保护竞争的资源;而Condition是为了更精细的对锁进行控制,但是依赖于lock,通过某个条件对多线程进行控制。

    notEmpty表示"锁的非空条件"。当某线程想从队列中获取数据的时候,而此时队列中的数据为空,则该线程通过notEmpty.await()方法进行等待;

    当其他线程向队列中插入元素之后,就调用notEmpty.signal()方法进行唤醒之前等待的线程。同理,notFull表示“锁满的条件“。当某个线程向队列中插入元素

    ,而此时队列已满时,该线程等待,即阻塞通过notFull.wait()方法;其他线程从队列中取出元素之后,就唤醒该等待的线程,这个线程调用notFull.signal()方法。 

    2.添加:

       

       

        /**
         * Inserts the specified element at the tail of this queue, waiting
         * for space to become available if the queue is full.
         *
         * @throws InterruptedException {@inheritDoc}
         * @throws NullPointerException {@inheritDoc}
    添加元素,当队列满的时候,该线程等待,即阻塞。
    */ public void put(E e) throws InterruptedException {
    //校验插入的元素不能为null checkNotNull(e);
    final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try {
    //队列满的时候
    while (count == items.length)
    //线程调用await方法阻塞 notFull.await(); insert(e); }
    finally { lock.unlock(); } }
      /** items index for next take, poll, peek or remove
    下一个被取出元素的索引
    */ int takeIndex; /** items index for next put, offer, or add
    下一个被添加元素的索引
    */ int putIndex; /** Number of elements in the queue
    队列中的元素的个数
    */ int count;
      /**
         * Inserts element at current put position, advances, and signals.
         * Call only when holding lock.
         */
        private void insert(E x) {
            items[putIndex] = x;
            putIndex = inc(putIndex);
    //队列中的元素个数
    ++count;
    //唤醒notEmpty Condition锁上面等待的线程,告诉该线程队列不为空了,可以消费了 notEmpty.signal(); }
     /**
         * Circularly increment i.
    队列中的元素个数==队列的长度的时候,队列满,则设置下一个被添加元素的索引为0
    */ final int inc(int i) { return (++i == items.length) ? 0 : i; }

    2.取出: 

       



    public E take() throws InterruptedException { final ReentrantLock lock = this.lock;
    //获取锁,如果当前线程是中断状态,则抛出interruptedException异常 lock.lockInterruptibly();
    try {
    //队列为空的时候,则线程一直阻塞等待
    while (count == 0) notEmpty.await();
    //取元素
    return extract(); } finally { lock.unlock(); } }
    /**
         * Extracts element at current take position, advances, and signals.
         * Call only when holding lock.
         */
        private E extract() {
            final Object[] items = this.items;
    //强制将元素转化为"泛型E" E x
    = this.<E>cast(items[takeIndex]); items[takeIndex] = null;
    //设置下一个被取出元素的索引 takeIndex
    = inc(takeIndex);
    //将队列中的元素-1
    --count;
    //唤醒notFull条件上面等待的线程,告诉该线程队列不是满的了,可以添加元素了 notFull.signal();
    return x; }
  • 相关阅读:
    数据结构之单链表的实现java
    从尾到头打印列表——牛客剑指offer
    Java重要类之LinkedList
    删除链表中重复的结点——牛客剑指offer
    二维数组中的查找——牛客剑指offer
    爬虫常见异常
    持续集成常见异常及排除方案
    VMware安装与基本使用
    web开发常见异常及处理
    Linux简单介绍与基本使用(微系统,)
  • 原文地址:https://www.cnblogs.com/200911/p/5994044.html
Copyright © 2011-2022 走看看