zoukankan      html  css  js  c++  java
  • java 线程池(2)

    ScheduledThreadPoolExecutor概述

              ScheduledThreadPoolExecutor下文简称 STPE.

      1 public class ScheduledThreadPoolExecutor
      2         extends ThreadPoolExecutor
      3         implements ScheduledExecutorService

               STPE 可以看到继承线程池类,从名字也可以看出这个STPE是一个执行周期任务的线程池。STPE的几个特点 :

    • 继承 ThreadPoolExecutor
    • 内部使用DelayedQueue 即是任务达到某个时间要求才返回的任务队列
    • 在运行之前,取消一个已提交的任务(task调用Cancel 方法),那么该任务不会执行,默认情况下,这样一个已经取消的任务不会自动从任务队列移除,直到延迟时间到了才移除,为了防止队列中保持已经取消的任务,使用 setRemoveOnCancelPolicy 设置true ,会在取消后立即移除队列。

               下面看看几个周期方法 :

    • schedule(); : 任务开始前延时A秒执行
    • scheduleAtFixedRate();  (例如: 首个任务开始前延时A秒,时间为B秒的任务)
    • scheduleWithFixedDelay();   (例如: 首个任务开始前延时A秒,执行任务后延时B秒再进行下一个任务)

             内部结构 :

    • DelayedWorkerQueue 内部类
    • ScheduledFutureTask 内部类
    • 几个控制变量

    源码分析

    重要的方法

             先跟着流程走一遍,把工作思路先走一遍

      1     public ScheduledFuture<?> schedule(Runnable command,
      2                                        long delay,
      3                                        TimeUnit unit) {
      4         if (command == null || unit == null)
      5             throw new NullPointerException();
      6         RunnableScheduledFuture<?> t = decorateTask(command,
      7             new ScheduledFutureTask<Void>(command, null,
      8                                           triggerTime(delay, unit)));
      9         delayedExecute(t);
     10         return t;
     11     }
     12 
     13 
     14 
     15 
     16     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
     17                                                   long initialDelay,
     18                                                   long period,
     19                                                   TimeUnit unit) {
     20         if (command == null || unit == null)
     21             throw new NullPointerException();
     22         if (period <= 0)
     23             throw new IllegalArgumentException();
     24         //包装成  ScheduledFutureTask
     25         ScheduledFutureTask<Void> sft =
     26             new ScheduledFutureTask<Void>(command,
     27                                           null,
     28                                           triggerTime(initialDelay, unit),
     29                                           unit.toNanos(period));
     30         //装饰成  RunnableScheduledFuture
     31         RunnableScheduledFuture<Void> t = decorateTask(command, sft);
     32         sft.outerTask = t;
     33         delayedExecute(t);
     34         return t;
     35     }
     36 
     37 
     38 
     39 
     40     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
     41                                                      long initialDelay,
     42                                                      long delay,
     43                                                      TimeUnit unit) {
     44         if (command == null || unit == null)
     45             throw new NullPointerException();
     46         if (delay <= 0)
     47             throw new IllegalArgumentException();
     48         ScheduledFutureTask<Void> sft =
     49             new ScheduledFutureTask<Void>(command,
     50                                           null,
     51                                           triggerTime(initialDelay, unit),
     52                                           unit.toNanos(-delay));
     53         RunnableScheduledFuture<Void> t = decorateTask(command, sft);
     54         sft.outerTask = t;
     55         delayedExecute(t);
     56         return t;
     57     }

             可以看到三个方法最后都会调用 delayedExecute 方法。

      1     private void delayedExecute(RunnableScheduledFuture<?> task) {
      2         if (isShutdown())
      3             reject(task);
      4         else {
      5         	//任务队列加入任务 
      6             super.getQueue().add(task);
      7             // 此时任务有可能在执行了,判断是不是 Running 状态,
      8             if (isShutdown() &&
      9                 !canRunInCurrentRunState(task.isPeriodic()) &&
     10                 remove(task))
     11                 task.cancel(false);
     12             else
     13                 ensurePrestart();
     14         }
     15     }
     16 
     17 
     18     /**
     19      * Same as prestartCoreThread except arranges that at least one
     20      * thread is started even if corePoolSize is 0.
     21      *
     22      * 此时任务已入列,即使是 corePoolSize 为 0 ,也要开线程执行
     23      *
     24      */
     25     void ensurePrestart() {
     26         int wc = workerCountOf(ctl.get());
     27         if (wc < corePoolSize)
     28             //以core线程数量为上限,增加线程执行
     29             addWorker(null, true);
     30         else if (wc == 0)
     31             //不以core线程数量为上限,增加线程执行
     32             addWorker(null, false);
     33     }

           可以看到,任务加入队列后就进行判断是否线程池shutdown , 最后 addWorker 方法,创建线程就OK了(addWorker 在ThreadPoolExecutor 我们上节已经分析过了,那么此时就等待线程从队列中获取任务就可以了)。

    DelayedWorkQueue

            下面贴出它的注释,这个队列从名字就可以看出是和 Delay 相关,同时是基于堆操作的,既然是堆,那么应该要知道二叉堆的上浮和下沉操作。

             加入元素的操作。

      1     static class DelayedWorkQueue extends AbstractQueue<Runnable>
      2         implements BlockingQueue<Runnable>
      1         public void put(Runnable e) {
      2             offer(e);
      3         }
      4 
      5         //插入一个元素
      6         public boolean offer(Runnable x) {
      7             if (x == null)
      8                 throw new NullPointerException();
      9             RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
     10             final ReentrantLock lock = this.lock;
     11             lock.lock();
     12             try {
     13                 int i = size;
     14                 if (i >= queue.length)
     15                 	//扩容
     16                     grow();
     17                 size = i + 1;
     18 
     19                 //当前队列中没元素
     20                 if (i == 0) {
     21                     queue[0] = e;
     22                     setIndex(e, 0);
     23                 } else {
     24                 	//已有元素可,二叉堆上浮操作(上浮意味着都尾部添加再上浮上去)
     25                     siftUp(i, e);
     26                 }
     27                 //无元素队列加入时的情况 ,即是说有元素时是不会唤醒的
     28                 if (queue[0] == e) {
     29                     leader = null;
     30                     //唤醒第一个
     31                     available.signal();
     32                 }
     33             } finally {
     34                 lock.unlock();
     35             }
     36             return true;
     37         }
     38 
     39 
     40         /**
     41          * Sifts element added at bottom up to its heap-ordered spot.
     42          * Call only when holding lock.
     43          *
     44          * 必须加锁
     45          *
     46          */
     47         private void siftUp(int k, RunnableScheduledFuture<?> key) {
     48             while (k > 0) {
     49                 int parent = (k - 1) >>> 1;
     50                 RunnableScheduledFuture<?> e = queue[parent];
     51                 if (key.compareTo(e) >= 0)
     52                     break;
     53                 // 父类元素大于下面的子类元素 ,交换 
     54                 queue[k] = e;
     55                 setIndex(e, k);
     56                 k = parent;
     57             }
     58             queue[k] = key;
     59             setIndex(key, k);
     60         }

            上面的 offer 方法是加锁操作。

            下面是take 方法,为什么要看take 方法呢?这是因为在ThreadPoolExecute 执行任务,线程获取队列中的方法,调用的是队列的take 方法或是 poll 方法。

      1         public RunnableScheduledFuture<?> take() throws InterruptedException {
      2             //加锁,所以只有一个线程可以进来
      3             final ReentrantLock lock = this.lock;
      4             lock.lockInterruptibly();
      5             try {
      6                 for (;;) {
      7                     RunnableScheduledFuture<?> first = queue[0];
      8                     if (first == null)
      9                         //没有了就阻塞, await方法会释放所有的锁,其他的也会进来的
     10                         //唤醒之后,继续for循环
     11                         available.await();
     12                     else {
     13                         //拿到任务,要是到了延迟的时间就返回任务
     14                         long delay = first.getDelay(NANOSECONDS);
     15                         if (delay <= 0)
     16                             return finishPoll(first);
     17 
     18                         //拿到任务,但是还没到时间执行任务
     19                         first = null; // don't retain ref while waiting
     20                         //下面的代码就是先来的拿到 leader ,然后执行 awaitNanos();其它的就 await 
     21                         if (leader != null)
     22                             available.await();
     23                         else {
     24                             Thread thisThread = Thread.currentThread();
     25                             leader = thisThread;
     26                             try {
     27                                 available.awaitNanos(delay);
     28                             } finally {
     29                                 if (leader == thisThread)
     30                                     leader = null;
     31                             }
     32                         }
     33                     }
     34                 }
     35             } finally {
     36                 //唤醒后面的
     37                 if (leader == null && queue[0] != null)
     38                     available.signal();
     39                 lock.unlock();
     40             }
     41         }

             这个地方使用到了ConditionObject 的 await 唤醒的操作在 有元素加入到队列中,调用 signal 方法,释放锁后就会唤醒下一个元素。同时我们可以看到,leader 的作用在这里起到了 “leader---只能有一个获取得到”的作用。

     

    ScheduledFutureTask

      1     private class ScheduledFutureTask<V>
      2             extends FutureTask<V> implements RunnableScheduledFuture<V>

    schedulepe

         

             ScheduledFutureTask 继承Runnable并又线程执行,我们直接看run方法。

      1 
      2         /**
      3          * Overrides FutureTask version so as to reset/requeue if periodic.
      4          */
      5     public void run() {
      6             boolean periodic = isPeriodic();
      7             if (!canRunInCurrentRunState(periodic))
      8                 cancel(false);
      9             //是不是时间周期任务
     10             else if (!periodic)
     11                 ScheduledFutureTask.super.run();
     12             //周期任务
     13             else if (ScheduledFutureTask.super.runAndReset()) {
     14                 //设置下次执行的时间周期
     15                 setNextRunTime();
     16                 //重新执行这个任务
     17                 reExecutePeriodic(outerTask);
     18             }
     19     }
     20 
     21 
     22 
     23     /**
     24      * Executes the computation without setting its result, and then
     25      * resets this future to initial state, failing to do so if the
     26      * computation encounters an exception or is cancelled.  This is
     27      * designed for use with tasks that intrinsically execute more
     28      * than once.
     29      *
     30      * @return {@code true} if successfully run and reset
     31      */
     32     //父类的 runAndReset
     33     protected boolean runAndReset() {
     34         //对变量 runner 进行CAS 操作 ,
     35         if (state != NEW ||
     36             !UNSAFE.compareAndSwapObject(this, runnerOffset,
     37                                          null, Thread.currentThread()))
     38             return false;
     39         boolean ran = false;
     40         int s = state;
     41         try {
     42             Callable<V> c = callable;
     43             if (c != null && s == NEW) {
     44                 try {
     45                     //上面的call 方法调用的 run 方法
     46                     c.call(); // don't set result
     47                     ran = true;
     48                 } catch (Throwable ex) {
     49                     setException(ex);
     50                 }
     51             }
     52         } finally {
     53             // runner must be non-null until state is settled to
     54             // prevent concurrent calls to run()
     55             runner = null;
     56             // state must be re-read after nulling runner to prevent
     57             // leaked interrupts
     58             s = state;
     59             if (s >= INTERRUPTING)
     60                 handlePossibleCancellationInterrupt(s);
     61         }
     62         return ran && s == NEW;
     63     }
     64 
     65 
     66     //父类的 run 方法 
     67     public void run() {
     68         if (state != NEW ||
     69             !UNSAFE.compareAndSwapObject(this, runnerOffset,
     70                                          null, Thread.currentThread()))
     71             return;
     72         try {
     73             Callable<V> c = callable;
     74             if (c != null && state == NEW) {
     75                 V result;
     76                 boolean ran;
     77                 try {
     78                     //这是调用了 call 方法 
     79                     result = c.call();
     80                     ran = true;
     81                 } catch (Throwable ex) {
     82                     result = null;
     83                     ran = false;
     84                     setException(ex);
     85                 }
     86                 if (ran)
     87                     set(result);
     88             }
     89         } finally {
     90             // runner must be non-null until state is settled to
     91             // prevent concurrent calls to run()
     92             runner = null;
     93             // state must be re-read after nulling runner to prevent
     94             // leaked interrupts
     95             int s = state;
     96             if (s >= INTERRUPTING)
     97                 handlePossibleCancellationInterrupt(s);
     98         }
     99     }
    100 
    101         /**
    102          * Sets the next time to run for a periodic task.
    103          */
    104     private void setNextRunTime() {
    105         long p = period;
    106         if (p > 0)
    107             time += p;
    108         else
    109             time = triggerTime(-p);
    110     }
    111 
    112 
    113 
    114     /**
    115      * Requeues a periodic task unless current run state precludes it.
    116      * Same idea as delayedExecute except drops task rather than rejecting.
    117      *
    118      * @param task the task
    119      */
    120     void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    121         if (canRunInCurrentRunState(true)) {
    122             //又再一次加入队列
    123             super.getQueue().add(task);
    124             if (!canRunInCurrentRunState(true) && remove(task))
    125                 task.cancel(false);
    126             else
    127                 //再次开始执行
    128                 ensurePrestart();
    129         }
    130     }

              我们看到了最终都会执行任务父类里,一个变量的 call方法,我们现在看看这个 call 方法。

      1     /**
      2     * Creates a periodic action with given nano time and period.
      3     */
      4     ScheduledFutureTask(Runnable r, V result, long ns, long period) {
      5             super(r, result);
      6             this.time = ns;
      7             this.period = period;
      8             this.sequenceNumber = sequencer.getAndIncrement();
      9     }
     10 
     11     period : 正数(时间周期),负数(固定延迟后执行任务),0(不是可重复的任务)
     12 
     13     //父类构造函数
     14     //上面的super(r, result);
     15     public FutureTask(Runnable runnable, V result) {
     16         this.callable = Executors.callable(runnable, result);
     17         this.state = NEW;       // ensure visibility of callable
     18     }
     19 
     20 
     21     //上面的Executors.callable
     22     public static <T> Callable<T> callable(Runnable task, T result) {
     23         if (task == null)
     24             throw new NullPointerException();
     25         return new RunnableAdapter<T>(task, result);
     26     }
     27 
     28 
     29 
     30     static final class RunnableAdapter<T> implements Callable<T> {
     31         final Runnable task;
     32         final T result;
     33         RunnableAdapter(Runnable task, T result) {
     34             this.task = task;
     35             this.result = result;
     36         }
     37         public T call() {
     38             //实际调用的是 run方法
     39             task.run();
     40             return result;
     41         }
     42     }

            可以看到 call 方法里其实执行的是 任务的 run 方法,然后返回个 result .

            另外还有几个控制变量,他们在 delayedExecute 和 重写父类的方法 onShutdown 有关,主要的作用是取消任务后是否立即从队列中删除。

      1    /**
      2      * False if should cancel/suppress periodic tasks on shutdown.
      3      */
      4     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
      5 
      6     /**
      7      * False if should cancel non-periodic tasks on shutdown.
      8      */
      9     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
     10 
     11     /**
     12      * True if ScheduledFutureTask.cancel should remove from queue
     13      */
     14     private volatile boolean removeOnCancel = false;
      1     /**
      2      * Cancels and clears the queue of all tasks that should not be run
      3      * due to shutdown policy.  Invoked within super.shutdown.
      4      */
      5     @Override void onShutdown() {
      6         BlockingQueue<Runnable> q = super.getQueue();
      7         boolean keepDelayed =
      8             getExecuteExistingDelayedTasksAfterShutdownPolicy();
      9         boolean keepPeriodic =
     10             getContinueExistingPeriodicTasksAfterShutdownPolicy();
     11         if (!keepDelayed && !keepPeriodic) {
     12             for (Object e : q.toArray())
     13                 if (e instanceof RunnableScheduledFuture<?>)
     14                     ((RunnableScheduledFuture<?>) e).cancel(false);
     15             q.clear();
     16         }
     17         else {
     18             // Traverse snapshot to avoid iterator exceptions
     19             for (Object e : q.toArray()) {
     20                 if (e instanceof RunnableScheduledFuture) {
     21                     RunnableScheduledFuture<?> t =
     22                         (RunnableScheduledFuture<?>)e;
     23                     if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
     24                         t.isCancelled()) { // also remove if already cancelled
     25                         if (q.remove(t))
     26                             t.cancel(false);
     27                     }
     28                 }
     29             }
     30         }
     31         tryTerminate();
     32     }

    参考资料 :

  • 相关阅读:
    java_29打印流
    java_26 缓冲流
    java-27 Properties类的使用
    java_24.1文件流的应用--复制文件
    java_25 FileReader类和FileWriter类
    java_25.1字节转为字符OutputStreamWriter
    java_23 File类
    java_24 FileOutputStream类和FileInputStream类
    java_21 Set接口、HashSet类、LinkedSet类
    随机数Random和静态函数Math
  • 原文地址:https://www.cnblogs.com/Benjious/p/10220268.html
Copyright © 2011-2022 走看看