zoukankan      html  css  js  c++  java
  • 并发系列(6)之 ThreadPoolExecutor 详解

    本文将主要介绍我们平时最常用的线程池 ThreadPoolExecutor ,有可能你平时没有直接使用这个类,而是使用 Executors 的工厂方法创建线程池,虽然这样很简单,但是很可能因为这个线程池发生 OOM ,具体情况文中会详细介绍;

    二、ThreadPoolExecutor 概览

    ThreadPoolExecutor 的继承关系如图所示:

    executor

    其中:

    • Executor:定义了 executor(Runnable command) 异步接口,但是没有强制要求异步;
    • ExecutorService:提供了生命周期管理的方法,以及有返回值的任务提交;
    • AbstractExecutorService:提供了 ExecutorService 的默认实现;

    1. 主体结构

    public class ThreadPoolExecutor extends AbstractExecutorService {
      private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));  // 状态控制变量,核心
      private final BlockingQueue<Runnable> workQueue;                         // 任务等待队列
      private final HashSet<Worker> workers = new HashSet<Worker>();           // 工作线程集合
      private volatile ThreadFactory threadFactory;       // 线程构造工厂
      private volatile RejectedExecutionHandler handler;  // 拒绝策略
      private volatile long keepAliveTime;                // 空闲线程的存活时间(非核心线程)
      private volatile int corePoolSize;                  // 核心线程大小
      private volatile int maximumPoolSize;               // 工作线程最大容量
    
      public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
                                RejectedExecutionHandler handler) {
        if (corePoolSize < 0 || maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize || keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
      }
      ...
    }
    

    这里已经可以大致看出 ThreadPoolExecutor 的结构了:

    threadpool1

    2. Worker 结构

    private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
      final Thread thread;  // 持有线程,只有在线程工厂运行失败时为空
      Runnable firstTask;   // 初始化任务,不为空的时候,任务直接运行,不在添加到队列
      volatile long completedTasks;  // 完成任务计数
    
      Worker(Runnable firstTask) {
        setState(-1);   // AQS 初始化状态
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
      }
    
      public void run() {
        runWorker(this);  // 循环取任务执行
      }
      ...
      // AQS 锁方法
    }
    

    这里很容易理解的是 threadfirstTask;但是 Worker 还继承了 AQS 做了一个简易的互斥锁,主要是在中断或者 worker 状态改变的时候使用;具体 AQS 的详细说明可以参考,AbstractQueuedSynchronizer 源码分析


    3. ctl 控制变量

    ctl 控制变量(简记 c)是一个 AtomicInteger 类型的变量,由两部分信息组合而成(两个值互补影响,又可以通过简单的大小比较判断状态):

    • 线程池的运行状态 (runState,简记 rs),由 int 高位的前三位表示;
    • 线程池内有效线程的数量 (workerCount,简记 wc),由 int 地位的29位表示;

    源码如下:

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;       // 用来表示线程数量的位数
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // 线程最大容量
    
                                                             // 状态量
    private static final int RUNNING    = -1 << COUNT_BITS;  // 高位 111,第一位是符号位,1表示负数
    private static final int SHUTDOWN   =  0 << COUNT_BITS;  // 高位 000
    private static final int STOP       =  1 << COUNT_BITS;  // 高位 001
    private static final int TIDYING    =  2 << COUNT_BITS;  // 高位 010
    private static final int TERMINATED =  3 << COUNT_BITS;  // 高位 011
    
    private static int runStateOf(int c)     { return c & ~CAPACITY; }  // 运行状态,取前3位
    private static int workerCountOf(int c)  { return c & CAPACITY; }   // 线程数量,取后29位
    private static int ctlOf(int rs, int wc) { return rs | wc; }        // 状态和数量合成
    
    private static boolean runStateLessThan(int c, int s) { return c < s; } // 状态比较
    private static boolean runStateAtLeast(int c, int s) { return c >= s; } 
    private static boolean isRunning(int c) { return c < SHUTDOWN; } // RUNNING 是负数,必然小于 SHUTDOWN
    

    代码中可以看到状态判断的时候都是直接比较的,这是因为 TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING ;他们的状态变迁关系如下:

    threadpool2

    其中:

    • RUNNING:运行状态,可接收新任务;
    • SHUTDOWN:不可接收新任务,继续处理已提交的任务;
    • STOP:不接收、不处理任务,中断正在进行的任务
    • TIDYING:所有任务清空,线程停止;
    • TERMINATED:钩子方法,执行后的最终状态;

    三、ThreadPoolExecutor 源码分析

    1. 增加工作线程

        private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // 这里正常情况下,只要大于SHUTDOWN,则必然不能添加线程;但是这里做了一个优化,
                // 如果线程池还在继续处理任务,则可以添加线程加速处理,
                // SHUTDOWN 表示不接收新任务,但是还在继续处理,
                // firstTask 不为空时,是在添加线程的时候,firstTask 不入队,直接处理
                // workQueue 不为空时,则还有任务需要处理
                // 所以连起来就是 rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||  // 容量超出,则返回
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;  // 线程数增加成功,则跳出循环
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)  // 如果线程状态改变时,重头开始重试
                        continue retry;
                }
            }
            // 此时线程计数,增加成功
          
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {  // 线程创建失败时,直接退出
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        int rs = runStateOf(ctl.get());
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) { // 这里同样检查上面的优化条件
                            if (t.isAlive()) // 如果线程已经启动,则状态错误;
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize) largestPoolSize = s;  // 记录工作线程的最大数,统计峰值用
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();  // 启动线程
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted) addWorkerFailed(w); // 添加失败清除
            }
            return workerStarted;
        }
    

    2. 提交任务

    public void execute(Runnable command) {
      if (command == null) throw new NullPointerException();
      int c = ctl.get();
      if (workerCountOf(c) < corePoolSize) {   // 如果小于核心线程,直接添加
        if (addWorker(command, true)) return;
        c = ctl.get();
      }
      if (isRunning(c) && workQueue.offer(command)) {  // 任务入队
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))  // 再次检查,状态不是RUNNING的时候,拒绝并移除任务
          reject(command);
        else if (workerCountOf(recheck) == 0)  // 这里是防止状态为SHUTDOWN时,已经添加的任务无法执行
          addWorker(null, false);
      }
      else if (!addWorker(command, false))  // 任务入队失败时,直接添加线程,并运行
        reject(command);
    }
    

    流程图如下:

    threadpool2

    所以影响任务提交的因数就有:

    • 核心线程的大小;
    • 是否为阻塞队列;
    • 线程池的大小;

    3. 处理任务

    工作线程启动之后,首先处理 firstTask 任务(特别注意,这个任务是没有入队的),然后从 workQueue 中取出任务处理,队列为空时,超时等待 keepAliveTime

    final void runWorker(Worker w) {
      Thread wt = Thread.currentThread();
      Runnable task = w.firstTask;
      w.firstTask = null;
      w.unlock(); // allow interrupts
      boolean completedAbruptly = true;
      try {
        while (task != null || (task = getTask()) != null) {  // 获取任务
          w.lock();
          // 总体条件表示线程池停止的时候,需要中断线程,
          // 如果没有停止,则清除中断状态,确保未中断
          if ((runStateAtLeast(ctl.get(), STOP) ||
             (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
            !wt.isInterrupted()) 
            wt.interrupt();
          try {
            beforeExecute(wt, task); // 回调方法
            Throwable thrown = null;
            try {
              task.run();
            } catch (RuntimeException x) {
              thrown = x; throw x;
            } catch (Error x) {
              thrown = x; throw x;
            } catch (Throwable x) {
              thrown = x; throw new Error(x);
            } finally {
              afterExecute(task, thrown);  // 回调方法
            }
          } finally {
            task = null;
            w.completedTasks++;
            w.unlock();
          }
        }
        completedAbruptly = false;
      } finally {
        processWorkerExit(w, completedAbruptly);  // 退出时清理
      }
    }
    
    private Runnable getTask() {
      boolean timedOut = false; // Did the last poll() time out?
    
      for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
    
        // 此处保证 SHUTDOWN 状态继续处理任务,STOP 状态停止处理
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
          decrementWorkerCount();
          return null;
        }
        int wc = workerCountOf(c);
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;  // 是否关闭空闲线程
    
        if ((wc > maximumPoolSize || (timed && timedOut))  // 如果线程大于最大容量,或者允许关闭,且第一次没取到
          && (wc > 1 || workQueue.isEmpty())) {            // 返回空,最后由 processWorkerExit 清理
          if (compareAndDecrementWorkerCount(c))
            return null;
          continue;
        }
    
        try {
          // 是否超时获取
          Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
          if (r != null)
            return r;
          timedOut = true;
        } catch (InterruptedException retry) {
          timedOut = false;
        }
      }
    }
    

    4. 停止线程池

    public void shutdown() {
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
        checkShutdownAccess();     // 检查停止权限
        advanceRunState(SHUTDOWN); // 设置线程池状态
        interruptIdleWorkers();    // 设置所有线程中断
        onShutdown();              // hook for ScheduledThreadPoolExecutor
      } finally {
        mainLock.unlock();
      }
      tryTerminate();              // 继续执行等待队列中的任务,完毕后设置 TERMINATED 状态
    }
    
    public List<Runnable> shutdownNow() {
      List<Runnable> tasks;
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();   // 清空所有等待队列的任务,并返回
      } finally {
        mainLock.unlock();
      }
      tryTerminate();
      return tasks;
    }
    

    可以看到 shutdownNow 只比 shutdown 多了,清空等待队列,但是正在执行的任务还是会继续执行;

    四、拒绝策略

    之前提到了,提交任务失败的时候,会执行拒绝操作,在 JDk 中为我们提供了四种策略:

    • AbortPolicy:直接抛出 RejectedExecutionException 异常,这是默认的拒绝策略;
    • CallerRunsPolicy:由调用线程本身运行任务,以减缓提交速度;
    • DiscardPolicy:不处理,直接丢弃掉;
    • DiscardOldestPolicy:丢弃最老的任务,并执行当前任务;

    五、Executors 工厂方法

    另外就是根据线程池参数的不同,Executors 为我们提供了4种典型的用法:

    SingleThreadExecutor:单线程的线程池,提交任务顺序执行;

    public static ExecutorService newSingleThreadExecutor() {
      return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
    }
    

    如代码所示,就是最大线程、核心线程都是1,和无界队列组成的线程池,提交任务的时候就会,直接将任务加入队列顺序执行;

    FixedThreadPool:固定线程数量线程池:

    public static ExecutorService newFixedThreadPool(int nThreads) {
      return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, 
                                    new LinkedBlockingQueue<Runnable>());
    }
    

    SingleThreadExecutor 一样,只是线程数量由用户决定;

    CachedThreadPool:动态调节线程池;

    public static ExecutorService newCachedThreadPool() {
      return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, 
                                    new SynchronousQueue<Runnable>());
    }
    

    这里核心线程为0,队列是 SynchronousQueue 容量为1的阻塞队列,而线程数最大,存活60s,所以有任务的时候直接创建新的线程,超时空闲60s;

    ScheduledThreadPool:定时任务线程池,功能同 Timer 类似,具体细节后续还会讲到;

    总结

    • 决定线程池运行逻辑的主要有三个变量,核心线程大小,队列容量,线程池容量
    • 最后发现其实 Executors 提供的几种实现,都很典型;但是却容易发生 OOM ,所以最好还是自己手动创建比较好;
  • 相关阅读:
    svm 中采用自动搜索参数的方式获得参数值
    OpenCV中的SVM参数优化
    openCV训练程序申请内存不足
    opencv计算运行时间
    马氏距离(Mahalanobis distance)
    Azure网络排查基本梳理
    让Flow成为获取信息的利器(1)
    Azure VM培训简要总结和学习材料梳理
    Powershell利用$_变量批量部署Azure虚拟机
    Azure存储基本介绍
  • 原文地址:https://www.cnblogs.com/sanzao/p/10712778.html
Copyright © 2011-2022 走看看