简介
当阻塞队列是空时,从队列中获取元素的操作会被阻塞,直到其他的线程往空的队列中插入新的元素。
当阻塞队列是满时,从队列中添加元素的操作会被阻塞,直到其他线程从队列中移除一个或多个元素使得队列变得空闲器来后继续新增。
线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素
在多线程领域:所谓阻塞,在某些情况下会挂起线程(阻塞),一旦条件满足,被挂起的线程又会被自动唤醒。
BlockingQueue所有的实现:
ArrayBlockingQueue:由数组结构组成的有界阻塞队列
LinkedBlockingQueue:由链表组成的有界阻塞队列(大小默认Integer.MAX_VALUE)
SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列
PriorityBlockingQueue:支持优先级排序的无界阻塞队列
DelayQueue:使用优先级队列实现的延迟无界阻塞队列
LinkedTransferQueue:由链表结构组成的无界阻塞队列
LinkedBlockingDeque:由链表结构组成的双向阻塞队列
常用方法
插入
add(e):返回true,添加失败时抛出异常
offer(e):返回true/false,方法特别之处用于添加失败时只返回false
put(e):void,当阻塞队列满时,生产者如果往队列里put元素,则队列会一直阻塞生产者线程,直到队列可用或者响应中断退出
offer(e,time,unit):返回true/false,当阻塞队列满时,生产者如果往队列里面插入元素,队列会阻塞生产者线程一段时间,如果超过了指定时间,生产者线程会退出,并返回false
删除
remove():返回true,移除失败时抛出异常
poll():返回移除元素/null,移除失败时返回null
take():返回移除元素,当阻塞队列为空时,消费者线程如果从队列里面移除元素,则队列会一直阻塞消费者线程,直到队列不为空
poll(time,unit):返回移除元素/null,当阻塞队列空时,消费者如果从队列里面删除元素,则队列会一直阻塞消费者线程,如果超过了指定时间,消费者线程会退出,并返回null
获取
element():返回头部的元素,如果队列为空,则抛出异常,否则返回头部元素
peek():返回移除元素/null,如果队列为空,返回特殊值null,否则返回头部的元素。
其他方法
drainTo(Collection<? super E> c) 从该队列中删除所有可用的元素,并将它们添加到给定的集合中。
drainTo(Collection<? super E> c, int maxElements) 最多从该队列中删除给定数量的可用元素,并将它们添加到给定的集合中。
remainingCapacity() 返回队列剩余容量
SynchronousQueue
-
SynchronousQueue没有容量,与其他的阻塞队列不同,SynchronousQueue是一个不存储元素的阻塞队列,每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。
-
适合传递性场景。
-
性能高于ArrayBlockingQueue和LinkedBlockingQueue。
BlockingQueue<String> queue = new SynchronousQueue<>();
new Thread(()->{
try {
queue.put("1");
queue.put("2");
queue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t1").start();
new Thread(()->{
try {
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t2").start();
创建线程池时,参数runnableTaskQueue(任务队列),用于保存等待执行的任务的阻塞队列可以选择SynchronousQueue。静态工厂方法Executors.newCachedThreadPool()使用了这个队列。
链表阻塞LinkedBlockingQueue类
一个初始值为零的变量,两个线程对其交替操作,一个加1一个减1
- LinkedBlockingQueue具有单链表和有界阻塞队列的功能。
- 队列慢时插入操作被阻塞,队列空时,移除操作被阻塞。
- 默认和最大长度为Integer.MAX_VALUE,相当于无界(值非常大:2^31-1)。
LinkedBlockingQueue的应用场景
吞吐量通常要高于ArrayBlockingQueue。创建线程池时,参数runnableTaskQueue(任务队列),用于保存等待执行的任务的阻塞队列可以选择LinkedBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
双向阻塞LinkedBlockingDeque
- 由链LinkedBlockingDeque = 阻塞队列+链表+双端访问
- 线程安全。
- 多线程同时入队时,因多了一端访问入口,所以减少了一半的竞争。
- 默认容量大小为Integer.MAX_VALUE。可指定容量大小。
LinkedBlockingDeque可以用在“工作窃取“模式中。
工作窃取算法:某个线程比较空闲,从其他线程的工作队列中的队尾窃取任务来帮忙执行。
延时阻塞DelayQueue
DelayQueue = Delayed + BlockingQueue。队列中的元素必须实现Delayed接口。
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
在创建元素时,可以指定多久可以从队列中获取到当前元素。只有在延时期满才能从队列中获取到当前元素。
应用场景
- 缓存系统的设计:可以用DelayQueue保存缓存元素的有效期。然后用一个线程循环的查询DelayQueue队列,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
- 定时任务调度:使用DelayQueue队列保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行。比如Java中的TimerQueue就是使用DelayQueue实现的。
线程通信-生产者消费者
传统版
使用ReentrantLock和Condition
资源类:
class ShareData{
private int number = 0 ;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment() {
lock.lock();
try {
while (number !=0){
condition.await();
}
number++;
System.out.println(Thread.currentThread().getName()+" number : "+ number);
condition.signalAll();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void decrement() {
lock.lock();
try {
while (number ==0){
condition.await();
}
number--;
System.out.println(Thread.currentThread().getName()+" number : "+ number);
condition.signalAll();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
测试方法:
public static void main(String[] args) {
ShareData shareData = new ShareData();
new Thread(()->{
for (int i = 0; i < 5; i++) {
shareData.increment();
}
},"t1").start();
new Thread(()->{
for (int i = 0; i < 5; i++) {
shareData.decrement();
}
},"t2").start();
}
阻塞队列实现
资源类
class MyResource{
private volatile boolean FLAG = true;
private AtomicInteger atomicInteger = new AtomicInteger();
BlockingQueue<String> blockingQueue = null;
public MyResource(BlockingQueue<String> blockingQueue){
this.blockingQueue = blockingQueue;
}
public void myProd() throws Exception{
String data = null;
boolean retValue;
while (FLAG){
data = atomicInteger.incrementAndGet()+"";
blockingQueue.put(data);
System.out.println(Thread.currentThread().getName()+"插入队列成功");
TimeUnit.MILLISECONDS.sleep(1000);
}
System.out.println(Thread.currentThread().getName()+"生产退出");
}
public void myConsumer() throws Exception{
while (FLAG){
String poll = blockingQueue.take();
System.out.println(Thread.currentThread().getName()+"消费队列成功");
}
System.out.println(Thread.currentThread().getName()+"消费退出");
}
public void stop(){
System.out.println("main叫停");
this.FLAG = false;
}
}
测试类:
public static void main(String[] args) throws InterruptedException {
MyResource myResource = new MyResource(new SynchronousQueue<>());
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"生产线程启动");
try {
myResource.myProd();
} catch (Exception e) {
e.printStackTrace();
}
},"t1").start();
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"消费线程启动");
try {
myResource.myConsumer();
} catch (Exception e) {
e.printStackTrace();
}
},"t2").start();
TimeUnit.SECONDS.sleep(5);
myResource.stop();
}