zoukankan      html  css  js  c++  java
  • Java并发编程-阻塞队列(BlockingQueue)的实现原理

    阻塞队列 (BlockingQueue)是Java util.concurrent包下重要的数据结构,BlockingQueue提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空。并发包下很多高级同步类的实现都是基于BlockingQueue实现的。

    BlockingQueue 的操作方法

    BlockingQueue 具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下: 
    这里写图片描述

    四组不同的行为方式解释:

    • 抛异常:如果试图的操作无法立即执行,抛一个异常。
    • 特定值:如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
    • 阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
    • 超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是true / false)。

    无法向一个 BlockingQueue 中插入 null。如果你试图插入 null,BlockingQueue 将会抛出一个 NullPointerException。

    可以访问到 BlockingQueue 中的所有元素,而不仅仅是开始和结束的元素。比如说,你将一个对象放入队列之中以等待处理,但你的应用想要将其取消掉。那么你可以调用诸如 remove(o) 方法来将队列之中的特定对象进行移除。但是这么干效率并不高(译者注:基于队列的数据结构,获取除开始或结束位置的其他对象的效率不会太高),因此你尽量不要用这一类的方法,除非你确实不得不那么做。

    BlockingQueue 的实现类

    BlockingQueue 是个接口,你需要使用它的实现之一来使用BlockingQueue,java.util.concurrent包下具有以下 BlockingQueue 接口的实现类:

      • ArrayBlockingQueue:ArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里。有界也就意味着,它不能够存储无限多数量的元素。它有一个同一时间能够存储元素数量的上限。你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了(译者注:因为它是基于数组实现的,也就具有数组的特性:一旦初始化,大小就无法修改)。

      • DelayQueue:DelayQueue 对元素进行持有直到一个特定的延迟到期。注入其中的元素必须实现 java.util.concurrent.Delayed 接口。

      • LinkedBlockingQueue:LinkedBlockingQueue 内部以一个链式结构(链接节点)对其元素进行存储。如果需要的话,这一链式结构可以选择一个上限。如果没有定义上限,将使用 Integer.MAX_VALUE 作为上限。

      • PriorityBlockingQueue:PriorityBlockingQueue 是一个无界的并发队列。它使用了和类 java.util.PriorityQueue 一样的排序规则。你无法向这个队列中插入 null 值。所有插入到 PriorityBlockingQueue 的元素必须实现 java.lang.Comparable 接口。因此该队列中元素的排序就取决于你自己的 Comparable 实现。

      • SynchronousQueue:SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。据此,把这个类称作一个队列显然是夸大其词了。它更多像是一个汇合点。

    • 使用例子:

      阻塞队列的最长使用的例子就是生产者消费者模式,也是各种实现生产者消费者模式方式中首选的方式。使用者不用关心什么阻塞生产,什么时候阻塞消费,使用非常方便,代码如下:

      package MyThread;
      
      import java.util.Random;
      import java.util.concurrent.BlockingQueue;
      import java.util.concurrent.LinkedBlockingQueue;
      import java.util.concurrent.TimeUnit;
      
      public class BlockingQueueTest {
          //生产者
          public static class Producer implements Runnable{
              private final BlockingQueue<Integer> blockingQueue;
              private volatile boolean flag;
              private Random random;
      
              public Producer(BlockingQueue<Integer> blockingQueue) {
                  this.blockingQueue = blockingQueue;
                  flag=false;
                  random=new Random();
      
              }
              public void run() {
                  while(!flag){
                      int info=random.nextInt(100);
                      try {
                          blockingQueue.put(info);
                          System.out.println(Thread.currentThread().getName()+" produce "+info);
                          Thread.sleep(50);
                      } catch (InterruptedException e) {
                          // TODO Auto-generated catch block
                          e.printStackTrace();
                      }               
                  }
              }
              public void shutDown(){
                  flag=true;
              }
          }
          //消费者
          public static class Consumer implements Runnable{
              private final BlockingQueue<Integer> blockingQueue;
              private volatile boolean flag;
              public Consumer(BlockingQueue<Integer> blockingQueue) {
                  this.blockingQueue = blockingQueue;
              }
              public void run() {
                  while(!flag){
                      int info;
                      try {
                          info = blockingQueue.take();
                          System.out.println(Thread.currentThread().getName()+" consumer "+info);
                          Thread.sleep(50);
                      } catch (InterruptedException e) {
                          // TODO Auto-generated catch block
                          e.printStackTrace();
                      }               
                  }
              }
              public void shutDown(){
                  flag=true;
              }
          }
          public static void main(String[] args){
              BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>(10);
              Producer producer=new Producer(blockingQueue);
              Consumer consumer=new Consumer(blockingQueue);
              //创建5个生产者,5个消费者
              for(int i=0;i<10;i++){
                  if(i<5){
                      new Thread(producer,"producer"+i).start();
                  }else{
                      new Thread(consumer,"consumer"+(i-5)).start();
                  }
              }
      
              try {
                  Thread.sleep(1000);
              } catch (InterruptedException e) {
                  // TODO Auto-generated catch block
                  e.printStackTrace();
              }
              producer.shutDown();
              consumer.shutDown();
      
          }
      }
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
      • 79
      • 80
      • 81
      • 82
      • 83
      • 84
      • 85

      阻塞队列原理:

      其实阻塞队列实现阻塞同步的方式很简单,使用的就是是lock锁的多条件(condition)阻塞控制。使用BlockingQueue封装了根据条件阻塞线程的过程,而我们就不用关心繁琐的await/signal操作了。

  • 相关阅读:
    Java代码打成jar后 classgetClassLoadergetResource("")返回为null
    springboot-yml内list、map组合写法
    rpc-java 生成代码路径设置
    Git操作 :从一个分支cherry-pick多个commit到其他分支
    使用maven插件生成grpc所需要的Java代码
    'Failed to import pydot. You must `pip install pydot` and install graphviz
    seasonal_decompose plot figsize
    Failed to install 'TwoSampleMR' from GitHub
    prophet Building wheel for fbprophet (setup.py) ... error
    python matplotlib 绘图线条类型和颜色选择
  • 原文地址:https://www.cnblogs.com/renjiaqi/p/8296193.html
Copyright © 2011-2022 走看看