/** * Creates a {@code LinkedBlockingQueue} with a capacity of * {@link Integer#MAX_VALUE}. */ public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } /** * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity. * * @param capacity the capacity of this queue * @throws IllegalArgumentException if {@code capacity} is not greater * than zero */ public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } /** * Creates a {@code LinkedBlockingQueue} with a capacity of * {@link Integer#MAX_VALUE}, initially containing the elements of the * given collection, * added in traversal order of the collection's iterator. * * @param c the collection of elements to initially contain * @throws NullPointerException if the specified collection or any * of its elements are null */ public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
LinkedBlockingQueue维护有两个 Node 节点分别用来存放队列的首、尾节点。
创建队列时头、尾节点指向一个 item 为 null 的哨兵节点。
/** * Linked list node class. */ static class Node<E> { E item; /** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ Node<E> next; Node(E x) { item = x; } }
/** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
对头 、 尾节点的独占锁都配备了一个条件队列,用来存放被阻塞的线程。
/** * Inserts the specified element at the tail of this queue if it is * possible to do so immediately without exceeding the queue's capacity, * returning {@code true} upon success and {@code false} if this queue * is full. * When using a capacity-restricted queue, this method is generally * preferable to method {@link BlockingQueue#add add}, which can fail to * insert an element only by throwing an exception. * * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; final int c; final Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() == capacity) return false; enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; }
如果待插入元素 e为 null,则抛出 NullPointerException异常。
如果队列中有空闲,则插入成功后返回 true;如果队列己满,则返回 false。
从代码可知offer方法通过使用 putLock锁保证了在队尾新增元素操作的原子性。
该方法在元素入队成功后,将唤醒 notFull 的条件队列里面被阻塞的一个线程(如:执行 put 方法时,因队列满了而被阻塞的线程),因为队列现在有空闲所以这里可以提前唤醒一个生产者线程。
/** * Inserts the specified element at the tail of this queue, waiting if * necessary for space to become available. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final int c; final Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
如果待插入元素 e为 null,则抛出 NullPointerException异常。
如果队列中有空闲,则插入成功后返回 true;如果队列己满,则阻塞当前线程(调用 notFull 的 await()方法把当前线程放入 notFull 的条件队列,并释放putLock锁),直到队列有空闲后插入成功后再返回。
问题:上述代码在判断队列是否为空时为何使用 while 循环而不是 if语句?
这是考虑到虚假唤醒的情况,notFull.await()在其他线程没有调用 notFull 的 singal 方法时也可能返回。
如果使用 if语句,那么虚假唤醒后会执行元素入队操作,并且递增计数器;由于队列己经满,将导致队列元素个数大于设置的容量,进而导致程序出错。
使用 while循环时,如果 notFull.await()被虚假唤醒,则再次循环检查当前队列是否己满。
put方法使用 putLock.locklntenuptibly()获取独占锁,所以这个方法可以被中断,即:当前线程在获取锁的过程中被其他线程设置了中断标志,则当前线程会抛出 IntenuptedException 异常而返回。
public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) return null; final E x; final int c; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() == 0) return null; x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
如果队列为空,则返回 null。
public E take() throws InterruptedException { final E x; final int c; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
take方法使用 takeLock.locklntenuptibly()获取独占锁,所以这个方法可以被中断,即:当前线程在获取锁的过程中被其他线程设置了中断标志,则当前线程会抛出 IntenuptedException 异常而返回。