1 前言
队列是一种在尾部添加元素、从头部删除元素的数据结构,而阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。
-
①支持阻塞的插入方法:当队列满时,队列会阻塞插入元素的线程,直到队列不满。
-
②支持阻塞的移除方法:在队列为空时,获取元素的线程会等待队列变为非空。
阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。
ArrayBlockingQueue继承于抽象类AbstractQueue,并实现了BlockingQueue接口,它内部使用数组来储存元素。它内部主要使用两个”条件“来实现”阻塞插入“、”阻塞移出“,这两个条件分别是”未满“、”非空“。ArrayBlockingQueue是一个有界的阻塞队列,它需要在构造方法中指定队列的容量。
BlockingQueue接口方法说明
public interface BlockingQueue<E> extends Queue<E> { //若能立即成功插入元素,则返回true,若超出容量不能插入元素,则抛出IllegalStateException boolean add(E e); //若能立即成功插入元素,则返回true,若容量已满不能插入元素,则返回false boolean offer(E e); //向队列中插入元素,若容量已满就阻塞等待 void put(E e) throws InterruptedException; //向队列中插入元素,若等待给定的时长后还不能插入元素就返回false boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; //不限时的等待元素出队 E take() throws InterruptedException; //超时等待元素出队,若等待超时就返回null E poll(long timeout, TimeUnit unit) throws InterruptedException; //返回当前的剩余容量 int remainingCapacity(); //从队列中删除这元素,若存在这个元素就返回true boolean remove(Object o); //当前队列中是否存在此元素 public boolean contains(Object o); //一次性将所有元素出队 int drainTo(Collection<? super E> c); //一次性出队指定个数元素 int drainTo(Collection<? super E> c, int maxElements); }
注:此处的源码分析基于JDK1.8
2 成员变量与构造方法
1) 成员变量
/** The queued items */ final Object[] items; /** items index for next take, poll, peek or remove */ int takeIndex; /** items index for next put, offer, or add */ int putIndex; /** Number of elements in the queue */ int count; /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; transient Itrs itrs = null;
-
items
:保存元素的数组 -
takeIndex
: 下次出队或删除元素的索引 -
putIndex
:下次入队(插入)元素的索引 -
count
:当前队列中的元素个数 -
lock
:可重入锁,保证多线程访问时数据的一致性、可见性。 -
notEmpty
: 队列”非空“的条件,出队时用到的条件(此条件与lock锁关联) -
notFull
: 队列”未满“的条件,入队时用到的条件(此条件与lock锁关联) -
itrs
:共享迭代器的状态,是迭代器Itr的辅助工具
2) 构造方法
可以看出构造方法主要涉及对items 、lock、notEmpty、notFull这四个实例变量的实例化。
构造方法中必须要指定容量,因为在数组items
初始化时必须知其长度;另外还可以选择锁的公平性,默认情况使用非公平锁。
public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition();//与lock锁关联的条件 notFull = lock.newCondition();//与lock锁关联的条件 } public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { //逐个放入 checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }
3 主要方法
1) 添加元素
add 、offer 、put 、offer这几个方法都逻辑很相似:在方法最外层使用lock锁来保证线程安全,然后判断队列是否已满,若队列未满,执行enqueue,将元素入队,而若队列已满,根据方法自身的定义决定是立即返回、或是抛出异常或阻塞等待。
public boolean add(E e) { return super.add(e);//父类add方法调用offer方法实现 } //队列已满,就不添加元素,直接返回false public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } } //若队列已满,就阻塞等待到队列不满时再添加元素 public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) // 不限时长地休眠,直到某些元素被删除时被唤醒(notFull.signal()) notFull.await(); enqueue(e); } finally { lock.unlock(); } } //若队列已满,就超时等待.若在超时等待返回后,队列仍是已满状态就返回false public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; //休眠给定的时长nanos,但有可能因队列中某元素被删除而被提前唤醒(notFull.signal()) nanos = notFull.awaitNanos(nanos); } enqueue(e); return true; } finally { lock.unlock(); } }
上面的几个方法者调用了enqueue(e)
方法将元素添加到队列的尾部。
enqueue主要逻辑:先将元素e放入对数组items的对应索引处,然后唤醒一个等待”非空“条件的线程
private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x;// if (++putIndex == items.length)//防止下标越界,将putIndex重置为0 putIndex = 0; count++;//元素个数加1 //唤醒一个等待非空条件的线程(因为在添加一个元素后,队列中至少含有一个元素了,队列不再是空的) //这里不是signalAll(),signal()只会唤醒一个线程,这样做的目的是减少线程竞争 notEmpty.signal(); }
2) 元素出队
poll、take 这3个方法都逻辑很相似:在方法最外层使用lock锁来保证线程安全,然后判断队列是否”非空“,若队列非空 ,执行dequeue将队列头部元素出队,若队列已空,根据方法自身的定义决定是立即返回还是阻塞等待。
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) // 不限时长地休眠,直到有元素入队后被唤醒(notEmpty.signal()) notEmpty.await(); return dequeue();//出队 } finally { lock.unlock(); } } //超时版本的poll public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { if (nanos <= 0) return null; //休眠给定的时长nanos,但有可能因有元素入队而被提前唤醒(notEmpty.signal()) nanos = notEmpty.awaitNanos(nanos); } return dequeue();//出队 } finally { lock.unlock(); } }
dequeue()方法将队列的头部元素从队列中移除并返回此元素
enqueue主要逻辑:先将队列头部位置的元素引用清空,然后唤醒一个等待”未满“条件的线程。
private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null;//将指定索引处引用清空,元素从队列中移除 if (++takeIndex == items.length)//防止下标越界,重置takeIndex,回到起点索引0 takeIndex = 0; count--;//无数个数减1 if (itrs != null) itrs.elementDequeued(); //唤醒一个等待"未满"条件的线程(因为在删除一个元素后,队列中至少剩余一个空位,队列不再是满的) //这里不是signalAll(),signal()只会唤醒一个线程,在队列已满的情况下删除一个元素后,最多只能让一个元素入队, //所以只唤醒一个线程,能有效减少线程竞争 notFull.signal(); return x; }
3) 删除指定元素
remove(Object)方法用于从队列中移除指定元素. 它的逻辑很简单:它循环遍历队列中的所有元素,若存在此元素就将其移除,返回true,若不存在此元素就返回false.
public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { if (count > 0) {//队列中含有元素才能移除元素 final int putIndex = this.putIndex; int i = takeIndex; do {//遍历查找队列中是否存在此元素 if (o.equals(items[i])) { //如果相等,就移除此元素 removeAt(i); return true; } //与当前查找的元素不等,将索引位置加1,准备比较队列中的下个元素 if (++i == items.length)//防止下标越界,i重置为零 i = 0; } while (i != putIndex);//直到重新回到队列的头部,才退出循环。 } return false; } finally { lock.unlock(); } }
removeAt() 移除指定下标处的元素。removeAt()的主要逻辑:
①如果移除的元素在队列的头部,则与dequeue的处理方式相同,这种情况下比较简单。
void removeAt(final int removeIndex) { // assert lock.getHoldCount() == 1; // assert items[removeIndex] != null; // assert removeIndex >= 0 && removeIndex < items.length; final Object[] items = this.items; if (removeIndex == takeIndex) {//要移除的元素在队列的头部,与dequeue方法类似 // removing front item; just advance items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); } else { // an "interior" remove // slide over all others up through putIndex. final int putIndex = this.putIndex; for (int i = removeIndex;;) { int next = i + 1; if (next == items.length)//防止下标越界,重置为零 next = 0; //在队列的中部删除元素,需要移除位置removeIndex之后的元素向前移动一位 //第一次向前移动时item[removeIndex]的引用就被后继元素item[removeIndex+1]给覆盖了, //因此指定索引处的元素被移除了 if (next != putIndex) { items[i] = items[next]; i = next; } else { //next=putIndex, 即i+1=putIndex //向前移的元素位置和下次添加元素的位置相同,将items[i]清空, //因为items[i]是唯一一个没有后继元素可覆盖它的元素 items[i] = null; //删除了一个元素,所以下次添加元素位置也要前移一位,即this.putIndex要减一, //即this.putIndex=oldPutIndex-1=(i+1)-1=i this.putIndex = i; break; } } count--;//元素个数减1 if (itrs != null) itrs.removedAt(removeIndex); } notFull.signal();//唤醒一个等待'未满'条件的线程 }
4) 批量出队
drainTo系列方法用于批量出队,它相对于多次单元素出队,能提高性能、减少阻塞。
public int drainTo(Collection<? super E> c) { return drainTo(c, Integer.MAX_VALUE); } public int drainTo(Collection<? super E> c, int maxElements) { checkNotNull(c); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { int n = Math.min(maxElements, count); int take = takeIndex; int i = 0; try { //从头部开始,将队列中的元素引用清空,并将这些元素添加到集合c中 while (i < n) { @SuppressWarnings("unchecked") E x = (E) items[take]; c.add(x); items[take] = null; if (++take == items.length) take = 0; i++; } return n; } finally { // Restore invariants even if c.add() threw if (i > 0) { count -= i;//更新元素个数 takeIndex = take;//更新下次出队的位置 if (itrs != null) { if (count == 0) itrs.queueIsEmpty(); else if (i > take) itrs.takeIndexWrapped(); } //唤醒i个等待"未满"条件的线程 for (; i > 0 && lock.hasWaiters(notFull); i--) notFull.signal(); } } } finally { lock.unlock(); } }
5) 其他方法
contains()
方法返回 队列中是否存在指定元素的布尔值。它主要是从队列的头部到队列的尾部遍历所有元素,查看是否存在此元素。
public boolean contains(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { if (count > 0) { final int putIndex = this.putIndex; int i = takeIndex;//从头部开始 do { if (o.equals(items[i])) return true; if (++i == items.length) i = 0; } while (i != putIndex);//到尾部结束 } return false; } finally { lock.unlock(); } }
size()
方法返回队列中的元素个数
public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return count; } finally { lock.unlock(); } }
clear()
清空队列中的所有元素
public void clear() { final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { int k = count; if (k > 0) { final int putIndex = this.putIndex; int i = takeIndex; do { items[i] = null; if (++i == items.length) i = 0; } while (i != putIndex); takeIndex = putIndex;//入队位置和出队位置相同,也能表示队列中没有元素了 count = 0; if (itrs != null) itrs.queueIsEmpty(); //唤醒count个等待”未满“条件的线程 for (; k > 0 && lock.hasWaiters(notFull); k--) notFull.signal(); } } finally { lock.unlock(); } }
4 总结
① 此阻塞队列实现的关键在于两个条件, '未满'和‘非空’。在删除元素后可以唤醒等待“未满”条件的线程,而在添加元素后可以唤醒等待“非空”条件的线程。两者的唤醒条件恰好相反地对应。
②在删除一个元素或添加一个元素之后,只会唤醒等待相应条件的一个线程,在批量删除n个元素后,也只唤醒n个等待”未满“条件的线程.这样做的目的是减少线程竞争,唤醒更多的线程无助于提高入队或出队的效率,相反,多出来的线程不能继续入队或出队,它们只是加剧线程竞争而已。