zoukankan      html  css  js  c++  java
  • 《java.util.concurrent 包源码阅读》14 线程池系列之ScheduledThreadPoolExecutor 第一部分

    ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类,同时实现了ScheduledExecutorService接口。

    public class ScheduledThreadPoolExecutor
            extends ThreadPoolExecutor
            implements ScheduledExecutorService

    ScheduledThreadPoolExecutor的功能主要有两点:在固定的时间点执行(也可以认为是延迟执行),重复执行。

    和分析ThreadPoolExecutor时一样,首先来看核心方法execute:

        public void execute(Runnable command) {
            schedule(command, 0, TimeUnit.NANOSECONDS);
        }

    execute方法调用了另外一个方法schedule,同时我们发现三个submit方法也是同样调用了schedule方法,因为有两种类型的任务:Callable和Runnable,因此schedule也有两个重载方法。

        public ScheduledFuture<?> schedule(Runnable command,
                                           long delay,
                                           TimeUnit unit) {
            if (command == null || unit == null)
                throw new NullPointerException();
            RunnableScheduledFuture<?> t = decorateTask(command,
                new ScheduledFutureTask<Void>(command, null,
                                              triggerTime(delay, unit)));
            delayedExecute(t);
            return t;
        }
    
        public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                               long delay,
                                               TimeUnit unit) {
            if (callable == null || unit == null)
                throw new NullPointerException();
            RunnableScheduledFuture<V> t = decorateTask(callable,
                new ScheduledFutureTask<V>(callable,
                                           triggerTime(delay, unit)));
            delayedExecute(t);
            return t;
        }

    两个方法逻辑基本一致,都是把任务包装成RunnableScheduledFuture对象,然后调用delayedExecute来实现延迟执行。任务包装类继承自ThreadPoolExecutor的包装类RunnableFuture,同时实现ScheduledFuture接口使包装类具有了延迟执行和重复执行这些功能以匹配ScheduledThreadPoolExecutor。

    因此首先来看ScheduledFutureTask,以下是ScheduledFutureTask专有的几个变量:

    private class ScheduledFutureTask<V>
                extends FutureTask<V> implements RunnableScheduledFuture<V> {
    
            /** 针对线程池所有任务的序列号 */
            private final long sequenceNumber;
    
            /** 距离任务开始执行的时间,纳秒为单位 */
            private long time;
    
            /**
             * 重复执行任务的间隔,即每隔多少时间执行一次任务
             */
            private final long period;
    
            /** 重复执行任务和排队时用这个类型的对象, */
            RunnableScheduledFuture<V> outerTask = this;
    
            /**
             * 在延迟队列的索引,这样取消任务时使用索引会加快查找速度
             */
            int heapIndex;

    来看核心方法run:

            public void run() {
                boolean periodic = isPeriodic();
                // 检测是否可以运行任务,这里涉及到另外两个变量:continueExistingPeriodicTasksAfterShutdown
                // 和executeExistingDelayedTasksAfterShutdown
                // 前者允许在shutdown之后继续执行重复执行的任务
                // 后者允许在shutdown之后继续执行延时执行的任务,
                // 因此这里根据任务是否为periodic来决定采用哪个选项,然后
                // 如果线程池正在运行,那么肯定可以执行
                // 如果正在shutdown,那么要看选项的值是否为true来决定是否允许执行任务
                // 如果不被允许的话,就会取消任务
                if (!canRunInCurrentRunState(periodic))
                    cancel(false);
                // 如果可以执行任务,对于不用重复执行的任务,直接执行即可
                else if (!periodic)
                    ScheduledFutureTask.super.run();
                // 对于需要重复执行的任务,则执行一次,然后reset
                // 更新一下下次执行的时间,调用reExecutePeriodic更新任务在执行队列的
                // 位置(其实就是添加到队列的末尾)
                else if (ScheduledFutureTask.super.runAndReset()) {
                    setNextRunTime();
                    reExecutePeriodic(outerTask);
                }
            }

    因此这里可以得出关于重复执行的实现:任务执行一次,Reset状态,重新加入到任务队列。

    回到delayedExecute,它可以保证任务在准确时间点执行,来看delayedExecute是如果实现延迟执行的:

        private void delayedExecute(RunnableScheduledFuture<?> task) {
            if (isShutdown())
                reject(task);
            else {
                super.getQueue().add(task);
                if (isShutdown() &&
                    !canRunInCurrentRunState(task.isPeriodic()) &&
                    remove(task))
                    task.cancel(false);
                else
                    ensurePrestart();
            }
        }

    乍看之下,发现也就是把任务加入到任务队列中,那么这个延时执行的功能是如何实现的,秘密就在任务队列的实现。

        public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
                  new DelayedWorkQueue());
        }
    
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), defaultHandler);
        }

    ScheduledThreadPoolExecutor的任务队列不是普通的BlockingQueue,而是一个特殊的实现DelayedWorkQueue。下一篇文章就来说说这个DelayedWorkQueue

  • 相关阅读:
    jQuery源码解析(架构与依赖模块)第二章 核心模块
    jQuery源码解析(架构与依赖模块)第一章 理解架构
    js中运算符的优先级
    JS将时间与时间戳互转
    关于JavaScript scope的一切
    java中继承以及其他相关内容
    java中数组的内容
    关于java中的引用数据类型
    关于java的源文件结构以及常用的包
    Java语言的基础内容
  • 原文地址:https://www.cnblogs.com/wanly3643/p/3926720.html
Copyright © 2011-2022 走看看