zoukankan      html  css  js  c++  java
  • ArrayBlockingQueue核心源码解读

     

    1 前言

    队列是一种在尾部添加元素、从头部删除元素的数据结构,而阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。

    • ①支持阻塞的插入方法:当队列满时,队列会阻塞插入元素的线程,直到队列不满。

    • ②支持阻塞的移除方法:在队列为空时,获取元素的线程会等待队列变为非空。

    阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。

    ArrayBlockingQueue继承于抽象类AbstractQueue,并实现了BlockingQueue接口,它内部使用数组来储存元素。它内部主要使用两个”条件“来实现”阻塞插入“、”阻塞移出“,这两个条件分别是”未满“、”非空“。ArrayBlockingQueue是一个有界的阻塞队列,它需要在构造方法中指定队列的容量。

    BlockingQueue接口方法说明

    public interface BlockingQueue<E> extends Queue<E> {
        //若能立即成功插入元素,则返回true,若超出容量不能插入元素,则抛出IllegalStateException
        boolean add(E e);
        //若能立即成功插入元素,则返回true,若容量已满不能插入元素,则返回false
        boolean offer(E e);
        //向队列中插入元素,若容量已满就阻塞等待
        void put(E e) throws InterruptedException;
        //向队列中插入元素,若等待给定的时长后还不能插入元素就返回false
        boolean offer(E e, long timeout, TimeUnit unit)
                throws InterruptedException;
        //不限时的等待元素出队
        E take() throws InterruptedException;
        //超时等待元素出队,若等待超时就返回null
        E poll(long timeout, TimeUnit unit)
                throws InterruptedException;
        //返回当前的剩余容量
        int remainingCapacity();
        //从队列中删除这元素,若存在这个元素就返回true
        boolean remove(Object o);
        //当前队列中是否存在此元素
        public boolean contains(Object o);
        //一次性将所有元素出队
        int drainTo(Collection<? super E> c);
        //一次性出队指定个数元素
        int drainTo(Collection<? super E> c, int maxElements);
    }
    BlockingQueue

     注:此处的源码分析基于JDK1.8

    2 成员变量与构造方法

    1) 成员变量

    /** The queued items */
    final Object[] items;
    /** items index for next take, poll, peek or remove */
    int takeIndex;
    /** items index for next put, offer, or add */
    int putIndex;
    /** Number of elements in the queue */
    int count;
    /** Main lock guarding all access */
    final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;
    ​
    transient Itrs itrs = null;
    • items:保存元素的数组

    • takeIndex : 下次出队或删除元素的索引

    • putIndex:下次入队(插入)元素的索引

    • count:当前队列中的元素个数

    • lock:可重入锁,保证多线程访问时数据的一致性、可见性。

    • notEmpty: 队列”非空“的条件,出队时用到的条件(此条件与lock锁关联)

    • notFull: 队列”未满“的条件,入队时用到的条件(此条件与lock锁关联)

    • itrs:共享迭代器的状态,是迭代器Itr的辅助工具

    2) 构造方法

    可以看出构造方法主要涉及对items 、lock、notEmpty、notFull这四个实例变量的实例化。

    构造方法中必须要指定容量,因为在数组items初始化时必须知其长度;另外还可以选择锁的公平性,默认情况使用非公平锁。

    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity]; 
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();//与lock锁关联的条件
        notFull =  lock.newCondition();//与lock锁关联的条件
    }
    public ArrayBlockingQueue(int capacity, boolean fair,  Collection<? extends E> c) {
        this(capacity, fair);
        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) { //逐个放入
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

    3 主要方法

    1) 添加元素

    add 、offer 、put 、offer这几个方法都逻辑很相似:在方法最外层使用lock锁来保证线程安全,然后判断队列是否已满,若队列未满,执行enqueue,将元素入队,而若队列已满,根据方法自身的定义决定是立即返回、或是抛出异常或阻塞等待。

    public boolean add(E e) {
        return super.add(e);//父类add方法调用offer方法实现
    }
    //队列已满,就不添加元素,直接返回false
    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
    //若队列已满,就阻塞等待到队列不满时再添加元素
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                // 不限时长地休眠,直到某些元素被删除时被唤醒(notFull.signal())
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    //若队列已满,就超时等待.若在超时等待返回后,队列仍是已满状态就返回false
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                //休眠给定的时长nanos,但有可能因队列中某元素被删除而被提前唤醒(notFull.signal())
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

    上面的几个方法者调用了enqueue(e)方法将元素添加到队列的尾部。

    enqueue主要逻辑:先将元素e放入对数组items的对应索引处,然后唤醒一个等待”非空“条件的线程

    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;//
        if (++putIndex == items.length)//防止下标越界,将putIndex重置为0
            putIndex = 0;
        count++;//元素个数加1
        //唤醒一个等待非空条件的线程(因为在添加一个元素后,队列中至少含有一个元素了,队列不再是空的)
          //这里不是signalAll(),signal()只会唤醒一个线程,这样做的目的是减少线程竞争
        notEmpty.signal();
    }

    2) 元素出队

    poll、take 这3个方法都逻辑很相似:在方法最外层使用lock锁来保证线程安全,然后判断队列是否”非空“,若队列非空 ,执行dequeue将队列头部元素出队,若队列已空,根据方法自身的定义决定是立即返回还是阻塞等待。

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }
    ​
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
            // 不限时长地休眠,直到有元素入队后被唤醒(notEmpty.signal())
                notEmpty.await();
            return dequeue();//出队
        } finally {
            lock.unlock();
        }
    }
    //超时版本的poll
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                //休眠给定的时长nanos,但有可能因有元素入队而被提前唤醒(notEmpty.signal())
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();//出队
        } finally {
            lock.unlock();
        }
    }

    dequeue()方法将队列的头部元素从队列中移除并返回此元素

    enqueue主要逻辑:先将队列头部位置的元素引用清空,然后唤醒一个等待”未满“条件的线程。

    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;//将指定索引处引用清空,元素从队列中移除
        if (++takeIndex == items.length)//防止下标越界,重置takeIndex,回到起点索引0
            takeIndex = 0;
        count--;//无数个数减1
        if (itrs != null)
            itrs.elementDequeued();
      //唤醒一个等待"未满"条件的线程(因为在删除一个元素后,队列中至少剩余一个空位,队列不再是满的)
       //这里不是signalAll(),signal()只会唤醒一个线程,在队列已满的情况下删除一个元素后,最多只能让一个元素入队,
     //所以只唤醒一个线程,能有效减少线程竞争
        notFull.signal();
        return x;
    }

    3) 删除指定元素

    remove(Object)方法用于从队列中移除指定元素. 它的逻辑很简单:它循环遍历队列中的所有元素,若存在此元素就将其移除,返回true,若不存在此元素就返回false.

    public boolean remove(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count > 0) {//队列中含有元素才能移除元素
                final int putIndex = this.putIndex;
                int i = takeIndex;
                do {//遍历查找队列中是否存在此元素
                    if (o.equals(items[i])) {
                            //如果相等,就移除此元素
                        removeAt(i);
                        return true;
                    }
                    //与当前查找的元素不等,将索引位置加1,准备比较队列中的下个元素
                    if (++i == items.length)//防止下标越界,i重置为零
                        i = 0;
                } while (i != putIndex);//直到重新回到队列的头部,才退出循环。
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

     removeAt() 移除指定下标处的元素。removeAt()的主要逻辑:

    ①如果移除的元素在队列的头部,则与dequeue的处理方式相同,这种情况下比较简单。 ②但如果移除的元素不在队列的头部,则需要将待移除元素之后的所有元素整体向前移动一个索引位,然后将下次入队位置putIndex也前移一个索引位。

    void removeAt(final int removeIndex) {
        // assert lock.getHoldCount() == 1;
        // assert items[removeIndex] != null;
        // assert removeIndex >= 0 && removeIndex < items.length;
        final Object[] items = this.items;
        if (removeIndex == takeIndex) {//要移除的元素在队列的头部,与dequeue方法类似
            // removing front item; just advance
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
        } else {
            // an "interior" remove
    // slide over all others up through putIndex.
            final int putIndex = this.putIndex;
            for (int i = removeIndex;;) {
                int next = i + 1;
                if (next == items.length)//防止下标越界,重置为零
                    next = 0;
                //在队列的中部删除元素,需要移除位置removeIndex之后的元素向前移动一位
                //第一次向前移动时item[removeIndex]的引用就被后继元素item[removeIndex+1]给覆盖了,
                //因此指定索引处的元素被移除了
                if (next != putIndex) {
                    items[i] = items[next];
                    i = next;
                } else {
                    //next=putIndex, 即i+1=putIndex
                    //向前移的元素位置和下次添加元素的位置相同,将items[i]清空,
                    //因为items[i]是唯一一个没有后继元素可覆盖它的元素
                    items[i] = null;
               //删除了一个元素,所以下次添加元素位置也要前移一位,即this.putIndex要减一,
                    //即this.putIndex=oldPutIndex-1=(i+1)-1=i
                    this.putIndex = i;
                    break;
                }
            }
            count--;//元素个数减1
            if (itrs != null)
                itrs.removedAt(removeIndex);
        }
        notFull.signal();//唤醒一个等待'未满'条件的线程
    }

      

    4) 批量出队

    drainTo系列方法用于批量出队,它相对于多次单元素出队,能提高性能、减少阻塞。

     public int drainTo(Collection<? super E> c) {
            return drainTo(c, Integer.MAX_VALUE);
     }
    ​
    public int drainTo(Collection<? super E> c, int maxElements) {
        checkNotNull(c);
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = Math.min(maxElements, count);
            int take = takeIndex;
            int i = 0;
            try {
                //从头部开始,将队列中的元素引用清空,并将这些元素添加到集合c中
                while (i < n) {
                    @SuppressWarnings("unchecked")
                    E x = (E) items[take];
                    c.add(x);
                    items[take] = null;
                    if (++take == items.length)
                        take = 0;
                    i++;
                }
                return n;
            } finally {
                // Restore invariants even if c.add() threw
                if (i > 0) {
                    count -= i;//更新元素个数
                    takeIndex = take;//更新下次出队的位置
                    if (itrs != null) {
                        if (count == 0)
                            itrs.queueIsEmpty();
                        else if (i > take)
                            itrs.takeIndexWrapped();
                    }
                    //唤醒i个等待"未满"条件的线程
                    for (; i > 0 && lock.hasWaiters(notFull); i--)
                        notFull.signal();
                }
            }
        } finally {
            lock.unlock();
        }
    }

     

    5) 其他方法

    contains()方法返回 队列中是否存在指定元素的布尔值。它主要是从队列的头部到队列的尾部遍历所有元素,查看是否存在此元素。

    public boolean contains(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;//从头部开始
                do {
                    if (o.equals(items[i]))
                        return true;
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex);//到尾部结束
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

     

    size()方法返回队列中的元素个数

    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

     

    clear()清空队列中的所有元素

    public void clear() {
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int k = count;
            if (k > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;
                do {
                    items[i] = null;
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex);
                takeIndex = putIndex;//入队位置和出队位置相同,也能表示队列中没有元素了
                count = 0;
                if (itrs != null)
                    itrs.queueIsEmpty();
                //唤醒count个等待”未满“条件的线程
                for (; k > 0 && lock.hasWaiters(notFull); k--)
                    notFull.signal();
            }
        } finally {
            lock.unlock();
        }
    }

    4 总结

    ① 此阻塞队列实现的关键在于两个条件, '未满'和‘非空’。在删除元素后可以唤醒等待“未满”条件的线程,而在添加元素后可以唤醒等待“非空”条件的线程。两者的唤醒条件恰好相反地对应。

    ②在删除一个元素或添加一个元素之后,只会唤醒等待相应条件的一个线程,在批量删除n个元素后,也只唤醒n个等待”未满“条件的线程.这样做的目的是减少线程竞争,唤醒更多的线程无助于提高入队或出队的效率,相反,多出来的线程不能继续入队或出队,它们只是加剧线程竞争而已。

     

     

     

  • 相关阅读:
    23.什么是控制文件?
    路由器原理及路由协议
    大家帮忙看看,不会即使显示串口缓冲数据
    c# interface
    ip classless&no ip classless
    Visual Studio 2010 Visual C++ 确定要重新分发的 DLL
    fibonacci算法
    loopback端口作用
    疑惑的virtual method
    Read MBR via ReadFile function
  • 原文地址:https://www.cnblogs.com/gocode/p/analysis-source-code-of-ArrayBlockingQueue.html
Copyright © 2011-2022 走看看