BlockingQueue
队列接口,具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:
Throws exception | Special value | Blocks | Times out | |
Insert | add(e) |
offer(e) |
put(e) |
offer(e, time, unit) |
Remove | remove() |
poll() |
take() |
poll(time, unit) |
Examine | element() |
peek() |
not applicable |
not applicable |
(1) ArrayBlockingQueue内部是根据数组实现的,对象内持有一个定长数组:final Object[] items;,初始化对象的时候执行 this.items = new Object[capacity];,所以ArrayBlockingQueue对象的长度是固定的。ArrayBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。
(2) DelayQueue用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最段。类的定义如下
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {.....}
是一个有优先级的队列,内部存放了一个PriorityQueue队列q private final PriorityQueue<E> q = new PriorityQueue<E>(); q内的存放的必须都是实现了Delayed接口的对象用来确保
到期的先后顺序。添加元素方法比较简单,具体实现特性功能的获得元素方法代码如下
private final Condition available = lock.newCondition(); public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) available.await(); else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay <= 0) return q.poll(); else if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
把优先队列q的first拿出来(peek),如果没有达到延时阀值,则进行await处理。1.7里这个leader线程个人理解为当有多线程来消费队列时用来保证拿到过锁的线程也是有先后顺序来获得对象的。
(3) LinkedBlockingQueue内部实现了一个链表队列,所有元素都被包装在一个node节点里
static class Node<E> { E item; /** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ Node<E> next; Node(E x) { item = x; } }
添加和获取元素的方法最后对应于两个私有方法
/** * Links node at end of queue. * * @param node the node */ private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; } /** * Removes a node from head of queue. * * @return the node */ private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
(4) PriorityBlockingQueue内部的是一个排序队列,所有插入到 PriorityBlockingQueue 的元素必须实现 java.lang.Comparable 接口。因此该队列中元素的排序就取决于你自己的 Comparable 实现,队列内部的元素存在一个数组里 private transient Object[] queue;但是在数组的长度不够的时候会扩充长度,queue本质上实现了一个平衡二叉堆,具体代码请参考PriorityBlockingQueue队列的二叉堆实现。
(5) SynchronousQueue是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。队列内部通过实现了两个Transferer接口的类实现了这个功能,具体设计到的算法有待研究,分别是TransferStack,TransferQueue,前者实现后进先出,后者实现队列。