生产者-消费者模式
1. 阻塞队列
生产者消费者模式中,一般采用阻塞队列来实现。阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。
2. 阻塞队列的运用
当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。
当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。
阻塞队列中提供了四种处理方法
JDK7提供了7个阻塞队列。分别是
1)ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
2)LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
3)PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
4)DelayQueue:一个使用优先级队列实现的无界阻塞队列。
5)SynchronousQueue:一个不存储元素的阻塞队列。
6)LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
7)LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
3. 生产者-消费者的代码实现
生产者:
package com.commonCode.producerConsumer; import java.util.concurrent.BlockingQueue; /** * Created by Demrystv. */ public class Producer implements Runnable { BlockingQueue<String> queue; public Producer(BlockingQueue<String> queue) { this.queue = queue; } @Override public void run() { try { String temp = "A Product, 生产线程:" + Thread.currentThread().getName(); System.out.println("I have made a product:" + Thread.currentThread().getName()); queue.put(temp);//如果对垒是满的话,会阻塞当前线程 } catch (InterruptedException e) { e.printStackTrace(); } } }
消费者:
package com.commonCode.producerConsumer; import java.util.concurrent.BlockingQueue; /** * Created by Demrystv. */ public class Consumer implements Runnable { BlockingQueue<String> queue; public Consumer(BlockingQueue<String> queue) { this.queue = queue; } @Override public void run() { try { String temp = queue.take();//如果队列为空,就会阻塞线程 System.out.println(temp); } catch (InterruptedException e) { e.printStackTrace(); } } }
测试类:
package com.commonCode.producerConsumer; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; /** * Created by Demrystv. */ public class test { public static void main(String[] args) { BlockingQueue<String> queue = new LinkedBlockingDeque<String>(2); //不设置的话,默认值为Integer.MAX_VALUE Producer producer = new Producer(queue); Consumer consumer = new Consumer(queue); for (int i = 0; i < 5; i++) { new Thread(producer, "Producer" + (i + 1)).start(); new Thread(consumer, "Consumer" + (i + 1)).start(); } } }
执行结果: