zoukankan      html  css  js  c++  java
  • Java并发编程(4)--生产者与消费者模式介绍

    一、前言

      这种模式在生活是最常见的,那么它的场景是什么样的呢? 下面是我假象的,假设有一个仓库,仓库有一个生产者和一个消费者,消费者过来消费的时候会检测仓库中是否有库存,如果没有了则等待生产,如果有就先消费直至消费完成;而生产者每天的工作就是先检测仓库是否有库存,如果没有就开始生产,满仓了就停止生产等待消费,直至工作结束。下图是根据假象画的流程图:

      那么在程序中怎么才能达到这样的效果呢?下面介绍三种方式实现。

    二、使用notify() 和 wait()实现

      相信大家这两个方法都不陌生,它是Object类中的两个方法,具体请看源码中的解释。提醒一点就是使用notify()和wait()方法时必须拥有对象锁

      根据上面假象我这定义一下明确场景:仓库库存有个最大值,如果仓库库存已经达到最大值那么就停止生产,小于就需要生产; 如果库存等于0则需要等待生产停止消费。另外生产者有个生产目标,当它生产了目标数后就结束生产;消费者也是,当消费一定的数据后就结束消费,否则等待消费。

      见下面代码:

    package com.yuanfy.jmm.threads;
    
    import com.yuanfy.util.SleepUtils;
    
    import java.util.concurrent.TimeUnit;
    
    public class Factory {
        // 当前库存大小
        private int size;
        // 库存容量(最大库存值)
        private int capacity;
    
        public Factory(int capacity) {
            this.capacity = capacity;
        }
    
        public synchronized void produce(int num) {
            try {
                System.out.println("+++++生产者【" + Thread.currentThread().getName()
                        + "】, 他的任务是生产" + num + "件产品.");
                // 当生产完成就停止
                while (num > 0) {
                    // 如果当前库存大小大于或等于库存容量值了,则停止生产等待消费。
                    if (size >= capacity) {
                        System.out.println("+++++" + Thread.currentThread().getName() +
                                "检测库存已满,停止生产等待消费...");
                        // 等待消费
                        wait();
                        System.out.println("+++++" + Thread.currentThread().getName() + "开始生产...");
                    }
                    // 否则继续生产
                    int inc = (num + size) > capacity ? (capacity - size) : num;
                    num -= inc;
                    size += inc;
                    SleepUtils.second(1);
                    System.out.println("+++++" + Thread.currentThread().getName() + " 生产了" + inc + "件,当前库存有" + size + "件.");
                    // 生产后唤醒消费者
                    notify();
                }
                System.out.println("+++++生产者【" + Thread.currentThread().getName()
                        + "】 生产结束.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public synchronized void consume(int num) {
            try {
                System.out.println("-----消费者【" + Thread.currentThread().getName()
                        + "】, 他需要消费" + num + "件产品.");
                // 当消费完成则停止
                while (num > 0) {
                    // 如果当前库存大小小于等于0,则停止消费等待生产。
                    if (size <= 0) {
                        System.out.println("-----" + Thread.currentThread().getName() + " 检测库存已空,停止消费等待生产...");
                        // 等待生产
                        wait();
                        System.out.println("-----" + Thread.currentThread().getName() + " 开始消费...");
                    }
                    // 否则继续消费
                    int dec = (size - num) > 0 ? num : size;
                    num -= dec;
                    size -= dec;
                    SleepUtils.second(1);
                    System.out.println("-----" + Thread.currentThread().getName() + " 消费了" + dec + "件,当前有" + size + "件.");
                    // 消费后唤醒生产者继续生产
                    notify();
                }
                System.out.println("-----消费者【" + Thread.currentThread().getName()
                        + "】 消费结束.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

      上面是工厂(仓库)类,主要包含两个任务一个是生产一个是消费,接下来创建两个线程去调用它,如下:

    package com.yuanfy.jmm.threads;
    
    /**
     * 生产线程
     */
    class Produce {
        private Factory factory;
    
        public Produce(Factory factory) {
            this.factory = factory;
        }
    
        public void produce(String name, final int num) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    factory.produce(num);
                }
            }, name).start();
        }
    }
    /**
     * 消费线程
     */
    class Consume {
        private Factory factory;
    
        public Consume(Factory factory) {
            this.factory = factory;
        }
    
        public void consume(String name, final int num) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    factory.consume(num);
                }
            }, name).start();
        }
    }
    
    public class ProduceConsumeDemo {
    
        public static void main(String[] args) {
            Factory f = new Factory(500);
    
            Consume consume = new Consume(f);
            consume.consume("消费线程",600);
    
            Produce produce = new Produce(f);
            produce.produce("生产线程",800);
        }
    }

      注意上方,消费线程和生产线程都是拥有同一个工厂对象,然后进行生产和消费模式。那么我们直接运行,结果如下:

      

     三、使用锁中的Condition对象进行控制

      这种方式估计用的比较少,因为使用Condition必须先使用锁Lock。这里我只介绍怎么用Condition对象进行控制实现生产者与消费者模式的实现。

      其实它跟上面那种方法有点类似,Condition对象中await()方法表示等待,signal()方法表示唤醒(看了AQS源码的应该都知道有这个对象且了解过这两个方法)。下面看下具体怎么实现:

    public class Factory {
        // 当前大小
        private int size;
    
        // 总容量
        private int capacity;
    
        private Lock lock;
    
        // 已满的条件
        private Condition fullCondition;
    
        // 已空的条件
        private Condition emptyCondition;
    
        public Factory(int capacity) {
            this.capacity = capacity;
            lock = new ReentrantLock();
            fullCondition = lock.newCondition();
            emptyCondition = lock.newCondition();
        }
    
        public void produce(int no) {
            lock.lock();
            try {
                while (no > 0) {
                    while (size >= capacity) {
                        System.out.println(Thread.currentThread().getName() + " 报告仓库已满,等待快递员取件...");
                        fullCondition.await();
                        System.out.println(Thread.currentThread().getName() + " 报告开始进货...");
                    }
                    int inc = (no + size) > capacity ? (capacity - size) : no;
                    no -= inc;
                    size += inc;
                    System.out.println(Thread.currentThread().getName() +
                            " 报告进货了: " + inc + "件, 当前库存数: " + size);
                    emptyCondition.signal();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        public void consume(int no) {
            lock.lock();
            try {
                while (no > 0) {
                    while (size <= 0) {
                        System.out.println(Thread.currentThread().getName() + " 报告仓库已空,等待仓库管理员进货");
                        emptyCondition.await();
                        System.out.println(Thread.currentThread().getName() + " 报告开始取件...");
                    }
                    int dec = (size - no) > 0 ? no : size;
                    no -= dec;
                    size -= dec;
                    System.out.println(Thread.currentThread().getName() +
                            " 报告取件: " + dec + ", 当前库存数: " + size);
                    fullCondition.signal();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }

      看了上面工厂类的代码后是不是跟使用Object中wait()和notify()方法类似呢。 主要区别就是拥有对象的方式不一样,这里使用的lock进行且需要手动释放,而第一种是需要Synchronized进行控制。

    四、使用阻塞队列进行实现

      这个就很简单了,它已经封装好等待和唤醒的操作,所以不进行案例分享了。其中涉及到两个重要方法put() 和 take

      

  • 相关阅读:
    操作系统 chapter3 进程线程模型
    操作系统 chapter1 操作系统概述
    操作系统 chapter2 操作系统运行环境
    计算机网络 chapter 9 无线网络
    计算机网络 chapter 10 下一代因特网
    计算机网络 chapter 8 因特网上的音频/视频服务
    汇总常用的jQuery操作Table tr td方法
    jquery判断checkbox是否选中及改变checkbox状态
    $.ajax()方法详解
    wamp设置mysql默认编码
  • 原文地址:https://www.cnblogs.com/yuanfy008/p/9574509.html
Copyright © 2011-2022 走看看