zoukankan      html  css  js  c++  java
  • 由浅入深了解线程池之源码初探

    前面初步了解了下线程池,包括如何定义一个线程池,线程池的常用构造参数以及jdk的默认实现,今天想结合源码来聊一聊线程池加载任务的顺序、线程池如何回收线程等问题;

    前置知识了解

    ctl参数

    • private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        一个线程安全的Integer参数,其维护了两个参数,其高三位表示线程池的状态,低29位表示有效(活跃)线程数((2^29)-1)

    线程池的状态

    • RUNNING
        线程池的初始状态,表示正在运行,可以正常接收任务、处理已添加过的任务;
    • SHUTDOWN
        不再接收新任务,但是还能处理已经添加的任务;调用shutdown方法进入此状态
    • STOP
        不再接收新任务,也不处理已添加的任务,并且会尝试中断正在处理的任务,调用shutdownNow()进入此状态
    • TIDYING
        当所有的任务已终止,ctl记录的活跃线程个数为0时线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()方法可以重写自定义其执行内容;当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由SHUTDOWN变为TIDYING,当线程池在STOP状态下并且线程池中执行的任务为空时,就会由STOP变为TIDYING。
    • TERMINATED
        表示线程池彻底终止;在TIDYING状态回调完可以通过重载terminated方法后进入

    源码初探

    构造方法

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            // 不合理参数校验
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            // 任务队列、线程工厂以及拒绝策略的非空校验
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.acc = System.getSecurityManager() == null ?
                    null :
                    AccessController.getContext();
            // 成员变量的赋值
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            // 将keepAliveTime转换成纳秒
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    

    构造方法就进行了一些校验和赋值操作

    execute方法

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            // ctl是一个线程安全的Integer,共32位,高3位表示线程池状态,低29位表示活跃线程数的个数;
            int c = ctl.get();
            // 如果活跃线程的个数小于核心线程,则直接创建线程执行任务
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            // 如果线程池是RUNNING且当前任务可以成功加入任务队列,在双重check后则尝试加入线程池执行
            if (isRunning(c) && workQueue.offer(command)) {
                /**
                 * 重新获取ctl并再次检查线程池状态是为了防止并发环境下,command(任务)加入队列后线程池的状态可能由RUNNING改为STOP或
                 * SHUTDOWN。当这种情况发生时需要将任务从任务队列移除并且拒绝
                 */
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                // 如果线程池的活跃线程数为0了,则向池内添加非核心线程任务(由于核心线程允许为0所以会存在活跃线程为0的情况)
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            // 如果线程池不是RUNNING状态或者加入队列失败了,这时会尝试直接向池内添加线程任务;如果addWorker失败则直接拒绝
            else if (!addWorker(command, false))
                reject(command);
        }
    

    execute方法指明了线程池执行任务顺序的大致方向,做了一些线程状态的校验拒绝工作,但是真正干活的还是下面addWorker方法

    worder类

    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
            /**
             * This class will never be serialized, but we provide a
             * serialVersionUID to suppress a javac warning.
             */
            private static final long serialVersionUID = 6138294804551838833L;
    
            /** 执行任务的线程 */
            final Thread thread;
            /** 线程需要执行的任务,可能为null */
            Runnable firstTask;
            /** 完成的任务数 */
            volatile long completedTasks;
    
            /**
             * 构建Worker(addWorker方法里添加worker时会调用)
             * @param firstTask the first task (null if none)
             */
            Worker(Runnable firstTask) {
                // 设置AQS的state为-1(即同步状态)
                setState(-1); // inhibit interrupts until runWorker
                // 给任务赋值
                this.firstTask = firstTask;
                // 通过线程工厂创建执行任务的线程
                this.thread = getThreadFactory().newThread(this);
            }
    
            /** 重写Runnable里的run方法  */
            public void run() {
                runWorker(this);
            }
    
    
            /**
             * 是否获取到锁
             * 0表示处于未被锁定状态
             * 1表示被锁定状态
             * @return
             */
            protected boolean isHeldExclusively() {
                return getState() != 0;
            }
    
            /**
             * 使用AQS设置线程状态
             * @param unused
             * @return
             */
            protected boolean tryAcquire(int unused) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
            /**
             * 释放锁
             * @param unused
             * @return
             */
            protected boolean tryRelease(int unused) {
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
    
            public void lock()        { acquire(1); }
            public boolean tryLock()  { return tryAcquire(1); }
            public void unlock()      { release(1); }
            public boolean isLocked() { return isHeldExclusively(); }
    
            void interruptIfStarted() {
                Thread t;
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }
        }
    

    Worker类既继承了AQS又实现了Runnable接口,所以使其既是一个同步器又是一个线程类

    addWorker方法

    private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                // 获取线程状态
                int c = ctl.get();
                int rs = runStateOf(c);
                /*******************************第一部分(校验线程池当前状态是否能新增线程任务)**********************/
                /**
                 * rs >= SHUTDOWN 表示STOP、TIDYING、TERMINATED状态
                 * !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())表示三者只要有一个不成立则条件成立
                 * 总结:
                 * 1、当rs是非RUNNING状态直接返回false
                 * 2、当rs是非RUNNING状态且firstTask不等于空时返回false;
                 * 3、当rs是非RUNNING状态且workQueue等于空时返回false;
                 * 此处自己琢磨半天总结,如有错误还望大佬指正
                 */
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    // 获取线程池的活跃线程数
                    int wc = workerCountOf(c);
                    // 如果线程池的活跃线程数大于线程池的最大容量CAPACITY(536870911)或者大于等于核心线程数或最大线程数时返回false
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    // 通过cas添加活跃线程数量,添加成功则认为当前环境允许添加线程任务所以跳过校验直接进行添加和执行的操作
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // cas失败则继续内存自旋直到成功为止
                }
            }
            /*******************************第二部分(校验通过进行添加和执行线程的操作)**********************/
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                // 通过用户指定的线程工厂构建一个活跃线程(没指定走默认工厂)
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    // 添加活跃线程需要保证线程安全,所以需要加锁
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // 再次获取线程池的状态并检查
                        int rs = runStateOf(ctl.get());
                        /**
                         * 当线程池的状态为RUNNING或者是SHUTDOWN但是任务为空则进行添加活跃(工作)线程操作
                         */
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // 事先检查线程是否能够启动
                                throw new IllegalThreadStateException();
                            // 添加活跃(工作)线程
                            workers.add(w);
                            // 实时更新线程池的最大活跃线程数
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    // 如果活跃(工作)线程添加成功则启动线程
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                // 如果线程启动失败则从活跃线程中移除
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    

    addWorker方法定义了如何将线程任务添加到线程池中去并执行

    runWorker方法

    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            // 设置state=0调用tryRelease尝试释放锁,允许中断任务的执行
            w.unlock(); // allow interrupts
            // 下面的循环出现异常则该值为true否则会被值为false
            boolean completedAbruptly = true;
            try {
                // 如果task不为null,或者task为空但是从任务队列里能取到任务则一直循环取出任务并执行
                while (task != null || (task = getTask()) != null) {
                    // 加锁;
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    /**
                     * 如果线程池正在被终止(处于STOP/TIDYING/TERMINATED状态),则需要确保线程被中断
                     * 如果线程池为RUNNING/SHUTDOWN状态则清楚中断状态
                     */
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        // 设置线程为中断
                        wt.interrupt();
                    try {
                        // 无内容的钩子方法留给子类实现
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            // 无内容的钩子方法留给子类实现
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        // 完成任务数加1
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                /**
                 * 线程退出
                 * 如果completedAbruptly=true表示task.run()出了异常,如果如果completedAbruptly=false
                 * 表示while循环拿不到任务了
                 */
                processWorkerExit(w, completedAbruptly);
            }
        }
    

    runWorker方法不断循环从队列里取任务并执行,

    线程池加载任务的顺序

      结合上面的execute方法我们大概可以得出线程池加载任务的顺序:

    • 尝试开启核心线程并执行任务
    • 如果核心线程数已经达标并且线程池是RUNNING状态则尝试加任务加入任务队列
    • 如果线程池不是RUNNING状态或者加入队列失败了,这时会尝试直接向池内添加线程任务;如果addWorker失败则直接拒绝;添加失败的原因有可能是达到了线程池的最大线程数也有可能是线程池处于非RUNNING

    线程池如何回收线程

    • RUNNING状态下
      在RUNNING状态下可以通过上面的runWorker来进行分析,当当前执行的任务为空或者任务队列里没有任务时会进行工作线程回收
    • SHUTDOWN状态下
      在调用shutdown()之后,会向所有的空闲工作线程发送中断信号

    一些感悟

      差不多两天时间的源码读下来越发有种蒙蔽的感觉,线程池涉及的知识点还是比较广泛的,包括AQS、多线程的知识、CAS等等,此篇博文暂且先到这了,其中不完善的地方只能后续慢慢学习补充了,不过从源码中倒是让我学到了一个编码小技巧,
    就是当你的代码里有多个变量需要同时进行判断时,并不用让变量一字排开进行或、与的操作,而是对变量进行巧妙的设计,将这些变量定义成整形,相互之间的大小根据业务需求来定,然后可能就一个范围的判断即可替代之前的一堆或与操作了

    希望每get一个知识点都能坚持用博客记录下来,加油!
  • 相关阅读:
    sublime显示当前文件的编码格式
    关于jquery中html()、text()、val()的区别
    bit,Byte,B,KB,MB,GB
    python之序列操作
    编程常用密匙
    js数组操作
    ob函数的使用
    php使用zlib实现gzip压缩
    js兼容性汇总
    centos7下源码编译安装mysql5.7
  • 原文地址:https://www.cnblogs.com/darling2047/p/15798570.html
Copyright © 2011-2022 走看看