zoukankan      html  css  js  c++  java
  • ThreadPoolExecutor的execute源码分析

    上一篇文章指出,ThreadPoolExecutor执行的步骤如下:

    1. 向线程池中添加任务,当任务数量少于corePoolSize时,会自动创建thead来处理这些任务;

    2. 当添加任务数大于corePoolSize且少于maximmPoolSize时,不再创建线程,而是将这些任务放到阻塞队列中,等待被执行;

    3. 接上面2的条件,且当阻塞队列满了之后,继续创建thread,从而加速处理阻塞队列;

    4. 当添加任务大于maximmPoolSize时,根据饱和策略决定是否容许继续向线程池中添加任务,默认的饱和策略是AbortPolicy(直接丢弃)。

    我们直接可以通过ThreadPoolExecutor的execute方法源码来跟踪这个流程。首先,由于在execute方法中常常会根据线程池的状态选择判断一些逻辑,因此在介绍该方法之前首先说一下线程池的几种方法。

    1. 线程池的状态:

    1. RUNNING:该状态的线程池会接收新任务,也会处理在阻塞队列中等待处理的任务;

    2. SHUTDOWN:该状态的线程池不会再接收新任务,但还会处理已经提交到阻塞队列中等待处理的任务;

    3. STOP:该状态的线程池不会再接收新任务,不会处理在阻塞队列中等待的任务,而且还会中断正在运行的任务;

    4. TIDYING:所有任务都被终止了,workerCount为0,为此状态时还将调用terminated()方法;

    5. TERMINATED:terminated()方法调用完成后变成此状态。

    几个状态相关的方法:

    runStateOf(int c) 方法:c & 高3位为1,低29位为0的~CAPACITY,用于获取高3位保存的线程池状态

    workerCountOf(int c) 方法:c & 高3位为0,低29位为1的CAPACITY,用于获取低29位的线程数量

    ctlOf(int rs, int wc) 方法:参数rs表示runState,参数wc表示workerCount,即根据runState和workerCount打包合并成ctl

    也就是说32位含义:(高三位表示状态)+ (低29位表示线程数量)。

    接下来分析源码:

     

    2. execute代码

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            /*
             * Proceed in 3 steps:
             *
             * 1. 如果运行的线程少于corePoolSize,
             * 尝试开启一个新线程去运行command,command作为这个线程的第一个任务,并运行
             *
             * 2. 如果任务成功放入队列,我们仍需要一个双重校验去确认是否应该新建一个线程
             *(因为可能存在有些线程在我们上次检查后死了),或者进入这个方法后,pool被关闭了
             * 所以我们需要再次检查state,如果线程池停止了需要回滚入队列,
             * 如果池中没有线程了,新开启 一个线程
             *
             * 3. 如果无法将任务入队列(可能队列满了),需要新开区一个线程
             * 如果失败了,说明线程池shutdown或者饱和了,所以我们拒绝任务
             */
             
            // 1.当运行的线程少于corePoolSize,
            // 则直接执行command任务,addworker(command,true)会产生一个新线程来执行这个任务
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            
            // 2.  线程池处于RUNNING状态,并将任务放入workQueue队列,但不执行addWorker(表明不创建新的线程)
            // 双重校验可防止添加任务到workQueue队列后,线程池状态由于意外等原因处于非RUNNING状态,
            // 此时就需要从workQueue队列remove掉这个任务
            // 注:offer方法不会阻塞,如果不能插入队列直接返回false。(有可能造成数据丢失?这里不会,也就是说阻塞队列满了)
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            
            // 3. 如果线程池不是running状态或者无法入队列,执行线程池的饱和策略
            else if (!addWorker(command, false))
                reject(command);
        }
    

    从上面代码可知,java线程池在任务比较少时(当运行的线程少于corePoolSize),直接通过addWorker来执行任务,当任务比较多时,使用了阻塞队列,阻塞队列里存放的是Worker对象,Worker类是ThreadPoolExecutor的一个内部类,它实现了Runable接口,具有线程的功能。同时还继承了AbstractQueuedSynchronizer(AQS),因此也具有锁的功能。那么ThreadPoolExecutor中如何去执行阻塞队列里面的Worker任务的呢?首先我们来分析一下doWorker,看它是如何执行任务,以及如何触发执行阻塞队列里面的任务的。

     

    3. doWorker代码

    doWorker的的作用首先是创建线程,然后执行任务,源码如下:

    private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                // 获取线程池运行状态,
                // 线程池的运行状态:runnbale=-1,shutdown=0,stop=1,tidying=2,terminated=3
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                // CAS算法
                for (;;) {
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    // 如果添加任务成功,则跳出retry,也就是跳出整个循环体
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);
                // 通过线程池的ThreadFactory创建一个线程,用于执行这个firstTask任务
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
    
                        // 说明:(rs == SHUTDOWN && firstTask == null)可能是workQueue中仍有未执行完成的任务,
    // 创建没有初始任务的worker线程执行 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 提前检查t线程是否启动,如果是就抛非法线程状态异常 if (t.isAlive()) throw new IllegalThreadStateException(); // workQueue队列中添加Worker对象 workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 往HashSet中添加worker成功,启动线程 if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }

    代码看起来有点长,但只做了两件事:

    1)用循环CAS操作来将线程数加1;

    2)新建一个线程并启执行这个任务。

    代码中使用的retry,它类似与goto, 用于控制跳出循环体,retry可以随意命名,只要遵循java的命名规则即可。

    CAS会使用循环机制,当存在多线程的情况下,通过比较与交换,其它线程通过循环可以的更新最新值。关于CAS可以参考《深入浅出CAS》

    在上面源码中可以看到,addWorker会用当前firstTask创建一个Worker对象,相当于对firstTask的包装,然后用Worker对象作为firstTask创建一个Thread,该Thread保存在Worker的thread成员变量中。在addWorker中通过t.start()启动了这个线程,线程中执行runWorker方法。

     

    4. 内部类Worker

    那么ThreadPoolExecutor中如何去执行阻塞队列里面的Worker任务的呢?看到这里好像还是没有答案。那接着分析Worker这个内部类:

    private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
            private static final long serialVersionUID = 6138294804551838833L;
    
            /** Thread this worker is running in.  Null if factory fails. */
            final Thread thread;
            /** Initial task to run.  Possibly null. */
            Runnable firstTask;
            /** Per-thread task counter */
            volatile long completedTasks;
    
            /**
             * Creates with given first task and thread from ThreadFactory.
             */
            Worker(Runnable firstTask) {
                // 设置AQS的同步状态,大于0代表锁已经被获取
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }
    
            /** Delegates main run loop to outer runWorker  */
            public void run() {
                // 调用ThreadPoolExecutor的runworker方法
                runWorker(this);
            }
    
            // Lock methods
            //
            // The value 0 represents the unlocked state.
            // The value 1 represents the locked state.
    
            protected boolean isHeldExclusively() {
                return getState() != 0;
            }
    
            protected boolean tryAcquire(int unused) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
            protected boolean tryRelease(int unused) {
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
    
            public void lock()        { acquire(1); }
            public boolean tryLock()  { return tryAcquire(1); }
            public void unlock()      { release(1); }
            public boolean isLocked() { return isHeldExclusively(); }
    
            void interruptIfStarted() {
                Thread t;
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }
        }
    

    在addWorker中通过t.start()启动了这个线程,线程中执行runWorker方法。

     

    5. runWorker代码

    到目前为止还是没有涉及到阻塞队列!可是到runWorker中就可以看到啦!

    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
    

    上面代码关键点是while循环getTask()方法,通过循环不断的调用getTask()从阻塞队列中获取任务,通过这个方法,它与阻塞队列建立桥梁。目前我们已经知道当添加任务数量大于coolPoolSize(且小于maximumPoolSize)的时候,并不会创建线程,但是由于在任务数量小于coolPoolSize之前调用了addWorker并触发t.star()执行,从而调用了runWorker,通过循环不断的调用getTask()从阻塞队列中获取任务,如果getTask()返回不为null,则上锁,执行任务,任务执行完成之后解锁。如果getTask()返回null,改变completedAbrutly状态,然后调用processWorkerExit() 退出worker线程。

     

    6. getTask代码

    由第5点引出了getTask方法。

     private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
    
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;
                }
    
                int wc = workerCountOf(c);
    
                // Are workers subject to culling?
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    

    getTask中主要看获取任务的代码如下:

    1. workQueue.poll():如果在keepAliveTime时间内,阻塞队列中没有任务,返回null;
    2. workQueue.take():如果阻塞队列为空,当前线程会被阻塞;当队列中有任务加入时,线程被唤醒,并返回任务。

    6. 小结

    本文只是对线程池正常的工作流程进行了分析,并没有对线程池shutdown或者stop的情况进行分析,这些部分涉及到AQS等并发技术,这部分比较复杂,感兴趣可以更加深入研究一下。

    参考:

    1. https://www.cnblogs.com/trust-freedom/p/6681948.html#top
    2. https://www.jianshu.com/p/fb6e91b013cc
  • 相关阅读:
    Date日期对象
    JAVA适配器
    java 对象的多态性
    简单轮播
    ecshop 教程地址
    瀑布流js排列
    phpcms 搜索结果页面栏目不显示解决 方法
    手机自动跳转
    字串符转换数字、取小数点后两位数的方法
    js 判断鼠标进去方向
  • 原文地址:https://www.cnblogs.com/chenjunjie12321/p/9515511.html
Copyright © 2011-2022 走看看