zoukankan      html  css  js  c++  java
  • 线程池的原码分析(二)

    一、前言

      上一篇最后讲到了AbstractExecutorService,这一篇接着说ThreadPoolExecutor。

    二、ThreadPoolExecutor

      ThreadPoolExecutor这个类中实现了一系列java线程池管理最核心的方法,再来回顾一下他的属性及使用方法。

    1、属性

      corePoolSize:核心线程数
      maximumPoolSize:线程池最大线程数
      keepAliveTime:空闲线程存活的时间,如果线程空闲了keepAliveTime这么久以后,还是没有任务分配给它,那么线程会被关闭。当然,如果当前线程池中的线程数<corePoolSize,线程是不会因为空闲而关闭的
      workQueue:任务队列,用于存储要执行的任务。要求传入BlockingQueue接口的实现类,如ArrayBlockingQueue等
      threadFactory:用于创建线程的工厂,我们可以通过自定义threadFactory统一为线程设置一些属性,如线程名称等。
      handler:拒绝策略,都是RejectedExecutionHandler 的实现。线程池中已经爆满,但又有新的任务提交过来,这个时候应该执行的策略。

    2、拒绝策略

      AbortPolicy:直接抛出RejectedExecutionException异常
      DiscardPolicy:不做任何处理,直接忽略掉这个任务
      DiscardOldestPolicy:将最早入队的任务移除,再尝试提交任务,不断重试
      CallerRunsPolicy:如果提交任务失败,会由提交任务的这个线程自己来调用execute执行任务

    3、基本流程

      1)线程池刚创建的时候,里面没有任何线程,等到有任务过来的时候才会创建线程。当然也可以调用 prestartAllCoreThreads() 或者 prestartCoreThread() 方法预创建corePoolSize个线程
      2)调用execute()提交一个任务时,如果当前的工作线程数<corePoolSize,直接创建新的线程执行这个任务
      3)如果当时工作线程数量大于等于corePoolSize,会将任务放入任务队列中缓存
      4)如果队列已满,并且线程池中工作线程的数量小于maximumPoolSize,还是会创建线程执行这个任务
      5)如果队列已满,并且线程池中的线程已达到maximumPoolSize,这个时候会执行拒绝策略,JAVA线程池默认的策略是AbortPolicy,即抛出RejectedExecutionException异常

    4、其他的一些属性

    public class ThreadPoolExecutor extends AbstractExecutorService {
        /**
         * 这个ctl就是用来保存 线程池的状态(runState) 和 线程数(workerCount) 的
         * 这里使用AtomicInteger 来保证原子操作
         * 这里的ctl的初始值其实就是-1左移29位,即3个1和29个0, 
         * 111 00000000000000000000000000000
       * 低29位代表线程数(5亿多),高3位代表线程池的状态
    */ private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // COUNT_BITS值为29,代表着低29位用于存储线程数,高3位用于存储线程池的状态 private static final int COUNT_BITS = Integer.SIZE - 3; // 线程池最大的容量,值为3个 0和29个1。也就是536870911 // 000 11111111111111111111111111111 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 下面这5个值代表线程池的状态,存储在高3位中 // 3个1,29个0 111 00000000000000000000000000000 private static final int RUNNING = -1 << COUNT_BITS; // 全是0 000 00000000000000000000000000000 private static final int SHUTDOWN = 0 << COUNT_BITS; // 001 00000000000000000000000000000 private static final int STOP = 1 << COUNT_BITS; // 010 00000000000000000000000000000 private static final int TIDYING = 2 << COUNT_BITS; // 011 00000000000000000000000000000 private static final int TERMINATED = 3 << COUNT_BITS; // ~就是按位取反,CAPACITY按位取反得到111 00000000000000000000000000000, // 再和c按位与,其实就是得到高3位,代表线程池的状态 private static int runStateOf(int c) { return c & ~CAPACITY; } // CAPACITY就是000 11111111111111111111111111111, // 直接按位与,其实就是得到低29位,代表线程池中的线程数 private static int workerCountOf(int c) { return c & CAPACITY; } // 按位或 private static int ctlOf(int rs, int wc) { return rs | wc; } private static boolean runStateLessThan(int c, int s) { return c < s; } private static boolean runStateAtLeast(int c, int s) { return c >= s; } private static boolean isRunning(int c) { return c < SHUTDOWN; } private boolean compareAndIncrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect + 1); } private boolean compareAndDecrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect - 1); } private void decrementWorkerCount() { do {} while (! compareAndDecrementWorkerCount(ctl.get())); } /** * 任务队列 */ private final BlockingQueue<Runnable> workQueue; /** * JAVA线程池的全局锁 */ private final ReentrantLock mainLock = new ReentrantLock(); /** * 这个HashSet用于存放线程池中所有的工作线程, * 只有在持有全局锁(mainLock)的前提下,才能对这个集合进行存取操作 */ private final HashSet<Worker> workers = new HashSet<Worker>(); /** * 这个condition是用于支持awaitTermination的 */ private final Condition termination = mainLock.newCondition(); /** * largestPoolSize记录了线程池中线程数曾经达到的最大值 */ private int largestPoolSize; /** * 已完成任务的数量 */ private long completedTaskCount; /** * 线程工厂 */ private volatile ThreadFactory threadFactory; /** * 拒绝策略 */ private volatile RejectedExecutionHandler handler; /** * 空闲线程存活时间 */ private volatile long keepAliveTime; /** * 如果这个参数为true, * 那么核心线程数内的空闲线程 空闲时间超过keepAliveTime后,也可以被回收。 */ private volatile boolean allowCoreThreadTimeOut; /** * 核心线程数 */ private volatile int corePoolSize; /** * 最大线程数 */ private volatile int maximumPoolSize; /** * 默认的拒绝策略为AbortPolicy */ private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); ...... }

     5、线程池的状态

    RUNNING
      (1)状态说明:能够接收新任务,以及对已添加的任务进行处理。
      (2)状态切换:线程池的初始化状态是RUNNING

    SHUTDOWN
      (1)状态说明:不接收新任务,但能处理已添加的任务。
      (2)状态切换:调用线程池的shutdown()接口时,线程池由RUNNING >> SHUTDOWN

    STOP
      (1)状态说明:不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
      (2)状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) >> STOP。

    TIDYING
      (1)状态说明:当所有的任务已终止,ctl记录的"任务数量"为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
      (2)状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN >> TIDYING。
    当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP >> TIDYING。

    TERMINATED
      (1) 状态说明:线程池彻底终止,就变成TERMINATED状态。
      (2) 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING >> TERMINATED。

    这里的RUNNING的值是一个负数,SHUTDOWN值为0,其他状态都大于0,而且它们的值是按照RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED 的顺序依次递增的。也就是说状态为负数的时候是线程池最正常的状态

    三、源码分析

      1、Worker

      Worker是ThreadPoolExecutor的一个内部类,java线程池中的线程被包装成了一个个的Worker,代表线程池中的工作线程。

    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
            /**
             * This class will never be serialized, but we provide a
             * serialVersionUID to suppress a javac warning.
             */
            private static final long serialVersionUID = 6138294804551838833L;
    
            /** Thread this worker is running in.  Null if factory fails. */
            //执行任务的线程
            final Thread thread;
            /** Initial task to run.  Possibly null. */
            //每个线程要执行的第一个任务,这个值可以为null,线程自己到BlokingQueue中获取任务
            Runnable firstTask;
            /** Per-thread task counter */
            //记录每个线程已完成的任务数
            volatile long completedTasks;
    
            /**
             * Creates with given first task and thread from ThreadFactory.
             * @param firstTask the first task (null if none)
             */
             //这个构造方法传入这个线程第一个要执行的任务,当然也可以传入null
            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                // 通过ThreadFactory创建新的线程,注意:这里传入的是this,代表当前的Worker对象。
                // Worker实现了Runnable所以执行任务的时候最终会调用Worker.run()
                this.thread = getThreadFactory().newThread(this);
            }
    
            /** Delegates main run loop to outer runWorker  */
            //实现了Run方法,调用runWorker
            public void run() {
                runWorker(this);
            }
    
            // Lock methods
            //下面这些方法都是Worker对AQS同步控制的实现了,要获取线程的执行权,需要先获取独占锁
            // The value 0 represents the unlocked state.
            // The value 1 represents the locked state.
    
            protected boolean isHeldExclusively() {
                return getState() != 0;
            }
    
            protected boolean tryAcquire(int unused) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
            protected boolean tryRelease(int unused) {
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
    
            public void lock()        { acquire(1); }
            public boolean tryLock()  { return tryAcquire(1); }
            public void unlock()      { release(1); }
            public boolean isLocked() { return isHeldExclusively(); }
    
            void interruptIfStarted() {
                Thread t;
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }
        }

    2、ThreadPoolExecutor.execute()

         public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            //获取线程池状态和线程数
            int c = ctl.get();
            // 如果当前线程数小于corePoolSize,那么添加一个Worker来执行任务。
            if (workerCountOf(c) < corePoolSize) {
                //如果添加成功则返回,不成功则再次获取ctl,第二个参数true:对比核心线程数,false:对比最大线程数
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            // 走到这里有两种情况:
            // 1.当前线程数>=corePoolSize
            // 2.上面addWorker提交任务失败了
            //isRunning判断线程池状态如果处于RUNNING状态,调用workQueue.offer(command)进入等待队列
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                /*
                * 再次检查是否线程池处于运行状态,如果不是则移除任务并回调饱和策略拒绝任务。
                * 因为有可能上面if条件读到线程池处于运行状态,而后shutdown/shutdownNow方法被调用,
                * 这时候需要把尝试刚才加入任务队列中的任务移除。
                */
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                // 如果线程是处于RUNNING状态,并且当前线程池中的线程数为0,开启一个新的线程
                // 因为有可能任务添加到队列中了,但是却没有线程可执行
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            // 如果队列已满,执行addWorker尝试创建新的线程,
            // 如果成功,说明当前线程数小于maximumPoolSize。
            // 如果失败,说明当前线程数已达到maximumPoolSize,需要执行拒绝策略
            else if (!addWorker(command, false))
                reject(command);
        }
         // 这个方法会创建线程并且执行任务
        private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                // 这个rs就是线程池的状态
                int rs = runStateOf(c);
                
                // 这里要满足以下要求才直接返回false,不接受任务
                // 1.rs大于等于SHUTDOWN,说明状态不是RUNNING状态
                // 2.并且,!(a&&b&&c)==!a||!b||!c ,rs != SHUTDOWN||firstTask != null||workQueue.isEmpty()
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    // 获取线程池中线程的数量
                    int wc = workerCountOf(c);
                    // 这里传入的core为true代表线程数上限为corePoolSize,
                    // false代表线程数上限为maximumPoolSize,如果线程数超出上限,直接返回false
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    // 使用CAS对线程计数+1,如果成功,说明已经满足创建线程的条件了
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    // 如果上面的CAS失败,说明有并发,再次获取ctl的值
                    c = ctl.get();  // Re-read ctl
                    // 如果线程池的状态发生了变化,例如线程池已经关闭了,
                    // 导致的CAS失败,那么回到外层的for循环(retry)
                    // 否则,说明是正常的CAS失败,那么再次进入里面的循环
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
            
            // 能到这里,说明已经做好创建线程的准备了
            // worker是否已经启动的标志位
            boolean workerStarted = false;
            // workers的HashSet用于存储线程池中的所有线程,
            // 所以这个变量是代表当前worker是否已经存放到workers这个HashSet中
            boolean workerAdded = false;
            Worker w = null;
            try {
                // 传入firstTask这个任务构造一个Worker
                w = new Worker(firstTask);
                // Worker的构造方法中会使用ThreadFactory创建新的线程,
                // 所以这里可以直接获取到对应的线程
                final Thread t = w.thread;
                // 如果创建线程成功
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    // 获取线程池的全局锁,下面涉及线程池的操作都需要在持有全局锁的前提下进行
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        // 获取线程池的状态
                        int rs = runStateOf(ctl.get());
                        // 如果rs<SHUTDOWN,线程池处于RUNNING状态
                        // 或者 线程池处于SHUTDOWN状态并且没有新的任务
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            // 如果线程已经启动,抛出异常
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            // 将包装线程的worker加入到workers这个HashSet中
                            workers.add(w);
                            int s = workers.size();
                            // largestPoolSize记录的是线程池中线程数曾经到达的最大值
                            // 线程池中worker的数量是会变化的,所以记录下worker数的最大值
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                                
                            // 修改标志,代表当前worker已经加入到workers这个HashSet中
                            workerAdded = true;
                        }
                    } finally {
                        // 释放全局锁
                        mainLock.unlock();
                    }
                    // 如果worker添加成功,启动线程执行任务
                    if (workerAdded) {
                        // 启动线程
                        t.start();
                        // 代表worker已经启动
                        workerStarted = true;
                    }
                }
            } finally {
                // 如果线程没有启动,这里还需要进行一些清理工作
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
        // 这个方法做下面几件事:
        // 1.将worker从workers中移除
        // 2.worker的数量-1
        // 3.检查termination
        private void addWorkerFailed(Worker w) {
            // 要操作workers这个HashSet,先获取线程池全局锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (w != null)
                    // 从workers中移除
                    workers.remove(w);
                // WorkerCount -1    
                decrementWorkerCount();
                // 处理TERMINATED状态
                tryTerminate();
            } finally {
                mainLock.unlock();
            }
        }
        // 这个方法其实就是用来处理线程池TERMINATED状态的,
        // 线程池彻底终止就是TERMINATED状态。我们前面说了线程池处在TIDYING状态时,
        // 执行完terminated()之后,就会由 TIDYING -> TERMINATED。
        // 可能会引起线程池终止的操作都需要调用这个方法,
        // 例如减少worker的数量或者在shutdown期间从任务队列移除任务。
        final void tryTerminate() {
            for (;;) {// 循环执行
                int c = ctl.get();
                // 满足以下三个条件之一,直接return:
                // 1.线程池处于RUNNING状态,说明线程池正常
                // 2.线程池已经处于TERMINATED状态,就没必要继续往下走就
                // 3.线程池处于SHUTDOWN状态,并且任务队列不为空,需要执行完任务
                if (isRunning(c) ||
                    runStateAtLeast(c, TIDYING) ||
                    (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                    return;
     
                // 能到这一步说明这个时候线程池需要被终止
     
                // 如果这个时候worker的数量不为0
                if (workerCountOf(c) != 0) { // Eligible to terminate
                    // 关闭一个worker,以确保shutdown 信号的传播
                    // 调用这个方法会终止正在等待获取任务的线程
                    // 我这个线程池都要终止了,居然还有线程在等待获取任务,这当然不行。
                    interruptIdleWorkers(ONLY_ONE);
                    // 然后就直接return了
                    return;
                }
                // 到这一步说明线程池worker和任务都清空了
                final ReentrantLock mainLock = this.mainLock;
                // 获取全局锁
                mainLock.lock();
                try {
                    // CAS尝试将线程状态转换成TIDYING
                    if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                        try {
                            // 如果上面的CAS成功,执行terminated()。
                            // 这也是个钩子方法,留给子类实现的
                            terminated();
                        } finally {
                            // 执行完terminated()之后,最终将线程池状态置为TERMINATED
                            ctl.set(ctlOf(TERMINATED, 0));
                            termination.signalAll();
                        }
                        return;
                    }
                } finally {
                    mainLock.unlock();
                }
                // else retry on failed CAS
            }
        }
     
        // 这个方法其实就是用来中断正在等待任务的线程,
        // 注意这里说的中断其实也只是将线程的状态置为“中断”,并不是说线程在这里就真的停止了
        // 如果onlyOne为true,这里最多会关闭一个worker,因为shutdown()方法需要中断所有的worker,
        // 这里中断一个worker能够帮助shutdown迅速的完成,而不用等待一些还在等待任务的worker结束
        private void interruptIdleWorkers(boolean onlyOne) {
            final ReentrantLock mainLock = this.mainLock;
            // 获取全局锁
            mainLock.lock();
            try {
                // 遍历所有的Worker,如果传入的onlyOne为true,那最多会终止一个Worker。
                // 如果传入的onlyOne为false,终止所有的Worker
                for (Worker w : workers) {
                    Thread t = w.thread;
                    // 这里要获取到worker的独占锁后才能够中断线程
                    if (!t.isInterrupted() && w.tryLock()) {
                        try {
                            t.interrupt();
                        } catch (SecurityException ignore) {
                        } finally {
                            w.unlock();
                        }
                    }
                    if (onlyOne)
                        break;
                }
            } finally {
                mainLock.unlock();
            }
        }

    总结:ThreadPoolExecutor.execute()方法提交任务,主要有三种情况,

      (1)当前线程数小于核心线程数,调用addWorker添加一个新的Worke,直接返回

      (2)当前线程数大于核心线程数,尝试直接加入等待队列,不创建新的Worker

      (3)队列满了,再次调用addWorker,如果当前线程数小于允许的最大线程数,尝试创建线程

      (4)最后以上都失败了,执行拒绝策略。

    而真正启动线程执行任务的操作就是在addWorker中,也就是t.start,实际上会调用Worker.run(),来看看这个方法。

    3、Worker.run()

        public void run() {
            runWorker(this);
        }
        // 这里就是执行任务的代码了,有一个while循环不断从队列中取出任务并执行,
        // 退出循环的条件是获取不到要执行的任务
        final void runWorker(Worker w) {
            // 当前线程
            Thread wt = Thread.currentThread();
            // new Worker的时候可以指定firstTask,代表Worker的第一个任务
            Runnable task = w.firstTask;
            // 将firstTask置为null了
            w.firstTask = null;
            // 释放Worker的独占锁,这里它释放锁的操作一定会成功,也就是将AQS中state设置为0
            w.unlock(); // allow interrupts
             // completedAbruptly这个标志位代表当前Worker是否因为执行任务出现异常而停止的
            boolean completedAbruptly = true;
            try {
                // while循环;如果firstTask不为null那就直接执行firstTask,
                // 否则就要调用getTask()从队列中获取任务,
                // 也就是说Worker的第一个任务是不需要从队列中获取的
                while (task != null || (task = getTask()) != null) {
                    // 给这个worker上独占锁
                    // Worker加锁的意义在于,在线程池的其他方法中可能会中断Worker,
                    // 为了保证Worker安全的完成任务,必须要在获取到锁的情况下才能中断Worker,
                    // 如tryTerminate(),shutdown()等都会关闭worker。
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    // 如果ctl的值大于等于STOP,说明线程池的状态是STOP,TIDYING或TERMINATED。
                    // 这个时候需要确保该线程已中断,否则就应该确保线程没有中断
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                            // 设置线程的中断状态
                        wt.interrupt();
                    try {
                        // 这个方法留给子类去实现的
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            // 真正执行任务的地方
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            // 后置处理,留给子类去实现的
                            afterExecute(task, thrown);
                        }
                    } finally {
                        // 最后将task置为null,准备接受下一个任务
                        task = null;
                        // 这个worker已完成任务数+1
                        w.completedTasks++;
                        // 释放独占锁
                        w.unlock();
                    }
                }
                // 到这一步说明没抛出异常
                completedAbruptly = false;
            } finally {
                // 执行到这里说明:要么队列中已经没有任务了,要么执行任务出现了异常。
                // 这个时候需要调用processWorkerExit关闭线程
                processWorkerExit(w, completedAbruptly);
            }
        }
        // 这个方法就是从队列中获取任务,返回null代表线程需要被关闭。一共有以下三种可能:
        // 1.阻塞获取任务直到获取成功
        // 2.获取任务超时了,也就说线程空闲了keepAliveTime这么久了,还是没有获取到任务,
        //   这个时候线程需要被关闭(这里有个前提就是线程数要大于corePoolSize)
        // 3.如果出现下面几种情况返回null,返回null说明线程需要被关闭
        //   池中worker的数量大于maximumPoolSize(由于调用setMaximumPoolSize进行了设置)
        //   线程池处于STOP状态,这个时候不能执行任务队列中的任务
        //   线程池处于SHUTDOWN状态,但是任务队列是空的
        private Runnable getTask() {
            //// 最后一次的poll操作是否超时
            boolean timedOut = false; // Did the last poll() time out?
    
            for (;;) {
                int c = ctl.get();
                // 获取线程池的状态
                int rs = runStateOf(c);
    
                // 如果线程池的状态大于等于SHUTDOWN 并且(rs >= STOP或者任务队列为空)
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                     // 使用CAS对workerCount-1
                    decrementWorkerCount();
                    return null;
                }
                // 获取线程池中的线程数
                int wc = workerCountOf(c);
    
                // allowCoreThreadTimeOut为true,则线程池数量最后销毁到0个
                // allowCoreThreadTimeOut为false,销毁机制:超过核心线程数时,而且(超过最大值或者timeout过),就会销毁。
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
                // 如果当前线程数大于maximumPoolSize,或者超时
                // 注意:如果开发者调用了setMaximumPoolSize() 将maximumPoolSize变小了,
                // 就有可能出现当前线程数大于maximumPoolSize。
                // 这个时候多余的线程肯定是获取不到任务的,需要被关闭
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
                // 到这里,说明线程数小于maximumPoolSize等于且没有超时
                try {
                    // 从任务队列中取出任务
                    // 如果timed为true,调用带超时的poll方法,否则执行take方法阻塞获取任务。
                    // timed这个变量的值取决于allowCoreThreadTimeOut || wc > corePoolSize
                    // 其实这里说的是,如果线程池中线程数量在corePoolSize以内,
                    // 且不支持回收核心线程数内的线程,这个时候线程池中的线程是不会被回收的。
                    // 所以调用take方法阻塞获取任务,直到获取成功。
                    // 否则的话,线程隔了keepAliveTime这么久还是获取不到任务,是需要被回收的
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    // 如果成功获取到任务,返回这个runnable任务,
                    // 否则就是超时了,再进入下一轮循环的时候返回null
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
        // 这个方法就是清理Worker的,
        // 如果Worker执行任务时出现了异常,那么workerCount是没处理的,还需要把workerCount减回去
         private void processWorkerExit(Worker w, boolean completedAbruptly) {
            if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
                decrementWorkerCount();
    
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                completedTaskCount += w.completedTasks;
                 // 将worker从线程池中移除
                workers.remove(w);
            } finally {
                mainLock.unlock();
            }
            // 处理TERMINATED状态
            tryTerminate();
    
            int c = ctl.get();
            // 如果线程池状态是RUNNING或者SHUTDOWN
            if (runStateLessThan(c, STOP)) {
                // 并且不是因为执行任务出现异常而进入到这个方法
                if (!completedAbruptly) {
                    // 如果allowedCoreThreadTimeOut为true,最小值为0,否则最小值为corePoolSize
                    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                    // 如果允许回收核心线程数内的线程,并且任务队列不为空,至少还需要1个线程
                    if (min == 0 && ! workQueue.isEmpty())
                        min = 1;
                    // 如果线程数大于等于所需的最小线程数,这个方法就结束了
                    if (workerCountOf(c) >= min)
                        return; // replacement not needed
                }
                addWorker(null, false);
            }
        }

    4、shutdown()方法

        // 关闭线程池,这个方法会中断没有执行任务的线程
        public void shutdown() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess();
                // 采用CAS自璇的方式将线程池状态设置为SHUTDOWN
                advanceRunState(SHUTDOWN);
                // 中断所有没有执行任务的线程
                interruptIdleWorkers();
                onShutdown(); // hook for ScheduledThreadPoolExecutor
            } finally {
                mainLock.unlock();
            }
            // 线程池关闭处理
            tryTerminate();
        }
        
        private void interruptIdleWorkers() {
            // 这里调用的是interruptIdleWorkers(false),
            // 前面说了这个方法传入true,代表中断一个没有执行任务的线程。
            // 这里是false,说明是中断所有没有执行任务的线程
            interruptIdleWorkers(false);
        }
        
        private void interruptIdleWorkers(boolean onlyOne) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers) {
                    Thread t = w.thread;
                    // 这里要获取到worker的独占锁后才能够中断线程
                    if (!t.isInterrupted() && w.tryLock()) {
                        try {
                            t.interrupt();
                        } catch (SecurityException ignore) {
                        } finally {
                            w.unlock();
                        }
                    }
                    if (onlyOne)
                        break;
                }
            } finally {
                mainLock.unlock();
            }
        }
        
        // 关闭线程池。这个方法会中断所有的线程,不管线程是否正在执行任务
        public List<Runnable> shutdownNow() {
            List<Runnable> tasks;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess();
                // 采用CAS自旋的方式将线程池状态设置为STOP
                advanceRunState(STOP);
                // 中断所有线程
                interruptWorkers();
                tasks = drainQueue();
            } finally {
                mainLock.unlock();
            }
            tryTerminate();
            return tasks;
        }
        
        private void interruptWorkers() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers)
                    w.interruptIfStarted();
            } finally {
                mainLock.unlock();
            }
        }

    总结:线程在拿到任务的时候开始执行的时候,是会获取Worker的独占锁的。
    shutdown()方法中断worker会先调用Worker.tryLock()获取独占锁,如果线程正在执行任务,那就获取不到独占锁,也就无法中断线程。
    而shutdownNow()方法是直接中断所有线程。
    除此之外,还有个不同点在于shutdown会先将线程池状态设置为SHUTDOWN,而shutdownNow是将线程池状态设置为STOP
    不管是shutdown、shutdownNow或者前面的tryTerminate,它们所谓的中断线程,都只是调用Thread.interrupt()方法给线程设置interrupt标记,所以只有响应中断的任务在interrupt()以后才会终止,如BlockingQueue.take()方法。

    四、总结

    1、线程池什么时候会执行拒绝策略?

      (1)任务成功入队,但是线程池关闭了,并且线程池并没有移除掉这个任务,这个时候执行拒绝策。也就是说任务入队和线程池关闭并发执行了

      (2)当前线程数达到maximumPoolSize

    2、线程池中一些属性的作用? 

      (1)corePoolSize:核心线程数
      (2)maximumPoolSize:线程池最大线程数
      (3)keepAliveTime:空闲线程存活的时间,如果线程空闲了keepAliveTime这么久以后,还是没有任务分配给它,那么线程会被关闭。当然,如果当前线程池中的线程数<corePoolSize,线程是不会因为空闲而关闭的
      (4)workQueue:任务队列,用于存储要执行的任务。要求传入BlockingQueue接口的实现类,如ArrayBlockingQueue等
      (5)threadFactory:用于创建线程的工厂,我们可以通过自定义threadFactory统一为线程设置一些属性,如线程名称等。
      (6)handler:拒绝策略,都是RejectedExecutionHandler 的实现。线程池中已经爆满,但又有新的任务提交过来,这个时候应该执行的策略

    3、java线程池创建线程的流程?

      (1)线程池刚创建的时候,里面没有任何线程,等到有任务过来的时候才会创建线程。当然也可以调用 prestartAllCoreThreads() 或者 prestartCoreThread() 方法预创建corePoolSize个线程
      (2)调用execute()提交一个任务时,如果当前的工作线程数小于corePoolSize,直接创建新的线程执行这个任务
      (3)如果当时工作线程数量大于等于corePoolSize,会将任务放入任务队列中缓存
      (4)如果队列已满,并且线程池中工作线程的数量小于maximumPoolSize,还是会创建线程执行这个任务
      (5)如果队列已满,并且线程池中的线程已达到maximumPoolSize,这个时候会执行拒绝策略,JAVA线程池默认的策略是AbortPolicy,即抛出RejectedExecutionException异常

    4、线程池是怎么实现线程复用的?

      在runWorker方法中,一个线程执行完一个任务后会不断从任务队列中取出任务来执行。
      如果队列中已经没有任务了,
      allowCoreThreadTimeOut设置为false并且线程数<=corePoolSize,调用BlokingQueue.take()方法阻塞,直到获取到任务。
      allowCoreThreadTimeOut设置为true或者线程数>corePoolSize,调用BlockingQueue带超时的poll方法尝试获取任务,获取不到的话,这个线程就会被回收掉。

    5、线程执行任务过程中出现异常是怎么处理的?

      如果一个线程执行任务出现异常,那么执行任务的线程会被关闭,而不会继续执行其他任务。最后会启动一个新的线程来取代它。

    借鉴文章:https://blog.csdn.net/Epoch_Elysian/article/details/107282186

  • 相关阅读:
    springmvc的执行流程
    深入理解设计模式(五):抽象工厂模式
    深入理解设计模式(四):工厂方法模式
    深入理解设计模式(三):策略模式
    写给三十岁的自己
    asp.net引用System.Speech实现语音提示
    深入理解设计模式(序):常用的7大设计原则
    深入理解设计模式(二):简单工厂模式
    深入理解设计模式(一):单例模式
    解决基于IIS的.net core HttpWebRequest 连接特别慢
  • 原文地址:https://www.cnblogs.com/sglx/p/15272407.html
Copyright © 2011-2022 走看看