zoukankan      html  css  js  c++  java
  • ThreadPoolExecutor源码分析

    下面是我画的线程池的原理图。

    一、属性

    public class ThreadPoolExecutor extends AbstractExecutorService {
    
        //控制标识,32位(拆分为高3位和低29位)
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        //低29位,用来表示线程数
        private static final int COUNT_BITS = Integer.SIZE - 3;
        //高3位,用来表示线程池容量
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
        // runState is stored in the high-order bits
        //运行状态标识,存储在高3位中
        //11100000000000000000000000000000(前3位为111)
        private static final int RUNNING    = -1 << COUNT_BITS;
        //00000000000000000000000000000000(前3位为000)
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        //00100000000000000000000000000000(前3位为001)
        private static final int STOP       =  1 << COUNT_BITS;
        //01000000000000000000000000000000(前3位为010)
        private static final int TIDYING    =  2 << COUNT_BITS;
        //01100000000000000000000000000000(前3位为011)
        private static final int TERMINATED =  3 << COUNT_BITS;
        
        //工作队列。可以使用ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue,PriorityBlockingQueue
        private final BlockingQueue<Runnable> workQueue;
        
        //工作线程池
        private final HashSet<Worker> workers = new HashSet<Worker>();
        
        private final ReentrantLock mainLock = new ReentrantLock();
        private final Condition termination = mainLock.newCondition();
        //记录池曾经达到过的最大线程数
        private int largestPoolSize;
    
        //已完成的任务计数器。仅被工作线程的中断方法中更新。持有锁才能访问。
        private long completedTaskCount;    
        
        //线程工厂
        private volatile ThreadFactory threadFactory;
    
        //饱和策略
        private volatile RejectedExecutionHandler handler;
        
        //非核心线程的空闲时间,超出该时间线程会销毁。
        private volatile long keepAliveTime;
        //是否允许核心线程超时。默认为false。
        private volatile boolean allowCoreThreadTimeOut;
    
        //核心线程数
        private volatile int corePoolSize;
    
        /**
         * Maximum pool size. Note that the actual maximum is internally bounded by CAPACITY.
         * 最大线程数
         */
        private volatile int maximumPoolSize;
        
        //默认的饱和策略
        private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
        
        private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
        
        ……
        
    }

    线程池有5个状态,分别是:

    • RUNNING:可以接受新的任务,也可以处理阻塞队列里的任务
    • SHUTDOWN:不接受新的任务,但是可以处理阻塞队列里的任务
    • STOP:不接受新的任务,不处理阻塞队列里的任务,中断正在处理的任务
    • TIDYING:过渡状态,也就是说所有的任务都执行完了,当前线程池已空,这个时候线程池的状态将会TIDYING,并且将要调用terminated方法
    • TERMINATED:终止状态。terminated方法调用完成以后的状态

    下面是线程池的状态流转图,研究线程池源码时,尤其是涉及到线程池状态的改变时,结合着该图来看将会十分方便。

    二、线程池的创建

    下面是线程池的构造方法,实际上还有几个重载的构造方法,无非是某些参数使用了默认值而已,而下面这个是参数最完整的。

        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler)

    JDK还提供了一个Executors工厂类,专门用来创建线程池。(实际开发中不推荐使用该方式,而推荐直接使用创建ThreadPoolExecutor实例的方式)

        //固长线程池。队列使用LinkedBlockingQueue
        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
        
        //单线程。队列使用LinkedBlockingQueue
        public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
        
        //缓存线程池。队列使用SynchronousQueue
        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
        
        //可调度线程池。
        public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }
    public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue());
        }

    FixedThreadPool:核心线程数和线程池容量一样,使用LinkedBlockingQueue作为任务队列,是无界的。

    SingleThreadExecutor:核心线程数和线程容量都是1,使用LinkedBlockingQueue作为任务队列,是无界的。

    CachedThreadPool:核心线程数为0,池无限大,使用SynchronousQueue作为任务队列。指定了非核心线程的超时时间60s。

    ScheduledExecutor:

    三、提交任务

    任务提交到线程池后,线程池会按照以下3个步骤来处理任务。

    1.如果当前运行的线程数少于核心线程数,则新创建一个线程来执行任务,无论是否有核心线程处于空闲状态都会这样做

    2.当正在运行的线程数量已达到了核心线程数,则直接将任务添加到任务队列

    3.如果队列已满而无法将任务添加到任务队列,则创建新的线程来执行任务。如果创建新线程会使当前运行的线程数超过maximumPoolSize,任务将被拒绝,并调用             

    RejectedExecutionHandler.rejectedExecution()方法。

    execute()方法

        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
    
            /* 分为3个步骤:
             * 1.如果少于核心线程数:创建并启动一个新线程来执行任务。
             * 2.如果任务队列未满,则将任务加入队列。
             * 3.如果队列也满了,则创建新线程来执行任务。
             */
            int c = ctl.get();
            //1.池中线程数少于核心线程数,则创建一个新线程执行任务【即使有线程空闲也不会让其直接运行任务】
            if (workerCountOf(c) < corePoolSize) {
                //创建并启动一个新线程来执行任务
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            //运行到这里,说明核心线程已满
    
            //2.任务队列未满,则添加到队列中
            if (isRunning(c) && workQueue.offer(command)) {
                //再次检查池的状态
                int recheck = ctl.get();
                //如果此时池关闭了,则从任务队列中移除提交的任务【池关闭会拒绝新任务】
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                //检查池中是否有工作线程。没有则新创建一个线程来执行任务
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            
            //3.核心线程和任务队列都满了,创建一个新线程来执行该任务。如果线程已饱和,则执行饱和策略
            else if (!addWorker(command, false))
                reject(command);
        }
        
        private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);//获取线程池状态
    
                // Check if queue empty only if necessary.+
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    //拒绝添加任务
                    return false;
    
                for (;;) {
                    int wc = workerCountOf(c);
                    //
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    // 1.CAS方式将Worker数量加1
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
    
                    /*
                     * 重新读取状态,如果运行状态变了,重试整个大循环。
                     * 否则说明是workCount发生了变化,重试内层循环。
                     */
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            // 运行到此处时,线程池线程数已经成功+1,下面进行实质操作。
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                final ReentrantLock mainLock = this.mainLock;
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int c = ctl.get();
                        int rs = runStateOf(c);
    
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            // 2.向工作线程集合添加新worker,更新largestPoolSize。
                            workers.add(w);
    
                            /**防止池溢出**/
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    //3.添加worker成功后,启动线程
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                //4.启动worker失败,回滚上面的操作
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }

    submit()方法

    还可以使用submit来将任务提交给线程池执行。有下面3个重载的submit方法:

    • public Future<?> submit(Runnable task)
    • public <T> Future<T> submit(Runnable task, T result)
    • public <T> Future<T> submit(Callable<T> task)

    实际上都差不多,来跟踪下第一个submit方法

        public Future<?> submit(Runnable task) {
            if (task == null) 
            throw new NullPointerException();
        //将task包装成FutureTask
            RunnableFuture<Void> ftask = newTaskFor(task, null);
        //仍然是调用execute方法来执行任务
            execute(ftask);
            return ftask;
        }
        
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }
        
        public FutureTask(Runnable runnable, V result) {
            this.callable = Executors.callable(runnable, result);
            this.state = NEW;       // ensure visibility of callable
        }

    可以看到,使用submit提交任务(无论是Runnable还是Callable),线程池会将任务包装成为一个FutureTask。

    四、执行任务和线程退出

    1.runWorker()方法

    如果任务提交成功后,由核心线程或新创建线程来执行任务。

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
        
        final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            // 用于标记完成任务时是否有异常。
            boolean completedAbruptly = true;
            try {
                // 不停执行任务:执行提交的初始任务(首次)或者从阻塞队列里取任务执行(后续)。
                while (task != null || (task = getTask()) != null) {
                    //加锁
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    //这里if做的事情就是判断是否需要中断当前线程。
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        //调用子类实现的前置钩子方法
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            //真正的运行任务
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            //调用子类实现的后置钩子方法
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        //释放锁
                        w.unlock();
                    }
                }
                //能执行到这里说明没有发生异常,标记一下
                completedAbruptly = false;
            } finally {
                //处理工作线程退出。
                processWorkerExit(w, completedAbruptly);
            }
        }

    2.processWorkerExit()方法

    线程退出,包括执行完任务正常退出,和执行过程中异常退出。

        /**
         * Performs cleanup and bookkeeping for a dying worker. Called
         * only from worker threads. Unless completedAbruptly is set,
         * assumes that workerCount has already been adjusted to account
         * for exit.  This method removes thread from worker set, and
         * possibly terminates the pool or replaces the worker if either
         * it exited due to user task exception or if fewer than
         * corePoolSize workers are running or queue is non-empty but
         * there are no workers.
         *
         * 即将死亡的线程做清理和统计工作。该方法仅被工作线程调用。
         *
         * 该方法会从工作线程集合中移除线程。同时,如果由于用户任务异常导致线程退出,
         * 或运行的线程数少于核心线程数,或队列非空但池已空,则可能终止线程池或者替换掉线程。
         *
         * @param completedAbruptly if the worker died due to user exception
         */
        private void processWorkerExit(Worker w, boolean completedAbruptly) {
            //如果是异常退出,将工作线程数减1。如果是突然中断,工作线程数无法进行调整
            if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
                decrementWorkerCount();
    
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                completedTaskCount += w.completedTasks;
                //移除线程
                workers.remove(w);
            } finally {
                mainLock.unlock();
            }
            //这里不明白为什么要调用该方法?
            tryTerminate();
    
            int c = ctl.get();
            // 只要线程池还没达到STOP状态,任务队列中的任务仍然需要处理。
            // 因此需要判断是否需要补充线程
            if (runStateLessThan(c, STOP)) {
                //正常退出
                if (!completedAbruptly) {
                    /**
                     * 下面这段代码就是在计算是否需要补充新线程
                     */
                    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                    //任务队列中还有任务,则至少还需要一个工作线程
                    if (min == 0 && ! workQueue.isEmpty())
                        min = 1;
                    //线程够用,无需创建新线程
                    if (workerCountOf(c) >= min)
                        return; // replacement not needed
                }
    
                //异常退出,或者正常退出但需要补充线程。则补充一个新线程来替代原线程
                addWorker(null, false);
            }
        }

    3.addWorkder()方法

    线程异常退出,或需要补充线程,则补充一个线程。

        private boolean addWorker(Runnable firstTask, boolean core) {
            /**1.将Worker数量+1**/
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);//获取线程池状态
    
                // Check if queue empty only if necessary.+
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    //拒绝添加任务
                    return false;
    
                for (;;) {
                    int wc = workerCountOf(c);
                    //
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))//核心线程满或线程池满
                        return false;
                    // CAS方式将Worker数量加1。添加成功,跳过for循环。
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
    
                    /*
                     * 重新读取状态,如果运行状态变了,重试整个大循环。
                     * 否则说明是workCount发生了变化,重试内层循环。
                     */
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            // 运行到此处时,线程池线程数已经成功+1。
    
            boolean workerStarted = false;//线程启动标识
            boolean workerAdded = false;//线程添加成功标识
            Worker w = null;
            try {
                final ReentrantLock mainLock = this.mainLock;
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int c = ctl.get();
                        int rs = runStateOf(c);
    
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            // 2.向工作线程集合添加新worker,更新largestPoolSize。
                            workers.add(w);
    
                            /**防止池溢出**/
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;//标识添加成功
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    //3.添加worker成功后,启动该worker线程
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;//标识启动成功
                    }
                }
            } finally {
                //4.启动worker失败,回滚上面的操作
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
        
        private void addWorkerFailed(Worker w) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //1.移除worker
                if (w != null)
                    workers.remove(w);
                //2.worker数减1
                decrementWorkerCount();
                tryTerminate();
            } finally {
                mainLock.unlock();
            }
        }

    五、从任务队列取任务

    getTask()方法

    从上面runWorker方法中的while循环可知,工作线程会不断从任务队列取任务来执行。源码如下

    /**
     * 工作线程从任务队列中拿取任务的核心方法。
     * 根据配置决定采用阻塞或是时限获取。
     * 在以下几种情况会返回null从而接下来线程会退出(runWorker方法循环结束):
     * 1. 当前工作线程数超过了maximumPoolSize(由于maximumPoolSize可以动态调整,这是可能的)。
     * 2. 线程池状态为STOP (因为STOP状态不处理任务队列中的任务了)。
     * 3. 线程池状态为SHUTDOWN,任务队列为空 (因为SHUTDOWN状态仍然需要处理等待中任务)。
     * 4. 根据线程池参数状态以及线程是否空闲超过keepAliveTime决定是否退出当前工作线程。
     */
    private Runnable getTask() {
        // 上次从任务队列poll任务是否超时。
        boolean timedOut = false;
    
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            /*
             * 如果线程池状态已经不是RUNNING状态了,则设置ctl的工作线程数-1
             * if条件等价于 rs >= STOP || (rs == SHUTDOWN && workQueue.isEmpty())
             */
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
    
            int wc = workerCountOf(c);
    
            /*
             * allowCoreThreadTimeOut是用于设置核心线程是否受keepAliveTime影响。
             * 在allowCoreThreadTimeOut为true或者工作线程数>corePoolSize情况下,
             * 当前工作线程受keepAliveTime影响。
             */
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
            /*
             * 1. 工作线程数>maximumPoolSize,当前工作线程需要退出。
             * 2. timed && timedOut == true说明当前线程受keepAliveTime影响且上次获取任务超时。
             *    这种情况下只要当前线程不是最后一个工作线程或者任务队列为空,则可以退出。
             *
             *    换句话说就是,如果队列不为空,则当前线程不能是最后一个工作线程,
             *    否则退出了就没线程处理任务了。
             */ 
            if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                // 设置ctl的workCount减1, CAS失败则需要重试(因为上面if中的条件可能不满足了)。
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
    
            try {
                // 根据timed变量的值决定是时限获取或是阻塞获取任务队列中的任务。
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                // workQueue.take是不会返回null的,因此说明poll超时了。
                timedOut = true;
            } catch (InterruptedException retry) {
                // 在阻塞队列上等待时如果被中断,则清除超时标识重试一次循环。
                timedOut = false;
            }
        }
    }

    六、执行饱和策略

    饱和策略的定义

    饱和策略,也可以翻译为拒绝策略。当线程池和有界队列都满时,饱和策略开始发挥作用。无界队列可以无限存储,不会发生饱和,所以不会执行饱和策略。另外,当线程池关闭时也会执行饱和策略。线程池中预定义了4种饱和策略:

    • AbortPolicy:中止策略。默认的饱和策略,会直接抛出异常。
    • CallerRunsPolicy:调用者运行策略。既不丢弃任务,也不抛异常。只要线程池未关闭,直接在调用者线程中运行当前被丢弃的任务
    • DiscardPolicy:抛弃策略。直接抛弃掉任务。
    • DiscardOldestPolicy:抛弃最老任务策略。抛弃掉最老的任务,然后重新执行当前被丢弃的任务。

    源码中饱和策略的定义:

        private volatile RejectedExecutionHandler handler;
        
        //默认的饱和策略:AbortPolicy
        private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
        
        final void reject(Runnable command) {
            handler.rejectedExecution(command, this);
        }
        
        /***预定义的饱和策略***/
        /**
         *调用者执行策略。
         */
        public static class CallerRunsPolicy implements RejectedExecutionHandler {
    
            public CallerRunsPolicy() { }
    
            //直接在调用者线程中执行任务,除非线程池关闭将任务丢弃
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    r.run();
                }
            }
        }
    
        /**
         * 终止策略(默认的饱和策略)。
         */
        public static class AbortPolicy implements RejectedExecutionHandler {
    
            public AbortPolicy() { }
    
            //抛出RejectedExecutionException异常。
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                throw new RejectedExecutionException("Task " + r.toString() +
                                                     " rejected from " +
                                                     e.toString());
            }
        }
    
        /**
         * 抛弃策略。
         */
        public static class DiscardPolicy implements RejectedExecutionHandler {
    
            public DiscardPolicy() { }
    
            /**
             * Does nothing, which has the effect of discarding task r.
             * 什么都不做。悄悄的抛弃任务。
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            }
        }
    
        /**
         * 抛弃最老策略。
         */
        public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    
            public DiscardOldestPolicy() { }
    
            /**
             * Obtains and ignores the next task that the executor
             * would otherwise execute, if one is immediately available,
             * and then retries execution of task r, unless the executor
             * is shut down, in which case task r is instead discarded.
             * 获取线程池即将执行的下一个可用任务并将其忽略,然后重新尝试执行任务,除非线程池关闭将任务丢弃
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    e.getQueue().poll();
                    e.execute(r);
                }
            }
        }

    执行饱和策略

    有两种情况会执行拒绝策略:

    ①当线程池和有界队列都满时,会执行饱和策略

    ②当线程池关闭时,会执行饱和策略。

    这两种情况都在前面的execute()方法中,可以回到execute()方法源码查看。

        public void execute(Runnable command) {
            //执行饱和策略
            reject(command);
            ……
        }
        
        final void reject(Runnable command) {
            handler.rejectedExecution(command, this);
        }
    
        //默认的饱和策略。
        public static class AbortPolicy implements RejectedExecutionHandler {
    
            public AbortPolicy() { }
    
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                //直接抛出RejectedExecutionHandler异常
                throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
            }
        }

    七、关闭线程池

    shutdown()方法

        public void shutdown() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess();
                //运行状态提升为SHUTDOWN
                advanceRunState(SHUTDOWN);
                //中断所有空闲的线程(仅空闲的线程停止,正在执行任务的线程继续干活)
                interruptIdleWorkers();
                //执行ScheduledThreadPoolExecutor的钩子方法
                onShutdown(); // hook for ScheduledThreadPoolExecutor
            } finally {
                mainLock.unlock();
            }
            //等待池空,完成池的状态跃迁(同时也会将空闲线程关闭)
            tryTerminate();
        }
    
        /**
         * 以下两种情况,转变为TERMINATED状态。
         * ①如果是SHUTDOWN状态,同时池和队列都空。
         * ②如果是STOP状态,同时池空。
         */
        final void tryTerminate() {
            /**
             * 池未空时,不断循环来终止空闲线程(相当于在等待未完成任务的线程完成任务)。
             * 一旦池空,则将池的状态跃迁至TIDYING,最终跃迁至TERMINATED
             */
            for (;;) {
                int c = ctl.get();
                // RUNNING状态
                // TIDYING/TERMINATED状态
                // SHUTDOWN状态,同时任务队列非空
                if (isRunning(c) ||
                    runStateAtLeast(c, TIDYING) ||
                    (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                    return;
    
                //池未空,则终止1个空闲线程
                if (workerCountOf(c) != 0) { // Eligible to terminate
                    //for循环内一次仅终止1个空闲线程。
                    interruptIdleWorkers(ONLY_ONE);
                    return;
                }
    
                //池已空,完成池的状态跃迁
    
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    //状态跃迁至TIDYING
                    if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                        try {
                            //执行钩子方法,默认do nothing
                            terminated();
                        } finally {
                            //状态跃迁至TERMINATED
                            ctl.set(ctlOf(TERMINATED, 0));
                            //通知池已彻底关闭
                            termination.signalAll();
                        }
                        return;
                    }
                } finally {
                    mainLock.unlock();
                }
                // else retry on failed CAS
            }
        }

    shutdown方法不会终止正在执行任务的线程,而只会终止所有空闲的线程,之后在tryTerminate方法中不断检查池是否已空。此时还有任务在继续进行,一旦执行任务的线程执行完,线程就会退出。

    最后当池空时,线程池状态跃迁到TIYDING,最后跃迁到TERMINATED,线程池彻底关闭。实际上这两个状态中间只是多了一个terminated()方法调用,该方法是一个钩子方法,由子类完成扩展逻辑,默认什么都不会做。

    shutdownNow方法

        public List<Runnable> shutdownNow() {
            List<Runnable> tasks;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess();
                //运行状态直接提升为STOP
                advanceRunState(STOP);
                //中断所有的工作线程(包括正在执行任务的线程也会停止)
                interruptWorkers();
                //备份任务列表(提交了但未执行的任务。不会备份正在执行但被中断的任务)
                tasks = drainQueue();
            } finally {
                mainLock.unlock();
            }
            //等待池空,完成池的状态跃迁(同时也会将空闲线程关闭)
            tryTerminate();
            //返回提交了但未执行的任务
            return tasks;
        } 

    从源码可以看出,shutdown和shutdownNow两者的区别。

    shutdown:行为比较平缓优雅。不再接受新的任务,并等待已经提交的任务执行完,包括任务队列中还未开始执行的任务

    shutdownNow:行为比较粗暴。尝试取消所有运行中的任务,不会执行队列中的任务。最后会将任务队列中的返回。

    参考:ThreadPoolExecutor源码解读

    总结

    1.线程池的作用?

    2.有哪几种类型的线程池?

    线程池ThreadPoolExecutor通常由工厂类Executors来创建。

    Executors可以创建SingleThreadExecutor,FixedThreadPool,CachedThreadPool以及ScheduledThreadPool等不同的线程池。

    3.这几种线程池的区别?

    选择哪种线程池

    • Executors.newCachedTheadPool:对于小程序和轻载的服务器,我们可以使用它是个不错的选择
    • Executors.newFixedThreadPool:对于大负载的服务器来说,缓存的线程池就不是很好的选择了!在缓存的线程池中,被提交的任务没有排成队,而是直接交给线程执行。如果没有线程可用,则创建新的线程,如若服务器负载较重,以致它所有的cpu都完全被占用,当有更多任务时,则会创建更多的线程,情况则会变得更糟。因此,在大负载的产品服务器中,最好使用Executors.newFixedThreadPool。或者为了最大限度的控制它,可以直接使用ThreadPoolExecutor类。
    • Executors.newSingleThreadExecutor:对于在希望在另一个线程中连续运行的事物(长期存活的任务)来说,都是很有用的,例如监听进入的socket连接的任务。对希望在线程中运行的段任务也同样方便。例如,更新本地或远程日志的小任务,或者是事件分发线程。

    ------------《java编程思想》第4版和《Effective Java中文版》第2版

    4.向线程池提交任务的过程?

    5.execute和submit的区别

    使用submit提交任务时,线程池会将任务包装成FutureTask。当发生异常时,会将异常保持在future里,并包装在ExecutionException里,当调用Future.get()时,再次throw,这时可以调用ExecutionException.getCause()获取包装的exception,这种情况下,设置UncaughtExceptionHandler也不会被调用。

    对应execute的任务,会直接throw,可以设置一个UncaughtExceptionHandler

    6.有哪几种阻塞队列?

    7.有哪几种拒绝策略?默认的拒绝策略?什么时候执行拒绝策略?

    有两种情况会执行拒绝策略:

    ①当线程池和有界队列都满时,会执行拒绝策略

    ②当线程池关闭时,会执行拒绝策略。

    8.线程池中的线程执行时发生异常,会有什么结果?

    线程池会补充新的线程来执行任务。

    参考:博客

  • 相关阅读:
    CSS3 object-fit 图像裁剪
    jQuery.extend 使用函数
    ios 不支持iframe 解决方案
    详解HTML5中rel属性的prefetch预加载功能使用
    web页面加载、解析、渲染过程
    TCP的三次握手(建立连接)与 四次挥手(关闭连接)
    html---规范、细节积累-01
    pio设置单元格式
    让一个数字显示指定位数
    linux下获取微秒级精度的时间
  • 原文地址:https://www.cnblogs.com/rouqinglangzi/p/10083821.html
Copyright © 2011-2022 走看看