zoukankan      html  css  js  c++  java
  • ScheduledThreadPoolExecutor的scheduleAtFixedRate方法探究

    ScheduledThreadPoolExecutor除了具有ThreadPoolExecutor的所有功能外,还可以延迟执行任务或者周期性的执 行某个任务。scheduleWithFixedDelay和scheduleAtFixedRate就是用来完成这个功能的。平常使用 scheduleAtFixedRate这个方法时并没有多想,但是这几天在实现一个功能的时候,需要考虑scheduleAtFixedRate所执行 的task是否会影响任务的周期性,比如scheduleAtFixedRate(command,5,10,TimeUnit.SECONDS),那么 这个command的执行会不会影响这个10秒的周期性。因此特意仔细看了下ScheduledThreadPoolExecutor的源代码,这里记录 一下,以便以后查看。

        scheduleAtFixedRate有两个时间参数,initialDelay和period,对应该方法的两个主要功能,即延迟运行任务和周期性执行任务。

    Java代码  收藏代码
    1. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,  
    2.                                               long initialDelay,  
    3.                                               long period,  
    4.                                               TimeUnit unit) {  
    5.     if (command == null || unit == null)  
    6.         throw new NullPointerException();  
    7.     if (period <= 0)  
    8.         throw new IllegalArgumentException();  
    9.     RunnableScheduledFuture<?> t = decorateTask(command,  
    10.         new ScheduledFutureTask<Object>(command,  
    11.                                         null,  
    12.                                         triggerTime(initialDelay, unit),  
    13.                                         unit.toNanos(period)));  
    14.     delayedExecute(t);  
    15.     return t;  
    16. }  
    17.   
    18. /** 
    19.  * Specialized variant of ThreadPoolExecutor.execute for delayed tasks. 
    20.  */  
    21. private void delayedExecute(Runnable command) {  
    22.     if (isShutdown()) {  
    23.         reject(command);  
    24.         return;  
    25.     }  
    26.     // Prestart a thread if necessary. We cannot prestart it  
    27.     // running the task because the task (probably) shouldn't be  
    28.     // run yet, so thread will just idle until delay elapses.  
    29.     if (getPoolSize() < getCorePoolSize())  
    30.         prestartCoreThread();  
    31.   
    32.     super.getQueue().add(command);  
    33. }  

        首先创建一个ScheduledFutureTask,然后通过delayedExecute执行这个task。在delayedExecute中,首先 预先启动一个线程,这里要注意的是这个这里用来启动一个新线程的firstTask参数是null,所以新启动的线程是idle状态的,然后把这个 task加入到workQueue。ScheduledThreadPoolExecutor里的workQueue用的是 DelayedWorkQueue,这个DelayedWorkQueue就是实现delay的关键。DelayedWorkQueue内部使用的是 DelayQueue,DelayQueue实现task delay的关键就在于其Offer(E e)和Take.下面,通过分析这两个方法和结合ThreadPoolExecutor的运行原理来说明delay操作是如何实现的

    Java代码  收藏代码
    1. public boolean offer(E e) {  
    2.     final ReentrantLock lock = this.lock;  
    3.     lock.lock();  
    4.     try {  
    5.         E first = q.peek();  
    6.         q.offer(e);  
    7.         if (first == null || e.compareTo(first) < 0)  
    8.             available.signalAll();  
    9.         return true;  
    10.     } finally {  
    11.         lock.unlock();  
    12.     }  
    13. }  
    14.   
    15. public E take() throws InterruptedException {  
    16.     final ReentrantLock lock = this.lock;  
    17.     lock.lockInterruptibly();  
    18.     try {  
    19.         for (;;) {  
    20.             E first = q.peek();  
    21.             if (first == null) {  
    22.                 available.await();  
    23.             } else {  
    24.                 long delay =  first.getDelay(TimeUnit.NANOSECONDS);  
    25.                 if (delay > 0) {  
    26.                     long tl = available.awaitNanos(delay);  
    27.                 } else {  
    28.                     E x = q.poll();  
    29.                     assert x != null;  
    30.                     if (q.size() != 0)  
    31.                         available.signalAll(); // wake up other takers  
    32.                     return x;  
    33.   
    34.                 }  
    35.             }  
    36.         }  
    37.     } finally {  
    38.         lock.unlock();  
    39.     }  
    40. }  

          ScheduledThreadPoolExecutor执行task是通过工作线程Work来承担的,Work的Run方法如下:

    Java代码  收藏代码
    1. public void run() {  
    2.     try {  
    3.         Runnable task = firstTask;  
    4.         firstTask = null;  
    5.         while (task != null || (task = getTask()) != null) {  
    6.             runTask(task);  
    7.             task = null;  
    8.         }  
    9.     } finally {  
    10.         workerDone(this);  
    11.     }  
    12. }  

         因为前面在delayedExecute方法里面创建work线程的firstTask参数为null,所以就通过getTask去从workQueue 里面获取task,getTask在正常情况下(即线程池没有关闭,线程数量没有超过corePoolSize等)是通过 workQueue.take()从workQueue里获取任务。根据上面的贴出来的take方法的代码,如果queue是空的,则take方法会阻塞 住,直到有新task被add进来。而在上面的delayedExecute方法的最后,会把创建的scheduledFutureTask加入到 workQueue,这样take方法中的available.await()就被唤醒;在take方法里面,如果workQueue不为空,则执行 task.getDelay()方法获取task的delay

    Java代码  收藏代码
    1. public long getDelay(TimeUnit unit) {  
    2.     return unit.convert(time - now(), TimeUnit.NANOSECONDS);  
    3. }  

       这里的time是通过两个方法把initialDelay变成一个triggerTime

    Java代码  收藏代码
    1. /** 
    2.  * Returns the trigger time of a delayed action. 
    3.  */  
    4. private long triggerTime(long delay, TimeUnit unit) {  
    5.      return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));  
    6. }  
    7.   
    8. /** 
    9.  * Returns the trigger time of a delayed action. 
    10.  */  
    11. long triggerTime(long delay) {  
    12.      return now() +  
    13.          ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));  
    14. }  

    注意看这个方法,这里返回的delay不是固定不变的,从task被放入workQueue起,不同的时间调用getDelay方法会得出不同的 delay。如果放入workQueue的task的initialDelay是5秒,那么根据take方法的代码,如果在放入workQueue5秒 后,就可以从delayQueue中拿到5秒前put进去的task,这样就实现了delay的功能。

       在本文的最前面提到scheduleAtFixedRate能够周期性地执行一项任务,那么这个是如何实现的呢?在 scheduleAtFixedRate方法里创建了一个ScheduledFutureTask,这个ScheduledFutureTask包装了 command,最后周期性执行的是ScheduledFutureTask的run方法。

    Java代码  收藏代码
    1. private void runPeriodic() {  
    2.     boolean ok = ScheduledFutureTask.super.runAndReset();  
    3.     boolean down = isShutdown();  
    4.     // Reschedule if not cancelled and not shutdown or policy allows  
    5.     if (ok && (!down ||  
    6.                (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&  
    7.                 !isStopped()))) {  
    8.         long p = period;  
    9.         if (p > 0)  
    10.             time += p;  
    11.         else  
    12.             time = triggerTime(-p);  
    13.         ScheduledThreadPoolExecutor.super.getQueue().add(this);  
    14.     }  
    15.     // This might have been the final executed delayed  
    16.     // task.  Wake up threads to check.  
    17.     else if (down)  
    18.         interruptIdleWorkers();  
    19. }  
    20.   
    21. /** 
    22.  * Overrides FutureTask version so as to reset/requeue if periodic. 
    23.  */  
    24. public void run() {  
    25.     if (isPeriodic())  
    26.         runPeriodic();  
    27.     else  
    28.         ScheduledFutureTask.super.run();  
    29. }  

         由上面的代码可以看出,scheduleAtFixedRate(command,5,10,TimeUnit.SECONDS)这个方法的周期性会受 command的影响,如果command方法的执行时间是10秒,那么执行command的周期其实是20秒,即 scheduleAtFixedRate这个方法要等一个完整的command方法执行完成后才继续周期性地执行command方法,其实这样的设计也是 符合常理的。

         以上就是对ScheduledThreadPoolExecutor的一点小理解。

    来自:http://olylakers.iteye.com/blog/1218243

  • 相关阅读:
    SGU 495 Kids and Prizes 概率DP 或 数学推理
    poj 2799 IP Networks 模拟 位运算
    uva 202 Repeating Decimals 模拟
    poj 3158 Kickdown 字符串匹配?
    uva 1595 Symmetry 暴力
    uva 201 Squares 暴力
    uva 1594 Ducci Sequence 哈希
    uva 1368 DNA Consensus String 字符串
    数字、字符串、列表的常用操作
    if条件判断 流程控制
  • 原文地址:https://www.cnblogs.com/sunxucool/p/3148141.html
Copyright © 2011-2022 走看看