JDK1.7
BlockingQueue<E>接口 (extends Queue<E>)
所有父接口:Collection<E>,Iterable<E>,Queue<E>
所有子接口:BlockingDeque<E>,TransferQueue<E>
所有实现该接口的类:ArrayBlockingQueue,DelayQueue,LinkedBlockingQueue,LinkedTransferQueue,PriorityBlockingQueue,SynchronousQueue,LinkedBlockingDeque
一种支持操作等待的队列,取元素时如果队列为空则等待队列变为非空,放元素时如果队列满则等待队列有可用空间。元素出入方式为FIFO。
BlockingQueue的方法有四种形式构成:见下表:
Throws exception | Special value | Blocks | Times out | |
Insert | add(e) | offer(e) | put(e) | offer(e,time,unit) |
Remove | remove() | poll() | take() | poll(time,unit) |
Examine | element() | peek() | not applicable | not applicable |
BlockingQueue不接受null值元素,通过add,offer,put方法试图添加null值会抛NullPointerException. 因为null值是用来指明poll方法失败的。
BlockingQueue有容量界限,任何时候都可以通过remainingCapacity值查看可用容量,当已用容量超过remainingCapacity值时,额外的元素不能无阻塞的添加到队列中。
没有内在容量限制的队列,remainingCapacity会返回Integer.MAX值。
BlockingQueue实现类设计初衷主要用于生产者-消费者队列,但是也支持了Collection接口,因此,可以使用remove(e)方法从队列中移除任何元素。但是需要注意的是,
这种操作并不高效,偶尔才使用到。
BlockingQueue实现类都是线程安全(Thread-safe)的,所有排队方法都通过内部锁或者其他并发控制实现了原子操作。但是,像集合堆的操作,addAll(),containsAll(),
retainAll(),removeAll(),实现类中没有特殊规定的话就没有必要实现原子操作,所以,当使用addAll(c)可能会存在添加一部分之后操作失败了。
代码示例,基于典型的生产者-消费者场景。BlockingQueue可以安全的被用于多个生产者和多核
1 class Producer implements Runnable { 2 private final BlockingQueue queue; 3 Producer(BlockingQueue q) { 4 queue = q; 5 } 6 public void run() { 7 try { 8 while (true) { 9 queue.put(produce()); 10 } 11 } catch (InterruptedException ex) { ...} 12 } 13 Object produce() { ... } 14 } 15 16 class Consumer implements Runnable { 17 private final BlockingQueue queue; 18 Consumer(BlockingQueue q) { 19 queue = q; 20 } 21 public void run() { 22 try { 23 while (true) { 24 consume(queue.take()); 25 } 26 } catch (InterruptedException ex) { ... } 27 } 28 void consume(Object x) { ... } 29 } 30 31 public class Main{ 32 public static void main(String[] args) { 33 BlockingQueue q = new SomeQueueImplementation(); 34 Producer p = new Producer(q); 35 Consumer c1 = new Consumer(q); 36 Consumer c2 = new Consumer(q); 37 new Thread(p).start(); 38 new Thread(c1).start(); 39 new Thread(c2).start(); 40 } 41 }