ArrayBlockingQueue
ArrayBlockingQueue
是一个基于数组存储实现的有界阻塞队列,新增、获取、移除元素由内部持有的重入锁控制。重入锁支持公平和非公平模式,默认使用非公平模式。
如果新增移除是阻塞的,那么新增时如果队列满了,会在
notFull
上等待,移除元素时如果队列空了,会在notEmpty
上等待。
ArrayBlockingQueue
提供了读指针takeIndex
和写指针putIndex
控制着队列元素的新增和移除。当移除元素之后takeIndex
向前移动,当新增元素之后putIndex
向后移动,当指针移动到数组尾部,指针置为0,继续移动。
count = items.length
队列满,count = 0
队列空。
关键属性
// 底层存储元素数组
final Object[] items;
// 当前移除元素指针
int takeIndex;
// 当前添加元素位置
int putIndex;
// 总元素个数
int count;
// 并发控制锁
final ReentrantLock lock;
// 非空条件,阻塞获取元素线程
private final Condition notEmpty;
// 非满条件,阻塞添加元素线程
private final Condition notFull;
构造函数
/**
* 指定队列容量,使用非公平锁
*/
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
/**
* 指定队列容量,fair=false
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
// 初始化重入锁及等待条件
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
添加元素
ArrayBlockingQueue
提供了add
、offer
、put
方法用于新增元素。add
在队列满时抛出异常;
offer
在队列满时立即返回;put
队列满时在notFull
上等待
/**
* offer遇到队列满时立即返回false
*/
public boolean offer(E e) {
checkNotNull(e);
// 添加元素时上锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 队列满,立即返回
if (count == items.length)
return false;
// 元素入队,插入元素后新增元素指针(putIndex)向前移动一位,唤醒非空条件上等待的线程
// 当putIndex=items.length,将putIndex设为0,即数组将会形成一个环形
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
/**
* add遇到队列满时立即抛出异常
*/
public boolean add(E e) {
return super.add(e);
}
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
/**
* 元素入队,将新增元素放入putIndex位,唤醒在非空条件上等到的线程
* 如果已经到数组末尾,将putIndex置为0
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
/**
* put新增元素
* 1.如果队列已满,线程进入非满条件等待
* 2.队列未满,元素入队,并唤醒在非空上等待的元素
*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
移除元素
/**
* poll移除元素
* 当队列空时,立即返回null
* 队列不空时,移除takenIndex位置的元素
* 1、如果takeIndex=items.length时,takeIndex置为0
* 2、唤醒在非空上等待的线程
*/
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 队列空,返回null,窦泽移除takeIndex位置元素
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
// 移除takeIndex位置元素
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
// 获取takeIndex位置的元素,并置空
E x = (E) items[takeIndex];
items[takeIndex] = null;
// 已经到了数组尾部,takeIndex置为0,从头开始
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 唤醒在非满上等待的线程
notFull.signal();
return x;
}
/**
* take移除元素
* 如果队列空,线程在非空条件上等待
* 如果队列不空,移除takeIndex位置的元素,并唤醒在非满条件上等待的线程
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 队列空,当前线程在非空条件上等待
while (count == 0)
notEmpty.await();
// 队列不空,移除takeIndex位置上的元素
return dequeue();
} finally {
lock.unlock();
}
}
移除指定元素
/**
* 移除指定元素
* 从takeIndex开始遍历队列,如果存在相等的元素,移除元素
*/
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
// 从takeIndex开始遍历队列,如果存在相等的元素,移除元素
do {
// 存在相等的元素,移除,返回true
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
/**
* 移除指定位置的元素
* 1、如果移除的元素是队尾,直接置空takeIndex位置元素
* 2、如果移除的元素不是队尾,从removeIndex开始将后一个元素向前移动,直到putIndex
*/
void removeAt(final int removeIndex) {
// assert lock.getHoldCount() == 1;
// assert items[removeIndex] != null;
// assert removeIndex >= 0 && removeIndex < items.length;
final Object[] items = this.items;
// 移除的是队尾元素,直接将takeIndex位置置空,takeIndex前移一位
if (removeIndex == takeIndex) {
// removing front item; just advance
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
}
// 移除的不是队尾元素,则从removeIndex开始将后一个元素向前移动,直到putIndex
else {
// an "interior" remove
// slide over all others up through putIndex.
final int putIndex = this.putIndex;
// 从removeIndex开始遍历
for (int i = removeIndex;;) {
// 后一个元素
int next = i + 1;
// 移动到数组尾部,调到数组头部
if (next == items.length)
next = 0;
// 还没到putIndex位置,将元素前移一位
if (next != putIndex) {
items[i] = items[next];
i = next;
}
// 到了putIndex,所有元素已经前移完成,putIndex设置为i,即putIndex也前移一位,因为前一位元素已经被移除
else {
items[i] = null;
this.putIndex = i;
break;
}
}
// 总元素-1
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
// 唤醒非满条件上等待的线程
notFull.signal();
}
获取队首元素
/**
* peek只获取队首元素,不会移除元素
*/
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 返回takeIndex位置的元素
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}
final E itemAt(int i) {
return (E) items[i];
}
清空队列
/**
* 清空队列
* 1、遍历置空数组元素位
* 2、唤醒在非满条件上等待的线程
*/
public void clear() {
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int k = count;
if (k > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
// 遍历置空数组元素位
do {
items[i] = null;
if (++i == items.length)
i = 0;
} while (i != putIndex);
takeIndex = putIndex;
count = 0;
if (itrs != null)
itrs.queueIsEmpty();
// 唤醒在非满条件上等待的线程
for (; k > 0 && lock.hasWaiters(notFull); k--)
notFull.signal();
}
} finally {
lock.unlock();
}
}