zoukankan      html  css  js  c++  java
  • Java并发(二十二):定时任务ScheduledThreadPoolExecutor

    需要在理解线程池原理的基础上学习定时任务:Java并发(二十一):线程池实现原理

    一、先做总结

    通过一个简单示例总结:

        public static void main(String[] args) {
            ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(3);
            scheduled.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    System.out.println();
                }
                
            }, 10, 30, TimeUnit.MILLISECONDS);
        }

    1、概述

    new一个线程池,等待队列是DelayedWorkQueue,将Runable放入队列中,到时间会被线程池取出执行

    2、如何实现任务到时间被自动取出?

    延时队列DelayedWorkQueue:

      DelayedWorkQueue为ScheduledThreadPoolExecutor中的内部类(类似DelayQueue)

      DelayedWorkQueue中的任务是按照延迟时间从短到长来进行排序的(插入时排序)

      只有在延迟期满时才能从中提取元素,其列头是延迟期满后保存时间最长的Delayed元素

    DelayedWorkQueue原理:

      put()/offer():将ScheduledFutureTask放入队列时,进行排序,时间短的在前(ScheduledFutureTask有触发时间time属性)

      take():取出ScheduledFutureTask时,quene[0]的时间到了就返回;

              quene[0]的时间没到,就将take线程挂起delay时间。时间到了自动唤醒(Unsafe实现),再次取quene[0]。

    3、周期任务如何实现?

       任务被取出来run之后,将time+period又放入DelayedWorkQueue队列

    4、四个定时任务及区别:

    (1)schedule(Callable callable, long delay, TimeUnit unit) :创建并执行在给定延迟后启用的 ScheduledFuture。
    (2)schedule(Runnable command, long delay, TimeUnit unit) :创建并执行在给定延迟后启用的一次性操作。
    (3)scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) :创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;也就是将在 initialDelay 后开始执行,然后在 initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推。
    (4)scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) :创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。

    区别:

    第三个方法(scheduleAtFixedRate)是周期固定,也就说它是不会受到这个延迟的影响的,每个线程的调度周期在初始化时就已经绝对了,是什么时候调度就是什么时候调度,它不会因为上一个线程的调度失效延迟而受到影响。
    但是第四个方法(scheduleWithFixedDelay),则不一样,它是每个线程的调度间隔固定,也就是说第一个线程与第二线程之间间隔delay,第二个与第三个间隔delay,以此类推。如果第二线程推迟了那么后面所有的线程调度都会推迟。

    scheduleAtFixedRate与scheduleWithFixedDelay区别原理:

      任务被取出来run之后,将time+period又放入DelayedWorkQueue队列

      细节一:构造ScheduledFutureTask时,scheduleAtFixedRate传入period(>0),scheduleWithFixedDelay传入-delay(<0)

      细节二:setNextRunTime时,scheduleAtFixedRate.time=time+period;scheduleWithFixedDelay.time=now()+period

    细节一:

        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                      long initialDelay,
                                                      long period,
                                                      TimeUnit unit) {
            if (command == null || unit == null)
                throw new NullPointerException();
            if (period <= 0)
                throw new IllegalArgumentException();
            ScheduledFutureTask<Void> sft =
                new ScheduledFutureTask<Void>(command,
                                              null,
                                              triggerTime(initialDelay, unit),
                                              unit.toNanos(period));
            RunnableScheduledFuture<Void> t = decorateTask(command, sft);
            sft.outerTask = t;
            delayedExecute(t);
            return t;
        }
    
      public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                         long initialDelay,
                                                         long delay,
                                                         TimeUnit unit) {
            if (command == null || unit == null)
                throw new NullPointerException();
            if (delay <= 0)
                throw new IllegalArgumentException();
            ScheduledFutureTask<Void> sft =
                new ScheduledFutureTask<Void>(command,
                                              null,
                                              triggerTime(initialDelay, unit),
                                              unit.toNanos(-delay));
            RunnableScheduledFuture<Void> t = decorateTask(command, sft);
            sft.outerTask = t;
            delayedExecute(t);
            return t;
        }

    细节二:

        private void setNextRunTime() {
            long p = period;
            if (p > 0)
                time += p;// scheduleAtFixedRate:在上次开始执行的时间+周期时间
            else
                time = triggerTime(-p);// scheduleWithFixedDelay:执行完上一个线程的时间+周期时间
        }
    
        long triggerTime(long delay) {
            return now()
                    + ((delay < (Long.MAX_VALUE >> 1)) ? delay: overflowFree(delay));
        }

    二、四个定时任务方法

    ScheduledThreadPoolExecutor提供了如下四个方法,也就是四个调度器:

    1. schedule(Callable callable, long delay, TimeUnit unit) :创建并执行在给定延迟后启用的 ScheduledFuture。
    2. schedule(Runnable command, long delay, TimeUnit unit) :创建并执行在给定延迟后启用的一次性操作。
    3. scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) :创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;也就是将在 initialDelay 后开始执行,然后在 initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推。
    4. scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) :创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。

    第一、二个方法差不多,都是一次性操作,只不过参数一个是Callable,一个是Runnable。

    稍微分析下第三(scheduleAtFixedRate)、四个(scheduleWithFixedDelay)方法,加入initialDelay = 5,period/delay = 3,unit为秒。

    如果每个线程都是都运行非常良好不存在延迟的问题,那么这两个方法线程运行周期是5、8、11、14、17…….,但是如果存在延迟呢?比如第三个线程用了5秒钟,那么这两个方法的处理策略是怎样的?第三个方法(scheduleAtFixedRate)是周期固定,也就说它是不会受到这个延迟的影响的,每个线程的调度周期在初始化时就已经绝对了,是什么时候调度就是什么时候调度,它不会因为上一个线程的调度失效延迟而受到影响。但是第四个方法(scheduleWithFixedDelay),则不一样,它是每个线程的调度间隔固定,也就是说第一个线程与第二线程之间间隔delay,第二个与第三个间隔delay,以此类推。如果第二线程推迟了那么后面所有的线程调度都会推迟,例如,上面第二线程推迟了2秒,那么第三个就不再是11秒执行了,而是13秒执行。

    三、ScheduledFutureTask

    ScheduledFutureTask是ScheduledThreadPoolExecutor的内部类,线程池将Runable任务封装成ScheduledFutureTask来提交

    ScheduledFutureTask内部继承FutureTask,实现RunnableScheduledFuture接口,它内部定义了三个比较重要的变量:

            /** 任务被添加到ScheduledThreadPoolExecutor中的序号 */
            private final long sequenceNumber;
    
            /** 任务要执行的具体时间 */
            private long time;
    
            /**  任务的间隔周期 /
            private final long period;

    构造函数:

            ScheduledFutureTask(Runnable r, V result, long ns) {
                super(r, result);
                this.time = ns;
                this.period = 0;
                this.sequenceNumber = sequencer.getAndIncrement();
            }
    
            ScheduledFutureTask(Runnable r, V result, long ns, long period) {
                super(r, result);
                this.time = ns;
                this.period = period;
                this.sequenceNumber = sequencer.getAndIncrement();
            }
    
            ScheduledFutureTask(Callable<V> callable, long ns) {
                super(callable);
                this.time = ns;
                this.period = 0;
                this.sequenceNumber = sequencer.getAndIncrement();
            }
    
            ScheduledFutureTask(Callable<V> callable, long ns) {
                super(callable);
                this.time = ns;
                this.period = 0;
                this.sequenceNumber = sequencer.getAndIncrement();
            }

    compareTo()方法:

    提供一个排序算法,该算法规则是:首先按照time排序,time小的排在前面,大的排在后面,如果time相同,则使用sequenceNumber排序,小的排在前面,大的排在后面。

    为什么在这个类里面提供compareTo()方法呢?

    在前面就介绍过ScheduledThreadPoolExecutor在构造方法中提供的是DelayedWorkQueue()队列中,也就是说ScheduledThreadPoolExecutor是把任务添加到DelayedWorkQueue中的,而DelayedWorkQueue则是类似于DelayQueue,内部维护着一个以时间为先后顺序的队列,所以compareTo()方法使用与DelayedWorkQueue队列对其元素ScheduledThreadPoolExecutor task进行排序的算法。

     public int compareTo(Delayed other) {
            if (other == this) // compare zero if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
        }

    run()方法:

    ScheduledThreadPoolExecutor通过run()方法对task任务进行调度和延迟

            public void run() {
                boolean periodic = isPeriodic();
                if (!canRunInCurrentRunState(periodic))
                    cancel(false);
                else if (!periodic)
                    ScheduledFutureTask.super.run();
                else if (ScheduledFutureTask.super.runAndReset()) {
                    setNextRunTime();
                    reExecutePeriodic(outerTask);
                }
            }

    (1)调用isPeriodic()获取该线程是否为周期性任务标志,然后调用canRunInCurrentRunState()方法判断该线程是否可以执行,如果不可以执行则调用cancel()取消任务。

    (2)如果当线程已经到达了执行点,则调用run()方法执行task,该run()方法是在FutureTask中定义的。

    (3)否则调用runAndReset()方法运行并充值,调用setNextRunTime()方法计算任务下次的执行时间,重新把任务添加到队列中,让该任务可以重复执行。

    四、延时队列DelayedWorkQueue

    使用优先级队列DelayedWorkQueue,保证添加到队列中的任务会按照任务的延时时间进行排序,延时时间少的任务首先被获取。

    重要属性:

         // 初始时,数组长度大小。
            private static final int INITIAL_CAPACITY = 16;
            // 使用数组来储存队列中的元素。
            private RunnableScheduledFuture<?>[] queue =
                new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
            // 使用lock来保证多线程并发安全问题。
            private final ReentrantLock lock = new ReentrantLock();
            // 队列中储存元素的大小
            private int size = 0;
    
            //特指队列头任务所在线程
            private Thread leader = null;
            
            // 当队列头的任务延时时间到了,或者有新的任务变成队列头时,用来唤醒等待线程
            private final Condition available = lock.newCondition();

    offer()方法插入元素:

        public boolean offer(Runnable x) {
            if (x == null)
                throw new NullPointerException();
            RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>) x;
            // 使用lock保证并发操作安全
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = size;
                // 如果要超过数组长度,就要进行数组扩容
                if (i >= queue.length)
                    // 数组扩容
                    grow();
                // 将队列中元素个数加一
                size = i + 1;
                // 如果是第一个元素,那么就不需要排序,直接赋值就行了
                if (i == 0) {
                    queue[0] = e;
                    setIndex(e, 0);
                } else {
                    // 调用siftUp方法,使插入的元素变得有序。
                    siftUp(i, e);
                }
                // 表示新插入的元素是队列头,更换了队列头,
                // 那么就要唤醒正在等待获取任务的线程。
                if (queue[0] == e) {
                    leader = null;
                    // 唤醒正在等待等待获取任务的线程
                    available.signal();
                }
            } finally {
                lock.unlock();
            }
            return true;
        }

    主要是三步:

    (1)元素个数超过数组长度,就会调用grow()方法,进行数组扩容。

    (2)将新元素e添加到优先级队列中对应的位置,通过siftUp方法,保证按照元素的优先级排序。

    (3)如果新插入的元素是队列头,即更换了队列头,那么就要唤醒正在等待获取任务的线程。这些线程可能是因为原队列头元素的延时时间没到,而等待的。

    siftUp方法:按照元素的优先级插入元素

    通过循环,来查找元素key应该插入在堆二叉树那个节点位置,并交互父节点的位置。
        private void siftUp(int k, RunnableScheduledFuture<?> key) {
                // 当k==0时,就到了堆二叉树的根节点了,跳出循环
                while (k > 0) {
                    // 父节点位置坐标, 相当于(k - 1) / 2
                    int parent = (k - 1) >>> 1;
                    // 获取父节点位置元素
                    RunnableScheduledFuture<?> e = queue[parent];
                    // 如果key元素大于父节点位置元素,满足条件,那么跳出循环
                    // 因为是从小到大排序的。
                    if (key.compareTo(e) >= 0)
                        break;
                    // 否则就将父节点元素存放到k位置
                    queue[k] = e;
                    // 这个只有当元素是ScheduledFutureTask对象实例才有用,用来快速取消任务。
                    setIndex(e, k);
                    // 重新赋值k,寻找元素key应该插入到堆二叉树的那个节点
                    k = parent;
                }
                // 循环结束,k就是元素key应该插入的节点位置
                queue[k] = key;
                setIndex(key, k);
            }

    take()方法取元素:

        public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    // 如果没有任务,就让线程在available条件下等待。
                    if (first == null)
                        available.await();
                    else {
                        // 获取任务的剩余延时时间
                        long delay = first.getDelay(NANOSECONDS);
                        // 如果延时时间到了,就返回这个任务,用来执行。
                        if (delay <= 0)
                            return finishPoll(first);
                        // 将first设置为null,当线程等待时,不持有first的引用
                        first = null; // don't retain ref while waiting
    
                        // 如果还是原来那个等待队列头任务的线程,
                        // 说明队列头任务正在执行。
                        if (leader != null)
                            available.await();
                        else {
                            // 记录一下当前等待队列头任务的线程
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                // 当任务的延时时间到了时,能够自动超时唤醒。
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    // 唤醒等待任务的线程
                    available.signal();
                lock.unlock();
            }
        }

    如果队列中没有任务,那么就让当前线程在available条件下等待。如果队列头任务的剩余延时时间delay大于0,那么就让当前线程在available条件下等待delay时间。

    五、源码解析定时任务过程

    以一个简单的示例来分析:

        public static void main(String[] args) {
            ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(3);
            scheduled.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    System.out.println();
                }
    
            }, 10, 30, TimeUnit.MILLISECONDS);
        }

    new线程池:

    ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(3); // new一个等待队列是DelayedWorkQueue的线程池

        // Executors.newScheduledThreadPool(3);
        public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }
        
        // super父类即线程池类ThreadPoolExecutor
        public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue());
        }
        
        public ThreadPoolExecutor(int corePoolSize,
                int maximumPoolSize,
                long keepAliveTime,
                TimeUnit unit,
                BlockingQueue<Runnable> workQueue) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                    Executors.defaultThreadFactory(), defaultHandler);
        }

    任务提交:

        // ScheduledThreadPoolExecutor.scheduleAtFixedRate
        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                long initialDelay, long period, TimeUnit unit) {
            if (command == null || unit == null)
                throw new NullPointerException();
            if (period <= 0)
                throw new IllegalArgumentException();
            
            // 封装成ScheduledFutureTask提交
            ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command,
                    null, triggerTime(initialDelay, unit), unit.toNanos(period));
            RunnableScheduledFuture<Void> t = decorateTask(command, sft);
            sft.outerTask = t;
            delayedExecute(t); // 提交
            return t;
        }
        
        // ScheduledThreadPoolExecutor.delayedExecute(RunnableScheduledFuture<?>)
        private void delayedExecute(RunnableScheduledFuture<?> task) {
            if (isShutdown())
                reject(task);
            else {
                super.getQueue().add(task); // 任务插入到延时队列DelayedWorkQueue中
                if (isShutdown() &&
                    !canRunInCurrentRunState(task.isPeriodic()) &&
                    remove(task))
                    task.cancel(false);
                else
                    ensurePrestart(); // 启动一个线程 
            }
        }
        
        // ScheduledThreadPoolExecutor.DelayedWorkQueue
        public boolean add(Runnable e) {
            return offer(e); // 按时间排序,插入延时队列(上文分析过了)
        }
        
        // ThreadPoolExecutor.ensurePrestart()
        void ensurePrestart() {
            int wc = workerCountOf(ctl.get());
            if (wc < corePoolSize)
                // 线程池启动一个没有任务的线程,while循环到延时队列中取任务,调用DelayedWorkQueue.take()取
                // addWorker(null, true)方法不做详细介绍,前一篇线程池文章中分析过了
                addWorker(null, true); 
            else if (wc == 0)
                addWorker(null, false);
        }
    
        // DelayedWorkQueue.take()
        public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                        available.await(); // 如果没有任务,就让线程在available条件下等待。
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            return finishPoll(first); // 如果延时时间到了,就返回这个任务,用来执行。
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                // 如果第一个任务延时时间没到,就挂起delay时间,到延时时间自动唤醒
                                // 此处是循环,自动唤醒之后再取出任务去执行
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }
        
        // 任务是封装成ScheduledFutureTask的,任务执行会调用ScheduledFutureTask的 run方法
        public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run(); // 执行任务
            else if (ScheduledFutureTask.super.runAndReset()) { // 设置下一次循环的任务 
                setNextRunTime();
                reExecutePeriodic(outerTask);
            }
        }
        
        // 循环
        void reExecutePeriodic(RunnableScheduledFuture<?> task) {
            if (canRunInCurrentRunState(true)) {
                super.getQueue().add(task);
                if (!canRunInCurrentRunState(true) && remove(task))
                    task.cancel(false);
                else
                    ensurePrestart();
            }
        }

    参考资料 / 相关推荐:

    【死磕Java并发】—–J.U.C之线程池:ScheduledThreadPoolExecutor

    Java优先级队列DelayedWorkQueue原理分析

  • 相关阅读:
    dnn
    DATAGRID学习
    在.net下的换行符
    treeview
    《25项最优时间管理工具与技巧》
    vim常用操作
    【Google给毕业生的忠告】
    MySQL的安装、使用及权限管理
    各种国际化标准组织
    ubuntu thunderbird 邮箱 163 配置 不能发送问题
  • 原文地址:https://www.cnblogs.com/hexinwei1/p/10066113.html
Copyright © 2011-2022 走看看