DelayQueue,带有延迟元素的线程安全队列,当非阻塞从队列中获取元素时,返回最早达到延迟时间的元素,或空(没有元素达到延迟时间)。DelayQueue的泛型参数需要实现Delayed接口,Delayed接口继承了Comparable接口,DelayQueue内部使用非线程安全的优先队列(PriorityQueue),并使用Leader/Followers模式,最小化不必要的等待时间。DelayQueue不允许包含null元素。
领导者/追随者模式是多个工作线程轮流获得事件源集合,轮流监听、分发并处理事件的一种模式。在任意时间点,程序都仅有一个领导者线程,它负责监听IO事件。而其他线程都是追随者,它们休眠在线程池中等待成为新的领导者。当前的领导者如果检测到IO事件,首先要从线程池中推选出新的领导者线程,然后处理IO事件。此时,新的领导者等待新的IO事件,而原来的领导者则处理IO事件,二者实现了并发。
简单理解,就是最多只有一个线程在处理,其他线程在睡眠。在DelayQueue的实现中,Leader/Followers模式用于等待队首的第一个元素。
类定义及参数:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { /** 重入锁,实现线程安全 */ private final transient ReentrantLock lock = new ReentrantLock(); /** 使用优先队列实现 */ private final PriorityQueue<E> q = new PriorityQueue<E>(); /** Leader/Followers模式 */ private Thread leader = null; /** 条件对象,当新元素到达,或新线程可能需要成为leader时被通知 */ private final Condition available = lock.newCondition();
构造函数:
/** * 默认构造,得到空的延迟队列 */ public DelayQueue() {} /** * 构造延迟队列,初始包含c中的元素 * * @param c 初始包含的元素集合 * @throws NullPointerException 当集合或集合任一元素为空时抛出空指针错误 */ public DelayQueue(Collection<? extends E> c) { this.addAll(c); }
add方法:
/** * 向延迟队列插入元素 * * @param e 要插入的元素 * @return true * @throws NullPointerException 元素为空,抛出空指针错误 */ public boolean add(E e) { // 直接调用offer并返回 return offer(e); }
offer方法:
/** * 向延迟队列插入元素 * * @param e 要插入的元素 * @return true * @throws NullPointerException 元素为空,抛出空指针错误 */ public boolean offer(E e) { final ReentrantLock lock = this.lock; // 获得锁 lock.lock(); try { // 向优先队列插入元素 q.offer(e); // 若在此之前队列为空,则置空leader,并通知条件对象,需要结合take方法看 if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { // 释放锁 lock.unlock(); } }
put方法:
/** * 向延迟队列插入元素. 因为队列是无界的,所以不会阻塞。 * * @param e 要插入的元素 * @throws NullPointerException 元素为空,抛出空指针错误 */ public void put(E e) { offer(e); }
带超时的offer方法:
/** * 向延迟队列插入元素. 因为队列是无界的,所以不会阻塞,因此,直接调用offer方法并返回 * * @param e 要插入的元素 * @param timeout 不会阻塞,忽略 * @param unit 不会阻塞,忽略 * @return true * @throws NullPointerException 元素为空,抛出空指针错误 */ public boolean offer(E e, long timeout, TimeUnit unit) { // 直接调用offer方法并返回 return offer(e); }
poll方法:
/** * 获取并移除队首的元素, 或者返回null(如果队列不包含到达延迟时间的元素) * * @return 队首的元素, 或者null(如果队列不包含到达延迟时间的元素) */ public E poll() { final ReentrantLock lock = this.lock; // 获得锁 lock.lock(); try { // 获取优先队列队首元素 E first = q.peek(); // 若优先队列队首元素为空,或者还没达到延迟时间,返回null if (first == null || first.getDelay(NANOSECONDS) > 0) return null; // 否则,返回并移除队首元素 else return q.poll(); } finally { // 释放锁 lock.unlock(); } }
take方法(重要):
/** * 获取并移除队首元素,该方法将阻塞,直到队列中包含达到延迟时间的元素 * * @return 队首元素 * @throws InterruptedException 阻塞时被打断,抛出打断异常 */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 获得锁,该锁可被打断 lock.lockInterruptibly(); try { // 循环处理 for (;;) { // 获取队首元素 E first = q.peek(); // 若元素为空,等待条件,在offer方法中会调用条件对象的通知方法 // 并重新进入循环 if (first == null) available.await(); // 若元素不为空 else { // 获取延迟时间 long delay = first.getDelay(NANOSECONDS); // 若达到延迟时间,返回并移除队首元素 if (delay <= 0) return q.poll(); // 否则,需要进入等待 first = null; // 在等待时,不持有引用 // 若leader不为空,等待条件 if (leader != null) available.await(); // 否则,设置leader为当前线程,并超时等待延迟时间 else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { // 通知其他线程条件得到满足 if (leader == null && q.peek() != null) available.signal(); // 释放锁 lock.unlock(); } }
带超时的poll方法(重要):
/** * 获取并移除队首元素,该方法将阻塞,直到队列中包含达到延迟时间的元素或超时 * * @return 队首元素,或者null * @throws InterruptedException 阻塞等待时被打断,抛出打断异常*/ public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) { if (nanos <= 0) return null; else nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return q.poll(); if (nanos <= 0) return null; first = null; // don't retain ref while waiting if (nanos < delay || leader != null) nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
peek方法:
/** * 获取但不移除队首元素,或返回null(如果队列为空)。和poll方法不同, * 若队列不为空,该方法换回队首元素,不论是否达到延迟时间 * * @return 队首元素,或null(如果队列为空) */ public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return q.peek(); } finally { lock.unlock(); } }
出处:
https://www.cnblogs.com/enumhack/p/7472873.html
https://www.cnblogs.com/wanly3643/p/3944661.html
jdk源码