zoukankan      html  css  js  c++  java
  • Java队列学习笔记(2)---DelayQueue

    DelayQueue

    DelayQueue 的性质:

    • 它是一个线程安全的队列。
    • 包含 PriorityQueue 的性质。
    • 放入该队列的元素必须实现 Delayed 接口
    • 从该队列取出对象时,需要询问对象的执行延迟。即队头不为 null 条件还不充分,还需要剩余延迟 delay <= 0,对象才能正常出队。这点比较特殊。

    Delayed 接口

    Delayed 用于标记在给定延迟后应执行操作的对象。

    public interface Delayed extends Comparable<Delayed> {
        long getDelay(TimeUnit unit);
    

    getDelay 方法的实现中并不需要阻塞,只需要返回给定时间单位中与此对象关联的剩余延迟。

    public interface ScheduledFuture<V> extends Delayed, Future<V> {}
    

    比如实现 ScheduledFuture 接口的对象就可以做 DelayQueue 的元素。

    他包含的关键成员变量

    // 可重入锁, 不参与序列化和反序列化
    private final transient  lock = new ReentrantLock();
    // 依赖优先级队列,在此基础上封装线程安全的逻辑
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    // 指定在队列头等待元素的线程。
    private Thread leader = null;
    // 条件锁:当一个较新的元素在队列头变得可用或一个新线程可能需要成为 leader 时发出的状态信号。
    private final Condition available = lock.newCondition();
    

    poll 立即出队

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 获取队首元素,但不从队列中移除
            E first = q.peek();
            // 条件一:first == null
            // 条件不成立:队首元素不为 null,队列中至少包含一个元素。进入 else 逻辑
            // 条件成立:队列中没有元素
            // 条件二:first.getDelay(NANOSECONDS) > 0
            // 条件成立:这是一个延迟执行的对象
            // 条件不成立:这不是一个延迟执行的对象
            if (first == null || first.getDelay(NANOSECONDS) > 0)
                return null;
            else
                // 情况一:队列为空,且该队列中的元素不是延迟执行的对象
                // 情况二:队列不为空
                // 优先级队列的队首元素出队
                return q.poll();
        } finally {
            lock.unlock();
        }
    }
    

    take 阻塞式出队

    public E take() throws InterruptedException {
    
    
        final ReentrantLock lock = this.lock;
        // 该锁可以响应打断
        lock.lockInterruptibly();
        try {
            // 自旋
            for (;;) {
                // 获取队头元素,但是不把它从队列中移走
                E first = q.peek();
                // 条件满足,队列为空
                if (first == null)
                    // 陷入条件阻塞
                    available.await();
                else {
                    // 进入该分支,表示队头元素不为null,队列中至少包含一个元素
                    // 获取的延迟执行时间
                    long delay = first.getDelay(NANOSECONDS);
                    // 如果延迟执行时间 <= 0, 不用延迟,立即出队
                    if (delay <= 0)
                        return q.poll();
                    // 释放队首元素的引用
                    first = null; // don't retain ref while waiting
                    // leader 表示正在等待队首元素的线程
                    // 条件成立:已经有线程在等待队首元素了,当前线程条件阻塞
                    // 条件不成立:当前线程是目前第一个等待队首元素的线程
                    if (leader != null)
                        available.await();
                    else {
                        // 多个线程执行 take 方法时,仅有一个线程来到此 else 分支
                        // 因此只有一个线程进行超时等待,其他线程则会因为 leader != null 而永久等待
                        // 获取当前线程
                        Thread thisThread = Thread.currentThread();
                        // 将当前线程引用赋值给首个等待队首元素的线程 leader
                        leader = thisThread;
                        try {
                            // 当前线程阻塞最多 delay 的延迟时间,单位纳秒
                            available.awaitNanos(delay);
                        } finally {
                            // 新的队首元素加入到队列中,或者已经超过等待的最大时长 delay
                            // 条件成立:当前唤醒的线程就是 leader 线程,leader 线程置为 null
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // 返回优先级队列的队首元素时,或者发生打断时,跳出自旋,执行此处方法
            // 条件一:leader == null
            // 条件成立:表示目前所有线程都在进行“无限”等待
            // 条件二:q.peek() != null
            // 条件成立:队列中队头元素不为 null,队列中至少包含一个元素
            if (leader == null && q.peek() != null)
                // 通知等待条件的线程来竞选 leader
                available.signal();
            lock.unlock();
        }
    }
    

    offer 元素加入队尾

    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        // 队列发生变化的过程,需要加锁
        lock.lock();
        try {
            // 调用优先级队列的 offer 方法,插入 e 元素到队尾
            q.offer(e);
            // 条件成立:e 成为了新的队首元素
            if (q.peek() == e) {
                // 因为新加入的元素,导致原先的 leader 无效了,重置 leader 为 null
                // 通知等待的线程重新竞选 leader
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
    

    poll(timeout, unit) 超时等待出队

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                // 队首元素出队
                E first = q.peek();
                // 条件成立:队首元素为 null
                // 条件不成立:队首元素不为 null
                if (first == null) {
                    // 条件成立:nanos <= 0, 即传入的参数 timeout <= 0
                    // 条件不成立:走 else 分支,超时时长 > 0
                    if (nanos <= 0)
                        return null;
                    else
                       // 最多等待 nanos 纳秒,nanos 返回值为剩余延迟等待秒数
                        nanos = available.awaitNanos(nanos);
                } else {
                    // 队首元素不为 null,但是也不能立即返回,还需要考虑对象的延迟执行特性
                    long delay = first.getDelay(NANOSECONDS);
                    // 条件成立:延迟等待时长<=0
                    if (delay <= 0)
                        // 此时,优先级队列的队首元素出队
                        return q.poll();
                    // 条件成立: nanos <= 0
                    // 情况一:调用方法时,传入的参数 timeout <= 0
                    // 情况二:因为第一轮自旋时,first == null,然后已经 awaitNanos 超时了,现在虽然代码已经运行到这一行
                    if (nanos <= 0)
                        // 已经没有额外的时长来等待对象的延迟执行,所以这里返回 null
                        return null;
                    // 释放 first 引用
                    first = null; // don't retain ref while waiting
                    // 条件一: nanos < delay 
                    // 条件不成立:允许再等待至少 delay 的时长,也不会超过传入的参数延迟
                    // 条件成立:允许等待的剩余延迟 nanos 小于对象执行的剩余延迟 delay
                    // 条件二:隐含条件 nanos >= delay
                    // 条件成立:等待队首元素的 leader 线程存在,当前线程直接进入超时等待
                    // 条件不成立:进入 else 分支
                    if (nanos < delay || leader != null)
                        nanos = available.awaitNanos(nanos);
                    else {
                        // 允许等待的剩余延迟 nanos 大于等于对象执行的剩余延迟 delay,且 leader 线程为 null
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            long timeLeft = available.awaitNanos(delay);
                            // delay - timeLeft 表示本次从睡眠到被唤醒所用的时间
                            nanos -= delay - timeLeft;
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // 返回优先级队列的队首元素(即调用 q.poll 时),或者发生线程打断时,跳出自旋,执行此处方法
            // 条件一:leader == null
            // 条件成立:表示目前所有线程都在进行“无限”等待
            // 条件二:q.peek() != null
            // 条件成立:队列中队头元素不为 null,队列中至少包含一个元素
            if (leader == null && q.peek() != null)
                // 通知等待条件的线程来竞选 leader
                available.signal();
            lock.unlock();
        }
    }
    
  • 相关阅读:
    multilabel-multiclass classifier
    关于zabbix _get返回Could not attach to pid的问题
    python导出环境依赖到req,txt文件中
    inode满的解决方法
    搞定面试官:咱们从头到尾再说一次 Java 垃圾回收
    SpringBoot项目,如何优雅的把接口参数中的空白值替换为null值?
    万万没想到,JVM内存区域的面试题也可以问的这么难?
    万万没想到,面试中,连 ClassLoader类加载器 也能问出这么多问题…..
    npm私服verdaccio报sha错误的解决方案
    配置SQL Server 2016无域AlwaysOn(转)
  • 原文地址:https://www.cnblogs.com/kendoziyu/p/java_code_analysis_of_DelayQueue.html
Copyright © 2011-2022 走看看