zoukankan      html  css  js  c++  java
  • Java中生产者与消费者模式

     生产者消费者模式

    首先来了解什么是生产者消费者模式。该模式也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

    要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。通常采用进程间通信的方法解决该问题,常用的方法有信号灯法等。如果解决方法不够完善,则容易出现死锁的情况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒自己。该问题也能被推广到多个生产者和消费者的情形。

    2 用wait和notify实现

    这种方法的基本原理是:一个线程负责生产数据,放到共享区域,然后通知另一个线程去消耗数据。如果没有wait()和notify(),消费者线程就要不停去检查是否有数据被产生。

    接下来介绍一下wait()和notify(),在这里把它们和sleep()做一个对比,方便理解

    不同点sleep()wait()和notify()
    原理 线程用来控制自身流程,会使该线程暂停执行一段时间,把执行机会让给其它线程。时间一到就复苏。 是Object类的方法,会使当前拥有该对象锁的进程等待,直到其他线程调用notify()方法。
    锁的处理机制 只是让线程暂停执行一段时间,不会释放锁 调用wait(),线程会释放掉锁
    使用区域 必须放在同步控制方法或者同步语句块中 可以放在任何地方
    异常 必须捕获异常,例如InterruptedException等 不用捕获异常

    sleep不会释放锁,容易导致死锁(在我的上一篇博客 JAVA多线程(二)竞态条件、死锁及同步机制有描述)。因此推荐使用wait()和notify()。下面转载一份源代码,

    import java.util.LinkedList;
    import java.util.Queue;
    import org.apache.log4j.Logger;
    
    public class InterThreadCommunicationExample {
    
        public static void main(String args[]) {
    
            final Queue sharedQ = new LinkedList();
    
            Thread producer = new Producer(sharedQ);
            Thread consumer = new Consumer(sharedQ);
    
            producer.start();
            consumer.start();
    
        }
    }
    
    public class Producer extends Thread {
        private static final Logger logger = Logger.getLogger(Producer.class);
        private final Queue sharedQ;
    
        public Producer(Queue sharedQ) {
            super("Producer");
            this.sharedQ = sharedQ;
        }
    
        @Override
        public void run() {
    
            for (int i = 0; i < 4; i++) {
    
                synchronized (sharedQ) {
                    //waiting condition - wait until Queue is not empty
                    while (sharedQ.size() >= 1) {
                        try {
                            logger.debug("Queue is full, waiting");
                            sharedQ.wait();
                        } catch (InterruptedException ex) {
                            ex.printStackTrace();
                        }
                    }
                    logger.debug("producing : " + i);
                    sharedQ.add(i);
                    sharedQ.notify();
                }
            }
        }
    }
    
    public class Consumer extends Thread {
        private static final Logger logger = Logger.getLogger(Consumer.class);
        private final Queue sharedQ;
    
        public Consumer(Queue sharedQ) {
            super("Consumer");
            this.sharedQ = sharedQ;
        }
    
        @Override
        public void run() {
            while(true) {
    
                synchronized (sharedQ) {
                    //waiting condition - wait until Queue is not empty
                    while (sharedQ.size() == 0) {
                        try {
                            logger.debug("Queue is empty, waiting");
                            sharedQ.wait();
                        } catch (InterruptedException ex) {
                            ex.printStackTrace();
                        }
                    }
                    int number = sharedQ.poll();
                    logger.debug("consuming : " + number );
                    sharedQ.notify();
    
                    //termination condition
                    if(number == 3){break; }
                }
            }
        }
    }
    
    Output:
    05:41:57,244 0    [Producer] DEBUG concurrency.Producer  - producing : 0
    05:41:57,260 16   [Producer] DEBUG concurrency.Producer  - Queue is full, waiting
    05:41:57,260 16   [Consumer] DEBUG concurrency.Consumer  - consuming : 0
    05:41:57,260 16   [Consumer] DEBUG concurrency.Consumer  - Queue is empty, waiting
    05:41:57,260 16   [Producer] DEBUG concurrency.Producer  - producing : 1
    05:41:57,260 16   [Producer] DEBUG concurrency.Producer  - Queue is full, waiting
    05:41:57,260 16   [Consumer] DEBUG concurrency.Consumer  - consuming : 1
    05:41:57,260 16   [Consumer] DEBUG concurrency.Consumer  - Queue is empty, waiting
    05:41:57,260 16   [Producer] DEBUG concurrency.Producer  - producing : 2
    05:41:57,260 16   [Producer] DEBUG concurrency.Producer  - Queue is full, waiting
    05:41:57,260 16   [Consumer] DEBUG concurrency.Consumer  - consuming : 2
    05:41:57,260 16   [Consumer] DEBUG concurrency.Consumer  - Queue is empty, waiting
    05:41:57,260 16   [Producer] DEBUG concurrency.Producer  - producing : 3
    05:41:57,276 32   [Consumer] DEBUG concurrency.Consumer  - consuming : 3
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101

    3 使用阻塞队列实现

    使用wait()和notify()是经典方法,这里介绍一个高级方法。

    BlockingQueue中提供了put()和take()方法,可以极大简化生产者消费者模式的实现过程。这一过程的基本原理是,如果队列满了,put()方法就会被阻塞;如果队列是空的,take()方法会阻塞。与传统的wait()和notify()方法相比,使用阻塞队列更简单,更便于理解。下面是一个简单的例子:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    
    public class ProducerConsumerPattern {
    
        public static void main(String args[]){
    
         //Creating shared object
         BlockingQueue sharedQueue = new LinkedBlockingQueue();
    
         //Creating Producer and Consumer Thread
         Thread prodThread = new Thread(new Producer(sharedQueue));
         Thread consThread = new Thread(new Consumer(sharedQueue));
    
         //Starting producer and Consumer thread
         prodThread.start();
         consThread.start();
        }
    
    }
    
    //Producer Class in java
    class Producer implements Runnable {
    
        private final BlockingQueue sharedQueue;
    
        public Producer(BlockingQueue sharedQueue) {
            this.sharedQueue = sharedQueue;
        }
    
        @Override
        public void run() {
            for(int i=0; i<10; i++){
                try {
                    System.out.println("Produced: " + i);
                    sharedQueue.put(i);
                } catch (InterruptedException ex) {
                    Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
        }
    
    }
    
    //Consumer Class in Java
    class Consumer implements Runnable{
    
        private final BlockingQueue sharedQueue;
    
        public Consumer (BlockingQueue sharedQueue) {
            this.sharedQueue = sharedQueue;
        }
    
        @Override
        public void run() {
            while(true){
                try {
                    System.out.println("Consumed: "+ sharedQueue.take());
                } catch (InterruptedException ex) {
                    Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
        }
    
    
    }
    
    Output:
    Produced: 0
    Produced: 1
    Consumed: 0
    Produced: 2
    Consumed: 1
    Produced: 3
    Consumed: 2
    Produced: 4
    Consumed: 3
    Produced: 5
    Consumed: 4
    Produced: 6
    Consumed: 5
    Produced: 7
    Consumed: 6
    Produced: 8
    Consumed: 7
    Produced: 9
    Consumed: 8
    Consumed: 9
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91

    在后面的一篇博客中,给本文提供了补充的例子。用lock、synchronized、阻塞队列三种方法实现生产者消费者模式,实现的内容是生产者产生随机数(为了方便阅读结果,我把随机数限定在10以内的整数),消费者读取并打印。

  • 相关阅读:
    STL逆序迭代器(reverse_iterator)
    STL容器之vector容器API(二)
    STL容器之vector容器巧用swap收缩空间
    STL容器之vector容器API(一)
    STL容器vector概念和注意事项(每次扩充都会重新开辟空间,释放原空间,即首元素地址会变一次)
    STL容器之string内存重定义
    STL容器之string与c_style类型转换
    STL容器之string插入和删除
    STL容器之string字串
    STL容器之string比较
  • 原文地址:https://www.cnblogs.com/du-0210/p/8384464.html
Copyright © 2011-2022 走看看