zoukankan      html  css  js  c++  java
  • 聊聊并发(四)——阻塞队列

    一、概述

    1、介绍

      强烈建议读者看这篇之前,先了解队列相关知识,以及生产者与消费者模式

      concurrent 包中,BlockingQueue 很好的解决了多线程中,如何高效安全"传输"数据的问题。通过这些高效并且线程安全的队列类,为快速搭建高质量的多线程程序带来极大的便利。
      阻塞队列,首先它是一个队列(先进先出),通过一个共享的队列,可以使得数据从队列的一端输入,从另外一端输出。

      当队列是的,从队列中获取元素的操作将会被阻塞。
      当队列是的,从队列中添加元素的操作将会被阻塞。

      api文档:https://www.matools.com/api/java8

    2、为什么需要阻塞队列?

      理由:需要结合生产者与消费者来说。
      好处就是不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue 都一手包办了。在 concurrent 包发布之前,多线程环境下,程序员必须自己控制这些细节,尤其还要兼顾效率和线程安全,这给程序带来不小的复杂度。

    3、相关API

      BlockingQueue 核心方法

    方法类型
    抛出异常
    特殊值
    阻塞
    超时
    插入
    add(e)
    offer(e)
    put(e)
    offer(e,time,unit)
    删除
    remove()
    poll()
    take()
    poll(time,unit)
    检查
    element()
    peek()
    不可用
    不可用
    抛出异常
    队满时:再插入元素会抛出异常
    队空时:再删除元素会抛出异常
    特殊值
    插入:成功 true,失败false
    删除:成功,返回队元素。没有就返回null
    阻塞
    队满时:再 put 元素会一直阻塞,或响应中断退出。
    队空时:再 take 元素会一直阻塞。
    超时
    队满时:再插入元素,会阻塞一定时间,超时后退出。
    队空时:再删除元素,会阻塞一定时间,超时后退出。

    4、作用

      API文档中明确提出,阻塞队列被设计主要就是用于生产者与消费者模式。这样并不需要用到Lock 或 synchronized 以及等待唤醒机制。
      而是仅仅用到了阻塞队列以及原子变量类,就可以实现生产者消费者模型。不用程序员关心具体的加锁解锁过程,而是更关注具体的业务逻辑。

    二、分类

    1、ArrayBlockingQueue

      数组结构组成的有界阻塞队列。
      基于数组的阻塞队列,在 ArrayBlockingQueue 内部,维护了一个定长数组,以便缓存队列中的数据对象。这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue 内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。
      ArrayBlockingQueue 在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue。
    按照实现原理来分析,ArrayBlockingQueue 完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea 之所以没这样去做,也许是因为 ArrayBlockingQueue 的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue 和 LinkedBlockingQueue 间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的 Node 对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于 GC 的影响还是存在一定的区别。而在创建 ArrayBlockingQueue 时,还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。

    2、LinkedBlockingQueue

      链表结构组成的有界(大小默认值为integer.MAX_VALUE)阻塞队列。
      基于链表的阻塞队列,同 ArrayBlockingQueue 类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时,才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。

      LinkedBlockingQueue 之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

    3、DelayQueue

      使用优先级队列实现的延迟无界阻塞队列。

      DelayQueue 中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue 是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

    4、PriorityBlockingQueue

      支持优先级排序无界阻塞队列。
      基于优先级的阻塞队列(优先级的判断通过构造函数传入的 Compator 对象来决定),但需要注意的是 PriorityBlockingQueue 并不会阻塞生产者,而只会在没有可消费的数据时,阻塞消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现 PriorityBlockingQueue 时,内部控制线程同步的锁采用的是非公平锁。

    5、SynchronousQueue

      不存储元素的阻塞队列,即单个元素的队列。单个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。
      一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么大家都在集市等待。
      相对于有缓冲的 BlockingQueue 来说,少了一个中间经销商的环节(缓冲区)。如果有经销商,生产者直接把产品批发给经销商,而无需在意经销商最终会将这些产品卖给哪些消费者,由于经销商可以库存一部分商品,因此相对于直接交易模式,总体来说采用中间经销商的模式会吞吐量高一些(可以批量买卖)。
      但另一方面,又因为经销商的引入,使得产品从生产者到消费者中间增加了额外的交易环节,单个产品的及时响应性能可能会降低。声明一个 SynchronousQueue 有两种不同的方式,它们之间有着不太一样的行为。

      源码示例:构造器

     1 /**
     2  * Creates a {@code SynchronousQueue} with nonfair access policy.
     3  */
     4 public SynchronousQueue() {
     5     this(false);
     6 }
     7 
     8 /**
     9  * Creates a {@code SynchronousQueue} with the specified fairness policy.
    10  *
    11  * @param fair if true, waiting threads contend in FIFO order for
    12  *        access; otherwise the order is unspecified.
    13  */
    14  // 如果fair是true,那么等待的线程通过一个FIFO保证顺序;否则顺序是不明确的
    15 public SynchronousQueue(boolean fair) {
    16     transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    17 }

      公平模式和非公平模式的区别:
      公平模式:SynchronousQueue 会采用公平锁,并配合一个 FIFO 队列来阻塞多余的生产者和消费者,从而体现整体的公平策略。
      非公平模式(默认):SynchronousQueue 采用非公平锁,同时配合一个 LIFO 栈来管理多余的生产者和消费者。
      后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或消费者的数据永远得不到处理。

    6、LinkedTransferQueue

      链表组成的无界阻塞队列。一个由链表结构组成的无界阻塞 TransferQueue 队列。
      相对于其他阻塞队列,LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。LinkedTransferQueue 采用一种预占模式。意思就是消费者线程取元素时,如果队列不为空,则直接取走数据,若队列为空,那就生成一个节点(节点元素为 null)入队,然后消费者线程被等待在这个节点上,后面生产者线程入队时发现有一个元素为 null 的节点,生产者线程就不入队了,直接就将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用的方法返回。

    7、LinkedBlockingDeque

      链表组成的双向阻塞队列。
      LinkedBlockingDeque 是一个由链表结构组成的双向阻塞队列,即可以从队列的两端插入和移除元素。对于一些指定的操作,在插入或者获取队列元素时如果队列状态不允许该操作,可能会阻塞该线程直到队列状态变更为允许操作,这里的阻塞一般有两种情况
      插入元素时:如果当前队列已满将会进入阻塞状态,一直等到队列有空的位置时再将该元素插入。该操作可以通过设置超时参数,超时后返回 false 表示操作失败,也可以不设置超时参数一直阻塞,中断后抛出 InterruptedException 异常。
      读取元素时:如果当前队列为空会阻塞,直到队列不为空然后返回元素,同样可以通过设置超时参数。

    三、SynchronousQueue

      没有容量,与其他BlockingQueue不同,SychronousQueue是一个不存储元素的BlockingQueue,每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。
      代码示例:

     1 public class SynchronousQueueDemo {
     2     public static void main(String[] args) {
     3         BlockingQueue<Integer> blockingQueue = new SynchronousQueue<>();
     4 
     5         // AAA线程主要用来对阻塞队列进行put操作
     6         new Thread(() -> {
     7             try {
     8                 System.out.println(Thread.currentThread().getName() + "	" + "put 1");
     9                 blockingQueue.put(1);
    10                 System.out.println(Thread.currentThread().getName() + "	" + "put 2");
    11                 blockingQueue.put(2);
    12                 System.out.println(Thread.currentThread().getName() + "	" + "put 3");
    13                 blockingQueue.put(3);
    14             } catch (InterruptedException e) {
    15                 e.printStackTrace();
    16             }
    17         }, "AAA").start();
    18 
    19         // BBB线程主要从阻塞队列当中进行take操作
    20         new Thread(() -> {
    21             try {
    22                 Thread.sleep(5000);
    23                 System.out.println(Thread.currentThread().getName() + "	 take:" + blockingQueue.take());
    24                 Thread.sleep(5000);
    25                 System.out.println(Thread.currentThread().getName() + "	 take:" + blockingQueue.take());
    26                 Thread.sleep(5000);
    27                 System.out.println(Thread.currentThread().getName() + "	 take:" + blockingQueue.take());
    28             } catch (InterruptedException e) {
    29                 e.printStackTrace();
    30             }
    31         }, "BBB").start();
    32     }
    33 }

      参考文档:https://www.matools.com/api/java8

    作者:Craftsman-L

    本博客所有文章仅用于学习、研究和交流目的,版权归作者所有,欢迎非商业性质转载。

    如果本篇博客给您带来帮助,请作者喝杯咖啡吧!点击下面打赏,您的支持是我最大的动力!

  • 相关阅读:
    centos7-根据端口号查看所属应用
    centos7-网络与防火墙常用命令
    docker(5)常用命令
    docker(4)使用Dockerfile文件创建镜像-对docker(3)的改进
    [多校联考2021] 模拟赛4
    [省选前集训2021] 模拟赛7
    [多校联考2021] 模拟赛3
    [学习笔记] 圆方树
    [多校联考2021] 模拟赛2
    [学习笔记] 反悔贪心
  • 原文地址:https://www.cnblogs.com/originator/p/15547126.html
Copyright © 2011-2022 走看看