zoukankan      html  css  js  c++  java
  • 含源码解析,深入Java 线程池原理

    从池化技术到底层实现,一篇文章带你贯通线程池技术。

    1、池化技术简介

    在系统开发过程中,我们经常会用到池化技术来减少系统消耗,提升系统性能。
    在编程领域,比较典型的池化技术有:
    线程池、连接池、内存池、对象池等。

    对象池通过复用对象来减少创建对象、垃圾回收的开销;连接池(数据库连接池、Redis连接池和HTTP连接池等)通过复用TCP连接来减少创建和释放连接的时间。线程池通过复用线程提升性能。简单来说,池化技术就是通过复用来提升性能。

    线程、内存、数据库的连接对象都是资源,在程序中,当你创建一个线程或者在堆上申请一块内存的时候都涉及到很多的系统调用,也是非常消耗CPU的。如果你的程序需要很多类似的工作线程或者需要频繁地申请释放小块内存,在没有对这方面进行优化的情况下,这部分代码很可能会成为影响你整个程序性能的瓶颈。

    如果每次都是如此的创建线程->执行任务->销毁线程,会造成很大的性能开销。复用已创建好的线程可以提高系统的性能,借助池化技术的思想,通过预先创建好多个线程,放在池中,这样可以在需要使用线程的时候直接获取,避免多次重复创建、销毁带来的开销。

    (1)线程池的优点

    • 线程是稀缺资源,使用线程池可以减少创建和销毁线程的次数,每个工作线程都可以重复使用。
    • 可以根据系统的承受能力,调整线程池中工作线程的数量,防止因为消耗过多内存导致服务器崩溃。

    (2)线程池的风险

    虽然线程池是构建多线程应用程序的强大机制,但使用它并不是没有风险的。用线程池构建的应用程序容易遭受任何其它多线程应用程序容易遭受的所有并发风险,诸如同步错误和死锁,它还容易遭受特定于线程池的少数其它风险,诸如与池有关的死锁、资源不足和线程泄漏。

    • 死锁

    任何多线程应用程序都有死锁风险。当一组进程或线程中的每一个都在等待一个只有该组中另一个进程才能引起的事件时,我们就说这组进程或线程 死锁了。死锁的最简单情形是:线程 A 持有对象 X 的独占锁,并且在等待对象 Y 的锁,而线程 B 持有对象 Y 的独占锁,却在等待对象 X 的锁。除非有某种方法来打破对锁的等待(Java 锁定不支持这种方法),否则死锁的线程将永远等下去。

    • 资源不足

    线程池的一个优点在于:相对于其它替代调度机制(有些我们已经讨论过)而言,它们通常执行得很好。但只有恰当地调整了线程池大小时才是这样的。

    线程消耗包括内存和其它系统资源在内的大量资源。除了
    Thread 对象所需的内存之外,每个线程都需要两个可能很大的执行调用堆栈。除此以外,JVM 可能会为每个 Java
    线程创建一个本机线程,这些本机线程将消耗额外的系统资源。最后,虽然线程之间切换的调度开销很小,但如果有很多线程,环境切换也可能严重地影响程序的性能。

    如果线程池太大,那么被那些线程消耗的资源可能严重地影响系统性能。在线程之间进行切换将会浪费时间,而且使用超出比您实际需要的线程可能会引起资源匮乏问题,因为池线程正在消耗一些资源,而这些资源可能会被其它任务更有效地利用。

    除了线程自身所使用的资源以外,服务请求时所做的工作可能需要其它资源,例如 JDBC 连接、套接字或文件,这些也都是有限资源,有太多的并发请求也可能引起失效,例如不能分配 JDBC 连接。

    • 并发错误

    线程池和其它排队机制依靠使用
    wait() 和 notify()
    方法,这两个方法都难于使用。如果编码不正确,那么可能丢失通知,导致线程保持空闲状态,尽管队列中有工作要处理。使用这些方法时,必须格外小心;即便是专家也可能在它们上面出错。而最好使用现有的、已经知道能工作的实现,例如在
    util.concurrent 包。

    • 线程泄漏

    各种类型的线程池中一个严重的风险是线程泄漏,当从池中除去一个线程以执行一项任务,而在任务完成后该线程却没有返回池时,会发生这种情况。发生线程泄漏的一种情形出现在任务抛出一个 RuntimeException 或一个 Error 时。

    如果池类没有捕捉到它们,那么线程只会退出而线程池的大小将会永久减少一个。当这种情况发生的次数足够多时,线程池最终就为空,而且系统将停止,因为没有可用的线程来处理任务。

    • 请求过载

    仅仅是请求就压垮了服务器,这种情况是可能的。在这种情形下,我们可能不想将每个到来的请求都排队到我们的工作队列,因为排在队列中等待执行的任务可能会消耗太多的系统资源并引起资源缺乏。在这种情形下决定如何做取决于您自己;在某些情况下,您可以简单地抛弃请求,依靠更高级别的协议稍后重试请求,您也可以用一个指出服务器暂时很忙的响应来拒绝请求。

    2、 如何配置线程池大小配置

    一般需要根据任务的类型来配置线程池大小:

    • 如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1
    • 如果是IO密集型任务,参考值可以设置为2*NCPU

    当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。

    3、线程池的底层原理

    (1)线程池的状态

    线程池和线程一样拥有自己的状态,在ThreadPoolExecutor类中定义了一个volatile变量runState来表示线程池的状态,线程池有四种状态,分别为RUNNING、SHURDOWN、STOP、TERMINATED。

    • 线程池创建后处于RUNNING状态。
    • 调用shutdown后处于SHUTDOWN状态,线程池不能接受新的任务,会等待缓冲队列的任务完成。
    • 调用shutdownNow后处于STOP状态,线程池不能接受新的任务,并尝试终止正在执行的任务。
    • 当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        private static final int COUNT_BITS = Integer.SIZE - 3;
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
        // runState is stored in the high-order bits
        private static final int RUNNING    = -1 << COUNT_BITS;
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        private static final int STOP       =  1 << COUNT_BITS;
        private static final int TIDYING    =  2 << COUNT_BITS;
        private static final int TERMINATED =  3 << COUNT_BITS;
    
        // Packing and unpacking ctl
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
        private static int workerCountOf(int c)  { return c & CAPACITY; }
        private static int ctlOf(int rs, int wc) { return rs | wc; }
    

    其中ctl这个AtomicInteger的功能很强大,其高3位用于维护线程池运行状态,低29位维护线程池中线程数量

    RUNNING:-1<<COUNT_BITS,即高3位为1,低29位为0,该状态的线程池会接收新任务,也会处理在阻塞队列中等待处理的任务

    SHUTDOWN:0<<COUNT_BITS,即高3位为0,低29位为0,该状态的线程池不会再接收新任务,但还会处理已经提交到阻塞队列中等待处理的任务

    STOP:1<<COUNT_BITS,即高3位为001,低29位为0,该状态的线程池不会再接收新任务,不会处理在阻塞队列中等待的任务,而且还会中断正在运行的任务

    TIDYING:2<<COUNT_BITS,即高3位为010,低29位为0,所有任务都被终止了,workerCount为0,为此状态时还将调用terminated()方法

    TERMINATED:3<<COUNT_BITS,即高3位为100,低29位为0,terminated()方法调用完成后变成此状态

    这些状态均由int型表示,大小关系为 RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED,这个顺序基本上也是遵循线程池从 运行 到 终止这个过程。

    • runStateOf(int c) 方法:c & 高3位为1,低29位为0的~CAPACITY,用于获取高3位保存的线程池状态

    • workerCountOf(int c)方法:c & 高3位为0,低29位为1的CAPACITY,用于获取低29位的线程数量

    • ctlOf(int rs, int wc)方法:参数rs表示runState,参数wc表示workerCount,即根据runState和workerCount打包合并成ctl

    (2)为什么ctl负责两种角色

    在Doug Lea的设计中,ctl负责两种角色可以避免多余的同步逻辑。

    很多人会想,一个变量表示两个值,就节省了存储空间,但是这里很显然不是为了节省空间而设计的,即使将这辆个值拆分成两个Integer值,一个线程池也就多了4个字节而已,为了这4个字节而去大费周章地设计一通,显然不是Doug Lea的初衷。

    在多线程的环境下,运行状态和有效线程数量往往需要保证统一,不能出现一个改而另一个没有改的情况,如果将他们放在同一个AtomicInteger中,利用AtomicInteger的原子操作,就可以保证这两个值始终是统一的。

    (3)线程池工作流程

    预先启动一些线程,线程无限循环从任务队列中获取一个任务进行执行,直到线程池被关闭。如果某个线程因为执行某个任务发生异常而终止,那么重新创建一个新的线程而已,如此反复。

    一个任务从提交到执行完毕经历过程如下:

    第一步:如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;

    第二步:如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;

    第三步:如果线程池中的线程数量大于等于corePoolSize,且队列workQueue已满,但线程池中的线程数量小于maximumPoolSize,则会创建新的线程来处理被添加的任务

    第四步:如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;

    流程图如下:

    4、ThreadPoolExecutor解析

    ThreadPoolExecutor继承自AbstractExecutorService,同时实现了ExecutorService接口,也是Executor框架默认的线程池实现类,一般我们使用线程池,如没有特殊要求,直接创建ThreadPoolExecutor,初始化一个线程池,如果需要特殊的线程池,则直接继承ThreadPoolExecutor,并实现特定的功能,如ScheduledThreadPoolExecutor,它是一个具有定时执行任务的线程池。

    (1)Executor框架

    在深入源码之前先来看看J.U.C包中的线程池类图:

    它们的最顶层是一个Executor接口,它只有一个方法:

    public interface Executor {
        void execute(Runnable command);
    }
    
    

    它提供了一个运行新任务的简单方法,Java线程池也称之为Executor框架。

    ExecutorService扩展了Executor,添加了操控线程池生命周期的方法,如shutDown(),shutDownNow()等,以及扩展了可异步跟踪执行任务生成返回值Future的方法,如submit()等方法。

    (2)Worker解析

    Worker类继承了AQS,并实现了Runnable接口,它有两个重要的成员变量:firstTask和thread。firstTask用于保存第一次新建的任务;thread是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程。

     private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
            /**
             * This class will never be serialized, but we provide a
             * serialVersionUID to suppress a javac warning.
             */
            private static final long serialVersionUID = 6138294804551838833L;
    
            /** Thread this worker is running in.  Null if factory fails. */
            final Thread thread;
            /** Initial task to run.  Possibly null. */
            Runnable firstTask;
            /** Per-thread task counter */
            volatile long completedTasks;
    
            /**
             * Creates with given first task and thread from ThreadFactory.
             * @param firstTask the first task (null if none)
             */
            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }
    
            /** Delegates main run loop to outer runWorker  */
            public void run() {
                runWorker(this);
            }
    
            // Lock methods
            //
            // The value 0 represents the unlocked state.
            // The value 1 represents the locked state.
    
            protected boolean isHeldExclusively() {
                return getState() != 0;
            }
    
            protected boolean tryAcquire(int unused) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
            protected boolean tryRelease(int unused) {
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
    
            public void lock()        { acquire(1); }
            public boolean tryLock()  { return tryAcquire(1); }
            public void unlock()      { release(1); }
            public boolean isLocked() { return isHeldExclusively(); }
    
            void interruptIfStarted() {
                Thread t;
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }
        }
    

    需要注意workers的数据结构为HashSet,非线程安全,所以操作workers需要加同步锁。添加步骤做完后就启动线程来执行任务了。

     /**
         * Set containing all worker threads in pool. Accessed only when
         * holding mainLock.
         */
        private final HashSet<Worker> workers = new HashSet<Worker>();
    
    

    (3)如何在线程池中添加任务

    线程池要执行任务,那么必须先添加任务,execute()虽说是执行任务的意思,但里面也包含了添加任务的步骤在里面,下面源码:

    public void execute(Runnable command) {
      // 如果添加订单任务为空,则空指针异常
      if (command == null)
        throw new NullPointerException();
      // 获取ctl值
      int c = ctl.get();
      // 1.如果当前有效线程数小于核心线程数,调用addWorker执行任务(即创建一条线程执行该任务)
      if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
          return;
        c = ctl.get();
      }
      // 2.如果当前有效线程大于等于核心线程数,并且当前线程池状态为运行状态,则将任务添加到阻塞队列中,等待空闲线程取出队列执行
      if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
          reject(command);
        else if (workerCountOf(recheck) == 0)
          addWorker(null, false);
      }
      // 3.如果阻塞队列已满,则调用addWorker执行任务(即创建一条线程执行该任务)
      else if (!addWorker(command, false))
        // 如果创建线程失败,则调用线程拒绝策略
        reject(command);
    }
    

    addWorker添加任务,方法源码有点长,按照逻辑拆分成两部分讲解:

    java.util.concurrent.ThreadPoolExecutor#addWorker:

    retry:
    for (;;) {
      int c = ctl.get();
      // 获取线程池当前运行状态
      int rs = runStateOf(c);
    
      // 如果rs大于SHUTDOWN,则说明此时线程池不在接受新任务了
      // 如果rs等于SHUTDOWN,同时满足firstTask为空,且阻塞队列如果有任务,则继续执行任务
      // 也就说明了如果线程池处于SHUTDOWN状态时,可以继续执行阻塞队列中的任务,但不能继续往线程池中添加任务了
      if (rs >= SHUTDOWN &&
          ! (rs == SHUTDOWN &&
             firstTask == null &&
             ! workQueue.isEmpty()))
        return false;
    
      for (;;) {
        // 获取有效线程数量
        int wc = workerCountOf(c);
        // 如果有效线程数大于等于线程池所容纳的最大线程数(基本不可能发生),不能添加任务
        // 或者有效线程数大于等于当前限制的线程数,也不能添加任务
        // 限制线程数量有任务是否要核心线程执行决定,core=true使用核心线程执行任务
        if (wc >= CAPACITY ||
            wc >= (core ? corePoolSize : maximumPoolSize))
          return false;
        // 使用AQS增加有效线程数量
        if (compareAndIncrementWorkerCount(c))
          break retry;
        // 如果再次获取ctl变量值
        c = ctl.get();  // Re-read ctl
        // 再次对比运行状态,如果不一致,再次循环执行
        if (runStateOf(c) != rs)
          continue retry;
        // else CAS failed due to workerCount change; retry inner loop
      }
    }
    
    

    这里特别强调,firstTask是开启线程执行的首个任务,之后常驻在线程池中的线程执行的任务都是从阻塞队列中取出的,需要注意。

    以上for循环代码主要作用是判断ctl变量当前的状态是否可以添加任务,特别说明了如果线程池处于SHUTDOWN状态时,可以继续执行阻塞队列中的任务,但不能继续往线程池中添加任务了;同时增加工作线程数量使用了AQS作同步,如果同步失败,则继续循环执行。

    // 任务是否已执行
    boolean workerStarted = false;
    // 任务是否已添加
    boolean workerAdded = false;
    // 任务包装类,我们的任务都需要添加到Worker中
    Worker w = null;
    try {
      // 创建一个Worker
      w = new Worker(firstTask);
      // 获取Worker中的Thread值
      final Thread t = w.thread;
      if (t != null) {
        // 操作workers HashSet 数据结构需要同步加锁
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
          // Recheck while holding lock.
          // Back out on ThreadFactory failure or if
          // shut down before lock acquired.
          // 获取当前线程池的运行状态
          int rs = runStateOf(ctl.get());
          // rs < SHUTDOWN表示是RUNNING状态;
          // 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。
          // 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务
          // rs是RUNNING状态时,直接创建线程执行任务
          // 当rs等于SHUTDOWN时,并且firstTask为空,也可以创建线程执行任务,也说说明了SHUTDOWN状态时不再接受新任务
          if (rs < SHUTDOWN ||
              (rs == SHUTDOWN && firstTask == null)) {
            if (t.isAlive()) // precheck that t is startable
              throw new IllegalThreadStateException();
            workers.add(w);
            int s = workers.size();
            if (s > largestPoolSize)
              largestPoolSize = s;
            workerAdded = true;
          }
        } finally {
          mainLock.unlock();
        }
        // 启动线程执行任务
        if (workerAdded) {
          t.start();
          workerStarted = true;
        }
      }
    } finally {
      if (! workerStarted)
        addWorkerFailed(w);
    }
    return workerStarted;
    }
    
    

    以上源码主要的作用是创建一个Worker对象,并将新的任务装进Worker中,开启同步将Worker添加进workers中,这里需要注意workers的数据结构为HashSet,非线程安全,所以操作workers需要加同步锁。添加步骤做完后就启动线程来执行任务了,继续往下看。

    (4)前置和后置钩子

    如果需要在任务执行前后插入逻辑,你可以实现ThreadPoolExecutor以下两个方法:

    protected void beforeExecute(Thread t, Runnable r) { }
    protected void afterExecute(Runnable r, Throwable t) { }
    

    这样一来,就可以对任务的执行进行实时监控。

    5、线程池总结

    线程池原理关键技术:锁(lock,cas)、阻塞队列、hashSet(资源池)

    所谓线程池本质是一个Worker对象的hashSet,多余的任务会放在阻塞队列中,只有当阻塞队列满了后,才会触发非核心线程的创建,非核心线程只是临时过来打杂的,直到空闲,然后自己关闭。
    线程池提供了两个钩子(beforeExecute,afterExecute)给我们,我们继承线程池,在执行任务前后做一些事情。

  • 相关阅读:
    C#调用Windows Api播放midi音频
    AutoCAD.net利用Xaml创建Ribbon界面
    WCF 不支持泛型协议 及 通过父类给子类赋值 通过反射加工
    windows右键菜单自动打包发布nuget,没有CI/CD一样方便!
    体验用yarp当网关
    .Net5 中使用Mediatr 中介者模式下的CQRS
    Vue-Router 路由属性解析
    Vue 3.0+Vite 2.0+Vue Router 4.0.6+Vuex 4.0.0+TypeScript +Yarn
    程序设计语言与语言处理程序基础.md
    Visual Studio 2019 舒适性设置
  • 原文地址:https://www.cnblogs.com/binyue/p/12273165.html
Copyright © 2011-2022 走看看