在团队的重试与延迟调度项目中,使用了基于DelayQueue的方式来实现时间调度和触发,此处,对DelayQueue进行一些梳理。
首先是Queue接口,关于队列的含义,不再赘述。如下,Queue接口方法,按抛异常或返回特定值(null)可以分为两类如下图:

接下来是BlockingQueue接口,其继承了Queue接口,方法如下:

注意:BlockingQueue不接受null值,其所有实现类对于add null操作,都会抛出NullPointerException。null只作为poll、peek等操作的失败值。在Java官方文档中,即说明了BlockingQueue及其实现类主要用来适用生产者-消费者模型,将其作为普通的集合使用性能可能会存在问题。同时,其所有实现类都是线程安全的,内部方法通过各种锁变成了原子性操作。以下是官方文档给出的基于BlockingQueue的生产者-消费者示例:
class Producer implements Runnable {
private final BlockingQueue queue;
Producer(BlockingQueue q) { queue = q; }
public void run() {
try {
while (true) { queue.put(produce()); }
} catch (InterruptedException ex) { ... handle ...}
}
Object produce() { ... }
}
class Consumer implements Runnable {
private final BlockingQueue queue;
Consumer(BlockingQueue q) { queue = q; }
public void run() {
try {
while (true) { consume(queue.take()); }
} catch (InterruptedException ex) { ... handle ...}
}
void consume(Object x) { ... }
}
class Setup {
void main() {
BlockingQueue q = new SomeQueueImplementation();
Producer p = new Producer(q);
Consumer c1 = new Consumer(q);
Consumer c2 = new Consumer(q);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
}
}
接下来,开始聊聊DelayQueue
先说说其指定的队列元素:interface Delayed的实现类
Delayed有两个接口方法:
long getDelay(TimeUnit unit); //返回还剩余的延迟时间 public int compareTo(T o); //Delayed接口继承了Comparable,用来排序
接下来,就是delayQueue的具体实现了,首先值得注意的是其内部持有这么几个东西:
ReentrantLock lock = new ReentrantLock(); // 可重入锁 Condition available = ReentrantLock.newCondition(); // 靠其来发信号唤醒等着拿数据的线程 private Thread leader = null; // leader-follower模式,用来最小化等待队头元素时间的策略 PriorityQueue<E> q = new PriorityQueue<E>(); // 优先队列,可以知道Delayed.compareTo()必然用在此处
然后看看其两个核心方法,put与take:
public void put(E e) {
offer(e);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
可以看出,在向delayQueue添加元素时,主要调用PriorityQueue.offer()向内部的优先队列添加元素。在优先队列中添加元素时,会按照事先的比较方法,找到元素的合适位置并插入。同时,看到PriorityQueue中是使用Obeject[]来存储数据的,因此可以推测在大量的向delayQueue添加随机而无序的元素时,可能会遇到性能问题。
再一个有点意思的就是在offer到PriorityQueue之后,会检查一下自己是不是队头,要是自己是队头的话,作废老的leader(因为老的leader是目标取走前一个队头的线程),再让接下来最先来取走队头的线程成为leader(结合take方法看,下一次最先来的线程,才是取走当前队头的线程)。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await(); // 1
else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0) // 2
return q.poll();
else if (leader != null) // 3
available.await();
else { // 4
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
来分析下上面的代码,
1. 当first == null,说明队列为空,当前线程使自己沉睡,并交出ReentrantLock锁,等待Condition.signal()来唤醒自己
2. 当队头元素到达指定的延迟时间了,直接取出返回
3. 当leader != null,说明前面的leader还在等呢,那么当前线程肯定是follower,那当前线程就只能无奈的等着Condition发信号了
4. 当leader = null时,说明当前线程是第一个要取走队头的线程,那它就理所当然成为leader,后边再来拿队头的,都是follower,都得排在后边等着。而这时,当前线程又清楚的知道队头元素的延迟时间啥时候到期,那么它根本就不用等Condition发信号,自己睡到延迟时间到期,再醒来肯定就能直接拿走队头了。可以看出,通过引入leader-follower能够降低线程的等待时间。
5. 再一个稍微注意点的就是无限for循环了,当被condition.singal()唤醒之后,正好进行下一个循环,接着取