zoukankan      html  css  js  c++  java
  • JUC 线程池的使用与源码解析

    线程池创建与使用

    线程池的创建

    Executors 框架提供了各种类型的线程池,主要有以下工厂方法∶

    • public static ExecutorService newFixedThreadPool(int nThreads)
    • public static ExecutorService newCachedThreadPool()
    • public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
    • public static ExecutorService newSingleThreadExecutor()
    • public static ScheduledExecutorService newSingleThreadScheduledExecutor()

    corePoolSize > 1 的方法名称以 Pool 结尾,等于 1 的以 Executor 结尾。

    Executors 的真正实现类主要包括两个 ThreadPoolExecutor 和 ScheduledThreadPoolExecutor。

    newFixedThreadPool

    线程数达到核心线程数之后不会再新建线程(一方面是队列是无边界的,另一方面是 corePoolSize = maximumPoolSize)。

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads, // corePoolSize = maximumPoolSize
                                      0L, TimeUnit.MILLISECONDS,
                                      // 无界队列,所以即使设置了最大线程数,线程池中的线程数量也不会达到最大线程数
                                      new LinkedBlockingQueue<Runnable>());
    }
    
    newSingleThreadExecutor

    类似于 newFixedThreadPool,只是将 corePoolSize 和 maximumPoolSize 都设置为 1。

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1, // corePoolSize = maximumPoolSize = 1
                                    0L, TimeUnit.MILLISECONDS,
                                    // 无界队列
                                    new LinkedBlockingQueue<Runnable>()));
    }
    
    newCachedThreadPool
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
    newScheduledThreadPool

    可以指定核心线程数,用于创建一个用于执行周期性或者定时任务的线程池。

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              // 延迟队列
              new DelayedWorkQueue());
    }
    

    ScheduledExecutorService 的核心方法如下:

    // 延迟 delay 时间后,执行 Runnable 任务
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
    
    // 延迟 delay 时间后,执行 Callable 任务
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
    
    // 定时任务
    // 以上一个任务开始时间开始计时,period 时间后,如果上一个任务已完成,则立即执行,否则等待上一个任务完成后再执行
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
    
    // 延迟任务
    // 以上一个任务的结束时间开始计时,delay 时间后,立即执行
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
    

    delay 或者 period 时间后,表示任务可以立即执行,但是也要先获取到线程才会立马执行,否则会先阻塞等待获取线程,和一般的线程池逻辑类似。

    线程池的使用

    等待所有任务线程执行完成

    引用:

    ExecutorService等待线程完成后优雅结束

    How to wait for all threads to finish, using ExecutorService?

    方法一:shutdown() / shutdownNow() + awaitTermination()
    ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
    
    while(...) {
        taskExecutor.execute(new MyTask());
    }
    
    // 线程池暂停接收新的任务
    taskExecutor.shutdown();
    
    try {
        // 等待所有任务完成
        taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    } catch (InterruptedException e) {
        // ...
    }
    
    方法二:invokeAll + shutdown() / shutdownNow() + awaitTermination()

    我们可以用来运行线程的第一种方法是 invokeAll() 方法,在所有任务完成或超时到期后,该方法返回 Future 对象列表。

    此外,我们必须注意返回的 Future 对象的顺序与提供的 Callable 对象的列表相同:

    ExecutorService taskExecutor = Executors.newFixedThreadPool(10);
    
    // your tasks
    List<Callable<String>> callables = Arrays.asList(new DelayedCallable("fast thread", 100), new DelayedCallable("slow thread", 3000));
    
    // invokeAll() returns when all tasks are complete
    List<Future<String>> futures = taskExecutor.invokeAll(callables);
    
    taskExecutor.shutdown();
    
    try {
        // 等待所有任务完成
        taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    } catch (InterruptedException e) {
        // ...
    }
    
    方法三:使用 CountDownLatch
    ExecutorService taskExecutor = Executors.newFixedThreadPool(10);
    CountDownLatch latch = new CountDownLatch(2);
    
    for(int i = 0; i < 2; i++){
        taskExecutor.submit(() -> {
            try {
                // 业务逻辑...
                latch.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
    
    // wait for the latch to be decremented by the two remaining threads
    latch.await();
    
    方法四:使用 ExecutorCompletionService

    运行多个线程的另一种方法是使用 ExecutorCompletionService,它使用提供的 ExecutorService 来执行任务。

    与 invokeAll 的一个区别是返回表示执行任务的 Futures 的顺序。ExecutorCompletionService 使用队列按结束顺序存储结果,而 invokeAll 返回一个列表,该列表具有与给定任务列表的迭代器生成的顺序相同的顺序:

    CompletionService<String> service = new ExecutorCompletionService<>(WORKER_THREAD_POOL);
    
    List<Callable<String>> callables = Arrays.asList(new DelayedCallable("fast thread", 100), new DelayedCallable("slow thread", 3000));
    
    for (Callable<String> callable : callables) {
          service.submit(callable);
    }
    
    方法五:使用 Java8 的 CompletableFuture
    ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
    List<Runnable> tasks = getTasks();
    CompletableFuture<?>[] futures = tasks.stream()
                                   .map(task -> CompletableFuture.runAsync(task, taskExecutor))
                                   .toArray(CompletableFuture[]::new);
    // 等待所有任务执行完成
    CompletableFuture.allOf(futures).join();    
    taskExecutor.shutdown();
    

    线程池的配置及优化

    配置主要是 ThreadPoolExecutor 构造方法的参数配置。

    coreThreadPoolSize

    每个线程都需要一定的栈内存空间。在最近的64位JVM中, 默认的栈大小 是1024KB。如果服务器收到大量请求,或者handleRequest方法执行很慢,服务器可能因为创建了大量线程而崩溃。例如有1000个并行的请求,创建出来的1000个线程需要使用1GB的JVM内存作为线程栈空间。另外,每个线程代码执行过程中创建的对象,还可能会在堆上创建对象。这样的情况恶化下去,将会超出JVM堆内存,并产生大量的垃圾回收操作,最终引发 内存溢出(OutOfMemoryErrors) 。

    这些线程不仅仅会消耗内存,它们还会使用其他有限的资源,例如文件句柄、数据库连接等。不可控的创建线程,还可能引发其他类型的错误和崩溃。因此,避免资源耗尽的一个重要方式,就是避免不可控的数据结构。

    顺便说下,由于线程栈大小引发的内存问题,可以通过-Xss开关来调整栈大小。缩小线程栈大小之后,可以减少每个线程的开销,但是可能会引发 栈溢出(StackOverflowErrors) 。对于一般应用程序而言,默认的1024KB过于富裕,调小为256KB或者512KB可能更为合适。Java允许的最小值是160KB。

    CPU密集型任务应配置尽可能小的线程,如配置 cpu 核数 + 1 个线程的线程池,减少线程的切换。

    IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如 cpu 核数 * 2

    对于 IO 型的任务的最佳线程数,有个公式可以计算

    $$Nthreads = NCPU * UCPU * (1 + W/C)$$

    其中:

    ​ * NCPU 是处理器的核的数目

    ​ * UCPU 是期望的 CPU 利用率(该值应该介于 0 和 1 之间)

    ​ * W / C 是等待时间与计算时间的比率

    由于线程数的选定依赖于应用程序的类型,可能需要经过大量性能测试之后,才能得出最优的结果。

    通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用:

    • getTaskCount:线程池已经执行的和未执行的任务总数(所有线程的 completedTaskCount 数量加上阻塞队列中的元素个数);
    • getCompletedTaskCount:线程池已完成的任务数量(所有线程的 completedTaskCount 数量),该值小于等于 taskCount;
    • getLargestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了 maximumPoolSize;
    • getPoolSize:线程池当前的线程数量(workers 里元素的个数);
    • getActiveCount:当前线程池中正在执行任务(独占锁被占用)的线程数量。

    通过这些方法,可以对线程池进行监控,在 ThreadPoolExecutor 类中提供了几个空方法,如 beforeExecute 方法,afterExecute 方法和 terminated 方法,可以扩展这些方法在执行前或执行后增加一些新的操作,例如统计线程池的执行任务的时间等,可以继承自 ThreadPoolExecutor 来进行扩展。

    workQueue
    maximumPoolSize
    threadFactory

    默认使用 DefaultThreadFactory,可以自定义。

    RejectedExecutionHandler

    除了默认的 4 种拒绝策略外,还可以自定义拒绝策略。

    AbstractExecutorService 源码

    AbstractExecutorService 抽象类实现类 ExecutorService 接口。

    FutureTask 源码参考:Future 源码解析

    public interface Executor {
    
        /**
         * Executes the given command at some time in the future.  The command
         * may execute in a new thread, in a pooled thread, or in the calling
         * thread, at the discretion of the {@code Executor} implementation.
         *
         * @param command the runnable task
         * @throws RejectedExecutionException if this task cannot be
         * accepted for execution
         * @throws NullPointerException if command is null
         */
        void execute(Runnable command);
    }
    
    
    public abstract class AbstractExecutorService implements ExecutorService {
    
        // 使用线程对象 runnable 和 保存 runnable 执行结果的变量 value 来构造一个 FutureTask 对象
        // newTaskFor 方法使用了适配器模式,可以将 Runnable + value 或者 callable 对象构造成一个 FutureTask 对象
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }
    
        // 使用 callable 对象构造一个 FutureTask 对象
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new FutureTask<T>(callable);
        }
    
        // 提交任务到线程池,返回一个 FutureTask 对象
        public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            // 将任务添加到线程池中执行
            execute(ftask);
            return ftask;
        }
    
        // 同上
        public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task, result);
            execute(ftask);
            return ftask;
        }
    
        // 同上
        public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }
    
        // 批量执行多个任务,但是只要一个任务完成就返回,同时中断其它任务
        private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                                  boolean timed, long nanos)
            throws InterruptedException, ExecutionException, TimeoutException {
            if (tasks == null)
                throw new NullPointerException();
            // 当前待执行的任务数量
            int ntasks = tasks.size();
            if (ntasks == 0)
                throw new IllegalArgumentException();
            ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
            // 创建一个 ExecutorCompletionService 对象,当前线程池对象作为 ExecutorCompletionService 的 executor
            // ExecutorCompletionService 可以维护一批 Future 任务,然后按照任务完成的先后顺序,添加到一个先进先出阻塞队列中
            // 然后通过 take 或者 poll 方法获取到的一个已经执行完成的 Future 任务,可以直接调用 future 任务的 get 方法获取执行结果
            ExecutorCompletionService<T> ecs =
                new ExecutorCompletionService<T>(this);
    
            // For efficiency, especially in executors with limited
            // parallelism, check to see if previously submitted tasks are
            // done before submitting more of them. This interleaving
            // plus the exception mechanics account for messiness of main
            // loop.
    
            try {
                // Record exceptions so that if we fail to obtain any
                // result, we can throw the last exception we got.
                ExecutionException ee = null;
                final long deadline = timed ? System.nanoTime() + nanos : 0L;
                Iterator<? extends Callable<T>> it = tasks.iterator();
    
                // Start one task for sure; the rest incrementally
                futures.add(ecs.submit(it.next()));
                --ntasks;
                // 当前正在执行的任务数量
                int active = 1;
    
                for (;;) {
                    Future<T> f = ecs.poll();
                    // 从 ecs 的队列里没有获取到任务
                    if (f == null) {
                        // 未执行的任务数量大于 0
                        if (ntasks > 0) {
                            --ntasks;
                            // 继续获取下一个未执行的任务,将其添加到线程池中去执行
                            futures.add(ecs.submit(it.next()));
                            ++active;
                        }
                        else if (active == 0)
                            // 任务都已经执行完成了
                            break;
                        else if (timed) {
                            // 从 ecs 的阻塞队列中获取一个已完成的 Future 对象,可超时
                            f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                            if (f == null)
                                throw new TimeoutException();
                            nanos = deadline - System.nanoTime();
                        }
                        else
                            // 阻塞低从 ecs 的阻塞队列中获取一个已完成的 Future 对象
                            f = ecs.take();
                    }
                    
                    // 从阻塞队列中获取到了一个完成的 Future 任务(直接返回执行结果,并退出方法)
                    if (f != null) {
                        --active;
                        try {
                            // 返回获取到的一个任务的执行结果
                            return f.get();
                        } catch (ExecutionException eex) {
                            ee = eex;
                        } catch (RuntimeException rex) {
                            ee = new ExecutionException(rex);
                        }
                    }
                }
    
                if (ee == null)
                    ee = new ExecutionException();
                throw ee;
    
            } finally {
                // 从阻塞队列中获取到了一个完成的 Future 任务并返回任务的执行结果,然后将所有的任务都中断掉
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
            }
        }
    
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
            try {
                return doInvokeAny(tasks, false, 0);
            } catch (TimeoutException cannotHappen) {
                assert false;
                return null;
            }
        }
    
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                               long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            return doInvokeAny(tasks, true, unit.toNanos(timeout));
        }
    
        // 批量执行多个任务
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
            if (tasks == null)
                throw new NullPointerException();
            ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
            boolean done = false;
            try {
                // 将多个任务依次添加到线程池中去执行
                for (Callable<T> t : tasks) {
                    RunnableFuture<T> f = newTaskFor(t);
                    // 将任务结果添加到 futures 中
                    futures.add(f);
                    execute(f);
                }
                // 遍历 futures 集合
                for (int i = 0, size = futures.size(); i < size; i++) {
                    Future<T> f = futures.get(i);
                    // 如果任务未完成
                    if (!f.isDone()) {
                        try {
                            // 阻塞获取执行结果
                            f.get();
                        } catch (CancellationException ignore) {
                        } catch (ExecutionException ignore) {
                        }
                    }
                }
                // 执行到这里,说明所有任务都已完成了
                done = true;
                // 返回执行
                return futures;
            } finally {
                if (!done)
                    for (int i = 0, size = futures.size(); i < size; i++)
                        futures.get(i).cancel(true);
            }
        }
    
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                             long timeout, TimeUnit unit)
            throws InterruptedException {
            if (tasks == null)
                throw new NullPointerException();
            long nanos = unit.toNanos(timeout);
            ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
            boolean done = false;
            try {
                for (Callable<T> t : tasks)
                    futures.add(newTaskFor(t));
    
                final long deadline = System.nanoTime() + nanos;
                final int size = futures.size();
    
                // Interleave time checks and calls to execute in case
                // executor doesn't have any/much parallelism.
                for (int i = 0; i < size; i++) {
                    execute((Runnable)futures.get(i));
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L)
                        return futures;
                }
    
                for (int i = 0; i < size; i++) {
                    Future<T> f = futures.get(i);
                    if (!f.isDone()) {
                        if (nanos <= 0L)
                            return futures;
                        try {
                            f.get(nanos, TimeUnit.NANOSECONDS);
                        } catch (CancellationException ignore) {
                        } catch (ExecutionException ignore) {
                        } catch (TimeoutException toe) {
                            return futures;
                        }
                        nanos = deadline - System.nanoTime();
                    }
                }
                done = true;
                return futures;
            } finally {
                if (!done)
                    for (int i = 0, size = futures.size(); i < size; i++)
                        futures.get(i).cancel(true);
            }
        }
    
    }
    

    ThreadPoolExecutor 源码

    对于核心的几个线程池,无论是 newFixedThreadPool() 方法、newCachedThreadPol()、还是 newSingleThreadExecutor() 方法,虽然看起来创建的线程有着完全不同的功能特点,但其内部实现均使用了 ThreadPoolExecutor 实现。

    ThreadPoolExecutor 属性
    public class ThreadPoolExecutor extends AbstractExecutorService {
    
        // ctl 字段存储线程池的当前状态和线程数
        // 高 3 位存放线程池的运行状态 (runState) 
        // 低 29 位存放线程池内有效线程(活跃)的数量 (workerCount)
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    
        // 在 Java 中,一个 int 占据 32 位,所以 COUNT_BITS 的结果是 32 - 3 = 29
        private static final int COUNT_BITS = Integer.SIZE - 3;
    
        // CAPACITY 就代表了 workerCount 的上限,它是 ThreadPoolExecutor 中理论上的最大活跃线程数
        // 运算过程为 1 左移 29 位,也就是 00000000 00000000 00000000 00000001 --> 001 0000 00000000 00000000 00000000,再减去1的话,就是 000 11111 11111111 11111111 11111111,前三位代表线程池运行状态 runState,所以这里 workerCount 的理论最大值就应该是 29 个 1,即 536870911
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
        // runState is stored in the high-order bits
    
        // RUNNING:接受新任务,并处理队列任务
        // -1 在 Java 底层是由 32 个 1 表示的,左移 29 位的话,即 111 00000 00000000 00000000 00000000,也就是低 29 位全部为 0,高 3 位全部为 1 的话,表示 RUNNING 状态,即 -536870912
        private static final int RUNNING    = -1 << COUNT_BITS;
    
        // SHUTDOWN:不接受新任务,但会处理队列任务
        // 在线程池处于 RUNNING 状态时,调用 shutdown() 方法会使线程池进入到该状态(finalize() 方法在执行过程中也会调用 shutdown() 方法进入该状态)
        // 0 在 Java 底层是由 32 个 0 表示的,无论左移多少位,还是 32 个 0,即 000 00000 00000000 00000000 00000000,也就是低 29 位全部为 0,高 3 位全部为 0 的话,表示 SHUTDOWN 状态,即 0;
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
    
        // STOP:不接受新任务,不会处理队列任务,而且会中断正在处理过程中的任务
        // 在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态
        // 1 在 Java 底层是由前面的 31 个 0 和 1 个 1 组成的,左移 29 位的话,即 001 00000 00000000 00000000 00000000,也就是低 29 位全部为 0,高 3 位为 001 的话,表示 STOP 状态,即 536870912;
        private static final int STOP       =  1 << COUNT_BITS;
    
        // TIDYING:所有的任务已结束,workerCount 为 0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态
        // 在 Java 底层是由前面的 30 个 0 和 1 个 11 组成的,左移 29 位的话,即 011 00000 00000000 00000000 00000000,也就是低 29 位全部为 0,高 3 位为 011 的话,表示 TERMINATED 状态,即 1610612736;
        private static final int TIDYING    =  2 << COUNT_BITS;
    
        // TERMINATED:在 terminated() 方法执行完后进入该状态,默认 terminated() 方法中什么也没有做
        private static final int TERMINATED =  3 << COUNT_BITS;
    
        // Packing and unpacking ctl
    
        // 传入的 c 代表的是 ctl 的值,即高 3 位为线程池运行状态 runState,低 29 位为线程池中当前活动的线程数量 workerCount
    
        // runState:线程池运行状态,占据 ctl 的高 3 位,有 RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED 五种状态
        // ~ 是按位取反的意思,CAPACITY 表示的是高位的 3 个 0,和低位的 29 个 1,而 ~CAPACITY 则表示高位的 3 个 1,低位的 29 个 0,然后再与入参 c 执行按位与操作,即高 3 位保持原样,低 29 位全部设置为 0,也就获取了线程池的运行状态 runState。
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
    
        // 取得当前线程池内有效线程的数量
        // workerCount:线程池中当前活动的线程数量,占据 ctl 的低 29 位
        // 将 c 与 CAPACITY 进行与操作 &,也就是与 000 11111 11111111 11111111 11111111 进行与操作,c 的前三位通过与 000 进行与操作,无论 c 前三位为何值,最终都会变成 000,也就是舍弃前三位的值,而 c 的低 29 位与 29 个 1 进行与操作,c 的低 29 位还是会保持原值,这样就从 AtomicInteger ctl 中解析出了 workerCount 的值
        private static int workerCountOf(int c)  { return c & CAPACITY; }
    
        // 原子变量 ctl 的初始化方法
        // 传入的 rs 表示线程池运行状态 runState,其是高 3 位有值,低 29 位全部为 0 的 int,而 wc 则代表线程池中有效线程的数量 workerCount,其为高 3 位全部为 0,而低 29 位有值得 int,将 runState 和 workerCount 做或操作 | 处理,即用 runState 的高 3 位,workerCount 的低 29 位填充的数字,而默认传入的 runState、workerCount 分别为 RUNNING 和 0。
        private static int ctlOf(int rs, int wc) { return rs | wc; }
    
        /*
         * Bit field accessors that don't require unpacking ctl.
         * These depend on the bit layout and on workerCount being never negative.
         */
    
        private static boolean runStateLessThan(int c, int s) {
            return c < s;
        }
    
        private static boolean runStateAtLeast(int c, int s) {
            return c >= s;
        }
    
        private static boolean isRunning(int c) {
            return c < SHUTDOWN;
        }
    
        /**
         * Attempts to CAS-increment the workerCount field of ctl.
         */
        private boolean compareAndIncrementWorkerCount(int expect) {
            return ctl.compareAndSet(expect, expect + 1);
        }
    
        /**
         * Attempts to CAS-decrement the workerCount field of ctl.
         */
        private boolean compareAndDecrementWorkerCount(int expect) {
            return ctl.compareAndSet(expect, expect - 1);
        }
    
        /**
         * Decrements the workerCount field of ctl. This is called only on
         * abrupt termination of a thread (see processWorkerExit). Other
         * decrements are performed within getTask.
         */
        private void decrementWorkerCount() {
            do {} while (! compareAndDecrementWorkerCount(ctl.get()));
        }
    
        /**
         * workQueue 是用于持有任务并将其转换成工作线程 worker 的队列
         */
        private final BlockingQueue<Runnable> workQueue;
    
        private final ReentrantLock mainLock = new ReentrantLock();
    
        /**
         * workers 是包含线程池中所有工作线程 worker 的集合
         * 仅仅当获得 mainLock 锁时才能访问它
         */
        private final HashSet<Worker> workers = new HashSet<Worker>();
    
        /**
         * Wait condition to support awaitTermination
         */
        private final Condition termination = mainLock.newCondition();
    
        // 记录 workers 集合最大的元素个数
        private int largestPoolSize;
    
        /**
         * 线程池已完成的任务的数量
         */
        private long completedTaskCount;
    
        /**
         * 创建新线程的工厂类
         */
        private volatile ThreadFactory threadFactory;
    
        /**
         * 执行拒绝策略的处理器
         */
        private volatile RejectedExecutionHandler handler;
    
        /**
         * 空闲线程等待工作的超时时间(纳秒),即空闲线程存活时间 
         */
        private volatile long keepAliveTime;
    
        /**
         * 是否允许核心线程超时
         * 默认值为 false,如果为 false,core 线程在空闲时依然存活;如果为 true,则 core 线程等待工作,直到时间超时至 keepAliveTime
         */
        private volatile boolean allowCoreThreadTimeOut;
    
        /**
         * 核心线程池大小,保持存活的工作线程的最小数目,当小于 corePoolSize 时,会直接启动新的一个线程来处理任务,而不管线程池中是否有空闲线程
         */
        private volatile int corePoolSize;
    
        /**
         * 线程池中线程的最大数量
         */
        private volatile int maximumPoolSize;
    
        /**
         * The default rejected execution handler
         */
        private static final RejectedExecutionHandler defaultHandler =
            new AbortPolicy();
    
        private static final RuntimePermission shutdownPerm =
            new RuntimePermission("modifyThread");
    
        /* The context to be used when executing the finalizer, or null. */
        private final AccessControlContext acc;
    
        // Public constructors and methods
        // 以下是构造方法
    
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), defaultHandler);
        }
    
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 threadFactory, defaultHandler);
        }
    
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  RejectedExecutionHandler handler) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), handler);
        }
    
        /**
         * Creates a new {@code ThreadPoolExecutor} with the given initial
         * parameters.
         *
         * @param corePoolSize the number of threads to keep in the pool, even
         *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
         * @param maximumPoolSize the maximum number of threads to allow in the
         *        pool
         * @param keepAliveTime when the number of threads is greater than
         *        the core, this is the maximum time that excess idle threads
         *        will wait for new tasks before terminating.
         * @param unit the time unit for the {@code keepAliveTime} argument
         * @param workQueue the queue to use for holding tasks before they are
         *        executed.  This queue will hold only the {@code Runnable}
         *        tasks submitted by the {@code execute} method.
         * @param threadFactory the factory to use when the executor
         *        creates a new thread
         * @param handler the handler to use when execution is blocked
         *        because the thread bounds and queue capacities are reached
         * @throws IllegalArgumentException if one of the following holds:<br>
         *         {@code corePoolSize < 0}<br>
         *         {@code keepAliveTime < 0}<br>
         *         {@code maximumPoolSize <= 0}<br>
         *         {@code maximumPoolSize < corePoolSize}
         * @throws NullPointerException if {@code workQueue}
         *         or {@code threadFactory} or {@code handler} is null
         */
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.acc = System.getSecurityManager() == null ?
                null :
            AccessController.getContext();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    
    
        private void advanceRunState(int targetState) {
            for (;;) {
                int c = ctl.get();
                if (runStateAtLeast(c, targetState) ||
                    ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                    break;
            }
        }
    
        /**
         * Invokes {@code shutdown} when this executor is no longer
         * referenced and it has no threads.
         */
        protected void finalize() {
            SecurityManager sm = System.getSecurityManager();
            if (sm == null || acc == null) {
                shutdown();
            } else {
                PrivilegedAction<Void> pa = () -> { shutdown(); return null; };
                AccessController.doPrivileged(pa, acc);
            }
        }
    
        /**
         * Sets the thread factory used to create new threads.
         *
         * @param threadFactory the new thread factory
         * @throws NullPointerException if threadFactory is null
         * @see #getThreadFactory
         */
        public void setThreadFactory(ThreadFactory threadFactory) {
            if (threadFactory == null)
                throw new NullPointerException();
            this.threadFactory = threadFactory;
        }
    
        /**
         * Returns the thread factory used to create new threads.
         *
         * @return the current thread factory
         * @see #setThreadFactory(ThreadFactory)
         */
        public ThreadFactory getThreadFactory() {
            return threadFactory;
        }
    
        /**
         * Sets a new handler for unexecutable tasks.
         *
         * @param handler the new handler
         * @throws NullPointerException if handler is null
         * @see #getRejectedExecutionHandler
         */
        public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
            if (handler == null)
                throw new NullPointerException();
            this.handler = handler;
        }
    
        /**
         * Returns the current handler for unexecutable tasks.
         *
         * @return the current handler
         * @see #setRejectedExecutionHandler(RejectedExecutionHandler)
         */
        public RejectedExecutionHandler getRejectedExecutionHandler() {
            return handler;
        }
    
        /**
         * Sets the core number of threads.  This overrides any value set
         * in the constructor.  If the new value is smaller than the
         * current value, excess existing threads will be terminated when
         * they next become idle.  If larger, new threads will, if needed,
         * be started to execute any queued tasks.
         *
         * @param corePoolSize the new core size
         * @throws IllegalArgumentException if {@code corePoolSize < 0}
         * @see #getCorePoolSize
         */
        public void setCorePoolSize(int corePoolSize) {
            if (corePoolSize < 0)
                throw new IllegalArgumentException();
            int delta = corePoolSize - this.corePoolSize;
            this.corePoolSize = corePoolSize;
            if (workerCountOf(ctl.get()) > corePoolSize)
                interruptIdleWorkers();
            else if (delta > 0) {
                // We don't really know how many new threads are "needed".
                // As a heuristic, prestart enough new workers (up to new
                // core size) to handle the current number of tasks in
                // queue, but stop if queue becomes empty while doing so.
                int k = Math.min(delta, workQueue.size());
                while (k-- > 0 && addWorker(null, true)) {
                    if (workQueue.isEmpty())
                        break;
                }
            }
        }
    
        /**
         * Returns the core number of threads.
         *
         * @return the core number of threads
         * @see #setCorePoolSize
         */
        public int getCorePoolSize() {
            return corePoolSize;
        }
    
        /**
         * Starts a core thread, causing it to idly wait for work. This
         * overrides the default policy of starting core threads only when
         * new tasks are executed. This method will return {@code false}
         * if all core threads have already been started.
         *
         * @return {@code true} if a thread was started
         */
        public boolean prestartCoreThread() {
            return workerCountOf(ctl.get()) < corePoolSize &&
                addWorker(null, true);
        }
    
        /**
         * Same as prestartCoreThread except arranges that at least one
         * thread is started even if corePoolSize is 0.
         */
        void ensurePrestart() {
            int wc = workerCountOf(ctl.get());
            if (wc < corePoolSize)
                addWorker(null, true);
            else if (wc == 0)
                addWorker(null, false);
        }
    
        /**
         * Starts all core threads, causing them to idly wait for work. This
         * overrides the default policy of starting core threads only when
         * new tasks are executed.
         *
         * @return the number of threads started
         */
        public int prestartAllCoreThreads() {
            int n = 0;
            while (addWorker(null, true))
                ++n;
            return n;
        }
    
        /**
         * Returns true if this pool allows core threads to time out and
         * terminate if no tasks arrive within the keepAlive time, being
         * replaced if needed when new tasks arrive. When true, the same
         * keep-alive policy applying to non-core threads applies also to
         * core threads. When false (the default), core threads are never
         * terminated due to lack of incoming tasks.
         *
         * @return {@code true} if core threads are allowed to time out,
         *         else {@code false}
         *
         * @since 1.6
         */
        public boolean allowsCoreThreadTimeOut() {
            return allowCoreThreadTimeOut;
        }
    
        /**
         * Sets the policy governing whether core threads may time out and
         * terminate if no tasks arrive within the keep-alive time, being
         * replaced if needed when new tasks arrive. When false, core
         * threads are never terminated due to lack of incoming
         * tasks. When true, the same keep-alive policy applying to
         * non-core threads applies also to core threads. To avoid
         * continual thread replacement, the keep-alive time must be
         * greater than zero when setting {@code true}. This method
         * should in general be called before the pool is actively used.
         *
         * @param value {@code true} if should time out, else {@code false}
         * @throws IllegalArgumentException if value is {@code true}
         *         and the current keep-alive time is not greater than zero
         *
         * @since 1.6
         */
        public void allowCoreThreadTimeOut(boolean value) {
            if (value && keepAliveTime <= 0)
                throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
            if (value != allowCoreThreadTimeOut) {
                allowCoreThreadTimeOut = value;
                if (value)
                    interruptIdleWorkers();
            }
        }
    
        /**
         * Sets the maximum allowed number of threads. This overrides any
         * value set in the constructor. If the new value is smaller than
         * the current value, excess existing threads will be
         * terminated when they next become idle.
         *
         * @param maximumPoolSize the new maximum
         * @throws IllegalArgumentException if the new maximum is
         *         less than or equal to zero, or
         *         less than the {@linkplain #getCorePoolSize core pool size}
         * @see #getMaximumPoolSize
         */
        public void setMaximumPoolSize(int maximumPoolSize) {
            if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
                throw new IllegalArgumentException();
            this.maximumPoolSize = maximumPoolSize;
            if (workerCountOf(ctl.get()) > maximumPoolSize)
                interruptIdleWorkers();
        }
    
        /**
         * Returns the maximum allowed number of threads.
         *
         * @return the maximum allowed number of threads
         * @see #setMaximumPoolSize
         */
        public int getMaximumPoolSize() {
            return maximumPoolSize;
        }
    
        /**
         * Sets the time limit for which threads may remain idle before
         * being terminated.  If there are more than the core number of
         * threads currently in the pool, after waiting this amount of
         * time without processing a task, excess threads will be
         * terminated.  This overrides any value set in the constructor.
         *
         * @param time the time to wait.  A time value of zero will cause
         *        excess threads to terminate immediately after executing tasks.
         * @param unit the time unit of the {@code time} argument
         * @throws IllegalArgumentException if {@code time} less than zero or
         *         if {@code time} is zero and {@code allowsCoreThreadTimeOut}
         * @see #getKeepAliveTime(TimeUnit)
         */
        public void setKeepAliveTime(long time, TimeUnit unit) {
            if (time < 0)
                throw new IllegalArgumentException();
            if (time == 0 && allowsCoreThreadTimeOut())
                throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
            long keepAliveTime = unit.toNanos(time);
            long delta = keepAliveTime - this.keepAliveTime;
            this.keepAliveTime = keepAliveTime;
            if (delta < 0)
                interruptIdleWorkers();
        }
    
        /**
         * Returns the thread keep-alive time, which is the amount of time
         * that threads in excess of the core pool size may remain
         * idle before being terminated.
         *
         * @param unit the desired time unit of the result
         * @return the time limit
         * @see #setKeepAliveTime(long, TimeUnit)
         */
        public long getKeepAliveTime(TimeUnit unit) {
            return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
        }
    
        /* User-level queue utilities */
    
        /**
         * Returns the task queue used by this executor. Access to the
         * task queue is intended primarily for debugging and monitoring.
         * This queue may be in active use.  Retrieving the task queue
         * does not prevent queued tasks from executing.
         *
         * @return the task queue
         */
        public BlockingQueue<Runnable> getQueue() {
            return workQueue;
        }
    
        /**
         * Removes this task from the executor's internal queue if it is
         * present, thus causing it not to be run if it has not already
         * started.
         *
         * <p>This method may be useful as one part of a cancellation
         * scheme.  It may fail to remove tasks that have been converted
         * into other forms before being placed on the internal queue. For
         * example, a task entered using {@code submit} might be
         * converted into a form that maintains {@code Future} status.
         * However, in such cases, method {@link #purge} may be used to
         * remove those Futures that have been cancelled.
         *
         * @param task the task to remove
         * @return {@code true} if the task was removed
         */
        public boolean remove(Runnable task) {
            boolean removed = workQueue.remove(task);
            tryTerminate(); // In case SHUTDOWN and now empty
            return removed;
        }
    
        /**
         * Tries to remove from the work queue all {@link Future}
         * tasks that have been cancelled. This method can be useful as a
         * storage reclamation operation, that has no other impact on
         * functionality. Cancelled tasks are never executed, but may
         * accumulate in work queues until worker threads can actively
         * remove them. Invoking this method instead tries to remove them now.
         * However, this method may fail to remove tasks in
         * the presence of interference by other threads.
         */
        public void purge() {
            final BlockingQueue<Runnable> q = workQueue;
            try {
                Iterator<Runnable> it = q.iterator();
                while (it.hasNext()) {
                    Runnable r = it.next();
                    if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
                        it.remove();
                }
            } catch (ConcurrentModificationException fallThrough) {
                // Take slow path if we encounter interference during traversal.
                // Make copy for traversal and call remove for cancelled entries.
                // The slow path is more likely to be O(N*N).
                for (Object r : q.toArray())
                    if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
                        q.remove(r);
            }
    
            tryTerminate(); // In case SHUTDOWN and now empty
        }
    
        /* Statistics */
    
        /**
         * Returns the current number of threads in the pool.
         *
         * @return the number of threads
         */
        public int getPoolSize() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Remove rare and surprising possibility of
                // isTerminated() && getPoolSize() > 0
                return runStateAtLeast(ctl.get(), TIDYING) ? 0
                    : workers.size();
            } finally {
                mainLock.unlock();
            }
        }
    
        /**
         * Returns the approximate number of threads that are actively
         * executing tasks.
         *
         * @return the number of threads
         */
        public int getActiveCount() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int n = 0;
                for (Worker w : workers)
                    if (w.isLocked())
                        ++n;
                return n;
            } finally {
                mainLock.unlock();
            }
        }
    
        /**
         * Returns the largest number of threads that have ever
         * simultaneously been in the pool.
         *
         * @return the number of threads
         */
        public int getLargestPoolSize() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                return largestPoolSize;
            } finally {
                mainLock.unlock();
            }
        }
    
        /**
         * Returns the approximate total number of tasks that have ever been
         * scheduled for execution. Because the states of tasks and
         * threads may change dynamically during computation, the returned
         * value is only an approximation.
         *
         * @return the number of tasks
         */
        public long getTaskCount() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                long n = completedTaskCount;
                for (Worker w : workers) {
                    n += w.completedTasks;
                    if (w.isLocked())
                        ++n;
                }
                return n + workQueue.size();
            } finally {
                mainLock.unlock();
            }
        }
    
        /**
         * Returns the approximate total number of tasks that have
         * completed execution. Because the states of tasks and threads
         * may change dynamically during computation, the returned value
         * is only an approximation, but one that does not ever decrease
         * across successive calls.
         *
         * @return the number of tasks
         */
        public long getCompletedTaskCount() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                long n = completedTaskCount;
                for (Worker w : workers)
                    n += w.completedTasks;
                return n;
            } finally {
                mainLock.unlock();
            }
        }
    
        /**
         * Returns a string identifying this pool, as well as its state,
         * including indications of run state and estimated worker and
         * task counts.
         *
         * @return a string identifying this pool, as well as its state
         */
        public String toString() {
            long ncompleted;
            int nworkers, nactive;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                ncompleted = completedTaskCount;
                nactive = 0;
                nworkers = workers.size();
                for (Worker w : workers) {
                    ncompleted += w.completedTasks;
                    if (w.isLocked())
                        ++nactive;
                }
            } finally {
                mainLock.unlock();
            }
            int c = ctl.get();
            String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" :
                         (runStateAtLeast(c, TERMINATED) ? "Terminated" :
                          "Shutting down"));
            return super.toString() +
                "[" + rs +
                ", pool size = " + nworkers +
                ", active threads = " + nactive +
                ", queued tasks = " + workQueue.size() +
                ", completed tasks = " + ncompleted +
                "]";
        }
    }
    
    ThreadFactory
    public interface ThreadFactory {
    
        /**
         * Constructs a new {@code Thread}.  Implementations may also initialize
         * priority, name, daemon status, {@code ThreadGroup}, etc.
         *
         * @param r a runnable to be executed by new thread instance
         * @return constructed thread, or {@code null} if the request to
         *         create a thread is rejected
         */
        Thread newThread(Runnable r);
    }
    
    public class Executors {
    
        public static ThreadFactory defaultThreadFactory() {
            return new DefaultThreadFactory();
        }
    
        /**
         * 默认的线程工厂
         */
        static class DefaultThreadFactory implements ThreadFactory {
            static final AtomicInteger poolNumber = new AtomicInteger(1);//池数量
            final ThreadGroup group;//线程组
            final AtomicInteger threadNumber = new AtomicInteger(1);//线程数量
            final String namePrefix;
    
            /*
             * 创建默认的线程工厂
             */
            DefaultThreadFactory() {
                SecurityManager s = System.getSecurityManager();
                group = (s != null)? s.getThreadGroup() :
                Thread.currentThread().getThreadGroup();
                namePrefix = "pool-" +
                    poolNumber.getAndIncrement() +
                    "-thread-";
            }
    
            /*
             * 创建一个新的线程
             */
            public Thread newThread(Runnable r) {
                Thread t = new Thread(group, r,
                                      // 新线程的名字
                                      namePrefix + threadNumber.getAndIncrement(),
                                      0);
                // 将后台守护线程设置为应用线程
                if (t.isDaemon())
                    t.setDaemon(false);
    
                // 将线程的优先级全部设置为 NORM_PRIORITY
                if (t.getPriority() != Thread.NORM_PRIORITY)
                    t.setPriority(Thread.NORM_PRIORITY);
    
                return t;
            }
        }
    
    }
    
    Worker
    // 通过继承 AQS 来实现独占锁这个功能
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
             * This class will never be serialized, but we provide a
             * serialVersionUID to suppress a javac warning.
             */
        private static final long serialVersionUID = 6138294804551838833L;
    
        /** Thread this worker is running in.  Null if factory fails. */
        // 执行任务的线程
        final Thread thread;
    
        /** Initial task to run.  Possibly null. */
        // 要执行的任务
        Runnable firstTask;
    
        /** Per-thread task counter */
        // thread 线程已完成的任务数量
        volatile long completedTasks;
    
        /**
             * Creates with given first task and thread from ThreadFactory.
             * @param firstTask the first task (null if none)
             */
        Worker(Runnable firstTask) {
            // 设置 AQS 是 state 为 -1,主要目的是为了在 runWoker 之前不让中断。
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            // this 表示 new 的 Worker 对象,Worker 实现了 Runnable 接口
            // 所以这里是用 Worker 对象来创建一个 thread 对象
            // Worker 中的 thread 的 start 方法会执行 Worker 的 run 方法
            // Worker 的 run 方法会调用线程池的 runWorker(this) 方法
            // runWorker(this) 则是调用 worker 的 firstTask 的 run 方法
            // 好处是可以重复利用 Worker 中的 thread ——> 处理阻塞队列中的任务
            this.thread = getThreadFactory().newThread(this);
        }
    
        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
    
        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.
    
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
    
        // 尝试独占锁
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
    
        // 释放独占锁
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    
        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
    
        // 中断已启动线程
        void interruptIfStarted() {
            Thread t;
            // getState() >= 0 说明该线程已启动
            // 线程 t 不能为 null
            // 并且 t 没有被中断过(中断过就不需要再次中断了)
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    // 中断该线程
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
    
    execute
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
    
        // 获取线程池状态
        int c = ctl.get();
        // 1. 当前线程池工作线程数小于核心线程池数
        if (workerCountOf(c) < corePoolSize) {
            // 使用核心线程池中的线程处理任务,成功则返回
            if (addWorker(command, true))
                return;
            // 如果调用核心线程池的线程处理任务失败,则重新获取线程池状态
            c = ctl.get();
        }
        // 2. 如果线程池当前状态仍然处于运行中,则将任务添加到阻塞队列
        // addWorker 添加失败会走到这里
        if (isRunning(c) && workQueue.offer(command)) {
            // 添加到阻塞队列成功后再重新获取线程池状态
            int recheck = ctl.get();
            // 如果当前线程池状态不是运行中,则从阻塞队列中移除掉刚刚添加的任务
            // remove 成功了就 reject 该任务,否则说明任务已经被执行了
            if (!isRunning(recheck) && remove(command))
                // 移除掉任务后跑出拒绝处理异常
                reject(command);
            // 否则如果当前线程池线程空,则添加一个线程
            else if (workerCountOf(recheck) == 0)
                /*
                    addWorker(null, false) 也就是创建一个线程,但并没有传入任务(null),因为任务已经被添加到                 workQueue 中了(remove(command) 失败才进入此 if 代码块),所以 worker 在执行的时候,会直                 接从 workQueue 中获取任务(getTask())。
                    */
                // addWorker(null, false) 为了保证线程池在 RUNNING 状态下必须要有一个线程来执行任务
                addWorker(null, false);
        }
        // 3. 阻塞队列已满,则新增线程处理任务
        else if (!addWorker(command, false))
            // 新增线程处理任务失败,抛出拒绝处理异常
            reject(command);
    }
    
    reject
    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }
    
    addWorker
    // firstTask:要执行的任务
    // core:是否添加到核心线程池
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // rs >= SHUTDOWN 说明当前线程池不再接受新的任务
            // 但是线程池状态为 SHUTDOWN 并且阻塞队列有任务时,仍可以处理这些任务
            // 此时 firstTask = null,不是新增任务,而是新增线程来处理任务,即:
            // rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()
    
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
    
            // corePoolSize 未满,核心线程的数量加 1
            // 获取最新的核心线程池数量
            // 获取最新的线程池状态(和原先状态不一致,重新循环)
            for (;;) {
                // 线程池中的线程数
                int wc = workerCountOf(c);
                // 如果线程数超限,则返回
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 自增 workerCount,如果成功,则退出 retry 循环
                // 否则更新 ctl,然后判断当前状态是否改变,已改变就从外层 for 循环开始重新执行(外层 for 循环有判断状态逻辑)
                // 如果状态没有改变,则重试自增 workerCount 操作
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // 设置 workerCount 失败,重新获取线程池状态
                c = ctl.get();  // Re-read ctl
                // 如果线程池当前的状态和方法开始时的状态一致,则重新循环本层的 for 循环
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
    
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        
        try {
            // 初始化 worker
            w = new Worker(firstTask);
            // 执行这个任务的线程
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // 获取 mianLock 锁,准备启动线程
                // 先判断线程池的状态,再判断线程是否已启动
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
    
                    // rs < SHUTDOWN 表示是 RUNNING 状态
                    // rs 是 SHUTDOWN 状态并且 firstTask 为 null,向线程池中添加线程,用来处理阻塞队列中的任务
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 如果线程已经被 start 过了,则抛出异常,不允许重复调用 start
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 添加任务到 HashSet 集合中
                        workers.add(w);
                        int s = workers.size();
                        // 如果 workers 的长度(任务队列长度)大于最大线程数量,则更新最大线程数量
                        // largestPoolSize 记录着线程池中出现过的最大线程数量
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    // 释放 mainLock 锁
                    mainLock.unlock();
                }
                // 已添加任务到 workers 集合
                if (workerAdded) {
                    // 启动线程
                    t.start();
                    // 线程已启动
                    workerStarted = true;
                }
            }
        } finally {
            // 任务添加失败,或者任务添加成功但是启动失败
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    
    addWorkerFailed
    /**
         * Rolls back the worker thread creation.
         * - removes worker from workers, if present
         * - decrements worker count
         * - rechecks for termination, in case the existence of this
         *   worker was holding up termination
         */
    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        // 获取 mainLock 锁
        mainLock.lock();
        try {
            // 如果 worker 启动失败,则:
            // 1. 如果 worker 不为 null,则从 workers 集合中移除该任务
            if (w != null)
                workers.remove(w);
            // 2. workerCount 自减 1
            decrementWorkerCount();
            // 3. 根据线程池状态进行判断是否结束线程池
            tryTerminate();
        } finally {
            // 释放 mainLock 锁
            mainLock.unlock();
        }
    }
    
    tryTerminate
    // 根据线程池状态进行判断是否结束线程池
    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
    
            // 当前线程池的状态为以下几种情况时,直接返回:
    
            // 1. 因为还在运行中,不能停止
            if (isRunning(c) ||
                // 2. TIDYING 或 TERMINATED,其它线程已经在结束线程池了,无需当前线程来结束
                runStateAtLeast(c, TIDYING) ||
                // 3. 调用 shutdown() 方法后的状态是 SHUTDOWN,但是仍然可以处理队列中的任务
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            // 4. 如果线程数量不为 0,则中断一个空闲的工作线程,并返回
            if (workerCountOf(c) != 0) { // Eligible to terminate
                /**
    			   当 shutdown() 方法被调用时,会执行 interruptIdleWorkers(),
    			   此方法会先检查线程是否是空闲状态,如果发现线程不是空闲状态,才会中断线程,
    			   中断线程让在任务队列中阻塞的线程醒过来。但是如果在执行 interruptIdleWorkers() 方法时,
    			   线程正在运行,此时并没有被中断;如果线程执行完任务后,然后又去调用了getTask (),
    			   这时如果 workQueue 中没有任务了,调用 workQueue.take() 时就会一直阻塞。
    			   这时该线程便错过了 shutdown()  的中断信号,若没有额外的操作,线程会一直处于阻塞的状态。
    			   所以每次在工作线程结束时调用 tryTerminate 方法来尝试中断一个空闲工作线程,
    			   避免在队列为空时取任务一直阻塞的情况,弥补了 shutdown() 中丢失的信号。
                    */
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
    
            // 只能是以下情形会继续下面的逻辑:结束线程池。
            // 1. SHUTDOWN 状态,这时不再接受新任务而且任务队列也空了
            // 2. STOP 状态,当调用了 shutdownNow 方法
    
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 5. 这里尝试设置状态为 TIDYING,如果设置成功,则调用 terminated 方法
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // terminated 方法默认什么都不做,留给子类实现
                        terminated();
                    } finally {
                        // 设置状态为 TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
    
    interruptWorkers
    // 中断所有已启动线程
    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }
    
    interruptIdleWorkers 相关方法
    // 中断所有空闲线程
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                // 首先看当前线程是否已经中断,如果没有中断,就看线程是否处于空闲状态 
                // 如果能获得线程关联的 Worker 锁,说明线程处于空闲状态,可以中断 
                // 否则说明线程不能中断
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                // 如果 onlyOne 为 true,只尝试中断第一个线程
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
    
    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }
    
    private static final boolean ONLY_ONE = true;
    
    runWorker
    final void runWorker(Worker w) {
        // 获取当前线程(等价于 w 的 thread)
        Thread wt = Thread.currentThread();
        // 获取当前 Worker 对象的任务 
        Runnable task = w.firstTask;
        w.firstTask = null;
        // unlock 源码:release(1); 
        // new Woker() 时,设置了 state 是 -1,这里调用 unlock,作用是将 state 位置为 0,允许 worker 中断
        w.unlock(); // allow interrupts
        // 用来标记线程是正常退出循环还是异常退出
        boolean completedAbruptly = true;
        try {
            // 如果任务不为空,说明是刚创建线程,
            // 如果任务为空,则从队列中取任务,如果队列没有任务,线程就会阻塞在这里(getTask 方法里调用了队列的 take 阻塞方法)。这里从阻塞队列中获取任务并执行,而不用新建线程去执行,这就是线程池的优势。
            // task 执行完后在 finally 块中将其设置成了 null,所以第一次 worker 中 firstTask 执行完成后,后面都会从阻塞队列中获取任务来处理
            while (task != null || (task = getTask()) != null) {
                // worker 获取独占锁,准备执行任务
                w.lock();
    
                /**
                    第一个条件 runStateAtLeast(ctl.get(), STOP) 为 true,表示状态 >= STOP,而线程没有被中断,则线程需要被中断
                    第一个条件 runStateAtLeast(ctl.get(), STOP) 为 false,则再去判断当前线程是否被中断,如果被中断,则继续判断是否线程池状态 >= STOP
                    因为前面调用了 Thread.interrupted(),所以 wt.isInterrupted() 为 false,即线程没有被中断,则线程需要被中断
                */
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
    
                try {
                    // 任务执行之前做一些处理,空函数,需要用户定义处理逻辑
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 执行任务,也就是提交到线程池里的任务,且捕获异常
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        // 因为 runnable 方法不能抛出 checkedException ,所以这里
                        // 将异常包装成 Error 抛出
                        thrown = x; 
                        throw new Error(x);
                    } finally {
                        // 任务执行完之后做一些处理,默认空函数
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            // 如果在执行 task.run() 时抛出异常,是不会走到这里的
            // 所以抛出异常时,completedAbruptly 是 true,表示线程异常退出
            completedAbruptly = false;
        } finally {
            // 线程
            processWorkerExit(w, completedAbruptly);
        }
    }
    
    getTask
    // 从线程池阻塞队列中取任务
    // 返回 null 前(线程正常销毁退出),都会进行 workerCount 减 1 操作
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
    
        for (;;) {
            int c = ctl.get();
            // 获取当前线程池状态
            int rs = runStateOf(c);
    
            // rs >= STOP(线程池不接收任务,也不处理阻塞队列中的任务)
            // 或者 
            // rs >= SHUTDOWN 且 阻塞队列为空(线程池不接收任务,且把阻塞队列中的任务处理完了)
            // 这时候将核心线程的数量减 1,并直接返回 null,线程不再继续处理任务
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // 方法实现:do {} while (! compareAndDecrementWorkerCount(ctl.get()));
                // 不停的获取线程数量,并进行核心线程数的自减操作
                decrementWorkerCount();
                // 当前线程需要销毁
                return null;
            }
    
            // 获取当前线程池工作线程数
            int wc = workerCountOf(c);
    
            // Are workers subject to culling?
            // timed 用于判断是否需要进行超时控制。
            // allowCoreThreadTimeOut 默认是 false,也就是核心线程不允许进行超时
            // wc > corePoolSize,表示当前线程池中的线程数量大于 corePoolSize,对于超过核心线程数量的这些线程,需要进行超时控制
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
            // wc > maximumPoolSize 需要销毁线程(setMaximumPoolSize 可能会导致 maximumPoolSize 变小了)
            // (这里 wc 要么 > maximumPoolSize,要么 > corePoolSize,所以销毁线程不会对线程池造成影响)
            // timed && timedOut 说明线程空闲超时了,需要销毁线程
            if ((wc > maximumPoolSize || (timed && timedOut))
                // wc 大于 1 或 阻塞队列为空
                && (wc > 1 || workQueue.isEmpty())) {
                // 线程数自减
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
    
            try {
                // 根据 timed 来判断 workQueue 是超时等待获取队列任务,还是一直阻塞等待任务
                Runnable r = timed
                    // 超时等待:当超过给定 keepAliveTime 时间还没有获取到任务时,则会返回 null,此时 Woker 会被销毁(getTask 方法返回 null 时,在 runWorker 方法中会跳出 while 循环,然后会执行 processWorkerExit 方法)
                    // keepAliveTime 就是线程的空闲时间(所以可以用来作为获取任务的等待超时时间)
                    ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
                    // 阻塞等待:一直阻塞,直到有任务进来
                    : workQueue.take();
                if (r != null)
                    return r;
                // 获取任务超时,需要重新走循环获取任务
                timedOut = true;
            } catch (InterruptedException retry) {
                // 如果获取任务时当前线程发生了中断,则设置 timedOut 为 false 并返回循环重试
                timedOut = false;
            }
        }
    }
    
    processWorkerExit
    // processWorkerExit 方法逻辑和 addWorkerFailed 方法逻辑类似
    // processWorkerExit 需要将该线程已完成的任务数加到线程池的所有已完成任务中
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // 如果 completedAbruptly 值为 true,则说明线程执行时出现了异常,将 workerCount 减 1
        // 如果线程执行时没有出现异常,说明在 getTask() 方法中可能已经已经对 workerCount 进行了减 1 操作,这里就不必再减了
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();
    
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 统计完成的任务数
            completedTaskCount += w.completedTasks;
            // 从 workers 中移除,也就表示着从线程池中移除了一个工作线程
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
    
        // 根据线程池状态进行判断是否结束线程池
        tryTerminate();
    
        int c = ctl.get();
        
        // getTask 方法中,线程数多了要销毁线程
        // 这里线程数少了,要添加线程
        // 此时状态是 RUNNING 或者 SHUTDOWN
        if (runStateLessThan(c, STOP)) {
            // 不是异常退出,说明从 getTask() 返回 null 导致的退出
            if (!completedAbruptly) {
                // 最小线程数,allowCoreThreadTimeOut 为 true,则核心线程可以被销毁,所以数量最少可以为 0,否则最少要保留 corePoolSize 个核心线程
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && !workQueue.isEmpty())
                    // 如果 allowCoreThreadTimeOut = true,并且等待队列有任务,至少保留一个线程来处理任务
                    // 修正最小核心线程数为 1
                    min = 1;
                // 走到这里,说明 allowCoreThreadTimeOut = false,则 workerCount 不少于 corePoolSize
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // 异常退出
            // 线程池中,当前活跃线程数不满足大于等于 min,要给线程池添加一个线程来处理任务
            addWorker(null, false);
        }
    }
    
    hook 方法
    /* Extension hooks */
    
    protected void beforeExecute(Thread t, Runnable r) { }
    
    protected void afterExecute(Runnable r, Throwable t) { }
    
    /**
         * Method invoked when the Executor has terminated.  Default
         * implementation does nothing. Note: To properly nest multiple
         * overridings, subclasses should generally invoke
         * {@code super.terminated} within this method.
         */
    protected void terminated() { }
    
    shutdown、shutdownNow 相关方法
    /*
         * Methods for controlling interrupts to worker threads.
         */
    
    /**
         * If there is a security manager, makes sure caller has
         * permission to shut down threads in general (see shutdownPerm).
         * If this passes, additionally makes sure the caller is allowed
         * to interrupt each worker thread. This might not be true even if
         * first check passed, if the SecurityManager treats some threads
         * specially.
         */
    private void checkShutdownAccess() {
        SecurityManager security = System.getSecurityManager();
        if (security != null) {
            security.checkPermission(shutdownPerm);
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers)
                    security.checkAccess(w.thread);
            } finally {
                mainLock.unlock();
            }
        }
    }
    
    void onShutdown() {
    }
    
    final boolean isRunningOrShutdown(boolean shutdownOK) {
        int rs = runStateOf(ctl.get());
        return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
    }
    
    /**
    * 取出阻塞队列中没有被执行的任务并返回
         * Drains the task queue into a new list, normally using
         * drainTo. But if the queue is a DelayQueue or any other kind of
         * queue for which poll or drainTo may fail to remove some
         * elements, it deletes them one by one.
         */
    private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        ArrayList<Runnable> taskList = new ArrayList<Runnable>();
        // drainTo 一次性从 BlockingQueue 获取所有可用的数据对象(还可以指定获取数据的个数)
        // 通过该方法,可以提升获取数据效率,不需要多次分批加锁或释放锁
        q.drainTo(taskList);
        if (!q.isEmpty()) {
            // 将 List 转换为数组,循环,取出 drainTo 方法未取完的元素
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }
    
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 检查当前线程是否有关闭线程池的权限
            checkShutdownAccess();
            // 将线程池状态提升为 SHUTDOWN
            advanceRunState(SHUTDOWN);
            // 中断空闲线程,这里最终调用 interruptIdleWorkers(false);
            interruptIdleWorkers();
            // hook 方法,默认为空,让用户在线程池关闭时可以做一些操作
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        // 检查是否可以关闭线程池
        tryTerminate();
    }
    
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 检查线程是否具有关闭线程池的权限
            checkShutdownAccess();
            // 将线程池状态提升为 STOP
            advanceRunState(STOP);
            // 中断所有工作线程,无论是否空闲
            interruptWorkers();
            // 取出队列中没有被执行的任务
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }
    
    public boolean isShutdown() {
        return ! isRunning(ctl.get());
    }
    
    public boolean isTerminating() {
        int c = ctl.get();
        return ! isRunning(c) && runStateLessThan(c, TERMINATED);
    }
    
    public boolean isTerminated() {
        return runStateAtLeast(ctl.get(), TERMINATED);
    }
    
    // 等待线程池状态变为 TERMINATED 则返回,或者时间超时。由于整个过程独占锁,所以一般调用 shutdown 或者 shutdownNow 后使用
    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                // 如果线程池状态为 TERMINATED,则返回 true
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                // 如果超时了,并且状态还不是 TERMINATED,则返回 false
                if (nanos <= 0)
                    return false;
                // 超时等待
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }
    
    RejectedExecutionHandler
    public interface RejectedExecutionHandler {
        //  r 为请求执行的任务,executor 为线程池
        void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
    }
    
    /* Predefined RejectedExecutionHandlers */
    
    /**
    直接在调用者线程中,运行当前被丢弃的任务
    这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降
         * A handler for rejected tasks that runs the rejected task
         * directly in the calling thread of the {@code execute} method,
         * unless the executor has been shut down, in which case the task
         * is discarded.
         */
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
             * Creates a {@code CallerRunsPolicy}.
             */
        public CallerRunsPolicy() { }
    
        /**
             * Executes task r in the caller's thread, unless the executor
             * has been shut down, in which case the task is discarded.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }
    
    /**
    直接抛出异常
         * A handler for rejected tasks that throws a
         * {@code RejectedExecutionException}.
         */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
             * Creates an {@code AbortPolicy}.
             */
        public AbortPolicy() { }
    
        /**
             * Always throws RejectedExecutionException.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             * @throws RejectedExecutionException always
             */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            // 抛出 RejectedExecutionException 异常
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
    
    /**
    丢弃无法处理的任务
         * A handler for rejected tasks that silently discards the
         * rejected task.
         */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
             * Creates a {@code DiscardPolicy}.
             */
        public DiscardPolicy() { }
    
        /**
             * Does nothing, which has the effect of discarding task r.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            // 不处理直接丢弃掉任务
        }
    }
    
    /**
    丢弃队列中最早被阻塞的线程
         * A handler for rejected tasks that discards the oldest unhandled
         * request and then retries {@code execute}, unless the executor
         * is shut down, in which case the task is discarded.
         */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
             * Creates a {@code DiscardOldestPolicy} for the given executor.
             */
        public DiscardOldestPolicy() { }
    
        /**
             * Obtains and ignores the next task that the executor
             * would otherwise execute, if one is immediately available,
             * and then retries execution of task r, unless the executor
             * is shut down, in which case task r is instead discarded.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                // 从线程池中的阻塞队列中取出第一个元素(丢弃队列中最久的一个待处理任务)
                e.getQueue().poll();
                // 将请求处理的任务 r 再次放到线程池中去执行
                e.execute(r);
            }
        }
    }
    

    ScheduledThreadPoolExecutor 源码

    ScheduledThreadPoolExecutor 源码后面再分析。

  • 相关阅读:
    猪苓汤证与黄连阿胶汤(包括栀子豆豉汤)
    女子脸上长斑案
    js 标签云效果
    JS 黑客帝国文字下落效果
    修改webftp,在线文件管理
    利用百度地图API,获取经纬度坐标
    测试img在不显示时是否加载?
    PHP 生成指定大小随机图片
    超简易静态Web服务器
    js 编号生成器
  • 原文地址:https://www.cnblogs.com/wu726/p/15631997.html
Copyright © 2011-2022 走看看