zoukankan      html  css  js  c++  java
  • ThreadPoolExecutor

    1 简介

    线程池提供了一种限制和管理资源的方法,包括线程、任务组、并且保存了一些基本的统计信息,如已完成的任务的数量等等。

    为了可扩展性,该类提供了很多可调参数以及钩子方法(hook方法)。建议使用工厂类Executors提供的创建线程池的方法。该类类图如下:
    image

    1.1 ctl字段

    state字段打包了两个含义:worker线程数量、线程池状态。

    • 高3bit位:线程池状态,runState
    • 低29bit位:worker线程数量,workerCount

    workerCount之所以放在低位,原因在于低位方便进行增减操作。线程池的状态值是有顺序的,因此可以有序的比较。runState的状态流转如下:

    • RUNNING -> SHUTDOWN,调用shutdown()方法,可能隐含在finalize()方法中
    • RUNNING or SHUTDOWN -> STOP,调用shutdownNow()
    • STOP -> TIDYING,当线程池是empty时
    • TIDYING -> TERMINATED,当terminate()钩子方法执行完成
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    //  运行状态描述 已知:RUNNING=111, SHUTDOWN=0, STOP=001, TIDYING=010, TERMINATED=011
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    
    //// Packing and unpacking ctl
    // 获取ctl中的状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 获取到线程池中的worker线程数量
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // 将rs和wc打包成ctl值
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    
    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }
    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }
    
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }
    

    1.2 mainLock字段

    这个锁用于控制worker集合的访问控制,这里之所以使用lock而不是用安全集合,原因在于lock可以让访问序列化,这样就避免了多线程并发调用interruptIdleWorkers()时产生中断风暴。

    1.3 线程池变量

    任务队列
    任务使用阻塞队列实现,使用workQueue.isEmpty()判断队列是否为空,而非workQueue.poll()==null判断,这样的判空方式更加通用,避免特殊队列,如DelayQueue:

    private final BlockingQueue<Runnable> workQueue;    // 任务队列。
    

    线程池
    线程池使用HashSet存储,并通过mainLock加锁控制其访问

    private final ReentrantLock mainLock = new ReentrantLock(); 
    private final Condition termination = mainLock.newCondition();  // 用于支持awaitTermination
    private final HashSet<Worker> workers = new HashSet<Worker>(); // 仅能在mainLock下访问
    

    线程池控制

    线程池提供了变量用于控制,线程池的大小、已完成任务数量等等。

    private int largestPoolSize;  // 追踪最大达到的线程池大小,仅能在mainLock下访问
    private long completedTaskCount;  // 完成任务的计数器,仅在工作线程终止时更新,仅在mainLock下访问
    ///// 所有的控制参数都被定义为volatile,以便正在进行的动作是基于最新的值 ///////////
    private volatile ThreadFactory threadFactory;
    private volatile RejectedExecutionHandler handler;
    private volatile long keepAliveTime;
    // 当线程池中线程数量小于corePoolSize时,是否允许线程池中的线程超时
    private volatile boolean allowCoreThreadTimeOut;
    private volatile int corePoolSize;
    private volatile int maximumPoolSize;
    

    2 Worker线程

    worker用于表示线程池中的工作线程。ThreadPoolExecutor自己封装了其用于运行任务的工作线程。

    • Runnable:实现了线程的特性
    • AQS:实现了锁的特性

    Worker实现AbstractQueuedSynchronizer类,用以简化获取和释放围绕每个任务执行的锁。Worker本身实现了非重入功能,因为我们不希望工作任务在调用像setCorePoolSize这样的池控制方法时重新获取锁。另外,为了在线程实际开始执行任务前抑制,将lock状态初始化为负值,并在启动时清除它(在runWorker中)。非重入锁的特性会让Worker执行任务的时候先加锁,如果想要中断工作线程,需要先获取锁,否则无法中断,工作线程执行完任务才会释放锁。

    private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
        private static final long serialVersionUID = 6138294804551838833L;
    
        final Thread thread;
        Runnable firstTask;
        volatile long completedTasks;
    
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
    
        public void run() {
            runWorker(this);
        }
    
        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.
    
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    
        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
    
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
    

    3 execute()

    该方法用于执行提交的任务。根据线程池的线程数量、线程池状态,来决定将提交的任务,放入任务队列抑或添加worker线程、抑或拒绝任务:

    • 放入任务队列

      ① worker线程数量<corePoolSize,但addWorker()添加线程池失败,且线程池为运行态
      ② worker线程数量>=corePoolSize,且线程池为运行态;

    • 线程池中添加worker线程,执行任务

      ① worker线程数量<corePoolSize
      ② 当任务队列添加新task成功,且线程池仍在运行状态,但此时线程池worker线程数量为0
      ③ 当任务队列添加新task成功,且线程池状态已经不在运行态,但从任务队列中移除新任务失败,但此时线程池worker线程数量为0

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {    // 若worker线程数量小于corePoolSize,addWorker()添加工作线程
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();    // 用于双重校验
            if (! isRunning(recheck) && remove(command)) {
                reject(command);
            } else if (workerCountOf(recheck) == 0) {    // 若线程池中的线程已经全部结束(可参考后续代码),则重新向线程池添加worker线程
                addWorker(null, false);
            }
        } else if (!addWorker(command, false)) {
            reject(command);
        }
    }
    

    3.1 addWork

    该方法用于向线程池中添加一个worker线程,并启动该工作线程:

    1. 校验线程池状态、校验线程池中线程数量等
    2. 将ctl字段中workerCount数量自增
    3. 创建Worker线程
    4. 使用mainLock加锁控制:双重校验(线程池状态),将Worker线程放入线程池中个(呼应上文利用mainLock控制线程池访问)
    5. 若worker线程向线程池添加成功,则启动worker线程

    注意:若3~4步骤中执行失败,将会执行回滚操作

    /*
    入参:core,为true表示线程池的最大边界是corePoolSize;为false表示线程池的最大边界是maximumPoolSize
    返回值为false的情况:
        ① 线程池已经关闭;②线程池中线程数量超过边界值;③ ThreadFactory创建线程失败
    */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // 这里解释一下第二个判断条件。该条件是我们在execute()方法执行到addWorker中的情况③。
            if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
                return false;
    
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))  // 将ctl字段中的workerCount部分自增1
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }
        // 走到此处表明,ctl字段中的workerCount的数量已经增加。接下来流程:
        // (1) 创建worker线程;(2) 若创建失败回滚; (3) 创建成功,双重校验线程池状态(失败回滚),向线程池添加并启动worker线程
        boolean workerStarted = false;  // 线程池是否启动成功
        boolean workerAdded = false;    // worker线程是否成功添加到线程池中
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    // 此处说明一下判断2,参考在execute()方法执行到addWorker的case情况3。
                    if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // 预先检测线程是否可以启动,如果线程已经是启动的线程,说明这个线程有问题(因为到目前为止,该线程还未调用start()方法进行启动呢)
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    // Worker线程创建失败的回滚操作: 1. 从Pool中移除worker; 2. workerCount减1; 3. 尝试终止
    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }
    

    3.2 执行任务

    任务的执行,使用的是Worker线程池的run()方法。在addWorker()方法中,运行任务的核心代码如下:

    Worker w = new Worker(firstTask);  
    final Thread t = w.thread;
    workers.add(w);
    t.start();
    
    ///// Worker类的内容
    // 在创建Worker线程的时候,内部会利用ThreadFactory创建一个Thread,该Thread最终执行start()方法执行Worker#run()方法
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    // worker线程执行任务的核心在runWorker()代码中
    public void run() {
        runWorker(this);
    }
    

    3.2.1 runWorker()

    上述代码可知,执行任务的核心方法是runWorker()方法:
    在每个任务的执行过程中,会留空beforeExecute(wt, task)和afterExecute(task, thrown)用于任务执行的前置后置处理。该方法的执行流:

    1. for()循环调用getTask()获取任务
    2. getTask()得到任务task != null,正常执行
    3. getTask()得到任务task == null,则调用processWorkerExit()然后worker线程执行结束。注意,到这里该worker线程真的就结束了。

    注意:当执行getTask()==null,worker线程执行结束,但理论上如果线程池状态仍然没有终止,此时processWorkerExit()在处理worker线程结束的最后阶段,会判定是否为线程池重新生成新的worker线程,静待新的任务到来。

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock();     // allow interrupts.
        boolean completedAbruptly = true;   // 任务运行期间是否出行异常
        try {
            while (task != null || (task = getTask()) != null) {
                // 非重入锁加锁,利用非重入锁加锁,可以避免在运行期间,对该worker线程的其他操作,如shutdown()
                w.lock();
                /* Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)
                是一个双重校验,对于不是STOP状态但被中断的worker线程要清除中断状态,避免在getTask()的时候返回空。 这种case在这种情况下可能发生。调用shutdownNow()方法的时候,刚好在执行getTask()且正常返回任务,此时shutdownNow()方法会正常调用中断worker线程的方法。因此这里会对中断状态清除,使用 Thread.interrupted()*/
                if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))  && !wt.isInterrupted()) {
                    // 此处worker线程中断自身,BlockingQueue的take()/poll()方法会使用lock.lockInterruptibly()方式加锁
                    // 假如第一次循环到此处,worker线程执行中断;执行完任务后,再次getTask(),即便该方法没有返回null(正常不可能),也会在take()时因为加锁判断中断而结束
                    wt.interrupt();  
                }
                try {
                    beforeExecute(wt, task); 
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
    

    3.2.2 getTask()

    注意,如果线程池中线程数量小于核心线程数量时,allowCoreThreadTimeOut=true,则在使用poll()获取任务超时后,getTask()方法将会返回null,此时Worker线程将会执行结束。

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // 如果线程池状态已经到了该结束的时候,则直接返回null。注:线程池状态>=STOP时,将不会再处理任务队列中的任务。
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {  
                decrementWorkerCount();
                return null;
            }
    
            int wc = workerCountOf(c);
            // 该状态用于判断worker线程调用BlockingQueue获取task时,是否可以超时
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;  
            // 线程池数量大于maximumPoolSize,该线程获取任务直接返回null。
            if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
    
            try {
                // 阻塞队列上尝试获取数据
                Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
    

    4 shutdown

    4.1 shutdown()

    用于停止线程池,使用mainLock加锁,用于控制多线程同时调用shutdown()方法关闭线程池时,让线程能有序的进行:

    • 将线程池设置为SHUTDOWN状态,即调整ctl字段
    • 中断所有空闲线程:阻塞在任务队列上的线程
    • 执行onShutdown()的钩子方法
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
    
    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }
    // mainLock避免多线程同时中断时的中断风暴
    // worker.tryLock(),只对空闲线程加锁,该锁不可重入。工作中的线程也会worker.lock():参考runWorker()方法
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                // 此处中断的是worker线程
                // 当worker线程获取任务为空,会在BlockQueue上阻塞进入等待队列。
                // 这里的中断将会唤醒阻塞中的worker线程,getTask()方法判断线程池状态为SHUTDOWN,worker线程执行结束
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
    

    尝试进行一次worker线程池的终止操作:

    • 如果任务队列仍然存在任务,或线程池状态不对,则返回
    • 如果线程池中仍然有线程存在,则尝试中断一个活跃线程,然后退出
    • 终止线程池操作:

      ①将线程池设置为TIDYING
      ②执行terminated()方法,此为钩子方法
      ③将线程池设置为TERMINATED

    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                    return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
    
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
    

    4.2 shutdownNow()

    这个方法没啥可说的,可对比runWorker()方法来看。

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 这里和runWorker()方法关联
            // runWorker()在执行任务之前都会判断线程池是否为STOP。
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }
    
    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();   // 只对已经开始执行任务的所有worker线程发起中断
        } finally {
            mainLock.unlock();
        }
    }
    
  • 相关阅读:
    PHP页面静态化
    PHP实现单文件、多文件上传 封装 面向对象实现文件上传
    PHP MYSQL
    MySQL 数据表
    MySQL基础
    DOM事件处理程序-事件对象-键盘事件
    JS--显示类型转换Number—隐式类型转换
    JS的数据类型
    JS属性读写操作+if判断注意事项
    Javascript进阶篇——总结--DOM案例+选项卡效果
  • 原文地址:https://www.cnblogs.com/wolfdriver/p/10491191.html
Copyright © 2011-2022 走看看