zoukankan      html  css  js  c++  java
  • Threadpoolexecutor源码分析

    前言

      为了对线程的统一管理分配,引入了线程池。我们来看看它的源码吧!

    接口、类的关系

    我们来看看Executor、ExecutorService、AbstractExecutorService

    Executor接口

    Executor接口只有一个方法execute,传入线程任务参数

    ExecutorService接口

    public interface ExecutorService extends Executor {
    
        void shutdown();
    
        List<Runnable> shutdownNow();
    
        boolean isShutdown();
    
        boolean isTerminated();
    
        boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;
    
        <T> Future<T> submit(Callable<T> task);
    
        <T> Future<T> submit(Runnable task, T result);
    
        Future<?> submit(Runnable task);
    
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException;
    
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                      long timeout, TimeUnit unit)
            throws InterruptedException;
    
        
        <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException;
    
       
        <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                        long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }

    ExecutorService接口继承Executor接口,并增加了submit、shutdown、invokeAll等等一系列方法。

    AbstractExecutorService抽象类

    public abstract class AbstractExecutorService implements ExecutorService {
    
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }
    
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new FutureTask<T>(callable);
        }
    
        public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
    
        public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task, result);
            execute(ftask);
            return ftask;
        }
    
        public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }
    
        private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                                  boolean timed, long nanos)
            throws InterruptedException, ExecutionException, TimeoutException {...}
    
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {... }
    
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                               long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {...}
    
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {...}
    
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                             long timeout, TimeUnit unit)
            throws InterruptedException {...}
    
    }

    AbstractExecutorService抽象类实现ExecutorService接口,并且提供了一些方法的默认实现,例如submit方法、invokeAny方法、invokeAll方法。

    像execute方法、线程池的关闭方法(shutdown、shutdownNow等等)就没有提供默认的实现。

    ThreadPoolExecutor

    属性

    我们先看看与线程池状态相关的属性

    //记录线程池状态和线程数量(总共32位,前三位表示线程池状态,后29位表示线程数量)
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    //线程数量统计位数29  Integer.SIZE=32 
    private static final int COUNT_BITS = Integer.SIZE - 3;
    //容量 000 11111111111111111111111111111
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
    //运行中 111 00000000000000000000000000000
    private static final int RUNNING    = -1 << COUNT_BITS;
    //关闭 000 00000000000000000000000000000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    //停止 001 00000000000000000000000000000
    private static final int STOP       =  1 << COUNT_BITS;
    //整理 010 00000000000000000000000000000
    private static final int TIDYING    =  2 << COUNT_BITS;
    //终止 011 00000000000000000000000000000
    private static final int TERMINATED =  3 << COUNT_BITS;
    
    //获取运行状态(获取前3位)
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    //获取线程个数(获取后29位)
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    • RUNNING:接受新任务并且处理阻塞队列里的任务
    • SHUTDOWN:拒绝新任务但是处理阻塞队列里的任务
    • STOP:拒绝新任务并且抛弃阻塞队列里的任务同时会中断正在处理的任务
    • TIDYING:所有任务都执行完(包含阻塞队列里面任务)当前线程池活动线程为0,将要调用terminated方法
    • TERMINATED:终止状态。terminated方法调用完成以后的状态

     

    与线程工作相关的属性

    构造器

    参数介绍

     

    普通方法

    submit 提交任务AbstractExecutorService中的方法

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
    
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;

    流程步骤如下

    1. 调用submit方法,传入Runnable或者Callable对象
    2. 判断传入的对象是否为null,为null则抛出异常,不为null继续流程
    3. 将传入的对象转换为RunnableFuture对象
    4. 执行execute方法,传入RunnableFuture对象
    5. 返回RunnableFuture对象

    execute 执行

    public void execute(Runnable command) {
       //传进来的线程为null,则抛出空指针异常
       if (command == null)
           throw new NullPointerException();
      
       //获取当前线程池的状态+线程个数变量
       int c = ctl.get();
       /**
        * 3个步骤
        */
       //1.判断当前线程池线程个数是否小于corePoolSize,小于则调用addWorker方法创建新线程运行,且传进来的Runnable当做第一个任务执行。
       //如果调用addWorker方法返回false,则直接返回
       if (workerCountOf(c) < corePoolSize) {
           if (addWorker(command, true))
               return;
           c = ctl.get();
       }
    
       //2.如果线程池处于RUNNING状态,则添加任务到阻塞队列
       if (isRunning(c) && workQueue.offer(command)) {
    
           //二次检查
           int recheck = ctl.get();
           //如果当前线程池状态不是RUNNING则从队列删除任务,并执行拒绝策略
           if (! isRunning(recheck) && remove(command))
               reject(command);
    
           //否者如果当前线程池线程空,则添加一个线程
           else if (workerCountOf(recheck) == 0)
               addWorker(null, false);
       }
       //3.新增线程,新增失败则执行拒绝策略
       else if (!addWorker(command, false))
           reject(command);
    }

    其实从上面代码注释中可以看出就三个判断,

    1. 核心线程数是否已满
    2. 队列是否已满
    3. 线程池是否已满

    然后根据这三个条件进行不同的操作,下图是Java并发编程的艺术书中的线程池的主要处理流程,或许会比较容易理解些

     

    下面是整个流程的详细步骤

    1. 调用execute方法,传入Runable对象
    2. 判断传入的对象是否为null,为null则抛出异常,不为null继续流程
    3. 获取当前线程池的状态和线程个数变量
    4. 判断当前线程数是否小于核心线程数,是走流程5,否则走流程6
    5. 添加线程数,添加成功则结束,失败则重新获取当前线程池的状态和线程个数变量,
    6. 判断线程池是否处于RUNNING状态,是则添加任务到阻塞队列,否则走流程10,添加任务成功则继续流程7
    7. 重新获取当前线程池的状态和线程个数变量
    8. 重新检查线程池状态,不是运行状态则移除之前添加的任务,有一个false走流程9,都为true则走流程11
    9. 检查线程池线程数量是否为0,否则结束流程,是调用addWorker(null, false),然后结束
    10. 调用!addWorker(command, false),为true走流程11,false则结束
    11. 调用拒绝策略reject(command),结束

      execute方法中主要使用到addWorker方法,addWorker方法用于创建线程,并且通过core参数表示该线程是否是核心线程,如果返回true则表示创建成功,否则失败。addWorker的代码如下所示:

    private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) { // 外层无限循环
                // 获取线程池控制状态
                int c = ctl.get();
                // 获取状态
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&            // 状态大于等于SHUTDOWN,初始的ctl为RUNNING,小于SHUTDOWN
                    ! (rs == SHUTDOWN &&        // 状态为SHUTDOWN
                       firstTask == null &&        // 第一个任务为null
                       ! workQueue.isEmpty()))     // worker队列不为空
                    // 返回
                    return false;
    
                for (;;) {
                    // worker数量
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||                                // worker数量大于等于最大容量
                        wc >= (core ? corePoolSize : maximumPoolSize))    // worker数量大于等于核心线程池大小或者最大线程池大小
                        return false;
                    if (compareAndIncrementWorkerCount(c))                 // 比较并增加worker的数量
                        // 跳出外层循环
                        break retry;
                    // 获取线程池控制状态
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs) // 此次的状态与上次获取的状态不相同
                        // 跳过剩余部分,继续循环
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            // worker开始标识
            boolean workerStarted = false;
            // worker被添加标识
            boolean workerAdded = false;
            // 
            Worker w = null;
            try {
                // 初始化worker
                w = new Worker(firstTask);
                // 获取worker对应的线程
                final Thread t = w.thread;
                if (t != null) { // 线程不为null
                    // 线程池锁
                    final ReentrantLock mainLock = this.mainLock;
                    // 获取锁
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        // 线程池的运行状态
                        int rs = runStateOf(ctl.get());
    
                        if (rs < SHUTDOWN ||                                    // 小于SHUTDOWN
                            (rs == SHUTDOWN && firstTask == null)) {            // 等于SHUTDOWN并且firstTask为null
                            if (t.isAlive()) // precheck that t is startable    // 线程刚添加进来,还未启动就存活
                                // 抛出线程状态异常
                                throw new IllegalThreadStateException();
                            // 将worker添加到worker集合
                            workers.add(w);
                            // 获取worker集合的大小
                            int s = workers.size();
                            if (s > largestPoolSize) // 队列大小大于largestPoolSize
                                // 重新设置largestPoolSize
                                largestPoolSize = s;
                            // 设置worker已被添加标识
                            workerAdded = true;
                        }
                    } finally {
                        // 释放锁
                        mainLock.unlock();
                    }
                    if (workerAdded) { // worker被添加
                        // 开始执行worker的run方法
                        t.start();
                        // 设置worker已开始标识
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted) // worker没有开始
                    // 添加worker失败
                    addWorkerFailed(w);
            }
            return workerStarted;
        }

    说明:此函数可能会完成如下几件任务

      ① 原子性的增加workerCount。

      ② 将用户给定的任务封装成为一个worker,并将此worker添加进workers集合中。

      ③ 启动worker对应的线程,并启动该线程,运行worker的run方法。

      ④ 回滚worker的创建动作,即将worker从workers集合中删除,并原子性的减少workerCount。

    Worker对象

      Worker是定义在ThreadPoolExecutor中的finnal类,其中继承了AbstractQueuedSynchronizer类和实现Runnable接口,其中的run方法如下

    public void run() { runWorker(this); }

      线程启动时调用了runWorker方法,关于类的其他方面这里就不在叙述。

    runWorker

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock();
        boolean completedAbruptly = true;
        try {
            //循环获取任务
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // 当线程池是处于STOP状态或者TIDYING、TERMINATED状态时,设置当前线程处于中断状态
                // 如果不是,当前线程就处于RUNNING或者SHUTDOWN状态,确保当前线程不处于中断状态
                // 重新检查当前线程池的状态是否大于等于STOP状态
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //提供给继承类使用做一些统计之类的事情,在线程运行前调用
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //提供给继承类使用做一些统计之类的事情,在线程运行之后调用
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    //统计当前worker完成了多少个任务
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //整个线程结束时调用,线程退出操作。统计整个线程池完成的任务个数之类的工作
            processWorkerExit(w, completedAbruptly);
        }
    }

      说明:此函数中会实际执行给定任务(即调用用户重写的run方法),并且当给定任务完成后,会继续从阻塞队列中取任务,直到阻塞队列为空(即任务全部完成)。在执行给定任务时,会调用钩子函数,利用钩子函数可以完成用户自定义的一些逻辑。在runWorker中会调用到getTask函数和processWorkerExit钩子函数,其中,getTask函数源码如下

    getTask

    getTask方法的主要作用其实从方法名就可以看出来了,就是获取任务

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        //循环
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            //线程线程池状态和队列是否为空
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            //线程数量
            int wc = workerCountOf(c);
    
            
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
            //(当前线程数是否大于最大线程数或者)
            //且(线程数大于1或者任务队列为空)
            //这里有个问题(timed && timedOut)timedOut = false,好像(timed && timedOut)一直都是false吧
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
    
            try {
                //获取任务
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

     说明:此函数用于从workerQueue阻塞队列中获取Runnable对象,由于是阻塞队列,所以支持有限时间等待(poll)和无限时间等待(take)。在该函数中还会响应shutDown和、shutDownNow函数的操作,若检测到线程池处于SHUTDOWN或STOP状态,则会返回null,而不再返回阻塞队列中的Runnalbe对象。

      processWorkerExit函数是在worker退出时调用到的钩子函数,而引起worker退出的主要因素如下

      ① 阻塞队列已经为空,即没有任务可以运行了。

      ② 调用了shutDown或shutDownNow函数

      processWorkerExit的源码如下

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
            if (completedAbruptly) // 如果被中断,则需要减少workCount    // If abrupt, then workerCount wasn't adjusted
                decrementWorkerCount();
            // 获取可重入锁
            final ReentrantLock mainLock = this.mainLock;
            // 获取锁
            mainLock.lock();
            try {
                // 将worker完成的任务添加到总的完成任务中
                completedTaskCount += w.completedTasks;
                // 从workers集合中移除该worker
                workers.remove(w);
            } finally {
                // 释放锁
                mainLock.unlock();
            }
            // 尝试终止
            tryTerminate();
            // 获取线程池控制状态
            int c = ctl.get();
            if (runStateLessThan(c, STOP)) { // 小于STOP的运行状态
                if (!completedAbruptly) {
                    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                    if (min == 0 && ! workQueue.isEmpty()) // 允许核心超时并且workQueue阻塞队列不为空
                        min = 1;
                    if (workerCountOf(c) >= min) // workerCount大于等于min
                        // 直接返回
                        return; // replacement not needed
                }
                // 添加worker
                addWorker(null, false);
            }
        }

    关闭线程池

    shutdown

    当调用shutdown方法时,线程池将不会再接收新的任务,然后将先前放在队列中的任务执行完成。

    下面是shutdown方法的源码

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

    说明:此函数会按过去执行已提交任务的顺序发起一个有序的关闭,但是不接受新任务。首先会检查是否具有shutdown的权限,然后设置线程池的控制状态为SHUTDOWN,之后中断空闲的worker,最后尝试终止线程池。
    尝试终止线程池tryTerminate的源码如下

    final void tryTerminate() {
            for (;;) { // 无限循环,确保操作成功
                // 获取线程池控制状态
                int c = ctl.get();
                if (isRunning(c) ||                                            // 线程池的运行状态为RUNNING
                    runStateAtLeast(c, TIDYING) ||                            // 线程池的运行状态最小要大于TIDYING
                    (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))    // 线程池的运行状态为SHUTDOWN并且workQueue队列不为null
                    // 不能终止,直接返回
                    return;
                if (workerCountOf(c) != 0) { // 线程池正在运行的worker数量不为0    // Eligible to terminate
                    // 仅仅中断一个空闲的worker
                    interruptIdleWorkers(ONLY_ONE);
                    return;
                }
                // 获取线程池的锁
                final ReentrantLock mainLock = this.mainLock;
                // 获取锁
                mainLock.lock();
                try {
                    if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // 比较并设置线程池控制状态为TIDYING
                        try {
                            // 终止,钩子函数
                            terminated();
                        } finally {
                            // 设置线程池控制状态为TERMINATED
                            ctl.set(ctlOf(TERMINATED, 0));
                            // 释放在termination条件上等待的所有线程
                            termination.signalAll();
                        }
                        return;
                    }
                } finally {
                    // 释放锁
                    mainLock.unlock();
                }
                // else retry on failed CAS
            }
        }

    说明:如果线程池的状态为SHUTDOWN并且线程池和阻塞队列都为空或者状态为STOP并且线程池为空,则将线程池控制状态转化为TERMINATED;

    否则,将中断一个空闲的worker,其中,interruptIdleWorkers的源码如下

    private void interruptIdleWorkers(boolean onlyOne) {
            // 线程池的锁
            final ReentrantLock mainLock = this.mainLock;
            // 获取锁
            mainLock.lock();
            try {
                for (Worker w : workers) { // 遍历workers队列
                    // worker对应的线程
                    Thread t = w.thread;
                    if (!t.isInterrupted() && w.tryLock()) { // 线程未被中断并且成功获得锁
                        try {
                            // 中断线程
                            t.interrupt();
                        } catch (SecurityException ignore) {
                        } finally {
                            // 释放锁
                            w.unlock();
                        }
                    }
                    if (onlyOne) // 若只中断一个,则跳出循环
                        break;
                }
            } finally {
                // 释放锁
                mainLock.unlock();
            }
        }

    shutdownNow

    立即停止所有的执行任务,并将队列中的任务返回

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

    说明:此函数将会中断正在等待任务的空闲worker。

      shutdownNow函数与shutdown函数相似,shutdownNow会尝试停止所有的活动执行任务、暂停等待任务的处理,并返回等待执行的任务列表,但是其会终止所有的worker,而并非空闲的worker。

    shutdown和shutdownNow区别

    shutdown和shutdownNow这两个方法的作用都是关闭线程池,流程大致相同,只有几个步骤不同,如下

    1. 加锁
    2. 检查关闭权限
    3. CAS改变线程池状态
    4. 设置中断标志(线程池不在接收任务,队列任务会完成)/中断当前执行的线程
    5. 调用onShutdown方法(给子类提供的方法)/获取队列中的任务
    6. 解锁
    7. 尝试将线程池状态变成终止状态TERMINATED
    8. 结束/返回队列中的任务

     对于其他的函数,有兴趣的读者可以自行分析,下面通过一个示例来详细讲解ThreadPoolExecutor的内部工作机制

    示例

      通过上面的分析,对于一些重要的函数有了一个整体的认识,下面通过一个示例,看看这些函数之间是如何串联起来的,并且分析分ThreadPoolExecutor的工作机制。

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    public class FixedThreadPoolDemo {
        public static void main(String[] args) throws InterruptedException {
            ExecutorService service = Executors.newFixedThreadPool(2);
            MyRunnable mr1 = new MyRunnable(10, "mr1");
            MyRunnable mr2 = new MyRunnable(5, "mr2");
            MyRunnable mr3 = new MyRunnable(10, "mr3");
            
            service.submit(mr1);
            service.submit(mr2);
            service.submit(mr3);
            
            service.shutdown();
        }
        
        static class MyRunnable implements Runnable {
            private int count;
            private String name;
            
            public MyRunnable(int count, String name) {
                this.count = count;
                this.name = name;
            }
            
            public void run() {
                for (int i = 0; i < count; i++) {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(name);
                }
            }
        }
    }

    运行结果

    mr1
    mr2
    mr2
    mr1
    mr2
    mr1
    mr1
    mr2
    mr2
    mr1
    mr3
    mr1
    mr3
    mr1
    mr3
    mr1
    mr1
    mr3
    mr3
    mr1
    mr3
    mr3
    mr3
    mr3
    mr3

      说明:在程序中,使用了一个FixedThreadPool线程池(即corePoolSize与maximumPoolSize相等,且为2),之后在线程池提交了3个线程(Runnalbe对象),之后调用了shutdown来关闭线程池。

     ① 执行es.submit(mr1),其主要的函数调用如下

      说明:在调用了es.submit(mr1)后,最终线程池中会新建一个worker,并且此时workQueue阻塞队列为空(没有元素),并且值得注意的是,在runWorker函数中,有一个while循环,当某个任务完成后,会从workQueue阻塞队列中取下一个任务。

      ② 执行es.submit(mr2),其主要的函数调用与执行es.submit(mr1)相同,但是此时的线程池状态有所不同,其状态如下

      说明:此时,线程池会有两个worker,两个worker会分别封装mr1和mr2,并且workQueue阻塞队列还是为空(没有元素)。

      ③ 执行es.submit(mr3),其主要的函数调用如下

      说明:此时,由于线程池的worker的数量已经达到了corePoolSize大小,所以,此时会将mr3放入到workQueue阻塞队列中,此时,线程池还是只有两个worker,并且阻塞队列已经存在一个mr3元素。

      ④ mr2定义的逻辑运行完成,则会从workQueue中取下一个任务(mr3)。主要的函数调用如下(从runWorker开始)

      说明:此时,会运行用户再mr3中自定义的逻辑。此时,线程池中还是有两个worker,并且workQueue的大小为0,没有元素。

      ⑤ mr1定义的逻辑运行完成,则还是会从workQueue中取下一个任务(null)。主要的函数调用如下(从runWorker开始)

      说明:此时,由于是阻塞队列,并且队列中没有元素,所以调用take会使当前线程(worker对应的Thread)被阻塞。

      ⑥ mr3定义的逻辑运行完成,其过程和mr1完成时相同,会使另外一个worker对应的Thread被阻塞。

      ⑦ 执行es.shutdown,则主要的函数调用如下

      说明:在执行shutdown后,会中断两个worker对应的Thread线程。由于中断了worker对应的Thread线程,则之前由于take操作(响应中断)而阻塞也会被中断。

      ⑧ 其中一个worker对应的线程响应中断,从getTask函数开始(因为在getTask中被阻塞)。

      说明:此时,在getTask函数中,会将workerCount的值减一,并且返回null。接着在runWorker函数中退出while循环,并进入processWorkerExit函数进行worker退出线程池的处理,之后会再次调用addWorker,但是此时,不会添加成功。此时,线程池只有一个worker,并且workQueue的大小还是为0。

      ⑨ 另外一个worker对应的线程响应中断,从getTask函数开始(因为在getTask中被阻塞)。与上一个worker的处理过程相同,不再累赘。线程池的状态如下

      说明:之后整个程序就运行结束了,最后的状态为workQueue阻塞队列大小为0,线程池没有worker,workerCount为0。

      最后,给出ThreadPoolExecutor的示意图

      说明:用户自定义的任务会进入阻塞队列或者直接进入线程池(进入线程池后,新建线程直接运行),worker会从阻塞队列中不断的取任务,直到阻塞队列中没有任务。

      关于ThreadPoolExecutor还有如下几点需要注意的

      ① corePoolSize,表示核心大小,如果运行的线程少于 corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的。

      ② maxPoolSzie,表示阻塞队列的大小,如果运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当阻塞队列满时才创建新线程。如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池(如本例的FixThreadPool)。如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务。

      ③ largestPoolSize,表示曾经同时存在在线程池的worker的大小,为workers集合(维护worker)的大小。

      ④ 关于shutdown函数和shutdownNow函数的区别,shutdown会设置线程池的运行状态为SHUTDOWN,并且中断所有空闲的worker,由于worker运行时会进行相应的检查,所以之后会退出线程池,并且其会继续运行之前提交到阻塞队列中的任务,不再接受新任务。而shutdownNow则会设置线程池的运行状态为STOP,并且中断所有的线程(包括空闲和正在运行的线程),在阻塞队列中的任务将不会被运行,并且会将其转化为List<Runnable>返回给调用者,也不再接受新任务,其不会停止用户任务(只是发出了中断信号),若需要停止,需要用户自定义停止逻辑。

     相关图谱

     

     参考: https://www.baidu.com/link?url=15vsGagw5xe17Hm0yjB0icWgT8TnMcdHo1oY1WeqrP4kyaWAoBpR4641RFvCwg10GQz9qgE4C3BS9y9tc5z59K&wd=&eqid=aa55d5fa00011996000000025e660100

  • 相关阅读:
    Raid卡在Write back 与Write through 时的性能差异
    mysql 的outfile以及infile 语法简单备份恢复表
    @SneakyThrows
    java中的mmap实现--转
    以ATT&CK为例构建网络安全知识图
    横向移动攻击点与识别
    Tomcat开启JMX监控
    mysql serverTimezone
    自增还是UUID?数据库主键的类型选择,为啥不能用uuid做MySQL的主键?
    数据库:查询结果中增加数据库不存在的字段的方法
  • 原文地址:https://www.cnblogs.com/FondWang/p/12449299.html
Copyright © 2011-2022 走看看