什么是阻塞队列?
阻塞队列,顾名思义,当您尝试从队列中出队并且队列为空时,或者尝试入队并且队列已满时,它将阻塞。
试图从空队列中出队的线程将被阻止,直到其他线程将一个对象插入队列中为止。
尝试使一个对象插入队列的线程被阻塞,直到某个其他线程在队列中腾出空间为止,方法是使一个或多个对象出队或完全清空队列。
两个线程通过阻塞队列的协作图
BlockingQueue的简单实现
public class BlockingQueue { private List queue = new LinkedList(); private int limit = 10; public BlockingQueue(int limit){ this.limit = limit; } public synchronized void enqueue(Object item) throws InterruptedException { while(this.queue.size() == this.limit) { wait(); } this.queue.add(item); if(this.queue.size() == 1) { notifyAll(); } } public synchronized Object dequeue() throws InterruptedException{ while(this.queue.size() == 0){ wait(); } if(this.queue.size() == this.limit){ notifyAll(); } return this.queue.remove(0); } }
BlockingQueue实现被设计为主要用于生产者-消费者队列。
package blockingqueue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; class Producer implements Runnable { private final BlockingQueue queue; Producer(BlockingQueue q) { queue = q; } public void run() { try { while (true) { queue.put(produce()); } } catch (InterruptedException ex) { } } Object produce() { return new Object(); } } class Consumer implements Runnable { private final BlockingQueue queue; Consumer(BlockingQueue q) { queue = q; } public void run() { try { while (true) { consume(queue.take()); } } catch (InterruptedException ex) { } } void consume(Object x) { System.out.println(x); } } public class Main { public static void main(String[] args) { BlockingQueue q = new ArrayBlockingQueue<Object>(10);// Producer p0 = new Producer(q); Consumer c1 = new Consumer(q); Consumer c2 = new Consumer(q); new Thread(p0).start(); new Thread(c1).start(); new Thread(c2).start(); } }
注意:BlockingQueue可以安全地与多个生产者和多个消费者一起使用。
参考:http://tutorials.jenkov.com/java-concurrency/blocking-queues.html