zoukankan      html  css  js  c++  java
  • Java阻塞队列简介

    个人理解

    总体认知

    • 本质上是队列,但是并不一定是FIFO的,比如PriorityBlockingQueue
    • 阻塞: 线程的状态
      • 生产者阻塞: 队列满
      • 消费者阻塞: 队列空

    只要对阻塞队列有一个整体的认知,相信理解其各种实现就很轻松了。

    如果没有BlockingQueue

    • 如果没有BlockingQueue,那多线程就需要在自己run方法里实现各种场景的等待-通知机制
    • 多线程将各种场景下的等待-通知机制解耦到了实现了BlockingQueue接口的各个工具类中,就像中间件MQ一样

    阻塞队列主要的操作

    插入到阻塞队列

    插入元素 说明
    add(e) 成功返回true,如果队列满了,则抛出IllegalStateException
    put(e) 如果队列满了,则阻塞(当前线程后续会被唤醒),否则插入元素到队列中
    offer(e) 成功返回true,如果队列满了返回false
    offer(e,time,unit) 在设定的等待时间内,成功返回true; 超过等待时间,返回false

    从阻塞队列中移除

    移除元素 说明
    remove(o) 移除指定元素,如果存在返回true,否则返回false
    take() 如果队列为,则进入阻塞状态等待,否则队头元素出队。
    JDK源码应用:ThreadPoolExecutor#getTask
    poll() 队头元素出队,如果队列为空,则返回null
    poll(time,unit) 如果队列为空,则等待获取,指定时间内如果队列仍然为空,返回null; 否则队头元素出队。
    JDK源码应用:ThreadPoolExecutor#getTask

    将一个阻塞队列的一些元素移动到另一个Collection

    drainTo(Collection<? super E> c,int maxElements);

    将当前阻塞队列中的元素出队指定个数到集合c中。

    BlockingQueue的类型

    无界队列

    • 可以自动增长,无需设置size

      • LinkedBlockingDeque<Runnable> blockingDeque = new LinkedBlockingDeque<>();
        

        上面代码中,其实底层已经设置了最大size为了Integer。MAX_VALUE,2^31-1,即二进制表示为31个1

    • 由于是无界队列,它可能会变得非常大,所以使用无界队列因保证消费者快速地消费队列元素,否则可能会导致OOM,一般不推荐不设定阻塞队列的size

    有界队列

    • 有最大的size限制

      • LinkedBlockingDeque<String> blockingDeque = new LinkedBlockingDeque<>(16);
        

        size为16,如果队列满了生产者会阻塞; 如果队列为空,消费者会阻塞。

    • 应该多使用有界队列开发我们地程序,避免队列任务积压,资源耗尽

    线程对阻塞队列操作的公平性

    • 根据先来先执行的规则,如果线程被阻塞后,按照被阻塞的先后顺序被唤醒,则是公平的,反之为不公平
    • 但是保证公平性会降低吞吐量,一般业务场景中不会要求同一类线程执行的先后顺序,可以使用非公平锁
    • 例如ArrayBlockingQueue底层通过是否为公平锁的ReentrantLock来实现是否公平,而ReentrantLock使用双向链表保证公平性

    Java中阻塞队列的一些实现类

    ArrayBlockingQueue

    • 基于数组实现的有界阻塞队列,初始化会分配指定sizeObject数组
    • 入队操作和出队操作都是使用的同一把锁
    • 出队顺序是FIFO,在默认构造方法里不保证多线程操作的公平性

    LinkedBlockingQueue

    • 基于链表实现的有界(理论上size最大为2^31-1)或无界(不用用户手动指定size)阻塞队列,由于是基于链表实现的,所以初始化不需要为存储元素分配内存
    • 入队操作和出队操作都是使用的两把不同的锁,唤醒也是两把锁对应的两个Condition对象
      • 例如生产者调用put方法时,如果队列没有满会尝试唤醒生产者,并且如果队列之前是空,入队之后还会唤醒消费者,该设计尽可能减少了线程等待的时间,适合高并发场景
    • 出队顺序是FIFO
    • Executors#newFixedThreadPool(int)底层应用该队列

    LinkedBlockingQueue与ArrayBlockingQueue异同

    操作 LinkedBlockingQueue ArrayBlockingQueue
    初始化 基于链表实现的初始化是可以不用指定队列大小(默认是Integer。MAX_VALUE);
    由于是基于链表,初始化无需分配之后存储元素的内存
    基于数组实现的,初始化时必须指定队列大小初始化就要分配完整的Array内存
    同步 入队出队操作分别对应不同的锁,一共两把锁,唤醒自然也是两个不同的Condition对象
    高效地唤醒方式,尽可能减少生产者和消费者线程阻塞的时间,见LinkedBlockingQueue#put
    入队出队操作使用同一把锁, 粒度粗,锁未分离
    低效地唤醒方式(随机唤醒一个生产者或消费者)

    PriorityBlockingQueue

    • 基于数据结构中的实现,所以底层用数组实现更简单,无界阻塞队列,就像ArrayList,即使设定容量,但需要入队更多的元素时,底层数组就会扩容
    • 入队操作和出队操作都是使用的同一把锁
    • 出队顺序默认是按照集合元素的compare方法的返回值,返回值小于0,越先被执行,构造方法也可以传入Comparator的实现自定义比较的优先级

    SynchronousQueue

    • 不存储元素生产者线程一次put必须要有一个消费者线程take,否则后续生产者线程会wait
    • Executors#newCachedThreadPool()底层应用该队列

    Demo代码

    抽象设计打印和测试各个BlockingQueue的特点

    • 用一个类LogDemoProducerAndConsumer.java, 目的是对测试各个BlockingQueue提供一个公共的测试方法logDemoProducerAndConsumer

    • 构造方法需要传入一个BlockingQueue的实例

    • logDemoProducerAndConsumer将会通过日志的形式打印BlockingQueue一对一情况下的生产者-消费者的入队与出队操作,默认让消费者的消费速率小于生产者的生产速率

    • 注意: 在本Demo中, 出队和打印log两步操作不具有原子性, 入队和打印log两步操作不具有原子性, 即log打印的顺序并不一定是多线程执行的顺序, 但通过log能体会到各个BlockingQueue的特性

    /**
     * 打印log: 生产者生产地元素和消费者消费地元素.<br>
     * 注意: 在本Demo中, 出队和打印log两步操作不具有原子性, 入队和打印log两步操作不具有原子性<br>
     * 即log打印的顺序并不一定是多线程执行的顺序, 但通过log能体会到各个BlockingQueue的特性
     *
     * @author www.cnblogs.com/theRhyme
     * @date 2021/03/02
     */
    @Slf4j
    public class LogDemoProducerAndConsumer {
    
      private int taskCount = 16;
      private BlockingQueue<Integer> blockingQueue;
      /** 生产者将元素入队, 注意入队和打印log两步操作不具有原子性, 即log打印的顺序并不一定是多线程执行的顺序 */
      private final Runnable producerRunnable =
          () -> {
            for (int i = 0; i < taskCount; i++) {
              try {
                TimeUnit.MILLISECONDS.sleep(1);
                this.put(i);
                log.info("Producer ({}) put: ({}).", Thread.currentThread().getName(), i);
              } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
              }
            }
          };
      /** 消费者将元素出队, 注意出队和打印log两步操作不具有原子性, 即log打印的顺序并不一定是多线程执行的顺序 */
      private final Runnable consumerRunnable =
          () -> {
            try {
    
              // 模拟消费者出队操作比生产者入队操作慢
              TimeUnit.MILLISECONDS.sleep(6);
              for (int i = 0; i < taskCount; i++) {
                // 模拟消费者出队操作比生产者入队操作慢
                TimeUnit.MILLISECONDS.sleep(3);
                log.info("Consumer ({}) take: ({}).", Thread.currentThread().getName(), this.take());
              }
            } catch (InterruptedException e) {
              e.printStackTrace();
              Thread.currentThread().interrupt();
            }
          };
    
      public LogDemoProducerAndConsumer(BlockingQueue<Integer> blockingQueue) {
        this.blockingQueue = blockingQueue;
      }
    
      public void logDemoProducerAndConsumer() throws InterruptedException {
    
        final Thread p1 = new Thread(producerRunnable, "p1");
        p1.start();
    
        final Thread c1 = new Thread(consumerRunnable, "c1");
        c1.start();
    
        p1.join();
        c1.join();
      }
    
      private void put(Integer e) throws InterruptedException {
        blockingQueue.put(e);
      }
    
      private Integer take() throws InterruptedException {
        return blockingQueue.take();
      }
    }
    
    

    各个BlockingQueue的Demo结果

    ArrayBlockingQueue

    @Test
    public void arrayBlockingQueueTest() throws InterruptedException {
      new LogDemoProducerAndConsumer(new ArrayBlockingQueue<>(8)).logDemoProducerAndConsumer();
    }
    

    执行结果如下, 出队的顺序跟入队的顺序有关.

    Producer (p1) put: (0).
    Producer (p1) put: (1).
    Producer (p1) put: (2).
    Producer (p1) put: (3).
    Producer (p1) put: (4).
    Producer (p1) put: (5).
    Consumer (c1) take: (0).
    Producer (p1) put: (6).
    Producer (p1) put: (7).
    Consumer (c1) take: (1).
    Producer (p1) put: (8).
    Producer (p1) put: (9).
    Producer (p1) put: (10).
    Consumer (c1) take: (2).
    Producer (p1) put: (11).
    Consumer (c1) take: (3).
    Consumer (c1) take: (4).
    Producer (p1) put: (12).
    Producer (p1) put: (13).
    Consumer (c1) take: (5).
    Consumer (c1) take: (6).
    Producer (p1) put: (14).
    Producer (p1) put: (15).
    Consumer (c1) take: (7).
    Consumer (c1) take: (8).
    Consumer (c1) take: (9).
    Consumer (c1) take: (10).
    Consumer (c1) take: (11).
    Consumer (c1) take: (12).
    Consumer (c1) take: (13).
    Consumer (c1) take: (14).
    Consumer (c1) take: (15).
    

    LinkedBlockingQueue

    @Test
    public void linkedBlockingQueueTest() throws InterruptedException {
      new LogDemoProducerAndConsumer(new LinkedBlockingQueue<>(8)).logDemoProducerAndConsumer();
    }
    

    执行结果如下, 出队的顺序跟入队的顺序有关.

    Producer (p1) put: (0).
    Producer (p1) put: (1).
    Producer (p1) put: (2).
    Producer (p1) put: (3).
    Producer (p1) put: (4).
    Consumer (c1) take: (0).
    Producer (p1) put: (5).
    Producer (p1) put: (6).
    Consumer (c1) take: (1).
    Producer (p1) put: (7).
    Producer (p1) put: (8).
    Consumer (c1) take: (2).
    Producer (p1) put: (9).
    Producer (p1) put: (10).
    Consumer (c1) take: (3).
    Producer (p1) put: (11).
    Consumer (c1) take: (4).
    Producer (p1) put: (12).
    Consumer (c1) take: (5).
    Producer (p1) put: (13).
    Consumer (c1) take: (6).
    Producer (p1) put: (14).
    Consumer (c1) take: (7).
    Producer (p1) put: (15).
    Consumer (c1) take: (8).
    Consumer (c1) take: (9).
    Consumer (c1) take: (10).
    Consumer (c1) take: (11).
    Consumer (c1) take: (12).
    Consumer (c1) take: (13).
    Consumer (c1) take: (14).
    Consumer (c1) take: (15).
    

    PriorityBlockingQueue

    @Test
    public void priorityBlockingQueueTest() throws InterruptedException {
      // 更改默认优先级,手动设置为: 数字越大, 优先级越大, 更先被消费
      new LogDemoProducerAndConsumer(new PriorityBlockingQueue<>(8, Comparator.reverseOrder()))
          .logDemoProducerAndConsumer();
    }
    

    执行结果如下, 可以看到, 在优先级队列PriorityBlockingQueue中, 出队的顺序跟优先级有关, 即通过compareTo方法比较.

    Producer (p1) put: (0).
    Producer (p1) put: (1).
    Producer (p1) put: (2).
    Producer (p1) put: (3).
    Producer (p1) put: (4).
    Producer (p1) put: (5).
    Producer (p1) put: (6).
    Producer (p1) put: (7).
    Consumer (c1) take: (7).
    Producer (p1) put: (8).
    Producer (p1) put: (9).
    Producer (p1) put: (10).
    Consumer (c1) take: (10).
    Producer (p1) put: (11).
    Producer (p1) put: (12).
    Consumer (c1) take: (12).
    Producer (p1) put: (13).
    Producer (p1) put: (14).
    Consumer (c1) take: (14).
    Producer (p1) put: (15).
    Consumer (c1) take: (15).
    Consumer (c1) take: (13).
    Consumer (c1) take: (11).
    Consumer (c1) take: (9).
    Consumer (c1) take: (8).
    Consumer (c1) take: (6).
    Consumer (c1) take: (5).
    Consumer (c1) take: (4).
    Consumer (c1) take: (3).
    Consumer (c1) take: (2).
    Consumer (c1) take: (1).
    Consumer (c1) take: (0).
    

    SynchronousQueue

    @Test
    public void synchronousBlockingQueueTest() throws InterruptedException {
      // 没有队列容量, 一个线程put, 必须另一个线程take之后, 才能进行put
      new LogDemoProducerAndConsumer(new SynchronousQueue<>()).logDemoProducerAndConsumer();
    }
    

    执行结果如下, 可以看到, 在SynchronousQueue中, 一个生产者线程put一个元素后, 必须要有另一个消费者线程take之后, 生产者才能继续put, 否则被阻塞.

    Producer (p1) put: (0).
    Consumer (c1) take: (0).
    Consumer (c1) take: (1).
    Producer (p1) put: (1).
    Consumer (c1) take: (2).
    Producer (p1) put: (2).
    Producer (p1) put: (3).
    Consumer (c1) take: (3).
    Consumer (c1) take: (4).
    Producer (p1) put: (4).
    Producer (p1) put: (5).
    Consumer (c1) take: (5).
    Consumer (c1) take: (6).
    Producer (p1) put: (6).
    Producer (p1) put: (7).
    Consumer (c1) take: (7).
    Consumer (c1) take: (8).
    Producer (p1) put: (8).
    Producer (p1) put: (9).
    Consumer (c1) take: (9).
    Producer (p1) put: (10).
    Consumer (c1) take: (10).
    Consumer (c1) take: (11).
    Producer (p1) put: (11).
    Consumer (c1) take: (12).
    Producer (p1) put: (12).
    Consumer (c1) take: (13).
    Producer (p1) put: (13).
    Producer (p1) put: (14).
    Consumer (c1) take: (14).
    Consumer (c1) take: (15).
    Producer (p1) put: (15).
    
  • 相关阅读:
    Hdu 5396 Expression (区间Dp)
    Lightoj 1174
    codeforces 570 D. Tree Requests (dfs)
    codeforces 570 E. Pig and Palindromes (DP)
    Hdu 5385 The path
    Hdu 5384 Danganronpa (AC自动机模板)
    Hdu 5372 Segment Game (树状数组)
    Hdu 5379 Mahjong tree (dfs + 组合数)
    Hdu 5371 Hotaru's problem (manacher+枚举)
    Face The Right Way---hdu3276(开关问题)
  • 原文地址:https://www.cnblogs.com/theRhyme/p/java_blocking_queue_brief.html
Copyright © 2011-2022 走看看