BlockingQueue
它实现了Queue接口。它是A BlockingQueue with one thread putting into it, and another thread taking from it. 一端生产一端消费。
其中的一个线程将不断的将任务放入BlockingQueue,直到遇到它的临界值,但是不允许插入NULL,否则会抛出NullPointerException。另一个线程从中不断的取任务。
BlockingQueue 的方法
BlockingQueue 具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:
抛异常 | 特定值 | 阻塞 | 超时 | |
---|---|---|---|---|
插入 | add(o) | offer(o) | put(o) | offer(o, timeout, timeunit) |
移除 | remove(o) | poll(o) | take(o) | poll(timeout, timeunit) |
检查 | element(o) | peek(o) |
四组不同的行为方式解释:
- 抛异常:如果试图的操作无法立即执行,抛一个异常。IllegalStateException
- 特定值:如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
- 阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
- 超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。
BlockingQueue 的实现
BlockingQueue 是个接口,你需要使用它的实现之一来使用 BlockingQueue。java.util.concurrent 具有以下 BlockingQueue 接口的实现(Java 6):
- ArrayBlockingQueue
- DelayQueue
- LinkedBlockingQueue
- PriorityBlockingQueue
- SynchronousQueue
借助ArrayBlockingQueue来实现:
//消费者 public class Consumer implements Runnable { private BlockingQueue<String> queue; public Consumer(BlockingQueue<String> queue) { this.queue = queue; } @Override public void run() { try { System.out.println(queue.take()); System.out.println(queue.take()); System.out.println(queue.take()); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } //生产者 public class Producer implements Runnable { private BlockingQueue<String> queue; public Producer(BlockingQueue<String> queue) { this.queue = queue; } @Override public void run() { try { queue.add("001"); Thread.sleep(1000); queue.add("002"); Thread.sleep(1000); queue.add("003"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } //test public static void main(String[] args) { Executor executor = Executors.newCachedThreadPool(); BlockingQueue<String> queue = new ArrayBlockingQueue<String>(100); executor.execute(new Producer(queue)); executor.execute(new Consumer(queue)); }
上面test中跟consumer/producer执行顺序没有关系。正如上面所说的在take的时候,会阻塞。
数组阻塞队列 ArrayBlockingQueue
ArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里。既然是使用数组来实现的,就必须要满足数组的一些特性,比如删除的效率较低,无法改变其size。ArrayBlockingQueue使用FIFO的方式来组织数据。同时ArrayBlockingQueue提供了一种公平性的策略。
public ArrayBlockingQueue(int capacity, boolean fair) {} fair=true的时候,会满足FIFO的策略。
延迟队列 DelayQueue
这是一个无界的阻塞队列,队列中的每一个元素都有一个延迟期,只有在元素的延迟期失效的情况下才能take。队首元素是过期时间最长的元素。如果没有过期元素,那么返回null。
getDelay(TimeUnit unit)
返回元素的过期时长。只有在小于等于0的时候,元素才可以使用。
链阻塞队列 LinkedBlockingQueue
LinkedBlockingQueue是一个可选的有界阻塞队列。处理元素的策略同样为FIFO。满足链表的性质。LinkedBlockingQueue相对数组实现的阻塞队列有较高的吞吐量,但是性能方面稍差。
有优先级的阻塞队列 PriorityBlockingQueue
PriorityBlockingQueue 是一个无界的并发队列。它使用了和类 java.util.PriorityQueue 一样的排序规则。你无法向这个队列中插入 null 值。
所有插入到 PriorityBlockingQueue 的元素必须实现 java.lang.Comparable 接口。因此该队列中元素的排序就取决于你自己的 Comparable 实现。
注意 PriorityBlockingQueue 对于具有相等优先级(compare() == 0)的元素并不强制任何特定行为。
同步队列 SynchronousQueue
SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。
阻塞双端队列 BlockingDeque
j ava.util.concurrent 包里的 BlockingDeque 接口表示一个线程安放入和提取实例的双端队列。本小节我将给你演示如何使用 BlockingDeque。
BlockingDeque 类是一个双端队列,在不能够插入元素时,它将阻塞住试图插入元素的线程;在不能够抽取元素时,它将阻塞住试图抽取的线程。
deque(双端队列) 是 "Double Ended Queue" 的缩写。因此,双端队列是一个你可以从任意一端插入或者抽取元素的队列。
【参阅】http://blog.csdn.net/defonds/article/details/44021605/
http://tutorials.jenkov.com/java-concurrency/index.html