zoukankan      html  css  js  c++  java
  • java 中的阻塞队列及生产者-消费者中的简单应用

    一、什么是阻塞队列

           阻塞队列,顾名思义,首先它是一个队列,而一个阻塞队列在数据结构中所起的作用大致如图所示:

    image

           当阻塞队列是空时,从队列中获取元素的操作将会被阻塞。
            当阻塞队列是满时,往队列中添加元素的操作将会被阻塞。
            同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他线程从队列中移除一个或者多个元素或者全清空队列后,使队列重新变得空闲起来并后续新增。

    二、阻塞队列有什么好处

           在多线程领域:所谓阻塞,在某些情况下会挂起线程(即线程阻塞),一旦条件满足,被挂起的线程又会被自动唤醒。
            使用阻塞队列的好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为BlockingQueue 都一手包办好了。
           在 concurrent 包发布以前,在多线程环境下,每个程序员都必须自己去控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。

    三、java 中的阻塞队列

    3.1 架构介绍

    BlockingQueue

    3.2 种类分析

    1. ArrayBlockingQueue:由数组结构组成的有界阻塞队列。
    2. LinkedBlockingQueue:由链表结构组成的有界(但大小默认值是Integer.MAX_VALUE)阻塞队列。
    3. PriorityBlockingQueue:支持优先级排序的无界阻塞队列。
    4. DelayQueue:使用优先级队列实现的延迟无界阻塞队列。
    5. SynchronousQueue:不存储元素的阻塞队列,也即是单个元素的队列。与其他BlcokingQueue不同,synchronousQueue 是一个不存储元素的 BlcokingQueue。每个 put 操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。)
    6. LinkedTransferQueue:由链表结构组成的无界阻塞队列。
    7. LinkedBlockingDeque:由链表结构组成的双向阻塞队列。

    3.3 BlockingQueue 的核心方法

    image

    抛出异常

    当阻塞队列满时,再往队列里面add插入元素会抛出 IllegalStateException:Queue full
    当阻塞队列空时,再往队列 Remove 元素时候会抛出 NoSuchElementException

    特殊值

    插入方法,成功返回true,失败返回 false
    移除方法,成功返回元素,队列里面没有就返回 null

    一直阻塞

    当阻塞队列满时,生产者继续往队列里面 put 元素,队列会一直阻塞,直到put数据或者响应中断退出。
    当阻塞队列空时,消费者试图从队列 take 元素,队列会一直阻塞,消费者线程直到队列可用取出元素。

    超时退出

    当阻塞队列满时,队列会阻塞生产者线程一定时间,超过后限时后生产者线程就会退出。

    四、生产者-消费者问题

    4.1 传统的使用 await() 和 signal()

      1 class ShareData {
      2     private int number = 0;
      3     private Lock lock = new ReentrantLock();
      4     private Condition condition = lock.newCondition();
      5 
      6     public void increment() {
      7         lock.lock();
      8         try {
      9             while (number != 0) {
     10                 condition.await();
     11             }
     12             number++;
     13             System.out.println(Thread.currentThread().getName() + " " + number);
     14             condition.signalAll();
     15         } catch (InterruptedException e) {
     16             e.printStackTrace();
     17         } finally {
     18             lock.unlock();
     19         }
     20     }
     21 
     22     public void decrement() {
     23         lock.lock();
     24         try {
     25             while (number == 0) {
     26                 condition.await();
     27             }
     28             number--;
     29             System.out.println(Thread.currentThread().getName() + " " + number);
     30             condition.signalAll();
     31         } catch (InterruptedException e) {
     32             e.printStackTrace();
     33         } finally {
     34             lock.unlock();
     35         }
     36     }
     37 }
     38 
     39 public class ProdConsumer_traditionDemo {
     40 
     41     public static void main(String[] args) {
     42         ShareData shareData = new ShareData();
     43 
     44         new Thread(() -> {
     45             for (int i = 1; i <= 5; i++) {
     46                 shareData.increment();
     47             }
     48         }, "Thread-A").start();
     49 
     50         new Thread(() -> {
     51             for (int i = 1; i <= 5; i++) {
     52                 shareData.decrement();
     53             }
     54         }, "Thread-B").start();
     55     }
     56 }

    image

    4.2 使用阻塞队列

      1 class MyResource {
      2     private volatile boolean flag = true;
      3     private AtomicInteger atomicInteger = new AtomicInteger();
      4 
      5     private BlockingQueue<String> blockingQueue = null;
      6 
      7     public MyResource(BlockingQueue<String> blockingQueue) {
      8         this.blockingQueue = blockingQueue;
      9     }
     10 
     11     public void producer() throws InterruptedException {
     12         String data = null;
     13         boolean resFlag;
     14 
     15         while (flag) {
     16             data = atomicInteger.incrementAndGet() + "";
     17             resFlag = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
     18             if (resFlag) {
     19                 System.out.println(Thread.currentThread().getName() + " 生产队列, " + data + " 插入队列成功");
     20             } else {
     21                 System.out.println(Thread.currentThread().getName() + " 生产队列, " + data + " 插入队列失败");
     22             }
     23             TimeUnit.SECONDS.sleep(1);
     24         }
     25 
     26         System.out.println(Thread.currentThread().getName() + "flag = false, 生产动作结束");
     27     }
     28 
     29     public void consumer() throws InterruptedException {
     30         String result = null;
     31 
     32         while (flag) {
     33             result = blockingQueue.poll(2, TimeUnit.SECONDS);
     34 
     35             if (result == null || result.equalsIgnoreCase("")) {
     36                 flag = false;
     37                 System.out.println(Thread.currentThread().getName() + " 超过2秒没有取到,停止消费");
     38                 return;
     39             }
     40 
     41             System.out.println(Thread.currentThread().getName() + " 消费队列:" + result);
     42         }
     43     }
     44 
     45     public void stop() {
     46         this.flag = false;
     47     }
     48 }
     49 
     50 public class ProdConsumer_BlockQueueDemo {
     51 
     52     public static void main(String[] args) {
     53         MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));
     54 
     55         new Thread(() -> {
     56             System.out.println("生产线程启动...");
     57             try {
     58                 myResource.producer();
     59             } catch (InterruptedException e) {
     60                 e.printStackTrace();
     61             }
     62         }, "producer").start();
     63 
     64         new Thread(() -> {
     65             System.out.println("消费线程启动...");
     66             try {
     67                 myResource.consumer();
     68             } catch (InterruptedException e) {
     69                 e.printStackTrace();
     70             }
     71         }, "consumer").start();
     72 
     73         try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
     74 
     75         myResource.stop();
     76         System.out.println(Thread.currentThread().getName() + " main 线程终止生产-消费");
     77     }
     78 }
    image
  • 相关阅读:
    Silverlight Toolkit ListBoxDragDropTarget学习笔记
    函数指针和指针函数(转)
    面试题_反转链表
    C++中的异或运算符^
    面试题_旋转字符串
    面试题_寻找丑数
    模拟一个简单的基于tcp的远程关机程序
    管理指针成员
    赫夫曼树编码问题
    堆的基本操作
  • 原文地址:https://www.cnblogs.com/lveyHang/p/11912712.html
Copyright © 2011-2022 走看看