zoukankan      html  css  js  c++  java
  • Java Executor 框架

         Executor框架是指java5中引入的一系列并发库中与executor相关的功能类,包括Executor、Executors、ExecutorService、CompletionService、Future、Callable等。(图片引用自http://www.javaclubcn.com/a/jichuzhishi/2012/1116/170.html)

         本篇博文分析Executor中几个比较重要的接口和类。

         Executor

    1 public interface Executor {
    2      void execute(Runnable command);
    3 }

         Executor接口是Executor框架中最基础的部分,定义了一个用于执行Runnable的execute方法。它没有直接的实现类,有一个重要的子接口ExecutorService。

         ExecutorService

     1 //继承自Executor接口
     2 public interface ExecutorService extends Executor {
     3     /**
     4      * 关闭方法,调用后执行之前提交的任务,不再接受新的任务
     5      */
     6     void shutdown();
     7     /**
     8      * 从语义上可以看出是立即停止的意思,将暂停所有等待处理的任务并返回这些任务的列表
     9      */
    10     List<Runnable> shutdownNow();
    11     /**
    12      * 判断执行器是否已经关闭
    13      */
    14     boolean isShutdown();
    15     /**
    16      * 关闭后所有任务是否都已完成
    17      */
    18     boolean isTerminated();
    19     /**
    20      * 中断
    21      */
    22     boolean awaitTermination(long timeout, TimeUnit unit)
    23         throws InterruptedException;
    24     /**
    25      * 提交一个Callable任务
    26      */
    27     <T> Future<T> submit(Callable<T> task);
    28     /**
    29      * 提交一个Runable任务,result要返回的结果
    30      */
    31     <T> Future<T> submit(Runnable task, T result);
    32     /**
    33      * 提交一个任务
    34      */
    35     Future<?> submit(Runnable task);
    36     /**
    37      * 执行所有给定的任务,当所有任务完成,返回保持任务状态和结果的Future列表
    38      */
    39     <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    40         throws InterruptedException;
    41     /**
    42      * 执行给定的任务,当所有任务完成或超时期满时(无论哪个首先发生),返回保持任务状态和结果的 Future 列表。
    43      */
    44     <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
    45                                   long timeout, TimeUnit unit)
    46         throws InterruptedException;
    47     /**
    48      * 执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果。
    49      */
    50     <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    51         throws InterruptedException, ExecutionException;
    52     /**
    53      * 执行给定的任务,如果在给定的超时期满前某个任务已成功完成(也就是未抛出异常),则返回其结果。
    54      */
    55     <T> T invokeAny(Collection<? extends Callable<T>> tasks,
    56                     long timeout, TimeUnit unit)
    57         throws InterruptedException, ExecutionException, TimeoutException;
    58 }

        ExecutorService接口继承自Executor接口,定义了终止、提交任务、跟踪任务返回结果等方法。

        ExecutorService涉及到Runnable、Callable、Future接口,这些接口的具体内容如下。

     1 // 实现Runnable接口的类将被Thread执行,表示一个基本的任务
     2 public interface Runnable {
     3     // run方法就是它所有的内容,就是实际执行的任务
     4     public abstract void run();
     5 }
     6 // Callable同样是任务,与Runnable接口的区别在于它接收泛型,同时它执行任务后带有返回内容
     7 public interface Callable<V> {
     8     // 相对于run方法的带有返回值的call方法
     9     V call() throws Exception;
    10 }
    Future
     1 // Future代表异步任务的执行结果
     2 public interface Future<V> {
     3 
     4     /**
     5      * 尝试取消一个任务,如果这个任务不能被取消(通常是因为已经执行完了),返回false,否则返回true。
     6      */
     7     boolean cancel(boolean mayInterruptIfRunning);
     8 
     9     /**
    10      * 返回代表的任务是否在完成之前被取消了
    11      */
    12     boolean isCancelled();
    13 
    14     /**
    15      * 如果任务已经完成,返回true
    16      */
    17     boolean isDone();
    18 
    19     /**
    20      * 获取异步任务的执行结果(如果任务没执行完将等待)
    21      */
    22     V get() throws InterruptedException, ExecutionException;
    23 
    24     /**
    25      * 获取异步任务的执行结果(有最常等待时间的限制)
    26      *
    27      *  timeout表示等待的时间,unit是它时间单位
    28      */
    29     V get(long timeout, TimeUnit unit)
    30         throws InterruptedException, ExecutionException, TimeoutException;
    31 }

         ExecutorService有一个子接口ScheduledExecutorService和一个抽象实现类AbstractExecutorService。

         ScheduledExecutorService

     1 // 可以安排指定时间或周期性的执行任务的ExecutorService
     2 public interface ScheduledExecutorService extends ExecutorService {
     3     /**
     4      * 在指定延迟后执行一个任务,只执行一次
     5      */
     6     public ScheduledFuture<?> schedule(Runnable command,
     7                        long delay, TimeUnit unit);
     8     /**
     9      * 与上面的方法相同,只是接受的是Callable任务
    10      */
    11     public <V> ScheduledFuture<V> schedule(Callable<V> callable,
    12                        long delay, TimeUnit unit);
    13     /**
    14      * 创建并执行一个周期性的任务,在initialDelay延迟后每间隔period个单位执行一次,时间单位都是unit
    15      * 每次执行任务的时间点是initialDelay, initialDelay+period, initialDelay + 2 * period...
    16      */
    17     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
    18                           long initialDelay,
    19                           long period,
    20                           TimeUnit unit);
    21     /**
    22      * 创建并执行一个周期性的任务,在initialDelay延迟后开始执行,在执行结束后再延迟delay个单位开始执行下一次任务,时间单位都是unit
    23      * 每次执行任务的时间点是initialDelay, initialDelay+(任务运行时间+delay), initialDelay + 2 * (任务运行时间+delay)...
    24      */
    25     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
    26                              long initialDelay,
    27                              long delay,
    28                              TimeUnit unit);
    29 }

        ScheduledExecutorService定义了四个方法,已经在上面给出基本的解释。ScheduledExecutorService有两个实现类,分别是DelegatedScheduledExecutorService和ScheduledThreadPoolExecutor,将在后面介绍。还需要解释的是ScheduledFuture。

        ScheduledFuture继承自Future和Delayed接口,自身没有添加方法。Delayed接口定义了一个获取剩余延迟的方法。

         AbstractExecutorService

      1 // 提供ExecutorService的默认实现
      2 public abstract class AbstractExecutorService implements ExecutorService {
      3     /*
      4      * 为指定的Runnable和value构造一个FutureTask,value表示默认被返回的Future
      5      */
      6     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
      7         return new FutureTask<T>(runnable, value);
      8     }
      9     /*
     10      * 为指定的Callable创建一个FutureTask
     11      */
     12     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
     13         return new FutureTask<T>(callable);
     14     }
     15     /*
     16      * 提交Runnable任务
     17      */
     18     public Future<?> submit(Runnable task) {
     19         if (task == null) throw new NullPointerException();
     20         // 通过newTaskFor方法构造RunnableFuture,默认的返回值是null
     21         RunnableFuture<Object> ftask = newTaskFor(task, null);
     22         // 调用具体实现的execute方法
     23         execute(ftask);
     24         return ftask;
     25     }
     26     /*
     27      * 提交Runnable任务
     28      */
     29     public <T> Future<T> submit(Runnable task, T result) {
     30         if (task == null) throw new NullPointerException();
     31         // 通过newTaskFor方法构造RunnableFuture,默认的返回值是result
     32         RunnableFuture<T> ftask = newTaskFor(task, result);
     33         execute(ftask);
     34         return ftask;
     35     }
     36     /*
     37      * 提交Callable任务
     38      */
     39     public <T> Future<T> submit(Callable<T> task) {
     40         if (task == null) throw new NullPointerException();
     41         RunnableFuture<T> ftask = newTaskFor(task);
     42         execute(ftask);
     43         return ftask;
     44     }
     45 
     46     /*
     47      * doInvokeAny的具体实现(核心内容),其它几个方法都是重载方法,都对这个方法进行调用
     48      * tasks 是被执行的任务集,timed标志是否定时的,nanos表示定时的情况下执行任务的限制时间
     49      */
     50     private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
     51                             boolean timed, long nanos)
     52         throws InterruptedException, ExecutionException, TimeoutException {
     53         // tasks空判断
     54         if (tasks == null)
     55             throw new NullPointerException();
     56         // 任务数量
     57         int ntasks = tasks.size();
     58         if (ntasks == 0)
     59             throw new IllegalArgumentException();
     60         // 创建对应数量的Future返回集
     61         List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
     62         ExecutorCompletionService<T> ecs =
     63             new ExecutorCompletionService<T>(this);
     64         try {
     65             // 执行异常
     66             ExecutionException ee = null;
     67             // System.nanoTime()根据系统计时器当回当前的纳秒值
     68             long lastTime = (timed)? System.nanoTime() : 0;
     69             // 获取任务集的遍历器
     70             Iterator<? extends Callable<T>> it = tasks.iterator();
     71 
     72             // 向执行器ExecutorCompletionService提交一个任务,并将结果加入futures中
     73             futures.add(ecs.submit(it.next
     74             // 修改任务计数器
     75             --ntasks;
     76             // 活跃任务计数器
     77             int active = 1;
     78             for (;;) {
     79                 // 获取并移除代表已完成任务的Future,如果不存在,返回null
     80                 Future<T> f = ecs.poll();
     81                 if (f == null) {
     82                     // 没有任务完成,且任务集中还有未提交的任务
     83                     if (ntasks > 0) {
     84                         // 剩余任务计数器减1
     85                         --ntasks;
     86                         // 提交任务并添加结果
     87                         futures.add(ecs.submit(it.next()));
     88                         // 活跃任务计数器加1
     89                         ++active;
     90                     }
     91                     // 没有剩余任务,且没有活跃任务(所有任务可能都会取消),跳过这一次循环
     92                     else if (active == 0)
     93                         break;
     94                     else if (timed) {
     95                         // 获取并移除代表已完成任务的Future,如果不存在,会等待nanos指定的纳秒数
     96                         f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
     97                         if (f == null)
     98                             throw new TimeoutException();
     99                         // 计算剩余可用时间
    100                         long now = System.nanoTime();
    101                         nanos -= now - lastTime;
    102                         lastTime = now;
    103                     }
    104                     else
    105                         // 获取并移除表示下一个已完成任务的未来,等待,如果目前不存在。
    106                         // 执行到这一步说明已经没有任务任务可以提交,只能等待某一个任务的返回
    107                         f = ecs.take();
    108                 }
    109                 // f不为空说明有一个任务完成了
    110                 if (f != null) {
    111                     // 已完成一个任务,所以活跃任务计数减1
    112                     --active;
    113                     try {
    114                         // 返回该任务的结果
    115                         return f.get();
    116                     } catch (InterruptedException ie) {
    117                         throw ie;
    118                     } catch (ExecutionException eex) {
    119                         ee = eex;
    120                     } catch (RuntimeException rex) {
    121                         ee = new ExecutionException(rex);
    122                     }
    123                 }
    124             }
    125             // 如果没有成功返回结果则抛出异常
    126             if (ee == null)
    127                 ee = new ExecutionException();
    128             throw ee;
    129 
    130         } finally {
    131             // 无论执行中发生异常还是顺利结束,都将取消剩余未执行的任务
    132             for (Future<T> f : futures)
    133                 f.cancel(true);
    134         }
    135     }
    136 
    137     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    138         throws InterruptedException, ExecutionException {
    139         try {
    140             // 非定时任务的doInvokeAny调用
    141             return doInvokeAny(tasks, false, 0);
    142         } catch (TimeoutException cannotHappen) {
    143             assert false;
    144             return null;
    145         }
    146     }
    147     // 定时任务的invokeAny调用,timeout表示超时时间,unit表示时间单位
    148     public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
    149                            long timeout, TimeUnit unit)
    150         throws InterruptedException, ExecutionException, TimeoutException {
    151         return doInvokeAny(tasks, true, unit.toNanos(timeout));
    152     }
    153     // 无超时设置的invokeAll方法
    154     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    155         throws InterruptedException {
    156         // 空任务判断
    157         if (tasks == null)
    158             throw new NullPointerException();
    159         // 创建大小为任务数量的结果集
    160         List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    161         // 是否完成所有任务的标记
    162         boolean done = false;
    163         try {
    164             // 遍历并执行任务
    165             for (Callable<T> t : tasks) {
    166                 RunnableFuture<T> f = newTaskFor(t);
    167                 futures.add(f);
    168                 execute(f);
    169             }
    170             // 遍历结果集
    171             for (Future<T> f : futures) {
    172                 // 如果某个任务没完成,通过f调用get()方法
    173                 if (!f.isDone()) {
    174                     try {
    175                         // get方法等待计算完成,然后获取结果(会等待)。所以调用get后任务就会完成计算,否则会等待
    176                         f.get();
    177                     } catch (CancellationException ignore) {
    178                     } catch (ExecutionException ignore) {
    179                     }
    180                 }
    181             }
    182             // 标志所有任务执行完成
    183             done = true;
    184             // 返回结果
    185             return futures;
    186         } finally {
    187             // 假如没有完成所有任务(可能是发生异常等情况),将任务取消
    188             if (!done)
    189                 for (Future<T> f : futures)
    190                     f.cancel(true);
    191         }
    192     }
    193     // 超时设置的invokeAll方法
    194     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
    195                                          long timeout, TimeUnit unit)
    196         throws InterruptedException {
    197         // 需要执行的任务集为空或时间单位为空,抛出异常
    198         if (tasks == null || unit == null)
    199             throw new NullPointerException();
    200         // 将超时时间转为纳秒单位
    201         long nanos = unit.toNanos(timeout);
    202         // 创建任务结果集
    203         List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    204         // 是否全部完成的标志
    205         boolean done = false;
    206         try {
    207             // 遍历tasks,将任务转为RunnableFuture
    208             for (Callable<T> t : tasks)
    209                 futures.add(newTaskFor(t));
    210             // 记录当前时间(单位是纳秒)
    211             long lastTime = System.nanoTime();
    212             // 获取迭代器
    213             Iterator<Future<T>> it = futures.iterator();
    214             // 遍历
    215             while (it.hasNext()) {
    216                 // 执行任务
    217                 execute((Runnable)(it.next()));
    218                 // 记录当前时间
    219                 long now = System.nanoTime();
    220                 // 计算剩余可用时间
    221                 nanos -= now - lastTime;
    222                 // 更新上一次执行时间
    223                 lastTime = now;
    224                 // 超时,返回保存任务状态的结果集
    225                 if (nanos <= 0)
    226                     return futures;
    227             }
    228 
    229             for (Future<T> f : futures) {
    230                 // 如果有任务没完成
    231                 if (!f.isDone()) {
    232                     // 时间已经用完,返回保存任务状态的结果集
    233                     if (nanos <= 0)
    234                         return futures;
    235                     try {
    236                         // 获取计算结果,最多等待给定的时间nanos,单位是纳秒
    237                         f.get(nanos, TimeUnit.NANOSECONDS);
    238                     } catch (CancellationException ignore) {
    239                     } catch (ExecutionException ignore) {
    240                     } catch (TimeoutException toe) {
    241                         return futures;
    242                     }
    243                     // 计算可用时间
    244                     long now = System.nanoTime();
    245                     nanos -= now - lastTime;
    246                     lastTime = now;
    247                 }
    248             }
    249             // 修改是否全部完成的标记
    250             done = true;
    251             // 返回结果集
    252             return futures;
    253         } finally {
    254             // 假如没有完成所有任务(可能是时间已经用完、发生异常等情况),将任务取消
    255             if (!done)
    256                 for (Future<T> f : futures)
    257                     f.cancel(true);
    258         }
    259     }
    260 }

        AbstractExecutor实现了ExecutorService接口的部分方法。具体代码的分析在上面已经给出。

        AbstractExecutor有两个子类:DelegatedExecutorService、ThreadPoolExecutor。将在后面介绍。

        下面是AbstractExecutor中涉及到的RunnableFuture、FutureTask、ExecutorCompletionService。

        RunnableFuture继承自Future和Runnable,只有一个run()方法(Runnable中已经有一个run方法了,为什么RunnableFuture还要重新写一个run方法呢?求高手指教)。RunnableFuture接口看上去就像是Future和Runnable两个接口的组合。

        FutureTask实现了RunnableFuture接口,除了实现了Future和Runnable中的方法外,它还有自己的方法和一个内部类Sync。

        ExecutorCompletionService实现了CompletionService接口,将结果从复杂的一部分物种解耦出来。这些内容后续会介绍,不过这里先介绍框架中的其它内容,弄清整体框架。

        下面看继承自AbstractExecutorService的ThreadPoolExecutor。

        ThreadPoolExecutor

    ThreadPoolExecutor(好长)
      1 public class ThreadPoolExecutor extends AbstractExecutorService {
      2     // 检查关闭的权限        
      3     private static final RuntimePermission shutdownPerm =
      4         new RuntimePermission("modifyThread");
      5     /* runState提供了主要的生命周期控制,可取值有以下几个:
      6      * RUNNING:接受新的任务,处理队列中的任务
      7      * SHUTDOWN:不再接受新的任务,但是处理队列中的任务
      8      * STOP:不接受新任务,也不处理队列中的任务,打断正在处理的任务
      9      * TERMINATED:和STOP类似,同时终止所有线程
     10      * RUNNING -> SHUTDOWN
     11      *    On invocation of shutdown(), perhaps implicitly in finalize()
     12      * (RUNNING or SHUTDOWN) -> STOP
     13      *    On invocation of shutdownNow()
     14      * SHUTDOWN -> TERMINATED
     15      *    When both queue and pool are empty
     16      * STOP -> TERMINATED
     17      *    When pool is empty
     18      * 
     19      */
     20     volatile int runState;
     21     static final int RUNNING    = 0;
     22     static final int SHUTDOWN   = 1;
     23     static final int STOP       = 2;
     24     static final int TERMINATED = 3;
     25 
     26     // 用于保持任务的队列
     27     private final BlockingQueue<Runnable> workQueue;
     28     // poolSize, corePoolSize, maximumPoolSize, runState, workers set的更新锁
     29     private final ReentrantLock mainLock = new ReentrantLock();
     30     // mainLock锁的一个Condition实例
     31     private final Condition termination = mainLock.newCondition();
     32     // 保持线程池中所有的工作线程。只有获取mainLock锁后才能访问。
     33     private final HashSet<Worker> workers = new HashSet<Worker>();
     34     // 空闲线程的等待时间,大为是纳秒
     35     private volatile long  keepAliveTime;
     36     // 是否允许核心线程“活着” false(默认值)允许,哪怕空闲;true则使用keepAliveTime来控制等待超时时间
     37     private volatile boolean allowCoreThreadTimeOut;
     38     // 核心线程池的大小
     39     private volatile int   corePoolSize;
     40     // pool size最大值
     41     private volatile int   maximumPoolSize;
     42     // 当前pool大小
     43     private volatile int   poolSize;
     44     // 拒绝执行的处理器 顾名思义,当一个任务被拒绝执行后将同个这个处理器进行处理
     45     private volatile RejectedExecutionHandler handler;
     46     // 线程工厂,用于创建线程
     47     private volatile ThreadFactory threadFactory;
     48     // 最终pool size达到的最大值
     49     private int largestPoolSize;
     50     // 已完成任务计数
     51     private long completedTaskCount;
     52     // 默认的拒绝执行的处理器
     53     private static final RejectedExecutionHandler defaultHandler =
     54         new AbortPolicy();
     55     /**
     56      * 关于借个size的说明:
     57      * 线程池数量poolSize指工作线程Worker对象的集合workers的实际大小,通过workers.size()可直接获得。            
     58      * 核心线程池数量corePoolSize,可理解为工作线程Worker对象的集合workers的目标大小。
     59      * 如果poolSize > corePoolSize,那么ThreadPoolExecutor就会有机制在适当的时候回收闲置的线程。
     60      * 最大线程池数量maxPoolSize,就是工作线程Worker对象的集合workers的大小上限。
     61      * 假如说任务队列满了,再来新任务时,若poolSize还没达到maxPoolSize,则继续创建新的线程来执行新任务,
     62      * 若不幸poolSize达到了上限maxPoolSize,那不能再创建新的线程了,只能采取reject策略来拒绝新任务。
     63      */
     64     /** 构造方法 开始*/
     65     public ThreadPoolExecutor(int corePoolSize,
     66                               int maximumPoolSize,
     67                               long keepAliveTime,
     68                               TimeUnit unit,
     69                               BlockingQueue<Runnable> workQueue) {
     70         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
     71              Executors.defaultThreadFactory(), defaultHandler);
     72     }
     73     public ThreadPoolExecutor(int corePoolSize,
     74                               int maximumPoolSize,
     75                               long keepAliveTime,
     76                               TimeUnit unit,
     77                               BlockingQueue<Runnable> workQueue,
     78                               ThreadFactory threadFactory) {
     79         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
     80              threadFactory, defaultHandler);
     81     }
     82     public ThreadPoolExecutor(int corePoolSize,
     83                               int maximumPoolSize,
     84                               long keepAliveTime,
     85                               TimeUnit unit,
     86                               BlockingQueue<Runnable> workQueue,
     87                               RejectedExecutionHandler handler) {
     88         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
     89              Executors.defaultThreadFactory(), handler);
     90     }
     91     // 主要的构造方法,其它构造方法都是对这个方法的调用
     92     public ThreadPoolExecutor(int corePoolSize,
     93                               int maximumPoolSize,
     94                               long keepAliveTime,
     95                               TimeUnit unit,
     96                               BlockingQueue<Runnable> workQueue,
     97                               ThreadFactory threadFactory,
     98                               RejectedExecutionHandler handler) {
     99         // 非法输入(明显这些值都是不能小于0的)
    100         if (corePoolSize < 0 ||
    101             maximumPoolSize <= 0 ||
    102             maximumPoolSize < corePoolSize ||
    103             keepAliveTime < 0)
    104             throw new IllegalArgumentException();
    105         // 空判断
    106         if (workQueue == null || threadFactory == null || handler == null)
    107             throw new NullPointerException();
    108         this.corePoolSize = corePoolSize;
    109         this.maximumPoolSize = maximumPoolSize;
    110         this.workQueue = workQueue;
    111         this.keepAliveTime = unit.toNanos(keepAliveTime);
    112         this.threadFactory = threadFactory;
    113         this.handler = handler;
    114     }
    115     /** 构造方法 结束*/
    116     
    117 
    118     // 执行Runnable任务
    119     public void execute(Runnable command) {
    120         if (command == null)
    121             throw new NullPointerException();
    122             /*如果当前线程数量poolSize>=核心线程数量corePoolSize,
    123             那当然无法再把当前任务加入到核心线程池中执行了,于是进花括号选择其他的策略执行;
    124             如果poolSize没有达到corePoolSize,那很自然是把当前任务放到核心线程池执行,
    125             也就是执行逻辑或运算符后的方法addIfUnderCorePoolSize(command)。
    126             “放到核心线程池执行”是什么意思呢?
    127             就是new 一个新工作线程放到workers集合中,让这个新线程来执行当前的任务command,而这个新线程可以认为是核心线程池中的其中一个线程。*/
    128         if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
    129             // 线程池状态时RUNNING且能将任务添加到worker队列中
    130             if (runState == RUNNING && workQueue.offer(command)) {
    131                 // 加入了队列以后,只要保证有工作线程就ok了,工作线程会自动去执行任务队列的。
    132                 // 所以判断一下if ( runState != RUNNING || poolSize == 0),
    133                 // 在这个if为true时候,去保证一下任务队列有线程会执行,即执行ensureQueuedTaskHandled(command)方法。
    134                 // 这里有两种情况,情况一:runState != RUNNING,这种情况在ensureQueuedTaskHandled方法中会把任务丢给reject拒绝策略处理,
    135                 // 情况二:poolSize == 0,这种情况是new一个新线程加入到工作线程集合workers中。
    136                 if (runState != RUNNING || poolSize == 0)
    137                     ensureQueuedTaskHandled(command);
    138             }
    139             // 程序执行到这个分支,说明当前状态runState != RUNNING,或者任务队列workQueue已经满了。
    140             // 先看第一个条件下,前面解释过runState,除了RUNNING状态,其他三个状态都不能接收新任务,
    141             // 所以当runState != RUNNING时新任务只能根据reject策略拒绝,
    142             // 而这个拒绝的逻辑是在addIfUnderMaximumPoolSize方法中实现的;
    143             // 再看第二个条件下,workQueue已经满,潜在的条件是runState == RUNNING,这种情况怎么处理新任务呢?
    144             // 很简单,若当前线程数量poolSize没有达到最大线程数量maxPoolSize,
    145             // 则创建新的线程去执行这个无法加入任务队列的新任务,
    146             // 否则就根据reject策略拒绝
    147             else if (!addIfUnderMaximumPoolSize(command))
    148                 reject(command); // is shutdown or saturated
    149         }
    150     }
    151 
    152     private Thread addThread(Runnable firstTask) {
    153         Worker w = new Worker(firstTask);
    154         // 创建一个新Thread t
    155         Thread t = threadFactory.newThread(w);
    156         if (t != null) {
    157             w.thread = t;
    158             workers.add(w);
    159             int nt = ++poolSize;
    160             // 跟踪线程池大小的最大值
    161             if (nt > largestPoolSize)
    162                 largestPoolSize = nt;
    163         }
    164         return t;
    165     }
    166 
    167     // 创建并启动新线程执行firstTask(在运行线程数小于核心线程池大小的情况且状态是RUNNING)
    168     private boolean addIfUnderCorePoolSize(Runnable firstTask) {
    169         Thread t = null;
    170         final ReentrantLock mainLock = this.mainLock;
    171         // 获取锁
    172         mainLock.lock();
    173         try {
    174             if (poolSize < corePoolSize && runState == RUNNING)
    175                 // 创建一个新线程
    176                 t = addThread(firstTask);
    177         } finally {
    178             // 释放锁
    179             mainLock.unlock();
    180         }
    181         if (t == null)
    182             return false;
    183         // 启动线程执行任务
    184         t.start();
    185         return true;
    186     }
    187 
    188     // 创建并启动新线程执行firstTask(在运行线程数小于pool size的最大值的情况且状态是RUNNING)
    189     private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
    190         Thread t = null;
    191         final ReentrantLock mainLock = this.mainLock;
    192         mainLock.lock();
    193         try {
    194             if (poolSize < maximumPoolSize && runState == RUNNING)
    195                 t = addThread(firstTask);
    196         } finally {
    197             mainLock.unlock();
    198         }
    199         if (t == null)
    200             return false;
    201         t.start();
    202         return true;
    203     }
    204 
    205     // 确保任务被处理
    206     private void ensureQueuedTaskHandled(Runnable command) {
    207         final ReentrantLock mainLock = this.mainLock;
    208         mainLock.lock();
    209         // 拒绝标记
    210         boolean reject = false;
    211         Thread t = null;
    212         try {
    213             int state = runState;
    214             // 如果状态不是RUNNING,能成功从worker队列中移除,则拒绝这个任务执行
    215             if (state != RUNNING && workQueue.remove(command))
    216                 reject = true;
    217             else if (state < STOP &&
    218                      poolSize < Math.max(corePoolSize, 1) &&
    219                      !workQueue.isEmpty())
    220                 t = addThread(null);
    221         } finally {
    222             mainLock.unlock();
    223         }
    224         if (reject)
    225             reject(command);
    226         else if (t != null)
    227             // 不用拒绝任务则启动线程执行任务
    228             t.start();
    229     }
    230 
    231     // 调用RejectedExecutionHandler决绝任务
    232     void reject(Runnable command) {
    233         handler.rejectedExecution(command, this);
    234     }
    235     // 工作线程,实现了Runnable接口
    236     private final class Worker implements Runnable {
    237         // 每个任务执行都必须获取和释放runLock。这主要是防止中断的目的是取消工作线程,而不是中断正在运行的任务。
    238         private final ReentrantLock runLock = new ReentrantLock();
    239         // 要执行的任务
    240         private Runnable firstTask;
    241         // 每个线程完成任务的计数器,最后会统计到completedTaskCount中
    242         volatile long completedTasks;
    243         // 用于执行Runnable的线程
    244         Thread thread;
    245         // 构造方法
    246         Worker(Runnable firstTask) {
    247             this.firstTask = firstTask;
    248         }
    249         // 判断这个线程是否活动
    250         boolean isActive() {
    251             return runLock.isLocked();
    252         }
    253         // 中断闲置线程
    254         void interruptIfIdle() {
    255             final ReentrantLock runLock = this.runLock;
    256             if (runLock.tryLock()) {
    257                 try {
    258             if (thread != Thread.currentThread())
    259             thread.interrupt();
    260                 } finally {
    261                     runLock.unlock();
    262                 }
    263             }
    264         }
    265         // 中断
    266         void interruptNow() {
    267             thread.interrupt();
    268         }
    269 
    270         
    271         private void runTask(Runnable task) {
    272             final ReentrantLock runLock = this.runLock;
    273             runLock.lock();
    274             try {
    275                 
    276                 if (runState < STOP &&
    277                     Thread.interrupted() &&
    278                     runState >= STOP)
    279                     thread.interrupt();
    280                 
    281                 boolean ran = false;
    282                 beforeExecute(thread, task);
    283                 try {
    284                     task.run();
    285                     ran = true;
    286                     afterExecute(task, null);
    287                     ++completedTasks;
    288                 } catch (RuntimeException ex) {
    289                     if (!ran)
    290                         afterExecute(task, ex);
    291                     throw ex;
    292                 }
    293             } finally {
    294                 runLock.unlock();
    295             }
    296         }
    297 
    298         
    299         public void run() {
    300             try {
    301                 Runnable task = firstTask;
    302                 firstTask = null;
    303                 /** 
    304                  * 注意这段while循环的执行逻辑,每执行完一个核心线程后,就会去线程池 
    305                  * 队列中取下一个核心线程,如取出的核心线程为null,则当前工作线程终止 
    306                  */  
    307                 while (task != null || (task = getTask()) != null) {
    308                     //你所提交的核心线程(任务)的运行逻辑  
    309                     runTask(task);
    310                     task = null;
    311                 }
    312             } finally {
    313                 // 当前工作线程退出  
    314                 workerDone(this);
    315             }
    316         }
    317     }
    318 
    319     // 从池队列中取的核心线程(任务)的方法
    320     Runnable getTask() {
    321         for (;;) {
    322             try {
    323                 // 获取运行状态
    324                 int state = runState;
    325                 // 大于SHUTDOWN,即STOP和TERMINATED状态,没有任务
    326                 if (state > SHUTDOWN)
    327                     return null;
    328                 Runnable r;
    329                 // SHUTDOWN状态
    330                 if (state == SHUTDOWN)  // 帮助清空队列
    331                     r = workQueue.poll();
    332                 // 状态时RUNNING,且poolSize > corePoolSize或allowCoreThreadTimeOut
    333                 else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
    334                     // 获取并移除元素(等待指定的时间)
    335                     r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
    336                 else
    337                     // 获取并移除元素(会一直等待知道获取到有效元素)
    338                     r = workQueue.take();
    339                 // 获取结果不为空,返回
    340                 if (r != null)
    341                     return r;
    342                 // 检查一个获取任务失败的线程能否退出
    343                 if (workerCanExit()) {
    344                     if (runState >= SHUTDOWN) // 中断其他线程
    345                         interruptIdleWorkers();
    346                     return null;
    347                 }
    348                 // Else retry
    349             } catch (InterruptedException ie) {
    350                 // On interruption, re-check runState
    351             }
    352         }
    353     }
    354 
    355     // 检查一个获取任务失败的线程能否退出
    356     private boolean workerCanExit() {
    357         final ReentrantLock mainLock = this.mainLock;
    358         mainLock.lock();
    359         boolean canExit;
    360         try {
    361             // 可以退出的条件是状态为STOP或TERMINATED或至少有一个处理非空队列的线程(在允许超时的情况下)
    362             canExit = runState >= STOP ||
    363                 workQueue.isEmpty() ||
    364                 (allowCoreThreadTimeOut &&
    365                  poolSize > Math.max(1, corePoolSize));
    366         } finally {
    367             mainLock.unlock();
    368         }
    369         return canExit;
    370     }
    371 
    372     // 中断其他线程
    373     void interruptIdleWorkers() {
    374         final ReentrantLock mainLock = this.mainLock;
    375         mainLock.lock();
    376         try {
    377             // 遍历工作线程
    378             for (Worker w : workers)
    379                 // 尝试中断闲置线程
    380                 w.interruptIfIdle();
    381         } finally {
    382             mainLock.unlock();
    383         }
    384     }
    385     // 工作线程退出要处理的逻辑 
    386     void workerDone(Worker w) {
    387         final ReentrantLock mainLock = this.mainLock;
    388         mainLock.lock();
    389         try {
    390             completedTaskCount += w.completedTasks;
    391             workers.remove(w);//从工作线程缓存中删除  
    392             if (--poolSize == 0)//poolSize减一,这时其实又可以创建工作线程了  
    393                 tryTerminate();//尝试终止  
    394         } finally {
    395             mainLock.unlock();
    396         }
    397     }
    398 
    399     // 尝试终止
    400     private void tryTerminate() {
    401         //终止的前提条件就是线程池里已经没有工作线程(Worker)了  
    402         if (poolSize == 0) {
    403             int state = runState;
    404             /** 
    405              * 如果当前已经没有了工作线程(Worker),但是线程队列里还有等待的线程任务,则创建一个 
    406              * 工作线程来执行线程队列中等待的任务 
    407              */  
    408             if (state < STOP && !workQueue.isEmpty()) {
    409                 state = RUNNING; // disable termination check below
    410                 Thread t = addThread(null);
    411                 if (t != null)
    412                     t.start();
    413             }
    414             // 设置池状态为终止状态  
    415             if (state == STOP || state == SHUTDOWN) {
    416                 runState = TERMINATED;
    417                 termination.signalAll();
    418                 terminated();
    419             }
    420         }
    421     }
    422     // 发起一个有序的关闭在以前已提交任务的执行,但不接受新任务。如果已经关闭,调用不会有其他影响。
    423     public void shutdown() {
    424         // Gets the system security interface.
    425         SecurityManager security = System.getSecurityManager();
    426         if (security != null)
    427             // 检查权限(以抛出异常的形式)
    428             security.checkPermission(shutdownPerm);
    429         final ReentrantLock mainLock = this.mainLock;
    430         mainLock.lock();
    431         try {
    432             if (security != null) { // 检查调用者是否能修改线程
    433                 for (Worker w : workers)
    434                     security.checkAccess(w.thread);
    435             }
    436             // 获取运行状态
    437             int state = runState;
    438             // 小于SHUTDOWN的不就是RUNNING么。。。
    439             if (state < SHUTDOWN)
    440                 runState = SHUTDOWN;
    441 
    442             try {
    443                 for (Worker w : workers) {
    444                     // 中断线程
    445                     w.interruptIfIdle();
    446                 }
    447             } catch (SecurityException se) { // Try to back out
    448                 runState = state;
    449                 // tryTerminate() here would be a no-op
    450                 throw se;
    451             }
    452             // 尝试终止
    453             tryTerminate(); // Terminate now if pool and queue empty
    454         } finally {
    455             mainLock.unlock();
    456         }
    457     }
    458 
    459     
    460     public List<Runnable> shutdownNow() {
    461         SecurityManager security = System.getSecurityManager();
    462         if (security != null)
    463             security.checkPermission(shutdownPerm);
    464 
    465         final ReentrantLock mainLock = this.mainLock;
    466         mainLock.lock();
    467         try {
    468             if (security != null) { // Check if caller can modify our threads
    469                 for (Worker w : workers)
    470                     security.checkAccess(w.thread);
    471             }
    472 
    473             int state = runState;
    474             // 与上一个方法主要区别在于状态和interruptNow方法
    475             if (state < STOP)
    476                 runState = STOP;
    477 
    478             try {
    479                 for (Worker w : workers) {
    480                     w.interruptNow();
    481                 }
    482             } catch (SecurityException se) { // Try to back out
    483                 runState = state;
    484                 // tryTerminate() here would be a no-op
    485                 throw se;
    486             }
    487 
    488             List<Runnable> tasks = drainQueue();
    489             tryTerminate(); // Terminate now if pool and queue empty
    490             return tasks;
    491         } finally {
    492             mainLock.unlock();
    493         }
    494     }
    495 
    496     // 清空队列
    497     private List<Runnable> drainQueue() {
    498         List<Runnable> taskList = new ArrayList<Runnable>();
    499         // 将队列中的所有元素一到taskList中
    500         workQueue.drainTo(taskList);
    501         while (!workQueue.isEmpty()) {
    502             Iterator<Runnable> it = workQueue.iterator();
    503             try {
    504                 if (it.hasNext()) {
    505                     Runnable r = it.next();
    506                     // 从workQueue中移除,并添加到taskList中
    507                     if (workQueue.remove(r))
    508                         taskList.add(r);
    509                 }
    510             } catch (ConcurrentModificationException ignore) {
    511             }
    512         }
    513         return taskList;
    514     }
    515 
    516     public boolean isShutdown() {
    517         return runState != RUNNING;
    518     }
    519 
    520     
    521     boolean isStopped() {
    522         return runState == STOP;
    523     }
    524 
    525     
    526     public boolean isTerminating() {
    527         int state = runState;
    528         return state == SHUTDOWN || state == STOP;
    529     }
    530 
    531     public boolean isTerminated() {
    532         return runState == TERMINATED;
    533     }
    534     
    535     public boolean awaitTermination(long timeout, TimeUnit unit)
    536         throws InterruptedException {
    537         long nanos = unit.toNanos(timeout);
    538         final ReentrantLock mainLock = this.mainLock;
    539         mainLock.lock();
    540         try {
    541             for (;;) {
    542                 if (runState == TERMINATED)
    543                     return true;
    544                 if (nanos <= 0)
    545                     return false;
    546                 nanos = termination.awaitNanos(nanos);
    547             }
    548         } finally {
    549             mainLock.unlock();
    550         }
    551     }
    552 
    553     
    554     protected void finalize()  {
    555         shutdown();
    556     }
    557 
    558     
    559     public void setThreadFactory(ThreadFactory threadFactory) {
    560         if (threadFactory == null)
    561             throw new NullPointerException();
    562         this.threadFactory = threadFactory;
    563     }
    564 
    565     
    566     public ThreadFactory getThreadFactory() {
    567         return threadFactory;
    568     }
    569 
    570     
    571     public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
    572         if (handler == null)
    573             throw new NullPointerException();
    574         this.handler = handler;
    575     }
    576 
    577     
    578     public RejectedExecutionHandler getRejectedExecutionHandler() {
    579         return handler;
    580     }
    581 
    582     // 设置核心线程数 这里的设置将覆盖构造方法中的设置
    583     // 如果小于构造方法的设置,多余的线程将被闲置
    584     // 如果大于构造方法的设置,新线程将被用于执行排队的任务
    585     public void setCorePoolSize(int corePoolSize) {
    586         if (corePoolSize < 0)
    587             throw new IllegalArgumentException();
    588         final ReentrantLock mainLock = this.mainLock;
    589         mainLock.lock();
    590         try {
    591             int extra = this.corePoolSize - corePoolSize;
    592             this.corePoolSize = corePoolSize;
    593             // 大于构造方法的设置
    594             if (extra < 0) {
    595                 int n = workQueue.size(); 
    596                 while (extra++ < 0 && n-- > 0 && poolSize < corePoolSize) {
    597                     Thread t = addThread(null);
    598                     if (t != null)
    599                         t.start();
    600                     else
    601                         break;
    602                 }
    603             }
    604             // 小于构造方法的设置
    605             else if (extra > 0 && poolSize > corePoolSize) {
    606                 try {
    607                     Iterator<Worker> it = workers.iterator();
    608                     while (it.hasNext() &&
    609                            extra-- > 0 &&
    610                            poolSize > corePoolSize &&
    611                            workQueue.remainingCapacity() == 0)
    612                         it.next().interruptIfIdle();
    613                 } catch (SecurityException ignore) {
    614                     // Not an error; it is OK if the threads stay live
    615                 }
    616             }
    617         } finally {
    618             mainLock.unlock();
    619         }
    620     }
    621 
    622     
    623     public int getCorePoolSize() {
    624         return corePoolSize;
    625     }
    626 
    627     
    628     public boolean prestartCoreThread() {
    629         return addIfUnderCorePoolSize(null);
    630     }
    631 
    632     
    633     public int prestartAllCoreThreads() {
    634         int n = 0;
    635         while (addIfUnderCorePoolSize(null))
    636             ++n;
    637         return n;
    638     }
    639 
    640     
    641     public boolean allowsCoreThreadTimeOut() {
    642         return allowCoreThreadTimeOut;
    643     }
    644 
    645     
    646     public void allowCoreThreadTimeOut(boolean value) {
    647         if (value && keepAliveTime <= 0)
    648             throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
    649 
    650         allowCoreThreadTimeOut = value;
    651     }
    652 
    653     // 设置所允许的最大的线程数。这将覆盖在构造函数中设置的任何值。如果新值小于当前值,多余的现有线程将被终止时,他们成为闲置。
    654     public void setMaximumPoolSize(int maximumPoolSize) {
    655         if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
    656             throw new IllegalArgumentException();
    657         final ReentrantLock mainLock = this.mainLock;
    658         mainLock.lock();
    659         try {
    660             int extra = this.maximumPoolSize - maximumPoolSize;
    661             this.maximumPoolSize = maximumPoolSize;
    662             if (extra > 0 && poolSize > maximumPoolSize) {
    663                 try {
    664                     Iterator<Worker> it = workers.iterator();
    665                     while (it.hasNext() &&
    666                            extra > 0 &&
    667                            poolSize > maximumPoolSize) {
    668                         it.next().interruptIfIdle();
    669                         --extra;
    670                     }
    671                 } catch (SecurityException ignore) {
    672                     // Not an error; it is OK if the threads stay live
    673                 }
    674             }
    675         } finally {
    676             mainLock.unlock();
    677         }
    678     }
    679 
    680     
    681     public int getMaximumPoolSize() {
    682         return maximumPoolSize;
    683     }
    684 
    685     
    686     public void setKeepAliveTime(long time, TimeUnit unit) {
    687         if (time < 0)
    688             throw new IllegalArgumentException();
    689         if (time == 0 && allowsCoreThreadTimeOut())
    690             throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
    691         this.keepAliveTime = unit.toNanos(time);
    692     }
    693 
    694     
    695     public long getKeepAliveTime(TimeUnit unit) {
    696         return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
    697     }
    698 
    699     
    700     public BlockingQueue<Runnable> getQueue() {
    701         return workQueue;
    702     }
    703 
    704     
    705     public boolean remove(Runnable task) {
    706         return getQueue().remove(task);
    707     }
    708 
    709     // 移除所有被取消的任务
    710     public void purge() {
    711         // Fail if we encounter interference during traversal
    712         try {
    713             Iterator<Runnable> it = getQueue().iterator();
    714             while (it.hasNext()) {
    715                 Runnable r = it.next();
    716                 if (r instanceof Future<?>) {
    717                     Future<?> c = (Future<?>)r;
    718                     if (c.isCancelled())
    719                         it.remove();
    720                 }
    721             }
    722         }
    723         catch (ConcurrentModificationException ex) {
    724             return;
    725         }
    726     }
    727 
    728     
    729     public int getPoolSize() {
    730         return poolSize;
    731     }
    732 
    733     // 获取活跃线程数
    734     public int getActiveCount() {
    735         final ReentrantLock mainLock = this.mainLock;
    736         mainLock.lock();
    737         try {
    738             int n = 0;
    739             for (Worker w : workers) {
    740                 if (w.isActive())
    741                     ++n;
    742             }
    743             return n;
    744         } finally {
    745             mainLock.unlock();
    746         }
    747     }
    748 
    749     
    750     public int getLargestPoolSize() {
    751         final ReentrantLock mainLock = this.mainLock;
    752         mainLock.lock();
    753         try {
    754             return largestPoolSize;
    755         } finally {
    756             mainLock.unlock();
    757         }
    758     }
    759 
    760     
    761     public long getTaskCount() {
    762         final ReentrantLock mainLock = this.mainLock;
    763         mainLock.lock();
    764         try {
    765             long n = completedTaskCount;
    766             for (Worker w : workers) {
    767                 // 统计已经完成的任务
    768                 n += w.completedTasks;
    769                 // 如果w是活跃线程,说明正在执行一个任务,所以n加一
    770                 if (w.isActive())
    771                     ++n;
    772             }
    773             // 加上队列中的任务
    774             return n + workQueue.size();
    775         } finally {
    776             mainLock.unlock();
    777         }
    778     }
    779 
    780     // 获取已完成的任务数
    781     public long getCompletedTaskCount() {
    782         final ReentrantLock mainLock = this.mainLock;
    783         mainLock.lock();
    784         try {
    785             long n = completedTaskCount;
    786             for (Worker w : workers)
    787                 n += w.completedTasks;
    788             return n;
    789         } finally {
    790             mainLock.unlock();
    791         }
    792     }
    793 
    794     
    795     protected void beforeExecute(Thread t, Runnable r) { }
    796 
    797     
    798     protected void afterExecute(Runnable r, Throwable t) { }
    799 
    800     
    801     protected void terminated() { }
    802 
    803     // 实现了RejectedExecutionHandler,即是一个拒绝执行的Handler
    804     public static class CallerRunsPolicy implements RejectedExecutionHandler {
    805         
    806         public CallerRunsPolicy() { }
    807 
    808     
    809         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    810             if (!e.isShutdown()) {
    811                 r.run();
    812             }
    813         }
    814     }
    815 
    816     
    817     public static class AbortPolicy implements RejectedExecutionHandler {
    818         
    819         public AbortPolicy() { }
    820 
    821         
    822         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    823             throw new RejectedExecutionException();
    824         }
    825     }
    826 
    827     
    828     public static class DiscardPolicy implements RejectedExecutionHandler {
    829         
    830         public DiscardPolicy() { }
    831 
    832         
    833         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    834         }
    835     }
    836 
    837     
    838     public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    839         
    840         public DiscardOldestPolicy() { }
    841 
    842         
    843         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    844             if (!e.isShutdown()) {
    845                 e.getQueue().poll();
    846                 e.execute(r);
    847             }
    848         }
    849     }
    850 }

        可以参考http://xtu-xiaoxin.iteye.com/blog/647744

        从上面的框架结构图中可以可以看出剩下的就是ScheduledThreadPoolExecutor和Executors。Executors是一个工具类,提供一些工厂和实用方法。

        下面看ScheduledThreadPoolExecutor,它继承自ThreadPoolExecutor并实现了ScheduledExecutorService接口。

        ScheduledThreadPoolExecutor

    ScheduledThreadPoolExecutor
    // 可以安排指定时间或周期性的执行任务的ExecutorService
    public class ScheduledThreadPoolExecutor
            extends ThreadPoolExecutor
            implements ScheduledExecutorService {
        // 在Shutdown的时候如果要取消或关闭任务,设置为false;true表示继续执行任务,在Shutdown之后
        private volatile boolean continueExistingPeriodicTasksAfterShutdown;
        // false表示在Shutdown的时候取消Delayed的任务;true表示执行这个任务
        private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
        // 打破调度联系,进而保证先进先出的顺序捆绑项目中的序列号
        private static final AtomicLong sequencer = new AtomicLong(0);
        // 基准时间
        private static final long NANO_ORIGIN = System.nanoTime();
        // 当前时间(相对于基准时间的值)
        final long now() {
            return System.nanoTime() - NANO_ORIGIN;
        }
        // RunnableScheduledFuture接口表示是否是周期性的
        private class ScheduledFutureTask<V>
                extends FutureTask<V> implements RunnableScheduledFuture<V> {
    
            private final long sequenceNumber;
            // 预定安排执行的时刻
            private long time;
            // 表示重复任务,0表示不重复,正数表示固定比率,负数表示固定延时
            private final long period;
            // 构造方法,构造一个只执行一次的任务
            ScheduledFutureTask(Runnable r, V result, long ns) {
                super(r, result);
                this.time = ns;
                this.period = 0;
                this.sequenceNumber = sequencer.getAndIncrement();
            }
            // 构造方法,构造一个按指定ns开始执行,指定period周期性执行
            ScheduledFutureTask(Runnable r, V result, long ns, long period) {
                super(r, result);
                this.time = ns;
                this.period = period;
                this.sequenceNumber = sequencer.getAndIncrement();
            }
            // 构造方法,构造一个延时执行的任务
            ScheduledFutureTask(Callable<V> callable, long ns) {
                super(callable);
                this.time = ns;
                this.period = 0;
                this.sequenceNumber = sequencer.getAndIncrement();
            }
            // 按指定单位获取延时时间
            public long getDelay(TimeUnit unit) {
                return unit.convert(time - now(), TimeUnit.NANOSECONDS);
            }
            // 判断传入延时和这个任务延时之间的大小关系
            public int compareTo(Delayed other) {
                // 为什么可以和Delayed比较?因为这个类实现了RunnableScheduledFuture接口,而RunnableScheduledFuture接口继承自Delayed接口
                if (other == this) // compare zero ONLY if same object
                    return 0;
                // other是ScheduledFutureTask实例
                if (other instanceof ScheduledFutureTask) {
                    ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                    long diff = time - x.time;
                    // 比较大小
                    if (diff < 0)
                        return -1;
                    else if (diff > 0)
                        return 1;
                    else if (sequenceNumber < x.sequenceNumber)
                        return -1;
                    else
                        return 1;
                }
                long d = (getDelay(TimeUnit.NANOSECONDS) -
                          other.getDelay(TimeUnit.NANOSECONDS));
                return (d == 0)? 0 : ((d < 0)? -1 : 1);
            }
            // 是否周期性的(包括延时的情况)
            public boolean isPeriodic() {
                return period != 0;
            }
            // 执行周期性的任务
            private void runPeriodic() {
                // 执行任务
                boolean ok = ScheduledFutureTask.super.runAndReset();
                // 判断是否已经shutdown
                boolean down = isShutdown();
                // 重新安排任务(如果没有shutdown或在没有关闭且允许在shutdown之后执行已存在的任务)
                if (ok && (!down ||
                           (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
                            !isStopped()))) {
                    long p = period;
                    if (p > 0)
                        // 计算下一次执行的时间
                        time += p;
                    else
                        // 计算触发时间
                        time = triggerTime(-p);
                    // 将任务添加到队列中
                    ScheduledThreadPoolExecutor.super.getQueue().add(this);
                }
                else if (down)
                    interruptIdleWorkers();
            }
            // 执行任务,根据是否周期性调用不同的方法
            public void run() {
                if (isPeriodic())
                    runPeriodic();
                else
                    ScheduledFutureTask.super.run();
            }
        }
        // 延迟执行
        private void delayedExecute(Runnable command) {
            // 如果已经shutdown,决绝任务
            if (isShutdown()) {
                reject(command);
                return;
            }
            if (getPoolSize() < getCorePoolSize())
                // 预启动线程
                prestartCoreThread();
    
            super.getQueue().add(command);
        }
    
        // 取消和清除关闭政策不允许运行的任务
        private void cancelUnwantedTasks() {
            // 获取shutdown策略
            boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
            boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
            if (!keepDelayed && !keepPeriodic)
                super.getQueue().clear();
            else if (keepDelayed || keepPeriodic) {
                Object[] entries = super.getQueue().toArray();
                for (int i = 0; i < entries.length; ++i) {
                    Object e = entries[i];
                    if (e instanceof RunnableScheduledFuture) {
                        RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
                        // 根据是否周期性的任务通过制定的值判断进行取消操作
                        if (t.isPeriodic()? !keepPeriodic : !keepDelayed)
                            t.cancel(false);
                    }
                }
                entries = null;
                // 净化,移除已经取消的任务
                purge();
            }
        }
    
        public boolean remove(Runnable task) {
            if (!(task instanceof RunnableScheduledFuture))
                return false;
            return getQueue().remove(task);
        }
    
        protected <V> RunnableScheduledFuture<V> decorateTask(
            Runnable runnable, RunnableScheduledFuture<V> task) {
            return task;
        }
    
        protected <V> RunnableScheduledFuture<V> decorateTask(
            Callable<V> callable, RunnableScheduledFuture<V> task) {
            return task;
        }
    
        public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
                  new DelayedWorkQueue());
        }
    
        public ScheduledThreadPoolExecutor(int corePoolSize,
                                 ThreadFactory threadFactory) {
            super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
                  new DelayedWorkQueue(), threadFactory);
        }
    
        public ScheduledThreadPoolExecutor(int corePoolSize,
                                  RejectedExecutionHandler handler) {
            super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
                  new DelayedWorkQueue(), handler);
        }
    
        private long triggerTime(long delay, TimeUnit unit) {
             return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
        }
    
        long triggerTime(long delay) {
             return now() +
                 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
        }
        // 避免移除,返回延迟的值
        private long overflowFree(long delay) {
            Delayed head = (Delayed) super.getQueue().peek();
            if (head != null) {
                long headDelay = head.getDelay(TimeUnit.NANOSECONDS);
                if (headDelay < 0 && (delay - headDelay < 0))
                    delay = Long.MAX_VALUE + headDelay;
            }
            return delay;
        }
     
        public ScheduledThreadPoolExecutor(int corePoolSize,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
                  new DelayedWorkQueue(), threadFactory, handler);
        }
        // 根据执行的延时时间执行任务
        public ScheduledFuture<?> schedule(Runnable command,
                                           long delay,
                                           TimeUnit unit) {
            if (command == null || unit == null)
                throw new NullPointerException();
            // ScheduledFutureTask的result为null
            RunnableScheduledFuture<?> t = decorateTask(command,
                new ScheduledFutureTask<Void>(command, null,
                                              triggerTime(delay, unit)));
            // 延时执行
            delayedExecute(t);
            return t;
        }
        // 上一个方法的重载形式,接收的是Callable
        public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                               long delay,
                                               TimeUnit unit) {
            if (callable == null || unit == null)
                throw new NullPointerException();
            RunnableScheduledFuture<V> t = decorateTask(callable,
                new ScheduledFutureTask<V>(callable,
                              triggerTime(delay, unit)));
            delayedExecute(t);
            return t;
        }
        /**
         * 创建并执行一个周期性的任务,在initialDelay延迟后每间隔period个单位执行一次,时间单位都是unit
         * 每次执行任务的时间点是initialDelay, initialDelay+period, initialDelay + 2 * period...
         */
        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                      long initialDelay,
                                                      long period,
                                                      TimeUnit unit) {
            if (command == null || unit == null)
                throw new NullPointerException();
            if (period <= 0)
                throw new IllegalArgumentException();
            RunnableScheduledFuture<?> t = decorateTask(command,
                new ScheduledFutureTask<Object>(command,
                                                null,
                                                triggerTime(initialDelay, unit),
                                                unit.toNanos(period)));
            delayedExecute(t);
            return t;
        }
        /**
         * 创建并执行一个周期性的任务,在initialDelay延迟后开始执行,在执行结束后再延迟delay个单位开始执行下一次任务,时间单位都是unit
         * 每次执行任务的时间点是initialDelay, initialDelay+(任务运行时间+delay), initialDelay + 2 * (任务运行时间+delay)...
         */
        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                         long initialDelay,
                                                         long delay,
                                                         TimeUnit unit) {
            if (command == null || unit == null)
                throw new NullPointerException();
            if (delay <= 0)
                throw new IllegalArgumentException();
            RunnableScheduledFuture<?> t = decorateTask(command,
                new ScheduledFutureTask<Boolean>(command,
                                                 null,
                                                 triggerTime(initialDelay, unit),
                                                 unit.toNanos(-delay)));
            delayedExecute(t);
            return t;
        }
    
    
        // 执行任务
        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            // 立即执行,延时时间是0
            schedule(command, 0, TimeUnit.NANOSECONDS);
        }
    
        // 重新 AbstractExecutorService 的方法
    
        public Future<?> submit(Runnable task) {
            return schedule(task, 0, TimeUnit.NANOSECONDS);
        }
    
        public <T> Future<T> submit(Runnable task, T result) {
            return schedule(Executors.callable(task, result),
                            0, TimeUnit.NANOSECONDS);
        }
    
        public <T> Future<T> submit(Callable<T> task) {
            return schedule(task, 0, TimeUnit.NANOSECONDS);
        }
    
        
        public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
            continueExistingPeriodicTasksAfterShutdown = value;
            if (!value && isShutdown())
                cancelUnwantedTasks();
        }
    
        
        public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
            return continueExistingPeriodicTasksAfterShutdown;
        }
    
       
        public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
            executeExistingDelayedTasksAfterShutdown = value;
            if (!value && isShutdown())
                cancelUnwantedTasks();
        }
    
        
        public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
            return executeExistingDelayedTasksAfterShutdown;
        }
    
    
        // 关闭
        public void shutdown() {
            // 取消任务
            cancelUnwantedTasks();
            super.shutdown();
        }
    
        // 立即关闭,调用的是父类立即关闭的方法
        public List<Runnable> shutdownNow() {
            return super.shutdownNow();
        }
    
        // 返回使用这个执行器的任务队列
        public BlockingQueue<Runnable> getQueue() {
            return super.getQueue();
        }
    
        // 将DelayQueue<RunnableScheduledFuture> 包装为 BlockingQueue<Runnable>的类
        // 类似于代理
        private static class DelayedWorkQueue
            extends AbstractCollection<Runnable>
            implements BlockingQueue<Runnable> {
    
            private final DelayQueue<RunnableScheduledFuture> dq = new DelayQueue<RunnableScheduledFuture>();
            public Runnable poll() { return dq.poll(); }
            public Runnable peek() { return dq.peek(); }
            public Runnable take() throws InterruptedException { return dq.take(); }
            public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
                return dq.poll(timeout, unit);
            }
    
            public boolean add(Runnable x) {
            return dq.add((RunnableScheduledFuture)x);
        }
            public boolean offer(Runnable x) {
            return dq.offer((RunnableScheduledFuture)x);
        }
            public void put(Runnable x) {
                dq.put((RunnableScheduledFuture)x);
            }
            public boolean offer(Runnable x, long timeout, TimeUnit unit) {
                return dq.offer((RunnableScheduledFuture)x, timeout, unit);
            }
    
            public Runnable remove() { return dq.remove(); }
            public Runnable element() { return dq.element(); }
            public void clear() { dq.clear(); }
            public int drainTo(Collection<? super Runnable> c) { return dq.drainTo(c); }
            public int drainTo(Collection<? super Runnable> c, int maxElements) {
                return dq.drainTo(c, maxElements);
            }
    
            public int remainingCapacity() { return dq.remainingCapacity(); }
            public boolean remove(Object x) { return dq.remove(x); }
            public boolean contains(Object x) { return dq.contains(x); }
            public int size() { return dq.size(); }
            public boolean isEmpty() { return dq.isEmpty(); }
            public Object[] toArray() { return dq.toArray(); }
            public <T> T[] toArray(T[] array) { return dq.toArray(array); }
            public Iterator<Runnable> iterator() {
                return new Iterator<Runnable>() {
                    private Iterator<RunnableScheduledFuture> it = dq.iterator();
                    public boolean hasNext() { return it.hasNext(); }
                    public Runnable next() { return it.next(); }
                    public void remove() { it.remove(); }
                };
            }
        }
    }

        在代码中都加了注释,我想大致能解释清楚吧。

        Executor涉及的类还是比较多的,到此为止剩下的还有Executors

        Executors

        Executors中所定义的 ExecutorExecutorServiceScheduledExecutorServiceThreadFactoryCallable 类的工厂和实用方法。此类支持以下各种方法:

    • 创建并返回设置有常用配置字符串的 ExecutorService 的方法。
    • 创建并返回设置有常用配置字符串的 ScheduledExecutorService 的方法。
    • 创建并返回“包装的”ExecutorService 方法,它通过使特定于实现的方法不可访问来禁用重新配置。
    • 创建并返回 ThreadFactory 的方法,它可将新创建的线程设置为已知的状态。
    • 创建并返回非闭包形式的 Callable 的方法,这样可将其用于需要 Callable 的执行方法中。 

         Executors提供的都是工具形式的方法,所以都是static的,并且这个类也没有必要实例化,所以它的构造方法时private的。下面主要看一下几个内部类。

       RunnableAdapter

     1 static final class RunnableAdapter<T> implements Callable<T> {
     2         final Runnable task;
     3         final T result;
     4         RunnableAdapter(Runnable  task, T result) {
     5             this.task = task;
     6             this.result = result;
     7         }
     8         public T call() {
     9             task.run();
    10             return result;
    11         }
    12 }

        适配器。以Callable的形式执行Runnable并且返回给定的result。

        PrivilegedCallable

     1 static final class PrivilegedCallable<T> implements Callable<T> {
     2     private final AccessControlContext acc;
     3     private final Callable<T> task;
     4     private T result;
     5     private Exception exception;
     6     PrivilegedCallable(Callable<T> task) {
     7         this.task = task;
     8         this.acc = AccessController.getContext();
     9     }
    10 
    11     public T call() throws Exception {
    12         AccessController.doPrivileged(new PrivilegedAction<T>() {
    13             public T run() {
    14                 try {
    15                     result = task.call();
    16                 } catch (Exception ex) {
    17                     exception = ex;
    18                 }
    19                 return null;
    20             }
    21         }, acc);
    22         if (exception != null)
    23             throw exception;
    24         else
    25             return result;
    26     }
    27 }

        在访问控制下运行的Callable。涉及到Java.security包中的内容。

        PrivilegedCallableUsingCurrentClassLoader类与上面的PrivilegedCallable类似,只是使用的是CurrentClassLoader。

        DefaultThreadFactory

     1     static class DefaultThreadFactory implements ThreadFactory {
     2         static final AtomicInteger poolNumber = new AtomicInteger(1);
     3         final ThreadGroup group;
     4         final AtomicInteger threadNumber = new AtomicInteger(1);
     5         final String namePrefix;
     6 
     7         DefaultThreadFactory() {
     8             SecurityManager s = System.getSecurityManager();
     9             group = (s != null)? s.getThreadGroup() :
    10                                  Thread.currentThread().getThreadGroup();
    11             namePrefix = "pool-" +
    12                           poolNumber.getAndIncrement() +
    13                          "-thread-";
    14         }
    15 
    16         public Thread newThread(Runnable r) {
    17             // 调用Thread构造方法创建线程
    18             Thread t = new Thread(group, r,
    19                                   namePrefix + threadNumber.getAndIncrement(),
    20                                   0);
    21             // 取消守护线程设置
    22             if (t.isDaemon())
    23                 t.setDaemon(false);
    24             // 设置默认优先级
    25             if (t.getPriority() != Thread.NORM_PRIORITY)
    26                 t.setPriority(Thread.NORM_PRIORITY);
    27             return t;
    28         }
    29     }

        DefaultThreadFactory 是默认的线程工程,提供创建线程的方法。

        PrivilegedThreadFactory继承自DefaultThreadFactory,区别在于线程执行的run方法指定了classLoader并受到权限的控制。

        DelegatedExecutorService继承自AbstractExecutorService,是一个包装类,暴露ExecutorService的方法。

        DelegatedScheduledExecutorService继承自DelegatedExecutorService,实现了ScheduledExecutorService接口。它也是一个包装类,公开ScheduledExecutorService方法。 

    如果本文对您有帮助,点一下右下角的“推荐”
  • 相关阅读:
    参数传递(值传递与引用传递)
    存入redis中的java对象都需要序列化
    windows环境Apache服务器启动失败的原因
    IDEA/Eclipse安装 Alibaba Java Coding Guidelines 插件
    使用freemarker对模板进行渲染
    java使用freemarker作为模板导出Excel表格
    mybatis 中 foreach collection的三种用法
    利用freemarker导出页面格式复杂的excel
    mysql limit和offset用法
    设计模式之二十一:中介者模式(Mediator)
  • 原文地址:https://www.cnblogs.com/hzmark/p/JavaExecutor.html
Copyright © 2011-2022 走看看