zoukankan      html  css  js  c++  java
  • 线程池原理

    一、线程池的作用

    线程池类似于数据库链接池、Redis链接池等池化技术。池化技术的优点如下:

    1. 统一管理资源,线程是操作系统一个重要监控管理指标,过多的线程会导致占用内存、上下文切换频繁等问题,所以需要管理起来线程,而每处都用new Thread()方法来创建线程,那线程资源散落在应用程序各地,没法管理。

    2. 不需要每次要用到线程时都再次创建一个新的线程,可以做到线程重用。线程池默认初始化时是没有创建线程的(也可以在创建线程池时自动创建好核心线程),线程池里的线程的初始化与其他线程一样,但是在完成任务以后,该线程不会自行销毁,而是以挂起的状态返回到线程池。直到应用程序再次向线程池发出请求时,线程池里挂起的线程就会再度激活执行任务。这样既节省了建立线程所造成的性能损耗,也可以让多个任务反复重用同一线程,从而在应用程序生存期内节约大量开销。

    二、Java中提供的创建线程池的API

    为了方便大家对于线程池的使用,在 Executors 里面提供了几个线程池的工厂方法,这样很多新手就不需要了解太多关于 ThreadPoolExecutor 的知识了,他们只需要直接使用 Executors 的工厂方法,就可以使用线程池。但是作为有目标的青年,还是要了解下里面的概念和坑。

    先来解释一下每个参数的作用,稍后我们在分析源码的过程中再来详细了解参数的意义。 

    public ThreadPoolExecutor(int corePoolSize, // 核心线程数
                              int maximumPoolSize,// 最大线程数
                              long keepAliveTime,// 非核心线程数空闲时,回收时长
                              TimeUnit unit,// 回收时长单位
                              BlockingQueue<Runnable> workQueue,// 阻塞队列
                              RejectedExecutionHandler handler/** 拒绝策略*/) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                Executors.defaultThreadFactory(), handler);
    }
    1. FixedThreadPool:
    核心线程数=最大线程数,阻塞队列用的是LinkedBlockingQueue(且默认队列长度是Integer.MAX_VALUE),这样的话就造成了阻塞队列是无界队列,不会有非核心线程和拒绝策略。这个线程池执行任务的流程如下:

    1. 线程数少于核心线程数(也就是设置的线程数)时,新建线程执行任务;

    2. 线程数等于核心线程数后,将任务加入阻塞队列;

    3. 由于队列容量非常大,所以可以一直添加;

    4. 执行完任务的线程反复去队列中取任务执行;

    用途:FixedThreadPool 用于负载比较大的服务器,为了资源的合理利用,需要限制当前线程数量

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                // 不会有非核心线程,所以回收时间间隔为0
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
    }

    2. CachedThreadPool:核心线程数为0,然后任务进入SynchronousQueue阻塞队列,最后在由非核心线程来处理其余的任务(在60秒内非核心线程处理完后可以继续服用)。(先来的先做,后来的全部找外包干,来多少活就找多少外包,反正老子有的是钱,结果最后老板跑路,发生了著名的OOM事件)

    最大线程数和非核心线程数是Integer.MAX_VALUE,不会有拒绝策略。所有线程执行完任务的线程有 60 秒生存时间,如果在这个时间内可以接到新任务,就可以继续活下去,否则就被回收。

    它的执行流程如下:
    1. 没有核心线程,直接向 SynchronousQueue 中提交任务
    2. 如果有空闲的非核心线程,就去取出任务执行;如果没有空闲的非核心线程,就新建一个
    3. 执行完任务的非核心线程有 60 秒生存时间,如果在这个时间内可以接到新任务,就可以继续活下去,否则就被回收 

    /*
     * 核心线程数为0(没有核心线程,直接向 SynchronousQueue 中提交任务),
     * 最大线程数是Integer.MAX_VALUE,不会有拒绝策略,导致大量线程的创建出现 CPU 使用过高或者 OOM 的问题
     * 执行完任务的线程有 60 秒生存时间,如果在这个时间内可以接到新任务,就可以继续活下去,否则就被回收
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<>());
    }

    3. newSingleThreadExecutor():核心线程数=最大线程数=1,使用无界阻塞队列。(老子就一个人,慢慢做,先来先做,后来的全部去排队)

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<Runnable>()));
    }

    三、线程池的实现原理分析:

    我们先看下线程池原理分析(FixedThreadPool)。
    举个例子:好比现在有家宾馆,一共有500个床位(核心线程数),200个预约名额(阻塞队列),外加100个临时床位以备不时之需(临时线程数)。好了,现在宾馆开业,先来了300个客人,OK,直接搞定。后来又来了300个客人,这下搞了,其中200个客人直接搞定,还有100个客人呢?我那100个临时床位可是以备不时之需的,先不给他们,这100个客人给我排队预约(进入阻塞队列),后来又TM来了200个客人,这200个客人中有100个我可以让他们去预约排队,那还剩下100个人呢,我只得操家伙拿出压箱底的那100个临时床位来伺候了,那如果后面在来客人怎么办?拒绝策略伺候!!!!!
    这个例子大致意思描述对了,但是里面还是有些不准确的地方,线程池默认初始化时,里面是没有现成的,而例子中宾馆刚开业其实已经准备好床位了。但是这点不影响大家理解,凑活着用呗。

    源码分析:

    ThreadPoolExecutor的execute()方法:

    /**
    * !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
    * 线程池初始化时是没有创建线程的,线程池里的线程的初始化与其他线程一样,但是在完成任务以后,该线程不会自行销毁,
    * 而是以挂起的状态返回到线程池。直到应用程序再次向线程池发出请求时,线程池里挂起的线程就会再度激活执行任务。
    * 这样既节省了建立线程所造成的性能损耗,也可以让多个任务反复重用同一线程,从而在应用程序生存期内节约大量开销
    *
    * 默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。
    * 在实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法办到:
    * prestartCoreThread():初始化一个核心线程
    * prestartAllCoreThreads():初始化所有核心线程
    * !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
    */
    public void execute(Runnable command) {
      if (command == null)
        throw new NullPointerException();

      int c = ctl.get();

      // 1.当前池中线程比核心数少,新建一个线程执行任务
      if (workerCountOf(c) < corePoolSize) {
        // 创建新的线程并执行任务,如果成功就返回
        if (addWorker(command, true))
          return;
        c = ctl.get();
      }

      // 2.核心池已满但任务队列未满,将任务添加到队列中
      if (isRunning(c) && workQueue.offer(command)) {
        //重新获取ctl
        int recheck = ctl.get();
        //任务成功添加到队列以后,再次检查是否需要添加新的线程,因为已存在的线程可能被销毁了
        //如果线程池处于非运行状态,并且把当前的任务从任务队列中移除成功,则拒绝该任务
        if (!isRunning(recheck) && remove(command))
          reject(command);
        //如果之前的线程已被销毁完,新建一个线程
        else if (workerCountOf(recheck) == 0)
          addWorker(null, false);
      /*
      * 如果执行到这里,有两种情况:
      * 1. 线程池已经不是RUNNING状态;
      * 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。
      */
      } else if (!addWorker(command, false))
        // 如果线程池是非RUNNING状态或者加入阻塞队列失败,则尝试创建新非核心线程(外包)直到maxPoolSize
        // 创建非核心线程失败,则启动拒绝策略
        reject(command);
    }

    其中ctl就是一个AutomicInteger的变量,用于存储线程数量(低29位)和线程池的状态(高3位)。

    线程池状态如下:

    /**
     * 即高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;
     * 111 0 0000 0000 0000 0000 0000 0000 0000
     * -1 原码:0000 ... 0001 反码:1111 ... 1110 补码:1111 ... 1111
     * 左移操作:后面补 0
     * 111 0 0000 0000 0000 0000 0000 0000 0000
     */
    private static final int RUNNING = -1 << COUNT_BITS;
    /**
     * 即高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
     * 000 0 0000 0000 0000 0000 0000 0000 0000
     */
    private static final int SHUTDOWN = 0 << COUNT_BITS;
    /**
     * 即高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
     * 001 0 0000 0000 0000 0000 0000 0000 0000
     */
    private static final int STOP = 1 << COUNT_BITS;
    /**
     * 即高3位为010,所有任务都已终止,workerCount为零,过渡到状态TIDYING的线程将运行terminated()钩子方法;
     * 010 0 0000 0000 0000 0000 0000 0000 0000
     */
    private static final int TIDYING = 2 << COUNT_BITS;
    /**
     * 即高3位为011,terminated()方法执行完毕;
     * 011 0 0000 0000 0000 0000 0000 0000 0000
     */
    private static final int TERMINATED = 3 << COUNT_BITS;
    ThreadPoolExecutor的addWorker():
    1)采用循环 CAS 操作来将线程数加 1;
    2)新建一个线程并启用;
    /**
     * firstTask参数用于表示怎么获取线程处理的任务,true为传入的任务,false表示从阻塞队列获取任务
     * core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,
     * false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        // 内嵌循环,通过CAS worker + 1
        retry:
        for (; ; ) {
            // 获取当前线程池状态与线程数
            int c = ctl.get();
            // 获取当前线程状态
            int rs = runStateOf(c);
    
            // Check if queue empty only if necessary.
            /**
             * 这个if判断
             * 如果线程池处于SHUTDOWN,STOP,TIDYING,TERMINATED的时候,则表示此时不再接收新任务;
             * 接着判断以下3个条件,只要有1个不满足,则返回false:
             * 1. rs == SHUTDOWN,这时表示关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务
             * 2. firsTask为空
             * 3. 阻塞队列不为空
             *
             * 首先考虑rs == SHUTDOWN的情况
             * 这种情况下不会接受新提交的任务,所以在firstTask不为空的时候会返回false;
             * 然后,如果firstTask为空,并且workQueue也为空,则返回false,
             * 因为队列中已经没有任务了,不需要再添加线程了
             */
            if (rs >= SHUTDOWN &&
                    !(rs == SHUTDOWN &&
                            // 不在接受新的任务
                            firstTask == null &&
                            // 队列中已经没有任务了,不需要再添加线程了
                            !workQueue.isEmpty()))
                return false;
            // 增加工作线程数
            for (; ; ) {
                // 线程数量
                int wc = workerCountOf(c);
                // 如果当前线程数大于线程最大上限CAPACITY  return false
                // 创建核心线程则与 corePoolSize 比较,否则与 maximumPoolSize 比较
                if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 尝试增加workerCount,如果成功,则跳出第一个for循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
    
        // 创建一个新的线程并执行
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            // 新建线程,将线程封装成Worker
            w = new Worker(firstTask);
            // 每一个Worker对象都会创建一个线程
            final Thread t = w.thread;
            if (t != null) {
                // 将任务添加到workers Queue中
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    // 线程池状态
                    int rs = runStateOf(c);
    
                    // rs < SHUTDOWN表示是RUNNING状态;
                    if (rs < SHUTDOWN ||
                            // rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。
                            // 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务
                            (rs == SHUTDOWN && firstTask == null)) {
                        // 当前线程已经启动,抛出异常
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // workers是一个HashSet<Worker>
                        workers.add(w);
                        // 设置最大的池大小largestPoolSize,workerAdded设置为true
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 启动线程
                if (workerAdded) {
                    // 启动时会调用Worker类中的run方法,Worker本身实现了Runnable接口,所以一个Worker类型的对象也是一个线程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 线程启动失败
            if (!workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    Worker 类说明
    1. 每个worker,都是一条线程,同时里面包含了一个firstTask,即初始化时要被首先执行的任务;2. 最终执行任务的,是 runWorker()方法;

    Worker 类继承了 AQS并实现了 Runnable 接口,注意其中的 firstTask 和 thread 属性:

    firstTask 用它来保存传入的任务;thread 是在调用构造方法时通过 ThreadFactory 来创建的线程,是用来处理任务的线程。

    在调用构造方法时,需要传入任务,这里通过 getThreadFactory().newThread(this) 来新建一个线程,newThread 方法传入的参数是 this,因为 Worker 本身继承了 Runnable 接口,所以一个 Worker 对象在启动的时候会调用 Worker 类中的 run 方法。

    /**
     * 1. 如果 task 不为空,则开始执行 task
     * 2. 如果 task 为空则通过 getTask()再去取任务,并赋值给 task;如果取到的 Runnable 不为空则执行该任务
     * 3. 执行完毕后,通过 while 循环继续 getTask()取任务
     * 4. 如果 getTask()取到的任务依然是空,那么整个 runWorker()方法执行完毕
     */
    final void runWorker(Worker w) {
        // 获取当前线程
        Thread wt = Thread.currentThread();
        // 获取第一个任务
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 释放锁,运行中断
        w.unlock(); // allow interrupts
        //是否突然完成,如果是由于异常导致的进入finally,那么completedAbruptly==true就是突然完成的
        boolean completedAbruptly = true;
        try {
            // 调用getTask()方法从阻塞队列中获取新任务,如果阻塞队列为空则根据是否超时来判断是否需要阻塞
            while (task != null || (task = getTask()) != null) {
                /**
                 * !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
                 * 上锁,不是为了防止并发执行任务,为了在shutdown()时不终止正在运行的 worker
                 * !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
                 */
                w.lock();
                /**
                 * 如果线程池正在停止,那么要保证当前线程是中断状态;
                 * 如果不是的话,则要保证当前线程不是中断状态;
                 */
                if ((runStateAtLeast(ctl.get(), STOP) ||
                        /**
                         * 线程池为 stop 状态时不接受新任务,不执行已经加入任务队列的任务,还中断正在执
                         * 行的任务,所以对于 stop 状态以上是要中断线程的
                         */
                        (Thread.interrupted() &&
                                runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                    // 设置中断标志
                    wt.interrupt();
                try {
                    //自定义实现
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x;
                        throw x;
                    } catch (Error x) {
                        thrown = x;
                        throw x;
                    } catch (Throwable x) {
                        thrown = x;
                        throw new Error(x);
                    } finally {
                        //自定义实现
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    // 完成任务数 + 1
                    w.completedTasks++;
                    // 释放锁
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 获取不到任务时,主动回收自己
            // 线程回收的工作是在processWorkerExit方法完成的
            processWorkerExit(w, completedAbruptly);
        }
    }
     
  • 相关阅读:
    微服务-SpringCloud学习系列(二):注册中心Eureka
    Spring Security系列(一)简介
    程序人生(一)--习惯与性格
    JavaEE系列(一)--Filter技术
    JavaEE系列(一)--Servlet技术
    微服务-SpringCloud学习系列(一):认识微服务
    mongoDB安装
    php遍历目录下的文件
    mysql创建视图
    ssh 安全策略
  • 原文地址:https://www.cnblogs.com/panning/p/13060460.html
Copyright © 2011-2022 走看看