zoukankan      html  css  js  c++  java
  • 调度线程池原理ScheduledThreadPoolExecutor

    概述

    数据结构

      任务队列

        任务队列使用的自定义的队列 DelayedWorkQueue,是一个基于二叉堆的数据结构。提供了扩容、插入、弹出等基本操作。

        其中,堆顶元素时待执行时间最早的元素。

        

    /**
    * 获取任务对象
    */
    public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0];//从数组首部获取最近的待执行任务。 if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0)// 判断剩余时间是否小于0, >0继续等待剩余时间。 return finishPoll(first);// 堆弹出操作 first = null; // don't retain ref while waiting if (leader != null)// 只有获得leader节点的线程才会等待delay时间再去尝试获取任务对象,避免其它线程不必要的等待 available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal();// 当leader节点获取可执行任务后,唤醒其它等待线程 lock.unlock(); } }

      

      任务对象:ScheduledFutureTask

        提交给线程池的任务被封装成ScheduledFutureTask类型,该类实现了Deleyed对象,用于任务执行时间的控制。

        

        private class ScheduledFutureTask<V>
                extends FutureTask<V> implements RunnableScheduledFuture<V> {
    
            /** Sequence number to break ties FIFO */ //用于指定任务添加的序号,当执行时间相同时,控制执行顺序
            private final long sequenceNumber;
            /** The time the task is enabled to execute in nanoTime units */
            private long time;// 通过具体的待执行时间点转换成的时间戳形式。(Date -> long)
    
            /**
             * Period in nanoseconds for repeating tasks.  A positive
             * value indicates fixed-rate execution.  A negative value
             * indicates fixed-delay execution.  A value of 0 indicates a
             * non-repeating task.
             */
            private final long period;//时间的执行周期
    
            /** The actual task to be re-enqueued by reExecutePeriodic */
            RunnableScheduledFuture<V> outerTask = this; //下一个待执行的任务
    
            /**
             * Index into delay queue, to support faster cancellation.
             */
            int heapIndex; //在堆中的索引,用于支持快速取消
    
            public long getDelay(TimeUnit unit) {// 计算还有多少时间去执行任务
                return unit.convert(time - now(), NANOSECONDS);
            }
            public int compareTo(Delayed other) {// 用于将任务保存到队列中时,判断任务的保存顺序,越早执行的,放在前面,实现一样的,比较序号值。
               ......
            }
    
            /**
             * Returns {@code true} if this is a periodic (not a one-shot) action.
             *
             * @return {@code true} if periodic
             */
            public boolean isPeriodic() {
                return period != 0;
            }
    
            /**
             * Sets the next time to run for a periodic task.
             */
            private void setNextRunTime() {
                long p = period;
                if (p > 0)
                    time += p;
                else
                    time = triggerTime(-p);
            }
    
            public boolean cancel(boolean mayInterruptIfRunning) {
                boolean cancelled = super.cancel(mayInterruptIfRunning);
                if (cancelled && removeOnCancel && heapIndex >= 0)
                    remove(this);
                return cancelled;
            }
    
            /**
             * Overrides FutureTask version so as to reset/requeue if periodic.
             */
            public void run() { //实际运行
                boolean periodic = isPeriodic();// 判断是否周期任务
                if (!canRunInCurrentRunState(periodic))
                    cancel(false);
                else if (!periodic)
                    ScheduledFutureTask.super.run();// 非周期运行一次
                else if (ScheduledFutureTask.super.runAndReset()) {
                    setNextRunTime();// 更新time值,确定下次自行时间
                    reExecutePeriodic(outerTask);// 将任务保存到任务队列中
                }
            }
        }
    

      关于触发时间的计算

        /**
         * Returns the trigger time of a delayed action.
         */
        private long triggerTime(long delay, TimeUnit unit) {
            return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
        }
    
        /**
         * Returns the trigger time of a delayed action.
         */
        long triggerTime(long delay) {// 根据延期时间,加上当前时间戳的值,赋值给任务对象的time属性。
            return now() +
                ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
        }
    

      

  • 相关阅读:
    ok~加油!
    解析window.open链接的参数
    Arrya数组添加过滤条件
    Oracle 查询今天、昨日、本周、本月和本季度的所有记录
    Sql Server日期查询-SQL查询今天、昨天、7天内、30天
    Lua 中 pairs 和 ipairs 的区别
    关于SignalR连接数量问题的记录
    IceStorm示例运行步骤
    从 OPC 到 OPC UA
    SQL Server 2008 R2 Express Profiler
  • 原文地址:https://www.cnblogs.com/hhan/p/10820423.html
Copyright © 2011-2022 走看看