zoukankan      html  css  js  c++  java
  • 使用阻塞队列实现生产者消费者问题

      相比Synchronized+wait()+notify()  和  Lock(Condition)+await()+signal() 的方式控制多线程之间的阻塞和唤醒,使用阻塞队列BlockingQueue更简单,程序员不需要花精力去关心什么时候需要阻塞线程,什么时候需要唤醒线程,一起交给BlockingQueue

      使用BlockingQueue的代码示例:

    1.资源类代码,内部有生产方法和消费方法,数据都存在阻塞队列中,使用阻塞队列的offer()和poll()插入和移除数据

    public class Source{//资源类
        private volatile boolean flag = true;//控制标志,true:生产和消费,false:不生产也不消费
        private AtomicInteger atomicInteger = new AtomicInteger(0);//原子类,避免写覆盖
        private BlockingQueue<String> blockingDeque = null;//阻塞对列
    
        public Source(BlockingQueue blockingDeque) {
            this.blockingDeque = blockingDeque;
        }
    
        /**
         * 生产数据
         * @author yang yajun
         * @date 2020/12/26 22:25
         * @Description: TODO
         */
        public void prod() throws InterruptedException {
    
            String data = null;
            boolean offer = false;
            while (flag){
                data = atomicInteger.incrementAndGet()+"";
                //插入数据,对列满了的话,就等2秒,如果还不行就返回false
                offer = blockingDeque.offer(data, 2L, TimeUnit.SECONDS);
                if(offer) {
                    System.out.println(Thread.currentThread().getName()+"生产成功"+data);
                }else {
                    System.out.println(Thread.currentThread().getName()+"2秒后生产失败");
                }
            }
            System.out.println(Thread.currentThread().getName()+"flag=false,停止生产");
        }
        /**
         * 消费数据
         * @author yang yajun
         * @date 2020/12/26 22:29
         * @Description: TODO
         */
        public void consumer() throws InterruptedException {
    
            String data = null;
            while (flag){
                //移除数据,对列为空的话,就等2秒,如果还不行就返回null
                data = blockingDeque.poll(2L, TimeUnit.SECONDS);
                if(data==null) {
    //                flag=false;
                    System.out.println(Thread.currentThread().getName()+"2秒后,获取数据失败");
                }else
                    System.out.println(Thread.currentThread().getName()+"获取数据成功"+data);
    
            }
            
        }
        /**
         * 对列停止工作
         * @author yang yajun
         * @date 2020/12/26 22:29
         * @Description: TODO
         */
        public void stop() {
            flag = false;
        }
    }

    2.主线程代码示例,启2个线程,分别调用资源类中的生产方法和消费方法,阻塞队列从构造器传入,队列界值设为3

    /**
     * 生产者消费者的高级版,BlockingQueue
     * @author yang yajun
     * @date 2020/12/2621:34
     * @Description: TODO
     */
    public class ProdConsumerBlockingQ {
        public static void main(String[] arg0) throws InterruptedException {
            BlockingQueue<String> blockingDeque = new ArrayBlockingQueue<>(3);
            Source source = new Source(blockingDeque);
    
            new Thread(()->{
                try {
                    source.prod();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
    
            new Thread(()->{
                try {
                    source.consumer();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
            TimeUnit.SECONDS.sleep(3);//主线程3秒后,停止阻塞队列工作
            System.out.println();
            System.out.println();
            System.out.println();
            System.out.println("停止");
            source.stop();
        }
    }

    注意:上面的代码有点漏洞,因为,在主线程执行stop方法后,阻塞队列里可能还有数据没被消费,看下面执行结果,可以看到有数据被生产了,但3732037321、37322没有被消费

    Thread-0生产成功37316
    Thread-0生产成功37317
    Thread-0生产成功37318
    Thread-1获取数据成功37316
    Thread-1获取数据成功37317
    Thread-1获取数据成功37318
    Thread-0生产成功37319
    Thread-0生产成功37320
    Thread-0生产成功37321
    Thread-0生产成功37322
    
    
    
    停止
    Thread-1获取数据成功37319
    Thread-02秒后生产失败
    Thread-0flag=false,停止生产

    要想保证队列中的数据被完全消费,其实好解决,就在消费方法中循环阻塞队列界值的次数,每次都调用poll()【不加时间参数的方法】就可以了

  • 相关阅读:
    021.day21 反射 Class类 反射常用操作
    020.day20 线程概述 多线程优缺点 线程的创建 线程常用方法 生命周期 多线程同步
    019.day19 缓冲流 对象流 标准输入输出流
    018.day18 map集合如何实现排序 File类 IO流 字节流 字符流 编码
    017.day17 Map接口 克隆 treeSet集合排重缺陷
    016.day16 HashSet TreeSet 比较器Comparable Comparator
    015.day15
    014.day14
    013.day13
    线程
  • 原文地址:https://www.cnblogs.com/yayin/p/14194796.html
Copyright © 2011-2022 走看看