zoukankan      html  css  js  c++  java
  • 通过阻塞队列实现生产者和消费者异步解耦

    生产者消费者模式是并发、多线程编程中经典的设计模式,生产者和消费者通过分离的执行工作解耦,简化了开发模式,生产者和消费者可以以不同的速度生产和消费数据。这篇文章我们来看看什么是生产者消费者模式,这个问题也是多线程面试题中经常被提及的。如何使用阻塞队列(Blocking Queue)解决生产者消费者模式,以及使用生产者消费者模式的好处。

    真实世界中的生产者消费者模式

    生产者和消费者模式在生活当中随处可见,它描述的是协调与协作的关系。比如一个人正在准备食物(生产者),而另一个人正在吃(消费者),他们使用一个共用的桌子用于放置盘子和取走盘子,生产者准备食物,如果桌子上已经满了就等待,消费者(那个吃的)等待如果桌子空了的话。这里桌子就是一个共享的对象。在Java Executor框架自身实现了生产者消费者模式它们分别负责添加和执行任务。

    生产者消费者模式的好处

    它的确是一种实用的设计模式,常用于编写多线程或并发代码。下面是它的一些优点:

    1. 它简化的开发,你可以独立地或并发的编写消费者和生产者,它仅仅只需知道共享对象是谁

    2. 生产者不需要知道谁是消费者或者有多少消费者,对消费者来说也是一样

    3. 生产者和消费者可以以不同的速度执行

    4. 分离的消费者和生产者在功能上能写出更简洁、可读、易维护的代码

    多线程中的生产者消费者问题

    生产者消费者问题是一个流行的面试题,面试官会要求你实现生产者消费者设计模式,以至于能让生产者应等待如果队列或篮子满了的话,消费者等待如果队列或者篮子是空的。这个问题可以用不同的方式来现实,经典的方法是使用wait和notify方法在生产者和消费者线程中合作,在队列满了或者队列是空的条件下阻塞,Java5的阻塞队列(BlockingQueue)数据结构更简单,因为它隐含的提供了这些控制,现在你不需要使用wait和nofity在生产者和消费者之间通信了,阻塞队列的put()方法将阻塞如果队列满了,队列take()方法将阻塞如果队列是空的。在下部分我们可以看到代码例子。

    先回顾一下知识点阻塞队列BlockingQueue

    常用实现队列简介:

    • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
    • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
    • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
    • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

    下面是实现代码 目的是解决生产者和消费者的异步解耦 完全透明

    public class Producer implements Runnable {
    
        BlockingQueue blockingQueue = null;
    
        public Producer(BlockingQueue blockingQueue) {
            this.blockingQueue = blockingQueue;
        }
    
        @Override
        public void run() {
    
            for (int i = 0; i < 5; i++) {
    
                try {
                    System.out.println("producer:" + i);
                    blockingQueue.put(i);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
    
        }
    
    }
    View Code

    下面是消费者:

    public class Customer implements Runnable {
    
        BlockingQueue blockingQueue = null;
    
        public Customer(BlockingQueue blockingQueue) {
    
            this.blockingQueue = blockingQueue;
        }
    
        @Override
        public void run() {
            // TODO Auto-generated method stub
            while (true) {
                try {
                    System.out.println("customer:" + blockingQueue.take());
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
    
            }
    
        }
    
    }
    View Code

    客户单demo

    public class ClientDemo {
    
        public static void main(String[] args) {
    
            BlockingQueue blockingQueue = new LinkedBlockingQueue<>();
    
            Producer producer = new Producer(blockingQueue);
            Customer cusThread = new Customer(blockingQueue);
            Thread producerThread = new Thread(producer);
            Thread customerThread = new Thread(cusThread);
    
            producerThread.start();
    
            customerThread.start();
        }
    
    }

    需要解释一下 里面的两个属性:

    /**
    *
    BlockingQueue.put方法
    public void put(E e)
             throws InterruptedException
    将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用。
     
    
    指定者:
    接口 BlockingQueue<E> 中的 put
    参数:
    e - 要添加的元素
    抛出:
    InterruptedException - 如果在等待时被中断
    NullPointerException - 如果指定元素为 null
     
    
    */
    BlockingQueue.put

    =====================

     

    BlockingQueue.take
    /**
    
    *
    
    take
    public E take()
           throws InterruptedException
    从接口 BlockingQueue 复制的描述
    获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。
     
    
    指定者:
    接口 BlockingQueue<E> 中的 take
    返回:
    此队列的头部
    抛出:
    InterruptedException - 如果在等待时被中断
    */
    
     blockingQueue.take();
    
    
    
      public E take() throws InterruptedException {
    
            E x;
    
            int c = -1;
    
            final AtomicInteger count = this.count;
    
            final ReentrantLock takeLock = this.takeLock;
    
            takeLock.lockInterruptibly();
    
            try {
    
                while (count.get() == 0) {
    
                    notEmpty.await();
    
                }
    
                x = dequeue();
    
                c = count.getAndDecrement();
    
                if (c > 1)
    
                    notEmpty.signal();
    
            } finally {
    
                takeLock.unlock();
    
            }
    
            if (c == capacity)
    
                signalNotFull();
    
            return x;
    
        }

    需要注意的是  put  和take都是阻塞的  在高并发场景下 不利于吞吐率的提升 

  • 相关阅读:
    Yield Usage Understanding
    Deadclock on calling async methond
    How to generate file name according to datetime in bat command
    Run Unit API Testing Which Was Distributed To Multiple Test Agents
    druid的关键参数+数据库连接池运行原理
    修改idea打开新窗口的默认配置
    spring boot -thymeleaf-url
    @pathvariable和@RequestParam的区别
    spring boot -thymeleaf-域对象操作
    spring boot -thymeleaf-遍历list和map
  • 原文地址:https://www.cnblogs.com/zhangfengshi/p/9396077.html
Copyright © 2011-2022 走看看