zoukankan      html  css  js  c++  java
  • 生产者-消费者模式

      生产者-消费者模式是一个经典的多线程设计模式,它为多线程间的协作提供了良好的解决方案。在生产者-消费者模式中,通常有两类线程,即若干个生产者线程和若干个消费者线程。生产者线程负责提交用户请求,消费者线程负责具体处理生产者提交的任务。生产者和消费者之间通过共享内存缓冲区进行通信。

      生产者-消费者模式的核心组件是共享内存缓冲区,它作为生产者和消费者时间通信的桥梁,避免了生产者和消费者之间直接通信,从而将生产者和消费者时间进行解耦。生产者不需要知道消费者的存在,消费者也不需要知道生产者的存在。同时由于缓冲区的存在,允许生产者和消费者在执行速度上存在时间差,无论是生产者在某一局部时间内速度高于消费者,还是消费者在某一局部时间内速度高于生产者,都可以通过共享内存缓冲区得到缓解,确保系统正常运行。

      生产者-消费者模式的主要角色输入下表所示:

    生产者-消费者模式主要角色
    生产者 用于提交用户请求,提取用户任务,并装入内存缓冲区
    消费者 在内存缓冲区中提取并处理任务
    内存缓冲区 缓存生产者提交的任务或者数据,供消费者使用
    任务 生产者向内存缓冲区提交的数据结构
    Main 使用生产者和消费者的客户端

    下图显示了生产者-消费者模式一种实现的具体结构:

     

      其中,BlockingQueue充当了共享内存缓冲区,用于维护任务或数据队列(PCData对象)。PCData对象表示一个生产任务或者相关任务的数据。生产者对象和消费者对象均引用同一个BlockingQueue对象。生产者负责创建PCData,并将它加入到BlockingQueue对象中,消费者则从同一个BlockingQueue中获取PCData,并执行完该任务。
    下面代码实现了基于生产者-消费者模式的求整数平方的并行程序。

     1 public class PCData {
     2     private final int intData;//数据
     3 
     4     public PCData(int intData){
     5         this.intData = intData;
     6     }
     7 
     8     public PCData(String d){
     9         intData = Integer.valueOf(d);
    10     }
    11 
    12     public int getIntData(){
    13         return intData;
    14     }
    15 
    16     @Override
    17     public String toString(){
    18         return "data:" + intData;
    19     }
    20 }
     1 public class Producer implements Runnable {
     2 
     3     private volatile boolean isRunning = true;
     4     //内存缓冲区
     5     private BlockingQueue<PCData> queue;
     6     //总数,原子操作
     7     private static AtomicInteger count = new AtomicInteger();
     8     private static final int SLEEP_TIME = 1000;
     9 
    10     public Producer(BlockingQueue<PCData> queue){
    11         this.queue = queue;
    12     }
    13 
    14     @Override
    15     public void run() {
    16         PCData data = null;
    17         Random random = new Random();
    18 
    19         System.out.println("start producer id = " + Thread.currentThread().getId());
    20         while (isRunning){
    21             try {
    22                 Thread.sleep(random.nextInt(SLEEP_TIME));
    23                 data = new PCData(count.incrementAndGet());//构造任务数据
    24                 System.out.println(data + " is put into queue!");
    25                 if (!queue.offer(data,2, TimeUnit.SECONDS)){//提交数据到缓冲区,offer(),当队列满时,直接返回false。
    26                     System.out.println("failed to put data:" + data);
    27                 }
    28             } catch (InterruptedException e) {
    29                 e.printStackTrace();
    30                 Thread.currentThread().interrupt();
    31             }
    32         }
    33     }
    34 
    35     public void stop(){
    36         isRunning = false;
    37     }
    38 }
    public class Customer implements Runnable {
    
        private BlockingQueue<PCData> queue;//缓冲区
        private static final int SLEEP_TIME = 1000;
    
        public Customer(BlockingQueue<PCData> queue){
            this.queue = queue;
        }
    
        @Override
        public void run() {
            System.out.println("start customer id = " + Thread.currentThread().getId());
            Random random = new Random();
    
            while (true){
                try {
                    PCData data = queue.poll();//提取数据
                    if (null != data){
                        int re = data.getIntData() * data.getIntData();//计算平方
                        System.out.println(MessageFormat.format("{0}*{1}={2}",data.getIntData(),data.getIntData(),re));
                        Thread.sleep(random.nextInt(SLEEP_TIME));
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
    public class Client{
        //测试
        public static void main(String[] args) throws InterruptedException {
            //建立缓冲区
            BlockingQueue<PCData> queue = new LinkedBlockingQueue<PCData>(10);
            Producer producer1 = new Producer(queue);//生产者
            Producer producer2 = new Producer(queue);
            Producer producer3 = new Producer(queue);
            Customer customer1 = new Customer(queue);//消费者
            Customer customer2 = new Customer(queue);
            Customer customer3 = new Customer(queue);
    
            ExecutorService es = Executors.newCachedThreadPool();
            es.execute(producer1);//运行生产者
            es.execute(producer2);
            es.execute(producer3);
            es.execute(customer1);//运行消费者
            es.execute(customer2);
            es.execute(customer3);
    
            Thread.sleep(1000);
            producer1.stop();//停止生产者
            producer2.stop();
            producer3.stop();
            Thread.sleep(3000);
            es.shutdown();
        }
    }

    输出结果:

    start producer id = 11
    start customer id = 14
    start customer id = 15
    start producer id = 12
    start customer id = 16
    start producer id = 13
    data:1 is put into queue!
    1*1=1
    data:2 is put into queue!
    2*2=4
    data:3 is put into queue!
    3*3=9
    data:4 is put into queue!
    4*4=16
    data:5 is put into queue!
    5*5=25
    data:6 is put into queue!
    6*6=36
    data:7 is put into queue!
    7*7=49
    data:8 is put into queue!
    8*8=64
    data:9 is put into queue!
    9*9=81

    上述代码很简单,看过文章数据共享通道:BlockingQueue后,注释也比较详细,就不再赘述代码的意思了。

    总结:
      生产者-消费者模式很好的对生产者和消费者进行解耦,优化了系统整体结构。同时,由于缓冲区的作用,允许生产者线程和消费者线程存在执行上的性能差异,从一定的程度上缓解了性能瓶颈对系统性能的影响。

    作者:Joe
    努力了的才叫梦想,不努力的就是空想,努力并且坚持下去,毕竟这是我相信的力量
  • 相关阅读:
    hdu5002 Tree
    hdu6858(杭电多校第八场) Discovery of Cycles
    杭电多校第八场总结
    ubuntu刷新swap
    python 如何关闭warning的输出
    python 如何获取整数,浮点数的最大值
    补码
    LaTeX 公式集锦
    Codeforces 581D Three Logos
    Codeforces 582 A. GCD Table
  • 原文地址:https://www.cnblogs.com/Joe-Go/p/9805079.html
Copyright © 2011-2022 走看看