zoukankan      html  css  js  c++  java
  • 并发系列(一)——线程池源码(ThreadPoolExecutor类)简析

    前言

      本文主要是结合源码去线程池执行任务的过程,基于JDK 11,整个过程基本与JDK 8相同。

      个人水平有限,文中若有表达有误的,欢迎大伙留言指出,谢谢了!

    一、线程池简介

      1.1 使用线程池的优点

        1)通过复用已创建的线程,降低资源的消耗(线程的创建/销毁是要消耗资源的)、提高响应速度;

        2)管理线程的个数,线程的个数在初始化线程池的时候指定;

        3)统一管理线程,比如停止,stop()方法;

      1.2 线程池执行任务过程

        线程池执行任务的过程如下图所示,主要分为以下4步,其中参数的含义会在后面详细讲解:

        1)判断工作的线程是否小于核心线程数据(workerCountOf(c) < corePoolSize),若小于则会新建一个线程去执行任务,这一步仅仅的是根据线程个数决定;

        2)若核心线程池满了,就会判断线程池的状态,若是running状态,则尝试加入任务队列,若加入成功后还会做一些事情,后面详细说;

        3)若任务队列满了,则加入失败,此时会判断整个线程池线程是否满,若没有则创建非核心线程执行任务;

        4)若线程池满了,则根据拒绝测试处理无法执行的任务;

        整体过程如下图:

    二、ThreadPoolExecutor类解析

      2.1 ThreadPoolExecutor的构造函数

        ThreadPoolExecutor类一共提供了4个构造函数,涉及5~7个参数,下面就5个必备参数的构造函数进行说明:

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), defaultHandler);
        }

        1)corePoolSize :初始化核心线程池中线程个数的大小;

        2)maxmumPoolSize:线程池中线程大小;

        3)keepAliveTime:非核心线程的超时时长;

          非核心线程空闲时常大于该值就会被终止。

        4)unit :keepAliveTime的单位,类型可以参见TimeUnit类;

        5)BlockingQueue workQueue:阻塞队列,维护等待执行的任务;

      2.2  私有类Worker

        在ThreadPoolExecutor类中有两个集合类型比较重要,一个是用于放置等待任务的workQueue,其类型是阻塞对列;一个是用于用于存放工作线程的works,其是Set类型,其中存放的类型是Worker。

        进一步简化线程池执行过程,可以理解为works中的工作线程不停的去阻塞对列中取任务,执行结束,线程重新加入大works中。

        为此,有必要简单了解一下Work类型的组成。

    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
            /** Thread this worker is running in.  Null if factory fails. */
            //工作线程,由线程的工厂类初始化
            final Thread thread;
            /** Initial task to run.  Possibly null. */
            Runnable firstTask;
            /** Per-thread task counter */
            volatile long completedTasks;
            //不可重入的锁
            protected boolean tryAcquire(int unused) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
            .......
        }

        Worker类继承于队列同步器(AbstractQueueSynchronizer),队列同步器是采取锁或其他同步组件的基础框架,其主要结构是自旋获取锁的同步队列和等待唤醒的等待队列,其方法因此可以分为两类:对state改变的方法 和 入、出队列的方法,即获取获取锁的资格的变化(可能描述的不准确)。关于队列同步器后续博客会详细分析,此处不展开讨论。

        Work类中通过CAS设置状态失败后直接返回false,而不是判断当前线程是否已获取锁来实现不可重入的锁,源码注释中解释这样做的原因是因为避免work tash重新获取到控制线程池全局的方法,如setCorePoolSize。

      2.3  拒绝策略类

        ThreadPoolExecutor的拒绝策略类是以私有类的方式实现的,有四种策略:

        1)AbortPolicy:丢弃任务并抛出RejectedExecutionException异常(默认拒绝处理策略)。

          2)DiscardPolicy:抛弃新来的任务,但是不抛出异常。

          3)DiscardOldestPolicy:抛弃等待队列头部(最旧的)的任务,然后重新尝试执行程序(失败则会重复此过程)。

          4)CallerRunsPolicy:由调用线程处理该任务。

        其代码相对简单,可以参考源码。

    三、任务执行过程分析

      3.1 execute(Runnable)方法

        execute(Runnable)方法的整体过程如上文1.2所述,其实现方式如下:

    public void execute(Runnable command) {
            //执行的任务为空,直接抛出异常
            if (command == null)
                throw new NullPointerException();
            //ctl是ThreadPoolExecutor中很关键的一个AtomicInteger,主线程池的控制状态
            int c = ctl.get();
            //1、判断是否小于核心线程池的大小,若是则直接尝试新建一个work线程
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            //2、大于核心线程池的大小或新建work失败(如创建thread失败),会先判断线程池是否是running状态,若是则加入阻塞对列
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                //重新验证线程池是否为running,若否,则尝试从对列中删除,成功后执行拒绝策略
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                //若线程池的状态为shutdown则,尝试去执行完阻塞对列中的任务
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            //3、新建非核心线程去执行任务,若失败,则采取拒绝策略
            else if (!addWorker(command, false))
                reject(command);
        }

      3.2 addWorker(Runnable,boole)方法

        execute(Runnable)方法中,新建(非)核心线程执行任务主要是通过addWorker方法实现的,其执行过程如下:

    private boolean addWorker(Runnable firstTask, boolean core) {
            //此处反复检查线程池的状态以及工作线程是否超过给定的值
            retry:
            for (int c = ctl.get();;) {
                // Check if queue empty only if necessary.
                if (runStateAtLeast(c, SHUTDOWN)
                    && (runStateAtLeast(c, STOP)
                        || firstTask != null
                        || workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                //核心和非核心线程的区别
                    if (workerCountOf(c)
                        >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateAtLeast(c, SHUTDOWN))
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);
                //通过工厂方法初始化,可能失败,即可能为null
                final Thread t = w.thread;
                if (t != null) {
                //获取全局锁
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int c = ctl.get();
                        //线程池处于running状态
                        //或shutdown状态但无需要执行的task,个人理解为用于去阻塞队列中取任务执行
                        if (isRunning(c) ||
                            (runStateLessThan(c, STOP) && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        //执行任务,这里会执行thread的firstTask获取阻塞对列中取任务
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                //开始失败,则会从workers中删除新建的work,work数量减1,尝试关闭线程池,这些过程会获取全局锁
                    addWorkerFailed(w);
            }
            return workerStarted;
        }

      3.3  runWorker(this) 方法

         在3.2 中当新建的worker线程加入在workers中成功后,就会启动对应任务,其调用的是Worker类中的run()方法,即调用runWorker(this)方法,其过程如下:

    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
            //while()循环中,前者是新建线程执行firstTask,对应线程个数小于核心线程和阻塞队列满的情况,
            //getTask()则是从阻塞对列中取任务执行
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    //仅线程池状态为stop时,线程响应中断,这里也就解释了调用shutdown时,正在工作的线程会继续工作
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        try {
                        //执行任务
                            task.run();
                            afterExecute(task, null);
                        } catch (Throwable ex) {
                            afterExecute(task, ex);
                            throw ex;
                        }
                    } finally {
                        task = null;
                        //完成的个数+1
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                //处理后续工作
                processWorkerExit(w, completedAbruptly);
            }
        }

       3.4 processWorkerExit(Worker,boole)方法

        当任务执行结果后,在满足一定条件下会新增一个worker线程,代码如下:

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
            if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
                decrementWorkerCount();
    
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                completedTaskCount += w.completedTasks;
                //对工作线程的增减需要加全局锁
                workers.remove(w);
            } finally {
                mainLock.unlock();
            }
            //尝试终止线程池
            tryTerminate();
    
            int c = ctl.get();
            if (runStateLessThan(c, STOP)) {
            //线程不是中断,会维持最小的个数
                if (!completedAbruptly) {
                    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                    if (min == 0 && ! workQueue.isEmpty())
                        min = 1;
                    if (workerCountOf(c) >= min)
                        return; // replacement not needed
                }
                //执行完任务后,线程重新加入workers中
                addWorker(null, false);
            }
        }

      至此,线程池执行任务的过程分析结束,其他方法的实现过程可以参考源码。

    Ref:

    [1]http://concurrent.redspider.group/article/03/12.html

    [2]《Java并发编程的艺术》

  • 相关阅读:
    JAR 归档文件是与平台无关的文件格式
    事件的监听是由awt完成的
    AWT和Swing之间的基本区别
    Swing AWT一套新的图形界面系统
    AWT控件称为重量级控件
    java做web项目比较多
    Swing文本域的编辑
    AWT中文译为抽象窗口工具包
    swing包含了各种组件的类
    Java中的Swing及AWT又称GUI编程
  • 原文地址:https://www.cnblogs.com/love-yh/p/13111234.html
Copyright © 2011-2022 走看看