zoukankan      html  css  js  c++  java
  • 通过阻塞队列实现生产者和消费者异步解耦

    生产者消费者模式是并发、多线程编程中经典的设计模式,生产者和消费者通过分离的执行工作解耦,简化了开发模式,生产者和消费者可以以不同的速度生产和消费数据。这篇文章我们来看看什么是生产者消费者模式,这个问题也是多线程面试题中经常被提及的。如何使用阻塞队列(Blocking Queue)解决生产者消费者模式,以及使用生产者消费者模式的好处。

    真实世界中的生产者消费者模式

    生产者和消费者模式在生活当中随处可见,它描述的是协调与协作的关系。比如一个人正在准备食物(生产者),而另一个人正在吃(消费者),他们使用一个共用的桌子用于放置盘子和取走盘子,生产者准备食物,如果桌子上已经满了就等待,消费者(那个吃的)等待如果桌子空了的话。这里桌子就是一个共享的对象。在Java Executor框架自身实现了生产者消费者模式它们分别负责添加和执行任务。

    生产者消费者模式的好处

    它的确是一种实用的设计模式,常用于编写多线程或并发代码。下面是它的一些优点:

    1. 它简化的开发,你可以独立地或并发的编写消费者和生产者,它仅仅只需知道共享对象是谁

    2. 生产者不需要知道谁是消费者或者有多少消费者,对消费者来说也是一样

    3. 生产者和消费者可以以不同的速度执行

    4. 分离的消费者和生产者在功能上能写出更简洁、可读、易维护的代码

    多线程中的生产者消费者问题

    生产者消费者问题是一个流行的面试题,面试官会要求你实现生产者消费者设计模式,以至于能让生产者应等待如果队列或篮子满了的话,消费者等待如果队列或者篮子是空的。这个问题可以用不同的方式来现实,经典的方法是使用wait和notify方法在生产者和消费者线程中合作,在队列满了或者队列是空的条件下阻塞,Java5的阻塞队列(BlockingQueue)数据结构更简单,因为它隐含的提供了这些控制,现在你不需要使用wait和nofity在生产者和消费者之间通信了,阻塞队列的put()方法将阻塞如果队列满了,队列take()方法将阻塞如果队列是空的。在下部分我们可以看到代码例子。

    先回顾一下知识点阻塞队列BlockingQueue

    常用实现队列简介:

    • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
    • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
    • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
    • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

    下面是实现代码 目的是解决生产者和消费者的异步解耦 完全透明

    public class Producer implements Runnable {
    
        BlockingQueue blockingQueue = null;
    
        public Producer(BlockingQueue blockingQueue) {
            this.blockingQueue = blockingQueue;
        }
    
        @Override
        public void run() {
    
            for (int i = 0; i < 5; i++) {
    
                try {
                    System.out.println("producer:" + i);
                    blockingQueue.put(i);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
    
        }
    
    }
    View Code

    下面是消费者:

    public class Customer implements Runnable {
    
        BlockingQueue blockingQueue = null;
    
        public Customer(BlockingQueue blockingQueue) {
    
            this.blockingQueue = blockingQueue;
        }
    
        @Override
        public void run() {
            // TODO Auto-generated method stub
            while (true) {
                try {
                    System.out.println("customer:" + blockingQueue.take());
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
    
            }
    
        }
    
    }
    View Code

    客户单demo

    public class ClientDemo {
    
        public static void main(String[] args) {
    
            BlockingQueue blockingQueue = new LinkedBlockingQueue<>();
    
            Producer producer = new Producer(blockingQueue);
            Customer cusThread = new Customer(blockingQueue);
            Thread producerThread = new Thread(producer);
            Thread customerThread = new Thread(cusThread);
    
            producerThread.start();
    
            customerThread.start();
        }
    
    }

    需要解释一下 里面的两个属性:

    /**
    *
    BlockingQueue.put方法
    public void put(E e)
             throws InterruptedException
    将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用。
     
    
    指定者:
    接口 BlockingQueue<E> 中的 put
    参数:
    e - 要添加的元素
    抛出:
    InterruptedException - 如果在等待时被中断
    NullPointerException - 如果指定元素为 null
     
    
    */
    BlockingQueue.put

    =====================

     

    BlockingQueue.take
    /**
    
    *
    
    take
    public E take()
           throws InterruptedException
    从接口 BlockingQueue 复制的描述
    获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。
     
    
    指定者:
    接口 BlockingQueue<E> 中的 take
    返回:
    此队列的头部
    抛出:
    InterruptedException - 如果在等待时被中断
    */
    
     blockingQueue.take();
    
    
    
      public E take() throws InterruptedException {
    
            E x;
    
            int c = -1;
    
            final AtomicInteger count = this.count;
    
            final ReentrantLock takeLock = this.takeLock;
    
            takeLock.lockInterruptibly();
    
            try {
    
                while (count.get() == 0) {
    
                    notEmpty.await();
    
                }
    
                x = dequeue();
    
                c = count.getAndDecrement();
    
                if (c > 1)
    
                    notEmpty.signal();
    
            } finally {
    
                takeLock.unlock();
    
            }
    
            if (c == capacity)
    
                signalNotFull();
    
            return x;
    
        }

    需要注意的是  put  和take都是阻塞的  在高并发场景下 不利于吞吐率的提升 

  • 相关阅读:
    shell 测试命令
    shell 键盘录入和运算
    shell 的变量
    shell 脚本 helloworld
    让windows系统的DOS窗口也可以显示utf8字符集
    wxpython发布还自己图标的程序
    弥补wxpython无背景图片缺陷
    wxPython实现在浏览器中打开链接
    使用py2exe发布windows平台Python
    python os模块实用函数
  • 原文地址:https://www.cnblogs.com/zhangfengshi/p/9396077.html
Copyright © 2011-2022 走看看