zoukankan      html  css  js  c++  java
  • jdk1.8 ThreadPoolExecutor实现机制分析

    ThreadPoolExecutor几个重要的状态码字段

    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY = (1 << COUNT_BITS) - 1;
    
    // runState is stored in the high-order bits
    /*
    * RUNNING:可以接受新的任务,也可以处理阻塞队列里的任务
    * SHUTDOWN:不接受新的任务,但是可以处理阻塞队列里的任务
    * STOP:不接受新的任务,不处理阻塞队列里的任务,中断正在处理的任务
    * TIDYING:过渡状态,也就是说所有的任务都执行完了,当前线程池已经没有有效的线程,这个时候线程池的状态将会TIDYING,并且将要调用terminated方法
    * TERMINATED:终止状态。terminated方法调用完成以后的状态
    * */
    private static final int RUNNING = -1 << COUNT_BITS;
    private static final int SHUTDOWN = 0 << COUNT_BITS;
    private static final int STOP = 1 << COUNT_BITS;
    private static final int TIDYING = 2 << COUNT_BITS;
    private static final int TERMINATED = 3 << COUNT_BITS;
    

    那么一般的正常线程池的状态是:RUNNING-> SHUTDOWN-> STOP-> TIDYING-> TERMINATED
    下面我就把这些状态码用控制台跑了下,以下是我的java程序,就是把上面的几个变量执行一遍:

    private static final int COUNT_BITS = Integer.SIZE - 3;
        private static final int CAPACITY = (1 << COUNT_BITS) - 1;
        private static final int RUNNING = -1 << COUNT_BITS;
        private static final int SHUTDOWN = 0 << COUNT_BITS;
        private static final int STOP = 1 << COUNT_BITS;
        private static final int TIDYING = 2 << COUNT_BITS;
        private static final int TERMINATED = 3 << COUNT_BITS;
    
        // Packing and unpacking ctl
        private static int runStateOf(int c) {
            return c & ~CAPACITY;
        }
    
        private static int workerCountOf(int c) {
            return c & CAPACITY;
        }
    
        private static int ctlOf(int rs, int wc) {
            return rs | wc;
        }
    
        public static void main(String[] args) throws InterruptedException {
            System.out.println(COUNT_BITS);//29位
            /*
            *下面操作很多都是根据这个29位来移动的,因为普通整数是四个字节32位,RUNNING是前三位是1后29位是0(11100000000000000000000000000000)
            * 而容量CAPACITY恰好和RUNNING相反前3位是0后29位是1所以他们求与刚好是0,故第一次调用workerCountOf的时候得到的线程数是0
            * 而状态是CAPACITY求非再与RUNNING求与,这个得到的是11100000000000000000000000000000
            * */
            System.out.println("CAPACITY_二进制:" + Integer.toBinaryString(CAPACITY));
            System.out.println("CAPACITY_十进制值:" + CAPACITY);
            System.out.println("RUNNING_二进制:" + Integer.toBinaryString(RUNNING));
            System.out.println("RUNNING_十进制值:" + RUNNING);
            System.out.println("SHUTDOWN_二进制:" + Integer.toBinaryString(SHUTDOWN));
            System.out.println("SHUTDOWN_十进制值:" + SHUTDOWN);
            System.out.println("STOP_二进制:" + Integer.toBinaryString(STOP));
            System.out.println("STOP_十进制值:" + STOP);
            System.out.println("TIDYING_二进制:" + Integer.toBinaryString(TIDYING));
            System.out.println("TIDYING_十进制值:" + TIDYING);
            System.out.println("TERMINATED_二进制:" + Integer.toBinaryString(TERMINATED));
            System.out.println("TERMINATED_十进制值:" + TERMINATED);
    
            AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
            System.out.println("c:" + Integer.toBinaryString(ctl.get()));
            System.out.println("runStateOf:" + Integer.toBinaryString(runStateOf(ctl.get())));
            System.out.println("workerCountOf_正在执行的线程数:" + Integer.toBinaryString(workerCountOf(ctl.get())));
    
            int c = ctl.get();
            int rs = runStateOf(c);
            if (rs >= SHUTDOWN && !(rs == SHUTDOWN)) {
                System.out.println("--------");
            }
    
            //下面对c进行加1,来判断正在执行的线程数
            ctl.compareAndSet(c, c + 1);
            c = ctl.get();
            System.out.println("c2:" + Integer.toBinaryString(c));
            System.out.println("runStateOf2:" + Integer.toBinaryString(runStateOf(c)));
            System.out.println("workerCountOf2_正在执行的线程数:" + Integer.toBinaryString(workerCountOf(c)));
        }
    

    下面是执行的结果:

    29
    CAPACITY_二进制:11111111111111111111111111111
    CAPACITY_十进制值:536870911
    RUNNING_二进制:11100000000000000000000000000000
    RUNNING_十进制值:-536870912
    SHUTDOWN_二进制:0
    SHUTDOWN_十进制值:0
    STOP_二进制:100000000000000000000000000000
    STOP_十进制值:536870912
    TIDYING_二进制:1000000000000000000000000000000
    TIDYING_十进制值:1073741824
    TERMINATED_二进制:1100000000000000000000000000000
    TERMINATED_十进制值:1610612736
    c:11100000000000000000000000000000
    runStateOf:11100000000000000000000000000000
    workerCountOf:0
    c2:11100000000000000000000000000001
    runStateOf2:11100000000000000000000000000000
    workerCountOf2:1
    

    其他几个比较重要的字段说明:
    1、corePoolSize(来自构造函数的初始化),我觉得一般不要去设置,因为你可能根本就不知道多少cpu并行。
    2、maximumPoolSize默认的最大的线程数,初始化是32位整数的最大值Integer.MAX_VALUE
    3、Worker这是一个类对象,但是它继承AbstractQueuedSynchronizer,一般的继承AbstractQueuedSynchronizer是用来实现同步操作的,但是想了下Worker是线程安全的,且放在HashSet集合中,不会重复不必要用同步锁吧,锁的操作主要体现在runWorker,在执行worker的线程的时候会给自己加上同步锁,但是这样做有什么好处呢?这样后面回收worker的时候可能会用到,具体体现在interruptIdleWorkers。另外Worker还继承了Runnable接口,所以Worker应该只接受继承Runnable接口的对象。
    4、其他的以后想到再补吧。


    **代码执行顺序依次分析**

    execute这个方法做为ThreadPoolExecutor的入口方法,这个重要的方法。先看代码,jdk的源代码的注释已经说得很清楚了:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
    

    从代码来看是分三个步骤来做,判断是否取corePollSize,如果不是或者已经满了就跳到第二步。第二步先判断是否线程池还是Running状态,如果是的话把等待要创建的线程的command加入到阻塞队列BlockingQueue,这是一个接口,实现方法在SynchronousQueue,SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。这个队列的实现方式不是本章讨论的内容。然后再double-check判断线程池的状态,如果不是Running状态的话就把command移除队列。如果不是就执行addWorker(null,false),但是现在的task是null的,所以再后面runWorker会有一步判断worker这个对象的firstTask是否为空,如果为空会尝试到队列取,取的方法是getTask()。其实上面三个步骤最终都要把command添加到HashSet里面。

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
    
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))//cas对当前的c加1直接结果是线程数加1,如果成功则跳出循环
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
    
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;//这边用ReentrantLock来防止并发,因为HashSet的线程不安全的
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
    
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);//把Worker对象添加到worker是HashSet
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();//thread开始执行,执行之后是调用到runWorker方法
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    

    这个方法的名字已经说明了一切就是要添加worker对象到workers这个HashSet( private final HashSet workers = new HashSet())。Worker类继承AbstractQueuedSynchronizer,我们知道AQS是线程的同步对象,之前我已经有研究过这个对象了,所以Worker继承了AQS这个对象之后用来保证thread的同步,因为在进入workers的线程都会进入到AbstractQueuedSynchronizer的等待队列。先看下Worker的对象关系图:

    接下来看runWorker这个方法,看下ThreadpoolExecutor是怎么处理Worker对象的。runWorker的作用是把workers中的worker取出来,然后执行这个worker。

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
    

    上面我们有讨论到getTask这个方法,什么时候会执行这个方法呢?其实则execute的时候的第二种情况会添加一个null的worker对象到workers中,但是这个task其实是存在的,存在BlockingQueue这个队列中。那么我们现在就要从BlockingQueue队列中取task。

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
    
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
    
            int wc = workerCountOf(c);
    
            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
    
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
    

    runWorker的最后会进行垃圾回收,回收已经执行完的worker。看processWorkerExit函数:

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();
    
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
    
        tryTerminate();
    
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            /*如果completedAbruptly=true在执行runWorker的时候异常说明这个task没有被正常执行,添加一个null的worker到wokers( addWorker(null, false))
            * 如果如果completedAbruptly=false就要判断构造null的task对象来添加到workers了,就如注释所言:
            * replaces the worker if either
            * it exited due to user task exception or if fewer than
            * corePoolSize workers are running or queue is non-empty but
            * there are no workers.
            */
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }
    

    但是processWorkerExit中还尝试了终止线程的任务tryTerminate,tryTerminate放在这边也是不错的选择因为刚执行完worker释放,可以判断线程池的状态。我们可以看到线程池的状态改变是通过CAS来执行的

    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            //1、如果线程池处理Running状态就返回;
            // 2、如果线程池处理TIDYING或者TERMINATED状态说明已经STOP了直接返回。
            // 3、如果线程池处理SHUTDOWN,且阻塞队列还有值也直接返回
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            //到了这步说明上面三点都不满足,如果还有正在执行的线程就尝试终止(interruptIdleWorkers)
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
            //到了这步说明所有的线程都已经终止了,修改线程池的状态为TIDYING,TERMINATED
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
    

    以上这个方法有一个蛮重要的方法,我们刚开始的时候有提到过,那就是interruptIdleWorkers方法。这个方法会尝试回收没有正在执行的闲置的Worker对象。

      /**
         * Interrupts threads that might be waiting for tasks (as
         * indicated by not being locked) so they can check for
         * termination or configuration changes. Ignores
         * SecurityExceptions (in which case some threads may remain
         * uninterrupted).
         *
         * @param onlyOne If true, interrupt at most one worker. This is
         * called only from tryTerminate when termination is otherwise
         * enabled but there are still other workers.  In this case, at
         * most one waiting worker is interrupted to propagate shutdown
         * signals in case all threads are currently waiting.
         * Interrupting any arbitrary thread ensures that newly arriving
         * workers since shutdown began will also eventually exit.
         * To guarantee eventual termination, it suffices to always
         * interrupt only one idle worker, but shutdown() interrupts all
         * idle workers so that redundant workers exit promptly, not
         * waiting for a straggler task to finish.
         */
        private void interruptIdleWorkers(boolean onlyOne) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers) {
                    Thread t = w.thread;
                    if (!t.isInterrupted() && w.tryLock()) {
                        try {
                            t.interrupt();
                        } catch (SecurityException ignore) {
                        } finally {
                            w.unlock();
                        }
                    }
                    if (onlyOne)
                        break;
                }
            } finally {
                mainLock.unlock();
            }
        }
    

    我把注释也贴上来是因为注释写的比较清楚,这个方法主要是中断不用的Worker,判断条件是没被中断的线程且worker没有再执行(见tryLock),如果worker在执行那么state>0,这个是AQS的一个状态码。
    到这里就把一些主要的方法分析完了,比较乱,有点记流水账,欢迎园友交流讨论。

  • 相关阅读:
    Vue基础---官网
    echarts 4.0+画全国地图(省 市 区),地图撒点---vue项目中实战
    comeBack
    vue项目知识点汇总
    开发小总结
    Angular ---小demo体验angular项目开发知识点
    Angular ---小demo体验angular项目开发知识点2--angular中的表单验证、路由、路由守卫、http、http拦截器
    TypeScipt介绍
    Angular基础知识---模块、装饰器、组件、如何创建组件、事件处理机制、插值表达式(双括号)
    mybatis传递多个参数的三种方式
  • 原文地址:https://www.cnblogs.com/huaizuo/p/5623821.html
Copyright © 2011-2022 走看看