第2部分 什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式的第三者是工厂类,模板模式的第三者是模板类。在学习一些设计模式的过程中,如果先找到这个模式的第三者,能帮助我们快速熟悉一个设计模式。
虽然生产者-消费者模式能够将生产者和消费者的代码彼此解耦开来,但它们的行为仍然会通过共享工作队列间接地耦合在一起。开发人员总是会假设消费者处理工作的速率能赶上生产者生成工作项的速率,因此通常不会为工作队列的大小设置边界,但这将导致在之后需要重新设计系统架构。因此,应该尽早地通过阻塞队列在设计中构建资源管理机制——这件事做得越早,就越容易。
在类库中包含了BlockingQueue的多种实现,其中,LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列,二者分别与LinkedList和ArrayList类似,但比同步List拥有更好的并发性能。PriorityBlockingQueue是一个按优先级排序的队列,当不希望按照某种顺序而不是FIFO来处理元素时,这个队列将非常有用。
第3部分 代码示例
有一种类型的程序适合被分解为生产者和消费者,例如代理程序,它将扫描本地驱动上的文件并建立索引以便随后进行搜索,类似于某些桌面搜索程序或者Windows索引服务。在下面程序中,FileCrawler 给出一个生产者任务,即在某个文件层次结构中搜索符合索引标准的文件,并将它们的名称放入工作队列。而且,在Indexer中给出一个消费者任务,即从队列中取出文件的名称并对它们建立索引。
1 package com.concurrency.BasicBuildingBlocks_5; 2 3 import java.io.File; 4 import java.util.concurrent.BlockingQueue; 5 6 7 /** 8 * 5.8 Producer and consumer tasks in a desktop search application 9 * 10 * @ClassName: ProducerConsumer 11 * @author Xingle 12 * @date 2014-9-3 下午3:43:49 13 */ 14 public class ProducerConsumer { 15 16 //生产者任务,在某个文件层次结构中搜索符合索引标准的文件,并将它们放入工作队列 17 static class FileCrawler implements Runnable { 18 private final BlockingQueue<File> fileQueue; 19 private final File root; 20 21 public FileCrawler(BlockingQueue<File> fileQueue,File root) { 22 this.fileQueue = fileQueue; 23 24 this.root = root; 25 26 } 27 28 @Override 29 public void run() { 30 crawl(root); 31 } 32 33 /** 34 * 搜索没有被索引过的文件放入阻塞队列 35 * @author xingle 36 * @data 2014-9-3 下午5:13:52 37 */ 38 private void crawl(File root) { 39 File[] entries = root.listFiles(); 40 if (entries != null) { 41 for (File entrie : entries) { 42 if (entrie.isDirectory()) { 43 crawl(entrie); 44 } else { 45 if (!alreadyIndexed(entrie)){ 46 System.out.println("放入生产者队列文件:"+entrie.getName()+"来自线程:"+Thread.currentThread().getName()); 47 fileQueue.add(entrie); 48 } 49 50 } 51 } 52 } 53 } 54 55 /** 56 * 是否已经被索引 57 * @param entrie 58 * @return 59 * @author xingle 60 * @data 2014-9-3 下午5:26:04 61 */ 62 private boolean alreadyIndexed(File entrie) { 63 return false; 64 } 65 66 } 67 68 //消费者 69 static class Indexer implements Runnable { 70 private final BlockingQueue<File> queue; 71 72 public Indexer(BlockingQueue<File> queue){ 73 this.queue = queue; 74 } 75 @Override 76 public void run() { 77 while(true){ 78 try { 79 indexFile(queue.take()); 80 } catch (InterruptedException e) { 81 Thread.currentThread().interrupt(); 82 } 83 } 84 } 85 86 private void indexFile(File file) { 87 System.out.println("消费者取出文件:"+file.getName()+"来自线程:"+Thread.currentThread().getName()); 88 }; 89 } 90 91 }
测试程序:
1 package com.concurrency.BasicBuildingBlocks_5; 2 3 import java.io.File; 4 import java.util.concurrent.BlockingQueue; 5 import java.util.concurrent.LinkedBlockingQueue; 6 7 import com.concurrency.BasicBuildingBlocks_5.ProducerConsumer.FileCrawler; 8 import com.concurrency.BasicBuildingBlocks_5.ProducerConsumer.Indexer; 9 10 /** 11 * 测试生成者和消费者 12 * @ClassName: testMain 13 * TODO 14 * @author Xingle 15 * @date 2014-9-3 下午6:03:56 16 */ 17 public class testMain { 18 19 public static void main(String[] args) { 20 21 File file = new File("D:\test1/"); 22 File[] roots = file.listFiles(); 23 final int BOUND = 10; 24 final int N_CONSUMERS = Runtime.getRuntime().availableProcessors(); 25 BlockingQueue<File> queue = new LinkedBlockingQueue<File>(BOUND); 26 27 //消费 28 for (int i = 0; i < N_CONSUMERS; i++) { 29 new Thread(new Indexer(queue)).start(); 30 } 31 //生产 32 for (File root : roots) { 33 new Thread(new FileCrawler(queue, root)).start(); 34 } 35 } 36 37 }
执行结果:
参考:
1.《Java 并发编程实战》5.3节