1,BlockingQueue
生产者-消费者模型即有界缓存模型,生产者线程在仓库装满之后被阻塞,消费者线程则在仓库清空后阻塞。
它包括三个基本部分:
1) 产品仓库,用于存放产品。
2) 生产者,生产出来的产品存入仓库。
3) 消费者,消费仓库里的产品。
java.util.concurrent.BlockingQueue,是一个阻塞队列接口,当BlockingQueue操作无法立即响应时,有四种处理方式:
1) 抛出异常
2) 返回特定的值,根据操作不同,可能是null或者false中的一个。
3) 无限期的阻塞当前线程,直到操作可以成功为止。
4) 根据阻塞超时设置来进行阻塞。
BlockingQueue的核心方法和未响应处理的对应形式如下:
|
抛出异常 |
返回特定值 |
无限阻塞 |
超时 |
插入 |
add(e) |
offer(e) |
put(e) |
offer(e,time,unit) |
移除 |
remove() |
poll() |
take() |
poll(time,unit) |
查询 |
element() |
peek() |
|
|
其中add、remove、elemnt三个方法具体实现都是调用offer、poll、peek三个方法,在BlockingQueue的各个实现类中,通过重写这几个方法类达到多线程安全的目的。
BlockingQueue有很多实现类:
2,ArrayBlockingQueue
ArrayBlockingQueue是基于数组实现的有界BlockingQueue,该队列满足先入先出的特性,它是一个典型的有界缓存,有一个固定大小的数组保存元素,一旦创建好了以后,容量就不能改变了。
1 //队列元素存储数组
2 final Object[] items;
3 //队头下标,下一次take/pool/peek/remove方法执行位置下标
4 int takeIndex;
5 //队尾下标,下一次put/offer/add方法执行下标
6 int putIndex;
7 //队列元素数量
8 int count;
9 //访问锁
10 final ReentrantLock lock;
11 //阻塞取值类型方法 take/poll/peek/remove的控制条件
12 private final Condition notEmpty;
13 //阻塞存值类型方法put/offer/add的控制条件
14 private final Condition notFul;
ArrayBlockingQueue提供add/offer/put三种方法都用于插入数据。
add(E)的实现体现在AbstractQueue中,通过调用offer(E)作为实现,如果offer(E)返回false,则抛出异常。
offer(E)方法用于入队,入队失败则返回false,反之返回true,
1 public boolean offer(E e) {
2 checkNotNull(e);
3 final ReentrantLock lock = this.lock;
4 lock.lock();
5 try {
6 //该队列用于判断队列是否已满,满队时返回false
7 if(count == items.length) {
8 return false;
9 } else {
10 enqueue(e);
11 return true;
12 }
13 } finally {
14 lock.unlock();
15 }
16 }
offer(E,long,TimeUnit)方法会通过反复入队来保证offer成功,除非线程中断。
1 public boolean offer(E e, long timeout, TimeUint unit) throws InterruptedException {
2 checkNotNull(e);
3 long nanos = unit.toNanos(timeout);
4 final ReentrantLock lock = this.lock;
5 lock.lockInterruptibly();
6 try {
7 //尝试入队,如果入队失败那么阻塞当前线程指定时长之后,再次尝试
8 while(count == items.length) {
9 if(nanos <= 0) {
10 return false;
11 }
12 nanos = notFull.awaitNanos(nanos);
13 }
14 enqueue(e);
15 return true;
16 }
17 finally {
18 lock.unlock();
19 }
20 }
put(E)方法用于入队,队满则等待notFull被唤醒,或者发起了中断
1 public void put(E e) throws InterruptedException {
2 checkNotNull(e);
3 final ReentrantLock lock = this.lock;
4 //当前线程为终端的情况下获取锁
5 lock.lockInterruptibly();
6 try {
7 //队列已满时,阻塞当前线程,直到可以插入值
8 while(count == items.length) {
9 notFull.await();
10 }
11 enqueue(e);
12 } finally {
13 lock.unlock();
14 }
15 }
add/offer/put方法特点
1) 这三个方法使用了重入锁,都是线程安全的
2) offer方法只会尝试入队一次,入队失败则返回false
3) add方法入队失败则抛出异常
4) put方法在未中断的情况下,会一直尝试入队,如果被中断则抛出中断异常,那么需要有使用者自行处理,notFull对象监视器会在出队时唤醒。
enqueue(E x)方法
该方法执行了真正的入队,源码实现很简单,主要思路是把x添加到队尾,然后唤醒notEmpty对象监视器。
1 private void enqueue(E x) {
2 final Object[] items = this.items;
3 items[putIndex] = x;
4 //putIndex达到数组上限的时候,归零,这说明这是个循环队列
5 if(++putIndex == item.length) {
6 putIndex = 0
7 }
8 count++;
9 notEmpty.signal();
10 }
add/offer/put方法都会调用enqueue方法,而唤醒notEmpty对象监视器的作用在于,通知可以被notEmpty阻塞的方法poll/take,以中断阻塞。
remove(Object o)方法用于移除指定元素,而poll、take则从队列取元素。
1 public boolean remove(Object o) {
2 if(o == null) return false;
3 final Object[] items = this.items;
4 final ReentrantLock lock = this.lock;
5 lock.lock();
6 try {
7 if(count > 0) {
8 final int putIndex = this.putIndex;
9 int i = takeIndex;
10 do {
11 //循环比较对象是否一致,取得对应下标
12 if(o.equals(item[i])) {
13 //移除指定下标位置的对象
14 removeAt(i);
15 return true;
16 }
17 if(++i == items.length) {
18 i = 0;
19 }
20 } while (i != putIndex);
21 }
22 return false;
23 }
24 finally {
25 lock.unlock();
26 }
27 }
removeAt(int)方法的逻辑并不复杂,实现思路如下:
1) 如果需要被移除的index处于队尾,那么直接移除队尾元素,不移动其他元素
2) 反之,则移除指定index后,把所有元素前移一位。
3) 唤醒notFull对象监视器
take方法用于去除队头元素,如果队列为空,那么它会等待notEmpty被唤醒,或者发起中断。
1 public E take() throws InterruptedException {
2 final ReentrantLock lock = this.lock;
3 lock.lockInterruptibly();
4 try {
5 //容器没有数据时,使用notmpty对象监视器阻塞当前线程
6 while(count == 0) {
7 notEmpty.await();
8 }
9 return dequeue();
10 } finally {
11 lock.unlock();
12 }
13 }
poll方法用于去除队头元素,如果队列为空,那么返回null。
1 public E poll() {
2 final ReentrantLock lock = this.lock;
3 lock.lock();
4 try {
5 //容器没有数据时,返回null
6 return (count == 0) ? null : dequeue();
7 } finally {
8 lock.unlock();
9 }
10 }
remove、poll、take特点:
这三个方法都是现成安全的。
1) remove方法可以移除任意对象,需要遍历比对对象来确定下标位置,并且可能需要移动大量数据位置,效率较低。
2) removeAt方法可以移除指定下标的元素,比之remove少了比对过程,但它也需要移动大量数据位置,效率稍微好一点。
3) poll和take只能移除队头元素,效率极高。
dequeue方法,它的逻辑很简单
1) 移除容器里的指定对象
2) 迭代器执行elementDequed来保证一致性
3) 唤醒notFull对象监视器
1 private E dequeue() {
2 final Object[] items = this.itmes;
3 @SuppresswArnings("unchecked")
4 E x = (E)item.[takeIndex];
5 items[takeIndex] = null;
6 if(++takeIndex == items.length) {
7 takeIndex = 0;
8 }
9 count--;
10 it(itrs != null) {
11 itrs.elementDequeued();
12 }
13 notFull.signal();
14 return x;
15 }
peek()方法用于查看队头元素,代码略。
总结,ArrayBlockingQueue特点:
1) 使用数组进行存储
2) enqueue()和dequeue()方法是入队和出队的核心方法,他们分别通知”队列非空”和”队列非满”,从而使阻塞中的入队和出队方法能够继续执行,以实现生产者消费者模式。
3) 插入只能从队尾开始,移除可以是任意位置,但是移除队头以外的元素效率很低。
4) ArrayBlockingQueue是个循环队列