zoukankan      html  css  js  c++  java
  • ThreadPoolExecutor详解

      ThreadPoolExecutor是JDK并发包提供的一个线程池服务,基于它可以很容易将一个Runnable接口的任务放入线程池中。ThreadPoolExecutor的构建参数:

    1. public ThreadPoolExecutor(int corePoolSize,  
    2.                           int maximumPoolSize,  
    3.                           long keepAliveTime,  
    4.                           TimeUnit unit,  
    5.                           BlockingQueue<Runnable> workQueue,  
    6.                           RejectedExecutionHandler handler) {  
    7.     this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,  
    8.          Executors.defaultThreadFactory(), handler);  
    9. }  

    1. 参数解释

    • corePoolSize:核心线程数,会一直存活,即使没有任务,线程池也会维护线程的最少数量
    • maximumPoolSize: 线程池维护线程的最大数量
    • keepAliveTime:线程池维护线程所允许的空闲时间,当线程空闲时间达到keepAliveTime,该线程会退出,直到线程数量等于corePoolSize。如果allowCoreThreadTimeout设置为true,则所有线程均会退出直到线程数量为0。
    • unit: 线程池维护线程所允许的空闲时间的单位、可选参数值为:TimeUnit中的几个静态属性:NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。
    • workQueue: 线程池所使用的缓冲队列,常用的是:java.util.concurrent.ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue
    • handler: 线程池中的数量大于maximumPoolSize,对拒绝任务的处理策略,默认值ThreadPoolExecutor.AbortPolicy()。:

    2. execute方法JDK 实现

    1. public void execute(Runnable command) {  
    2.     if (command == null)  
    3.         throw new NullPointerException();  
    4.     if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {  
    5.         if (runState == RUNNING && workQueue.offer(command)) {  
    6.             if (runState != RUNNING || poolSize == 0)  
    7.                 ensureQueuedTaskHandled(command);  
    8.         }  
    9.         else if (!addIfUnderMaximumPoolSize(command))  
    10.             reject(command); // is shutdown or saturated  
    11.     }  
    12. }  

      一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个Runnable类型的对象,任务的执行方法就是run()方法,如果传入的为null,侧抛出NullPointerException。如果当前线程数小于corePoolSize,调用addIfUnderCorePoolSize方法,addIfUnderCorePoolSize方法首先调用mainLock加锁,再次判断当前线程数小于corePoolSize并且线程池处于RUNNING状态,则调用addThread增加线程

    addIfUnderCorePoolSize方法实现:

    1. private boolean addIfUnderCorePoolSize(Runnable firstTask) {  
    2.     Thread t = null;  
    3.     final ReentrantLock mainLock = this.mainLock;  
    4.     mainLock.lock();  
    5.     try {  
    6.         if (poolSize < corePoolSize && runState == RUNNING)  
    7.             t = addThread(firstTask);  
    8.     } finally {  
    9.         mainLock.unlock();  
    10.     }  
    11.     if (t == null)  
    12.         return false;  
    13.     t.start();  
    14.     return true;  
    15. }  

    addThread方法首先创建Work对象,然后调用threadFactory创建新的线程,如果创建的线程不为null,将Work对象的thread属性

    设置为此创建出来的线程,并将此Work对象放入workers中,然后在增加当前线程池的中线程数,增加后回到

    addIfUnderCorePoolSize方法,释放mainLock,最后启动这个新创建的线程来执行新传入的任务。

    addThread方法实现:

    1.     private Thread addThread(Runnable firstTask) {  
    2.         Worker w = new Worker(firstTask);  
    3.         Thread t = threadFactory.newThread(w);<span style="color:#ff0000;"></span>  
    4.         if (t != null) {  
    5.             w.thread = t;  
    6.             workers.add(w);  
    7.             int nt = ++poolSize;  
    8.             if (nt > largestPoolSize)  
    9.                 largestPoolSize = nt;  
    10.         }  
    11.         return t;  
    12.     }  

      ThreadFactory 接口默认实现DefaultThreadFactory

    1. public Thread newThread(Runnable r) {  
    2.     Thread t = new Thread(group, r,  
    3.                           namePrefix + threadNumber.getAndIncrement(),  
    4.                           0);  
    5.     if (t.isDaemon())  
    6.         t.setDaemon(false);  
    7.     if (t.getPriority() != Thread.NORM_PRIORITY)  
    8.         t.setPriority(Thread.NORM_PRIORITY);  
    9.     return t;  
    10. }  

      从addThread方法看得出,Worker对象包装了参数传入的任务,threadFactory新创建的线程包装了Worker对象,在执行新创建线程的run方法时,调用到了Worker对象的run方法.

      Worker的run方法

    1. public void run() {  
    2.     try {  
    3.         Runnable task = firstTask;  
    4.         firstTask = null;  
    5.         while (task != null || (task = getTask()) != null) {  
    6.             runTask(task);  
    7.             task = null;  
    8.         }  
    9.     } finally {  
    10.         workerDone(this);  
    11.     }  
    12. }  

      从以上方法可以看出,Worker所在的线程启动后,首先执行创建其时传入的Runnable任务,执行完成后,循环调用getTask来获取新的任务,在没有任务的情况下,退出此线程。

      getTask方法实现:

    1. Runnable getTask() {  
    2.     for (;;) {  
    3.         try {  
    4.             int state = runState;  
    5.             if (state > SHUTDOWN)  
    6.                 return null;  
    7.             Runnable r;  
    8.             if (state == SHUTDOWN)  // Help drain queue  
    9.                 r = workQueue.poll();  
    10.             else if (poolSize > corePoolSize || allowCoreThreadTimeOut)  
    11.                 r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);  
    12.             else  
    13.                 r = workQueue.take();  
    14.             if (r != null)  
    15.                 return r;  
    16.             if (workerCanExit()) {  
    17.                 if (runState >= SHUTDOWN) // Wake up others  
    18.                     interruptIdleWorkers();  
    19.                 return null;  
    20.             }  
    21.             // Else retry  
    22.         } catch (InterruptedException ie) {  
    23.             // On interruption, re-check runState  
    24.         }  
    25.     }  
    26. }  

      getTask就是通过WorkQueue的poll或task方法来获取下一个要执行的任务。

      回到execute方法 ,execute 方法部分实现:

    1. if (runState == RUNNING && workQueue.offer(command)) {  
    2.             if (runState != RUNNING || poolSize == 0)  
    3.                    ensureQueuedTaskHandled(command);  
    4.            }  
    5.            else if (!addIfUnderMaximumPoolSize(command))  
    6.                reject(command); // is shutdown or saturated  


      如果当前线程池数量大于corePoolSize或addIfUnderCorePoolSize方法执行失败,则执行后续操作;如果线程池处于运 行状态并且workQueue中成功加入任务,再次判断如果线程池的状态不为运行状态或当前线程池数为0,则调用 ensureQueuedTaskHandled方法

    ensureQueuedTaskHandled方法实现:

    1. private void ensureQueuedTaskHandled(Runnable command) {  
    2.     final ReentrantLock mainLock = this.mainLock;  
    3.     mainLock.lock();  
    4.     boolean reject = false;  
    5.     Thread t = null;  
    6.     try {  
    7.         int state = runState;  
    8.         if (state != RUNNING && workQueue.remove(command))  
    9.             reject = true;  
    10.         else if (state < STOP &&  
    11.                  poolSize < Math.max(corePoolSize, 1) &&  
    12.                  !workQueue.isEmpty())  
    13.             t = addThread(null);  
    14.     } finally {  
    15.         mainLock.unlock();  
    16.     }  
    17.     if (reject)  
    18.         reject(command);  
    19.     else if (t != null)  
    20.         t.start();  
    21. }  

      ensureQueuedTaskHandled方法判断线程池运行,如果状态不为运行状态,从workQueue中删除, 并调用reject做拒绝处理。

    reject方法实现:

    1. void reject(Runnable command) {  
    2.     handler.rejectedExecution(command, this);  
    3. }  

    再次回到execute方法,

    1. if (runState == RUNNING && workQueue.offer(command)) {  
    2.                if (runState != RUNNING || poolSize == 0)  
    3.                    ensureQueuedTaskHandled(command);  
    4.            }  
    5.            else if (!addIfUnderMaximumPoolSize(command))  
    6.                reject(command); // is shutdown or saturated  

    如线程池workQueue offer失败或不处于运行状态,调用addIfUnderMaximumPoolSize,addIfUnderMaximumPoolSize方法

    基本和addIfUnderCorePoolSize实现类似,不同点在于根据最大线程数(maximumPoolSize)进行比较,如果超过最大线程数,

    返回false,调用reject方法,下面是addIfUnderMaximumPoolSize方法实现:

    1. private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {  
    2.        Thread t = null;  
    3.        final ReentrantLock mainLock = this.mainLock;  
    4.        mainLock.lock();  
    5.        try {  
    6.            if (poolSize < maximumPoolSize && runState == RUNNING)  
    7.                t = addThread(firstTask);  
    8.        } finally {  
    9.            mainLock.unlock();  
    10.        }  
    11.        if (t == null)  
    12.            return false;  
    13.        t.start();  
    14.        return true;  
    15.    }  

    3. 添加任务处理流程   当一个任务通过execute(Runnable)方法欲添加到线程池时:   如果当前线程池中的数量小于corePoolSize,并线程池处于Running状态,创建并添加的任务。   如果当前线程池中的数量等于corePoolSize,并线程池处于Running状态,缓冲队列 workQueue未满,

    那么任务被放入缓冲队列、等待任务调度执行。   如果当前线程池中的数量大于corePoolSize,缓冲队列workQueue已满,并且线程池中的数量小于

    maximumPoolSize,新提交任务会创建新线程执行任务。如果当前线程池中的数量大于corePoolSize,

    缓冲队列workQueue已满,并且线程池中的数量等maximumPoolSize,新提交任务由Handler处理。

    当线程池中的线程大于corePoolSize时,多余线程空闲时间超过keepAliveTime时,会关闭这部分线程。

    4. RejectedExecutionHandler  默认有四个选择:

      ThreadPoolExecutor.AbortPolicy():当线程池中的数量等于最大线程数时、直接抛出抛出java.util.concurrent.RejectedExecutionException异常

    1. public static class AbortPolicy implements RejectedExecutionHandler {  
    2.     /** 
    3.      * Creates an {@code AbortPolicy}. 
    4.      */  
    5.     public AbortPolicy() { }  
    6.   
    7.     /** 
    8.      * Always throws RejectedExecutionException.
    9.      * @param r the runnable task requested to be executed 
    10.      * @param e the executor attempting to execute this task 
    11.      * @throws RejectedExecutionException always. 
    12.      */  
    13.     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  
    14.         throw new RejectedExecutionException("Task " + r.toString() +  
    15.                                              " rejected from " +  
    16.                                              e.toString());  
    17.     }  
    18. }  

      ThreadPoolExecutor.CallerRunsPolicy() :当线程池中的数量等于最大线程数时、重试执行当前的任务,交由调用者线程来执行任务

    1. public static class CallerRunsPolicy implements RejectedExecutionHandler {  
    2.      /** 
    3.       * Creates a {@code CallerRunsPolicy}. 
    4.       */  
    5.      public CallerRunsPolicy() { }  
    6.   
    7.      /** 
    8.       * Executes task r in the caller's thread, unless the executor 
    9.       * has been shut down, in which case the task is discarded.
    10.       * @param r the runnable task requested to be executed 
    11.       * @param e the executor attempting to execute this task 
    12.       */  
    13.      public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  
    14.          if (!e.isShutdown()) {  
    15.              r.run();  
    16.          }  
    17.      }  
    18.  }  

      ThreadPoolExecutor.DiscardOldestPolicy() :当线程池中的数量等于最大线程数时、抛弃线程池中最后一个要执行的任务,并执行新传入的任务

    1. public static class DiscardOldestPolicy implements RejectedExecutionHandler {  
    2.       /** 
    3.        * Creates a {@code DiscardOldestPolicy} for the given executor. 
    4.        */  
    5.       public DiscardOldestPolicy() { }  
    6.   
    7.       /** 
    8.        * Obtains and ignores the next task that the executor 
    9.        * would otherwise execute, if one is immediately available, 
    10.        * and then retries execution of task r, unless the executor 
    11.        * is shut down, in which case task r is instead discarded. 
    12.        * @param r the runnable task requested to be executed 
    13.        * @param e the executor attempting to execute this task 
    14.        */  
    15.       public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  
    16.           if (!e.isShutdown()) {  
    17.               e.getQueue().poll();  
    18.               e.execute(r);  
    19.           }  
    20.       }  
    21.   }  

    ThreadPoolExecutor.DiscardPolicy() :当线程池中的数量等于最大线程数时,不做任何动作

      1. public static class DiscardPolicy implements RejectedExecutionHandler {  
      2.     /** 
      3.      * Creates a {@code DiscardPolicy}. 
      4.      */  
      5.     public DiscardPolicy() { }  
      6.   
      7.     /** 
      8.      * Does nothing, which has the effect of discarding task r. 
      9.      * 
      10.      * @param r the runnable task requested to be executed 
      11.      * @param e the executor attempting to execute this task 
      12.      */  
      13.     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  
      14.     }  
  • 相关阅读:
    Atitit.播放系统规划新版本 v4 q18 and 最近版本回顾
    Atitit.播放系统规划新版本 v4 q18 and 最近版本回顾
    atitit.极光消息推送服务器端开发实现推送  jpush v3. 总结o7p
    atitit.极光消息推送服务器端开发实现推送  jpush v3. 总结o7p
    Atitit.文件搜索工具 attilax 总结
    Atitit.文件搜索工具 attilax 总结
    Atitit.软件命名空间  包的命名统计 及命名表(2000个名称) 方案java package
    Atitit.软件命名空间  包的命名统计 及命名表(2000个名称) 方案java package
    Atitit..状态机与词法分析  通用分词器 分词引擎的设计与实现 attilax总结
    Atitit..状态机与词法分析  通用分词器 分词引擎的设计与实现 attilax总结
  • 原文地址:https://www.cnblogs.com/wxgblogs/p/5458905.html
Copyright © 2011-2022 走看看