zoukankan      html  css  js  c++  java
  • ThreadPoolExecutor详解

      ThreadPoolExecutor是JDK并发包提供的一个线程池服务,基于它可以很容易将一个Runnable接口的任务放入线程池中。ThreadPoolExecutor的构建参数:

    1. public ThreadPoolExecutor(int corePoolSize,  
    2.                           int maximumPoolSize,  
    3.                           long keepAliveTime,  
    4.                           TimeUnit unit,  
    5.                           BlockingQueue<Runnable> workQueue,  
    6.                           RejectedExecutionHandler handler) {  
    7.     this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,  
    8.          Executors.defaultThreadFactory(), handler);  
    9. }  

    1. 参数解释

    • corePoolSize:核心线程数,会一直存活,即使没有任务,线程池也会维护线程的最少数量
    • maximumPoolSize: 线程池维护线程的最大数量
    • keepAliveTime:线程池维护线程所允许的空闲时间,当线程空闲时间达到keepAliveTime,该线程会退出,直到线程数量等于corePoolSize。如果allowCoreThreadTimeout设置为true,则所有线程均会退出直到线程数量为0。
    • unit: 线程池维护线程所允许的空闲时间的单位、可选参数值为:TimeUnit中的几个静态属性:NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。
    • workQueue: 线程池所使用的缓冲队列,常用的是:java.util.concurrent.ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue
    • handler: 线程池中的数量大于maximumPoolSize,对拒绝任务的处理策略,默认值ThreadPoolExecutor.AbortPolicy()。:

    2. execute方法JDK 实现

    1. public void execute(Runnable command) {  
    2.     if (command == null)  
    3.         throw new NullPointerException();  
    4.     if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {  
    5.         if (runState == RUNNING && workQueue.offer(command)) {  
    6.             if (runState != RUNNING || poolSize == 0)  
    7.                 ensureQueuedTaskHandled(command);  
    8.         }  
    9.         else if (!addIfUnderMaximumPoolSize(command))  
    10.             reject(command); // is shutdown or saturated  
    11.     }  
    12. }  

      一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个Runnable类型的对象,任务的执行方法就是run()方法,如果传入的为null,侧抛出NullPointerException。如果当前线程数小于corePoolSize,调用addIfUnderCorePoolSize方法,addIfUnderCorePoolSize方法首先调用mainLock加锁,再次判断当前线程数小于corePoolSize并且线程池处于RUNNING状态,则调用addThread增加线程

    addIfUnderCorePoolSize方法实现:

    1. private boolean addIfUnderCorePoolSize(Runnable firstTask) {  
    2.     Thread t = null;  
    3.     final ReentrantLock mainLock = this.mainLock;  
    4.     mainLock.lock();  
    5.     try {  
    6.         if (poolSize < corePoolSize && runState == RUNNING)  
    7.             t = addThread(firstTask);  
    8.     } finally {  
    9.         mainLock.unlock();  
    10.     }  
    11.     if (t == null)  
    12.         return false;  
    13.     t.start();  
    14.     return true;  
    15. }  

    addThread方法首先创建Work对象,然后调用threadFactory创建新的线程,如果创建的线程不为null,将Work对象的thread属性

    设置为此创建出来的线程,并将此Work对象放入workers中,然后在增加当前线程池的中线程数,增加后回到

    addIfUnderCorePoolSize方法,释放mainLock,最后启动这个新创建的线程来执行新传入的任务。

    addThread方法实现:

    1.     private Thread addThread(Runnable firstTask) {  
    2.         Worker w = new Worker(firstTask);  
    3.         Thread t = threadFactory.newThread(w);<span style="color:#ff0000;"></span>  
    4.         if (t != null) {  
    5.             w.thread = t;  
    6.             workers.add(w);  
    7.             int nt = ++poolSize;  
    8.             if (nt > largestPoolSize)  
    9.                 largestPoolSize = nt;  
    10.         }  
    11.         return t;  
    12.     }  

      ThreadFactory 接口默认实现DefaultThreadFactory

    1. public Thread newThread(Runnable r) {  
    2.     Thread t = new Thread(group, r,  
    3.                           namePrefix + threadNumber.getAndIncrement(),  
    4.                           0);  
    5.     if (t.isDaemon())  
    6.         t.setDaemon(false);  
    7.     if (t.getPriority() != Thread.NORM_PRIORITY)  
    8.         t.setPriority(Thread.NORM_PRIORITY);  
    9.     return t;  
    10. }  

      从addThread方法看得出,Worker对象包装了参数传入的任务,threadFactory新创建的线程包装了Worker对象,在执行新创建线程的run方法时,调用到了Worker对象的run方法.

      Worker的run方法

    1. public void run() {  
    2.     try {  
    3.         Runnable task = firstTask;  
    4.         firstTask = null;  
    5.         while (task != null || (task = getTask()) != null) {  
    6.             runTask(task);  
    7.             task = null;  
    8.         }  
    9.     } finally {  
    10.         workerDone(this);  
    11.     }  
    12. }  

      从以上方法可以看出,Worker所在的线程启动后,首先执行创建其时传入的Runnable任务,执行完成后,循环调用getTask来获取新的任务,在没有任务的情况下,退出此线程。

      getTask方法实现:

    1. Runnable getTask() {  
    2.     for (;;) {  
    3.         try {  
    4.             int state = runState;  
    5.             if (state > SHUTDOWN)  
    6.                 return null;  
    7.             Runnable r;  
    8.             if (state == SHUTDOWN)  // Help drain queue  
    9.                 r = workQueue.poll();  
    10.             else if (poolSize > corePoolSize || allowCoreThreadTimeOut)  
    11.                 r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);  
    12.             else  
    13.                 r = workQueue.take();  
    14.             if (r != null)  
    15.                 return r;  
    16.             if (workerCanExit()) {  
    17.                 if (runState >= SHUTDOWN) // Wake up others  
    18.                     interruptIdleWorkers();  
    19.                 return null;  
    20.             }  
    21.             // Else retry  
    22.         } catch (InterruptedException ie) {  
    23.             // On interruption, re-check runState  
    24.         }  
    25.     }  
    26. }  

      getTask就是通过WorkQueue的poll或task方法来获取下一个要执行的任务。

      回到execute方法 ,execute 方法部分实现:

    1. if (runState == RUNNING && workQueue.offer(command)) {  
    2.             if (runState != RUNNING || poolSize == 0)  
    3.                    ensureQueuedTaskHandled(command);  
    4.            }  
    5.            else if (!addIfUnderMaximumPoolSize(command))  
    6.                reject(command); // is shutdown or saturated  


      如果当前线程池数量大于corePoolSize或addIfUnderCorePoolSize方法执行失败,则执行后续操作;如果线程池处于运 行状态并且workQueue中成功加入任务,再次判断如果线程池的状态不为运行状态或当前线程池数为0,则调用 ensureQueuedTaskHandled方法

    ensureQueuedTaskHandled方法实现:

    1. private void ensureQueuedTaskHandled(Runnable command) {  
    2.     final ReentrantLock mainLock = this.mainLock;  
    3.     mainLock.lock();  
    4.     boolean reject = false;  
    5.     Thread t = null;  
    6.     try {  
    7.         int state = runState;  
    8.         if (state != RUNNING && workQueue.remove(command))  
    9.             reject = true;  
    10.         else if (state < STOP &&  
    11.                  poolSize < Math.max(corePoolSize, 1) &&  
    12.                  !workQueue.isEmpty())  
    13.             t = addThread(null);  
    14.     } finally {  
    15.         mainLock.unlock();  
    16.     }  
    17.     if (reject)  
    18.         reject(command);  
    19.     else if (t != null)  
    20.         t.start();  
    21. }  

      ensureQueuedTaskHandled方法判断线程池运行,如果状态不为运行状态,从workQueue中删除, 并调用reject做拒绝处理。

    reject方法实现:

    1. void reject(Runnable command) {  
    2.     handler.rejectedExecution(command, this);  
    3. }  

    再次回到execute方法,

    1. if (runState == RUNNING && workQueue.offer(command)) {  
    2.                if (runState != RUNNING || poolSize == 0)  
    3.                    ensureQueuedTaskHandled(command);  
    4.            }  
    5.            else if (!addIfUnderMaximumPoolSize(command))  
    6.                reject(command); // is shutdown or saturated  

    如线程池workQueue offer失败或不处于运行状态,调用addIfUnderMaximumPoolSize,addIfUnderMaximumPoolSize方法

    基本和addIfUnderCorePoolSize实现类似,不同点在于根据最大线程数(maximumPoolSize)进行比较,如果超过最大线程数,

    返回false,调用reject方法,下面是addIfUnderMaximumPoolSize方法实现:

    1. private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {  
    2.        Thread t = null;  
    3.        final ReentrantLock mainLock = this.mainLock;  
    4.        mainLock.lock();  
    5.        try {  
    6.            if (poolSize < maximumPoolSize && runState == RUNNING)  
    7.                t = addThread(firstTask);  
    8.        } finally {  
    9.            mainLock.unlock();  
    10.        }  
    11.        if (t == null)  
    12.            return false;  
    13.        t.start();  
    14.        return true;  
    15.    }  

    3. 添加任务处理流程   当一个任务通过execute(Runnable)方法欲添加到线程池时:   如果当前线程池中的数量小于corePoolSize,并线程池处于Running状态,创建并添加的任务。   如果当前线程池中的数量等于corePoolSize,并线程池处于Running状态,缓冲队列 workQueue未满,

    那么任务被放入缓冲队列、等待任务调度执行。   如果当前线程池中的数量大于corePoolSize,缓冲队列workQueue已满,并且线程池中的数量小于

    maximumPoolSize,新提交任务会创建新线程执行任务。如果当前线程池中的数量大于corePoolSize,

    缓冲队列workQueue已满,并且线程池中的数量等maximumPoolSize,新提交任务由Handler处理。

    当线程池中的线程大于corePoolSize时,多余线程空闲时间超过keepAliveTime时,会关闭这部分线程。

    4. RejectedExecutionHandler  默认有四个选择:

      ThreadPoolExecutor.AbortPolicy():当线程池中的数量等于最大线程数时、直接抛出抛出java.util.concurrent.RejectedExecutionException异常

    1. public static class AbortPolicy implements RejectedExecutionHandler {  
    2.     /** 
    3.      * Creates an {@code AbortPolicy}. 
    4.      */  
    5.     public AbortPolicy() { }  
    6.   
    7.     /** 
    8.      * Always throws RejectedExecutionException.
    9.      * @param r the runnable task requested to be executed 
    10.      * @param e the executor attempting to execute this task 
    11.      * @throws RejectedExecutionException always. 
    12.      */  
    13.     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  
    14.         throw new RejectedExecutionException("Task " + r.toString() +  
    15.                                              " rejected from " +  
    16.                                              e.toString());  
    17.     }  
    18. }  

      ThreadPoolExecutor.CallerRunsPolicy() :当线程池中的数量等于最大线程数时、重试执行当前的任务,交由调用者线程来执行任务

    1. public static class CallerRunsPolicy implements RejectedExecutionHandler {  
    2.      /** 
    3.       * Creates a {@code CallerRunsPolicy}. 
    4.       */  
    5.      public CallerRunsPolicy() { }  
    6.   
    7.      /** 
    8.       * Executes task r in the caller's thread, unless the executor 
    9.       * has been shut down, in which case the task is discarded.
    10.       * @param r the runnable task requested to be executed 
    11.       * @param e the executor attempting to execute this task 
    12.       */  
    13.      public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  
    14.          if (!e.isShutdown()) {  
    15.              r.run();  
    16.          }  
    17.      }  
    18.  }  

      ThreadPoolExecutor.DiscardOldestPolicy() :当线程池中的数量等于最大线程数时、抛弃线程池中最后一个要执行的任务,并执行新传入的任务

    1. public static class DiscardOldestPolicy implements RejectedExecutionHandler {  
    2.       /** 
    3.        * Creates a {@code DiscardOldestPolicy} for the given executor. 
    4.        */  
    5.       public DiscardOldestPolicy() { }  
    6.   
    7.       /** 
    8.        * Obtains and ignores the next task that the executor 
    9.        * would otherwise execute, if one is immediately available, 
    10.        * and then retries execution of task r, unless the executor 
    11.        * is shut down, in which case task r is instead discarded. 
    12.        * @param r the runnable task requested to be executed 
    13.        * @param e the executor attempting to execute this task 
    14.        */  
    15.       public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  
    16.           if (!e.isShutdown()) {  
    17.               e.getQueue().poll();  
    18.               e.execute(r);  
    19.           }  
    20.       }  
    21.   }  

    ThreadPoolExecutor.DiscardPolicy() :当线程池中的数量等于最大线程数时,不做任何动作

      1. public static class DiscardPolicy implements RejectedExecutionHandler {  
      2.     /** 
      3.      * Creates a {@code DiscardPolicy}. 
      4.      */  
      5.     public DiscardPolicy() { }  
      6.   
      7.     /** 
      8.      * Does nothing, which has the effect of discarding task r. 
      9.      * 
      10.      * @param r the runnable task requested to be executed 
      11.      * @param e the executor attempting to execute this task 
      12.      */  
      13.     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  
      14.     }  
  • 相关阅读:
    UVA 679 Dropping Balls 由小见大,分析思考 二叉树放小球,开关翻转,小球最终落下叶子编号。
    2017-5-14 湘潭市赛 Similar Subsequence 分析+四维dp+一些简单优化
    Problem #3263 丽娃河的狼人传说 区间满足灯数,r排序后贪心。
    2017-5-14 湘潭市赛 Longest Common Subsequence 想法题
    2017-5-14 湘潭市赛 Parentheses 转化思想+贪心 使括号序列合法的最小花费。满足前面左括号的数量>=有括号的数量。
    deque双端队列用法
    Shell字符串截取
    keepAlived发生故障切换VIP—邮件通知方案2
    KeepAlived发生故障切换VIP—邮件通知方案1
    缺少依赖 libmysqlclient.so.18(64bit)的解决办法
  • 原文地址:https://www.cnblogs.com/wxgblogs/p/5458905.html
Copyright © 2011-2022 走看看