zoukankan      html  css  js  c++  java
  • Jdk1.6 JUC源码解析(12)-ArrayBlockingQueue

    功能简介:
    • ArrayBlockingQueue是一种基于数组实现的有界的阻塞队列。队列中的元素遵循先入先出(FIFO)的规则。新元素插入到队列的尾部,从队列头部取出元素。
    • 和普通队列有所不同,该队列支持阻塞操作。比如从空队列中取元素,会导致当前线程阻塞,直到其他线程将元素放入队列;将元素插入已经满的队列,同样会导致当前线程阻塞,直到其他线程从队列中取出元素。
    • ArrayBlockingQueue也支持公平和非公平策略(针对队列中元素的存取线程,也可认为是元素的生产者和消费者)。
    源码分析:
    • ArrayBlockingQueue继承了AbstractQueue并实现了BlockingQueue,AbstractQueue是Queue的公共骨架实现,这个不看了,简单看下BlockingQueue接口:
    public interface BlockingQueue<E> extends Queue<E> {
        /**
         * 将一个元素放入队列。
         * 成功返回true;失败抛IllegalStateException异常。
         */
        boolean add(E e);
        /**
         * 将一个元素放入队列。
         * 成功返回true;失败返回false。
         */
        boolean offer(E e);
        /**
         * 将一个元素放入队列。
         * 如果元素无法放入队列,当前操作线程会等待,直到元素可以放入队列。
         */
        void put(E e) throws InterruptedException;
        /**
         * 将一个元素放入队列。
         * 如果元素无法放入队列,当前操作线程会等待,直到元素可以放入队列或者
         * 给定的时间超时。
         * 成功返回true;超时返回false;
         */
        boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException;
        /**
         * 从队列头部获取并删除一个元素。
         * 如果无法获取元素,当前操作线程等待,直到有元素可以被获取。
         */
        E take() throws InterruptedException;
        /**
         * 从队列头部获取并删除一个元素。
         * 如果无法获取元素,当前操作线程等待,直到有元素可以被获取或者给定时间超时。
         * 如果超时,返回null。
         */
        E poll(long timeout, TimeUnit unit)
            throws InterruptedException;
        /**
         * 获取队列剩余容量。
         */
        int remainingCapacity();
        /**
         * 移除队列中和给定元素相同的元素。
         */
        boolean remove(Object o);
        /**
         * 判断队列中是否包含给定元素。
         */
        public boolean contains(Object o);
        /**
         * 移除队列中所有的可用元素,并把它们添加到给定集合。
         */
        int drainTo(Collection<? super E> c);
        /**
         * 移除队列中不超过给定数量的可用元素,并把它们添加到给定集合。
         */
        int drainTo(Collection<? super E> c, int maxElements);
    }
           可以重点关注下put和take方法的行为。
    • 接下来看下ArrayBlockingQueue内部的数据结构:
    public class ArrayBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
    
        private static final long serialVersionUID = -817911632652898426L;
        /** 保存内部元素的数组  */
        private final E[] items;
        /** 取元素使用的下标 */
        private int takeIndex;
        /** 存元素使用的下标 */
        private int putIndex;
        /** 队列中元素数量 */
        private int count;
        /*
         * Concurrency control uses the classic two-condition algorithm
         * found in any textbook.
         */
        /** 保护存取的锁 */
        private final ReentrantLock lock;
        /** 取的等待条件 */
        private final Condition notEmpty;
        /** 存的等待条件 */
        private final Condition notFull;
    
        public ArrayBlockingQueue(int capacity) {
            this(capacity, false);
        }
    
        public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = (E[]) new Object[capacity];
            lock = new ReentrantLock(fair);
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }
    
        public ArrayBlockingQueue(int capacity, boolean fair,
                                  Collection<? extends E> c) {
            this(capacity, fair);
            if (capacity < c.size())
                throw new IllegalArgumentException();
            for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
                add(it.next());
        }
           ArrayBlockingQueue内部结构非常简单,就是一个数组,一把锁,两个条件;也可以看到,上面提到的公平和非公平策略是由内部的重入锁来支持的。
    • 继续看下ArrayBlockingQueue的重要方法,重点看下put和take,先看下put方法:
        public void put(E e) throws InterruptedException {
            if (e == null) throw new NullPointerException();
            final E[] items = this.items;
            final ReentrantLock lock = this.lock;
            //由于需要支持方法可中断行为,这里使用可中断的锁操作。
            lock.lockInterruptibly();
            try {
                try {
                    while (count == items.length)
                        notFull.await();//队列满时,在notFull条件上等待。
                } catch (InterruptedException ie) {
                    notFull.signal(); // 被中断后,唤醒其他等待notFull条件的线程。
                    throw ie;
                }
                insert(e);
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * Circularly increment i.
         */
        final int inc(int i) {
            return (++i == items.length)? 0 : i;
        }
        /**
         * 在内部数组的putIndex位置插入元素,调整putIndex和count,然后唤醒notEmpty条件上等待的线程。
         * 本方法只有在持有锁的情况下才会被调用。
         */
        private void insert(E x) {
            items[putIndex] = x;
            putIndex = inc(putIndex);
            ++count;
            notEmpty.signal();
        }

           再看下take方法: 

        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                try {
                    while (count == 0)
                        notEmpty.await();//队列空时,在notEmpty条件上等待。
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // 被中断后,唤醒其他等待notEmpty条件的线程。
                    throw ie;
                }
                E x = extract();
                return x;
            } finally {
                lock.unlock();
            }
        }
        /**
         * 从takeInde的位置取出元素,增加takeIndex,减少count,唤醒在notFull上等待的线程。
         * 本方法只有在持有锁的情况下才会被调用。
         */
        private E extract() {
            final E[] items = this.items;
            E x = items[takeIndex];
            items[takeIndex] = null;
            takeIndex = inc(takeIndex);
            --count;
            notFull.signal();
            return x;
        }
    • 其他方法的实现也都比较简单,不进行一一解析。最后注意一下,ArrayBlockingQueue的Iterator是弱一致的。
  • 相关阅读:
    zookeeper部署
    Hadoop集群的构建和安装
    numpy ndarray求其最值的索引
    两数之和
    盛最多水的容器
    mysql 单表卡死
    Leetcode 258. Add Digits
    Matlab中添加路径与去除路径
    Leetcode 136 Single Number
    Leetcode 485. Max Consecutive Ones
  • 原文地址:https://www.cnblogs.com/wxgblogs/p/5806863.html
Copyright © 2011-2022 走看看