zoukankan      html  css  js  c++  java
  • ThreadPoolExecutor源码解读

    1. 背景与简介

    在Java中异步任务的处理,我们通常会使用Executor框架,而ThreadPoolExecutor是JUC为我们提供的线程池实现。
    线程池的优点在于规避线程的频繁创建,对线程资源统一管理,在任务到达时能快速响应。

    本文从JUC的ThreadPoolExecutor源码出发来剖析线程池的实现原理。

    要比较轻松地理解ThreadPoolExecutor源码,最好需要对AbstractQueuedSynchronizer, BlockingQueue, FutureTask等类有比较熟悉的认知
    另外也需要对Executor框架本身有基本认识。
    关于AbstractQueuedSynchronizer的源码解读,可以参考我的AQS解读
    关于FutureTask的源码解读可以参考我的FutureTask解读

    线程池的大致处理流程如下图所示,线程池内部有一个阻塞队列作为任务队列用以存储提交的任务,线程池中的工作线程作为消费者从阻塞队列中不断获取任务执行。

    上图截取自《Java并发编程的艺术》

    从逻辑角度来说,线程池可以划分出一个核心线程池,在新任务到达时,如果核心线程池未满,会创建新线程来运行任务,即便核心线程池有空闲线程。
    如果核心线程池满了,则会将任务加入到任务队列中,最终被工作线程从队列中取出执行。
    如果任务队列已满,只要线程数未达到最大线程数限制,会创建一个新线程来运行任务;否则会调用饱和策略来处理该任务。

    我们也可以参考下面的线程池执行示意图。

    上图截取自《Java并发编程的艺术》

    1.1 线程池参数

    这里介绍ThreadPoolExecutor中几个比较关键的变量/参数:

    • corePoolSize 核心线程数:如果线程池中线程数量小于corePoolSize,即便现有线程有空闲也会创建新线程来运行新任务
    • maximumPoolSize 最大线程数:如果线程池中线程数量大于corePoolSize并且任务队列满时会创建新线程来运行新任务
    • keepAliveTime 线程存活时间:若果线程池中线程数量大于corePoolSize,则多余的线程在空闲时间超过keepAliveTime后会退出
    • allowCoreThreadTimeout 核心线程超时控制标志位:用于标识是否keepAliveTime的效果同样作用在核心线程上。

    需要注意的是线程池中的实际工作线程数可能会超过maximumPoolSize,因为这个参数是可以通过setMaximumPoolSize方法动态调整的。

    下面介绍几种Executors工具类中提供的常见的基于ThreadPoolExecutor构造的参数配置

    fixedThreadPool

    corePoolSize : n
    maximumPoolSize : n
    keepAliveTime: 0
    workerQueue: LinkedBlockingQueue (无界阻塞队列)

    singleThreadExecutor

    corePoolSize : 1
    maximumPoolSize : 1
    keepAliveTime: 0
    workerQueue : LinkedBlockingQueue (无界阻塞队列)

    cachedThreadPool

    corePoolSize : 0
    maximumPoolSize : Integer.MAX
    keepAliveTime : 60s
    workerQueue : SynchronousQueue (同步队列)

    2. 生命周期

    线程池的完整生命周期具有如下五个阶段:

    • RUNNING
      这是线程池的初始状态。此状态下线程池会接受新任务并且处理队列中等待的任务。
    • SHUTDOWN
      RUNNING状态下调用shutdown方法后进入此状态。此状态下线程池不接受新任务,但会处理队列中等待的任务。
    • STOP
      RUNNING/SHUTDOWN状态下调用shutdownNow方法后进入此状态。此状态下线程池不接受新任务,也不处理既有等待任务,并且会中断既有运行中的线程。
    • TIDYING
      SHUTDOWN/STOP状态会流转到此状态。此时所有任务都已运行完毕,工作线程数为0,任务队列都为空。从字面角度理解,此时线程池已经清干净了。
    • TERMINATED
      TIDYING状态下,线程池执行完terminated钩子方法后进入此状态,此时线程池已完全终止。

    2.1 线程池中生命周期的表示

    ctl是线程池中一个核心状态控制变量,它的类型为AtomicInteger。ctl实际上存储了两方面信息:线程数和线程池的状态。
    ctl的低29位用于表示线程数,因此范围上界约为5亿;
    而高3位用于表示5种生命周期状态,对应的值分别是(注意:在ctl中实际上下面的值左移29位放在高3位存储):

    • RUNNING -1
    • SHUTDOWN 0
    • STOP 1
    • TIDYING 2
    • TERMINATED 3

    因此如果我们想要取出线程数就可以用ctl和(1<<29)-1作位与运算,而如果想取出线程池状态的话就用(1<<29)-1取反后和ctl作位与运算。


    图为自制的线程池的生命周期状态流转示意图

    3. 源码实现

    接下来,分几部分介绍线程池。

    • 工作线程封装: 介绍线程池中的Worker类, runWorker, getTask, processWorkerExit等方法。
    • 提交任务的处理: execute, addWorker, addWorkerFailed等方法。
    • 关闭线程池: shutdown, shutdownNow, tryTerminate, interruptWorkers, interruptIdleWorkers等方法。
    • 饱和策略: 内置四种饱和策略简介。

    3.1 工作线程的封装抽象

    ThreadPoolExecutor的Worker类是一个非常重要的内部类,它是对工作线程的封装,很有必要花上功夫详细解读。
    Worker类内部包含:

    • final Thread thread 对应的工作线程对象
    • Runnable firstTask 初始任务
    • volatile long completedTasks 用于统计完成的任务
      Worker类本身继承了AbstractQueuedSynchronizer,并实现了一个简单的互斥锁Mutex Lock。

    3.1.1 runWorker

    Worker类实现了Runnable接口,并将run方法的实现委托给了外部类ThreadPoolExecutor的runWorker方法。

    /**
     * 工作线程运行核心逻辑。
     * 简单来说做的事情就是不断从任务队列中拿取任务运行。
     */
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        // 把firstTask设置为null,从GC角度来看,这处代码很重要。
        w.firstTask = null;
        // 置互斥锁状态为0,此时可以被中断。
        w.unlock();
        // 用于标记完成任务时是否有异常。
        boolean completedAbruptly = true;
        try {
            // 循环:初始任务(首次)或者从阻塞阻塞队列里拿一个(后续)。        
            while (task != null || (task = getTask()) != null) {
               /*
                * 获取互斥锁。
                * 在持有互斥锁时,调用线程池shutdown方法不会中断该线程。
                * 但是shutdownNow方法无视互斥锁,会中断所有线程。
                */
                w.lock();
                /*
                 * 这里if做的事情就是判断是否需要中断当前线程。
                 * 如果线程池至少处于STOP阶段,当前线程未中断,则中断当前线程;
                 * 否则清除线程中断位。
                 *
                 * if条件中的Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP)
                 * 做的事情说穿了就是清除中断位并确认目前线程池状态没有达到STOP阶段。
                 */  
                if ((runStateAtLeast(ctl.get(), STOP) ||
                            (Thread.interrupted() &&
                             runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 调用由子类实现的前置处理钩子。
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 真正的执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 调用由子类实现的后置处理钩子。
                        afterExecute(task, thrown);
                    }
                } finally {
                    // 清空task, 计数器+1, 释放互斥锁。
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            /*
             * 处理工作线程退出。
             * 上面主循环中的前置处理、任务调用、后置处理都是可能会抛出异常的。
             */
            processWorkerExit(w, completedAbruptly);
        }
    }
    

    3.1.2 getTask

    /**
     * 工作线程从任务队列中拿取任务的核心方法。
     * 根据配置决定采用阻塞或是时限获取。
     * 在以下几种情况会返回null从而接下来线程会退出(runWorker方法循环结束):
     * 1. 当前工作线程数超过了maximumPoolSize(由于maximumPoolSize可以动态调整,这是可能的)。
     * 2. 线程池状态为STOP (因为STOP状态不处理任务队列中的任务了)。
     * 3. 线程池状态为SHUTDOWN,任务队列为空 (因为SHUTDOWN状态仍然需要处理等待中任务)。
     * 4. 根据线程池参数状态以及线程是否空闲超过keepAliveTime决定是否退出当前工作线程。
     */
    private Runnable getTask() {
        // 上次从任务队列poll任务是否超时。
        boolean timedOut = false;
    
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            /*
             * 如果线程池状态已经不是RUNNING状态了,则设置ctl的工作线程数-1
             * if条件等价于 rs >= STOP || (rs == SHUTDOWN && workQueue.isEmpty())
             */
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
    
            int wc = workerCountOf(c);
    
            /*
             * allowCoreThreadTimeOut是用于设置核心线程是否受keepAliveTime影响。
             * 在allowCoreThreadTimeOut为true或者工作线程数>corePoolSize情况下,
             * 当前工作线程受keepAliveTime影响。
             */
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
            /*
             * 1. 工作线程数>maximumPoolSize,当前工作线程需要退出。
             * 2. timed && timedOut == true说明当前线程受keepAliveTime影响且上次获取任务超时。
             *    这种情况下只要当前线程不是最后一个工作线程或者任务队列为空,则可以退出。
             *
             *    换句话说就是,如果队列不为空,则当前线程不能是最后一个工作线程,
             *    否则退出了就没线程处理任务了。
             */ 
            if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                // 设置ctl的workCount减1, CAS失败则需要重试(因为上面if中的条件可能不满足了)。
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
    
            try {
                // 根据timed变量的值决定是时限获取或是阻塞获取任务队列中的任务。
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                // workQueue.take是不会返回null的,因此说明poll超时了。
                timedOut = true;
            } catch (InterruptedException retry) {
                // 在阻塞队列上等待时如果被中断,则清除超时标识重试一次循环。
                timedOut = false;
            }
        }
    }
    

    3.1.3 processWorkerExit

    在工作线程退出的时候,processWorkerExit方法会被调用。

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        /*
         * 因为正常退出,workerCount减1这件事情是在getTask拿不到任务的情况下做掉的。
         * 所以在有异常的情况下,需要在本方法里给workCount减1。
         */
        if (completedAbruptly)
            decrementWorkerCount();
    
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 累加completedTaskCount,从工作线程集合移除自己。
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
    
        // 由于workCount减1,需要调用tryTerminate方法。
        tryTerminate();
    
        int c = ctl.get();
        // 只要线程池还没达到STOP状态,任务队列中的任务仍然是需要处理的。
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                /* 
                 * 确定在RUNNING或SHUTDOWN状态下最少需要的工作线程数。
                 *
                 * 默认情况下,核心线程不受限制时影响,
                 * 在这种情况下核心线程数量应当是稳定的。
                 * 否则允许线程池中无线程。
                 */
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                // 如果任务队列非空,至少需要1个工作线程。
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                // 无需补偿工作线程。
                if (workerCountOf(c) >= min)
                    return;
            }
            // 异常退出或者需要补偿一个线程的情况下,加一个空任务工作线程。
            addWorker(null, false);
        }
    }
    

    3.2 提交任务的处理

    3.2.1 execute

    /**
     * execute方法可以说是线程池中最核心的方法,
     * 在继承链上层的AbstractExecutorService中将各种接受新任务的方法最终转发给了此方法进行任务处理。
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * 分类讨论:
         * 1. 如果当前线程数<核心线程数,则会开启一个新线程来执行提交的任务。
         *
         * 2. 尝试向任务队列中添加任务。这时需要再次检查方法开始到当前时刻这段间隙,
         *    线程池是否已经关闭了/线程池中没有工作线程了。
         *    如果线程池已经关闭了,需要在任务队列中移除先前提交的任务。
         *    如果没有工作线程了,则需要添加一个空任务工作线程用于执行提交的任务。
         *
         * 3. 如果无法向阻塞队列中添加任务,则尝试创建一个新的线程执行任务。
         *    如果失败,回调饱和策略处理任务。
         */
        int c = ctl.get();
        // 线程数 < corePoolSize
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 检查线程池是否处于运行状态,并向任务队列中添加任务
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            /*
             * 再次检查是否线程池处于运行状态,如果不是则移除任务并回调饱和策略拒绝任务。
             * 因为有可能上面if条件读到线程池处于运行状态,而后shutdown/shutdownNow方法被调用,
             * 这时候需要把尝试刚才加入任务队列中的任务移除。
             */
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 如果workerCount为0,需要添加一个工作线程用于执行提交的任务
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        /*
         *  添加一个新的工作线程处理任务。
         *  如果失败,则说明线程池已经关闭或者已经饱和了,此时回调饱和策略来拒绝任务。
         */
        else if (!addWorker(command, false))
            reject(command);
    }
    

    3.2.2 addWorker

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            /*
             * 如果线程池状态至少为STOP,返回false,不接受任务。
             * 如果线程池状态为SHUTDOWN,并且firstTask不为null或者任务队列为空,同样不接受任务。
             */
            if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                        firstTask == null &&
                        ! workQueue.isEmpty()))
                return false;
    
            for (;;) {
                int wc = workerCountOf(c);
                /*
                 * CAPACITY为(1<<29)-1,这是线程池中线程数真正的上界,绝不允许超过。
                 * 因为ThreadPoolExecutor设计中是用低29位表示工作线程数的。
                 *
                 * 否则根据参数中是否以corePoolSize为上界进行判断,如果超过,则新增worker失败。
                 */
                if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 成功新增workCount,跳出整个循环往下走。
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();
                /* 
                 * 重读总控状态,如果运行状态变了,重试整个大循环。
                 * 否则说明是workCount发生了变化,重试内层循环。
                 */
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }
    
        // 运行到此处时,线程池线程数已经成功+1,下面进行实质操作。
    
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // 由于获取锁之前线程池状态可能发生了变化,这里需要重新读一次状态。
                    int rs = runStateOf(ctl.get());
    
                    if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 向工作线程集合添加新worker,更新largestPoolSize。
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 成功增加worker后,启动该worker线程。
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // worker线程如果没有成功启动,回滚worker集合和worker计数器的变化。
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    

    3.2.3 addWorkerFailed

    在新增工作线程失败的情况下,调用addWorkerFailed:

    1. 从worker集合删除失败的worker。
    2. workCount减1。
    3. 调用tryTerminate尝试终止线程池。
    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }
    

    3.3 线程池的关闭(shutdown与shutdownNow)

    在介绍线程池关闭shutdown与shutdownNow相关源码实现时,需要先分析两个很重要的方法

    • tryTerminate
      线程池的生命周期最终态为TERMINATED,然而TERMINATED状态的演进未必是调用shutdown/shutdownNow能做到,因为TERMINATED状态下,线程池中已经没有工作线程,也没有任务队列。tryTerminate方法里面包含了一个逻辑上的责任链,将线程池状态的演进动作在线程中传播下去。

    • interruptIdleWorkers
      Worker类是线程池对工作线程的封装抽象。它主要做的事情就是不断从任务队列中取任务执行,遇到异常退出。如果在工作线程等待任务(阻塞在阻塞队列)时中断该工作线程,则工作线程会重试一次getTask的循环来获取任务,获取不到就会退出runWorker方法的大循环,从而进入processWorkerExit方法收尾。ThreadPoolExecutor中线程中断的主要方法便是interruptIdleWorkers。可以通过参数控制是否最多中断1个线程。

    3.3.1 tryTerminate

    这是线程池中一个很重要的方法。它是实现线程池状态从SHUTDOWN或者STOP流转到TIDYING->TERMINATED的桥梁方法。

    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            /*
             * 能够进行状态流转的情况是:
             * 1. STOP状态
             * 2. SHUTDOWN并且任务队列已空。
             */
            if (isRunning(c) ||
                    runStateAtLeast(c, TIDYING) ||
                    (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            /*
             * 这时只需要所有工作线程退出即可终止线程池。
             * 如果仍然有工作线程,则中断一个空闲的线程。
             *
             * 一旦空闲线程被终止,则会进入processWorkerExit方法,
             * 在processWorkerExit方法中即将退出的工作线程会调用tryTerminate,
             * 从而将终止线程池的动作通过这样的机制在线程间传播下去。
             */
            if (workerCountOf(c) != 0) {
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
    
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 这时workerCount已经为0,任务队列也已为空,状态流转到TIDYING。
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // 调用terminated()钩子方法。
                        terminated();
                    } finally {
                        // 将线程池状态拨到TERMINATED。
                        ctl.set(ctlOf(TERMINATED, 0));
                        // 唤醒所有在线程池终止条件上等待的线程。
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // 线程池状态流转CAS失败的话重试循环。
        }
    }
    

    3.3.2 interruptIdleWorkers

    /**
     * 参数中的onlyOne表示至多只中断一个工作线程。
     * 在tryTerminate方法读取到目前线程池离可以进入终止状态只剩下workCount降为0时,
     * 会调用interruptIdeleWorkers(true)。因为有可能此时其他所有线程都阻塞在任务队列上,
     * 只要中断任意一个线程,通过processWorkerExit -> tryTerminate ->interruptIdleWorkers,
     * 可以使线程中断+退出传播下去。
     */
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        /* 
         * 这里加全局锁的一个很重要的目的是使这个方法串行化执行。
         * 否则在线程池关闭阶段,退出的线程会通过tryTerminate调用到此方法,
         * 并发尝试中断还未中断的线程,引发一场中断风暴。
         */
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                // 工作线程在处理任务阶段是被互斥锁保护的,从而这里不会中断到。
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                // 最多中断一个。
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
    

    3.3.3 shutdown

    shutdown方法关闭线程池是有序优雅的,线程池进入SHUTDOWN状态后不会接受新任务,但是任务队列中已有的任务会继续处理。
    shutdown方法会中断所有未处理任务的空闲线程。

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 状态切换到SHUTDOWN。
            advanceRunState(SHUTDOWN);
            // 中断所有空闲线程,或者说在任务队列上阻塞的线程。
            interruptIdleWorkers();
            onShutdown(); 
        } finally {
            mainLock.unlock();
        }
        // 尝试终止线程池(状态流转至TERMINATED)。
        tryTerminate();
    }
    
    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }
    

    3.3.4 shutdownNow

    shutdownNow方法关闭线程池相比shutdown就暴力了一点,会中断所有线程,哪怕线程正在执行任务。
    线程池进入STOP状态后,不接受新的任务,也不会处理任务队列中已有的任务。
    但需要注意的是,即便shutdownNow即便会中断正在执行任务的线程,不代表你的任务一定会挂:如果提交的任务里面的代码没有对线程中断敏感的逻辑的话,线程中断也不会发生什么。

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 状态切换到STOP
            advanceRunState(STOP);
            // 与SHUTDOWN不同的是,直接中断所有线程。
            interruptWorkers();
            // 将任务队列中的任务收集到tasks。
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        // 尝试终止线程池(状态流转至TERMINATED)。
        tryTerminate();
        return tasks;
    }
    
    /**
     * 此方法只会被shutdownNow方法调用,用于中断所有工作线程。
     */
    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }
    
    void interruptIfStarted() {
        Thread t;
        /*
         * 这里中断已经执行过初次unlock的工作线程(参考runWorker方法逻辑),
         * 因为如果还没有走到初次unlcok那一步的工作线程,一定会读到线程池状态至少为STOP从而退出。
         */
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
    
    
    /**
     * 将任务队列中的任务dump出来。
     * 这个方法执行完以后,任务队列其实可能还会有残留的任务。
     * 比方说:我们的任务队列用LinkedBlockingQueue,事件顺序如下:
     * 时刻1:    线程池状态为RUNNGING,线程A执行ThreadPoolExecutor#execute方法的
     *             if (isRunning(c) && workQueue.offer(command)) 
     *             isRunning(c)返回true,此时还未执行offer操作。
     * 时刻2: 线程B执行shutdownNow,切换线程池状态到STOP,接下来执行完drainQueue方法。
     * 时刻3: 线程A开始执行offer操作,往任务队列中添加了任务。  
     *
     * 对于这种情况,确实drainQueue没有按照doc描述返回所有未执行的任务,
     * 但实际上在ThreadPoolExecutor#execute方法中,向任务队列中添加完任务后有个再次检查线程池状态的步骤。
     * 此时线程A一定能够读取到线程池状态已经不是RUNNING了,在将任务从队列中移除后会使用饱和策略处理任务。
     */
    private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        ArrayList<Runnable> taskList = new ArrayList<Runnable>();
        q.drainTo(taskList);
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }
    

    3.4 饱和策略

    当线程池由于状态或者参数配置等原因无法执行任务时,会通过reject方法调用内置的RejectedExecutionHandler(Java并发编程实战将其译为饱和策略)处理任务。
    ThreadPoolExecutor中内置四种饱和策略,并且可以通过setRejectedExecutionHandler来动态调整。
    下面简单介绍一下四种饱和策略:

    • CallerRunsPolicy
      调用线程池提交任务的线程自己运行提交的任务,前提是线程池仍然处于RUNNING状态,否则任务会被静默丢弃。
    • AbortPolicy
      抛出RejectedExecutionException异常,这是线程池默认的饱和策略。
    • DiscardPolicy
      静默丢弃任务。
    • DiscardOldestPolicy
      丢弃任务队列中首部的任务,重新执行任务。

    四种默认饱和策略的实现都比较简单,就不对代码作介绍了。

    4. 思考与总结

    线程池的大致套路读懂并不是很难,包括代码中方法、语句的作用都不难读懂。难点在于读懂整体的设计精华、每一行代码为什么这么写。
    下面自问自答一些读源码过程中的思考与总结。

    4.1 mainLock的作用

    线程池中用于保存工作线程的是一个HashSet,还有一些统计的字段比如largestPoolSize用于统计线程池中出现过的最大线程数,completedTaskCount用于统计完成的任务数。
    这些东西的更新与读取都会被mainLock保护。这里很容易有个问题,为什么不用并发容器来保存工作线程?Doug Lea在源码的doc里的描述大意是:用锁可以串行化interruptIdleWorkers方法,避免关闭线程池时大量线程并发中断其他线程。另外在shutdown/shutdownNow时由于需要遍历工作线程集合来检查权限,在检查完权限后会中断工作线程。加上锁也可以保证在检查权限与中断线程过程中,工作线程集合元素不变。

    4.2 Worker为什么要实现Mutex锁

    Worker类继承AQS实现了一个简单不可重入的互斥锁,在执行用户提交任务的开始时需要获取锁,任务结束时需要释放锁。锁在这里最主要的目的是为了保证被别的线程中断时处于空闲状态,即没有在执行任务。当然如果shutdownNow方法被调用时,所有的线程都会被中断不管是否处于空闲状态。
    很自然会想到为什么不能复用ReentrantLock组合在里面呢?实际上这里不能用ReentrantLock,因为不能允许工作线程能够多次获取锁。
    我通过翻阅Doug Lea的代码库历史,发现当时有个ThreadPoolExecutor的bug,主要的问题就在于用户提交的任务通过调用ThreadPoolExecutor#setCorePoolSize -> interruptIdleWorkers 会把任务本身对应的工作线程给中断掉,因为工作线程可以通过tryLock方法重入了锁,这是不应该出现的预期外的结果。

    Doug Lea的对应修复

    把Worker类改成了继承AQS,实现简单的Mutex锁。

    4.3 TIDYING状态的意义

    ThreadPoolExecutor很早以前是只有四种状态而没有TIDYING的。我个人对此状态存在意义的思考是,TIDYING的加入使得ThreadPoolExecutor的状态跃迁逻辑更为平滑。
    Doug Lea在某次提交中加入了这个状态。
    TIDYING相当于很早以前的TERMINATED,目前ThreadPoolExecutor中TIDYING和TERMINATED之间的流转在于是否完成了terminated钩子方法的调用。

    5. 参考资料

    • 《Java并发编程实战》
    • 《Java并发编程的艺术》
  • 相关阅读:
    Drools只执行一个规则或者执行完当前规则之后不再执行其他规则(转)
    使用redis作为缓存,数据还需要存入数据库中吗?(转)
    双亲委派
    SpringBoot整合Mybatis传参的几种方式
    Drool规则引擎详解(转)
    IDEA导出可执行的jar包
    十:SpringBoot-配置AOP切面编程,解决日志记录业务
    九:SpringBoot-整合Mybatis框架,集成分页助手插件
    七:SpringBoot-集成Redis数据库,实现缓存管理
    八:SpringBoot-集成JPA持久层框架,简化数据库操作
  • 原文地址:https://www.cnblogs.com/micrari/p/7429364.html
Copyright © 2011-2022 走看看