zoukankan      html  css  js  c++  java
  • 并发库应用之十一 & 阻塞队列的应用

           队列包含固定长度的队列和不固定长度的队列,队列的规则就是:先进先出。固定长度的队列往里放数据,如果放满了还要放,阻塞式队列就会等待,直到有数据取出,空出位置后才继续放;非阻塞式队列不能等待就只能报错了。

      查看Condition的JDK文档时,其中简单的模拟实现了阻塞队列的原理,Java中也已经为我们提供了阻塞队列ArrayBlockingQueue,其定义如下:

      public  interface  BlockingQueue<E>  extends  Queue<E>

      BlockingQueue 方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,这四种形式的处理方式不同:第一种是抛出一个异常,第二种是返回一个特殊值(null 或 false,具体取决于操作),第三种是在操作可以成功前,无限期地阻塞当前线程,第四种是在放弃前只在给定的最大时间限制内阻塞。下表中总结了这些方法:

      抛出异常 特殊值 阻 塞 超 时
    插 入 add(e) offer(e) put(e) offer(e, time, unit)
    移 除 remove() poll() take() poll(e, time, unit)
    检 查 element() peek() 不可用 不可用

      

      BlockingQueue 不接受 null 元素。试图 addput offer 一个 null 元素时,某些实现会抛出 NullPointerExceptionnull 被用作指示 poll 操作失败的警戒值。

      BlockingQueue 可以是限定容量的。它在任意给定时间都可以有一个 remainingCapacity,超出此容量,便无法无阻塞地 put 附加元素。没有任何内部容量约束的 BlockingQueue 总是报告 Integer.MAX_VALUE 的剩余容量。

      BlockingQueue 实现主要用于生产者-消费者队列,但它另外还支持 Collection 接口。因此,举例来说,使用remove(x) 从队列中移除任意一个元素是有可能的。然而,这种操作通常会有效执行,只能有计划地偶尔使用,比如在取消排队信息时。

      BlockingQueue 实现是线程安全的。所有排队方法都可以使用内部锁或其他形式的并发控制来自动达到它们的目的。然而,大量的 Collection 操作(addAll、containsAll、retainAll 和 removeAll)没有 必要自动执行,除非在实现中特别说明。因此,举例来说,在只添加了c 中的一些元素后,addAll(c) 有可能失败(抛出一个异常)。

      java.util.concurrent.ArrayBlockingQueue<E>  E--在此 collection 中保持的元素类型

      extends AbstractQueue<E> implements BlockingQueue<E>, Serializable

      一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。队列的头部 是在队列中存在时间最长的元素。队列的尾部是在队列中存在时间最短的元素。新元素插入到队列的尾部,队列获取操作则是从队列头部开始获得元素。

      这是一个典型的"有界缓存区",固定大小的数组在其中保持生产者插入的元素和使用者提取的元素。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。

      此类支持对等待的生产者线程和消费者线程进行排序的可选公平策略。默认情况下,不保证是这种排序。然而,通过将公平性 (fairness) 设置为 true 而构造的队列允许按照FIFO 顺序访问线程。公平性通常会降低吞吐量,但也减少了可变性和避免了“不平衡性”。

    对应构造方法摘要:

      ArrayBlockingQueue(int capacity)          创建一个带有给定的(固定)容量和默认访问策略的 ArrayBlockingQueue

        ArrayBlockingQueue(int capacity, boolean fair)  创建一个具有给定的(固定)容量和指定访问策略的 ArrayBlockingQueue

        ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)

        创建一个具有给定的(固定)容量和指定访问策略的 ArrayBlockingQueue,它最初包含给定 collection 的元素,并以 collection 迭代器的遍历顺序添加元素

    方法摘要:

      boolean  add(E e) 将指定的元素插入此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则抛异常

      void  clear()    自动移除此队列中的所有元素

      boolean  contains(Object o)  如果此队列包含指定的元素,则返回 true

      int  drainTo(Collection<E> c)   移除此队列中所有可用的元素,并将它们添加到给定 collection 中

      int  drainTo(Collection<E> c, int maxElements)  最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中

      ......

      这里我就不冗余多写这些方法的介绍了,有用到或者感兴趣的直接查文档即可。

      阻塞队列与Semaphore有些相似,但也不同,阻塞队列是一方存放数据,另一方释放数据,Semaphore通常则是由同一方设置和释放信号量。ArrayBlockingQueue只有put方法和take方法才具有阻塞功能,其他的不是得到一个指定的特殊值就是抛异常。
    应用案例:
         用3个空间的队列来演示阻塞队列的功能和效果。

     1 import java.util.concurrent.ArrayBlockingQueue;
     2 import java.util.concurrent.BlockingQueue;
     3 
     4 public class BlockingQueueTest {
     5     public static void main(String[] args) {
     6         //创建具有3个大小容量的阻塞队列
     7         final BlockingQueue queue = new ArrayBlockingQueue(3);
     8         for (int i = 0; i < 2; i++) { //分别启动3个写入和取出阻塞队列数据线程
     9             new Thread() {
    10                 public void run() {
    11                     while (true) {
    12                         try {
    13                             Thread.sleep((long) (Math.random() * 1000));
    14                             System.out.println(String.format("%s 准备放入队列数据...", Thread.currentThread().getName()));
    15                             queue.put(1);
    16                             System.out.println(String.format("%s 已经放了数据,队列目前有 %d 个数据", Thread.currentThread().getName(), queue.size()));
    17                         } catch (InterruptedException e) {
    18                             e.printStackTrace();
    19                         }
    20                     }
    21                 }
    22             }.start();
    23         }
    24 
    25         new Thread() {
    26             public void run() {
    27                 while (true) {
    28                     try {
    29                         Thread.sleep(1000);
    30                         System.out.println(String.format("%s 准备从队列取数据...", Thread.currentThread().getName()));
    31                         queue.take();
    32                         System.out.println(String.format("%s 已经取走了数据,队列目前有 %d 个数据", Thread.currentThread().getName(), queue.size()));
    33                     } catch (InterruptedException e) {
    34                         e.printStackTrace();
    35                     }
    36                 }
    37             }
    38 
    39         }.start();
    40     }
    41 }

    以上代码运行打印效果如下:

      

     提示:欢迎继续参看我相关的下一篇博客:并发库应用之十二 & 常用集合问题汇总

  • 相关阅读:
    Nginx会话保持之nginx-sticky-module模块
    企业级分布式应用服务EDAS _Dubbo商业版_微服务PaaS平台 【EDAS Serverless 运维 创业】
    git repository description
    运维成长
    jenkins+maven+tomcat集群发布
    Leaf——美团点评分布式ID生成系统 UUID & 类snowflake
    tomcat redis 集群 session共享
    JEECG & JEESite Tomcat集群 Session共享
    分布式Tomcat session会话Sticky Sessions问题
    Memcached 集群架构与memcached-session-manager
  • 原文地址:https://www.cnblogs.com/liang1101/p/6528569.html
Copyright © 2011-2022 走看看