zoukankan      html  css  js  c++  java
  • DelayQueue

    延迟阻塞队列 DelayQueue

    延迟阻塞队列 DelayQueue

    DelayQueue 是一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。

    使用场景

    因延迟阻塞队列的特性, 我们一般将 DelayQueue 作用于以下场景 :

    • 缓存系统 : 当能够从 DelayQueue 中获取元素时,说该缓存已过期
    • 定时任务调度 :

    下面我们以缓存系统的应用,看下 DelayQueue 的使用,代码如下:

    public class DelayQueueDemo {
    
        static class Cache implements Runnable {
    
            private boolean stop = false;
    
            private Map<String, String> itemMap = new HashMap<>();
    
            private DelayQueue<CacheItem> delayQueue = new DelayQueue<>();
    
            public Cache () {
                // 开启内部线程检测是否过期
                new Thread(this).start();
            }
    
            /**
             * 添加缓存
             *
             * @param key
             * @param value
             * @param exprieTime&emsp;过期时间,单位秒
             */
            public void put (String key, String value, long exprieTime) {
                CacheItem cacheItem = new CacheItem(key, exprieTime);
    
                // 此处忽略添加重复 key 的处理
                delayQueue.add(cacheItem);
                itemMap.put(key, value);
            }
    
            public String get (String key) {
                return itemMap.get(key);
            }
    
            public void shutdown () {
                stop = true;
            }
    
            @Override
            public void run() {
                while (!stop) {
                    CacheItem cacheItem = delayQueue.poll();
                    if (cacheItem != null) {
                        // 元素过期, 从缓存中移除
                        itemMap.remove(cacheItem.getKey());
                        System.out.println("key : " + cacheItem.getKey() + " 过期并移除");
                    }
                }
    
                System.out.println("cache stop");
            }
        }
    
        static class CacheItem implements Delayed {
    
            private String key;
    
            /**
             * 过期时间(单位秒)
             */
            private long exprieTime;
    
            private long currentTime;
    
            public CacheItem(String key, long exprieTime) {
                this.key = key;
                this.exprieTime = exprieTime;
                this.currentTime = System.currentTimeMillis();
            }
    
            @Override
            public long getDelay(TimeUnit unit) {
                // 计算剩余的过期时间
                // 大于 0 说明未过期
                return exprieTime - TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - currentTime);
            }
    
            @Override
            public int compareTo(Delayed o) {
                // 过期时间长的放置在队列尾部
                if (this.getDelay(TimeUnit.MICROSECONDS) > o.getDelay(TimeUnit.MICROSECONDS)) {
                    return 1;
                }
                // 过期时间短的放置在队列头
                if (this.getDelay(TimeUnit.MICROSECONDS) < o.getDelay(TimeUnit.MICROSECONDS)) {
                    return -1;
                }
    
                return 0;
            }
    
            public String getKey() {
                return key;
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
    
            Cache cache = new Cache();
    
            // 添加缓存元素
            cache.put("a", "1", 5);
            cache.put("b", "2", 4);
            cache.put("c", "3", 3);
    
            while (true) {
                String a = cache.get("a");
                String b = cache.get("b");
                String c = cache.get("c");
    
                System.out.println("a : " + a + ", b : " + b + ", c : " + c);
    
                // 元素均过期后退出循环
                if (StringUtils.isEmpty(a) && StringUtils.isEmpty(b) && StringUtils.isEmpty(c)) {
                    break;
                }
    
                TimeUnit.MILLISECONDS.sleep(1000);
            }
    
            cache.shutdown();
        }
    }
    
    复制代码

    执行结果如下:

    
    a : 1, b : 2, c : 3
    a : 1, b : 2, c : 3
    a : 1, b : 2, c : 3
    key : c 过期并移除
    a : 1, b : 2, c : null
    key : b 过期并移除
    a : 1, b : null, c : null
    key : a 过期并移除
    a : null, b : null, c : null
    cache stop
    
    复制代码

    从执行结果可以看出,因循环内部每次停顿 1 秒,当等待 3 秒后,元素 c 过期并从缓存中清除,等待 4 秒后,元素 b 过期并从缓存中清除,等待 5 秒后,元素 a 过期并从缓存中清除。

    实现原理

    变量

    重入锁
    private final transient ReentrantLock lock = new ReentrantLock();
    复制代码

    用于保证队列操作的线程安全性

    优先队列
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    复制代码

    存储介质,用于保证延迟低的优先执行

    leader

    leader 指向的是第一个从队列获取元素阻塞等待的线程,其作用是减少其他线程不必要的等待时间。(这个地方我一直没搞明白 怎么就减少其他线程的等待时间了)

    condition
    private final Condition available = lock.newCondition();
    复制代码

    条件对象,当新元素到达,或新线程可能需要成为leader时被通知

    下面将主要对队列的入队,出队动作进行分析 :

    入队 - offer
        public boolean offer(E e) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                // 入队
                q.offer(e);
                if (q.peek() == e) {
                    // 若入队的元素位于队列头部,说明当前元素延迟最小
                    // 将 leader 置空
                    leader = null;
                    // 唤醒阻塞在等待队列的线程
                    available.signal();
                }
                return true;
            } finally {
                lock.unlock();
            }
        }
    复制代码
    出队 - poll
    public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    E first = q.peek();
                    if (first == null)
                    	// 等待 add 唤醒
                        available.await();
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                        	// 已过期则直接返回队列头节点
                            return q.poll();
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                        	// 若 leader 不为空
                        	// 说明已经有其他线程调用过 take 操作
                        	// 当前调用线程 follower 挂起等待
                            available.await();
                        else {
                        	// 若 leader 为空
                        	// 将 leader 指向当前线程
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                            	// 当前调用线程在指定 delay 时间内挂起等待
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && q.peek() != null)
                    // leader 处理完之后,唤醒 follower
                    available.signal();
                lock.unlock();
            }
        }
    复制代码
    Leader-follower 模式

    该图引用自 CSDN 《Leader/Follower多线程网络模型介绍》

    摘自--https://juejin.im/post/5bf945b95188254e2a04329b

    小结

    看了 DelayQueue 的实现 我们大概也明白 PriorityQueue 采用小顶堆的原因了。

  • 相关阅读:
    TypeScript 第一讲 ———— 基本数据类型的使用
    关于TypeScript命名空间
    Egret 自定义皮肤 ———— 引入类中以及createChildren()和 childrenCreated()的使用
    egret基础——控件
    回顾过去,展望未来
    JDBC、Hibernate、Mybatis之间的区别
    SSM框架优缺点和spring boot 比起优缺点是什么?
    拦截器和过滤器的区别
    转发和重定向区别
    关于虚拟机中克隆的linux为什么不能开启网络服务
  • 原文地址:https://www.cnblogs.com/zhangfengshi/p/11943743.html
Copyright © 2011-2022 走看看