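Overview:
- LinkedBlockingQueue is an optionally bounded blocking queue backed by a singly linked list; if no capacity is specified, the bound defaults to Integer.MAX_VALUE. Elements are ordered first-in-first-out (FIFO): new elements are inserted at the tail of the queue, and elements are taken from the head. (In concurrent programs, linked queues typically have higher throughput than array-based queues, but their performance is less predictable.)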
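The behavior described above can be checked with a few calls to the public API. The following is a minimal sketch; the class name `LbqBasics` and its helper methods are made up for illustration, while `offer`, `poll`, and `remainingCapacity` are standard `LinkedBlockingQueue` methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

public class LbqBasics {
    // Fills a queue bounded at 2, tries a third insert, then drains it.
    // Returns the result of the third offer followed by the drained elements.
    static List<String> fifoDemo() {
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>(2);
        q.offer("a");
        q.offer("b");
        boolean accepted = q.offer("c"); // queue is full: offer returns false instead of blocking
        List<String> out = new ArrayList<>();
        out.add(String.valueOf(accepted));
        out.add(q.poll()); // head of the queue, inserted first
        out.add(q.poll());
        return out;
    }

    // With the no-arg constructor the bound is Integer.MAX_VALUE.
    static int defaultCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println(fifoDemo());                            // [false, a, b]
        System.out.println(defaultCapacity() == Integer.MAX_VALUE); // true
    }
}
```

Note that `offer` is the non-blocking variant; `put`, analyzed below, would wait for space instead of returning false.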
Source code analysis:
- First, the internal data structure of LinkedBlockingQueue:
    public class LinkedBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
        private static final long serialVersionUID = -6903933977591709194L;

        /**
         * Linked list node class
         */
        static class Node<E> {
            /** The item, volatile to ensure barrier separating write and read */
            volatile E item;
            Node<E> next;
            Node(E x) { item = x; }
        }

        /** The capacity bound, or Integer.MAX_VALUE if none */
        private final int capacity;

        /** count is atomic, so code that uses it does not need to hold both locks. */
        private final AtomicInteger count = new AtomicInteger(0);

        /** Head of linked list */
        private transient Node<E> head;

        /** Tail of linked list */
        private transient Node<E> last;

        /** Lock held by take, poll, etc */
        private final ReentrantLock takeLock = new ReentrantLock();

        /** Wait queue for waiting takes */
        private final Condition notEmpty = takeLock.newCondition();

        /** Lock held by put, offer, etc */
        private final ReentrantLock putLock = new ReentrantLock();

        /** Wait queue for waiting puts */
        private final Condition notFull = putLock.newCondition();

        /**
         * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
         * {@link Integer#MAX_VALUE}.
         */
        public LinkedBlockingQueue() {
            this(Integer.MAX_VALUE);
        }

        /**
         * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
         *
         * @param capacity the capacity of this queue
         * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
         *         than zero
         */
        public LinkedBlockingQueue(int capacity) {
            if (capacity <= 0) throw new IllegalArgumentException();
            this.capacity = capacity;
            last = head = new Node<E>(null);
        }

        public LinkedBlockingQueue(Collection<? extends E> c) {
            this(Integer.MAX_VALUE);
            for (E e : c)
                add(e);
        }

        // ... remaining methods are discussed below
    }
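Two things stand out. First, the queue is a singly linked list, and the constructor makes head a dummy node whose item is null. Second, it uses two locks, a put lock and a take lock, each associated with one condition (this is a two-lock queue).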
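The dummy head node is what lets the two ends of the list be handled separately: inserting writes only through `last`, and removing reads only through `head`. The following is a simplified, single-threaded sketch of that list shape, not the JDK code; the class `SentinelQueueSketch` and its method names are hypothetical, and the locks, conditions, and atomic count are deliberately left out.

```java
public class SentinelQueueSketch<E> {
    private static final class Node<E> {
        E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    // head is always a dummy node with a null item; the first real element is head.next.
    private Node<E> head = new Node<>(null);
    private Node<E> last = head;

    // Writes only through `last` (the real class does this while holding putLock).
    void enqueue(E x) {
        Node<E> node = new Node<>(x);
        last.next = node;
        last = node;
    }

    // Reads only through `head` (the real class does this while holding takeLock).
    // Returns null when the queue is empty; the real take() would block instead.
    E dequeue() {
        Node<E> first = head.next;
        if (first == null) return null;
        head = first;          // the removed node becomes the new dummy head
        E x = first.item;
        first.item = null;     // keep the invariant head.item == null
        return x;
    }

    boolean isEmpty() { return head.next == null; }

    public static void main(String[] args) {
        SentinelQueueSketch<Integer> q = new SentinelQueueSketch<>();
        q.enqueue(1);
        q.enqueue(2);
        System.out.println(q.dequeue()); // 1
        System.out.println(q.dequeue()); // 2
        System.out.println(q.isEmpty()); // true
    }
}
```

When only one element is present, both ends refer to the same node. In the real class that case is covered by the atomic count (a take proceeds only once count is above zero) and the volatile item field.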
- As usual, start with put and take. First, the put method:
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset
        // local var holding count negative to indicate failure unless set.
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from
             * capacity. Similarly for all other uses of count in
             * other wait guards.
             */
            try {
                while (count.get() == capacity)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            insert(e);
            c = count.getAndIncrement();
            /*
             * Note the handling here: unlike a single-lock queue, count is
             * atomic and needs no lock protection. Other threads may have
             * completed several takes while this put was running, so if
             * capacity remains, wake another thread waiting on notFull.
             */
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // count went from 0 to 1: an element was inserted into an empty
        // queue, so wake a thread waiting on notEmpty.
        if (c == 0)
            signalNotEmpty();
    }

    /**
     * Creates a node and links it at end of queue.
     * @param x the item
     */
    private void insert(E x) {
        last = last.next = new Node<E>(x);
    }

    /**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }
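Both signal checks at the end of put rely on `getAndIncrement` returning the value from before the increment, so `c` is the queue size prior to this insert. The sketch below isolates that arithmetic under an assumed capacity of 2; the class `CountTransitionSketch` and the method `signalsAfterPut` are hypothetical names used only for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CountTransitionSketch {
    // Mirrors the two checks at the end of put() and reports which signals would fire.
    static String signalsAfterPut(AtomicInteger count, int capacity) {
        int c = count.getAndIncrement();           // c is the size BEFORE this insert
        boolean wakeAnotherPut = c + 1 < capacity; // room remains: put would call notFull.signal()
        boolean wakeTake = (c == 0);               // empty became non-empty: put would call signalNotEmpty()
        return "notFull=" + wakeAnotherPut + ", notEmpty=" + wakeTake;
    }

    public static void main(String[] args) {
        AtomicInteger count = new AtomicInteger(0);
        int capacity = 2; // assumed capacity for this example
        System.out.println(signalsAfterPut(count, capacity)); // notFull=true, notEmpty=true
        System.out.println(signalsAfterPut(count, capacity)); // notFull=false, notEmpty=false
    }
}
```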
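The code is easy to follow: put waits on notFull while the queue is full, links the new node at the tail, and then signals as needed. Next, the take method: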
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            try {
                while (count.get() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            x = extract();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Removes a node from head of queue,
     * @return the node
     */
    private E extract() {
        Node<E> first = head.next;
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * Signals a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }
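The logic mirrors put and is just as easy to follow: take waits on notEmpty while the queue is empty, unlinks the node after head, and calls signalNotFull if the queue was full before the removal.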
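The interplay between put and take can be observed from the outside with a queue of capacity 1. The following is a minimal sketch using only the public API; the class `PutTakeBlockingDemo`, the method name, and the 200 ms wait are choices made for this example.

```java
import java.util.concurrent.LinkedBlockingQueue;

public class PutTakeBlockingDemo {
    // Returns true if the second put stayed blocked until a take freed a slot.
    static boolean putBlocksUntilTake() {
        LinkedBlockingQueue<Integer> q = new LinkedBlockingQueue<>(1);
        try {
            q.put(1); // the queue is now full (capacity 1)
            Thread producer = new Thread(() -> {
                try {
                    q.put(2); // waits on notFull until a slot frees up
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producer.start();
            producer.join(200);                        // give the producer time to reach put()
            boolean stillBlocked = producer.isAlive(); // it cannot finish while the queue is full
            int first = q.take();                      // the queue was full, so take signals notFull
            producer.join();                           // the blocked put can now complete
            int second = q.take();
            return stillBlocked && first == 1 && second == 2;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(putBlocksUntilTake()); // true
    }
}
```

The producer thread holds only putLock while it waits, so the take on the main thread never contends with it for takeLock.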
- As shown above, the main methods never hold both locks at the same time. Some methods do acquire both, however, such as remove:
    public boolean remove(Object o) {
        if (o == null) return false;
        boolean removed = false;
        fullyLock();
        try {
            Node<E> trail = head;
            Node<E> p = head.next;
            while (p != null) {
                if (o.equals(p.item)) {
                    removed = true;
                    break;
                }
                trail = p;
                p = p.next;
            }
            if (removed) {
                p.item = null;
                trail.next = p.next;
                if (last == p)
                    last = trail;
                if (count.getAndDecrement() == capacity)
                    notFull.signalAll();
            }
        } finally {
            fullyUnlock();
        }
        return removed;
    }

    /**
     * Lock to prevent both puts and takes.
     */
    private void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlock to allow both puts and takes.
     */
    private void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }
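Because remove can unlink a node anywhere in the list, it has to exclude both producers and consumers, which is why it takes both locks. From the caller's side the effect looks like the following minimal sketch; the class `RemoveDemo` and its method are hypothetical names, and only public `LinkedBlockingQueue` methods are used.

```java
import java.util.concurrent.LinkedBlockingQueue;

public class RemoveDemo {
    // Removes an interior element and reports the results as a single string.
    static String removeDemo() {
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>(3);
        q.add("a");
        q.add("b");
        q.add("c");                        // the queue is now full
        boolean removed = q.remove("b");   // unlinks the middle node
        boolean missing = q.remove("z");   // not present, so nothing changes
        return removed + " " + missing + " " + q + " " + q.remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println(removeDemo()); // true false [a, c] 1
    }
}
```

Removing from a full queue frees one slot, which corresponds to the `count.getAndDecrement() == capacity` check and the `notFull.signalAll()` call in the source above.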