zoukankan      html  css  js  c++  java
  • 线程池基础知识

    1、为什么要使用线程池,线程池用什么用

      创建线程和销毁线程的花销是比较大的,这些时间有可能比处理业务的时间还要长。这样频繁的创建线程和销毁线程,再加上业务工作线程,消耗系统资源的时间,可能导致系统资源不足。(我们可以把创建和销毁的线程的过程去掉)

    1. 降低资源消耗:通过重用已经创建的线程来降低线程创建和销毁的消耗
    2. 提高响应速度:任务到达时不需要等待线程创建就可以立即执行
    3. 提高线程的可管理性:线程池可以统一管理、分配、调优和监控

    2、几种常见的线程池及使用场景

    JAVA通过Executors提供了四种线程池:

      单线程化线程池(newSingleThreadExecutor)

      可控最大并发数线程池(newFixedThreadPool)

      可回收缓存线程池(newCachedThreadPool)

      支持定时与周期性任务的线程池(newScheduledThreadPool)

    (1)newSingleThreadExecutor

    线程池特点:

    • 核心线程数和最大线程数大小一样且都是1
    • keepAliveTime为0
    • 阻塞队列是LinkedBlockingQueue

    该线程池的工作机制是:

    1. 线程池中没有线程时,新建一个线程执行任务
    2. 有一个线程以后,将任务加入阻塞队列,不停加加加
    3. 唯一的这一个线程不停地去队列里取任务执行

    适用场景:

    SingleThreadExecutor适用于串行执行任务的场景,每个任务必须按顺序执行,不需要并发执行。

    创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

    1 public static ExecutorService newSingleThreadExecutor() {
    2         return new FinalizableDelegatedExecutorService
    3             (new ThreadPoolExecutor(1, 1,
    4                                     0L, TimeUnit.MILLISECONDS,
    5                                     new LinkedBlockingQueue<Runnable>()));
    6     }

    单线程化线程池(newSingleThreadExecutor)的优点,串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

    使用单线程化线程池(newSingleThreadExecutor)的一般方法下代码所示。

     1 import java.util.concurrent.ExecutorService;
     2 import java.util.concurrent.Executors;
     3 import java.util.concurrent.TimeUnit;
     4  
     5 public class ThreadPoolByNewSingleThreadExecutor {
     6  
     7     public static void main(String[] args) {
     8         /**
     9          * 单线程化的线程池
    10          */
    11         ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
    12         for (int i = 0; i < 10; i++) {
    13             final int index = i;
    14             singleThreadExecutor.execute(new Runnable() {
    15                 @Override
    16                 public void run() {
    17                     Thread.currentThread().setName("Thread i = " + index);
    18                     System.out.println(Thread.currentThread().getName() + " index = " + index);
    19                     try {
    20                         Thread.sleep(500);
    21                     } catch (InterruptedException e) {
    22                         System.out.println("ssss");
    23                     }
    24                 }
    25             });
    26         }
    27         singleThreadExecutor.shutdown();
    28         System.out.println("on the main thread...");
    29         
    30     }
    31  
    32 }

    参考:https://blog.csdn.net/android2011_1/article/details/79629890

    (2)newFixedThreadPool

    线程池特点:

    • 核心线程数和最大线程数大小一样
    • keepAliveTime为0
    • 阻塞队列是LinkedBlockingQueue

    它是固定大小的线程池,其核心线程数和最大线程数大小一样。并且阻塞队列用的是LinkedBlockingQueue,也就是说线程最大数这个参数失效了基本,所以不会出现外包线程的存在,所以也可以认为keepAliveTime参数是一个摆设。除非allowCoreThreadTimeOut方法的调用。

    该线程池的工作机制是:

    1. 线程数少于核心线程数,也就是设置的线程数时,新建线程执行任务
    2. 线程数等于核心线程数后,将任务加入阻塞队列
      1. 由于队列容量非常大(Integer.MAX_VALUE),可以一直加加加。(当线程池中的任务比较特殊时,比如关于数据库的长时间的IO操作,可能导致OOM)
    3. 执行完任务的线程反复去队列中取任务执行

    适用场景:

    FixedThreadPool 适用于处理CPU密集型的任务,确保CPU在长期被工作线程使用的情况下,尽可能的少的分配线程即可。一般Ncpu+1

    创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

    1  public static ExecutorService newFixedThreadPool(int nThreads) {
    2         return new ThreadPoolExecutor(nThreads, nThreads,
    3                                       0L, TimeUnit.MILLISECONDS,
    4                                       new LinkedBlockingQueue<Runnable>());
    5     }

    刚刚介绍了单线程化线程池newSingleThreadExecutor,可控最大并发数线程池(newFixedThreadPool)与其最大的区别是可以通知执行多个线程,可以简单的将newSingleThreadExecutor理解为newFixedThreadPool(1)。例如运行一下两个程序:
    单线程化线程池(newSingleThreadExecutor)示例:

     1 import java.util.concurrent.ExecutorService;
     2 import java.util.concurrent.Executors;
     3  
     4 public class ThreadPoolByNewSingleThreadExecutor {
     5  
     6     public static void main(String[] args) {
     7         /**
     8          * 单线程化的线程池
     9          */
    10         ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
    11         for (int i = 0; i < 10; i++) {
    12             final int index = i;
    13             singleThreadExecutor.execute(new Runnable() {
    14                 @Override
    15                 public void run() {
    16                     Thread.currentThread().setName("Thread i = " + index);
    17                     System.out.println(Thread.currentThread().getName() + " index = " + index);
    18                     try {
    19                         Thread.sleep(500);
    20                     } catch (InterruptedException e) {
    21                         System.out.println("exception");
    22                     }
    23                 }
    24             });
    25         }
    26         singleThreadExecutor.shutdown();
    27         System.out.println("on the main thread...");
    28         
    29     }
    30  
    31 }

    可控最大并发数线程池(newFixedThreadPool)示例:

     1 import java.util.concurrent.ExecutorService;
     2 import java.util.concurrent.Executors;
     3  
     4 public class ThreadPoolByNewFixedThreadPool {
     5     public static void main(String[] args) {
     6         
     7         ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(3);
     8         for (int i = 0; i < 10; i++) {
     9             final int index = i;
    10             newFixedThreadPool.execute(new Runnable() {
    11                 @Override
    12                 public void run() {
    13                     Thread.currentThread().setName("Thread i = " + index);
    14                     System.out.println(Thread.currentThread().getName() + " index = " + index);
    15                     try {
    16                         Thread.sleep(500);
    17                     } catch (InterruptedException e) {
    18                         System.out.println("exception");
    19                     }
    20                 }
    21             });
    22         }
    23         newFixedThreadPool.shutdown();
    24         System.out.println("on the main thread...");
    25     }
    26  
    27 }

    结果从显示上看虽然很相似,但是观察到的执行效果确实完全不一致的,newSingleThreadPool中,只有一个线程,每次输出一行后暂停0.5秒,newFixedThreadPool(3)中可以创建3个线程,一次输出3行后暂停0.5秒(当然是这三个线程都暂停0.5秒)。

    动画对比如下所示:

     

    (3)newCachedThreadPool

    线程池特点:

    • 核心线程数为0,且最大线程数为Integer.MAX_VALUE
    • 阻塞队列是SynchronousQueue

    SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue

    锁当提交任务的速度大于处理任务的速度时,每次提交一个任务,就必然会创建一个线程。极端情况下会创建过多的线程,耗尽 CPU 和内存资源。由于空闲 60 秒的线程会被终止,长时间保持空闲的 CachedThreadPool 不会占用任何资源。

    该线程池的工作机制是:

    1. 没有核心线程,直接向SynchronousQueue中提交任务
    2. 如果有空闲线程,就去取出任务执行;如果没有空闲线程,就新建一个
    3. 执行完任务的线程有60秒生存时间,如果在这个时间内可以接到新任务,就可以继续活下去,否则就拜拜

    适用场景:

    CachedThreadPool 用于并发执行大量短期的小任务。

     创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

    1 public static ExecutorService newCachedThreadPool() {
    2         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    3                                       60L, TimeUnit.SECONDS,
    4                                       new SynchronousQueue<Runnable>());
    5     }

      以上介绍了单线程化线程池(newSingleThreadExecutor)、可控最大并发数线程池(newFixedThreadPool)。下面介绍的是第三种newCachedThreadPool——可回收缓存线程池。

      在JAVA文档中是这样介绍可回收缓存线程池的:创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能。调用 execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟(默认)未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源。(可以通过java.util.concurrent.ThreadPoolExecutor类的构造方法构造更加具体的类,例如指定时间参数)

            创建线程本身需要很多资源,包括内存,记录线程状态,以及控制阻塞等等。因此,相比另外两种线程池,在需要频繁创建短期异步线程的场景下,newCachedThreadPool能够复用已完成而未关闭的线程来提高程序性能。

    newCachedThreadPool使用方法与其他两种线程池使用方法类似,基本使用代码如下所示:

     1 import java.util.concurrent.ExecutorService;
     2 import java.util.concurrent.Executors;
     3  
     4 public class NewCachedThreadPoolExecutor {
     5  
     6     public static void main(String[] args) {
     7         ExecutorService cachedThreadExecutor = Executors.newCachedThreadPool();
     8         for (int i = 0; i < 10; i++) {
     9             final int index = i;
    10             try {
    11                 Thread.sleep(1000);
    12             } catch (InterruptedException e1) {
    13                 e1.printStackTrace();
    14             }
    15             cachedThreadExecutor.execute(new Runnable() {
    16                 @Override
    17                 public void run() {
    18                     System.out.println(Thread.currentThread().getName() + ": index = " + index);
    19                 }
    20             });
    21         }
    22         cachedThreadExecutor.shutdown();
    23         
    24     }
    25  
    26 }

    执行结果如下图,会发现线程的名字都一样,因为在执行后续任务的时候,上一个任务已经完成,会复用上一个任务的线程资源来执行。

     

    (4)newScheduledThreadPool

    线程池特点:

    • 最大线程数为Integer.MAX_VALUE
    • 阻塞队列是DelayedWorkQueue

    ScheduledThreadPoolExecutor 添加任务提供了另外两个方法:

    • scheduleAtFixedRate() :按某种速率周期执行
    • scheduleWithFixedDelay():在某个延迟后执行

    两种方法的内部实现都是创建了一个ScheduledFutureTask对象封装了任务的延迟执行时间及执行周期,并调用decorateTask()方法转成RunnableScheduledFuture对象,然后添加到延迟队列中。

    DelayQueue:中封装了一个优先级队列,这个队列会对队列中的ScheduledFutureTask 进行排序,两个任务的执行 time 不同时,time 小的先执行;否则比较添加到队列中的ScheduledFutureTask的顺序号 sequenceNumber ,先提交的先执行。

    该线程池的工作机制是:

    1. 调用上面两个方法添加一个任务
    2. 线程池中的线程从 DelayQueue 中取任务
    3. 然后执行任务

    具体执行步骤:

    1. 线程从 DelayQueue 中获取 time 大于等于当前时间的 ScheduledFutureTask
      1. DelayQueue.take()
    2. 执行完后修改这个 task 的 time 为下次被执行的时间
    3. 然后再把这个 task 放回队列中
      1. DelayQueue.add()

    适用场景:

    ScheduledThreadPoolExecutor用于需要多个后台线程执行周期任务,同时需要限制线程数量的场景。

    创建一个定长线程池,支持定时及周期性任务执行。

    1 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    2         return new ScheduledThreadPoolExecutor(corePoolSize);
    3     }

    newScheduledThreadPool用于构造安排线程池,能够根据需要安排在给定延迟后运行命令或者定期地执行。

    在JAVA文档的介绍:

    1. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize);
    2. 创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
    3. 参数:corePoolSize - 池中所保存的线程数,即使线程是空闲的也包括在内。
    4. 返回:新创建的安排线程池

    需要注意的是,参数corePoolSize在这个方法中是没用意义的,详解见JAVA进阶----ThreadPoolExecutor机制

    具体实现:

     1 import java.util.concurrent.Executors;
     2 import java.util.concurrent.ScheduledExecutorService;
     3 import java.util.concurrent.TimeUnit;
     4  
     5 public class ThreadPoolByNewScheduledThreadPool {
     6  
     7     public static void main(String[] args) {
     8         
     9         Thread thread = new Thread(new Runnable() {
    10             @Override
    11             public void run() {
    12                 System.out.println(Thread.currentThread().getName() + " : 延迟3秒");
    13             }
    14         });
    15         
    16         /**
    17          * 定长线程池,支持定时及周期性任务执行
    18          */
    19         ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
    20         
    21         //延迟3s后运行
    22         scheduledThreadPool.schedule(thread, 3, TimeUnit.SECONDS);
    23         
    24         //首次执行延迟1s,每次间隔3秒
    25         //scheduledThreadPool.scheduleAtFixedRate(thread, 1, 3, TimeUnit.SECONDS);
    26         
    27         //每次执行结束,已固定时延开启下次执行
    28         //scheduledThreadPool.scheduleWithFixedDelay(thread, 1, 3, TimeUnit.SECONDS);
    29         
    30         System.out.println(Thread.currentThread().getName() + " : main thread");
    31         scheduledThreadPool.shutdown();
    32 //        try {
    33 //            Thread.sleep(12000);
    34 //        } catch (InterruptedException e) {
    35 //            e.printStackTrace();
    36 //        }
    37 //        scheduledThreadPool.shutdownNow();
    38     }
    39  
    40 }

    后面注释掉的内容表示强制程序最大执行实际为12s,这通常是不切实际的,常常会需要在线程中设置标志位或标记系统时间来获取程序的终止时间。这就涉及到获取线程返回值的问题。将在后续文章中进行介绍。

    线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors各个方法的弊端:
    1)newFixedThreadPool和newSingleThreadExecutor:
      主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
    2)newCachedThreadPool和newScheduledThreadPool:
      主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。

    3、线程池都有哪几种工作队列

    (1)ArrayBlockingQueue
    是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
    (2)LinkedBlockingQueue
    一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列
    (3)SynchronousQueue
    一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
    (4)PriorityBlockingQueue
    一个具有优先级的无限阻塞队列

    4、线程池中的几种重要的参数及流程说明

     1 public ThreadPoolExecutor(int corePoolSize,  
     2                               int maximumPoolSize,  
     3                               long keepAliveTime,  
     4                               TimeUnit unit,  
     5                               BlockingQueue<Runnable> workQueue,  
     6                               ThreadFactory threadFactory,  
     7                               RejectedExecutionHandler handler) {  
     8         if (corePoolSize < 0 ||  
     9             maximumPoolSize <= 0 ||  
    10             maximumPoolSize < corePoolSize ||  
    11             keepAliveTime < 0)  
    12             throw new IllegalArgumentException();  
    13         if (workQueue == null || threadFactory == null || handler == null)  
    14             throw new NullPointerException();  
    15         this.corePoolSize = corePoolSize;  
    16         this.maximumPoolSize = maximumPoolSize;  
    17         this.workQueue = workQueue;  
    18         this.keepAliveTime = unit.toNanos(keepAliveTime);  
    19         this.threadFactory = threadFactory;  
    20         this.handler = handler;  
    21     } 

    构造方法参数讲解

    参数名 作用
    corePoolSize 核心线程池大小
    maximumPoolSize 最大线程池大小
    keepAliveTime 线程池中超过corePoolSize数目的空闲线程最大存活时间;可以allowCoreThreadTimeOut(true)使得核心线程有效时间
    TimeUnit keepAliveTime时间单位
    workQueue 阻塞任务队列
    threadFactory 新建线程工厂
    RejectedExecutionHandler 当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理

    重点讲解:
    其中比较容易让人误解的是:corePoolSize,maximumPoolSize,workQueue之间关系。
    1.当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
    2.当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行
    3.当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务
    4.当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理
    5.当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程
    6.当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭

    corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;

    maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;

    keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;

    unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:

    1 TimeUnit.DAYS;               //
    2 TimeUnit.HOURS;             //小时
    3 TimeUnit.MINUTES;           //分钟
    4 TimeUnit.SECONDS;           //
    5 TimeUnit.MILLISECONDS;      //毫秒
    6 TimeUnit.MICROSECONDS;      //微妙
    7 TimeUnit.NANOSECONDS;       //纳秒

    workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:

    1 ArrayBlockingQueue
    2 LinkedBlockingQueue
    3 SynchronousQueue
    4 PriorityBlockingQueue

    ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和SynchronousQueue。线程池的排队策略与BlockingQueue有关。

    threadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程做些更有意义的事情,比如设置daemon和优先级等等

    handler:表示当拒绝处理任务时的策略,有以下四种取值:

      1 1、AbortPolicy:直接抛出异常。
      2 2、CallerRunsPolicy:只用调用者所在线程来运行任务。
      3 3、DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
      4 4、DiscardPolicy:不处理,丢弃掉。
      5 5、也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。
      6    /**
      7      * A handler for rejected tasks that runs the rejected task
      8      * directly in the calling thread of the {@code execute} method,
      9      * unless the executor has been shut down, in which case the task
     10      * is discarded.
     11      */
     12     public static class CallerRunsPolicy implements RejectedExecutionHandler {
     13         /**
     14          * Creates a {@code CallerRunsPolicy}.
     15          */
     16         public CallerRunsPolicy() { }
     17 
     18         /**
     19          * Executes task r in the caller's thread, unless the executor
     20          * has been shut down, in which case the task is discarded.
     21          *
     22          * @param r the runnable task requested to be executed
     23          * @param e the executor attempting to execute this task
     24          */
     25         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
     26             if (!e.isShutdown()) {
     27                 r.run();
     28             }
     29         }
     30     }
     31 
     32     /**
     33      * A handler for rejected tasks that throws a
     34      * {@code RejectedExecutionException}.
     35      */
     36     public static class AbortPolicy implements RejectedExecutionHandler {
     37         /**
     38          * Creates an {@code AbortPolicy}.
     39          */
     40         public AbortPolicy() { }
     41 
     42         /**
     43          * Always throws RejectedExecutionException.
     44          *
     45          * @param r the runnable task requested to be executed
     46          * @param e the executor attempting to execute this task
     47          * @throws RejectedExecutionException always
     48          */
     49         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
     50             throw new RejectedExecutionException("Task " + r.toString() +
     51                                                  " rejected from " +
     52                                                  e.toString());
     53         }
     54     }
     55 
     56     /**
     57      * A handler for rejected tasks that silently discards the
     58      * rejected task.
     59      */
     60     public static class DiscardPolicy implements RejectedExecutionHandler {
     61         /**
     62          * Creates a {@code DiscardPolicy}.
     63          */
     64         public DiscardPolicy() { }
     65 
     66         /**
     67          * Does nothing, which has the effect of discarding task r.
     68          *
     69          * @param r the runnable task requested to be executed
     70          * @param e the executor attempting to execute this task
     71          */
     72         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
     73         }
     74     }
     75 
     76     /**
     77      * A handler for rejected tasks that discards the oldest unhandled
     78      * request and then retries {@code execute}, unless the executor
     79      * is shut down, in which case the task is discarded.
     80      */
     81     public static class DiscardOldestPolicy implements RejectedExecutionHandler {
     82         /**
     83          * Creates a {@code DiscardOldestPolicy} for the given executor.
     84          */
     85         public DiscardOldestPolicy() { }
     86 
     87         /**
     88          * Obtains and ignores the next task that the executor
     89          * would otherwise execute, if one is immediately available,
     90          * and then retries execution of task r, unless the executor
     91          * is shut down, in which case task r is instead discarded.
     92          *
     93          * @param r the runnable task requested to be executed
     94          * @param e the executor attempting to execute this task
     95          */
     96         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
     97             if (!e.isShutdown()) {
     98                 e.getQueue().poll();
     99                 e.execute(r);
    100             }
    101         }
    102     }

    ThreadPoolExecutor 源码理解 https://www.cnblogs.com/dolphin0520/p/3932921.html

     1  public static void test(int size) {
     2         ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 20, 2, TimeUnit.SECONDS, new LinkedBlockingQueue<>(5));
     3 
     4         for (int i = 0; i < size; i++) {
     5             poolExecutor.execute(new DemoTask(i));
     6 
     7 
     8             Console.log("poolSize:" + poolExecutor.getPoolSize());
     9             Console.log("corePoolSize:" + poolExecutor.getCorePoolSize());
    10             Console.log("maximumPoolSize:" + poolExecutor.getMaximumPoolSize());
    11             Console.log("queue:" + poolExecutor.getQueue().size());
    12             Console.log("completedTaskCount:" + poolExecutor.getCompletedTaskCount());
    13             Console.log("largestPoolSize:" + poolExecutor.getLargestPoolSize());
    14             Console.log("keepAliveTime:" + poolExecutor.getKeepAliveTime(TimeUnit.SECONDS));
    15 
    16         }
    17 
    18         poolExecutor.shutdown();
    19     }
    20 
    21 class DemoTask implements Runnable {
    22 
    23     private int taskNum;
    24 
    25     public DemoTask(int taskNum) {
    26         this.taskNum = taskNum;
    27     }
    28 
    29     @Override
    30     public void run() {
    31         Console.log(StringUtils.center("正在执行" + taskNum, 20, "="));
    32 
    33         try {
    34             Thread.sleep(2000);
    35         } catch (InterruptedException e) {
    36             e.printStackTrace();
    37         }
    38         Console.log(StringUtils.center("执行完毕" + taskNum, 20, "="));
    39     }
    40 }
      1 =======正在执行0========
      2 poolSize:1
      3 corePoolSize:5
      4 maximumPoolSize:20
      5 queue:0
      6 completedTaskCount:0
      7 largestPoolSize:1
      8 keepAliveTime:2
      9 poolSize:2
     10 corePoolSize:5
     11 maximumPoolSize:20
     12 queue:0
     13 completedTaskCount:0
     14 =======正在执行1========
     15 largestPoolSize:2
     16 keepAliveTime:2
     17 poolSize:3
     18 corePoolSize:5
     19 maximumPoolSize:20
     20 =======正在执行2========
     21 queue:0
     22 completedTaskCount:0
     23 largestPoolSize:3
     24 keepAliveTime:2
     25 poolSize:4
     26 corePoolSize:5
     27 maximumPoolSize:20
     28 queue:0
     29 =======正在执行3========
     30 completedTaskCount:0
     31 largestPoolSize:4
     32 keepAliveTime:2
     33 poolSize:5
     34 corePoolSize:5
     35 =======正在执行4========
     36 maximumPoolSize:20
     37 queue:0
     38 completedTaskCount:0
     39 largestPoolSize:5
     40 keepAliveTime:2
     41 poolSize:5
     42 corePoolSize:5
     43 maximumPoolSize:20
     44 queue:1
     45 completedTaskCount:0
     46 largestPoolSize:5
     47 keepAliveTime:2
     48 poolSize:5
     49 corePoolSize:5
     50 maximumPoolSize:20
     51 queue:2
     52 completedTaskCount:0
     53 largestPoolSize:5
     54 keepAliveTime:2
     55 poolSize:5
     56 corePoolSize:5
     57 maximumPoolSize:20
     58 queue:3
     59 completedTaskCount:0
     60 largestPoolSize:5
     61 keepAliveTime:2
     62 poolSize:5
     63 corePoolSize:5
     64 maximumPoolSize:20
     65 queue:4
     66 completedTaskCount:0
     67 largestPoolSize:5
     68 keepAliveTime:2
     69 poolSize:5
     70 corePoolSize:5
     71 maximumPoolSize:20
     72 queue:5
     73 completedTaskCount:0
     74 largestPoolSize:5
     75 keepAliveTime:2
     76 poolSize:6
     77 corePoolSize:5
     78 maximumPoolSize:20
     79 queue:5
     80 completedTaskCount:0
     81 largestPoolSize:6
     82 keepAliveTime:2
     83 poolSize:7
     84 corePoolSize:5
     85 maximumPoolSize:20
     86 queue:5
     87 completedTaskCount:0
     88 largestPoolSize:7
     89 keepAliveTime:2
     90 =======正在执行11=======
     91 poolSize:8
     92 corePoolSize:5
     93 maximumPoolSize:20
     94 queue:5
     95 completedTaskCount:0
     96 =======正在执行12=======
     97 =======正在执行10=======
     98 largestPoolSize:8
     99 keepAliveTime:2
    100 poolSize:9
    101 corePoolSize:5
    102 =======正在执行13=======
    103 maximumPoolSize:20
    104 queue:5
    105 completedTaskCount:0
    106 largestPoolSize:9
    107 keepAliveTime:2
    108 poolSize:10
    109 corePoolSize:5
    110 maximumPoolSize:20
    111 =======正在执行14=======
    112 queue:5
    113 completedTaskCount:0
    114 largestPoolSize:10
    115 keepAliveTime:2
    116 poolSize:11
    117 corePoolSize:5
    118 maximumPoolSize:20
    119 queue:5
    120 =======正在执行15=======
    121 completedTaskCount:0
    122 largestPoolSize:11
    123 keepAliveTime:2
    124 poolSize:12
    125 corePoolSize:5
    126 maximumPoolSize:20
    127 queue:5
    128 completedTaskCount:0
    129 =======正在执行16=======
    130 largestPoolSize:12
    131 keepAliveTime:2
    132 poolSize:13
    133 corePoolSize:5
    134 maximumPoolSize:20
    135 =======正在执行17=======
    136 queue:5
    137 completedTaskCount:0
    138 largestPoolSize:13
    139 keepAliveTime:2
    140 poolSize:14
    141 corePoolSize:5
    142 maximumPoolSize:20
    143 queue:5
    144 =======正在执行18=======
    145 completedTaskCount:0
    146 largestPoolSize:14
    147 keepAliveTime:2
    148 poolSize:15
    149 corePoolSize:5
    150 maximumPoolSize:20
    151 =======正在执行19=======
    152 queue:5
    153 completedTaskCount:0
    154 largestPoolSize:15
    155 keepAliveTime:2
    156 =======执行完毕0========
    157 =======正在执行5========
    158 =======执行完毕1========
    159 =======执行完毕2========
    160 =======正在执行6========
    161 =======正在执行7========
    162 =======执行完毕4========
    163 =======正在执行8========
    164 =======执行完毕3========
    165 =======正在执行9========
    166 =======执行完毕13=======
    167 =======执行完毕12=======
    168 =======执行完毕10=======
    169 =======执行完毕11=======
    170 =======执行完毕15=======
    171 =======执行完毕16=======
    172 =======执行完毕14=======
    173 =======执行完毕19=======
    174 =======执行完毕18=======
    175 =======执行完毕17=======
    176 =======执行完毕5========
    177 =======执行完毕7========
    178 =======执行完毕6========
    179 =======执行完毕8========
    180 =======执行完毕9========

    5、线程池有哪几种工作队列

    • ArrayBlockingQueue (有界队列):是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
    • LinkedBlockingQueue (无界队列):一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
    • SynchronousQueue(同步队列): 一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
    • DelayQueue(延迟队列):一个任务定时周期的延迟执行的队列。根据指定的执行时间从小到大排序,否则根据插入到队列的先后排序。
    • PriorityBlockingQueue(优先级队列): 一个具有优先级得无限阻塞队列。

    6、怎么理解无界队列和有界队列

    • 有界队列即长度有限,满了以后ArrayBlockingQueue会插入阻塞。
    • 无界队列就是里面能放无数的东西而不会因为队列长度限制被阻塞,但是可能会出现OOM异常。

    有界队列
    1.初始的poolSize < corePoolSize,提交的runnable任务,会直接做为new一个Thread的参数,立马执行 。
    2.当提交的任务数超过了corePoolSize,会将当前的runable提交到一个block queue中。
    3.有界队列满了之后,如果poolSize < maximumPoolsize时,会尝试new 一个Thread的进行救急处理,立马执行对应的runnable任务。
    4.如果3中也无法处理了,就会走到第四步执行reject操作。
    无界队列
    与有界队列相比,除非系统资源耗尽,否则无界的任务队列不存在任务入队失败的情况。当有新的任务到来,系统的线程数小于corePoolSize时,则新建线程执行任务。当达到corePoolSize后,就不会继续增加,若后续仍有新的任务加入,而没有空闲的线程资源,则任务直接进入队列等待。若任务创建和处理的速度差异很大,无界队列会保持快速增长,直到耗尽系统内存。

    当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略。

    7、如何合理配置线程池的大小

    http://ifeve.com/how-to-calculate-threadpool-size/

    参考:

    Java并发:https://www.javazhiyin.com/topic/thread/page/2

    https://www.iteye.com/blog/825635381-2184680

    https://www.jianshu.com/p/6c6f396fc88e

    https://www.jianshu.com/p/9710b899e749

  • 相关阅读:
    2019-2020-2 20174313张博《网络对抗技术》Exp5 信息搜集与漏洞扫描
    2019-2020-2 20174313张博《网络对抗技术》Exp4 恶意代码分析
    2019-2020-2 20174313张博《网络对抗技术》Exp3-免杀原理与实践
    2019-2020-2 20174313张博《网络对抗技术》Exp2-后门原理与实践
    2019-2020-2 20174313张博 《网络对抗技术》Exp1 PC平台逆向破解
    Exp9 Web安全基础
    实验八 Web基础
    实验七-网络欺诈防范
    MSF基础应用
    信息搜集与漏洞扫描
  • 原文地址:https://www.cnblogs.com/116970u/p/11629079.html
Copyright © 2011-2022 走看看