阻塞队列
- **==ArrayBlockingQueue==**是一个基于数组结构的有界阻塞队列,此队列按FIFO原则对元素进行排序
- **==LinkedBlockingQueue==**是一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue
- **==SynchronousQueue==**是一个不存储元素的阻塞队列,灭个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于
1、队列和阻塞队列
-
首先是一个队列,而一个阻塞队列再数据结构中所起的作用大致如下图
graph LR Thread1-- put -->id1["阻塞队列"] subgraph BlockingQueue id1 end id1-- Take -->Thread2 蛋糕师父--"放(柜满阻塞)"-->id2[蛋糕展示柜] subgraph 柜 id2 end id2--"取(柜空阻塞)"-->顾客
线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素
当阻塞队列是空是,从队列中==获取==元素的操作会被阻塞
当阻塞队列是满时,从队列中==添加==元素的操作会被阻塞
试图从空的阻塞队列中获取元素的线程将会被阻塞,知道其他的线程网空的队列插入新的元素。
试图网已满的阻塞队列中添加新元素的线程同样会被阻塞,知道其他的线程从列中移除一个或者多个元素或者完全清空队列后使队列重新变得空闲起来并后续新增
2、为什么用?有什么好处?
-
在多线程领域:所谓阻塞,在某些情况下会挂起线程,一旦满足条件,被挂起的线程又会自动被唤醒
-
为什么需要BlockingQueue
好处时我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了
在concurrent包发布以前,在多线程环境下,==我们每个程序员都必须自己控制这些细节,尤其还要兼顾效率和线程安全==,而这回给我们程序带来不小的复杂度
3、BlockingQueue的核心方法
方法类型 | 抛出异常 | 特殊值 | 阻塞 | 超时 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除 | remove() | poll() | take | poll(time,unit) |
检查 | element() | peek() | 不可用 | 不可用 |
方法类型 | status |
---|---|
抛出异常 | 当阻塞队列满时,再往队列中add会抛IllegalStateException: Queue full 当阻塞队列空时,在网队列里remove会抛 NoSuchElementException |
特殊值 | 插入方法,成功true失败false 移除方法,成功返回出队列的元素,队列里没有就返回null |
一直阻塞 | 当阻塞队列满时,生产者线程继续往队列里put元素,队列会一直阻塞线程知道put数据或响应中断退出 当阻塞队列空时,消费者线程试图从队列take元素,队列会一直阻塞消费者线程知道队列可用。 |
超时退出 | 当阻塞队列满时,队列会阻塞生产者线程一定时间,超过限时后生产者线程会退出 |
4、架构梳理+种类分析
-
种类分析
- ==ArrayBlockingQueue==:由数据结构组成的有界阻塞队列。
- ==LinkedBlockingQueue==:由链表结构组成的有界(但大小默认值为
Integer.MAX_VALUE
)阻塞队列。 - PriorityBlockingQueue:支持优先级排序的无界阻塞队列。
- DelayQueue:使用优先级队列实现的延迟无界阻塞队列。
- ==SychronousQueue==:不存储元素的阻塞队列,也即单个元素的队列。
- LinkedTransferQueue:由链表结构组成的无界阻塞队列。
- LinkedBlockingDeque:由历览表结构组成的双向阻塞队列。
-
SychronousQueue
-
理论:SynchronousQueue没有容量,与其他BlockingQueue不同,SychronousQueue是一个不存储元素的BlockingQueue,每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。
-
代码示例
package com.jian8.juc.queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; /** * ArrayBlockingQueue是一个基于数组结构的有界阻塞队列,此队列按FIFO原则对元素进行排序 * LinkedBlockingQueue是一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue * SynchronousQueue是一个不存储元素的阻塞队列,灭个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 * 1.队列 * 2.阻塞队列 * 2.1 阻塞队列有没有好的一面 * 2.2 不得不阻塞,你如何管理 */ public class SynchronousQueueDemo { public static void main(String[] args) throws InterruptedException { BlockingQueue<String> blockingQueue = new SynchronousQueue<>(); new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + " put 1"); blockingQueue.put("1"); System.out.println(Thread.currentThread().getName() + " put 2"); blockingQueue.put("2"); System.out.println(Thread.currentThread().getName() + " put 3"); blockingQueue.put("3"); } catch (InterruptedException e) { e.printStackTrace(); } }, "AAA").start(); new Thread(() -> { try { TimeUnit.SECONDS.sleep(5); System.out.println(Thread.currentThread().getName() + " take " + blockingQueue.take()); TimeUnit.SECONDS.sleep(5); System.out.println(Thread.currentThread().getName() + " take " + blockingQueue.take()); TimeUnit.SECONDS.sleep(5); System.out.println(Thread.currentThread().getName() + " take " + blockingQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } }, "BBB").start(); } }
-
5、用在哪里
-
生产者消费者模式
-
传统版
package com.jian8.juc.queue; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 一个初始值为零的变量,两个线程对其交替操作,一个加1一个减1,来5轮 * 1. 线程 操作 资源类 * 2. 判断 干活 通知 * 3. 防止虚假唤起机制 */ public class ProdConsumer_TraditionDemo { public static void main(String[] args) { ShareData shareData = new ShareData(); for (int i = 1; i <= 5; i++) { new Thread(() -> { try { shareData.increment(); } catch (Exception e) { e.printStackTrace(); } }, "ProductorA " + i).start(); } for (int i = 1; i <= 5; i++) { new Thread(() -> { try { shareData.decrement(); } catch (Exception e) { e.printStackTrace(); } }, "ConsumerA " + i).start(); } for (int i = 1; i <= 5; i++) { new Thread(() -> { try { shareData.increment(); } catch (Exception e) { e.printStackTrace(); } }, "ProductorB " + i).start(); } for (int i = 1; i <= 5; i++) { new Thread(() -> { try { shareData.decrement(); } catch (Exception e) { e.printStackTrace(); } }, "ConsumerB " + i).start(); } } } class ShareData {//资源类 private int number = 0; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void increment() throws Exception { lock.lock(); try { //1.判断 while (number != 0) { //等待不能生产 condition.await(); } //2.干活 number++; System.out.println(Thread.currentThread().getName() + " " + number); //3.通知 condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void decrement() throws Exception { lock.lock(); try { //1.判断 while (number == 0) { //等待不能消费 condition.await(); } //2.消费 number--; System.out.println(Thread.currentThread().getName() + " " + number); //3.通知 condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }
-
阻塞队列版
package com.jian8.juc.queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class ProdConsumer_BlockQueueDemo { public static void main(String[] args) { MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10)); new Thread(() -> { System.out.println(Thread.currentThread().getName() + " 生产线程启动"); try { myResource.myProd(); } catch (Exception e) { e.printStackTrace(); } }, "Prod").start(); new Thread(() -> { System.out.println(Thread.currentThread().getName() + " 消费线程启动"); try { myResource.myConsumer(); } catch (Exception e) { e.printStackTrace(); } }, "Consumer").start(); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("5s后main叫停,线程结束"); try { myResource.stop(); } catch (Exception e) { e.printStackTrace(); } } } class MyResource { private volatile boolean flag = true;//默认开启,进行生产+消费 private AtomicInteger atomicInteger = new AtomicInteger(); BlockingQueue<String> blockingQueue = null; public MyResource(BlockingQueue<String> blockingQueue) { this.blockingQueue = blockingQueue; System.out.println(blockingQueue.getClass().getName()); } public void myProd() throws Exception { String data = null; boolean retValue; while (flag) { data = atomicInteger.incrementAndGet() + ""; retValue = blockingQueue.offer(data, 2, TimeUnit.SECONDS); if (retValue) { System.out.println(Thread.currentThread().getName() + " 插入队列" + data + "成功"); } else { System.out.println(Thread.currentThread().getName() + " 插入队列" + data + "失败"); } TimeUnit.SECONDS.sleep(1); } System.out.println(Thread.currentThread().getName() + " 大老板叫停了,flag=false,生产结束"); } public void myConsumer() throws Exception { String result = null; while (flag) { result = blockingQueue.poll(2, TimeUnit.SECONDS); if (null == result || result.equalsIgnoreCase("")) { flag = false; System.out.println(Thread.currentThread().getName() + " 超过2s没有取到蛋糕,消费退出"); System.out.println(); return; } System.out.println(
-