zoukankan      html  css  js  c++  java
  • ScheduledThreadPoolExecutor的scheduleAtFixedRate方法探究

    ScheduledThreadPoolExecutor除了具有ThreadPoolExecutor的所有功能外,还可以延迟执行任务或者周期性的执 行某个任务。scheduleWithFixedDelay和scheduleAtFixedRate就是用来完成这个功能的。平常使用 scheduleAtFixedRate这个方法时并没有多想,但是这几天在实现一个功能的时候,需要考虑scheduleAtFixedRate所执行 的task是否会影响任务的周期性,比如scheduleAtFixedRate(command,5,10,TimeUnit.SECONDS),那么 这个command的执行会不会影响这个10秒的周期性。因此特意仔细看了下ScheduledThreadPoolExecutor的源代码,这里记录 一下,以便以后查看。

        scheduleAtFixedRate有两个时间参数,initialDelay和period,对应该方法的两个主要功能,即延迟运行任务和周期性执行任务。

    Java代码  收藏代码
    1. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,  
    2.                                               long initialDelay,  
    3.                                               long period,  
    4.                                               TimeUnit unit) {  
    5.     if (command == null || unit == null)  
    6.         throw new NullPointerException();  
    7.     if (period <= 0)  
    8.         throw new IllegalArgumentException();  
    9.     RunnableScheduledFuture<?> t = decorateTask(command,  
    10.         new ScheduledFutureTask<Object>(command,  
    11.                                         null,  
    12.                                         triggerTime(initialDelay, unit),  
    13.                                         unit.toNanos(period)));  
    14.     delayedExecute(t);  
    15.     return t;  
    16. }  
    17.   
    18. /** 
    19.  * Specialized variant of ThreadPoolExecutor.execute for delayed tasks. 
    20.  */  
    21. private void delayedExecute(Runnable command) {  
    22.     if (isShutdown()) {  
    23.         reject(command);  
    24.         return;  
    25.     }  
    26.     // Prestart a thread if necessary. We cannot prestart it  
    27.     // running the task because the task (probably) shouldn't be  
    28.     // run yet, so thread will just idle until delay elapses.  
    29.     if (getPoolSize() < getCorePoolSize())  
    30.         prestartCoreThread();  
    31.   
    32.     super.getQueue().add(command);  
    33. }  

        首先创建一个ScheduledFutureTask,然后通过delayedExecute执行这个task。在delayedExecute中,首先 预先启动一个线程,这里要注意的是这个这里用来启动一个新线程的firstTask参数是null,所以新启动的线程是idle状态的,然后把这个 task加入到workQueue。ScheduledThreadPoolExecutor里的workQueue用的是 DelayedWorkQueue,这个DelayedWorkQueue就是实现delay的关键。DelayedWorkQueue内部使用的是 DelayQueue,DelayQueue实现task delay的关键就在于其Offer(E e)和Take.下面,通过分析这两个方法和结合ThreadPoolExecutor的运行原理来说明delay操作是如何实现的

    Java代码  收藏代码
    1. public boolean offer(E e) {  
    2.     final ReentrantLock lock = this.lock;  
    3.     lock.lock();  
    4.     try {  
    5.         E first = q.peek();  
    6.         q.offer(e);  
    7.         if (first == null || e.compareTo(first) < 0)  
    8.             available.signalAll();  
    9.         return true;  
    10.     } finally {  
    11.         lock.unlock();  
    12.     }  
    13. }  
    14.   
    15. public E take() throws InterruptedException {  
    16.     final ReentrantLock lock = this.lock;  
    17.     lock.lockInterruptibly();  
    18.     try {  
    19.         for (;;) {  
    20.             E first = q.peek();  
    21.             if (first == null) {  
    22.                 available.await();  
    23.             } else {  
    24.                 long delay =  first.getDelay(TimeUnit.NANOSECONDS);  
    25.                 if (delay > 0) {  
    26.                     long tl = available.awaitNanos(delay);  
    27.                 } else {  
    28.                     E x = q.poll();  
    29.                     assert x != null;  
    30.                     if (q.size() != 0)  
    31.                         available.signalAll(); // wake up other takers  
    32.                     return x;  
    33.   
    34.                 }  
    35.             }  
    36.         }  
    37.     } finally {  
    38.         lock.unlock();  
    39.     }  
    40. }  

          ScheduledThreadPoolExecutor执行task是通过工作线程Work来承担的,Work的Run方法如下:

    Java代码  收藏代码
    1. public void run() {  
    2.     try {  
    3.         Runnable task = firstTask;  
    4.         firstTask = null;  
    5.         while (task != null || (task = getTask()) != null) {  
    6.             runTask(task);  
    7.             task = null;  
    8.         }  
    9.     } finally {  
    10.         workerDone(this);  
    11.     }  
    12. }  

         因为前面在delayedExecute方法里面创建work线程的firstTask参数为null,所以就通过getTask去从workQueue 里面获取task,getTask在正常情况下(即线程池没有关闭,线程数量没有超过corePoolSize等)是通过 workQueue.take()从workQueue里获取任务。根据上面的贴出来的take方法的代码,如果queue是空的,则take方法会阻塞 住,直到有新task被add进来。而在上面的delayedExecute方法的最后,会把创建的scheduledFutureTask加入到 workQueue,这样take方法中的available.await()就被唤醒;在take方法里面,如果workQueue不为空,则执行 task.getDelay()方法获取task的delay

    Java代码  收藏代码
    1. public long getDelay(TimeUnit unit) {  
    2.     return unit.convert(time - now(), TimeUnit.NANOSECONDS);  
    3. }  

       这里的time是通过两个方法把initialDelay变成一个triggerTime

    Java代码  收藏代码
    1. /** 
    2.  * Returns the trigger time of a delayed action. 
    3.  */  
    4. private long triggerTime(long delay, TimeUnit unit) {  
    5.      return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));  
    6. }  
    7.   
    8. /** 
    9.  * Returns the trigger time of a delayed action. 
    10.  */  
    11. long triggerTime(long delay) {  
    12.      return now() +  
    13.          ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));  
    14. }  

    注意看这个方法,这里返回的delay不是固定不变的,从task被放入workQueue起,不同的时间调用getDelay方法会得出不同的 delay。如果放入workQueue的task的initialDelay是5秒,那么根据take方法的代码,如果在放入workQueue5秒 后,就可以从delayQueue中拿到5秒前put进去的task,这样就实现了delay的功能。

       在本文的最前面提到scheduleAtFixedRate能够周期性地执行一项任务,那么这个是如何实现的呢?在 scheduleAtFixedRate方法里创建了一个ScheduledFutureTask,这个ScheduledFutureTask包装了 command,最后周期性执行的是ScheduledFutureTask的run方法。

    Java代码  收藏代码
    1. private void runPeriodic() {  
    2.     boolean ok = ScheduledFutureTask.super.runAndReset();  
    3.     boolean down = isShutdown();  
    4.     // Reschedule if not cancelled and not shutdown or policy allows  
    5.     if (ok && (!down ||  
    6.                (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&  
    7.                 !isStopped()))) {  
    8.         long p = period;  
    9.         if (p > 0)  
    10.             time += p;  
    11.         else  
    12.             time = triggerTime(-p);  
    13.         ScheduledThreadPoolExecutor.super.getQueue().add(this);  
    14.     }  
    15.     // This might have been the final executed delayed  
    16.     // task.  Wake up threads to check.  
    17.     else if (down)  
    18.         interruptIdleWorkers();  
    19. }  
    20.   
    21. /** 
    22.  * Overrides FutureTask version so as to reset/requeue if periodic. 
    23.  */  
    24. public void run() {  
    25.     if (isPeriodic())  
    26.         runPeriodic();  
    27.     else  
    28.         ScheduledFutureTask.super.run();  
    29. }  

         由上面的代码可以看出,scheduleAtFixedRate(command,5,10,TimeUnit.SECONDS)这个方法的周期性会受 command的影响,如果command方法的执行时间是10秒,那么执行command的周期其实是20秒,即 scheduleAtFixedRate这个方法要等一个完整的command方法执行完成后才继续周期性地执行command方法,其实这样的设计也是 符合常理的。

         以上就是对ScheduledThreadPoolExecutor的一点小理解。

    来自:http://olylakers.iteye.com/blog/1218243

  • 相关阅读:
    转:同步、异步、阻塞和非阻塞
    转:回调函数
    转:同步/异步与阻塞/非阻塞的区别
    转:Socket在阻塞模式下的信息收发和文件接收
    转:直接用socket实现HTTP协议
    链接错误 LINK : fatal error LNK1104: 无法打开文件“XX.obj”
    转:MFC中常用类,宏,函数介绍
    转:线程同步技术剖析
    转:线程同步
    转:C++回调函数用法
  • 原文地址:https://www.cnblogs.com/sunxucool/p/3148141.html
Copyright © 2011-2022 走看看