zoukankan      html  css  js  c++  java
  • JUC源码分析-线程池篇(三)Timer

    JUC源码分析-线程池篇(三)Timer

    Timer 是 java.util 包提供的一个定时任务调度器,在主线程之外起一个单独的线程执行指定的计划任务,可以指定执行一次或者反复执行多次。

    1. Timer 类结构

    Timer类结构

    Timer 由 TimerThread,TaskQueue ,TimerTask 组成。 Timer 初始化时会构建一个 TimerThread 线程用于执行调度任务。TaskQueue 保存所有调度的定时任务,基于二叉树小顶堆时间,第一个要执行的任务永远位于 queue[1],其插入和查找时间复杂度都是 log2n。

    1.1 TimerTask 生命周期

    int state = VIRGIN;		// 任务状态
    
    static final int VIRGIN = 0;		// 初始化
    static final int SCHEDULED   = 1;	// 任务已经调度,还未执行
    static final int EXECUTED    = 2;	// 任务已经执行
    static final int CANCELLED   = 3;	// 任务取消
    

    VIRGIN 表示 Task 刚刚被创建,SCHEDULED 表示 Task 已经被加入 TaskQueue中,等待调度,EXECUTED 表示 Task 已经被执行,CANCELLED 表示 Task 已经被取消。

    TimerTask 还有两个重要属性:

    • nextExecutionTime 表示任务下一次的执行时间;
    • period 表示任务调度的周期。0 表示只执行一次,1 表示 scheduleAtFixedRate 周期性定时任务,-1 表示 schedule 周期性定时任务。

    1.2 Timer 定时调度函数

    1. schedule(TimerTask task, Date time) 绝对时间执行一次。
    2. schedule(TimerTask task, long delay) 相对时间执行一次。
    3. schedule(TimerTask task, long delay, long period) 相对时间,每隔 period 重复执行。schedule 下次执行时间为当前时间 + period,很明显如果出现任务执行时间过长,会出现任务丢失,但不会造成线程阻塞。
    4. scheduleAtFixedRate(TimerTask task, long delay, long period) 相对时间,每隔 period 重复执行。scheduleAtFixedRate 下次执行时间为 task.nextExecutionTime + period,也就是上一次执行结束的时间,此时如果出现任务执行时间过长,任务不会丢失,但会造成线程阻塞。

    前两个就不再解释了,schedule 和 scheduleAtFixedRate 这两个方法都是任务调度方法,他们之间区别是:schedule 会丢弃延迟的任务,不会造成线程阻塞,scheduleAtFixedRate 严格按 period 执行,不会丢弃任务,但都延迟执行了,造成了严重的阻塞。 Timer 内部是单线程执行,所以 Timer 调度的任务都不应该有阻塞。

    举个例子:每 5s 调度一次,那么正常就是 0,5,10,15,20s,而任务调度耗时 10s。schedule 会变成 0,10,20s 执行,原本 5 次任务最后只执行了 3 次,不会阻塞,而 scheduleAtFixedRate 就会变成 0,10,20,30,40s,任务数量没有减少,但都延迟执行了,造成了严重的阻塞。

    2. Timer 源码分析

    2.1 TaskQueue

    顾名思义,TaskQueue 就是用来保存 TimerTask 的队列。需要注意的是,TaskQueue 的内部实现使用的是最小堆,堆顶的 Task 是最近即将到时间的 Task,所以在调度任务时,每次只需要取出堆顶元素,判断时间是否已到即可,效率非常高。下面是 TaskQueue 的核心代码,其实就是最小堆的实现代码:

    (1) 椎顶永远都是最先执行的定时任务

    // 小顶椎椎顶永远都是最先执行的定时任务。但为什么是 1 而不从开始呢?以后有机会再研究下
    TimerTask getMin() {
        return queue[1];
    }
    
    // 定时调度后重新设置小顶堆
    void rescheduleMin(long newTime) {
        queue[1].nextExecutionTime = newTime;
        fixDown(1);
    }
    
    // 每取走一个椎顶的定时任务,都需要重新将 queue[1]
    void removeMin() {
        queue[1] = queue[size];
        queue[size--] = null;  // Drop extra reference to prevent memory leak
        fixDown(1);	// 重新设置堆顶,保证堆顶的任务一定是最先执行的
    }
    

    (2) 添加定时任务 add

    // 添加定时任务
    void add(TimerTask task) {
        // Grow backing store if necessary
        if (size + 1 == queue.length)
            queue = Arrays.copyOf(queue, 2*queue.length);
    
        queue[++size] = task;
        fixUp(size);	// 重新设置堆顶,保证堆顶的任务一定是最先执行的
    }
    

    可以看到无论是添加还是删除都会重新设置堆顶的定时任务,保证椎顶永远都是最先执行的定时任务。

    (3) 小顶堆数据结构

    TaskQueue 数据结构的操作都是 fixUp 和 fixDown,查找和添加的时间复杂度都是 log2n。

    // 元素上浮,只要比父节点小就交换位置
    private void fixUp(int k) {
        while (k > 1) {
            int j = k >> 1;
            if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime)
                break;
            TimerTask tmp = queue[j];  queue[j] = queue[k]; queue[k] = tmp;
            k = j;
        }
    }
    
    // 元素下浮,只要比左右子节点大就交换位置。因为子节点有两个,所以比较了两次
    private void fixDown(int k) {
        int j;
        while ((j = k << 1) <= size && j > 0) {
            if (j < size &&
                queue[j].nextExecutionTime > queue[j+1].nextExecutionTime)
                j++; // j indexes smallest kid
            if (queue[k].nextExecutionTime <= queue[j].nextExecutionTime)
                break;
            TimerTask tmp = queue[j];  queue[j] = queue[k]; queue[k] = tmp;
            k = j;
        }
    }
    
    // 将一个普通的数组设置为小顶堆
    void heapify() {
        for (int i = size/2; i >= 1; i--)
            fixDown(i);
    }
    

    2.2 TimerThread

    同样地,顾名思义,TimerThread 就是用来调度 TaskQueue 中的任务的线程。毫无疑问,TimerThread 肯定每次都会调度 queue.getMin() 的任务(堆顶),如果定时任务取消或已执行后就需要重新设置堆顶任务,保证堆顶永远是最先执行的定时任务。核心逻辑如下:

    private void mainLoop() {
        while (true) {
            try {
                TimerTask task;
                boolean taskFired;
                synchronized(queue) {
                    // 1. 调用 cancel 方法后 newTasksMayBeScheduled=false。
                    while (queue.isEmpty() && newTasksMayBeScheduled)
                        queue.wait();
                    if (queue.isEmpty())
                        break; // Queue is empty and will forever remain; die
    
                    // 2. 小顶堆永远是最先执行的定时任务
                    long currentTime, executionTime;
                    task = queue.getMin();
                    synchronized(task.lock) {
                    	// 3. 任务取消,重新设置堆顶任务
                        if (task.state == TimerTask.CANCELLED) {
                            queue.removeMin();
                            continue;  // No action required, poll queue again
                        }
                        currentTime = System.currentTimeMillis();
                        executionTime = task.nextExecutionTime;
                    	// 4. taskFired=true 表示任务已经执行时间到了,重新设置下一次的执行时间
                        if (taskFired = (executionTime<=currentTime)) {
                            if (task.period == 0) { // Non-repeating, remove
                                queue.removeMin();
                                task.state = TimerTask.EXECUTED;
                            } else { // Repeating task, reschedule
                            	// 5. 任务需要重新排序
                                queue.rescheduleMin(
                                  task.period<0 ? currentTime   - task.period	// schedule
                                                : executionTime + task.period);	// scheduleAtFixedRate
                            }
                        }
                    }
                    // 6. 还未到执行时间,限时等待。正常情况下不会被唤醒,除了①取消定时器,②添加新的定时任务
                    if (!taskFired) // Task hasn't yet fired; wait
                        queue.wait(executionTime - currentTime);
                }
                // 7. 任务执行时间到了,开始执行
                if (taskFired)  // Task fired; run it, holding no locks
                    task.run();
            } catch(InterruptedException e) {
            }
        }
    }
    

    总结:TimerThread 会起一个 while 循环,从 queue 中取出当前最近即将到时间的 Task,然后判断 Task 的执行时间是否已经到了,如果还没到,则计算目标调度时间和当前时间的差值 delta,继续 wait delta 毫秒。wait 时间到之后会结束本次循环,在下一次循环中,如果没有新的更早的 task 加入,则当前的 task 将会被执行,同时设置下一次的执行时间。

    2.3 Timer

    从上面的介绍可知,TimerThread 的调度核心是起一个 while 循环,不断检查是否有 task 需要执行,其中两次调用了 queue.wait() 方法。那在哪些情况下 queue.notify() 方法会被调用呢?

    2.3.1 任务调度

    // schedule 会丢弃延迟的任务。下面的例子中 5 个任务会实际只会执行 3 个
    // 比如任务原本该 0,5,10,15,20s 执行,但任务执行实际耗时 10s,实际执行时间 0,10,20
    public void schedule(TimerTask task, long delay, long period) {
        if (delay < 0)
            throw new IllegalArgumentException("Negative delay.");
        if (period <= 0)
            throw new IllegalArgumentException("Non-positive period.");
        sched(task, System.currentTimeMillis()+delay, -period);
    }
    // scheduleAtFixedRate 则不会丢弃延迟的任务,同样的例了任务都不会丢弃,但所有的任务都是延迟执行的。
    // 比如任务原本该 0,5,10,15,20s 执行,但任务执行实际耗时 6s,实际执行时间 0,10,20,30,40
    public void scheduleAtFixedRate(TimerTask task, long delay, long period) {
        if (delay < 0)
            throw new IllegalArgumentException("Negative delay.");
        if (period <= 0)
            throw new IllegalArgumentException("Non-positive period.");
        sched(task, System.currentTimeMillis()+delay, period);
    }
    

    schedule 和 scheduleAtFixedRate 都是调用 sched 完成任务入队的。

    private void sched(TimerTask task, long time, long period) {
        if (time < 0)
            throw new IllegalArgumentException("Illegal execution time.");
    
        // Constrain value of period sufficiently to prevent numeric
        // overflow while still being effectively infinitely large.
        if (Math.abs(period) > (Long.MAX_VALUE >> 1))
            period >>= 1;
        // 定时任务入队
        synchronized(queue) {
            if (!thread.newTasksMayBeScheduled)
                throw new IllegalStateException("Timer already cancelled.");
    
            synchronized(task.lock) {
                if (task.state != TimerTask.VIRGIN)
                    throw new IllegalStateException(
                        "Task already scheduled or cancelled");
                task.nextExecutionTime = time;
                task.period = period;
                task.state = TimerTask.SCHEDULED;
            }
            // 添加定时任务,并唤醒线程
            queue.add(task);
            if (queue.getMin() == task)
                queue.notify();
        }
    }
    

    为什么需要 queue.getMin() == task 时才调用 notify 方法呢?

    因为只有新加入的 task 是所有 Task 中要被最早执行的 task 时,才会需要打断 TimeThread 的等待状态。

    2.3.2 取消任务 cancel

    // 设置 newTasksMayBeScheduled=false 并清空任务后唤醒线程
    public void cancel() {
        synchronized(queue) {
            thread.newTasksMayBeScheduled = false;
            queue.clear();
            queue.notify();  // In case queue was already empty.
        }
    }
    

    该方法会把队列清空,并且把 newTasksMayBeScheduled 标志设置为 false,这个时候如果不调用 queue.notify(),在 queue 本来就已经 empty 的情况下,TimerThread 的 mainloop 就会陷入死等待:

    2.3.3 取消任务 cancel

    是否上面两种情况调用 notify 就已经足够了?当 queue 为空,并且没人调用 add 或 cancel 方法时,TimerThread 永远都不会 stop,所以机智的 JDK 还加上了一种比较保险的方法:

    private final Object threadReaper = new Object() {
        protected void finalize() throws Throwable {
            synchronized(queue) {
                thread.newTasksMayBeScheduled = false;
                queue.notify(); // In case queue is empty.
            }
        }
    };
    

    用到了 Object 对象的 finalize 方法,大家都知道 finalize 方法是对象被 GC 的时候调用的。上述做法的思路是:当一个 Timer 已经没有任何对象引用时,自然不会有新的 Task 加入到队列中,Timer 对象自然也就会被垃圾回收,此时 TimerThread 也就应该 stop 了,所以在垃圾回收的时候还应该把 newTasksMayBeScheduled 设置为 false,并且唤起正在 wait 的 TimerThread 线程。所以说,如果你创建的 Timer 不再需要了,最好是调用 cancel 接口手动取消,否则的话 TimerThread 就需要等到垃圾回收的时候才会 stop。

    参考:

    1. 《JDK Timer实现详解》:https://blog.csdn.net/winwill2012/article/details/73939167

    每天用心记录一点点。内容也许不重要,但习惯很重要!

  • 相关阅读:
    String,StringBuffer与StringBuilder的区别?
    Digui
    Digui1
    逆序
    TestOverWrite
    DemoBoxWeight
    TestSuperSub
    Cast
    TestOverWrite
    Joseph
  • 原文地址:https://www.cnblogs.com/binarylei/p/10957118.html
Copyright © 2011-2022 走看看