zoukankan      html  css  js  c++  java
  • 多线程设计模式(四):生产者-消费模式

    生产者-消费模式,通常有两类线程,即若干个生产者线程和若干个消费者线程。生产者线程负责提交用户请求,消费者线程负责具体处理生产者提交的任务。两者之间通过共享内存缓冲去进行通信。

    一、架构模式图:

    类图:

    生产者:提交用户请求,提取用户任务,并装入内存缓冲区;

    消费者:在内存缓冲区中提取并处理任务;

    内存缓冲区:缓存生产者提交的任务或数据,供消费者使用;

    任务:生产者向内存缓冲区提交的数据结构

    Main:使用生产者和消费者的客户端。

    二、代码实现一个基于生产者-消费者模式的求整数平方的并行计算:

    (1)Producer生产者线程:

    [java] view plain copy
     
    1. <span style="font-size:18px;">package ProducerConsumer;  
    2.   
    3. import java.util.Random;  
    4. import java.util.concurrent.BlockingQueue;  
    5. import java.util.concurrent.TimeUnit;  
    6. import java.util.concurrent.atomic.AtomicInteger;  
    7.   
    8. public class Producer  implements Runnable{  
    9.       
    10.     //Volatile修饰的成员变量在每次被线程访问时,都强迫从共享内存中重读该成员变量的值。  
    11.     //而且,当成员变量发生变化时,强迫线程将变化值回写到共享内存。  
    12.     //这样在任何时刻,两个不同的线程总是看到某个成员变量的同一个值。  
    13.     private volatile  boolean isRunning= true;  
    14.       
    15.     //内存缓冲区  
    16.     private BlockingQueue<PCData> queue;  
    17.       
    18.     //总数,原子操作  
    19.     private static AtomicInteger count = new AtomicInteger();  
    20.        
    21.     private static final int SLEEPTIME=1000;  
    22.       
    23.       
    24.     public Producer(BlockingQueue<PCData> queue) {  
    25.           
    26.         this.queue = queue;  
    27.     }  
    28.   
    29.   
    30.   
    31.   
    32.     @Override  
    33.     public void run() {  
    34.         PCData data=null;  
    35.         Random r  = new Random();  
    36.         System.out.println("start producer id = "+ Thread .currentThread().getId());  
    37.         try{  
    38.             while(isRunning){  
    39.                 Thread.sleep(r.nextInt(SLEEPTIME));  
    40.                 //构造任务数据  
    41.                 data= new PCData(count.incrementAndGet());  
    42.                 System.out.println("data is put into queue ");  
    43.                 //提交数据到缓冲区  
    44.                 if(!queue.offer(data,2,TimeUnit.SECONDS)){  
    45.                     System.out.println("faile to  put data:  "+ data);  
    46.                 }  
    47.             }  
    48.         }catch (InterruptedException e){  
    49.             e.printStackTrace();  
    50.             Thread.currentThread().interrupt();  
    51.               
    52.         }  
    53.           
    54.           
    55.     }  
    56.   
    57.     public void stop(){  
    58.           
    59.         isRunning=false;  
    60.     }  
    61.   
    62.   
    63. }  
    64. </span>  

    (2)Consumer消费者线程:

    [java] view plain copy
     
    1. <span style="font-size:18px;">package ProducerConsumer;  
    2.   
    3. import java.text.MessageFormat;  
    4. import java.util.Random;  
    5. import java.util.concurrent.BlockingQueue;  
    6.   
    7. public class Consumer implements Runnable {  
    8.     //缓冲区     
    9.     private BlockingQueue<PCData> queue;  
    10.     private static final int SLEEPTIME=1000;  
    11.       
    12.       
    13.     public Consumer(BlockingQueue<PCData> queue) {          
    14.         this.queue = queue;  
    15.     }  
    16.   
    17.   
    18.     @Override  
    19.     public void run() {  
    20.         System.out.println("start Consumer id= "+ Thread .currentThread().getId());  
    21.         Random r = new Random();  
    22.           
    23.             try {  
    24.                 //提取任务  
    25.                 while(true){  
    26.                     PCData data= queue.take();  
    27.                     if(null!= data){  
    28.                         //计算平方  
    29.                         int re= data.getData()*data.getData();  
    30.                         System.out.println(MessageFormat.format("{0}*{1}={2}",  
    31.                                     data.getData(),data.getData(),re  
    32.                                 ));  
    33.                         Thread.sleep(r.nextInt(SLEEPTIME));  
    34.                                                   
    35.                     }  
    36.                 }  
    37.             } catch (InterruptedException e) {                
    38.                 e.printStackTrace();  
    39.                 Thread.currentThread().interrupt();  
    40.             }  
    41.               
    42.           
    43.           
    44.     }  
    45.       
    46.       
    47.   
    48.       
    49.   
    50. }  
    51. </span>  

    (3)PCData共享数据模型:

    [java] view plain copy
     
    1. <span style="font-size:18px;">package ProducerConsumer;  
    2.   
    3. public  final class PCData {  
    4.   
    5.     private final int intData;  
    6.   
    7.     public PCData(int d) {  
    8.         intData=d;  
    9.     }  
    10.       
    11.     public PCData(String  d) {  
    12.         intData=Integer.valueOf(d);  
    13.     }  
    14.       
    15.     public int getData(){  
    16.           
    17.         return intData;  
    18.           
    19.     }  
    20.     @Override  
    21.     public String toString(){  
    22.         return "data:"+ intData ;  
    23.     }  
    24.       
    25. }  
    26. </span>  

    (4)Main函数:

    [java] view plain copy
     
    1. <span style="font-size:18px;">package ProducerConsumer;  
    2.   
    3. import java.util.concurrent.BlockingQueue;  
    4. import java.util.concurrent.Executor;  
    5. import java.util.concurrent.ExecutorService;  
    6. import java.util.concurrent.Executors;  
    7. import java.util.concurrent.LinkedBlockingDeque;  
    8.   
    9. public class Main {  
    10.   
    11.     /** 
    12.      * @param args 
    13.      */  
    14.     public static void main(String[] args)  throws InterruptedException{  
    15.         //建立缓冲区  
    16.         BlockingQueue<PCData> queue = new LinkedBlockingDeque<PCData>(10);  
    17.         //建立生产者  
    18.         Producer producer1 = new Producer(queue);  
    19.         Producer producer2 = new Producer(queue);  
    20.         Producer producer3 = new Producer(queue);  
    21.           
    22.         //建立消费者  
    23.         Consumer consumer1 = new Consumer(queue);  
    24.         Consumer consumer2 = new Consumer(queue);  
    25.         Consumer consumer3 = new Consumer(queue);         
    26.                   
    27.         //建立线程池  
    28.         ExecutorService service = Executors.newCachedThreadPool();  
    29.           
    30.         //运行生产者  
    31.         service.execute(producer1);  
    32.         service.execute(producer2);  
    33.         service.execute(producer3);  
    34.         //运行消费者  
    35.         service.execute(consumer1);  
    36.         service.execute(consumer2);  
    37.         service.execute(consumer3);  
    38.       
    39.         Thread.sleep(10*1000);  
    40.           
    41.         //停止生产者  
    42.         producer1.stop();  
    43.         producer2.stop();  
    44.         producer3.stop();  
    45.           
    46.         Thread.sleep(3000);  
    47.         service.shutdown();  
    48.     }  
    49.   
    50. }  
    51. </span>  

    三、注意:

        volatile关键字:Volatile修饰的成员变量在每次被线程访问时,都强迫从共享内存中重读该成员变量的值。而且,当成员变量发生变化时,强迫线程将变化值回写到共享内存。这样在任何时刻,两个不同的线程总是看到某个成员变量的同一个值。

        生产-消费模式的核心组件是共享内存缓冲区,是两者的通信桥梁,起到解耦作用,优化系统整体结构。

        由于缓冲区的存在,生产者和消费者,无论谁在某一局部时间内速度相对较高,都可以使用缓冲区得到缓解,保证系统正常运行,这在一定程度上缓解了性能瓶颈对系统系能的影响。

    转:http://blog.csdn.net/lmdcszh/article/details/39699261

  • 相关阅读:
    BAT 批处理脚本教程
    javascript定时器
    使用命令行打开文件夹并显示
    用cmd加密文件夹
    烟波钓叟歌概述讲解
    奇门遁甲的起源
    八卦基本知识
    word2vec和word embedding有什么区别?
    Privoxy shadowscocks代理
    Elasticsearch源码分析—线程池(十一) ——就是从队列里处理请求
  • 原文地址:https://www.cnblogs.com/duanxz/p/5143186.html
Copyright © 2011-2022 走看看