zoukankan      html  css  js  c++  java
  • JUC 线程池的使用与源码解析

    线程池创建与使用

    线程池的创建

    Executors 框架提供了各种类型的线程池,主要有以下工厂方法∶

    • public static ExecutorService newFixedThreadPool(int nThreads)
    • public static ExecutorService newCachedThreadPool()
    • public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
    • public static ExecutorService newSingleThreadExecutor()
    • public static ScheduledExecutorService newSingleThreadScheduledExecutor()

    corePoolSize > 1 的方法名称以 Pool 结尾,等于 1 的以 Executor 结尾。

    Executors 的真正实现类主要包括两个 ThreadPoolExecutor 和 ScheduledThreadPoolExecutor。

    newFixedThreadPool

    线程数达到核心线程数之后不会再新建线程(一方面是队列是无边界的,另一方面是 corePoolSize = maximumPoolSize)。

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads, // corePoolSize = maximumPoolSize
                                      0L, TimeUnit.MILLISECONDS,
                                      // 无界队列,所以即使设置了最大线程数,线程池中的线程数量也不会达到最大线程数
                                      new LinkedBlockingQueue<Runnable>());
    }
    
    newSingleThreadExecutor

    类似于 newFixedThreadPool,只是将 corePoolSize 和 maximumPoolSize 都设置为 1。

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1, // corePoolSize = maximumPoolSize = 1
                                    0L, TimeUnit.MILLISECONDS,
                                    // 无界队列
                                    new LinkedBlockingQueue<Runnable>()));
    }
    
    newCachedThreadPool
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
    newScheduledThreadPool

    可以指定核心线程数,用于创建一个用于执行周期性或者定时任务的线程池。

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              // 延迟队列
              new DelayedWorkQueue());
    }
    

    ScheduledExecutorService 的核心方法如下:

    // 延迟 delay 时间后,执行 Runnable 任务
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
    
    // 延迟 delay 时间后,执行 Callable 任务
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
    
    // 定时任务
    // 以上一个任务开始时间开始计时,period 时间后,如果上一个任务已完成,则立即执行,否则等待上一个任务完成后再执行
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
    
    // 延迟任务
    // 以上一个任务的结束时间开始计时,delay 时间后,立即执行
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
    

    delay 或者 period 时间后,表示任务可以立即执行,但是也要先获取到线程才会立马执行,否则会先阻塞等待获取线程,和一般的线程池逻辑类似。

    线程池的使用

    等待所有任务线程执行完成

    引用:

    ExecutorService等待线程完成后优雅结束

    How to wait for all threads to finish, using ExecutorService?

    方法一:shutdown() / shutdownNow() + awaitTermination()
    ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
    
    while(...) {
        taskExecutor.execute(new MyTask());
    }
    
    // 线程池暂停接收新的任务
    taskExecutor.shutdown();
    
    try {
        // 等待所有任务完成
        taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    } catch (InterruptedException e) {
        // ...
    }
    
    方法二:invokeAll + shutdown() / shutdownNow() + awaitTermination()

    我们可以用来运行线程的第一种方法是 invokeAll() 方法,在所有任务完成或超时到期后,该方法返回 Future 对象列表。

    此外,我们必须注意返回的 Future 对象的顺序与提供的 Callable 对象的列表相同:

    ExecutorService taskExecutor = Executors.newFixedThreadPool(10);
    
    // your tasks
    List<Callable<String>> callables = Arrays.asList(new DelayedCallable("fast thread", 100), new DelayedCallable("slow thread", 3000));
    
    // invokeAll() returns when all tasks are complete
    List<Future<String>> futures = taskExecutor.invokeAll(callables);
    
    taskExecutor.shutdown();
    
    try {
        // 等待所有任务完成
        taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    } catch (InterruptedException e) {
        // ...
    }
    
    方法三:使用 CountDownLatch
    ExecutorService taskExecutor = Executors.newFixedThreadPool(10);
    CountDownLatch latch = new CountDownLatch(2);
    
    for(int i = 0; i < 2; i++){
        taskExecutor.submit(() -> {
            try {
                // 业务逻辑...
                latch.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
    
    // wait for the latch to be decremented by the two remaining threads
    latch.await();
    
    方法四:使用 ExecutorCompletionService

    运行多个线程的另一种方法是使用 ExecutorCompletionService,它使用提供的 ExecutorService 来执行任务。

    与 invokeAll 的一个区别是返回表示执行任务的 Futures 的顺序。ExecutorCompletionService 使用队列按结束顺序存储结果,而 invokeAll 返回一个列表,该列表具有与给定任务列表的迭代器生成的顺序相同的顺序:

    CompletionService<String> service = new ExecutorCompletionService<>(WORKER_THREAD_POOL);
    
    List<Callable<String>> callables = Arrays.asList(new DelayedCallable("fast thread", 100), new DelayedCallable("slow thread", 3000));
    
    for (Callable<String> callable : callables) {
          service.submit(callable);
    }
    
    方法五:使用 Java8 的 CompletableFuture
    ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
    List<Runnable> tasks = getTasks();
    CompletableFuture<?>[] futures = tasks.stream()
                                   .map(task -> CompletableFuture.runAsync(task, taskExecutor))
                                   .toArray(CompletableFuture[]::new);
    // 等待所有任务执行完成
    CompletableFuture.allOf(futures).join();    
    taskExecutor.shutdown();
    

    线程池的配置及优化

    配置主要是 ThreadPoolExecutor 构造方法的参数配置。

    coreThreadPoolSize

    每个线程都需要一定的栈内存空间。在最近的64位JVM中, 默认的栈大小 是1024KB。如果服务器收到大量请求,或者handleRequest方法执行很慢,服务器可能因为创建了大量线程而崩溃。例如有1000个并行的请求,创建出来的1000个线程需要使用1GB的JVM内存作为线程栈空间。另外,每个线程代码执行过程中创建的对象,还可能会在堆上创建对象。这样的情况恶化下去,将会超出JVM堆内存,并产生大量的垃圾回收操作,最终引发 内存溢出(OutOfMemoryErrors) 。

    这些线程不仅仅会消耗内存,它们还会使用其他有限的资源,例如文件句柄、数据库连接等。不可控的创建线程,还可能引发其他类型的错误和崩溃。因此,避免资源耗尽的一个重要方式,就是避免不可控的数据结构。

    顺便说下,由于线程栈大小引发的内存问题,可以通过-Xss开关来调整栈大小。缩小线程栈大小之后,可以减少每个线程的开销,但是可能会引发 栈溢出(StackOverflowErrors) 。对于一般应用程序而言,默认的1024KB过于富裕,调小为256KB或者512KB可能更为合适。Java允许的最小值是160KB。

    CPU密集型任务应配置尽可能小的线程,如配置 cpu 核数 + 1 个线程的线程池,减少线程的切换。

    IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如 cpu 核数 * 2

    对于 IO 型的任务的最佳线程数,有个公式可以计算

    $$Nthreads = NCPU * UCPU * (1 + W/C)$$

    其中:

    ​ * NCPU 是处理器的核的数目

    ​ * UCPU 是期望的 CPU 利用率(该值应该介于 0 和 1 之间)

    ​ * W / C 是等待时间与计算时间的比率

    由于线程数的选定依赖于应用程序的类型,可能需要经过大量性能测试之后,才能得出最优的结果。

    通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用:

    • getTaskCount:线程池已经执行的和未执行的任务总数(所有线程的 completedTaskCount 数量加上阻塞队列中的元素个数);
    • getCompletedTaskCount:线程池已完成的任务数量(所有线程的 completedTaskCount 数量),该值小于等于 taskCount;
    • getLargestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了 maximumPoolSize;
    • getPoolSize:线程池当前的线程数量(workers 里元素的个数);
    • getActiveCount:当前线程池中正在执行任务(独占锁被占用)的线程数量。

    通过这些方法,可以对线程池进行监控,在 ThreadPoolExecutor 类中提供了几个空方法,如 beforeExecute 方法,afterExecute 方法和 terminated 方法,可以扩展这些方法在执行前或执行后增加一些新的操作,例如统计线程池的执行任务的时间等,可以继承自 ThreadPoolExecutor 来进行扩展。

    workQueue
    maximumPoolSize
    threadFactory

    默认使用 DefaultThreadFactory,可以自定义。

    RejectedExecutionHandler

    除了默认的 4 种拒绝策略外,还可以自定义拒绝策略。

    AbstractExecutorService 源码

    AbstractExecutorService 抽象类实现类 ExecutorService 接口。

    FutureTask 源码参考:Future 源码解析

    public interface Executor {
    
        /**
         * Executes the given command at some time in the future.  The command
         * may execute in a new thread, in a pooled thread, or in the calling
         * thread, at the discretion of the {@code Executor} implementation.
         *
         * @param command the runnable task
         * @throws RejectedExecutionException if this task cannot be
         * accepted for execution
         * @throws NullPointerException if command is null
         */
        void execute(Runnable command);
    }
    
    
    public abstract class AbstractExecutorService implements ExecutorService {
    
        // 使用线程对象 runnable 和 保存 runnable 执行结果的变量 value 来构造一个 FutureTask 对象
        // newTaskFor 方法使用了适配器模式,可以将 Runnable + value 或者 callable 对象构造成一个 FutureTask 对象
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }
    
        // 使用 callable 对象构造一个 FutureTask 对象
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new FutureTask<T>(callable);
        }
    
        // 提交任务到线程池,返回一个 FutureTask 对象
        public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            // 将任务添加到线程池中执行
            execute(ftask);
            return ftask;
        }
    
        // 同上
        public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task, result);
            execute(ftask);
            return ftask;
        }
    
        // 同上
        public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }
    
        // 批量执行多个任务,但是只要一个任务完成就返回,同时中断其它任务
        private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                                  boolean timed, long nanos)
            throws InterruptedException, ExecutionException, TimeoutException {
            if (tasks == null)
                throw new NullPointerException();
            // 当前待执行的任务数量
            int ntasks = tasks.size();
            if (ntasks == 0)
                throw new IllegalArgumentException();
            ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
            // 创建一个 ExecutorCompletionService 对象,当前线程池对象作为 ExecutorCompletionService 的 executor
            // ExecutorCompletionService 可以维护一批 Future 任务,然后按照任务完成的先后顺序,添加到一个先进先出阻塞队列中
            // 然后通过 take 或者 poll 方法获取到的一个已经执行完成的 Future 任务,可以直接调用 future 任务的 get 方法获取执行结果
            ExecutorCompletionService<T> ecs =
                new ExecutorCompletionService<T>(this);
    
            // For efficiency, especially in executors with limited
            // parallelism, check to see if previously submitted tasks are
            // done before submitting more of them. This interleaving
            // plus the exception mechanics account for messiness of main
            // loop.
    
            try {
                // Record exceptions so that if we fail to obtain any
                // result, we can throw the last exception we got.
                ExecutionException ee = null;
                final long deadline = timed ? System.nanoTime() + nanos : 0L;
                Iterator<? extends Callable<T>> it = tasks.iterator();
    
                // Start one task for sure; the rest incrementally
                futures.add(ecs.submit(it.next()));
                --ntasks;
                // 当前正在执行的任务数量
                int active = 1;
    
                for (;;) {
                    Future<T> f = ecs.poll();
                    // 从 ecs 的队列里没有获取到任务
                    if (f == null) {
                        // 未执行的任务数量大于 0
                        if (ntasks > 0) {
                            --ntasks;
                            // 继续获取下一个未执行的任务,将其添加到线程池中去执行
                            futures.add(ecs.submit(it.next()));
                            ++active;
                        }
                        else if (active == 0)
                            // 任务都已经执行完成了
                            break;
                        else if (timed) {
                            // 从 ecs 的阻塞队列中获取一个已完成的 Future 对象,可超时
                            f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                            if (f == null)
                                throw new TimeoutException();
                            nanos = deadline - System.nanoTime();
                        }
                        else
                            // 阻塞低从 ecs 的阻塞队列中获取一个已完成的 Future 对象
                            f = ecs.take();
                    }
                    
                    // 从阻塞队列中获取到了一个完成的 Future 任务(直接返回执行结果,并退出方法)
                    if (f != null) {
                        --active;
                        try {
                            // 返回获取到的一个任务的执行结果
                            return f.get();
                        } catch (ExecutionException eex) {
                            ee = eex;
                        } catch (RuntimeException rex) {
                            ee = new ExecutionException(rex);
                        }
                    }
                }
    
                if (ee == null)
                    ee = new ExecutionException();
                throw ee;
    
            } finally {
                // 从阻塞队列中获取到了一个完成的 Future 任务并返回任务的执行结果,然后将所有的任务都中断掉
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
            }
        }
    
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
            try {
                return doInvokeAny(tasks, false, 0);
            } catch (TimeoutException cannotHappen) {
                assert false;
                return null;
            }
        }
    
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                               long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            return doInvokeAny(tasks, true, unit.toNanos(timeout));
        }
    
        // 批量执行多个任务
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
            if (tasks == null)
                throw new NullPointerException();
            ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
            boolean done = false;
            try {
                // 将多个任务依次添加到线程池中去执行
                for (Callable<T> t : tasks) {
                    RunnableFuture<T> f = newTaskFor(t);
                    // 将任务结果添加到 futures 中
                    futures.add(f);
                    execute(f);
                }
                // 遍历 futures 集合
                for (int i = 0, size = futures.size(); i < size; i++) {
                    Future<T> f = futures.get(i);
                    // 如果任务未完成
                    if (!f.isDone()) {
                        try {
                            // 阻塞获取执行结果
                            f.get();
                        } catch (CancellationException ignore) {
                        } catch (ExecutionException ignore) {
                        }
                    }
                }
                // 执行到这里,说明所有任务都已完成了
                done = true;
                // 返回执行
                return futures;
            } finally {
                if (!done)
                    for (int i = 0, size = futures.size(); i < size; i++)
                        futures.get(i).cancel(true);
            }
        }
    
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                             long timeout, TimeUnit unit)
            throws InterruptedException {
            if (tasks == null)
                throw new NullPointerException();
            long nanos = unit.toNanos(timeout);
            ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
            boolean done = false;
            try {
                for (Callable<T> t : tasks)
                    futures.add(newTaskFor(t));
    
                final long deadline = System.nanoTime() + nanos;
                final int size = futures.size();
    
                // Interleave time checks and calls to execute in case
                // executor doesn't have any/much parallelism.
                for (int i = 0; i < size; i++) {
                    execute((Runnable)futures.get(i));
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L)
                        return futures;
                }
    
                for (int i = 0; i < size; i++) {
                    Future<T> f = futures.get(i);
                    if (!f.isDone()) {
                        if (nanos <= 0L)
                            return futures;
                        try {
                            f.get(nanos, TimeUnit.NANOSECONDS);
                        } catch (CancellationException ignore) {
                        } catch (ExecutionException ignore) {
                        } catch (TimeoutException toe) {
                            return futures;
                        }
                        nanos = deadline - System.nanoTime();
                    }
                }
                done = true;
                return futures;
            } finally {
                if (!done)
                    for (int i = 0, size = futures.size(); i < size; i++)
                        futures.get(i).cancel(true);
            }
        }
    
    }
    

    ThreadPoolExecutor 源码

    对于核心的几个线程池,无论是 newFixedThreadPool() 方法、newCachedThreadPol()、还是 newSingleThreadExecutor() 方法,虽然看起来创建的线程有着完全不同的功能特点,但其内部实现均使用了 ThreadPoolExecutor 实现。

    ThreadPoolExecutor 属性
    public class ThreadPoolExecutor extends AbstractExecutorService {
    
        // ctl 字段存储线程池的当前状态和线程数
        // 高 3 位存放线程池的运行状态 (runState) 
        // 低 29 位存放线程池内有效线程(活跃)的数量 (workerCount)
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    
        // 在 Java 中,一个 int 占据 32 位,所以 COUNT_BITS 的结果是 32 - 3 = 29
        private static final int COUNT_BITS = Integer.SIZE - 3;
    
        // CAPACITY 就代表了 workerCount 的上限,它是 ThreadPoolExecutor 中理论上的最大活跃线程数
        // 运算过程为 1 左移 29 位,也就是 00000000 00000000 00000000 00000001 --> 001 0000 00000000 00000000 00000000,再减去1的话,就是 000 11111 11111111 11111111 11111111,前三位代表线程池运行状态 runState,所以这里 workerCount 的理论最大值就应该是 29 个 1,即 536870911
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
        // runState is stored in the high-order bits
    
        // RUNNING:接受新任务,并处理队列任务
        // -1 在 Java 底层是由 32 个 1 表示的,左移 29 位的话,即 111 00000 00000000 00000000 00000000,也就是低 29 位全部为 0,高 3 位全部为 1 的话,表示 RUNNING 状态,即 -536870912
        private static final int RUNNING    = -1 << COUNT_BITS;
    
        // SHUTDOWN:不接受新任务,但会处理队列任务
        // 在线程池处于 RUNNING 状态时,调用 shutdown() 方法会使线程池进入到该状态(finalize() 方法在执行过程中也会调用 shutdown() 方法进入该状态)
        // 0 在 Java 底层是由 32 个 0 表示的,无论左移多少位,还是 32 个 0,即 000 00000 00000000 00000000 00000000,也就是低 29 位全部为 0,高 3 位全部为 0 的话,表示 SHUTDOWN 状态,即 0;
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
    
        // STOP:不接受新任务,不会处理队列任务,而且会中断正在处理过程中的任务
        // 在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态
        // 1 在 Java 底层是由前面的 31 个 0 和 1 个 1 组成的,左移 29 位的话,即 001 00000 00000000 00000000 00000000,也就是低 29 位全部为 0,高 3 位为 001 的话,表示 STOP 状态,即 536870912;
        private static final int STOP       =  1 << COUNT_BITS;
    
        // TIDYING:所有的任务已结束,workerCount 为 0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态
        // 在 Java 底层是由前面的 30 个 0 和 1 个 11 组成的,左移 29 位的话,即 011 00000 00000000 00000000 00000000,也就是低 29 位全部为 0,高 3 位为 011 的话,表示 TERMINATED 状态,即 1610612736;
        private static final int TIDYING    =  2 << COUNT_BITS;
    
        // TERMINATED:在 terminated() 方法执行完后进入该状态,默认 terminated() 方法中什么也没有做
        private static final int TERMINATED =  3 << COUNT_BITS;
    
        // Packing and unpacking ctl
    
        // 传入的 c 代表的是 ctl 的值,即高 3 位为线程池运行状态 runState,低 29 位为线程池中当前活动的线程数量 workerCount
    
        // runState:线程池运行状态,占据 ctl 的高 3 位,有 RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED 五种状态
        // ~ 是按位取反的意思,CAPACITY 表示的是高位的 3 个 0,和低位的 29 个 1,而 ~CAPACITY 则表示高位的 3 个 1,低位的 29 个 0,然后再与入参 c 执行按位与操作,即高 3 位保持原样,低 29 位全部设置为 0,也就获取了线程池的运行状态 runState。
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
    
        // 取得当前线程池内有效线程的数量
        // workerCount:线程池中当前活动的线程数量,占据 ctl 的低 29 位
        // 将 c 与 CAPACITY 进行与操作 &,也就是与 000 11111 11111111 11111111 11111111 进行与操作,c 的前三位通过与 000 进行与操作,无论 c 前三位为何值,最终都会变成 000,也就是舍弃前三位的值,而 c 的低 29 位与 29 个 1 进行与操作,c 的低 29 位还是会保持原值,这样就从 AtomicInteger ctl 中解析出了 workerCount 的值
        private static int workerCountOf(int c)  { return c & CAPACITY; }
    
        // 原子变量 ctl 的初始化方法
        // 传入的 rs 表示线程池运行状态 runState,其是高 3 位有值,低 29 位全部为 0 的 int,而 wc 则代表线程池中有效线程的数量 workerCount,其为高 3 位全部为 0,而低 29 位有值得 int,将 runState 和 workerCount 做或操作 | 处理,即用 runState 的高 3 位,workerCount 的低 29 位填充的数字,而默认传入的 runState、workerCount 分别为 RUNNING 和 0。
        private static int ctlOf(int rs, int wc) { return rs | wc; }
    
        /*
         * Bit field accessors that don't require unpacking ctl.
         * These depend on the bit layout and on workerCount being never negative.
         */
    
        private static boolean runStateLessThan(int c, int s) {
            return c < s;
        }
    
        private static boolean runStateAtLeast(int c, int s) {
            return c >= s;
        }
    
        private static boolean isRunning(int c) {
            return c < SHUTDOWN;
        }
    
        /**
         * Attempts to CAS-increment the workerCount field of ctl.
         */
        private boolean compareAndIncrementWorkerCount(int expect) {
            return ctl.compareAndSet(expect, expect + 1);
        }
    
        /**
         * Attempts to CAS-decrement the workerCount field of ctl.
         */
        private boolean compareAndDecrementWorkerCount(int expect) {
            return ctl.compareAndSet(expect, expect - 1);
        }
    
        /**
         * Decrements the workerCount field of ctl. This is called only on
         * abrupt termination of a thread (see processWorkerExit). Other
         * decrements are performed within getTask.
         */
        private void decrementWorkerCount() {
            do {} while (! compareAndDecrementWorkerCount(ctl.get()));
        }
    
        /**
         * workQueue 是用于持有任务并将其转换成工作线程 worker 的队列
         */
        private final BlockingQueue<Runnable> workQueue;
    
        private final ReentrantLock mainLock = new ReentrantLock();
    
        /**
         * workers 是包含线程池中所有工作线程 worker 的集合
         * 仅仅当获得 mainLock 锁时才能访问它
         */
        private final HashSet<Worker> workers = new HashSet<Worker>();
    
        /**
         * Wait condition to support awaitTermination
         */
        private final Condition termination = mainLock.newCondition();
    
        // 记录 workers 集合最大的元素个数
        private int largestPoolSize;
    
        /**
         * 线程池已完成的任务的数量
         */
        private long completedTaskCount;
    
        /**
         * 创建新线程的工厂类
         */
        private volatile ThreadFactory threadFactory;
    
        /**
         * 执行拒绝策略的处理器
         */
        private volatile RejectedExecutionHandler handler;
    
        /**
         * 空闲线程等待工作的超时时间(纳秒),即空闲线程存活时间 
         */
        private volatile long keepAliveTime;
    
        /**
         * 是否允许核心线程超时
         * 默认值为 false,如果为 false,core 线程在空闲时依然存活;如果为 true,则 core 线程等待工作,直到时间超时至 keepAliveTime
         */
        private volatile boolean allowCoreThreadTimeOut;
    
        /**
         * 核心线程池大小,保持存活的工作线程的最小数目,当小于 corePoolSize 时,会直接启动新的一个线程来处理任务,而不管线程池中是否有空闲线程
         */
        private volatile int corePoolSize;
    
        /**
         * 线程池中线程的最大数量
         */
        private volatile int maximumPoolSize;
    
        /**
         * The default rejected execution handler
         */
        private static final RejectedExecutionHandler defaultHandler =
            new AbortPolicy();
    
        private static final RuntimePermission shutdownPerm =
            new RuntimePermission("modifyThread");
    
        /* The context to be used when executing the finalizer, or null. */
        private final AccessControlContext acc;
    
        // Public constructors and methods
        // 以下是构造方法
    
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), defaultHandler);
        }
    
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 threadFactory, defaultHandler);
        }
    
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  RejectedExecutionHandler handler) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), handler);
        }
    
        /**
         * Creates a new {@code ThreadPoolExecutor} with the given initial
         * parameters.
         *
         * @param corePoolSize the number of threads to keep in the pool, even
         *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
         * @param maximumPoolSize the maximum number of threads to allow in the
         *        pool
         * @param keepAliveTime when the number of threads is greater than
         *        the core, this is the maximum time that excess idle threads
         *        will wait for new tasks before terminating.
         * @param unit the time unit for the {@code keepAliveTime} argument
         * @param workQueue the queue to use for holding tasks before they are
         *        executed.  This queue will hold only the {@code Runnable}
         *        tasks submitted by the {@code execute} method.
         * @param threadFactory the factory to use when the executor
         *        creates a new thread
         * @param handler the handler to use when execution is blocked
         *        because the thread bounds and queue capacities are reached
         * @throws IllegalArgumentException if one of the following holds:<br>
         *         {@code corePoolSize < 0}<br>
         *         {@code keepAliveTime < 0}<br>
         *         {@code maximumPoolSize <= 0}<br>
         *         {@code maximumPoolSize < corePoolSize}
         * @throws NullPointerException if {@code workQueue}
         *         or {@code threadFactory} or {@code handler} is null
         */
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.acc = System.getSecurityManager() == null ?
                null :
            AccessController.getContext();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    
    
        private void advanceRunState(int targetState) {
            for (;;) {
                int c = ctl.get();
                if (runStateAtLeast(c, targetState) ||
                    ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                    break;
            }
        }
    
        /**
         * Invokes {@code shutdown} when this executor is no longer
         * referenced and it has no threads.
         */
        protected void finalize() {
            SecurityManager sm = System.getSecurityManager();
            if (sm == null || acc == null) {
                shutdown();
            } else {
                PrivilegedAction<Void> pa = () -> { shutdown(); return null; };
                AccessController.doPrivileged(pa, acc);
            }
        }
    
        /**
         * Sets the thread factory used to create new threads.
         *
         * @param threadFactory the new thread factory
         * @throws NullPointerException if threadFactory is null
         * @see #getThreadFactory
         */
        public void setThreadFactory(ThreadFactory threadFactory) {
            if (threadFactory == null)
                throw new NullPointerException();
            this.threadFactory = threadFactory;
        }
    
        /**
         * Returns the thread factory used to create new threads.
         *
         * @return the current thread factory
         * @see #setThreadFactory(ThreadFactory)
         */
        public ThreadFactory getThreadFactory() {
            return threadFactory;
        }
    
        /**
         * Sets a new handler for unexecutable tasks.
         *
         * @param handler the new handler
         * @throws NullPointerException if handler is null
         * @see #getRejectedExecutionHandler
         */
        public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
            if (handler == null)
                throw new NullPointerException();
            this.handler = handler;
        }
    
        /**
         * Returns the current handler for unexecutable tasks.
         *
         * @return the current handler
         * @see #setRejectedExecutionHandler(RejectedExecutionHandler)
         */
        public RejectedExecutionHandler getRejectedExecutionHandler() {
            return handler;
        }
    
        /**
         * Sets the core number of threads.  This overrides any value set
         * in the constructor.  If the new value is smaller than the
         * current value, excess existing threads will be terminated when
         * they next become idle.  If larger, new threads will, if needed,
         * be started to execute any queued tasks.
         *
         * @param corePoolSize the new core size
         * @throws IllegalArgumentException if {@code corePoolSize < 0}
         * @see #getCorePoolSize
         */
        public void setCorePoolSize(int corePoolSize) {
            if (corePoolSize < 0)
                throw new IllegalArgumentException();
            int delta = corePoolSize - this.corePoolSize;
            this.corePoolSize = corePoolSize;
            if (workerCountOf(ctl.get()) > corePoolSize)
                interruptIdleWorkers();
            else if (delta > 0) {
                // We don't really know how many new threads are "needed".
                // As a heuristic, prestart enough new workers (up to new
                // core size) to handle the current number of tasks in
                // queue, but stop if queue becomes empty while doing so.
                int k = Math.min(delta, workQueue.size());
                while (k-- > 0 && addWorker(null, true)) {
                    if (workQueue.isEmpty())
                        break;
                }
            }
        }
    
        /**
         * Returns the core number of threads.
         *
         * @return the core number of threads
         * @see #setCorePoolSize
         */
        public int getCorePoolSize() {
            return corePoolSize;
        }
    
        /**
         * Starts a core thread, causing it to idly wait for work. This
         * overrides the default policy of starting core threads only when
         * new tasks are executed. This method will return {@code false}
         * if all core threads have already been started.
         *
         * @return {@code true} if a thread was started
         */
        public boolean prestartCoreThread() {
            return workerCountOf(ctl.get()) < corePoolSize &&
                addWorker(null, true);
        }
    
        /**
         * Same as prestartCoreThread except arranges that at least one
         * thread is started even if corePoolSize is 0.
         */
        void ensurePrestart() {
            int wc = workerCountOf(ctl.get());
            if (wc < corePoolSize)
                addWorker(null, true);
            else if (wc == 0)
                addWorker(null, false);
        }
    
        /**
         * Starts all core threads, causing them to idly wait for work. This
         * overrides the default policy of starting core threads only when
         * new tasks are executed.
         *
         * @return the number of threads started
         */
        public int prestartAllCoreThreads() {
            int n = 0;
            while (addWorker(null, true))
                ++n;
            return n;
        }
    
        /**
         * Returns true if this pool allows core threads to time out and
         * terminate if no tasks arrive within the keepAlive time, being
         * replaced if needed when new tasks arrive. When true, the same
         * keep-alive policy applying to non-core threads applies also to
         * core threads. When false (the default), core threads are never
         * terminated due to lack of incoming tasks.
         *
         * @return {@code true} if core threads are allowed to time out,
         *         else {@code false}
         *
         * @since 1.6
         */
        public boolean allowsCoreThreadTimeOut() {
            return allowCoreThreadTimeOut;
        }
    
        /**
         * Sets the policy governing whether core threads may time out and
         * terminate if no tasks arrive within the keep-alive time, being
         * replaced if needed when new tasks arrive. When false, core
         * threads are never terminated due to lack of incoming
         * tasks. When true, the same keep-alive policy applying to
         * non-core threads applies also to core threads. To avoid
         * continual thread replacement, the keep-alive time must be
         * greater than zero when setting {@code true}. This method
         * should in general be called before the pool is actively used.
         *
         * @param value {@code true} if should time out, else {@code false}
         * @throws IllegalArgumentException if value is {@code true}
         *         and the current keep-alive time is not greater than zero
         *
         * @since 1.6
         */
        public void allowCoreThreadTimeOut(boolean value) {
            if (value && keepAliveTime <= 0)
                throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
            if (value != allowCoreThreadTimeOut) {
                allowCoreThreadTimeOut = value;
                if (value)
                    interruptIdleWorkers();
            }
        }
    
        /**
         * Sets the maximum allowed number of threads. This overrides any
         * value set in the constructor. If the new value is smaller than
         * the current value, excess existing threads will be
         * terminated when they next become idle.
         *
         * @param maximumPoolSize the new maximum
         * @throws IllegalArgumentException if the new maximum is
         *         less than or equal to zero, or
         *         less than the {@linkplain #getCorePoolSize core pool size}
         * @see #getMaximumPoolSize
         */
        public void setMaximumPoolSize(int maximumPoolSize) {
            if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
                throw new IllegalArgumentException();
            this.maximumPoolSize = maximumPoolSize;
            if (workerCountOf(ctl.get()) > maximumPoolSize)
                interruptIdleWorkers();
        }
    
        /**
         * Returns the maximum allowed number of threads.
         *
         * @return the maximum allowed number of threads
         * @see #setMaximumPoolSize
         */
        public int getMaximumPoolSize() {
            return maximumPoolSize;
        }
    
        /**
         * Sets the time limit for which threads may remain idle before
         * being terminated.  If there are more than the core number of
         * threads currently in the pool, after waiting this amount of
         * time without processing a task, excess threads will be
         * terminated.  This overrides any value set in the constructor.
         *
         * @param time the time to wait.  A time value of zero will cause
         *        excess threads to terminate immediately after executing tasks.
         * @param unit the time unit of the {@code time} argument
         * @throws IllegalArgumentException if {@code time} less than zero or
         *         if {@code time} is zero and {@code allowsCoreThreadTimeOut}
         * @see #getKeepAliveTime(TimeUnit)
         */
        public void setKeepAliveTime(long time, TimeUnit unit) {
            if (time < 0)
                throw new IllegalArgumentException();
            if (time == 0 && allowsCoreThreadTimeOut())
                throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
            long keepAliveTime = unit.toNanos(time);
            long delta = keepAliveTime - this.keepAliveTime;
            this.keepAliveTime = keepAliveTime;
            if (delta < 0)
                interruptIdleWorkers();
        }
    
        /**
         * Returns the thread keep-alive time, which is the amount of time
         * that threads in excess of the core pool size may remain
         * idle before being terminated.
         *
         * @param unit the desired time unit of the result
         * @return the time limit
         * @see #setKeepAliveTime(long, TimeUnit)
         */
        public long getKeepAliveTime(TimeUnit unit) {
            return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
        }
    
        /* User-level queue utilities */
    
        /**
         * Returns the task queue used by this executor. Access to the
         * task queue is intended primarily for debugging and monitoring.
         * This queue may be in active use.  Retrieving the task queue
         * does not prevent queued tasks from executing.
         *
         * @return the task queue
         */
        public BlockingQueue<Runnable> getQueue() {
            return workQueue;
        }
    
        /**
         * Removes this task from the executor's internal queue if it is
         * present, thus causing it not to be run if it has not already
         * started.
         *
         * <p>This method may be useful as one part of a cancellation
         * scheme.  It may fail to remove tasks that have been converted
         * into other forms before being placed on the internal queue. For
         * example, a task entered using {@code submit} might be
         * converted into a form that maintains {@code Future} status.
         * However, in such cases, method {@link #purge} may be used to
         * remove those Futures that have been cancelled.
         *
         * @param task the task to remove
         * @return {@code true} if the task was removed
         */
        public boolean remove(Runnable task) {
            boolean removed = workQueue.remove(task);
            tryTerminate(); // In case SHUTDOWN and now empty
            return removed;
        }
    
        /**
         * Tries to remove from the work queue all {@link Future}
         * tasks that have been cancelled. This method can be useful as a
         * storage reclamation operation, that has no other impact on
         * functionality. Cancelled tasks are never executed, but may
         * accumulate in work queues until worker threads can actively
         * remove them. Invoking this method instead tries to remove them now.
         * However, this method may fail to remove tasks in
         * the presence of interference by other threads.
         */
        public void purge() {
            final BlockingQueue<Runnable> q = workQueue;
            try {
                Iterator<Runnable> it = q.iterator();
                while (it.hasNext()) {
                    Runnable r = it.next();
                    if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
                        it.remove();
                }
            } catch (ConcurrentModificationException fallThrough) {
                // Take slow path if we encounter interference during traversal.
                // Make copy for traversal and call remove for cancelled entries.
                // The slow path is more likely to be O(N*N).
                for (Object r : q.toArray())
                    if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
                        q.remove(r);
            }
    
            tryTerminate(); // In case SHUTDOWN and now empty
        }
    
        /* Statistics */
    
        /**
         * Returns the current number of threads in the pool.
         *
         * @return the number of threads
         */
        public int getPoolSize() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Remove rare and surprising possibility of
                // isTerminated() && getPoolSize() > 0
                return runStateAtLeast(ctl.get(), TIDYING) ? 0
                    : workers.size();
            } finally {
                mainLock.unlock();
            }
        }
    
        /**
         * Returns the approximate number of threads that are actively
         * executing tasks.
         *
         * @return the number of threads
         */
        public int getActiveCount() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int n = 0;
                for (Worker w : workers)
                    if (w.isLocked())
                        ++n;
                return n;
            } finally {
                mainLock.unlock();
            }
        }
    
        /**
         * Returns the largest number of threads that have ever
         * simultaneously been in the pool.
         *
         * @return the number of threads
         */
        public int getLargestPoolSize() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                return largestPoolSize;
            } finally {
                mainLock.unlock();
            }
        }
    
        /**
         * Returns the approximate total number of tasks that have ever been
         * scheduled for execution. Because the states of tasks and
         * threads may change dynamically during computation, the returned
         * value is only an approximation.
         *
         * @return the number of tasks
         */
        public long getTaskCount() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                long n = completedTaskCount;
                for (Worker w : workers) {
                    n += w.completedTasks;
                    if (w.isLocked())
                        ++n;
                }
                return n + workQueue.size();
            } finally {
                mainLock.unlock();
            }
        }
    
        /**
         * Returns the approximate total number of tasks that have
         * completed execution. Because the states of tasks and threads
         * may change dynamically during computation, the returned value
         * is only an approximation, but one that does not ever decrease
         * across successive calls.
         *
         * @return the number of tasks
         */
        public long getCompletedTaskCount() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                long n = completedTaskCount;
                for (Worker w : workers)
                    n += w.completedTasks;
                return n;
            } finally {
                mainLock.unlock();
            }
        }
    
        /**
         * Returns a string identifying this pool, as well as its state,
         * including indications of run state and estimated worker and
         * task counts.
         *
         * @return a string identifying this pool, as well as its state
         */
        public String toString() {
            long ncompleted;
            int nworkers, nactive;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                ncompleted = completedTaskCount;
                nactive = 0;
                nworkers = workers.size();
                for (Worker w : workers) {
                    ncompleted += w.completedTasks;
                    if (w.isLocked())
                        ++nactive;
                }
            } finally {
                mainLock.unlock();
            }
            int c = ctl.get();
            String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" :
                         (runStateAtLeast(c, TERMINATED) ? "Terminated" :
                          "Shutting down"));
            return super.toString() +
                "[" + rs +
                ", pool size = " + nworkers +
                ", active threads = " + nactive +
                ", queued tasks = " + workQueue.size() +
                ", completed tasks = " + ncompleted +
                "]";
        }
    }
    
    ThreadFactory
    public interface ThreadFactory {
    
        /**
         * Constructs a new {@code Thread}.  Implementations may also initialize
         * priority, name, daemon status, {@code ThreadGroup}, etc.
         *
         * @param r a runnable to be executed by new thread instance
         * @return constructed thread, or {@code null} if the request to
         *         create a thread is rejected
         */
        Thread newThread(Runnable r);
    }
    
    public class Executors {
    
        public static ThreadFactory defaultThreadFactory() {
            return new DefaultThreadFactory();
        }
    
        /**
         * 默认的线程工厂
         */
        static class DefaultThreadFactory implements ThreadFactory {
            static final AtomicInteger poolNumber = new AtomicInteger(1);//池数量
            final ThreadGroup group;//线程组
            final AtomicInteger threadNumber = new AtomicInteger(1);//线程数量
            final String namePrefix;
    
            /*
             * 创建默认的线程工厂
             */
            DefaultThreadFactory() {
                SecurityManager s = System.getSecurityManager();
                group = (s != null)? s.getThreadGroup() :
                Thread.currentThread().getThreadGroup();
                namePrefix = "pool-" +
                    poolNumber.getAndIncrement() +
                    "-thread-";
            }
    
            /*
             * 创建一个新的线程
             */
            public Thread newThread(Runnable r) {
                Thread t = new Thread(group, r,
                                      // 新线程的名字
                                      namePrefix + threadNumber.getAndIncrement(),
                                      0);
                // 将后台守护线程设置为应用线程
                if (t.isDaemon())
                    t.setDaemon(false);
    
                // 将线程的优先级全部设置为 NORM_PRIORITY
                if (t.getPriority() != Thread.NORM_PRIORITY)
                    t.setPriority(Thread.NORM_PRIORITY);
    
                return t;
            }
        }
    
    }
    
    Worker
    // 通过继承 AQS 来实现独占锁这个功能
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
             * This class will never be serialized, but we provide a
             * serialVersionUID to suppress a javac warning.
             */
        private static final long serialVersionUID = 6138294804551838833L;
    
        /** Thread this worker is running in.  Null if factory fails. */
        // 执行任务的线程
        final Thread thread;
    
        /** Initial task to run.  Possibly null. */
        // 要执行的任务
        Runnable firstTask;
    
        /** Per-thread task counter */
        // thread 线程已完成的任务数量
        volatile long completedTasks;
    
        /**
             * Creates with given first task and thread from ThreadFactory.
             * @param firstTask the first task (null if none)
             */
        Worker(Runnable firstTask) {
            // 设置 AQS 是 state 为 -1,主要目的是为了在 runWoker 之前不让中断。
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            // this 表示 new 的 Worker 对象,Worker 实现了 Runnable 接口
            // 所以这里是用 Worker 对象来创建一个 thread 对象
            // Worker 中的 thread 的 start 方法会执行 Worker 的 run 方法
            // Worker 的 run 方法会调用线程池的 runWorker(this) 方法
            // runWorker(this) 则是调用 worker 的 firstTask 的 run 方法
            // 好处是可以重复利用 Worker 中的 thread ——> 处理阻塞队列中的任务
            this.thread = getThreadFactory().newThread(this);
        }
    
        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
    
        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.
    
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
    
        // 尝试独占锁
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
    
        // 释放独占锁
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    
        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
    
        // 中断已启动线程
        void interruptIfStarted() {
            Thread t;
            // getState() >= 0 说明该线程已启动
            // 线程 t 不能为 null
            // 并且 t 没有被中断过(中断过就不需要再次中断了)
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    // 中断该线程
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
    
    execute
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
    
        // 获取线程池状态
        int c = ctl.get();
        // 1. 当前线程池工作线程数小于核心线程池数
        if (workerCountOf(c) < corePoolSize) {
            // 使用核心线程池中的线程处理任务,成功则返回
            if (addWorker(command, true))
                return;
            // 如果调用核心线程池的线程处理任务失败,则重新获取线程池状态
            c = ctl.get();
        }
        // 2. 如果线程池当前状态仍然处于运行中,则将任务添加到阻塞队列
        // addWorker 添加失败会走到这里
        if (isRunning(c) && workQueue.offer(command)) {
            // 添加到阻塞队列成功后再重新获取线程池状态
            int recheck = ctl.get();
            // 如果当前线程池状态不是运行中,则从阻塞队列中移除掉刚刚添加的任务
            // remove 成功了就 reject 该任务,否则说明任务已经被执行了
            if (!isRunning(recheck) && remove(command))
                // 移除掉任务后跑出拒绝处理异常
                reject(command);
            // 否则如果当前线程池线程空,则添加一个线程
            else if (workerCountOf(recheck) == 0)
                /*
                    addWorker(null, false) 也就是创建一个线程,但并没有传入任务(null),因为任务已经被添加到                 workQueue 中了(remove(command) 失败才进入此 if 代码块),所以 worker 在执行的时候,会直                 接从 workQueue 中获取任务(getTask())。
                    */
                // addWorker(null, false) 为了保证线程池在 RUNNING 状态下必须要有一个线程来执行任务
                addWorker(null, false);
        }
        // 3. 阻塞队列已满,则新增线程处理任务
        else if (!addWorker(command, false))
            // 新增线程处理任务失败,抛出拒绝处理异常
            reject(command);
    }
    
    reject
    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }
    
    addWorker
    // firstTask:要执行的任务
    // core:是否添加到核心线程池
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // rs >= SHUTDOWN 说明当前线程池不再接受新的任务
            // 但是线程池状态为 SHUTDOWN 并且阻塞队列有任务时,仍可以处理这些任务
            // 此时 firstTask = null,不是新增任务,而是新增线程来处理任务,即:
            // rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()
    
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
    
            // corePoolSize 未满,核心线程的数量加 1
            // 获取最新的核心线程池数量
            // 获取最新的线程池状态(和原先状态不一致,重新循环)
            for (;;) {
                // 线程池中的线程数
                int wc = workerCountOf(c);
                // 如果线程数超限,则返回
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 自增 workerCount,如果成功,则退出 retry 循环
                // 否则更新 ctl,然后判断当前状态是否改变,已改变就从外层 for 循环开始重新执行(外层 for 循环有判断状态逻辑)
                // 如果状态没有改变,则重试自增 workerCount 操作
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // 设置 workerCount 失败,重新获取线程池状态
                c = ctl.get();  // Re-read ctl
                // 如果线程池当前的状态和方法开始时的状态一致,则重新循环本层的 for 循环
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
    
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        
        try {
            // 初始化 worker
            w = new Worker(firstTask);
            // 执行这个任务的线程
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // 获取 mianLock 锁,准备启动线程
                // 先判断线程池的状态,再判断线程是否已启动
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
    
                    // rs < SHUTDOWN 表示是 RUNNING 状态
                    // rs 是 SHUTDOWN 状态并且 firstTask 为 null,向线程池中添加线程,用来处理阻塞队列中的任务
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 如果线程已经被 start 过了,则抛出异常,不允许重复调用 start
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 添加任务到 HashSet 集合中
                        workers.add(w);
                        int s = workers.size();
                        // 如果 workers 的长度(任务队列长度)大于最大线程数量,则更新最大线程数量
                        // largestPoolSize 记录着线程池中出现过的最大线程数量
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    // 释放 mainLock 锁
                    mainLock.unlock();
                }
                // 已添加任务到 workers 集合
                if (workerAdded) {
                    // 启动线程
                    t.start();
                    // 线程已启动
                    workerStarted = true;
                }
            }
        } finally {
            // 任务添加失败,或者任务添加成功但是启动失败
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    
    addWorkerFailed
    /**
         * Rolls back the worker thread creation.
         * - removes worker from workers, if present
         * - decrements worker count
         * - rechecks for termination, in case the existence of this
         *   worker was holding up termination
         */
    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        // 获取 mainLock 锁
        mainLock.lock();
        try {
            // 如果 worker 启动失败,则:
            // 1. 如果 worker 不为 null,则从 workers 集合中移除该任务
            if (w != null)
                workers.remove(w);
            // 2. workerCount 自减 1
            decrementWorkerCount();
            // 3. 根据线程池状态进行判断是否结束线程池
            tryTerminate();
        } finally {
            // 释放 mainLock 锁
            mainLock.unlock();
        }
    }
    
    tryTerminate
    // 根据线程池状态进行判断是否结束线程池
    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
    
            // 当前线程池的状态为以下几种情况时,直接返回:
    
            // 1. 因为还在运行中,不能停止
            if (isRunning(c) ||
                // 2. TIDYING 或 TERMINATED,其它线程已经在结束线程池了,无需当前线程来结束
                runStateAtLeast(c, TIDYING) ||
                // 3. 调用 shutdown() 方法后的状态是 SHUTDOWN,但是仍然可以处理队列中的任务
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            // 4. 如果线程数量不为 0,则中断一个空闲的工作线程,并返回
            if (workerCountOf(c) != 0) { // Eligible to terminate
                /**
    			   当 shutdown() 方法被调用时,会执行 interruptIdleWorkers(),
    			   此方法会先检查线程是否是空闲状态,如果发现线程不是空闲状态,才会中断线程,
    			   中断线程让在任务队列中阻塞的线程醒过来。但是如果在执行 interruptIdleWorkers() 方法时,
    			   线程正在运行,此时并没有被中断;如果线程执行完任务后,然后又去调用了getTask (),
    			   这时如果 workQueue 中没有任务了,调用 workQueue.take() 时就会一直阻塞。
    			   这时该线程便错过了 shutdown()  的中断信号,若没有额外的操作,线程会一直处于阻塞的状态。
    			   所以每次在工作线程结束时调用 tryTerminate 方法来尝试中断一个空闲工作线程,
    			   避免在队列为空时取任务一直阻塞的情况,弥补了 shutdown() 中丢失的信号。
                    */
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
    
            // 只能是以下情形会继续下面的逻辑:结束线程池。
            // 1. SHUTDOWN 状态,这时不再接受新任务而且任务队列也空了
            // 2. STOP 状态,当调用了 shutdownNow 方法
    
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 5. 这里尝试设置状态为 TIDYING,如果设置成功,则调用 terminated 方法
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // terminated 方法默认什么都不做,留给子类实现
                        terminated();
                    } finally {
                        // 设置状态为 TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
    
    interruptWorkers
    // 中断所有已启动线程
    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }
    
    interruptIdleWorkers 相关方法
    // 中断所有空闲线程
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                // 首先看当前线程是否已经中断,如果没有中断,就看线程是否处于空闲状态 
                // 如果能获得线程关联的 Worker 锁,说明线程处于空闲状态,可以中断 
                // 否则说明线程不能中断
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                // 如果 onlyOne 为 true,只尝试中断第一个线程
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
    
    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }
    
    private static final boolean ONLY_ONE = true;
    
    runWorker
    final void runWorker(Worker w) {
        // 获取当前线程(等价于 w 的 thread)
        Thread wt = Thread.currentThread();
        // 获取当前 Worker 对象的任务 
        Runnable task = w.firstTask;
        w.firstTask = null;
        // unlock 源码:release(1); 
        // new Woker() 时,设置了 state 是 -1,这里调用 unlock,作用是将 state 位置为 0,允许 worker 中断
        w.unlock(); // allow interrupts
        // 用来标记线程是正常退出循环还是异常退出
        boolean completedAbruptly = true;
        try {
            // 如果任务不为空,说明是刚创建线程,
            // 如果任务为空,则从队列中取任务,如果队列没有任务,线程就会阻塞在这里(getTask 方法里调用了队列的 take 阻塞方法)。这里从阻塞队列中获取任务并执行,而不用新建线程去执行,这就是线程池的优势。
            // task 执行完后在 finally 块中将其设置成了 null,所以第一次 worker 中 firstTask 执行完成后,后面都会从阻塞队列中获取任务来处理
            while (task != null || (task = getTask()) != null) {
                // worker 获取独占锁,准备执行任务
                w.lock();
    
                /**
                    第一个条件 runStateAtLeast(ctl.get(), STOP) 为 true,表示状态 >= STOP,而线程没有被中断,则线程需要被中断
                    第一个条件 runStateAtLeast(ctl.get(), STOP) 为 false,则再去判断当前线程是否被中断,如果被中断,则继续判断是否线程池状态 >= STOP
                    因为前面调用了 Thread.interrupted(),所以 wt.isInterrupted() 为 false,即线程没有被中断,则线程需要被中断
                */
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
    
                try {
                    // 任务执行之前做一些处理,空函数,需要用户定义处理逻辑
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 执行任务,也就是提交到线程池里的任务,且捕获异常
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        // 因为 runnable 方法不能抛出 checkedException ,所以这里
                        // 将异常包装成 Error 抛出
                        thrown = x; 
                        throw new Error(x);
                    } finally {
                        // 任务执行完之后做一些处理,默认空函数
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            // 如果在执行 task.run() 时抛出异常,是不会走到这里的
            // 所以抛出异常时,completedAbruptly 是 true,表示线程异常退出
            completedAbruptly = false;
        } finally {
            // 线程
            processWorkerExit(w, completedAbruptly);
        }
    }
    
    getTask
    // 从线程池阻塞队列中取任务
    // 返回 null 前(线程正常销毁退出),都会进行 workerCount 减 1 操作
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
    
        for (;;) {
            int c = ctl.get();
            // 获取当前线程池状态
            int rs = runStateOf(c);
    
            // rs >= STOP(线程池不接收任务,也不处理阻塞队列中的任务)
            // 或者 
            // rs >= SHUTDOWN 且 阻塞队列为空(线程池不接收任务,且把阻塞队列中的任务处理完了)
            // 这时候将核心线程的数量减 1,并直接返回 null,线程不再继续处理任务
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // 方法实现:do {} while (! compareAndDecrementWorkerCount(ctl.get()));
                // 不停的获取线程数量,并进行核心线程数的自减操作
                decrementWorkerCount();
                // 当前线程需要销毁
                return null;
            }
    
            // 获取当前线程池工作线程数
            int wc = workerCountOf(c);
    
            // Are workers subject to culling?
            // timed 用于判断是否需要进行超时控制。
            // allowCoreThreadTimeOut 默认是 false,也就是核心线程不允许进行超时
            // wc > corePoolSize,表示当前线程池中的线程数量大于 corePoolSize,对于超过核心线程数量的这些线程,需要进行超时控制
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
            // wc > maximumPoolSize 需要销毁线程(setMaximumPoolSize 可能会导致 maximumPoolSize 变小了)
            // (这里 wc 要么 > maximumPoolSize,要么 > corePoolSize,所以销毁线程不会对线程池造成影响)
            // timed && timedOut 说明线程空闲超时了,需要销毁线程
            if ((wc > maximumPoolSize || (timed && timedOut))
                // wc 大于 1 或 阻塞队列为空
                && (wc > 1 || workQueue.isEmpty())) {
                // 线程数自减
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
    
            try {
                // 根据 timed 来判断 workQueue 是超时等待获取队列任务,还是一直阻塞等待任务
                Runnable r = timed
                    // 超时等待:当超过给定 keepAliveTime 时间还没有获取到任务时,则会返回 null,此时 Woker 会被销毁(getTask 方法返回 null 时,在 runWorker 方法中会跳出 while 循环,然后会执行 processWorkerExit 方法)
                    // keepAliveTime 就是线程的空闲时间(所以可以用来作为获取任务的等待超时时间)
                    ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
                    // 阻塞等待:一直阻塞,直到有任务进来
                    : workQueue.take();
                if (r != null)
                    return r;
                // 获取任务超时,需要重新走循环获取任务
                timedOut = true;
            } catch (InterruptedException retry) {
                // 如果获取任务时当前线程发生了中断,则设置 timedOut 为 false 并返回循环重试
                timedOut = false;
            }
        }
    }
    
    processWorkerExit
    // processWorkerExit 方法逻辑和 addWorkerFailed 方法逻辑类似
    // processWorkerExit 需要将该线程已完成的任务数加到线程池的所有已完成任务中
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // 如果 completedAbruptly 值为 true,则说明线程执行时出现了异常,将 workerCount 减 1
        // 如果线程执行时没有出现异常,说明在 getTask() 方法中可能已经已经对 workerCount 进行了减 1 操作,这里就不必再减了
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();
    
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 统计完成的任务数
            completedTaskCount += w.completedTasks;
            // 从 workers 中移除,也就表示着从线程池中移除了一个工作线程
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
    
        // 根据线程池状态进行判断是否结束线程池
        tryTerminate();
    
        int c = ctl.get();
        
        // getTask 方法中,线程数多了要销毁线程
        // 这里线程数少了,要添加线程
        // 此时状态是 RUNNING 或者 SHUTDOWN
        if (runStateLessThan(c, STOP)) {
            // 不是异常退出,说明从 getTask() 返回 null 导致的退出
            if (!completedAbruptly) {
                // 最小线程数,allowCoreThreadTimeOut 为 true,则核心线程可以被销毁,所以数量最少可以为 0,否则最少要保留 corePoolSize 个核心线程
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && !workQueue.isEmpty())
                    // 如果 allowCoreThreadTimeOut = true,并且等待队列有任务,至少保留一个线程来处理任务
                    // 修正最小核心线程数为 1
                    min = 1;
                // 走到这里,说明 allowCoreThreadTimeOut = false,则 workerCount 不少于 corePoolSize
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // 异常退出
            // 线程池中,当前活跃线程数不满足大于等于 min,要给线程池添加一个线程来处理任务
            addWorker(null, false);
        }
    }
    
    hook 方法
    /* Extension hooks */
    
    protected void beforeExecute(Thread t, Runnable r) { }
    
    protected void afterExecute(Runnable r, Throwable t) { }
    
    /**
         * Method invoked when the Executor has terminated.  Default
         * implementation does nothing. Note: To properly nest multiple
         * overridings, subclasses should generally invoke
         * {@code super.terminated} within this method.
         */
    protected void terminated() { }
    
    shutdown、shutdownNow 相关方法
    /*
         * Methods for controlling interrupts to worker threads.
         */
    
    /**
         * If there is a security manager, makes sure caller has
         * permission to shut down threads in general (see shutdownPerm).
         * If this passes, additionally makes sure the caller is allowed
         * to interrupt each worker thread. This might not be true even if
         * first check passed, if the SecurityManager treats some threads
         * specially.
         */
    private void checkShutdownAccess() {
        SecurityManager security = System.getSecurityManager();
        if (security != null) {
            security.checkPermission(shutdownPerm);
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers)
                    security.checkAccess(w.thread);
            } finally {
                mainLock.unlock();
            }
        }
    }
    
    void onShutdown() {
    }
    
    final boolean isRunningOrShutdown(boolean shutdownOK) {
        int rs = runStateOf(ctl.get());
        return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
    }
    
    /**
    * 取出阻塞队列中没有被执行的任务并返回
         * Drains the task queue into a new list, normally using
         * drainTo. But if the queue is a DelayQueue or any other kind of
         * queue for which poll or drainTo may fail to remove some
         * elements, it deletes them one by one.
         */
    private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        ArrayList<Runnable> taskList = new ArrayList<Runnable>();
        // drainTo 一次性从 BlockingQueue 获取所有可用的数据对象(还可以指定获取数据的个数)
        // 通过该方法,可以提升获取数据效率,不需要多次分批加锁或释放锁
        q.drainTo(taskList);
        if (!q.isEmpty()) {
            // 将 List 转换为数组,循环,取出 drainTo 方法未取完的元素
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }
    
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 检查当前线程是否有关闭线程池的权限
            checkShutdownAccess();
            // 将线程池状态提升为 SHUTDOWN
            advanceRunState(SHUTDOWN);
            // 中断空闲线程,这里最终调用 interruptIdleWorkers(false);
            interruptIdleWorkers();
            // hook 方法,默认为空,让用户在线程池关闭时可以做一些操作
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        // 检查是否可以关闭线程池
        tryTerminate();
    }
    
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 检查线程是否具有关闭线程池的权限
            checkShutdownAccess();
            // 将线程池状态提升为 STOP
            advanceRunState(STOP);
            // 中断所有工作线程,无论是否空闲
            interruptWorkers();
            // 取出队列中没有被执行的任务
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }
    
    public boolean isShutdown() {
        return ! isRunning(ctl.get());
    }
    
    public boolean isTerminating() {
        int c = ctl.get();
        return ! isRunning(c) && runStateLessThan(c, TERMINATED);
    }
    
    public boolean isTerminated() {
        return runStateAtLeast(ctl.get(), TERMINATED);
    }
    
    // 等待线程池状态变为 TERMINATED 则返回,或者时间超时。由于整个过程独占锁,所以一般调用 shutdown 或者 shutdownNow 后使用
    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                // 如果线程池状态为 TERMINATED,则返回 true
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                // 如果超时了,并且状态还不是 TERMINATED,则返回 false
                if (nanos <= 0)
                    return false;
                // 超时等待
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }
    
    RejectedExecutionHandler
    public interface RejectedExecutionHandler {
        //  r 为请求执行的任务,executor 为线程池
        void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
    }
    
    /* Predefined RejectedExecutionHandlers */
    
    /**
    直接在调用者线程中,运行当前被丢弃的任务
    这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降
         * A handler for rejected tasks that runs the rejected task
         * directly in the calling thread of the {@code execute} method,
         * unless the executor has been shut down, in which case the task
         * is discarded.
         */
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
             * Creates a {@code CallerRunsPolicy}.
             */
        public CallerRunsPolicy() { }
    
        /**
             * Executes task r in the caller's thread, unless the executor
             * has been shut down, in which case the task is discarded.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }
    
    /**
    直接抛出异常
         * A handler for rejected tasks that throws a
         * {@code RejectedExecutionException}.
         */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
             * Creates an {@code AbortPolicy}.
             */
        public AbortPolicy() { }
    
        /**
             * Always throws RejectedExecutionException.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             * @throws RejectedExecutionException always
             */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            // 抛出 RejectedExecutionException 异常
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
    
    /**
    丢弃无法处理的任务
         * A handler for rejected tasks that silently discards the
         * rejected task.
         */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
             * Creates a {@code DiscardPolicy}.
             */
        public DiscardPolicy() { }
    
        /**
             * Does nothing, which has the effect of discarding task r.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            // 不处理直接丢弃掉任务
        }
    }
    
    /**
    丢弃队列中最早被阻塞的线程
         * A handler for rejected tasks that discards the oldest unhandled
         * request and then retries {@code execute}, unless the executor
         * is shut down, in which case the task is discarded.
         */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
             * Creates a {@code DiscardOldestPolicy} for the given executor.
             */
        public DiscardOldestPolicy() { }
    
        /**
             * Obtains and ignores the next task that the executor
             * would otherwise execute, if one is immediately available,
             * and then retries execution of task r, unless the executor
             * is shut down, in which case task r is instead discarded.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                // 从线程池中的阻塞队列中取出第一个元素(丢弃队列中最久的一个待处理任务)
                e.getQueue().poll();
                // 将请求处理的任务 r 再次放到线程池中去执行
                e.execute(r);
            }
        }
    }
    

    ScheduledThreadPoolExecutor 源码

    ScheduledThreadPoolExecutor 源码后面再分析。

  • 相关阅读:
    【Leetcode】23. Merge k Sorted Lists
    【Leetcode】109. Convert Sorted List to Binary Search Tree
    【Leetcode】142.Linked List Cycle II
    【Leetcode】143. Reorder List
    【Leetcode】147. Insertion Sort List
    【Leetcode】86. Partition List
    jenkins 配置安全邮件
    python 发送安全邮件
    phpstorm 同步远程服务器代码
    phpUnit 断言
  • 原文地址:https://www.cnblogs.com/wu726/p/15631997.html
Copyright © 2011-2022 走看看