zoukankan      html  css  js  c++  java
  • 调度线程池原理ScheduledThreadPoolExecutor

    概述

    数据结构

      任务队列

        任务队列使用的自定义的队列 DelayedWorkQueue,是一个基于二叉堆的数据结构。提供了扩容、插入、弹出等基本操作。

        其中,堆顶元素时待执行时间最早的元素。

        

    /**
    * 获取任务对象
    */
    public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0];//从数组首部获取最近的待执行任务。 if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0)// 判断剩余时间是否小于0, >0继续等待剩余时间。 return finishPoll(first);// 堆弹出操作 first = null; // don't retain ref while waiting if (leader != null)// 只有获得leader节点的线程才会等待delay时间再去尝试获取任务对象,避免其它线程不必要的等待 available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal();// 当leader节点获取可执行任务后,唤醒其它等待线程 lock.unlock(); } }

      

      任务对象:ScheduledFutureTask

        提交给线程池的任务被封装成ScheduledFutureTask类型,该类实现了Deleyed对象,用于任务执行时间的控制。

        

        private class ScheduledFutureTask<V>
                extends FutureTask<V> implements RunnableScheduledFuture<V> {
    
            /** Sequence number to break ties FIFO */ //用于指定任务添加的序号,当执行时间相同时,控制执行顺序
            private final long sequenceNumber;
            /** The time the task is enabled to execute in nanoTime units */
            private long time;// 通过具体的待执行时间点转换成的时间戳形式。(Date -> long)
    
            /**
             * Period in nanoseconds for repeating tasks.  A positive
             * value indicates fixed-rate execution.  A negative value
             * indicates fixed-delay execution.  A value of 0 indicates a
             * non-repeating task.
             */
            private final long period;//时间的执行周期
    
            /** The actual task to be re-enqueued by reExecutePeriodic */
            RunnableScheduledFuture<V> outerTask = this; //下一个待执行的任务
    
            /**
             * Index into delay queue, to support faster cancellation.
             */
            int heapIndex; //在堆中的索引,用于支持快速取消
    
            public long getDelay(TimeUnit unit) {// 计算还有多少时间去执行任务
                return unit.convert(time - now(), NANOSECONDS);
            }
            public int compareTo(Delayed other) {// 用于将任务保存到队列中时,判断任务的保存顺序,越早执行的,放在前面,实现一样的,比较序号值。
               ......
            }
    
            /**
             * Returns {@code true} if this is a periodic (not a one-shot) action.
             *
             * @return {@code true} if periodic
             */
            public boolean isPeriodic() {
                return period != 0;
            }
    
            /**
             * Sets the next time to run for a periodic task.
             */
            private void setNextRunTime() {
                long p = period;
                if (p > 0)
                    time += p;
                else
                    time = triggerTime(-p);
            }
    
            public boolean cancel(boolean mayInterruptIfRunning) {
                boolean cancelled = super.cancel(mayInterruptIfRunning);
                if (cancelled && removeOnCancel && heapIndex >= 0)
                    remove(this);
                return cancelled;
            }
    
            /**
             * Overrides FutureTask version so as to reset/requeue if periodic.
             */
            public void run() { //实际运行
                boolean periodic = isPeriodic();// 判断是否周期任务
                if (!canRunInCurrentRunState(periodic))
                    cancel(false);
                else if (!periodic)
                    ScheduledFutureTask.super.run();// 非周期运行一次
                else if (ScheduledFutureTask.super.runAndReset()) {
                    setNextRunTime();// 更新time值,确定下次自行时间
                    reExecutePeriodic(outerTask);// 将任务保存到任务队列中
                }
            }
        }
    

      关于触发时间的计算

        /**
         * Returns the trigger time of a delayed action.
         */
        private long triggerTime(long delay, TimeUnit unit) {
            return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
        }
    
        /**
         * Returns the trigger time of a delayed action.
         */
        long triggerTime(long delay) {// 根据延期时间,加上当前时间戳的值,赋值给任务对象的time属性。
            return now() +
                ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
        }
    

      

  • 相关阅读:
    2013-9-29 通信原理学习笔记
    《大数据时代》阅读笔记
    《人人都是产品经理》阅读笔记一
    2013-8-13 信道接入技术研究学习
    2013-8-6 ubuntu基本操作
    2013-7-30 802.1X企业级加密
    2013-7-29 杂记
    2013-7-28 802.11n帧聚合
    2013-7-27 802.1X学习
    vue+node+mongoDB前后端分离个人博客(入门向)
  • 原文地址:https://www.cnblogs.com/hhan/p/10820423.html
Copyright © 2011-2022 走看看