zoukankan      html  css  js  c++  java
  • 线程池ThreadPoolExecutor源码分析,看这一篇就够了

    前言

    多线程是我们日常工作中很少能接触到的技术,但是面试的时候100%会被问到,万一工作中用到了基本不会,本篇咱们就来深入分析线程池的实现类ThreadPoolExecutor

    1、构造方法

    构造方法中有4个方法,本质上都是调用的下面这个构造方法:

    public ThreadPoolExecutor(int corePoolSize,
            int maximumPoolSize,
            long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue,
            ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
     if (corePoolSize < 0 ||
      maximumPoolSize <= 0 ||
      maximumPoolSize < corePoolSize ||
      keepAliveTime < 0)
      throw new IllegalArgumentException();
     if (workQueue == null || threadFactory == null || handler == null)
      throw new NullPointerException();
     this.acc = System.getSecurityManager() == null ?
       null :
       AccessController.getContext();
     //线程池的核心线程数目
     this.corePoolSize = corePoolSize;
     //线程池的最大线程数目
     this.maximumPoolSize = maximumPoolSize;
     //阻塞的队列(存储的是待运行的线程)
     this.workQueue = workQueue;
     //线程空闲等待时间
     this.keepAliveTime = unit.toNanos(keepAliveTime);
     //线程工厂(主要作用是创建线程),一般是默认
     this.threadFactory = threadFactory;
     //工作队列满了时候的饱和策略
     this.handler = handler;
    }
    复制代码

    2、饱和策略

    上面的构造方法中,我们着重需要注意的是饱和策略,线程池中定义了四种饱和策略:

    1、CallerRunsPolicy

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
     public CallerRunsPolicy() { }
     //使用主线程执行新任务
     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
      if (!e.isShutdown()) {
       //此方法相同于同步方法
       r.run();
      }
     }
    }
    复制代码

    2、 AbortPolicy(线程池默认的策略)

    public static class AbortPolicy implements RejectedExecutionHandler { 
     public AbortPolicy() { }

     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
      //抛出 RejectedExecutionException来拒绝新任务的处理
      throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
     }
    }
    复制代码

    3、DiscardPolicy

    public static class DiscardPolicy implements RejectedExecutionHandler {
     public DiscardPolicy() { }
     //不执行任何操作,丢弃新任务
     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
     }
    }
    复制代码

    4、DiscardOldestPolicy

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
     public DiscardOldestPolicy() { }
     //此策略将丢弃最早的未处理的任务
     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
      if (!e.isShutdown()) {
       e.getQueue().poll();
       e.execute(r);
      }
     }
    }
    复制代码

    3、阻塞队列

    咱们看下ThreadPoolExecutor的源码:

    public static ExecutorService newFixedThreadPool(int nThreads) {
     return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue<Runnable>());
    }
    复制代码

    使用的是LinkedBlockingQueue作为阻塞队列,LinkedBlockingQueue的默认构造函数允许的队列长度是Integer.MAX_VALUE,若堆积大量的请求,可能会造成OOM

    此处就是为什么《阿里巴巴 Java 开发手册》中不推荐使用Executors工具类创建线程池的原因,要求使用 ThreadPoolExecutor 构造函数的方式,让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

    4、execute方法

    下面是执行流程图:

    对照流程图,我们再来看源码:

    //ctl中存放的是int值,int值得高低位保存了线程池运行的状态和有效线程的数量
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static int workerCountOf(int c) {
     return c & CAPACITY;
    }
    //任务队列
    private final BlockingQueue<Runnable> workQueue;
    public void execute(Runnable command) {
     //如果任务为null,则抛出异常
     if (command == null)
      throw new NullPointerException();
     //获取线程池状态和有效线程数
     int c = ctl.get();
     //以下有3步:
     //步骤1:
     //如果线程池工作的线程小于核心线程数
     if (workerCountOf(c) < corePoolSize) { 
      //则增加一个线程,并把该任务交给它去执行
      if (addWorker(command, true))
       //成功则返回
       return;
      //这里说明创建核心线程失败,需要再次获取临时变量c
      c = ctl.get();
     }
     //步骤2:
     // 走到这里说明创建新的核心线程失败,也就是当前工作线程数大于等于corePoolSize
     // 线程池的运行状态是RUNNING,并且尝试将新任务加入到阻塞队列,成功返回true
     if (isRunning(c) && workQueue.offer(command)) {
      //进入到这里,是已经向任务队列投放任务成功
      //再次获取线程池状态和有效线程数
      int recheck = ctl.get();
      //如果线程池状态不是RUNNING(线程池异常终止了),将线程从工作队列中移除
      if (! isRunning(recheck) && remove(command))
       //执行饱和策略
       reject(command);
      // 走到这里说明线程池状态可能是RUNNING
      // 也可能是移除线程任务失败了(失败的最大的可能是已经执行完毕了)
      //因为所有存活的工作线程有可能在最后一次检查之后已经终结,所以需要二次检查线程池工作线程的状态
      //这里博主也是看了半天,大家好好体会下
      else if (workerCountOf(recheck) == 0)
       //若当前线程池工作线程数为0,则新建一个线程并执行
       addWorker(null, false);
     }
     //步骤3:
     // 如果任务队列已满,就需要创建非核心线程
     // 如果新建非核心线程失败,则执行饱和策略
     else if (!addWorker(command, false))
      reject(command);
    }
    复制代码

    上面的方法多次调用了addWorker方法,我们跟踪进去看下源码:

    // 添加工作线程,返回true则创建和启动工作线程成功;返回false则没有新创建工作线程
    private boolean addWorker(Runnable firstTask, boolean core) {
     retry:
     for (;;) {
      //获取线程池对应的int值
      int c = ctl.get();
      //获取线程池状态
      int rs = runStateOf(c);
      // Check if queue empty only if necessary.
      if (rs >= SHUTDOWN &&
       ! (rs == SHUTDOWN &&
          firstTask == null &&
          ! workQueue.isEmpty()))
       return false;
      for (;;) {
       //获取工作线程数
       int wc = workerCountOf(c);
       //工作线程数超过允许的“最大线程数”则返回false
       //core为true,“最大线程数”就是核心线程数,则表明创建核心线程数失败
       if (wc >= CAPACITY ||
        wc >= (core ? corePoolSize : maximumPoolSize))
        return false;
       // 成功通过CAS更新工作线程数wc,则break到最外层的循环
       if (compareAndIncrementWorkerCount(c))
        break retry;
       c = ctl.get();  // Re-read ctl
       // 如果线程的状态改变了就跳到外层循环执行
       if (runStateOf(c) != rs)
        continue retry;
       //如果CAS更新工作线程数wc失败,则可能是并发更新导致的失败,继续在内层循环重试即可
       // else CAS failed due to workerCount change; retry inner loop
      }
     }
     // 标记工作线程是否启动成功
     boolean workerStarted = false;
     //标记工作线程是否创建成功
     boolean workerAdded = false;
     //工作线程
     Worker w = null;
     try {
      //创建一个工作线程
      w = new Worker(firstTask);
      final Thread t = w.thread;
      if (t != null) {
       final ReentrantLock mainLock = this.mainLock;
       //获取锁
       mainLock.lock();
       try {
        // Recheck while holding lock.
        // Back out on ThreadFactory failure or if
        // shut down before lock acquired.
        int rs = runStateOf(ctl.get());
        // 再次确认"线程池状态"
        if (rs < SHUTDOWN ||
         (rs == SHUTDOWN && firstTask == null)) {
         if (t.isAlive()) // precheck that t is startable
          throw new IllegalThreadStateException();
         //把创建的工作线程实例添加到工作线程集合
         workers.add(w);
         /更新当前工作线程的峰值容量largestPoolSize
         int s = workers.size();
         if (s > largestPoolSize)
          largestPoolSize = s;
         workerAdded = true;
        }
       } finally {
        //释放锁
        mainLock.unlock();
       }
       //如果加入线程池成功
       if (workerAdded) {
        //启动线程
        t.start();
        workerStarted = true;
       }
      }
     } finally {
      //如果线程启动失败,则需要从工作线程集合移除对应线程
      if (! workerStarted)
       addWorkerFailed(w);
     }
     return workerStarted;
    }
    复制代码

    5、shutdown方法

    线程池不用了,要关闭线程池,下面是源码:

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        // 获取锁
        mainLock.lock();
        try {
            //校验是否有权限。
            checkShutdownAccess();
            //设置SHUTDOWN状态。
            advanceRunState(SHUTDOWN);
            //中断线程池中所有空闲线程。
            interruptIdleWorkers();
            //钩子函数
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            //释放锁
            mainLock.unlock();
        }
        //尝试终止线程池
        tryTerminate();
    }
    复制代码

    结束语

    本篇详细的分析了ThreadPoolExecutor的execute方法,耗费了不少时间。如果本文对你哪怕是有一点点的帮助,就值了。

  • 相关阅读:
    Linux mail 命令使用
    django+nginx+xshell简易日志查询,接上<关于《rsyslog+mysql+loganalyzer搭建日志服务器<个人笔记>》的反思>
    django admin后台提示没有static样式相关的文件
    nginx+uwsgi<django web环境的搭建>
    CentOS 6.5升级Python和安装IPython
    关于《rsyslog+mysql+loganalyzer搭建日志服务器<个人笔记>》的反思
    记一次创建LVM的日志记录
    django TEMPLATES
    Only the sqlmigrate and sqlflush commands can be used when an app has migrations.
    rsyslog+mysql+loganalyzer搭建日志服务器<个人笔记>
  • 原文地址:https://www.cnblogs.com/itlaoge/p/14219652.html
Copyright © 2011-2022 走看看