zoukankan      html  css  js  c++  java
  • 并发编程 03—— 阻塞队列和生产者-消费者模式

    第2部分 什么是生产者消费者模式

    第3部分 代码示例

    第1部分 为什么要使用生产者和消费者模式

      在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

    第2部分 什么是生产者消费者模式

      生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式的第三者是工厂类,模板模式的第三者是模板类。在学习一些设计模式的过程中,如果先找到这个模式的第三者,能帮助我们快速熟悉一个设计模式。

      虽然生产者-消费者模式能够将生产者和消费者的代码彼此解耦开来,但它们的行为仍然会通过共享工作队列间接地耦合在一起。开发人员总是会假设消费者处理工作的速率能赶上生产者生成工作项的速率,因此通常不会为工作队列的大小设置边界,但这将导致在之后需要重新设计系统架构。因此,应该尽早地通过阻塞队列在设计中构建资源管理机制——这件事做得越早,就越容易。

      在类库中包含了BlockingQueue的多种实现,其中,LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列,二者分别与LinkedList和ArrayList类似,但比同步List拥有更好的并发性能。PriorityBlockingQueue是一个按优先级排序的队列,当不希望按照某种顺序而不是FIFO来处理元素时,这个队列将非常有用。

    第3部分 代码示例

      有一种类型的程序适合被分解为生产者和消费者,例如代理程序,它将扫描本地驱动上的文件并建立索引以便随后进行搜索,类似于某些桌面搜索程序或者Windows索引服务。在下面程序中,FileCrawler 给出一个生产者任务,即在某个文件层次结构中搜索符合索引标准的文件,并将它们的名称放入工作队列。而且,在Indexer中给出一个消费者任务,即从队列中取出文件的名称并对它们建立索引。

     1 package com.concurrency.BasicBuildingBlocks_5;
     2 
     3 import java.io.File;
     4 import java.util.concurrent.BlockingQueue;
     5 
     6 
     7 /**
     8  * 5.8 Producer and consumer tasks in a desktop search application
     9  * 
    10  * @ClassName: ProducerConsumer 
    11  * @author Xingle
    12  * @date 2014-9-3 下午3:43:49
    13  */
    14 public class ProducerConsumer {
    15 
    16     //生产者任务,在某个文件层次结构中搜索符合索引标准的文件,并将它们放入工作队列
    17     static class FileCrawler implements Runnable {
    18         private final BlockingQueue<File> fileQueue;        
    19         private final File root;
    20 
    21         public FileCrawler(BlockingQueue<File> fileQueue,File root) {
    22             this.fileQueue = fileQueue;
    23 
    24             this.root = root;
    25             
    26         }
    27 
    28         @Override
    29         public void run() {
    30             crawl(root);
    31         }
    32 
    33         /**
    34          * 搜索没有被索引过的文件放入阻塞队列
    35          * @author xingle
    36          * @data 2014-9-3 下午5:13:52
    37          */
    38         private void crawl(File root) {
    39             File[] entries = root.listFiles();
    40             if (entries != null) {
    41                 for (File entrie : entries) {
    42                     if (entrie.isDirectory()) {
    43                         crawl(entrie);
    44                     } else {
    45                         if (!alreadyIndexed(entrie)){
    46                             System.out.println("放入生产者队列文件:"+entrie.getName()+"来自线程:"+Thread.currentThread().getName());
    47                             fileQueue.add(entrie);
    48                         }
    49                             
    50                     }
    51                 }
    52             }
    53         }
    54 
    55         /**
    56          * 是否已经被索引
    57          * @param entrie
    58          * @return
    59          * @author xingle
    60          * @data 2014-9-3 下午5:26:04
    61          */
    62         private boolean alreadyIndexed(File entrie) {
    63             return false;
    64         }
    65 
    66     }
    67     
    68     //消费者
    69     static class Indexer implements Runnable {
    70         private final BlockingQueue<File> queue;
    71         
    72         public Indexer(BlockingQueue<File> queue){
    73             this.queue = queue;
    74         }
    75         @Override
    76         public void run() {
    77             while(true){
    78                 try {
    79                     indexFile(queue.take());
    80                 } catch (InterruptedException e) {
    81                     Thread.currentThread().interrupt();
    82                 }
    83             }
    84         }
    85 
    86         private void indexFile(File file) {
    87             System.out.println("消费者取出文件:"+file.getName()+"来自线程:"+Thread.currentThread().getName());
    88         };
    89     }
    90 
    91 }

    测试程序:

     1 package com.concurrency.BasicBuildingBlocks_5;
     2 
     3 import java.io.File;
     4 import java.util.concurrent.BlockingQueue;
     5 import java.util.concurrent.LinkedBlockingQueue;
     6 
     7 import com.concurrency.BasicBuildingBlocks_5.ProducerConsumer.FileCrawler;
     8 import com.concurrency.BasicBuildingBlocks_5.ProducerConsumer.Indexer;
     9 
    10 /**
    11  * 测试生成者和消费者
    12  * @ClassName: testMain
    13  * TODO
    14  * @author Xingle
    15  * @date 2014-9-3 下午6:03:56
    16  */
    17 public class testMain {
    18     
    19     public static void main(String[] args) {
    20         
    21         File file = new File("D:\test1/");
    22         File[] roots = file.listFiles();
    23         final int BOUND = 10;
    24         final int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
    25         BlockingQueue<File> queue = new LinkedBlockingQueue<File>(BOUND);
    26         
    27         //消费        
    28         for (int i = 0; i < N_CONSUMERS; i++) {
    29             new Thread(new Indexer(queue)).start();
    30         }
    31         //生产
    32         for (File root : roots) {
    33             new Thread(new FileCrawler(queue, root)).start();
    34         }        
    35     }
    36 
    37 }

    执行结果:


    参考:

    1.《Java 并发编程实战》5.3节

    2.聊聊并发——生产者消费者模式

  • 相关阅读:
    Redis进阶
    redis常用指令
    MarkDown基本语法
    JAVA多线程面试
    使用POI操作Excel
    IDEA+GIT的使用
    获取地址栏的参数
    mybatis逆向工程
    遍历map集合
    springboot批量删除
  • 原文地址:https://www.cnblogs.com/xingele0917/p/3956238.html
Copyright © 2011-2022 走看看