zoukankan      html  css  js  c++  java
  • 生产者消费者模式

      学习Java的多线程,生产者消费者模式是避免不了的。下面将以wait/notify,await/singal,blockingquene几种方式来实现生产者消费者模式。

      下面的仅仅是例子,实际应用中应该使用Executor来完成,并且保证线程及线程池可以正常关闭。实际使用中需要考虑的情况是生产者生产的较慢,消费者较快,消费者线程也不能一直等待,这就需要正确的标识出生产者线线程池什么时候结束。需要使用volatile boolean和AtomaticInteger来设置生产者线线程池的状态,消费者线程才可以正常结束而不是一直等待。

      使用wait/notify先了解以下知识:

      每一个同步锁lock下面都挂了几个线程队列,包括就绪(Ready)队列,等待(Waiting)队列等。当线程A因为得不到同步锁lock,从而进入的是lock.ReadyQueue(就绪队列),一旦同步锁不被占用,JVM将自动运行就绪队列中的线程而不需要任何notify()的操作。
      但是当线程A被wait()了,那么将进入lock.WaitingQuene(等待队列),同时如果占据的同步锁也会放弃。而此时如果同步锁不唤醒等待队列中的进程(lock.notify()),这些进程将永远不会得到运行的机会。
      wait不是改变锁的状态,是把当前线程放到锁的等待队列里面,notify就是从锁的等待队列里面选择第一个等待的线程进行调度。每个对象都有一个唯一的锁。


      wait, notify操作首先保证当前线程持有锁,否则会抛出异常。

      也就是一般需要如下方式来使用wait/notify。

    Object o;
    
    //A thread
    synchronized(o){
        ...
        o.wait();
        ...
    }
    
    //B thread
    synchronized(o){
        ...
        o.notify();
        ...
    }

      下面是用wait, notify来完成生产者消费者的例子 

    package multiThread;
    
    import java.util.LinkedList;
    class Storage
    {
        // 仓库最大存储量
        private final int MAX_SIZE = 100;
    
        // 仓库存储的载体
        private LinkedList<Object> list = new LinkedList<Object>();
    
        // 生产num个产品
        public void produce(int num)
        {
            // 同步代码段
            synchronized (list)
            {
                // 如果仓库剩余容量不足
                // 这里用的是while,不能是if
                while (list.size() + num > MAX_SIZE)
                {
                    System.out.println("【要生产的产品数量】:" + num + "	【库存量】:"
                            + list.size() + "	暂时不能执行生产任务!");
                    try
                    {
                        // 由于条件不满足,生产阻塞
                        list.wait();
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }
                }
    
                // 生产条件满足情况下,生产num个产品
                for (int i = 1; i <= num; ++i)
                {
                    list.add(new Object());
                }
    
                System.out.println("【已经生产产品数】:" + num + "	【现仓储量为】:" + list.size());
    
                list.notifyAll();
            }
        }
    
        // 消费num个产品
        public void consume(int num)
        {
            // 同步代码段
            synchronized (list)
            {
                // 如果仓库存储量不足
                while (list.size() < num)
                {
                    System.out.println("【要消费的产品数量】:" + num + "	【库存量】:"
                            + list.size() + "	暂时不能执行消费任务!");
                    try
                    {
                        // 由于条件不满足,消费阻塞
                        list.wait();
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }
                }
    
                // 消费条件满足情况下,消费num个产品
                for (int i = 1; i <= num; ++i)
                {
                    list.remove();
                }
    
                System.out.println("【已经消费产品数】:" + num + "	【现仓储量为】:" + list.size());
    
                list.notifyAll();
            }
        }
    
        // get/set方法
        public LinkedList<Object> getList()
        {
            return list;
        }
    
        public void setList(LinkedList<Object> list)
        {
            this.list = list;
        }
    
        public int getMAX_SIZE()
        {
            return MAX_SIZE;
        }
    }
    
    class producer extends Thread
    {
        // 每次生产的产品数量
        private int num;
    
        // 所在放置的仓库
        private Storage storage;
    
        // 构造函数,设置仓库
        public producer(Storage storage)
        {
            this.storage = storage;
        }
    
        // 线程run函数
        public void run()
        {
            produce(num);
        }
    
        // 调用仓库Storage的生产函数
        public void produce(int num)
        {
            storage.produce(num);
        }
    
        // get/set方法
        public int getNum()
        {
            return num;
        }
    
        public void setNum(int num)
        {
            this.num = num;
        }
    
        public Storage getStorage()
        {
            return storage;
        }
    
        public void setStorage(Storage storage)
        {
            this.storage = storage;
        }
    }
    
    class consumer extends Thread
    {
        // 每次消费的产品数量
        private int num;
    
        // 所在放置的仓库
        private Storage storage;
    
        // 构造函数,设置仓库
        public consumer(Storage storage)
        {
            this.storage = storage;
        }
    
        // 线程run函数
        public void run()
        {
            consume(num);
        }
    
        // 调用仓库Storage的生产函数
        public void consume(int num)
        {
            storage.consume(num);
        }
    
        // get/set方法
        public int getNum()
        {
            return num;
        }
    
        public void setNum(int num)
        {
            this.num = num;
        }
    
        public Storage getStorage()
        {
            return storage;
        }
    
        public void setStorage(Storage storage)
        {
            this.storage = storage;
        }
    }
    
    public class WaitNotifyTest
    {
        public static void main(String[] args)
        {
            // 仓库对象
            Storage storage = new Storage();
    
            // 生产者对象
            producer p1 = new producer(storage);
            producer p2 = new producer(storage);
            producer p3 = new producer(storage);
            producer p4 = new producer(storage);
            producer p5 = new producer(storage);
            producer p6 = new producer(storage);
            producer p7 = new producer(storage);
    
            // 消费者对象
            consumer c1 = new consumer(storage);
            consumer c2 = new consumer(storage);
            consumer c3 = new consumer(storage);
    
            // 设置生产者产品生产数量
            p1.setNum(10);
            p2.setNum(10);
            p3.setNum(10);
            p4.setNum(10);
            p5.setNum(10);
            p6.setNum(10);
            p7.setNum(80);
    
            // 设置消费者产品消费数量
            c1.setNum(50);
            c2.setNum(20);
            c3.setNum(30);
    
            // 线程开始执行
            c1.start();
            c2.start();
            c3.start();
            p1.start();
            p2.start();
            p3.start();
            p4.start();
            p5.start();
            p6.start();
            p7.start();
        }
    }

      await/singal是Lock所对应的操作,上面的一个Objec对应的wait队列只能有一个,但是一个Lock对应的await却可以有多个,下面将使用getLock和putLock 2个队列来完成生产者消费者模式。使用await/singal也需要当前线程持有Lock,和上面的是一样的。ArrayBlockingQueue的底层就是用到了同一个Lock的2个队列。

    package multiThread;
    
    import java.util.LinkedList;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    class Storage2 extends Storage
    {
        // 仓库最大存储量
        private final int MAX_SIZE = 100;
    
        // 仓库存储的载体
        private LinkedList<Object> list = new LinkedList<Object>();
        
        private Lock lock = new ReentrantLock();
        private Condition getLock = lock.newCondition();
        private Condition putLock = lock.newCondition();
    
        // 生产num个产品
        public void produce(int num)
        {
                lock.lock();
                try {                
                    // 如果仓库剩余容量不足
                    // 这里用的是while,不能是if
                    while (list.size() + num > MAX_SIZE) {
                        System.out.println("【要生产的产品数量】:" + num + "	【库存量】:" + list.size() + "	暂时不能执行生产任务!");
                        try {
                            // 由于条件不满足,生产阻塞
                            putLock.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    // 生产条件满足情况下,生产num个产品
                    for (int i = 1; i <= num; ++i) {
                        list.add(new Object());
                    }
                    System.out.println("【已经生产产品数】:" + num + "	【现仓储量为】:" + list.size());
                    getLock.signalAll();
                } finally{
                    lock.unlock();
                }
        }
    
        // 消费num个产品
        public void consume(int num)
        {
                lock.lock();
                try {
                    // 如果仓库存储量不足
                    while (list.size() < num) {
                        System.out.println("【要消费的产品数量】:" + num + "	【库存量】:" + list.size() + "	暂时不能执行消费任务!");
                        try {
                            // 由于条件不满足,消费阻塞
                            getLock.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    // 消费条件满足情况下,消费num个产品
                    for (int i = 1; i <= num; ++i) {
                        list.remove();
                    }
                    System.out.println("【已经消费产品数】:" + num + "	【现仓储量为】:" + list.size());
                    putLock.signalAll();
                } finally{
                    lock.unlock();
                }
        }
    
        // get/set方法
        public LinkedList<Object> getList()
        {
            return list;
        }
    
        public void setList(LinkedList<Object> list)
        {
            this.list = list;
        }
    
        public int getMAX_SIZE()
        {
            return MAX_SIZE;
        }
    }
    
    
    public class AwaitSingalTest
    {
        public static void main(String[] args)
        {
            // 仓库对象
            Storage2 storage = new Storage2();
    
            // 生产者对象
            producer p1 = new producer(storage);
            producer p2 = new producer(storage);
            producer p3 = new producer(storage);
            producer p4 = new producer(storage);
            producer p5 = new producer(storage);
            producer p6 = new producer(storage);
            producer p7 = new producer(storage);
    
            // 消费者对象
            consumer c1 = new consumer(storage);
            consumer c2 = new consumer(storage);
            consumer c3 = new consumer(storage);
    
            // 设置生产者产品生产数量
            p1.setNum(10);
            p2.setNum(10);
            p3.setNum(10);
            p4.setNum(10);
            p5.setNum(10);
            p6.setNum(10);
            p7.setNum(80);
    
            // 设置消费者产品消费数量
            c1.setNum(50);
            c2.setNum(20);
            c3.setNum(30);
    
            // 线程开始执行
            c1.start();
            c2.start();
            c3.start();
            p1.start();
            p2.start();
            p3.start();
            p4.start();
            p5.start();
            p6.start();
            p7.start();
                    
        }
    }

      用blockingquene来实现生产者消费者模式和上面2中的实现略有不同。上面2中由于都基于了锁,所以增加10个对象时,会一次将这个10个对象增加到仓库中。

      blockingquene的底层也是基于await/singal的,保证了多个线程中对blockingquene的插入、读取操作只能有一个线程进行。blockingquene中对自己进行了加锁,所以生产者和消费者就不能再次对blockingquene加锁。

      但是多次运行该程序,仓库中最后的结果肯定是正确的,这个是由blockingquene来保证的。

    package multiThread;
    
    import java.util.concurrent.LinkedBlockingQueue;
    
    class Storage3 extends Storage {
        // 仓库最大存储量
        private final int MAX_SIZE = 100;
    
        // 仓库存储的载体,使用LinkedBlockingQueue的put和take方法
        // 使用LinkedBlockingQueue时就不要在显示的使用lock或者synchronized
        public static LinkedBlockingQueue<Object> lbq = new LinkedBlockingQueue<Object>(100);
    
        // 生产num个产品
        public void produce(int num) {
    
                // 如果仓库剩余容量不足
                if (lbq.size() + num > MAX_SIZE) {
                    System.out.println("【要生产的产品数量】:" + num + "	【库存量】:" + lbq.size() + "	暂时不能执行生产任务!");
                }
                // 生产条件满足情况下,生产num个产品
                for (int i = 1; i <= num; ++i) {
                    try {
                        if (!Thread.currentThread().isInterrupted()){
                            lbq.put(new Object());
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("【已经生产产品数】:" + num + "	【现仓储量为】:" + lbq.size());
    
        }
    
        // 消费num个产品
        public void consume(int num) {
                // 如果仓库存储量不足
                if (lbq.size() < num) {
                    System.out.println("【要消费的产品数量】:" + num + "	【库存量】:" + lbq.size() + "	暂时不能执行消费任务!");
                }
                // 消费条件满足情况下,消费num个产品
                for (int i = 1; i <= num; ++i) {
                    try {
                        if (!Thread.currentThread().isInterrupted()){
                            lbq.take();
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("【已经消费产品数】:" + num + "	【现仓储量为】:" + lbq.size());
        }
    
        public void setList(LinkedBlockingQueue<Object> lbq) {
            this.lbq = lbq;
        }
    
        public int getMAX_SIZE() {
            return MAX_SIZE;
        }
    }
    
    public class BlockingQueneTest {
        public static void main(String[] args) {
            // 仓库对象
            Storage3 storage = new Storage3();
    
            // 生产者对象
            producer p1 = new producer(storage);
            producer p2 = new producer(storage);
            producer p3 = new producer(storage);
            producer p4 = new producer(storage);
            producer p5 = new producer(storage);
            producer p6 = new producer(storage);
            producer p7 = new producer(storage);
    
            // 消费者对象
            consumer c1 = new consumer(storage);
            consumer c2 = new consumer(storage);
            consumer c3 = new consumer(storage);
    
            // 设置生产者产品生产数量
            p1.setNum(10);
            p2.setNum(10);
            p3.setNum(10);
            p4.setNum(10);
            p5.setNum(10);
            p6.setNum(10);
            p7.setNum(80);
    
            // 设置消费者产品消费数量
            c1.setNum(50);
            c2.setNum(20);
            c3.setNum(30);
    
            // 线程开始执行
            c1.start();
            c2.start();
            c3.start();
            p1.start();
            p2.start();
            p3.start();
            p4.start();
            p5.start();
            p6.start();
            p7.start();
            
            try {
                c1.join();
                c2.join();
                c3.join();
                p1.join();
                p2.join();
                p3.join();
                p4.join();
                p5.join();
                p6.join();
                p7.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            System.out.println("最后的库存为:" + Storage3.lbq.size());
        }
    }
  • 相关阅读:
    golang 带参数 发送、上传本地文件到其他机器、服务器
    【比赛游记】北大集训2020垫底记
    【比赛游记】NOIP2020又当工具人记
    AtCoder Regular Contest 107
    AtCoder Regular Contest 108
    【比赛游记】CSP2020游记
    注意事项
    2020北大集训摸鱼记
    NOIP2020游记
    ARC109F
  • 原文地址:https://www.cnblogs.com/lnlvinso/p/4657287.html
Copyright © 2011-2022 走看看