Java中的DelayQueue位于java.util.concurrent包下,本质是由PriorityQueue和BlockingQueue实现的阻塞优先级队列。
放入队列的元素需要实现java.util.concurrent包的Delayed接口:
public interface Delayed extends Comparable<Delayed> { /** * Returns the remaining delay associated with this object, in the * given time unit. * * @param unit the time unit * @return the remaining delay; zero or negative values indicate * that the delay has already elapsed */ long getDelay(TimeUnit unit); }
通过实现这个接口,来完成对队列中元素,按照时间延迟先后排序的目的。
从队列中取元素:
看DelayedQueue的take()方法:
/** * Retrieves and removes the head of this queue, waiting if necessary * until an element with an expired delay is available on this queue. * * @return the head of this queue * @throws InterruptedException {@inheritDoc} */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return q.poll(); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
可以看到,在这段代码里,在第一个元素的延迟时间还没到的情况下:
- 如果当前没有其他线程等待,则阻塞当前线程直到延迟时间。
- 如果有其他线程在等待,则阻塞当前线程。
向队列中放入元素:
/** * Inserts the specified element into this delay queue. * * @param e the element to add * @return {@code true} * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } }
在放入元素的时候,会唤醒等待中的读线程。
如果我们不考虑分布式运行和任务持久化的话,Java中的DelayQueue是一个很理想的方案,精巧好用。但是如果我们需要分布式运行和任务持久化,就需要引入一些外部组件。