BlockingQueue顾名思义是“阻塞的队列”:队列为空时,读取操作会被阻塞,直到队列不为空;队列已满时,写入操作会被阻塞,直到队列不满。BlockingQueue是java.util.concurrent工具包(JDK 1.5版本引入,作者:Doug Lea)的重要基础工具,在ThreadPoolExecutor及Tomcat等服务端容器中都有使用。本文从代码层面剖析BlockingQueue的实现细节。
/** The queued items */ final Object[] items; //使用数组作为对象存储的数据结构,所以是有界队列 /** items index for next take, poll, peek or remove */ int takeIndex; /** items index for next put, offer, or add */ int putIndex; /** Number of elements in the queue */ int count; /** * Creates an {@code SimpleArrayQueue} with the given (fixed) capacity. * * @param capacity the capacity of this queue * @throws IllegalArgumentException if {@code capacity < 1} */ public SimpleArrayQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; } /** * Inserts element at current put position, advances */ private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) //在示例代码中,由于没有加上lock判定和计数器判定,如果队列已满,指针会循环寻址,队列中先入的元素可能会被后来的元素覆盖 putIndex = 0; //实际的ArrayBlockingQueue不会有该问题,这里的循环寻址配合dequeue的同样逻辑是为了保证队列的FIFO。 count++; } /** * Extracts element at current take position, advances */ private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; return x; }
/**
 * Head of the singly linked list (a sentinel node that carries no item).
 * Invariant: head.item == null
 */
transient Node<E> head;

/**
 * Tail of the singly linked list.
 * Invariant: last.next == null
 */
private transient Node<E> last;

/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;

/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();

/**
 * Node of the singly linked list.
 */
static class Node<E> {
    E item;

    /**
     * One of:
     * - the real successor Node
     * - this Node, meaning the successor is head.next
     * - null, meaning there is no successor (this is the last node)
     */
    Node<E> next;

    Node(E x) {
        item = x;
    }
}

/**
 * Creates a {@code SimpleLinkedQueue} with the given (fixed) capacity.
 *
 * @param capacity the capacity of this queue
 * @throws IllegalArgumentException if {@code capacity} is not greater than zero
 */
public SimpleLinkedQueue(int capacity) {
    // Reject non-positive bounds: a negative bound would never equal the element count
    // (making the queue silently unbounded) and a zero bound could never accept an element.
    if (capacity <= 0) {
        throw new IllegalArgumentException();
    }
    this.capacity = capacity;
    last = head = new Node<E>(null); // head and tail both start as the same empty sentinel node
}

/**
 * Links node at the end of the queue (enqueue).
 *
 * @param node the node to append
 * @throws IllegalStateException if the queue already holds {@code capacity} elements
 */
private void enqueue(Node<E> node) {
    if (count.get() >= capacity) {
        throw new IllegalStateException("Queue full");
    }
    // Append after the current tail, then make the new node the tail.
    last = last.next = node;
    count.incrementAndGet();
}

/**
 * Removes the node at the head of the queue (dequeue).
 *
 * @return the item held by the first real node
 * @throws IllegalStateException if the queue holds no elements
 */
private E dequeue() {
    if (count.get() <= 0) {
        throw new IllegalStateException("Queue empty");
    }
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // self-link the old sentinel to help GC
    head = first; // the first real node becomes the new sentinel
    E x = first.item;
    first.item = null; // restore the invariant head.item == null
    count.getAndDecrement();
    return x;
}
/** Node of the doubly linked list. */
static final class Node<E> {
    /**
     * The item, or null if this node has been removed.
     */
    E item;

    /**
     * One of:
     * - the real predecessor Node
     * - this Node, meaning the predecessor is tail
     * - null, meaning there is no predecessor
     */
    Node<E> prev;

    /**
     * One of:
     * - the real successor Node
     * - this Node, meaning the successor is head
     * - null, meaning there is no successor
     */
    Node<E> next;

    Node(E x) {
        item = x;
    }
}

/**
 * Pointer to first node.
 * Invariant: (first == null && last == null) ||
 *            (first.prev == null && first.item != null)
 */
transient Node<E> first;

/**
 * Pointer to last node.
 * Invariant: (first == null && last == null) ||
 *            (last.next == null && last.item != null)
 */
transient Node<E> last;

/** Number of items in the deque */
private transient int count;

/** Maximum number of items in the deque */
private final int capacity;

/**
 * Creates a {@code SimpleLinkedDeque} with the given (fixed) capacity.
 *
 * @param capacity the capacity of this deque
 * @throws IllegalArgumentException if {@code capacity} is less than 1
 */
public SimpleLinkedDeque(int capacity) {
    if (capacity < 1) {
        throw new IllegalArgumentException();
    }
    this.capacity = capacity;
}

/**
 * Links {@code node} in as the new first element.
 *
 * @param node the node to insert at the front
 * @return {@code true} if the node was linked, {@code false} if the deque is full
 */
private boolean linkFirst(Node<E> node) {
    // assert lock.isHeldByCurrentThread();
    if (count >= capacity) {
        return false;
    }
    final Node<E> oldFirst = first;
    node.next = oldFirst;
    first = node;
    if (last != null) {
        oldFirst.prev = node;
    } else {
        last = node; // deque was empty: the new node is also the last one
    }
    ++count;
    return true;
}

/**
 * Links {@code node} in as the new last element.
 *
 * @param node the node to insert at the back
 * @return {@code true} if the node was linked, {@code false} if the deque is full
 */
private boolean linkLast(Node<E> node) {
    // assert lock.isHeldByCurrentThread();
    if (count >= capacity) {
        return false;
    }
    final Node<E> oldLast = last;
    node.prev = oldLast;
    last = node;
    if (first != null) {
        oldLast.next = node;
    } else {
        first = node; // deque was empty: the new node is also the first one
    }
    ++count;
    return true;
}

/**
 * Unlinks the first element.
 *
 * @return the item of the removed first node, or {@code null} if the deque is empty
 */
private E unlinkFirst() {
    // assert lock.isHeldByCurrentThread();
    final Node<E> oldFirst = first;
    if (oldFirst == null) {
        return null;
    }
    final Node<E> successor = oldFirst.next;
    final E removed = oldFirst.item;
    oldFirst.item = null;
    oldFirst.next = oldFirst; // self-link to help GC
    first = successor;
    if (successor != null) {
        successor.prev = null;
    } else {
        last = null; // the removed node was the only one
    }
    --count;
    return removed;
}

/**
 * Unlinks the last element.
 *
 * @return the item of the removed last node, or {@code null} if the deque is empty
 */
private E unlinkLast() {
    // assert lock.isHeldByCurrentThread();
    final Node<E> oldLast = last;
    if (oldLast == null) {
        return null;
    }
    final Node<E> predecessor = oldLast.prev;
    final E removed = oldLast.item;
    oldLast.item = null;
    oldLast.prev = oldLast; // self-link to help GC
    last = predecessor;
    if (predecessor != null) {
        predecessor.next = null;
    } else {
        first = null; // the removed node was the only one
    }
    --count;
    return removed;
}
public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; //如果队列已满,则返回false,不抛出异常 else { enqueue(e); //向队列尾部插入元素e return true; } } finally { lock.unlock(); } }
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); //在lock锁定期间,该生产者线程可以被中断,好处是什么呢? try { while (count == items.length) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); //和offer(e)不一样,该方法会等待队列的notFull信号量,但等待时长不会超过设定的timeout时长。 } enqueue(e); return true; } finally { lock.unlock(); } }
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); //队列满,抛出异常 }
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); //队列满时,生产者线程阻塞等待,直到该队列被消费者线程take后,notFull condition被signal触发 enqueue(e); } finally { lock.unlock(); } }
理论上,LinkedBlockingQueue的put / take性能要好于ArrayBlockingQueue:前者的入队和出队分别使用putLock和takeLock两把锁,生产者和消费者可以并行操作;而后者的入队和出队共用同一把锁。
《源码分析-SynchronousQueue》这篇文章讲得特别好。
DelayQueue 会一直持有元素,直到该元素指定的延迟时间到期后,才允许将其取出。
// Excerpt of the DelayQueue declaration (the class body is truncated here): elements must implement Delayed, and the class declares a PriorityQueue field named q.
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { private final PriorityQueue<E> q = new PriorityQueue<E>();
/**
 * Places {@code x} into the heap stored in {@code array}, starting from position {@code k}
 * and moving it toward the root: each parent that compares greater than {@code x} is shifted
 * down one level until a parent that is less than or equal to {@code x} (or the root) is reached.
 * Ordering uses {@code x}'s own {@code compareTo}.
 *
 * @param k the position to start filling from
 * @param x the item to insert; it is cast to {@code Comparable}
 * @param array the heap array, modified in place
 */
private static <T> void siftUpComparable(int k, T x, Object[] array) {
    final Comparable<? super T> key = (Comparable<? super T>) x;
    int hole = k;
    while (hole > 0) {
        final int parentIndex = (hole - 1) >>> 1;
        final Object parent = array[parentIndex];
        if (key.compareTo((T) parent) >= 0) {
            break; // parent is not greater than x: x belongs at the current hole
        }
        array[hole] = parent; // shift the parent down into the hole
        hole = parentIndex;
    }
    array[hole] = key;
}
ConcurrentLinkedQueue vs BlockingQueue
BlockingQueue是阻塞的队列,提供了阻塞式的put/take API,是天然适合实现 consumer/producer 模式的队列。当然它也提供了非阻塞式的API,如offer/poll、add/remove。
ConcurrentLinkedQueue的size()需要遍历一遍整个集合,速度很慢,所以应尽量避免调用size();如果只是判断队列是否为空,最好用isEmpty(),而不是用size()来判断。
CLQ(ConcurrentLinkedQueue)和 LBQ(LinkedBlockingQueue)的典型应用场景?