zoukankan      html  css  js  c++  java
  • 深入学习线程池原理

       在前面的文章:<<线程池原理初探>>中我们学习了线程池的基本用法和优点,并且从源码层面学习了线程池的内部数据结构以及运行状态表征方法,这是最基础但是又很重要的一环,有了这一步铺垫我们便可以开始进一步的源码学习之旅了。

      本文会从如下几个方面展开:

      工作线程--Worker

      如何提交任务

      如何添加Worker

      worker是如何开始工作的

      线程池如何结束Worker的工作

      终止线程池原理

      总结

    1. 工作线程--Worker

      上文说到,线程池中的工作线程是保存在一个hashSet中,这样说其实并不是很准确,因为线程池中执行任务的基本单元是一个定义在ThreadPoolExecutor中的内部类Worker,继承自AQS,并实现了Runnable接口,其本身就是一个任务,内部封装了驱动其运行的线程,而这个worker才是保存在hashSet中的。我们来看一下:

        private final class Worker extends AbstractQueuedSynchronizer implements Runnable
        {
            /** 这才是真的线程,worker只是一个runnable,需要线程来驱动,而这个线程则是封装在worker中,worker在其自己的run()方法中再去执行队列中的任务 */
            final Thread thread;
            /** 第一个要执行的任务 */
            Runnable firstTask;
            /** 已完成的任务数量 */
            volatile long completedTasks;
    
         // 构造函数 Worker(Runnable firstTask) { setState(
    -1); // 防止在runWorker之前被中断 this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** 委托给ThreadPoolExecutor中的runWorker方法 */ public void run() { runWorker(this); } protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
    • 在Worker中封装了一个Thread,这个才是真正的线程池帮我们管理的线程。Worker只是一个runnable,需要线程来驱动,而这个线程又是封装在Worker内部,Worker在其自己的run()方法中再去执行队列中的任务;
    • 在Worker中封装的Thread会在Worker的初始化方法中进行赋值,通过线程池内部的ThreadFactory获取一个Thread实例,也就是说所有线程池中的线程都是通过ThreadFactory这一工厂产生的;
    • Worker继承自AQS,实现了一个简单的不可重入互斥锁;
    • 为了防止在线程实际开始执行任务之前被中断,在Worker的初始化方法中直接将锁状态变量state置为-1,在runWorker方法中会将其清除为0;

      这里其实只要重点关注Worker自身是一个任务,它将线程封装起来了,由该线程来驱动Worker的run()方法,然后Worker在其自己的run()方法中不断地从任务队列中获取任务并执行。

      关于Worker,我们先了解这么多就够了,一些更深入的细节还需要结合ThreadPoolExecutor自身的逻辑来理解才更容易弄清楚。

    2. 如何提交任务

       其实ThreadPoolExecutor的execut()方法是一个很好的看源码的入口,因为这也许是我们使用的最多的方法,并且线程池的主要逻辑也在这个方法中。该方法对于用户来说就是向线程池提交任务,至于提交任务之后的逻辑,是否要新建线程、是否将任务加入阻塞队列中、是否要拒绝任务等等,这些对用户都是透明的,这也是我们接下来要重点探索的:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // 检查工作线程的数量,低于corePoolsize则添加Worker
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // isRunning()用来检查线程池是否处于运行状态
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 再次进行防御性检查
            if (!isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 到这里已经意味着已经饱和或者被shutdown了,尝试添加一个非核心worker,如果失败就就直接执行拒绝
        else if (!addWorker(command, false))
            reject(command);
    }

      如上,配合注释更容易理解,总结一下,一共分成3步:

    • 检查工作线程的数量,低于corePoolsize则添加Worker;
    • 判断线程池是否处于运行状态,如果在,判断任务队列是否允许插入,插入成功再次验证线程池是否处于运行状态,如果不在运行状态则移除插入的任务,然后抛出拒绝策略,否则检查存活线程的数量,如果没有线程了,就添加一个Worker;
    • 如果执行到这一步意味着线程池已经饱和或者被shutdown了,尝试添加一个非核心worker,如果失败就就直接执行拒绝;

    3. 如何添加Worker

      接下来我们再来看一下如何添加Worker,这部分逻辑是在addWorker()方法中,这部分主要负责创建新的线程并执行任务:

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
         // 如果线程池的状态值大于或等SHUTDOWN,则不处理提交的任务,直接返回
            if (rs >= SHUTDOWN &&
                !(rs == SHUTDOWN &&
                   firstTask == null &&
                   !workQueue.isEmpty()))
                return false;
            
            for (;;) {
                int wc = workerCountOf(c);
                // 如果当前线程数量太多则直接退出
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 做自旋,如果当前线程数量更新成功则跳出retry执行后面addworker逻辑
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // 重新读取ctl,如果线程池状态改变,则从retry重新执行
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
    
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // 获取线程池主锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    // 添加线程到workers中(线程池中)
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                   // 释放锁
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // 启动新建的线程,此时添加的worker会被驱动执行其run()方法
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

      主要分为如下5步:

    • 判断是否需要添加worker:
      • 如果线程池的状态值大于SHUTDOWN,则不需添加worker,直接返回false;    
      • 如果线程池的状态值等于SHUTDOWN,此时如果传入的firstTask不为空,则不需要添加worker,则直接返回false;  
      • 如果线程池的状态值等于SHUTDOWN,且传入的firstTask为空,则检查workQueue是否为空,是则不需要添加worker,直接返回false;  
      • 否则代表判断通过,继续执行后面逻辑;  
    • 做自旋,更新创建线程数量;
      • 如果此时线程数量太多(超过ctl能保存的数量或者超过指定的线程池最大线程数量),则直接返回false;  
      • 利用cas更新线程数量(ctl加1),成功则跳出自旋继续后面的操作;  
      • 如果更新失败则检查线程运行状态,如果发生改变则重新开始addWorker,否则继续自旋更新ctl;  
    • 获取线程池主锁,通过ReentrantLock锁保证线程安全,因为workers这个hashSet对于用户来说相当于共享变量,所以这里要加锁;
    • 添加新线程到workers中(一个HashSet),释放锁;
    • 如果添加成功则启动新建的线程;
    • 如果线程启动失败,代表添加也失败了,则执行回退补偿逻辑,在addWorkerFailed()方法中;

    4. Worker是如何工作的

      在addWorker中添加了新的worker之后会启动其封装的线程,该worker也会随之被线程驱动执行(因为worker继承自Runnable)。前面讲Worker的时候我们知道其run()方法中只调用了一个方法,就是定义在ThreadPoolExecutor中的runWorker(),这里才是执行worker的主要工作逻辑:

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
       // 因为Worker自身就是一把简单的不可重入互斥锁(听起来好像也不简单。。),这里调用unlock()是为了将state的状态从-1改为0 w.unlock();
    // allow interrupts boolean completedAbruptly = true; try { // 如果是第一次执行任务,或者从队列中能够获取到任务,则执行 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt // 根据线程池的状态来判断是否需要将当前线程interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 执行任务开始前钩子函数 beforeExecute(wt, task); Throwable thrown = null; try { // 真正开始执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 执行任务后钩子函数 afterExecute(task, thrown); } } finally { task = null; // task置空,以便while循环中获取新任务 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }

       这是一个被final修饰的方法,不能被重写。总结一下其逻辑(比较复杂):

    • 根据线程池的状态来判断是否需要将当前线程interrupt,如果线程池为stop且当前线程未被interrupt,则interrupt当前线程;反之如果线程池为running或shutdown,则需要确保当前线程未被interrupt(源码里是通过Thread.interrupted()来保证的,因为其可以将中断状态清零),并且再次检查线程池状态是否为stop,如果是则逻辑同上;
    • 如果是第一次执行任务,或者从队列中能够获取到任务,则继续循环执行;
    • 获取锁;
    • 执行任务开始前的钩子函数;
    • 调用task的run方法,真正开始执行任务;
    • 执行任务后钩子函数;    
    • 将task置空,释放锁,完成任务+1;
    • 执行退出worker逻辑,需要将worker从workers中移除;

      好了, 看了不少源码,我们稍微停一下:

    • 用户调用线程池的execute()方法之后,线程池根据情况有三种操作:addWorker、将任务放到阻塞队列中、拒绝;
    • 后面两种操作很简单,addWorker操作会步骤多一些,主要包括:做一些必要的判断、创建新的Worker并将其加入到workers中、将worker跑起来;
    • worker跑起来之后会不断地从阻塞队列中取任务并执行;

      上面的runWorker()方法中我们也看到了,worker跑起来之后取就进入了一个while循环中,不断地取任务并执行,好像没有看到哪里可以退出,那线程池又是如何让worker停下来的呢?我们接着往下看。

    5. 线程池如何结束Worker的工作

       在上面那节的代码中我们可以看到Worker启动之后,一直在一个while()循环中工作,如果退出了这个循环,run()方法也就邻近结束了。所以只要能够让运行中的worker退出自己的while()循环就能结束worker了,那我们就要来看一下while循环中的条件:

    while (task != null || (task = getTask()) != null) {
        。。。  
    }

      有两个条件:

    • task不为空,当work执行了一个任务之后,这个就会被置空,所以第一个条间很多情况下都是false;
    • 从getTask()获取任务,如果返回为空,那么循环条件为false,循环退出;

      这就是线程池关闭线程的开关入口,我们来看一下这个getTast()方法吧:

      private Runnable getTask() {
          boolean timedOut = false; // Did the last poll() time out?
    
          for (;;) {
              int c = ctl.get();
              int rs = runStateOf(c);
    
              // 只在必要的时候才检查任务队列是否为空
              if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                  decrementWorkerCount();
                  return null;
              }
    
              int wc = workerCountOf(c);
    
              // Are workers subject to culling?
              boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
              if ((wc > maximumPoolSize || (timed && timedOut))
                  && (wc > 1 || workQueue.isEmpty())) {
                  if (compareAndDecrementWorkerCount(c))
                      return null;
                  continue;
              }
    
              try {
                  Runnable r = timed ?
                      workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                      workQueue.take();
                  if (r != null)
                      return r;
                  timedOut = true;
              } catch (InterruptedException retry) {
                  timedOut = false;
              }
          }
      }
    • 首先要判断运行状态是否大于等于SHUTDOWN(除了RUNNING之外的状态);
    • 如果是,并且满足如下两个条件之一则执行decrementWorkerCount()并返回null,认为这个线程是多余的,需要删除:
      • 线程池运行状态为SHUTDOW,且任务队列为空;  
      • 线程池运行状态大于等于STOP;  
    • 否则继续执行;
    • 如果同时满足如下条件,则利用CAS机制尝试减少运行线程数,成功则返回空,失败则跳到第1步:

      • 要么是运行线程数量大于最大线程数量maximumPoolSize,要么是存在非核心线程或者允许核心线程超时销毁并且已超时;  

      • 运行线程数大于1或任务队列为空;

    • 接着就要从任务队列取任务了:

      • 如果允许超时则调用poll取任务,这个方法会使当前线程阻塞一段指定时间;  

      • 否则调用take()取任务,这个方法会使当前线程一直阻塞,直到获取到任务或者被当前线程被中断;  

    • 如果取出任务则返回,没有的话则将timedOut置为true,标记为已超时(代表核心线程等待时间过长,需要删除),重新进入到步骤1,继续循环执行;

      这里的逻辑比较多,因为有涉及到是否允许核心线程超时,所以需要细细品味。当调用getTask()为拿到任务,就意味着当前线程该做的工作已经完成了,不用再循环取任务执行了,剩下就是执行processWorkerExit()结束工作了。

    6. 终止线程池原理

       现在提交任务、执行任务、以及停止任务的入口,这些逻辑我们都看完了,我们来看一下如何停止线程池。主要有两个方法:shutdown、shutdownNow,从名字我们可以看出区别:

    • shutdown()执行之后线程池会停止接收任务,但是还是会把任务池中的任务执行完再结束;
    • shutdownNow()执行之后线程池不仅会停止接收任务,而且会把任务池中未执行的任务都清空,直接结束;

      我们来看一下具体实现细节:

    6.1 shutdown

        public void shutdown() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess();
           // 修改线程池运行状态为SHUTDOWN advanceRunState(SHUTDOWN);
           // 中断空闲线程 interruptIdleWorkers();
           // 预留的钩子函数 onShutdown();
    // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); }
         // tryTerminate(); }

      逻辑比较清晰:

    • 首先,获取线程池的主锁,只有1个线程可以操作;
    • 权限判断;
    • 利用CAS机制不断尝试修改线程池状态为SHUTDOWN,直到成功为止;
    • 中断空闲线程;
    • 释放锁;
    • 执行tryTerminate();

      先来看一下如何利用CAS修改线程池状态,如下代码是advanceRunState()的实现,可以看到在循环中不断调用原子类ctl的compareAndSet()方法来设置值,这就是利用CAS机制:

        private void advanceRunState(int targetState) {
         // 进入循环
    for (;;) { int c = ctl.get();
           // 如果状态修改成功则退出循环
    if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } }

      接下来看一下如何中断空闲线程,也很简单,就是对所有worker进行遍历,判断其是否被中断,如果没有则尝试设置其中断标志。这里只是说了一下基本流程,有些细节没有提到,需要代码中体会:

        private void interruptIdleWorkers() {
            interruptIdleWorkers(false);
        }
    
        private void interruptIdleWorkers(boolean onlyOne) {
         // 获取线程池的锁,并上锁
    final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try {
           // 遍历workers
    for (Worker w : workers) { Thread t = w.thread;
              // 判断worker封装的Thread实例是否被中断,如果没有则尝试获取worker自己的锁
    if (!t.isInterrupted() && w.tryLock()) { try {
                   // 设置中断状态 t.interrupt(); }
    catch (SecurityException ignore) { } finally { w.unlock(); } }
              // 根据传入的参数,只中断一个worker
    if (onlyOne) break; } } finally { mainLock.unlock(); } }

       最后我们再来看一下tryTerminate()的逻辑,配合代码看效果会更好,这里简单说一下,首先会有几轮判断,是否需要执行terminate(),接着会利用CAS机制尝试修改线程池状态为TIDYING,成功则执行terminate(),失败则循环执行:

        final void tryTerminate() {
            for (;;) {
                int c = ctl.get(); 
           /**
            * 满足如下两个条件则直接返回
            * 1. 线程池当前状态为RUNNING、TIDYING、TERMINATED
            * 2. 线程池当前状态为SHUTDOWN且任务队列不为空,那还要继续将队列中的任务执行完才能结束
            **/
    if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return;
           // 到这里代表线程池的状态为SHUTDOWN或STOP,如果还有存活线程,则尝试中断一个并返回
    if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try {
              // 尝试将线程池状态修改为TIDYING,修改成功则执行terminated(),如果没有则继续循环执行
    if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally {
                   // 执行完terminated()之后需要确保将线程池状态修改为TERMINATED ctl.set(ctlOf(TERMINATED,
    0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }

      其实shutdown()最终还是通过设置工作线程的中断状态来实现结束中断线程的,关于这种方式我们前面也是专门写过一篇文章的:<<线程间通信>>。具体是如何结束的,在线程执行的过程中会不断的调用getTask()从任务队列获取任务,在getTask()中会对中断状态进行监控,一旦发现之后会根据具体逻辑执行对应操作,具体参考getTask()的代码。

    6.2 shutdownNow

       看完shutdown()的我们再来看一下shutdownNow()的逻辑:

        public List<Runnable> shutdownNow() {
            List<Runnable> tasks;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess();
                advanceRunState(STOP);
                interruptWorkers();
                tasks = drainQueue();
            } finally {
                mainLock.unlock();
            }
            tryTerminate();
            return tasks;
        }

      基本流程和shutdown()类似,advanceRunState()和tryTerminate()是一样的,我们就不再赘述了,重点来看一下interruptWorkers()的逻辑:

        private void interruptWorkers() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers)
                    w.interruptIfStarted();
            } finally {
                mainLock.unlock();
            }
        }

      这样看很简单,就是遍历所有worker,调用其interruptIfStarted()方法,这个方法实现在Worker中,我们来看一下这个方法,也比较清晰,就是判断一下再决定是否设置线程中断标志位,可见,其和shutdown停止线程的方式是一样的,区别主要在于设置线程状态的不同以及将任务队列中的任务丢弃,即drainQueue()方法:

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    
        private List<Runnable> drainQueue() {
            BlockingQueue<Runnable> q = workQueue;
            ArrayList<Runnable> taskList = new ArrayList<Runnable>();
            // 将阻塞队列中的任务全部移除并添加到taskList中
            q.drainTo(taskList);
            // 再检查一次队列是否有任务
            if (!q.isEmpty()) {
                for (Runnable r : q.toArray(new Runnable[0])) {
                    if (q.remove(r))
                        taskList.add(r);
                }
            }
            return taskList;
        }

    7. 总结

    • 线程池中通过阻塞队列来保存接收用户提交的任务;
    • 线程池的基本工作单元为Worker,为实现在ThreadPoolExecutor中的内部类,继承了Runnable,它封装了驱动自己运行的线程,工作时worker会不断从任务队列中获取任务并执行;
    • 线程池内部通过一个HashSet来保存worker,这才是真的“池”;
    • 用户通过调用线程池的execute()将任务提交给线程池,之后由线程池来统一分配线程执行任务;
    • 用户可以调用shutdown()或shutdownNow()来停止线程池;
      • 调用shutdown()之后线程池不再接收新的任务,但是会将任务队列中的任务执行完再逐一将线程销毁并停止线程池;  
      • 调用shutdownNow()之后线程池不仅不再接收新的任务,而且会将任务队列中未执行的任务清空丢弃,并且将待正在运行的线程执行完毕就销毁线程,然后停止线程池;  

      其实呢,在啃线程池源码的过程中,还是要费一些心思的,尤其是要弄明白如何添加任务、如何添加Worker、Worker如果工作以及如何停止Worker的工作这一整套流程,中间确实逻辑比较复杂,但是呢在探索的过程中会不断有新的发现,越啃越细,越啃越清晰。我其实也不是一两天就看明白了,最早只是大概看了一遍,然后做了一些笔记,隔了几个月之后再来看,又有新的收获,所以就有了这篇文章。看到这里说明你也看懂了,恭喜你在学习的路上又有精进了   ^_^

  • 相关阅读:
    如何判断 DataRow 中是否存在某列????
    jquery操作table中的tr,td的方法双击dblclick attr parent id原创
    oracle 取当天日期减一天 应该如何写
    走出“搜索引擎营销”三个误区
    解决方案是什么
    强制远程连接 命令
    ORACLE 异常错误处理
    HttpClient是否有默认并发数限制?
    多线程下载程序的功能需求
    STL线程库简介
  • 原文地址:https://www.cnblogs.com/volcano-liu/p/10783012.html
Copyright © 2011-2022 走看看