一、Queue
Queye接口体系图
体系分析:
Deque实现类:ArrayDeque, LinkedList(数组和链表实现双向队列)
BlockingDeque实现类:LinkedBlockingDeque(链表实现阻塞双向队列)
BlockingQueue实现类:ArrayBlockingQueue,LinkedBlockingQueue,PriorityBlockingQueue,SynchronousQueue,DelayQueue
Queue源码
public interface Queue<E> extends Collection<E> { //jdk1.5 boolean add(E e); boolean offer(E e); E remove(); E poll(); E element(); E peek(); }
在处理元素前用于保存元素的 collection.除了基本的 Collection
操作外,队列还提供其他的插入、提取和检查操作.每个方法都存在两种形式:一种抛出异常(操作失败时),另一种返回一个特殊值(null 或 false,具体取决于操作).插入操作的后一种形式是用于专门为有容量限制的 Queue 实现设计的;在大多数实现中,插入操作不会失败.
Queue 接口并未定义阻塞队列的方法,而这在并发编程中是很常见的.BlockingQueue
接口定义了那些等待元素出现或等待队列中有可用空间的方法,这些方法扩展了此接口.
二、Deque
一个线性 collection,支持在两端插入和移除元素.名称 deque 是“double ended queue(双端队列)”的缩写.
此接口定义在双端队列两端访问元素的方法.提供插入、移除和检查元素的方法.每种方法都存在两种形式:一种形式在操作失败时抛出异常,另一种形式返回一个特殊值(null 或 false,具体取决于操作).插入操作的后一种形式是专为使用有容量限制的 Deque 实现设计的;在大多数实现中,插入操作不能失败.
下表总结了上述 12 种方法:
此接口扩展了 Queue
接口.在将双端队列用作队列时,将得到 FIFO(先进先出)行为.将元素添加到双端队列的末尾,从双端队列的开头移除元素.从 Queue 接口继承的方法完全等效于 Deque 方法,如下表所示:
双端队列也可用作 LIFO(后进先出)堆栈.应优先使用此接口而不是遗留 Stack
类.在将双端队列用作堆栈时,元素被推入双端队列的开头并从双端队列开头弹出.堆栈方法完全等效于 Deque 方法,如下表所示:
此接口提供了两种移除内部元素的方法:removeFirstOccurrence
和 removeLastOccurrence
.注意,在将双端队列用作队列或堆栈时,peek
方法同样正常工作;无论哪种情况下,都从双端队列的开头抽取元素.
三、BlockingQueue
支持两个附加操作的 Queue
,这两个操作是:获取元素时等待队列变为非空,以及存储元素时等待空间变得可用.
BlockingQueue 方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,这四种形式的处理方式不同:第一种是抛出一个异常,第二种是返回一个特殊值(null 或 false,具体取决于操作),第三种是在操作可以成功前,无限期地阻塞当前线程,第四种是在放弃前只在给定的最大时间限制内阻塞.下表中总结了这些方法:
BlockingQueue 实现是线程安全的,实现主要用于生产者-消费者队列,以下是基于典型的生产者-使用者场景的一个用例:
class Producer implements Runnable { private final BlockingQueue queue; Producer(BlockingQueue q) { queue = q; } public void run() { try { while(true) { queue.put(produce()); } } catch (InterruptedException ex) {... handle ...} } Object produce() { ... } } class Consumer implements Runnable { private final BlockingQueue queue; Consumer(BlockingQueue q) { queue = q; } public void run() { try { while(true) { consume(queue.take()); } } catch (InterruptedException ex) { ... handle ...} } void consume(Object x) { ... }
} class Setup { void main() { BlockingQueue q = new SomeQueueImplementation(); Producer p = new Producer(q); Consumer c1 = new Consumer(q); Consumer c2 = new Consumer(q); new Thread(p).start(); new Thread(c1).start(); new Thread(c2).start(); } }
改写之前的消费者生产者模式
import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; class Product{ int id; Product(int id){ this.id=id; } @Override public String toString() { return super.toString(); } } class Productor implements Runnable{ BlockingQueue<Product> blockingQueue = null; public Productor(BlockingQueue<Product> blockingQueue ) { this.blockingQueue=blockingQueue; } @Override public void run() { for(int i=0;i<20;i++){ Product p =new Product(i); try { blockingQueue.put(p); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"生成了一个"+p); } } } class Comsumer implements Runnable{ BlockingQueue<Product> blockingQueue = null; public Comsumer(BlockingQueue<Product> blockingQueue ) { this.blockingQueue=blockingQueue; } @Override public void run() { Product p=null; for(;;){ try { p = blockingQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"消费了一个"+p); } } } public class TestBlockingQueue { public static void main(String[] args) { BlockingQueue<Product> b = new LinkedBlockingQueue<Product>(); Productor p = new Productor(b); Comsumer c = new Comsumer(b); new Thread(p).start(); new Thread(p).start(); new Thread(c).start(); new Thread(c).start(); } }
四、ArrayBlockingQueue
一个由数组支持的有界阻塞队列,构造方法必须定义确定大小的队列,此队列按 FIFO(先进先出)原则对元素进行排序.队列的头部 是在队列中存在时间最长的元素.队列的尾部 是在队列中存在时间最短的元素.新元素插入到队列的尾部,队列获取操作则是从队列头部开始获得元素.
这是一个典型的“有界缓存区”,固定大小的数组在其中保持生产者插入的元素和使用者提取的元素.一旦创建了这样的缓存区,就不能再增加其容量.试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞.
ArrayBlockingQueue中的方法基本都使用了同步,例生产者消费者的底层代码:
final Object[] items; final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull;
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); } private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
五、LinkedBlockingQueue
一个基于已链接节点的、范围任意的 blocking queue.此队列按 FIFO(先进先出)排序元素.队列的头部 是在队列中时间最长的元素.队列的尾部 是在队列中时间最短的元素.新元素插入到队列的尾部,并且队列获取操作会获得位于队列头部的元素.链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低.
可选的容量范围构造方法参数作为防止队列过度扩展的一种方法.如果未指定容量,则它等于 Integer.MAX_VALUE
.除非插入节点会使队列超出容量,否则每次插入后会动态地创建链接节点.例其中一个方法:
static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } }
private void enqueue(Node<E> node) { last = last.next = node; }
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
PriorityQueue一个基于优先级无界优先级队列.优先级队列的元素按照其自然顺序进行排序,或者根据构造队列时提供的Comparator
进行排序,具体取决于所使用的构造方法.
PriorityBlockingQueue一个无界阻塞队列,它使用与类 PriorityQueue
相同的顺序规则,并且提供了阻塞获取操作.
LinkedBlockingDeque一个基于已链接节点的、任选范围的阻塞双端队列。
SynchronousQueue每个插入操作必须等待另一个线程的对应移除操作.
DelayQueue个无界阻塞队列,只有在延迟期满时才能从中提取元素.
注:1.以上所有队列都不能插入null元素,因为很多方法的返回值是null.
2.队列与List
接口不同,此接口不支持通过索引访问元素.队列只能从头部和尾部获取元素.