Java合集框架提供了ArrayBlockingQueue , LinkedBlockingQueue 和 PriorityBlockingQueue来支持阻塞队列。
阻塞队列(blocking queue) 在试图向一个满队列添加元素或者从空队列中删除元素时会导致线程阻塞。BlockingQueue接口继承了java.util.Queue,并且提供同步的put和take方法向队列尾部添加元素,以及从队列头部删除元素。
ArrayBlockingQueue使用数组实现阻塞队列。必须使用一个容量或者可选的公平性策略来构造ArrayBlockingQueue。
LinkedBlockingQueue使用链表实现阻塞队列。可以创建无边界或有边界的LinkedBlockingQueue.
PriorityBlockingQueue是优先队列。可以创建无边界的或有 边界的优先队列。
注意:对于无边界的LinkedBlockingQueue或PriorityBlockingQueue而言,put方法将永远不会阻塞。
本案例使用ArrayBlockingQueue来简化程序 消费者/生产者案例。生产者线程将一个整数放入队列中,而消费者线程从队列中取走一个整数。
ConsumerProducerUsingBlockingQueue.java
package TaskThread; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ConsumerProducerUsingBlockingQueue { private static ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(2); public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(2); executor.execute(new ProducerTask()); executor.execute(new ConsumerTask()); executor.shutdown(); } //A task for adding an int to the buffer private static class ProducerTask implements Runnable{ @Override public void run() { try { int i = 1; while(true) { System.out.println("Producer writes " + i); buffer.put(i++); Thread.sleep((int) (Math.random()*10000)); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } //A task for reading and deleting an int from the buffer private static class ConsumerTask implements Runnable{ @Override public void run() { try { while(true) { System.out.println(" Consumer reads " + buffer.take()); Thread.sleep((int) (Math.random() * 10000)); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
使用锁和条件同步生产者和消费者线程。因为同步已经在ArrayBlockingQueue中实现,所以无需使用锁和条件。