zoukankan      html  css  js  c++  java
  • 多线程生产者消费者问题处理

    一、比较低级的办法是用wait和notify来解决这个问题。

    消费者生产者问题:

    这个问题是一个多线程同步问题的经典案例,生产者负责生产对象,消费者负责将生成者产生的对象取出,两者不断重复此过程。这过程需要注意几个问题:

    不论生产者和消费者有几个,必须保证:

    1.生产者每次产出的对象必须不一样,产生的对象有且仅有出现一次;

    2.消费者每次取出的对象必须不一样,取出的对象有且仅有出现一次;

    3.一定是先产生该对象,然后对象才能被取出,顺序不能乱;

    第一种情况:
    多个生产者轮流负责生产,多个消费者负责取出。一旦生产者产生一个对象,其他生产者不能生产,只能由消费者执行取出操作;

    需要的对象有商品类、消费者、生产者;

    //测试类
    public class ProducerConsumer {
     
        public static void main(String[] args) {
            // 定义资源对象
            Resource r = new Resource();
     
            //定义一个生产者和一个消费者
            Producer p = new Producer(r);
            Consumer c = new Consumer(r);
     
            //启动四个线程,2个负责生产者,两个消费者
            Thread t1 = new Thread(p);
            Thread t2 = new Thread(p);
            Thread t3 = new Thread(c);
            Thread t4 = new Thread(c);
     
            t1.start();
            t2.start();
            t3.start();
            t4.start();
        }
     
    }
     
    //商品类
    class Resource{
        private String name;
        private int count = 1;
        private  boolean flag = false;
     
        //产生商品
        public synchronized void set(String name) {
            while (flag) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            this.name = name + "---" + count++;
            System.out.println(Thread.currentThread().getName() + " 生产者" + this.name);
            flag = true;
            //唤醒所有线程
            this.notifyAll();
     
        }
        //取出商品
        public synchronized void out() {
                while (!flag) {
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println(Thread.currentThread().getName() + " 消费者________" + this.name);
                flag = false;
                this.notifyAll();
        }
    }
     
    //定义生产者
    class Producer implements Runnable{
     
        private Resource res;
     
        public Producer(Resource res) {
            this.res = res;
        }
     
        @Override
        public void run() {
            while (true) {
                res.set("+商品+");
            }
        }
    }
     
    //定义消费者
    class Consumer implements Runnable{
     
        private Resource res;
     
        Consumer(Resource res) {
            this.res = res;
        }
     
        @Override
        public void run() {
            while (true) {
                res.out();
            }
        }
    }
    

      


    运行结果是产生一个,随即取出一个,循环往复,其运行结果的部分如下:

    Thread-2 消费者________+商品+---67821
    Thread-1 生产者+商品+---67822
    Thread-3 消费者________+商品+---67822
    Thread-0 生产者+商品+---67823
    Thread-2 消费者________+商品+---67823
    Thread-1 生产者+商品+---67824
    Thread-3 消费者________+商品+---67824
    Thread-0 生产者+商品+---67825
    Thread-2 消费者________+商品+---67825
    Thread-1 生产者+商品+---67826
    Thread-3 消费者________+商品+---67826
    Thread-0 生产者+商品+---67827
    Thread-2 消费者________+商品+---67827
    Thread-1 生产者+商品+---67828
    Thread-3 消费者________+商品+---67828
    Thread-0 生产者+商品+---67829
    Thread-2 消费者________+商品+---67829
    Thread-1 生产者+商品+---67830
    Thread-3 消费者________+商品+---67830
    Thread-0 生产者+商品+---67831
    Thread-2 消费者________+商品+---67831
    Thread-1 生产者+商品+---67832
    

      


    第二种情况:
    目标:生产者与消费者轮换着抢夺执行权,但是生产者最多可以库存5个,消费者最多可以连续取出5个

    此时需要定义一种中间对象:仓库类。该类是生产者和消费者共享的一块区域,里面数据类型选择链表结果存放产生的对象。仓库是有容量上限的,当数量达到上限后,生产者不允许继续生产产品.当前线程进入等待状态,等待其他线程唤醒。当仓库没有产品时,消费者不允许继续消费,当前线程进入等待状态,等待其他线程唤醒。

    第一种解决方式,采用同步代码块(synchronized),结合着 wait() 和 notifyAll() 的方法,具体代码如下:

    package Thread;
    /**
     * 2个消费者,3个生产者
     */
     
    import java.util.LinkedList;
     
    public class ProConThreadDemo {
        public static void main(String[] args) {
            Respository res = new Respository();
     
            //定义2个消费者,3个生产者
            Worker p1 = new Worker(res,"手机");
            Worker p2 = new Worker(res,"电脑");
            Worker p3 = new Worker(res,"鼠标");
            Constomer c1 = new Constomer(res);
            Constomer c2 = new Constomer(res);
     
            Thread t1 = new Thread(p1,"甲");
            Thread t2 = new Thread(p2,"乙");
            Thread t3 = new Thread(p3,"丙");
            Thread t4 = new Thread(c1,"aaa");
            Thread t5 = new Thread(c2,"bbb");
     
            t1.start();
            t2.start();
            t3.start();
            t4.start();
            t5.start();
     
        }
     
     
     
    }
    //仓库类
    class Respository{
     
        private LinkedList<Product> store = new LinkedList<Product>();
     
        //生产者的方法,用于向仓库存货
        //最多只能有一个线程同时访问该方法.
        public synchronized void push(Product p,String ThreadName){
            //设置仓库库存最多能存5个商品
            /* 仓库容量最大值为5,当容量等于5的时候进入等待状态.等待其他线程唤醒
             * 唤醒后继续循环,等到仓库的存量小于5时,跳出循环继续向下执行准备生产产品.
             */
            while (store.size()==5){
                try {
                    System.out.println(ThreadName+" 发现:仓库已满,赶紧叫人运走");
                    //因为仓库容量已满,无法继续生产,进入等待状态,等待其他线程唤醒.
                    this.wait();
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
            this.notifyAll();
            store.addLast(p);
            System.out.println(ThreadName+" 给仓库添加 "+p.Name+p.Id+"号名称为 "+" 当前库存量为:"+store.size());
            //为了方便观察运行结果,每次生产完后等待0.1秒
            try {
                Thread.sleep(100);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
     
        }
     
     
        //消费者的方法,用于仓库出货
        //最多只能有一个线程同时访问该方法.
        public synchronized void pop(String ThreadName){
            /* 当仓库没有存货时,消费者需要进行等待.等待其他线程来唤醒
             * 唤醒后继续循环,等到仓库的存量大于0时,跳出循环继续向下执行准备消费产品.
             */
            while (store.size()==0){
                try {
                    System.out.println(ThreadName+" 发现:仓库空了,赶紧安排生产");
                    //因为仓库容量已空,无法继续消费,进入等待状态,等待其他线程唤醒.
                    this.wait();
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
            this.notifyAll();
            //定义对象。存放pollFirst()方法删除的对象,
            Product p = store.pollFirst();
            System.out.println(ThreadName+"买走 "+p.Name+p.Id+" 当前库存量为:"+store.size());
            //为了方便观察运行结果,每次取出后等待0.1秒
            try {
                Thread.sleep(100);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
     
        }
     
     
    }
     
     
    //产品类
    class Product{
        //产品的唯一标识Id
        public int Id;
        //产品的名称
        public String Name;
     
        public Product(String name, int id) {
            Name = name;
            Id = id;
        }
     
    }
     
    //生产者
    class Worker implements Runnable{
        //关键字volatile 是为了保持 Id 的可见性,一旦Id被修改,其他任何线程用到Id的地方,都会相应修改
        //否则下方run方法容易出问题,生产商品的Id和名称 与到时候消费者取出商品的Id和名称不一致
        public volatile Integer Id = 0;
     
        public volatile String name;
     
        //引用一个产品
        private Product p;
        //引用一个仓库
        Respository res;
     
        boolean flag = true;
     
        public Worker(Respository res,String name) {
            this.res = res;
            this.name = name;
        }
     
        @Override
        public void run() {
            while (flag){
                p  = new Product(name,Id);
                res.push(new Product(this.p.Name,Id++),Thread.currentThread().getName());
            }
        }
    }
     
    class Constomer implements Runnable{
        boolean flag = true;
     
        //引用一个仓库
        Respository res;
     
        public Constomer(Respository res) {
            this.res = res;
        }
     
        @Override
        public void run() {
            while (flag) {
     
                res.pop(Thread.currentThread().getName());
     
     
            }
     
        }
    }
    

      


    运行结果如下,可见仓库最多库存为5个,接近于实际生产

    aaa 发现:仓库空了,赶紧安排生产
    乙 给仓库添加 电脑0号名称为  当前库存量为:1
    丙 给仓库添加 鼠标0号名称为  当前库存量为:2
    甲 给仓库添加 手机0号名称为  当前库存量为:3
    bbb买走 电脑0 当前库存量为:2
    bbb买走 鼠标0 当前库存量为:1
    甲 给仓库添加 手机1号名称为  当前库存量为:2
    丙 给仓库添加 鼠标1号名称为  当前库存量为:3
    乙 给仓库添加 电脑1号名称为  当前库存量为:4
    aaa买走 手机0 当前库存量为:3
    aaa买走 手机1 当前库存量为:2
    aaa买走 鼠标1 当前库存量为:1
    aaa买走 电脑1 当前库存量为:0
    aaa 发现:仓库空了,赶紧安排生产
    乙 给仓库添加 电脑2号名称为  当前库存量为:1
    丙 给仓库添加 鼠标2号名称为  当前库存量为:2
    甲 给仓库添加 手机2号名称为  当前库存量为:3
    bbb买走 电脑2 当前库存量为:2
    bbb买走 鼠标2 当前库存量为:1
    甲 给仓库添加 手机3号名称为  当前库存量为:2
    丙 给仓库添加 鼠标3号名称为  当前库存量为:3
    乙 给仓库添加 电脑3号名称为  当前库存量为:4
    aaa买走 手机2 当前库存量为:3
    乙 给仓库添加 电脑4号名称为  当前库存量为:4
    乙 给仓库添加 电脑5号名称为  当前库存量为:5
    

      


    第二种方法,利用 lock类 替代 synchronized的使用,这样可以优化代码,主要是在唤醒的时候可以根据条件去唤醒指定的某些线程。例如:当库存为空的时候,第一种方法是唤醒所有等待的线程,也包括取出的线程;而此时lock类 可以设置在库存为空的时候,只唤醒生产线程,取出的线程依旧处于等待状态,具体代码如下:

    package Thread;
     
    import java.util.LinkedList;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
     
    public class ProConThreadPool {
     
        public static void main(String[] args) {
     
           Respository res = new Respository();
     
           Worker p1 = new Worker(res,"手机");
           Worker p2 = new Worker(res,"电脑");
           Worker p3 = new Worker(res,"鼠标");
     
           Constomer c1 = new Constomer(res);
           Constomer c2 = new Constomer(res);
     
           Thread t1 = new Thread(p1,"甲");
           Thread t2 = new Thread(p2,"乙");
           Thread t3 = new Thread(p3,"丙");
           Thread t4 = new Thread(c1,"aaa");
           Thread t5 = new Thread(c2,"bbb");
     
           t1.start();
           t2.start();
           t3.start();
           t4.start();
           t5.start();
     
        }
     
     
     
    }
    //仓库类
    class Respository{
     
        private Lock lock = new ReentrantLock();
     
        private LinkedList<Product> store = new LinkedList<Product>();
     
        private Condition condition_pro = lock.newCondition();
        private Condition condition_con = lock.newCondition();
     
        public LinkedList<Product> getStore() {
            return store;
        }
     
        public void setStore(LinkedList<Product> store) {
            this.store = store;
        }
        //向仓库存货
        public  void push(Product p,String ThreadName) throws InterruptedException{
            lock.lock();
            try {
                //设置仓库库存最多能存5个商品
                while (store.size()==5){
                        System.out.println(ThreadName+" 发现:仓库已满,赶紧叫人运走");
                        condition_pro.await();
                }
                condition_con.signalAll();
                store.addLast(p);
                System.out.println(ThreadName+" 给仓库添加 "+p.Name+p.Id+"号名称为 "+" 当前库存量为:"+store.size());
     
            }finally {
                lock.unlock();
            }
            try {
                Thread.sleep(100);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
     
        }
     
     
        //仓库出货
        public void pop(String ThreadName) throws InterruptedException
        {
            lock.lock();
            try{
                while (store.size()==0){
     
                        System.out.println(ThreadName+" 发现:仓库空了,赶紧安排生产");
                        condition_con.await();
                }
                condition_pro.signalAll();
                Product p = store.pollFirst();
                System.out.println(ThreadName+"买走 "+p.Name+p.Id+" 当前库存量为:"+store.size());
     
            }
            finally {
                lock.unlock();
            }
            try {
                Thread.sleep(100);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
     
        }
     
     
    }
     
     
     
    class Product{
        public int Id;
     
        public String Name;
     
        public Product(String name, int id) {
            Name = name;
            Id = id;
        }
     
    }
     
    class Worker implements Runnable{
     
        public volatile Integer Id = 0;
     
        public volatile String name;
     
        //引用一个产品
        private Product p;
        //引用一个仓库
        Respository res;
     
        boolean flag = true;
     
        public Worker(Respository res,String name) {
            this.res = res;
            this.name = name;
        }
     
        @Override
        public void run(){
            while (flag){
                    p  = new Product(name,Id);
                    try {
                        res.push(new Product(this.p.Name,Id++),Thread.currentThread().getName());
                    }catch (InterruptedException e){
                        e.printStackTrace();
                    }
     
                }
        }
    }
     
    class Constomer implements Runnable{
        boolean flag = true;
     
        //引用一个仓库
        Respository res;
     
        public Constomer(Respository res) {
            this.res = res;
        }
     
        @Override
        public void run() {
            while (flag) {
                try {
                    res.pop(Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
     
        }
    }
    

      


    运行结果与上面类似:

    aaa 发现:仓库空了,赶紧安排生产
    bbb 发现:仓库空了,赶紧安排生产
    丙 给仓库添加 鼠标0号名称为  当前库存量为:1
    乙 给仓库添加 电脑0号名称为  当前库存量为:2
    甲 给仓库添加 手机0号名称为  当前库存量为:3
    aaa买走 鼠标0 当前库存量为:2
    bbb买走 电脑0 当前库存量为:1
    bbb买走 手机0 当前库存量为:0
    乙 给仓库添加 电脑1号名称为  当前库存量为:1
    甲 给仓库添加 手机1号名称为  当前库存量为:2
    aaa买走 电脑1 当前库存量为:1
    丙 给仓库添加 鼠标1号名称为  当前库存量为:2
    aaa买走 手机1 当前库存量为:1
    甲 给仓库添加 手机2号名称为  当前库存量为:2
    乙 给仓库添加 电脑2号名称为  当前库存量为:3
    bbb买走 鼠标1 当前库存量为:2
    丙 给仓库添加 鼠标2号名称为  当前库存量为:3
    aaa买走 手机2 当前库存量为:2
    甲 给仓库添加 手机3号名称为  当前库存量为:3
    bbb买走 电脑2 当前库存量为:2
    乙 给仓库添加 电脑3号名称为  当前库存量为:3
    丙 给仓库添加 鼠标3号名称为  当前库存量为:4

    二、比较赞的办法是用Semaphore 或者 BlockingQueue来实现生产者消费者模型。

     BlockingQueue 是线程安全的,并且在调用 put,take 方法时会阻塞线程。

    基于以上特性,可以不加任何锁解决生产者消费者问题。

    直接上代码:

    public static void main(String[] args) throws InterruptedException {
            BlockingQueue<String> bq = new LinkedBlockingQueue<>(2);
     
            CountDownLatch cdl = new CountDownLatch(2);
     
            Thread t1 = new Thread(()->{ // 生产者线程
                    try {
                        for (int i = 0; i < 100; i++)
                            bq.put("z" + i);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        cdl.countDown();
                    }
            });
     
            Thread t2 = new Thread(()->{ // 消费者线程
                    try {
                        for (int i = 0; i < 100; i++)
                            System.out.println(bq.take());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        cdl.countDown();
                    }
            });
            t2.start();
            t1.start();
            cdl.await(); // 等待两个线程结束
            System.out.println(bq.size());
     
        }
    

      

    参考:https://blog.csdn.net/zjt980452483/article/details/81348668

         https://blog.csdn.net/qq_29697901/article/details/90405141

  • 相关阅读:
    MongoDB权限管理
    Termux结合公网kali打造移动渗透神器
    整人病毒vbs大全!
    mongodb 数据库详解
    mongodb 用户及数据库管理命令
    windows入侵排查思路
    linux 下node.js 安装
    Linux下如何用/proc命令查找进程状态信息——当前目录,内存占用,描述符等
    linux c 得到指定进程内存占用
    1分钟彻底理解C语言指针
  • 原文地址:https://www.cnblogs.com/wjqhuaxia/p/11746675.html
Copyright © 2011-2022 走看看