zoukankan      html  css  js  c++  java
  • java源码阅读ArrayBlockingQueue

    1类签名与简介

    public class ArrayBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable

    一个有限的阻塞队列由数组支持。 这个队列排列元素FIFO(先进先出)。新元素插入队列的尾部,队列检索操作获取队列头部的元素。

    ArrayBlockingQueue是线程安全的,其内部通过“互斥锁”(Lock)保护竞争资源,实现了多线程对竞争资源的互斥访问。

    ArrayBlockingQueue的容量是固定的,确定之后无法更改。该实现既包含了一组非阻塞的操作的API,也包括了一组阻塞操作的API。

    阻塞情况有如下两种:(1)当队列为null时,取操作会阻塞;(2)当队列为满时,插入操作会被阻塞。

    ArrayBlockingQueue是并发包中的类,可以用来实现经典的“生产者/消费者”模型。

    2重要属性

    //items数组存储队列中的元素
    final Object[] items;
    
    //下一个取元素的索引(take, poll, peek or remove )
    int takeIndex;
    
    //下一个插入元素应该的索引(put, offer, or add)
    int putIndex;
    
    //队列中元素的个数
    int count;
    
    /** Main lock guarding all access */
    final ReentrantLock lock;
    
    /** Condition for waiting takes */
    private final Condition notEmpty;
    
    /** Condition for waiting puts */
    private final Condition notFull;

    takeIndex表示下一个待取元素的索引,对于队列来说就是队头,相应的putIndex指向队尾。lock是ArrayBlockingQueue类的主锁,该类的主要方法都要加上此锁。

    notEmpty和notFull实现了队列的阻塞,当为队列为null时,再取元素就会调用notEmpty.await()进入阻塞状态,当入队时肯定不为空,就会调用notEmpty.signal()唤醒阻塞的线程继续执行。同理notFull就不多介绍,对Lock锁与Condition 不熟悉的多线程基本知识了解一下。

    3 基本操作(入队/出队)

    (1)入队enqueue

    private void enqueue(E x) {
            // assert lock.getHoldCount() == 1;
            // assert items[putIndex] == null;
            final Object[] items = this.items;
            items[putIndex] = x;
            if (++putIndex == items.length)
                putIndex = 0;
            count++;
            notEmpty.signal();
        }

    首先,在队尾也就是putIndex指向的位置插入新元素;然后,putIndex增1,若此时putIndex等于items.length(超过了数组的最后一位items.length-1),putIndex循环到数组的第一位。同理面的takeIndex也有次操作,所以该队列是由循环数组实现的,每次插入队尾后移1位,每次取出队头向后移1位。图就不画,自行脑补。然后计数count++;最后随机唤醒1个由notEmpty.await阻塞的线程(若有的话),因为有元素入队了,此时队列肯定不为null了。

    (2)出队dequeue

    private E dequeue() {
            // assert lock.getHoldCount() == 1;
            // assert items[takeIndex] != null;
            final Object[] items = this.items;
            @SuppressWarnings("unchecked")
            E x = (E) items[takeIndex];
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
            notFull.signal();
            return x;
        }

    删除队头元素。

    4常用方法

    (1)添加/入队(add、offer、put)

    add方法如下

    public boolean add(E e) {
            return super.add(e);
        }
    
    //父类AbstractQueue中的add
    public boolean add(E e) {
            if (offer(e))
                return true;
            else
                throw new IllegalStateException("Queue full");
        }

    ArrayBlockingQueue的add调用了父类AbstractQueue的add,add方法调用offer方法进行插入,offer返回true则add成功,offer返回false也就是插入失败会抛出异常。ArrayBlockingQueue实现了自己的offer,所以此处虽然调用父类的add,但是继承过来的add会调用自己的offer(多态)。

    offer代码如下

    public boolean offer(E e) {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (count == items.length)
                    return false;
                else {
                    enqueue(e);
                    return true;
                }
            } finally {
                lock.unlock();
            }
        }

    这里offer调用enqueue进行入队操作,当队列为满时不会阻塞,而是直接返回false。所以与add相比,offer一个明显的不同是,插入失败时不会报异常。但是add与这里的offer都不是阻塞方法。offer也提供了一个阻塞的重载版本

    public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
    
            checkNotNull(e);
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == items.length) {
                    if (nanos <= 0)
                        return false;
                    nanos = notFull.awaitNanos(nanos);
                }
                enqueue(e);
                return true;
            } finally {
                lock.unlock();
            }
        }

    队列若满,则线程会阻塞timeout长的时间,超时唤醒后会继续判断队列是否为满。还有一个添加的阻塞方法就是put,对于阻塞似乎put方法更加常用。

    public void put(E e) throws InterruptedException {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == items.length)
                    notFull.await();
                enqueue(e);
            } finally {
                lock.unlock();
            }
        }

    与上面的offer类似,不同的是没有阻塞时间,需要被notFull Condition来唤醒。

    (2)删除/出队(poll、take)

    poll方法如下

    public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return (count == 0) ? null : dequeue();
            } finally {
                lock.unlock();
            }
        }

    这里的poll提供了一个非阻塞的出队操作。当队列为空时,返回null,否则出队并返回出队的元素。

    当然,与offer类似,poll还有一个阻塞的重载版本。

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0) {
                    if (nanos <= 0)
                        return null;
                    nanos = notEmpty.awaitNanos(nanos);
                }
                return dequeue();
            } finally {
                lock.unlock();
            }
        }

    ArrayBlockingQueue还提供了一个专门的出队阻塞方法take

    public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0)
                    notEmpty.await();
                return dequeue();
            } finally {
                lock.unlock();
            }
        }

    当队列为null时,调用 notEmpty.await()使当前线程进入阻塞状态。下一次入队时会有notEmpty.signal()唤醒阻塞线程,这个唤醒只能唤醒一个,而且是随机的。

    (3)查看队头元素peek

    public E peek() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return itemAt(takeIndex); // null when queue is empty
            } finally {
                lock.unlock();
            }
        }

    返回队头元素,不删除。

    (4)其他

    由于ArrayBlockingQueue是由数组实现的,除了出队(删除队头元素),当然也可以删除任意位置的元素,所以此类对于删除操作还提供了如下方法

    remove(Object o) //查找队列,若存在元素o则删除
    
    removeAt(final int removeIndex) //删除指定位置的元素

    ArrayBlockingQueue重写了toString方法,将队列中所有元素用[]括起来,然后元素之间用逗号加空格分开。详细自行阅读源码。

    drainTo

    //将队列中的所有元素放到集合c中
    public int drainTo(Collection<? super E> c) {
            return drainTo(c, Integer.MAX_VALUE);
        }
    
    //将队列中的所有元素放到集合c中   
    public int drainTo(Collection<? super E> c, int maxElements) {
            checkNotNull(c);
            if (c == this)
                throw new IllegalArgumentException();
            if (maxElements <= 0)
                return 0;
            final Object[] items = this.items;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int n = Math.min(maxElements, count);
                int take = takeIndex;
                int i = 0;
                try {
                    while (i < n) {
                        @SuppressWarnings("unchecked")
                        E x = (E) items[take];
                        c.add(x);
                        items[take] = null;
                        if (++take == items.length)
                            take = 0;
                        i++;
                    }
                    return n;
                } finally {
                    // Restore invariants even if c.add() threw
                    if (i > 0) {
                        count -= i;
                        takeIndex = take;
                        if (itrs != null) {
                            if (count == 0)
                                itrs.queueIsEmpty();
                            else if (i > take)
                                itrs.takeIndexWrapped();
                        }
                        for (; i > 0 && lock.hasWaiters(notFull); i--)
                            notFull.signal();
                    }
                }
            } finally {
                lock.unlock();
            }
        }

  • 相关阅读:
    每日一博文
    用flash builder创建手机项目以及发布app需要注意的细节
    用xml配置加载cs中为ActionScript导出的类
    cs里面层,帧的处理方法
    一个项目要被自己加载的swf的项目调用方法
    App调用Android设备本地相机拍照并保存到本地相册
    让SWF文件从原始保存位置拿出来到任意位置都可以播放的设置
    记事
    一个主项目调用被加载的小游戏SWF项目的方法
    从一个App退出,关闭app
  • 原文地址:https://www.cnblogs.com/ouym/p/9035351.html
Copyright © 2011-2022 走看看