在集合框架里,想必都用过ArrayList和LinkedList,ArrayList和ArrayBlockingQueue一样,内部基于数组来存放元素,而LinkedBlockingQueue则和LinkedList一样,内部基于链表来存放元素。
队列常见的出队和入队方法
根据下面代码看下 [ArrayBlockingQueue] 的源码
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(20,true);
queue.put("element");//阻塞方法
String element = queue.take();//阻塞方法
[ArrayBlockingQueue] 构造方法
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
元素入队列 [put] 方法
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//获取锁,如果interrupt,直接抛出异常
try {
while (count == items.length)
notFull.await();//队列满了,阻塞等待被唤醒
enqueue(e);//入队
} finally {
lock.unlock();//释放锁
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)//如果入队下标已满,重置putIndex
putIndex = 0;
count++;//队列总数+1
notEmpty.signal();//唤醒取队列阻塞的线程
}
元素入队列 [offer] 方法
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)//只要时间没有超过给定值,一直自旋
return false;
//自己释放锁后重新竞争锁
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
取队列元素 [take] 方法
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//获取锁,如果interrupt,直接抛出异常
try {
while (count == 0)
notEmpty.await();//队列空了,阻塞等待被唤醒
return dequeue();//出队
} finally {
lock.unlock();//释放锁
}
}
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)//如果出队下标已满,重置takeIndex
takeIndex = 0;
count--;//总数减1
//迭代器有关
if (itrs != null)
itrs.elementDequeued();
notFull.signal();//唤醒入队列阻塞的线程
return x;
}
取队列的非阻塞方法 [poll] 方法
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
其他方法类似。
根据下面代码看下 [LinkedBlockingQueue] 的源码
//如果没有指定大小,默认为Integer.MAX_VALUE,也就是无界队列 LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(4); queue.put("element");//阻塞方法 queue.take();//阻塞方法
元素入队列 [put] 方法
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//获取putLock锁,如果interrupt,直接抛出异常
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();//队列满了,阻塞等待被唤醒
}
enqueue(node);//入队
c = count.getAndIncrement();
// 再次判断队列是否有可用空间,如果有唤醒下一个线程进行添加操作
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();//释放putLock锁
}
if (c == 0)//唤醒取队列阻塞的线程
signalNotEmpty();
}
private void enqueue(Node<E> node) {
//新增节点变为最后一个节点
last = last.next = node;
}
元素出队列 [take] 方法
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;//总数
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly(); //获取takeLock锁,如果interrupt,直接抛出异常
try {
while (count.get() == 0) {
notEmpty.await();//队列空了,阻塞等待被唤醒
}
x = dequeue();//出队
c = count.getAndDecrement();
//c > 1说明c至少为2, 队列中还有元素,唤醒下一个消费线程进行消费
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();//释放takeLock锁
}
if (c == capacity)//唤醒入队列阻塞的线程
signalNotFull();
return x;
}
private E dequeue() {
// 获取到head节点
Node<E> h = head;
// 获取到head节点指向的下一个节点
Node<E> first = h.next;
// head节点原来指向的节点的next指向自己,等待下次gc回收
h.next = h; // help GC
// head节点指向新的节点
head = first;
// 获取到新的head节点的item值,即队列中的元素
E x = first.item;
first.item = null;
return x;
}
元素出队列 [poll] 方法
public E poll() {
final AtomicInteger count = this.count;
//poll方法和take方法不一样的地方,take是阻塞,poll直接返回null
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
删除队列元素 [remove] 方法
public boolean remove(Object o) {
if (o == null) return false;
fullyLock();//两个lock全部上锁
try {
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {//循环查找
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();//释放锁
}
}
void unlink(Node<E> p, Node<E> trail) {
//删除节点后,node之间重新指向
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
if (count.getAndDecrement() == capacity)
notFull.signal();
}
总结两者的区别:
1.ArrayBlockingQueue是有界的,而LinkedBlockingQueue默认是无界的(可以通过指定大小来变为有界)。ArrayBlockingQueue有界就意味着我们使用ArrayBlockingQueue必须指定capacity大小。这样的话,内存空间会直接预先分配好,所以在使用LinkedBlockingQueue无界情况下时要考虑到内存实际使用问题,防止内存溢出问题的发生。
2.锁使用的比较。ArrayBlockingQueue内部使用1个锁来控制队列项的插入、取出操作,而LinkedBlockingQueue则是使用了2个锁来控制,一个名为putLock,另一个是takeLock,但是锁的本质都是ReentrantLock。因为LinkedBlockingQueue使用了2个锁的情况下,所以在一定程度上提升了LinkedBlockingQueue支持高并发的场景操作。
3.在ArrayBlockingQueue内部,因为是直接使用数组空间的,而且都是预先分配好的,所以操作没有那么复杂,而在LinkedBlockingQueue中,是通过链表进行维护的,而且每次插入的对象还要转为Node<>(e)对象,相当于多做了一步操作,但是根据LinkedBlockingQueue的官方描述,它是具有更好吞吐性能的。
官方解释: