zoukankan      html  css  js  c++  java
  • 延时队列:Java中的DelayQueue

    Java中的DelayQueue位于java.util.concurrent包下,本质是由PriorityQueue和BlockingQueue实现的阻塞优先级队列。

    放入队列的元素需要实现java.util.concurrent包的Delayed接口:

    public interface Delayed extends Comparable<Delayed> {
    
        /**
         * Returns the remaining delay associated with this object, in the
         * given time unit.
         *
         * @param unit the time unit
         * @return the remaining delay; zero or negative values indicate
         * that the delay has already elapsed
         */
        long getDelay(TimeUnit unit);
    }

    通过实现这个接口,来完成对队列中元素,按照时间延迟先后排序的目的。

    从队列中取元素:

    看DelayedQueue的take()方法:

        /**
         * Retrieves and removes the head of this queue, waiting if necessary
         * until an element with an expired delay is available on this queue.
         *
         * @return the head of this queue
         * @throws InterruptedException {@inheritDoc}
         */
        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    E first = q.peek();
                    if (first == null)
                        available.await();
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            return q.poll();
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && q.peek() != null)
                    available.signal();
                lock.unlock();
            }
        } 

    可以看到,在这段代码里,在第一个元素的延迟时间还没到的情况下:

    • 如果当前没有其他线程等待,则阻塞当前线程直到延迟时间。
    • 如果有其他线程在等待,则阻塞当前线程。

    向队列中放入元素:

        /**
         * Inserts the specified element into this delay queue.
         *
         * @param e the element to add
         * @return {@code true}
         * @throws NullPointerException if the specified element is null
         */
        public boolean offer(E e) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                q.offer(e);
                if (q.peek() == e) {
                    leader = null;
                    available.signal();
                }
                return true;
            } finally {
                lock.unlock();
            }
        }

    在放入元素的时候,会唤醒等待中的读线程。

    如果我们不考虑分布式运行和任务持久化的话,Java中的DelayQueue是一个很理想的方案,精巧好用。但是如果我们需要分布式运行和任务持久化,就需要引入一些外部组件。

  • 相关阅读:
    【源码学习之spark core 1.6.1 standalone模式下的作业提交】
    【源码学习之spark streaming 1.6.1 】
    spark udf 初识初用
    spark 累加历史 + 统计全部 + 行转列
    spark 都用了哪些开源东东
    kafka 官方示例代码--消费者
    104. 二叉树的最大深度
    237. 删除链表中的节点
    Leetcode-167. Two Sum II
    Leetcode-215. Kth Largest Element in an Array
  • 原文地址:https://www.cnblogs.com/duanxz/p/9585716.html
Copyright © 2011-2022 走看看