zoukankan      html  css  js  c++  java
  • 并发编程 —— ScheduledThreadPoolExecutor

    1. 前言

    在前面的文章中,我们介绍了定时任务类 Timer ,他是 JDK 1.3 中出现的,位于 java.util 包下。而今天说的 ScheduledThreadPoolExecutor 的是在 JUC 包下,是 JDK1.5 新增的。

    今天就来说说这个类。

    2. API 介绍

    该类内部结构和 Timer 还是有点类似的,也是 3 个类:

    • ScheduledThreadPoolExecutor:程序员使用的接口。
    • DelayedWorkQueue : 存储任务的队列。
    • ScheduledFutureTask : 执行任务的线程。

    构造方法介绍

    // 使用给定核心池大小创建一个新 ScheduledThreadPoolExecutor。
    ScheduledThreadPoolExecutor(int corePoolSize)  
    // 使用给定初始参数创建一个新 ScheduledThreadPoolExecutor。
    ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler)  
    // 使用给定的初始参数创建一个新 ScheduledThreadPoolExecutor。
    ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory)  
    // 使用给定初始参数创建一个新 ScheduledThreadPoolExecutor。
    ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler)  
    

    ScheduledThreadPoolExecutor 最多支持 3 个参数:核心线程数量,线程工厂,拒绝策略。

    为什么没有最大线程数量?由于 ScheduledThreadPoolExecutor 内部是个无界队列,maximumPoolSize 也就没有意思了。

    再介绍一下他的 API 方法,请原谅我将 JDK 文档照抄过来了,就当是备忘吧,如下:

    protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) // 修改或替换用于执行 callable 的任务。
      
    protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) // 修改或替换用于执行 runnable 的任务。        
        
    void execute(Runnable command) // 使用所要求的零延迟执行命令。  
    
    boolean	getContinueExistingPeriodicTasksAfterShutdownPolicy() // 获取有关在此执行程序已 shutdown 的情况下、是否继续执行现有定期任务的策略。      
       
    boolean	getExecuteExistingDelayedTasksAfterShutdownPolicy() // 获取有关在此执行程序已 shutdown 的情况下是否继续执行现有延迟任务的策略。  
    
    BlockingQueue<Runnable>	getQueue() // 返回此执行程序使用的任务队列。     
    
    boolean	remove(Runnable task) // 从执行程序的内部队列中移除此任务(如果存在),从而如果尚未开始,则其不再运行。     
      
    <V> ScheduledFuture<V>	schedule(Callable<V> callable, long delay, TimeUnit unit) // 创建并执行在给定延迟后启用的 ScheduledFuture。    
        
    ScheduledFuture<?>	schedule(Runnable command, long delay, TimeUnit unit) // 创建并执行在给定延迟后启用的一次性操作。  
         
    ScheduledFuture<?>	scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) // 创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;也就是将在 initialDelay 后开始执行,然后在 initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推。 
         
    ScheduledFuture<?>	scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) // 创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。  
       
    void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) // 设置有关在此执行程序已 shutdown 的情况下是否继续执行现有定期任务的策略。  
       
    void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) // 设置有关在此执行程序已 shutdown 的情况下是否继续执行现有延迟任务的策略。  
          
    void shutdown() // 在以前已提交任务的执行中发起一个有序的关闭,但是不接受新任务。
        
    List<Runnable>	shutdownNow() // 尝试停止所有正在执行的任务、暂停等待任务的处理,并返回等待执行的任务列表。  
         
    <T> Future<T> submit(Callable<T> task) //  提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。 
       
    Future<?> submit(Runnable task) //  提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。  
         
    <T> Future<T> submit(Runnable task, T result) //  提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。
    

    最经常使用的几个方法如下:

    // 使用给定核心池大小创建一个新 ScheduledThreadPoolExecutor。
    ScheduledThreadPoolExecutor(int corePoolSize)  
    
    // 创建并执行在给定延迟后启用的一次性操作。  
    ScheduledFuture<?>	schedule(Runnable command, long delay, TimeUnit unit) 
      
    // 创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;也就是将在 initialDelay 后开始执行,然后在 initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推。 
    ScheduledFuture<?>	scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) 
    
    // 创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。       
    ScheduledFuture<?>	scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) 
    

    除了默认的构造方法,还有 3 个 schedule 方法。我们将分析他们内部的实现。

    3. 构造方法内部实现

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
    

    我们感兴趣的就是这个 DelayedWorkQueue 队列了。他也是一个阻塞队列。这个队列的数据结构是堆。同时,这个 queue 也是可比较的,比较什么呢?任务必须实现 compareTo 方法,这个方法的比较逻辑是:比较任务的执行时间,如果任务的执行时间相同,则比较任务的加入时间。

    因此,ScheduledFutureTask 有 2 个变量:

    • time : 任务的执行时间。
    • sequenceNumber:任务的加入时间。

    这两个变量就是用来比较任务的执行顺序的。整个调度的顺序就是这个逻辑。

    4. 几个 schedule 方法的的区别

    刚刚说了,有 3 个 schedule 方法:

    1. ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
      创建并执行在给定延迟后启用的一次性操作。

    2. ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
      创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;也就是将在 initialDelay 后开始执行,然后在 initialDelay+period 后执行,接着在initialDelay + 2 * period后执行,依此类推。

    3. ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
      创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。

    第一个方法执行在给定的时间后,执行一次就结束。

    有点意思的地方是 第二个方法和 第三个方法,他们直接的区别。

    这两个方法都可以重复的调用。但是,重复调用的逻辑有所区别,这里就是比 Timer 好用的地方。

    他们的共同点在于:必须等待上个任务执行完毕才能执行下个任务。

    不同点在于:他们调度的时间粗略是不同的。

    scheduleAtFixedRate 方法的执行周期都是固定的,也就是,他是以上一个任务的开始执行时间作为起点,加上之后的 period 时间,调度下次任务。

    scheduleWithFixedDelay 方法则是以上一个任务的结束时间作为起点,加上之后的 period 时间,调度下次任务。

    有什么区别呢?

    如何任务执行时间很短,那就没上面区别。但是,如果任务执行时间很长,超过了 period 时间,那么区别就出来了。

    我们假设一下。

    我们设置 period 时间为 2 秒,而任务耗时 5 秒。

    这个两个方法的区别就体现出来了。

    scheduleAtFixedRate 方法将会在上一个任务结束完毕立刻执行,他和上一个任务的开始执行时间的间隔是 5 秒(因为必须等待上一个任务执行完毕)。

    scheduleWithFixedDelay 方法将会在上一个任务结束后,注意:再等待 2 秒,才开始执行,那么他和上一个任务的开始执行时间的间隔是 7 秒。

    所以,我们在使用 ScheduledThreadPoolExecutor 的过程中需要注意任务的执行时间不能超过间隔时间,如果超过了,最好使用 scheduleAtFixedRate 方法,防止任务堆积。

    当然,也和具体的业务有关。不能一概而论。但一定要注意这两个方法的区别。

    5. scheduled 方法的实现

    我们看看 scheduleAtFixedRate 方法的内部实现。

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }
    

    创建一个 ScheduledFutureTask 对象,然后装饰一个这个 Future ,该类实现是直接返回,子类可以有自己的实现,在这个任务外装饰一层。

    然后执行 delayedExecute 方法,最后返回 Future

    这个 ScheduledFutureTask 实现了很多接口,比如 FutureRunnableComparable Delayed 等。

    ScheduledFutureTask 的构造方法如下:

    ScheduledFutureTask(Runnable r, V result, long ns, long period) {
        super(r, result);
        this.time = ns;
        this.period = period;
        this.sequenceNumber = sequencer.getAndIncrement();
    }
    
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
    
    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
    
    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }
    

    层层递进,该类首先通过一个原子静态 int 对象这只任务的入队编号,然后创建一个 Callable,这个 Callable 是一个适配器,适配了Runnable Callable,也就是将Runnable包装成 callabe, 他的 call 方法就是调用给定任务的 run 方法。当然,这里的 result 是没有什么作用的。

    如果你传递的是一个 callable ,那么,就调用 FutureTaskrun 方法,设置真正的返回值。

    这里使用了适配器模式,还是挺有趣的。

    总的来说,这个 ScheduledFutureTask 基于 FutureTask, 关于 FutureTask 我们之前从源码介绍过了。

    而他自己重写了几个方法:compareTogetDelayrunisPeriodic 4 个方法。

    我们还是要看看 delayedExecute 的实现。

    private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            // 添加进队列。
            super.getQueue().add(task);
            // 如果线程池关闭了,且不可以在当前状态下运行任务,且从队列删除任务成功,就给任务打上取消标记。
            // 第二个判断是由两个变量控制的(下面是默认值):
            // continueExistingPeriodicTasksAfterShutdown = false 表示关闭的时候应取消周期性任务。默认关闭
            // executeExistingDelayedTasksAfterShutdown = true。表示关闭的时候应取消非周期性的任务。默认不关闭。
            // running 状态下,canRunInCurrentRunState 必定返回 ture。
            // 非 running 状态下,canRunInCurrentRunState 根据上面的两个值返回。
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                // 开始执行任务
                ensurePrestart();
        }
    }
    

    说说上面的方法。

    1. 判断是否关闭,关闭则拒绝任务。
    2. 如果不是 关闭状态,则添加进队列,而添加队列的顺序我们之前讲过了,根据 ScheduledFutureTaskcompareTo 方法来的,先比较执行时间,再比较添加顺序。
    3. 如果这个过程中线程池关闭了,则判断此时是否应该取消任务,根据两个变量来的,注释里面写了。默认的策略是,如果是周期性的任务,就取消,反之不取消。
    4. 如果没有关闭线程池。就调用线程池里的线程执行任务。

    整体的过程如图:

    image.png

    注意上面的图,如果是周期性的任务,则会在执行完毕后,归还队列。

    从哪里可以看出来呢?

    ScheduledFutureTaskrun 方法:

    public void run() {
        // 是否是周期性任务
        boolean periodic = isPeriodic();
        // 如果不可以在当前状态下运行,就取消任务(将这个任务的状态设置为CANCELLED)。
        if (!canRunInCurrentRunState(periodic))
            cancel(false);
        // 如果不是周期性的任务,调用 FutureTask # run 方法
        else if (!periodic)
            ScheduledFutureTask.super.run();
        // 如果是周期性的。
        // 执行任务,但不设置返回值,成功后返回 true。(callable 不可以重复执行)
        else if (ScheduledFutureTask.super.runAndReset()) {
            // 设置下次执行时间
            setNextRunTime();
            // 再次将任务添加到队列中
            reExecutePeriodic(outerTask);
        }
    }
    

    逻辑如下:

    1. 如果不能再当前状态下运行了,就取消这个任务。
    2. 如果不是周期性的任务,就执行 FutureTaskrun 方法。
    3. 如果是周期性的任务,就需要执行 runAndReset 方法。
    4. 执行完毕后,重写设置当前任务的下次执行时间,然后添加进队列中。

    而管理整个执行过程的就是ScheduledThreadPoolExecutor的父类 ThreadPoolExecutor runWorker方法。其中,该方法会从队列中取出数据,也就是调用队列的 take 方法。

    关于 DelayedWorkQueuetake方法,其中有个 leader 变量,如果 leader 不是空,说明已经有线程在等待了,那就阻塞当前线程,如果是空,说明,队列的第一个元素已经被更新了,就设置当前线程为 leader.

    这是一个 Leader-Follower 模式,Doug Lea 说的。

    当然,take 方法整体的逻辑还是不变的。从队列的头部拿数据。使用 Condition 做线程之间的协调。

    5. 总结

    关于 ScheduledThreadPoolExecutor 调度类,我们分析的差不多了,总结一下。

    ScheduledThreadPoolExecutor 是个定时任务线程池,类似 Timer,但是比 Timer 强大,健壮。

    比如不会像 Timer 那样,任务异常了,整个调度系统就彻底无用了。

    也比 Timer 多了 Rate 模式(Rate 和 Delay)。

    这两种模式的区别就是任务执行的起点时间不同,Rate 是从上一个任务的开始执行时间开始计算;Delay 是从上一个任务的结束时间开始计算。

    因此,如果任务本身的时间超过了间隔时间,那么这两种模式的间隔时间将会不一致。

    而任务的排序是通过 ScheduledFutureTask compareTo 方法排序的,规则是先比较执行时间,如果时间相同,再比较加入时间。

    还要注意一点就是:如果任务执行过程中异常了,那么将不会再次重复执行。因为 ScheduledFutureTask run 方法没有做catch处理。所以程序员需要手动处理,相对于 Timer 异常就直接费了调度系统来说,要好很多。

    ScheduledThreadPoolExecutor 的是实现基于 ThreadPoolExecutor,大部分功能都是重用的父类,只是自己在执行完毕之后,重新设置时间,并再次将任务还到了队列中,形成了定时任务。

  • 相关阅读:
    搭建Git本地服务器
    shutdown,init,halt,poweroff,reboot的区别和联系, pkill -kill -t tty7注销
    RHEL/CentOS/Fedora常用的 CentOS 5/6/7 yum 源(EPEL、Remi、RPMForge、RPMFusion, ius,163,sohu,阿里云)配置
    国内的一些开源镜像站汇总,EPEL源
    EditPlus 配置 Java & C/CPP 开发环境
    Nginx+Keepalived 做负载均衡器
    监控Nginx负载均衡器脚本
    Heartbeat+DRBD+NFS 构建高可用的文件系统
    数据库索引的作用和长处缺点
    【ThinkPHP学习】ThinkPHP自己主动转义存储富文本编辑器内容导致读取出错
  • 原文地址:https://www.cnblogs.com/stateis0/p/9062075.html
Copyright © 2011-2022 走看看