zoukankan      html  css  js  c++  java
  • DelayQueue 源码分析

    DelayQueue

    DelayQueue 是基于 PriorityQueue 实现的线程安全的无界优先级阻塞队列,
    队列的头部元素必须在超时后才能移除,元素必须实现 Delayed 接口。
    

    创建实例

        // 控制访问的互斥锁
        private final transient ReentrantLock lock = new ReentrantLock();
        // 持有元素的优先级队列
        private final PriorityQueue<E> q = new PriorityQueue<>();
    
        /**
         *  在延迟队列头部超时阻塞等待的线程,当有 leader 时,其他线程将无限期等待下去。
         *  leader 线程读取到元素之后,必须唤醒其他等待的线程。
         */
        private Thread leader;
    
        /**
         *  当队列为空或有 leader 在阻塞等待时,当前线程将在该条件上阻塞等待。
         */
        private final Condition available = lock.newCondition();
    
        /**
         *  创建一个空的延迟队列
         */
        public DelayQueue() {}
    

    读取元素

        /**
         *  移除并获取延时队列的头部元素,如果没有元素超时,则阻塞等待
         */
        @Override
        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    // 读取队列头部元素
                    E first = q.peek();
                    // 1)如果队列为空
                    if (first == null) {
                        // 当前线程在 available 条件上阻塞等待
                        available.await();
                    } else {
                        // 获取元素的超时时间
                        final long delay = first.getDelay(NANOSECONDS);
                        // 如果元素已经超时
                        if (delay <= 0L) {
                            // 则移除并返回头部元素
                            return q.poll();
                        }
                        first = null; // don't retain ref while waiting
                        // 1)如果已经有线程在阻塞等待
                        if (leader != null) {
                            // 当前线程在 available 条件上阻塞等待
                            available.await();
                        } else {
                            // 2)当前线程是第一个阻塞等待的线程
                            final Thread thisThread = Thread.currentThread();
                            // 写入 leader
                            leader = thisThread;
                            try {
                                // 阻塞等待指定的超时时间
                                available.awaitNanos(delay);
                            } finally {
                                // 线程被激活后尝试移除 leader
                                if (leader == thisThread) {
                                    leader = null;
                                }
                            }
                        }
                    }
                }
            } finally {
                // 如果没有 leader 并且队列中有元素存在 
                if (leader == null && q.peek() != null) {
                    // 唤醒其他阻塞等待的线程来读取元素
                    available.signal();
                }
                lock.unlock();
            }
        }
    

    添加元素

        /**
         *  将目标元素 e 插入到延时队列中,由于是无界的,该操作不会被阻塞
         */
        @Override
        public void put(E e) {
            offer(e);
        }
    
        @Override
        public boolean offer(E e) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                // 插入元素
                q.offer(e);
                // 如果当前元素是优先级最高的元素
                if (q.peek() == e) {
                    // 清除 leader
                    leader = null;
                    // 唤醒在可用条件上等待的线程
                    available.signal();
                }
                return true;
            } finally {
                lock.unlock();
            }
        }
    
  • 相关阅读:
    JMeter
    MeasureSpec介绍及使用详解
    AS中一些不经常用到的快捷键
    gradle 构建工具,与Ant Maven关系
    关于runOnUiThread()与Handler两种更新UI的方法
    关于new Handler()与new Handler(Looper.getMainLooper())区别
    RTSP协议详解
    overridePendingTransition的简介
    Android获取手机分辨率DisplayMetircs类
    RTSP消息交互过程
  • 原文地址:https://www.cnblogs.com/zhuxudong/p/10085246.html
Copyright © 2011-2022 走看看