zoukankan      html  css  js  c++  java
  • 《java.util.concurrent 包源码阅读》11 线程池系列之ThreadPoolExecutor 第一部分

    先来看ThreadPoolExecutor的execute方法,这个方法能体现出一个Task被加入到线程池之后都发生了什么:

        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
    
            /* 如果运行中的worker线程数少于设定的常驻线程数,增加worker线程,把task分配给新建的worker线程 */
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }

    // 如果任务可以被加入到任务队列中,即等待的任务数还在允许的范围内, // 再次检查线程池是否被关闭,如果关闭的话,则移除任务并拒绝该任务 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 如果任务数超过了现有worker线程的承受范围,尝试新建worker线程 // 如果无法添加新的worker线程,则会拒绝该任务 else if (!addWorker(command, false)) reject(command); }

    在执行任务时,需要经常检查线程池的状态,那么接下来说说线程池是如何进行状态控制的。上面的代码有个成员变量叫做ctl,它用于标记线程池状态和worker线程的数量,是一个AutomaticInteger对象。

        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    ctl是一个32位的整数,最高的3位表示状态:

    111为running,

    000为shutdown,

    001为stop,

    010为tidying,

    011为ternimated。

    因此状态值就是这三位加上29个0,因此running的值是个负整数(最高位为1),其他状态都是正整数,后面判断状态会比较值的大小时会用到这点。

    剩下的29位表示worker线程的数量(因此最大允许的线程数就是2的29方减1)。

    这里是说说这几个状态的意义,这几个状态发生的顺序正好就是上面列出的顺序:

    running表示正常运行状态

    shutdown状态意味着发出了一个shutdown信号,类似于你点击了windows的关机按钮

    stop表示shutdown信号收到,等于windows响应了这个信号,发出正在关机的信息

    tidying发生在stop之后做出的响应,表示这个时候在清理一些资源,

    ternimated发生在tidying完成之后,表示关闭完成。

    接着来看添加一个worker线程时都发生了什么:

        private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // 返回false的情况:
                // 1. rs>shutdown,即shutdown和running以外的状态
                // 2. shutdown的状态
                //     1)firstTask不为null,即有task分配
                //     2)没有task,但是workQueue(等待任务队列)为空
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    // 1. 如果没有设定线程数的限制,worker线程数不能大于最大值(2的29次方-1)
                    // 2. 如果是固定尺寸的线程池,不能大于固定尺寸
                    // 3. 如果是可扩展的线程池,不能大于规定的线程数的上限
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    // 用CAS操作增加线程数量,如果失败,重新循环
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    loop
                }
            }
    
            // 新建worker线程
            Worker w = new Worker(firstTask);
            Thread t = w.thread;
    
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int c = ctl.get();
                int rs = runStateOf(c);
    
    
                // 检查以下任一状态是否出现:
                // 1. 创建线程失败
                // 2. rs>shutdown,即shutdown和running以外的状态
                // 3. rs==shutdown,有任务分配
                if (t == null ||
                    (rs >= SHUTDOWN &&
                     ! (rs == SHUTDOWN &&
                        firstTask == null))) {
                    decrementWorkerCount();
                    tryTerminate();
                    return false;
                }
    
                workers.add(w);
    
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
            } finally {
                mainLock.unlock();
            }
    
            t.start();
            // 这里考虑一种极少出现的情况,如果worker线程调用start没有完成时,
            // 线程池进入Stop状态,这个时候会调用Thread#interrupt中断每个
            // worker线程,但是 interrupt对没有start的线程不一定起作用,这样
            // 就会漏掉了对这个thread的interrupt,因此在worker线程start之后
            // 检查以下,如果stop了,而这个线程却没有被interrupt,补上这个漏掉
            // 的interrupt。
            if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
                t.interrupt();
    
            return true;
        }

    这篇文章主要讲线程池如何处理任务,下一篇文章将会讲worker线程是如何工作的。

  • 相关阅读:
    <%%>与<scriptrunat=server>,<%=%>与<%#%>的区别
    UVA 11134
    Codeforces Round #219 (Div. 1)(完全)
    Delphi版浏览器(持续更新)
    Spring3.0 入门进阶(三):基于XML方式的AOP使用
    UVa 10801 Lift Hopping / floyd
    poj 3501 Escape from Enemy Territory 二分+bfs
    vim 编辑器
    RRT路径规划算法(matlab实现)
    A*寻路算法C++简单实现
  • 原文地址:https://www.cnblogs.com/wanly3643/p/3911765.html
Copyright © 2011-2022 走看看