BlockingQueue提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空。
BlockingQueue 具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:
四组不同的行为方式解释:
- 抛异常:如果试图的操作无法立即执行,抛一个异常。
- 特定值:如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
- 阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
- 超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是true / false。
阻塞队列实现阻塞同步的方式很简单,使用的就是是lock锁的多条件(condition)阻塞控制。封装了根据条件阻塞线程的过程,而我们就不用关心繁琐的await/signal操作了。
阻塞队列的主要实现类有以下几种:
1:ArrayBlockingQueue:ArrayBlockingQueue是一个有界的阻塞队列,其内部实现是一个数组。有界也就意味着,它不能够存储无限多数量的元素。它有一个同一时间能够存储元素数量的上限。你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了(因为它是基于数组实现的,一旦初始化,大小就无法修改,不会自动扩容)。内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。
2:DelayQueue:DelayQueue 对元素进行持有,直到一个特定的延迟到期。DelayQueue 将会在每个元素的 getDelay() 方法返回的值的时间段之后才释放掉该元素。如果返回的是 0 或者负值,延迟将被认为过期,该元素将会在 DelayQueue 的下一次 take 被调用的时候被释放掉。元素必须实现 java.util.concurrent.Delayed 接口。
3:LinkedBlockingQueue:LinkedBlockingQueue 内部以一个链表存储元素。可以选择一个长度上限。如果没有定义上限,将使用 Integer.MAX_VALUE 作为上限。内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。
4:PriorityBlockingQueue:PriorityBlockingQueue 是一个无界的并发队列。所有插入到 PriorityBlockingQueue 的元素必须实现 java.lang.Comparable接口。队列中元素的排序取决于你自己的 Comparable 实现。注意:如果你从一个 PriorityBlockingQueue 获得一个 Iterator 的话,该 Iterator 并不能保证它对元素的遍历是以优先级为序的。
5:SynchronousQueue:SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳一个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。
6:LinkedBlockingDeque :链阻塞双端队列.它是对BlockingDeque双端队列接口的实现,线程在双端队列的两端都可以插入和提取元素。在它为空的时候,一个试图从中抽取数据的线程将会阻塞,无论该线程是试图从哪一端抽取数据。
BlockingQueue 通常用于一个线程生产对象,而另外一个线程消费这些对象的场景,最常见的就是用来实现生产者消费者模式:因为使用者不需显式指定什么时候阻塞进行生产或者消费,队列本身就提供了满或空时的阻塞功能。
public class BlockingQueueTest { //生产者线程 public static class Producer implements Runnable{ private final BlockingQueue<Integer> blockingQueue; private volatile boolean flag; private Random random; public Producer(BlockingQueue<Integer> blockingQueue) { this.blockingQueue = blockingQueue; flag=false; random=new Random(); } public void run() { while(!flag){ int info=random.nextInt(100); try { blockingQueue.put(info);//生产对象并存入阻塞队列中 Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } } } public void shutDown(){ flag=true; } } //消费者线程 public static class Consumer implements Runnable{ private final BlockingQueue<Integer> blockingQueue; private volatile boolean flag; public Consumer(BlockingQueue<Integer> blockingQueue) { this.blockingQueue = blockingQueue; } public void run() { while(!flag){ int info; try { info = blockingQueue.take();//从阻塞队列中消费产品 Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } } } public void shutDown(){ flag=true; } } public static void main(String[] args){ BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>(10); Producer producer=new Producer(blockingQueue); Consumer consumer=new Consumer(blockingQueue); //创建5个生产者,5个消费者,并启动生产和消费 for(int i=0;i<10;i++){ if(i<5){ new Thread(producer,"producer"+i).start(); }else{ new Thread(consumer,"consumer"+(i-5)).start(); } } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } producer.shutDown();//手动停止生产 consumer.shutDown();//手动停止消费 } }