zoukankan      html  css  js  c++  java
  • 多线程——生产者消费者模式三种实现方式

      生产者消费者模式通过一个阻塞队列来解决两者之间的强耦合问题。阻塞队列相当于一个缓冲区,平衡消费者和生产者的处理能力。

      阻塞队列有数据——生产者不生产,阻塞队列没数据——消费者不消费

    一、synchronized+wait+notifyAll

     生产

    package com.ProductCusromer.method2;
    
    import java.util.List;
    import java.util.Random;
    
    /**
     * @author Millet
     * @date 2020/3/30 17:59
     */
    public class Product2 implements Runnable {
        private List list;
    
        public Product2(List list) {
            this.list = list;
        }
        //多个生产者对应多个消费者
        @Override
        public void run() {
            Random random = new Random(50);//相当于商品
            while (true){
                synchronized (list){
                    while(list.size()>0){
                        //集合中有数据就停止写入,等待消费者消费完
                        try {
                            list.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //执行到这说明集合没有数据了
                    list.add((random.nextInt(50)));
                    //提示
                    System.out.println(Thread.currentThread().getName()+"线程生产了:"+ list.get(0));
                    list.notifyAll();
                }
            }
        }
    }

     消费者

    package com.ProductCusromer.method2;
    
    import java.util.List;
    
    /**
     * @author Millet
     * @date 2020/3/30 18:07
     */
    public class Consumer2 implements Runnable {
        private List list;
    
        public Consumer2(List list) {
            this.list = list;
        }
    
        @Override
        public void run() {
            while(true){
                synchronized (list){
                    while(list.size()<=0){//适合多生产者多消费者
                        try {
                            list.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                     try {
                         Thread.sleep(500);
                      } catch (InterruptedException e) {
                           e.printStackTrace();
                       }
    
                     //提示
                      System.err.println(Thread.currentThread().getName()+"线程消费了:"+ list.remove(0));
                      list.notifyAll();
                    }
                }
        }
    }

    二、ReetrantLock+Condition

    生产者

    package com.ProductCusromer.method3;
    
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    
    /**
     * @author Millet
     * @date 2020/3/30 17:59
     */
    public class Product3 implements Runnable {
        private List list;
        private Lock mLock;
        private Condition mCondition;
        public Product3(List list, Lock lock, Condition condition) {
            this.list = list;
            this.mLock = lock;
            this.mCondition = condition;
        }
        //一个生产者对应一个消费者
        @Override
        public void run() {
            Random random = new Random(50);//相当于商品
            while (true){
                mLock.lock();
                if(list.size()>0){
                    //集合中有数据就停止写入,等待消费者消费完
                    try {
                        mCondition.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //执行到这说明集合没有数据了
                list.add((random.nextInt(50)));
                //提示
                System.out.println(Thread.currentThread().getName()+"线程生产了:"+ list.get(0));
                mCondition.signalAll();
                mLock.unlock();
            }
        }
    }

     消费者

    package com.ProductCusromer.method2;
    
    import java.util.List;
    
    /**
     * @author Millet
     * @date 2020/3/30 18:07
     */
    public class Consumer2 implements Runnable {
        private List list;
    
        public Consumer2(List list) {
            this.list = list;
        }
    
        @Override
        public void run() {
            while(true){
                synchronized (list){
                    while(list.size()<=0){//适合多生产者多消费者
                        try {
                            list.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                     try {
                         Thread.sleep(500);
                      } catch (InterruptedException e) {
                           e.printStackTrace();
                       }
    
                     //提示
                      System.err.println(Thread.currentThread().getName()+"线程消费了:"+ list.remove(0));
                      list.notifyAll();
                    }
                }
        }
    }

    三、BlockingQueue实现

    生产者

    package com.ProductCusromer.method3;
    
    import java.util.concurrent.BlockingQueue;
    
    /**
     * @author Millet
     * @date 2020/3/30 21:28
     */
    public class Product4 implements Runnable {
        private BlockingQueue queue;
        private String name;
        public Product4(String name, BlockingQueue queue) {
            this.queue = queue;
            this.name = name;
        }
        @Override
        public void run() {
            while(true){
                try {
                    int val = (int) Math.random();
                    queue.put(val);
                    System.out.println(name + "生产:"+val+".当前队列长度:"+ queue.size());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    消费者

    package com.ProductCusromer.method3;
    
    import java.util.concurrent.BlockingDeque;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    
    /**
     * @author Millet
     * @date 2020/3/30 21:28
     */
    public class Consumer4 implements Runnable{
        private BlockingQueue queue;
        private String name;
        public Consumer4(String name, BlockingQueue queue) {
            this.queue = queue;
            this.name = name;
        }
        @Override
        public void run() {
            try {
                while (true){
                    int val = (int) queue.take();
                    System.out.println(name + "消费:"+val+".当前队列长度:"+ queue.size());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    测试

    package com.ProductCusromer.method3;
    
    import com.ProductCusromer.method1.Consumer1;
    import com.ProductCusromer.method1.Product1;
    
    import javax.persistence.criteria.CriteriaBuilder;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * @author Millet
     * @date 2020/3/30 20:33
     */
    public class Main {
        public static void main(String[] args) {
            BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(20);
            Product4 producer1 = new Product4("生产1号", queue);
            Product4 producer2 = new Product4("生产2号", queue);
            Product4 producer3 = new Product4("生产3号", queue);
    
            Consumer4 consumer1 = new Consumer4("消费1号", queue);
            Consumer4 consumer2 = new Consumer4("消费2号", queue);
    
    // 开始producer线程进行生产
            new Thread(producer1).start();
            new Thread(producer2).start();
            new Thread(producer3).start();
    
    // 开始consumer线程进行消费。
            new Thread(consumer1).start();
            new Thread(consumer2).start();
        }
    }

    结果

  • 相关阅读:
    翻转单词顺序列
    和为S的两个数字
    单例模式
    python利用pyinstaller打包常用打包命令
    python 3.8 使用pymssql 向SQL Server插入数据不成功原因
    PyQt5(designer)入门教程
    PyQt5中文教程
    scrapy 图片爬取 多层多页 保存不同的文件夹 重命名full文件夹
    安装Python + PyCharm + PyQt5配套设置
    python用pymysql模块操作数据库MySQL,实现查增删改
  • 原文地址:https://www.cnblogs.com/qmillet/p/12601518.html
Copyright © 2011-2022 走看看