zoukankan      html  css  js  c++  java
  • ThreadPoolExecutor代码解析

    派生体系
    java.util.concurrent
      ThreadPoolExecutor
        AbstractExecutorService
          ExecutorService
            Executor
     
    这个类是Executor框的核心实现,它的名字向我们表明,它是使用thread pool实现的。这个thread pool主要解决了两个问题:
    1. 执行大量的单个异步任务,一般情况下,它能提升整体的性能。
    2. 执行由多个任务组成的任务集合,通过Future列表返回每个任务的执行结果。
     
    设计原理

     
    重要概念
    为了能够在更多上下文环境中使用,ThreadPool定义了一些概念,这些概念都直接或间接对应着可调节参数,如果不了解这些概念含义,很难正确地使用这些参数。下面来看一下这些概念及其含义:
     
    当前,核心,最小,最大线程数(poolSize, corePoolSize, minimumPoolSize, maximumPoolSize)
    poolSize: 当前处于运行和空闲状态的总线程数。
    corePoolSize: 核心线程数, 当poolSize<=corePoolSize时,存在的线程称为coreThread。
    minimumPoolSize: 最小线程数,当minimumPoolsize = allowCoreThreadTimeOut ? 0 : corePoolSize ,
    maximunPoolSize: 最大线程数。
    ThreadPool在运行过程中会自动的调节线程数量(poolSize), 一般来说,poolSize处于[corePoolSize maximumPoolSize]区间之内。
    当用户调用execute提交一个任务时,如果poolSize<corePoolSize, 会创建一个新线程处理这个任务。如果如果poolSize处于[corePoolSize maximumPoolSize]区间内,只有队列满是才会创建新线程。无论如何,poolSize不会大于maximumPoolSize。
    默认情况下,ThreadPool没有收到任何任务时pooSize = 0, 只有当ThreadPool开始收到任务之后才会创建线程。但是可以通过覆盖prestartCoreThread或prestartAllCoreThreads方法改变这种行为,提前创建线程。
     
    线程工厂--ThreadFactory
    ThreadPool使用实现了ThreadFactory接口的实现创建新线程。Executors工厂类提供了defaultThreadFactory方法,该方法返回一个默认的ThreadFactory实例。使用这个实例创建的线程具有相同的优先级,是非后台线程,命名上使用相同的前缀。如果不满意这这些行为,可以自己实现一个ThreadFactory交给ThreadPool使用。
     
    保持存活时间(keepAliveTime)
    如果poolSize > corePoolSize, 当一个线程的空闲时间大于keepAliveTime, 它会被终止掉。默认情况下当poolSize <= corePoolSize时,keepAliveTime不会有影响,如果调用 allowCoreThreadTimeOut(true), 可以让keepAliveTime在这个时间也起作用。
     
    任务排队(queuing)
    任何BlockingQueue的实例都可以用于保存排队的任务。不同的BlockingQueue实现决定了不同的排队策略:
    SynchronousQueue: 同步队列,当提交一个任务时,要求ThreadPool中当前有至少一个空闲线程,或者可以创建新的线程(poolSize < maximumPoolSize)立即执行这个任务,否则ThreadPool会拒绝这个任务。
    LinkedBlockingQueue: 无限制的队列(只受限于能够使用的内存), 不会处于full状态, offer方法不会返回false,这意味这ThreadPool的pooSize<=corePoolSize, 不会创建大于corePoolSize的线程数。
    ArrayBlockingQueue: 有限制的队列, 受限于它的capacity。当poolSize == corePoolSize且队列没满时, 新提交的任务会追加到队列中排队执行。 当poolSize在[corePoolSize maximumPooSize)区间同时队被填列满时,将会创建新的线程。直到poolSize == maximumPoolSize位置。 如果队列被填满同时pooSize == maximumPoolSize,新的任务会被拒绝。
     
    拒绝任务(rejected tasks)
    当ThreadPool遇到以下两种情况时会触发拒绝任务策略:
    1. 正常情况下BlockingQueue被填满,同时poolSize == maximumPoolSize。
    2. 被关闭
    ThreadPool使用RejectedExecutionHandler处理丢弃动作,默认定义了4中丢弃策略:
    ThreadPoolExecutor.AbortPolicy: 抛出RejectedExecutionException异常。
    ThreadPoolExecutor.CallerRunsPolicy: 自己执行这个被抛弃的任务。
    ThreadPoolExecutor.DiscardPolicy: 悄无声息的丢弃掉这人任务。
     
    状态
    ThreadPool定义了5状态
    RUNNING: 接受新提交的任务,执行队列中发任务。
    SHUTDOWN: 不接受新提交的任务,但仍然会执行队列中的人。
    STOP: 不接受新提交的任务,不执行队列中的任务,而且会打断正在执行中的任务。
    TIDYING: 所有的任务都终止了,并且线程数为0,当所有的线程都过渡到TIDYING状态后会调用treminated方法。
    TERMINATED: treminated方法调用已经完成。
     
    状态之间的转换关系
    RUNNING --> SHUTDOWN
    调用shutdown()
    (RUNNING或SHUTDOWN) -- > STOP
    调用shutdownNow()
    SHUTDOWN --> TIDYING
    队列为空,同时线程数为0
    TIDYING --> TREMINATED
    treminated()执行完成。
     
    向ThreadPool提交任务: execute
     
    ThreadPoolExecutor实例创建之后,在没有调用execute提交任务之前,ThreadPool中是没有线程的,线程的创建是依赖exeute来驱动的。可以说,exeute是ThreadPoolExecutor运行的触发器,所有我选择先从exeute方法开始分析代码。
    public void execute(Runnable command) {
      if (command == null)
        throw new NullPointerException();
      int c = ctl.get();
      if (workerCountOf(c) < corePoolSize) { //如果线程数小于 corePoolSize, 创建一个新线程。
        if (addWorker(command, true))
         return;
        c = ctl.get();
      }
      if (isRunning(c) && workQueue.offer(command)) { //如果处于RUNNGIN状态把任务放到队列中
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command)) //再次检查线程状态,如果不是RUNNING状态,把任务从队列删除,然后拒绝这个任务
          reject(command);
        else if (workerCountOf(recheck) == 0) //如果线程数为0,创建一个新线程
          addWorker(null, false);
      }
      /*如果运行到这里说明当前不是出于RUNNING状态,或处于RUNNING状态但队列已经被填满
      *尝试创建新的线程执行这个任务,如果失败,拒绝这任务
      */
      else if (!addWorker(command, false))
        reject(command);
    }
    以上就是exeute代码,它很简单,但其中ctl成员变量比较费解。ctl是AtomicInteger类型,它被用来打包保存ThreadPoolExecutor的状态和线程数。
    AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    它初始化时,把状态设置成RUNNING,下面来看看它的结构
     
    高位 ---- > 低位
    运算状态(run state)
    线程数(workerCount)
    31 -- 29
    28 -- 0
    状态位
    RUNNING
    1
    1
    1
     
    SHUTDOWN
     
    0
    0
    0
     
    STOP
    0
    0
    1
     
    TIDYING
    0
    1
    0
     
    TREMINATED
    0
    1
    1
     
    知道了这些数据的保存方式,把他们取出来,只需要一些简单的位运算就可以了。
    状态的大小关系 RUNNING < SHUTDOWN < STOP < TIDYING < TREMINATED,
    runStateOf(clt.get()) < SHUTDOWN RUNNING状态
    runStateOf(clt.get()) >= SHUTDOWN 非RUNNING状态
    这个大小关系要记住,这样理解代码会更快。
     
    创建新线程
    ThreadPool把线程封装成Worker对对象,添加worker就是添加线程,addWorker方法做的事情就是添加线程。
    private boolean addWorker(Runnable firstTask, boolean core) {
        /*这段代码的作用是确保满足一下条件的任意一个时才创建新线程
       *1.  处于RUNNING 状态, 可以接受新任务,可以继续执行队列中的任务
       *2. 处于SHUTDOWN状态,  队列不是空,且当前没有提交新任务
       */
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
     
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&   //非RUNNINGG状态
                ! (rs == SHUTDOWN &&  
                   firstTask == null &&  //当前提交的新任务
                   ! workQueue.isEmpty())) // 队列不是空
                return false;
     
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize)) //如果当前调用创建的是core线程,  确保当前线程数 <corePoolSize, 否则确保当前线程数< maximumPoolSize
                    return false;
                if (compareAndIncrementWorkerCount(c))  //原子操作,增加线程数
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        //执行到这里表示已经通过检查可以创建新线程,并且线程数已经加1
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
     
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {  //再次检查,确保当前仍然满足允许创建线程的条件
                        if (t.isAlive()) // 确保Thread还没有调用start()
                            throw new IllegalThreadStateException();
                        workers.add(w); //把worker线程放进HashSet中
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();  //启动新线程
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
     
    线程的主循环
    Worker实现了Runnable接口
    private final class Worker extends AbstractQueuedSynchronizer  implements Runnable
    构造方法
    Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
    }
    创建线程时把Worker实例本身当做线程的Runnable产生,所以当线程启动后,将会调用Worker的run方法。
    public void run() {
            runWorker(this);
    }
    线程的主循环就在runWorker方法中实现
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;  //如果firstTask!=null, 先执行firstTask
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) { //如果没有firstTask,  从队列中取出一个task, 如果没有取到,退出线程
                w.lock();
            //如果处于状态>=STOP(前面已经讲过状态直接的大小关系), 确保线程处于interrupted状态
           //否则清除线程的interrupted状态
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task); //执行任务前调用的方法,默认什么都没干,用户可以根据需要覆盖它
                    Throwable thrown = null;
                    try {
                        task.run(); //执行任务
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown); //任务完成之后调用的方法, 默认什么都没干,用户可以根据需要覆盖它
                    }
                } finally {
                    task = null; //把当前task置空,这样才能调用getTask从队列里取出任务
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
           //正常退出线程 completedAbruptly是true, 异常导致的线程退出为false
            processWorkerExit(w, completedAbruptly);
        }
    }
     
    从队列中得到排队的任务
    在runWorker主循环中,除了第一次的任务从worker的firsTask(在它不是null的情况下)取之外, 后面每次都是调用getTask从队列中取出一个任务。
    下面是getTask的代码分析
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
     
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c); //得到当前状态
     
            // 如果当前状态 > SHUTDOWN 退出线程
        // 如果当前状态 == SHUTDOWN 且 队列为空,退出线程
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();  //减少当前线程数
                return null;
            }
     
            int wc = workerCountOf(c); //得到当前的线程数
     
            //线程是否允许超时的条件: 设置允许coreThread超时,或者当前线程数 > corePoolSize
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
        //线程退出需要同时满足以下两个条件条件: 
        //1. 当前线程数>maximumPooSize 或 允许超时同时检查到已经超时
        //2. 当前线程数>1 或 队列为空
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c)) //减少当前线程数, 这个方法确保多线程环境下不会过多地结束线程。
                    return null;
                continue;
            }
     
            try {
               //取出一个任务。如果允许超时,调用poll,否则调用take
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true; //已经超时,运行到这里表明poll超时返回
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
    getTask的功能除了取出一个任务以外,它还负责在条件满足的情况下正常地结束一个线程
     
    线程结束
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // 如果线程是由于异常原因结束的,这里要纠正线程数
            decrementWorkerCount();
     
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);  //把线程从HashSet中删除
        } finally {
            mainLock.unlock();
        }
     
        tryTerminate(); //尝试终止整个ThreadPool
     
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {  //如果当前状态<STOP
            if (!completedAbruptly) { //如果不是异常结束
                 //计算最小线程数min
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty()) 
                    min = 1;
                if (workerCountOf(c) >= min)  //如果当前线程数>=min直接返回
                    return; // replacement not needed
            }
            //创建新线程, 条件:
            //当前线程正常结束
            //当前线程异常结束,但当前线程数小于最小线程数
            addWorker(null, false);
        }
    }
     
    上面的代码实现了线程的生命周期的管理,线程只有在ThreadPoolExecutor的状态处于RUNNGIN或SHUTDOWN时才可以存在。下面是这两种状态下线程的生存状态:
    RUNNING:
        允许coreThread超时: 线程空闲(意味着队列为空)时间超过 keepAliveTime, 线程会被结束, 直到线程数为0。
        不允许coreThread超时:  线程空闲时间超过 keepAliveTime, 线程会被结束,直到线程数为corePoolSize。
    SHUDOWN:
          当线程把已经在队列里的所有任务执行完毕后,所有线程都会进入退出流程,最终退出。
     
    整个ThreadPoolExecutor的状态变迁
    前面已经讲过,ThreadPool的状态和线程数被打包方进一个32整数中:
    AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    初始化把状态设置成RUNNING, 线程为0
    调用shutdown时把状态从RUNNING置为SHUTDOWN,  随后过渡到TIDYING->TREMINATED。
    当调用shutdownNow时把状态从(RUNNING 或 SHUTDOWN) 设置为STOP,  随后过渡到TIDYING->TREMINATED。
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);    //只有当前状态<SHUTDOWN时才执行状态设置的动作
            interruptIdleWorkers();  //打断所有空闲的的线程,让这些线程有机会自己结束
            onShutdown(); // 回调方法,默认什么都没做,子类可以覆盖
        } finally {
            mainLock.unlock();
        }
        tryTerminate(); //尝试执行ThreadPool的结束操作
    }
    shutdownNow和shutdown的操作大致一样,不同的是它把状态设置成STOP,还会返回队列中没有来得及执行的任务list。
    tryTerminate方法作用是尝试结束整个ThreadPool, 它不一定会执行真正的结束动作。它在三个地方被调用, worker线程结束时,shudown中,shutdownNow中。
    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
        //满足以下三个条件中的任何一个就立即返回
        //1. 处于RUNNGING状态
        //2. 状态>= TIDYING
        //3. 处于SHUTDOWN状态,且队列不是空
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
        //如果处于STOP状态,且线程数不为0,通知一个处于空闲的线程结束自己
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
        //执行到这里表示目前状态>=SHUTDOWN,线程数已经是0
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { //总有一个线程会运行到这里,把状态置为 TIDYING
                    try {
                        terminated(); //调用回调方面,默认什么都没干,子类可以覆盖
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));  //把状态置为TREMINATED, 自此整个ThreadPool才算终结
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
    tryTerminate之所以要在三个地方调用,是为了保证当调用shutdown或shutdownNow之后,总有一个线程会完成最后的终结工作。
     
    参数设置
    分析完前面代码后,再来使用它,它的参数怎么设置自然就了然于心。
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 
    public void allowCoreThreadTimeOut(boolean value)
    public void setCorePoolSize(int corePoolSize) 
    public void setKeepAliveTime(long time, TimeUnit unit)
    public void setMaximumPoolSize(int maximumPoolSize)
    public void setRejectedExecutionHandler(RejectedExecutionHandler handler) 
    public void setThreadFactory(ThreadFactory threadFactory)

  • 相关阅读:
    python中不可变数据类型和可变数据类型
    悲观锁与乐观锁
    MySql的隔离级别和锁的关系
    关于content-type请求头的说明
    数据库事务的四大特性以及事务的隔离级别
    [Vue] : 路由
    [Vue] : 组件
    [Vue] : vue-resource 实现 get, post, jsonp请求
    [Vue] : 动画
    [Vue] : 自定义指令
  • 原文地址:https://www.cnblogs.com/brandonli/p/10023970.html
Copyright © 2011-2022 走看看