一 BlockingQueue
是java.util.concurrent包提供的用于解决并发生产者-消费者最有用的类,它的特性是在任意时刻只有一个线程可以进行take或put操作,并且blockingqueue提供了超时return null的机制,在很多生产场景里可以看到这个工具的身影.
1. 队列类型
无限队列 -- 几乎可以无限增长
有限队列 -- 定义了最大容量
2. 队列数据结构
-
- 通常用链表或者数组实现
- 一般而言队列具备FIFO先进先出的特性,当然也有双端队列(Deque)优先级队列
- 主要操作: 入队(EnQueue)与出队(Dequeue)
3. 常见的4种阻塞队列
ArrayBlockingQueue : 队列基于数组实现,大小在创建ArrayBlockingQueue时已经定义好.
应用场景 : 线程池中有比较多的应用,生产者消费者场景
LinkedBlockingQueue : 是一个基于链表的无界队列, 使用无限BlockingQueue设计生产者-消费者模型最重要的是,消费者应该能够像生产者向队列添加消息一样快的消费消息,否则,内存可能会填满.
DelayQueue : 由优先级堆支持的,基于时间的调度队列,内部基于无界队列PriorityQueue实现,而无界队列基于数组的扩容实现.
工作原理 : 队列内部会根据时间优先级进行排序.
4. BlockingQueue API
add() : 插入成功返回true,否则抛出 IllegalStateException 异常
put() : 将指定的元素插入队列, 如果队列满了,会阻塞,直到有空间插入
offer() : 如果插入成功返回true , 否则返回false
offer(E e, long timeout, TimeUnit unit ) : 等同于put
take() : 获取队列的头部元素并将其删除,如果队列为空,则阻塞并等待元素变为可用
poll(long timeout, TimeUnit unit) : 检索并删除队列的头部, 等待指定的时间以使元素可用,如果超时,则返回null
5. 多线程实现生产者消费者
import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadLocalRandom; public class NumbersProducer implements Runnable { private BlockingQueue<Integer> numbersQueue; private final int poisonPill; private final int poisonPillPerProducer; public NumbersProducer(BlockingQueue<Integer> numbersQueue, int poisonPill, int poisonPillPerProducer) { this.numbersQueue = numbersQueue; this.poisonPill = poisonPill; this.poisonPillPerProducer = poisonPillPerProducer; } public void run() { try { generateNumbers(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private void generateNumbers() throws InterruptedException { for (int i = 0; i < 100; i++) { numbersQueue.put(ThreadLocalRandom.current().nextInt(100)); System.out.println("潘金莲-{" + Thread.currentThread().getId() + "}号,给武大郎的泡药!"); } for (int j = 0; j < poisonPillPerProducer; j++) { numbersQueue.put(poisonPill); System.out.println("潘金莲-{" + Thread.currentThread().getId() + "}号,往武大郎的药里放入第{" + (j + 1) + "}颗毒丸!"); } } } import java.util.concurrent.BlockingQueue; public class NumbersConsumer implements Runnable { // 阻塞队列 private BlockingQueue<Integer> queue; private final int poisonPill; public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) { this.queue = queue; this.poisonPill = poisonPill; } public void run() { try { while (true) { Integer number = queue.take(); if (number.equals(poisonPill)) { System.out.println("武大郎-{" + Thread.currentThread().getId() + "}号GG,喝药-编号:{" + number + "}"); return; } System.out.println("武大郎-{" + Thread.currentThread().getId() + "}号,喝药-编号:{" + number + "}"); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class Main { public static void main(String[] args) { // 阻塞队列最大值 int BOUND = 10; // 16个生产者线程 int N_PRODUCERS = 16; // 获取虚拟机可用的处理器数量 int N_CONSUMERS = Runtime.getRuntime().availableProcessors(); // 获取最大值 int poisonPill = Integer.MAX_VALUE; int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS; int mod = N_CONSUMERS % N_PRODUCERS; BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND); //潘金莲给武大郎熬药 for (int i = 1; i < N_PRODUCERS; i++) { new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start(); } //武大郎开始喝药 for (int j = 0; j < N_CONSUMERS; j++) { new Thread(new NumbersConsumer(queue, poisonPill)).start(); } //潘金莲开始投毒,武大郎喝完毒药GG new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start(); } }