zoukankan      html  css  js  c++  java
  • java阻塞队列之ArrayBlockingQueue

    在Java的java.util.concurrent包中定义了和多线程并发相关的操作,有许多好用的工具类,今天就来看下阻塞队列。阻塞队列很好的解决了多线程中数据的安全传输问题,其中最典型的例子就是客园很好的解决“生产者--消费者”问题。下面来看其中一个实现类ArrayBlockingQueue。看到这个名字,就很好理解这个队列肯定是使用数组实现的队列,即使用数组实现的“先进先出”的队列,下面看其具体的实现。(均为JDK8)

    一、构造方法

    在ArrayBlockingQueue类中有下面的3个构造方法,

    1、ArrayBlockingQueue(int)

    接收一个整型的参数,这个整型参数指的是队列的长度,其定义如下,

    public ArrayBlockingQueue(int capacity) {
            this(capacity, false);
        }

    可以看到这个方法调用的是ArrayBlockingQueue(int,boolean)方法,那么看下这个方法,

    2、ArrayBlockingQueue(int,boolean)

    接收两个参数,一个整型,一个boolean类型,前边已经知道整型参数是队列的长度,那么boolean类型参数代表什么意思那,其定义如下,

    public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];
            lock = new ReentrantLock(fair);
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }

    可以看到在这个构造方法中进行了相关逻辑实现,对items进行了数组初始化,boolean类型的参数是作为可重入锁的参数进行初始化,规定可重入锁是公平还是不公平,默认为false,另外初始化了notEmpty、notFull两个信号量。

    3、ArrayBlockingQueue(int,boolean,Collection<? extends E>)

    接收两三个参数,第一个整型,第二个boolean类型,第三个集合类型,此构造方法不常用,其定义如下,

    public ArrayBlockingQueue(int capacity, boolean fair,
                                  Collection<? extends E> c) {
            this(capacity, fair);
    
            final ReentrantLock lock = this.lock;
            lock.lock(); // Lock only for visibility, not mutual exclusion
            try {
                int i = 0;
                try {
                    for (E e : c) {
                        checkNotNull(e);
                        items[i++] = e;
                    }
                } catch (ArrayIndexOutOfBoundsException ex) {
                    throw new IllegalArgumentException();
                }
                count = i;
                putIndex = (i == capacity) ? 0 : i;
            } finally {
                lock.unlock();
            }
        }

    通过上面的三个构造方法可以构造一个ArrayBlockingQueue的队列,在构造方法中初始化了实列变量,下面是一些实例变量,

    private static final long serialVersionUID = -817911632652898426L;
    
        /** The queued items */
        //保存队列元素的数组
        final Object[] items;
    
        /** items index for next take, poll, peek or remove */
        //取出元素的位置
        int takeIndex;
    
        /** items index for next put, offer, or add */
       //添加元素的位置
        int putIndex;
    
        /** Number of elements in the queue */
       //队列中元素的数量
        int count;
    
        /*
         * Concurrency control uses the classic two-condition algorithm
         * found in any textbook.
         */
    
        /** Main lock guarding all access */
        //锁对象
        final ReentrantLock lock;
    
        /** Condition for waiting takes */
        //不空的信号量
        private final Condition notEmpty;
    
        /** Condition for waiting puts */
        //不满的信号量
        private final Condition notFull;
    
        /**
         * Shared state for currently active iterators, or null if there
         * are known not to be any.  Allows queue operations to update
         * iterator state.
         */
        transient Itrs itrs = null;

    二、队列的操作

    需要使用阻塞队列,那么就需要向队列中添加或取出元素,在ArrayBlocking中已经实现了相关操作,对于添加/取出均是成对出现,提供的方法中有抛出异常、返回false、线程阻塞等几种情形。

    1、add/peek

    add/peek是一对互斥的操作,add向队列种放入元素,peek取出元素。

    1.1、add

    add的定义如下

    public boolean add(E e) {
            return super.add(e);
        }

    从上面可以看出add方法调用了其父类的实现,父类实现如下

    public boolean add(E e) {
            if (offer(e))
                return true;
            else
                throw new IllegalStateException("Queue full");
        }

    父类实现种调用的offer方法,在offer方法返回true时,add方法返回true,其他则抛出“Queue full”的异常。offer方法下面会讲到。

    1.2、peek

    peek方法定义如下

    public E peek() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return itemAt(takeIndex); // null when queue is empty
            } finally {
                lock.unlock();
            }
        }

    在peek方法种使用可重入锁,返回takeIndex处的元素,前面注释中写道,此变量代表的是待取出元素的索引。itemAt方法定义如下,

    @SuppressWarnings("unchecked")
        final E itemAt(int i) {
            return (E) items[i];
        }

    此方法未进行任何的判断直接返回takeIndex出的数组元素。

    2、put/take

    put/take是一对互斥的操作,put向队列种放入元素,take取出元素,其实现方式和add/peek不一样。

    2.1、put

    put的定义如下

    public void put(E e) throws InterruptedException {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == items.length)
                    notFull.await();
                enqueue(e);
            } finally {
                lock.unlock();
            }
        }

    此方法会判断元素是否null,然后判断当前队列中的元素数量和队列的长度,如果二者相等则阻塞当前线程;如果不相等则执行enqueue(e)方法,其定义如下,

    private void enqueue(E x) {
            // assert lock.getHoldCount() == 1;
            // assert items[putIndex] == null;
            final Object[] items = this.items;
            items[putIndex] = x;
            if (++putIndex == items.length)
                putIndex = 0;
            count++;
            notEmpty.signal();
        }

    把待插入的元素放在数组的putIndex位置,如果执行完插入后putIndex等于数组的长度,说明队列已经满了,那么把putIndex的值置为0,即下次插入的位置为0,下次要插入成功的必要条件是取出了一个元素,取出的位置为takeIndex。

    2.2、take

    take方法是取出一个元素,其定义如下,

    public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0)
                    notEmpty.await();
                return dequeue();
            } finally {
                lock.unlock();
            }
        }

    此方法中首先判断当前队列的元素数量如果为0,则当前线程进行等待,等待notEmpty.singal(),如果不为空则执行dequeue()方法,其定义如下,

    private E dequeue() {
            // assert lock.getHoldCount() == 1;
            // assert items[takeIndex] != null;
            final Object[] items = this.items;
            @SuppressWarnings("unchecked")
            E x = (E) items[takeIndex];
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
            notFull.signal();
            return x;
        }

    此方法取出takeIndex位置的元素,把数组中此位置的引用置为null,判断takeIndex和数组的长度,如果相等证明,已经取到了最后一个元素,下次再取元素需要从位置0开始,为此把takeIndex置为0。

    3、offer(E)/poll

    offer/poll是一对互斥的操作,offer向队列种放入元素,poll取出元素,

    3.1、offer

    offer的定义如下

    public boolean offer(E e) {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (count == items.length)
                    return false;
                else {
                    enqueue(e);
                    return true;
                }
            } finally {
                lock.unlock();
            }
        }

    此方法判断如果队列中的元素数量和队列长度相等,则直接返回false,否则执行enqueue方法,put方法会将线程挂起,直到被中断或插入成功。

    3.2、poll

    poll方法如下

    public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return (count == 0) ? null : dequeue();
            } finally {
                lock.unlock();
            }
        }

    此方法判断当前队列中的元素个数如果为0返回null,否则执行dequeue操作,take方法会将线程挂起,直到被中断或取出成功。

    4、offer(E,long,TimeUnit)/poll(long,TimeUnnit)

     这两个方法是普通offer/poll方法的加强版,在队列满时指定了重试的时间,如果超过指定的时间后还是无法添加或取出则返回false。

    4.1、offer

    offer方法如下

    public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
    
            checkNotNull(e);
    //超时时间
    long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false;//超过规定时间,返回false nanos = notFull.awaitNanos(nanos); } enqueue(e); return true; } finally { lock.unlock(); } }

    此方法会在指定的规定时间内一直重试,如果规定时间内无法退出循环即添加元素,则返回false。

    4.2、poll

    poll方法如下,

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0) {
                    if (nanos <= 0)
                        return null;
                    nanos = notEmpty.awaitNanos(nanos);
                }
                return dequeue();
            } finally {
                lock.unlock();
            }
        }

    此方法同样是在取出元素时进行规定时间的重试,超过规定时间则返回null。

    三、方法比较

    1、添加方法比较

    序号 方法名 队列满时处理方式 方法返回值
    1 add(E e) 抛出“Queue full”异常 boolean
    2 offer(E e) 返回false boolean
    3 put(E e) 线程阻塞,直到中断或被唤醒 void
    4 offer(E e, long timeout, TimeUnit unit) 在规定时间内重试,超过规定时间返回false boolean

    2、取出方法比较

    序号 方法名 队列空时处理方式 方法返回值
    1 peek() 返回null E
    2 poll() 返回null E
    3 take() 线程阻塞,指定中断或被唤醒 E
    4 poll(long timeout, TimeUnit unit) 在规定时间内重试,超过规定时间返回null E

    四、总结

    以上时关于ArrayBlockingQueue这个阻塞队列的相关实现及方法介绍,此队列以数组为载体,配合可重入锁实现生产线程和消费线程共享数据,ArrayBlockingQueue作为共享池,实现了并发条件下的添加及取出等方法。

    有不正之处欢迎指正,感谢!

  • 相关阅读:
    Today is 5.20,her birthday...
    哭有时,笑有时,悲伤有时,欢乐有时。
    母亲节……
    性格决定命运...
    男人一生只会最爱一个女人?男人的爱一辈子只会付出一次?
    利物浦输了……You'll never walk alone!
    Liverpool,give me power again.Control myself.
    Python编程题27合并两个有序列表
    Python编程题25最长回文串的长度
    Python编程题26爬楼梯
  • 原文地址:https://www.cnblogs.com/teach/p/10665199.html
Copyright © 2011-2022 走看看