zoukankan      html  css  js  c++  java
  • 线程池


    1、什么是线程池?

        线程池是指在初始化一个多线程应用程序过程中创建一个线程集合,然后在需要执行新的任务时重用这些线程而不是新建一个线程(提高线程复用,减少性能开销)。线程池中线程的数量通常完全取决于可用内存数量和应用程序的需求。然而,增加可用线程数量是可能的。线程池中的每个线程都有被分配一个任务,一旦任务已经完成了,线程回到池子中然后等待下一次分配任务。

    2、为什么要用线程池:

    合理的使用线程池能够带来3个很明显的好处:
    1. 降低资源消耗:通过重用已经创建的线程来降低线程创建和销毁的消耗
    2. 提高响应速度:任务到达时不需要等待线程创建就可以立即执行。
    3. 提高线程的可管理性:线程池可以统一管理、分配、调优和监控

        本质上来讲,我们使用线程池主要就是为了减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务;节约应用内存(线程开的越多,消耗的内存也就越大,最后死机)

    3、线程池的作用:

        线程池作用就是限制系统中执行线程的数量。根据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果;少了浪费了系统资源,多了造成系统拥挤效率不高。用线程池控制线程数量,其他线程排队等候。一个任务执行完毕,再从队列的中取最前面的任务开始执行。若队列中没有等待进程,线程池的这一资源处于等待。当一个新任务需要运行时,如果线程池中有等待的工作线程,就可以开始运行了;否则进入等待队列。

        说完了线程池的概念和作用,我们再看看代码中的线程池:

        在Java中,线程池的代码起源之Executor(翻译过来就是执行者)注意:这个类是一个接口。

     
    Executor

        但是严格意义上讲Executor并不是一个线程池(如图其源码就一个 execute 方法),所以Executor仅只是一个执行线程的工具。那么,线程池的真正面纱是什么?利用AS的类继承关系发现,Executor有一个 ExecutorService 子接口。

    ExecutorService

        实际上,一般说线程池接口,基本上说的是这个 ExecutorService。ExecutorService源码里面有各种API(比如说执行 excute ( xxx ),比如关闭  isShutdown ( ))帮助我们去使用。ExecutorService接口的默认实现类为ThreadPoolExecutor(翻译过来就是线程池执行者)。既然是默认实现类我们就可以根据应用场景去私人订制了。

     
    ExecutorService - 1
    ExecutorService - 2

    4.线程池状态

    线程池同样有五种状态:Running, SHUTDOWN, STOP, TIDYING,TERMINATED。 
    RUNNING:处于RUNNING状态的线程池能够接受新任务,以及对新添加的任务进行处理。
    SHUTDOWN:处于SHUTDOWN状态的线程池不可以接受新任务,但是可以对已添加的任务进行处理。
    STOP:处于STOP状态的线程池不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
    TIDYING:当所有的任务已终止,ctl记录的任务数量0,线程池会变为
    TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()terminated()ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
    TERMINATED:线程池彻底终止的状态。

    5、ThreadPoolExecutor

    既然找到了突破口,那我们集中火力先去了解下ThreadPoolExecutor

    ThreadPoolExecutor构造参数

    首先从ThreadPoolExecutor 的构造参数开始分析,通过代码截图得知,ThreadPoolExecutor的构造方法有以下4种:

     
    ThreadPoolExecutor构造参数

    下面就构造方法里面的参数逐一分析说明:

    1:int corePoolSize (core:核心的) = >  该线程池中核心线程数最大值

    什么是核心线程:线程池新建线程的时候,如果当前线程总数小于 corePoolSize ,则新建的是核心线程;如果超过corePoolSize,则新建的是非核心线程。

    核心线程默认情况下会一直存活在线程池中,即使这个核心线程啥也不干(闲置状态)。

    如果指定ThreadPoolExecutor的 allowCoreThreadTimeOut 这个属性为true,那么核心线程如果不干活(闲置状态)的话,超过一定时间( keepAliveTime),就会被销毁掉

    2:int maximumPoolSize  = >  该线程池中线程总数的最大值

    线程总数计算公式 = 核心线程数 + 非核心线程数。

    3:long keepAliveTime  = >  该线程池中非核心线程闲置超时时长

    注意:一个非核心线程,如果不干活(闲置状态)的时长,超过这个参数所设定的时长,就会被销毁掉。但是,如果设置了  allowCoreThreadTimeOut = true,则会作用于核心线程。

    4:TimeUnit unit  = > (时间单位)

    首先,TimeUnit是一个枚举类型,翻译过来就是时间单位,我们最常用的时间单位包括:

    MILLISECONDS : 1毫秒 、SECONDS : 秒、MINUTES : 分、HOURS : 小时、DAYS : 天

    5:BlockingQueue<Runnable> workQueue = >( Blocking:阻塞的,queue:队列)

    该线程池中的任务队列:维护着等待执行的Runnable对象。当所有的核心线程都在干活时,新添加的任务会被添加到这个队列中等待处理,如果队列满了,则新建非核心线程执行任务

     
    BlockingQueue源码

    其中,BlockingQueue中具体的API介绍:

    offer(E e): 将给定的元素设置到队列中,如果设置成功返回true, 否则返回false. e的值不能为空,否则抛出空指针异常。

    offer(E e, long timeout, TimeUnit unit): 将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false.

    add(E e): 将给定元素设置到队列中,如果设置成功返回true, 否则抛出异常。如果是往限定了长度的队列中设置值,推荐使用offer()方法。

    put(E e): 将元素设置到队列中,如果队列中没有多余的空间,该方法会一直阻塞,直到队列中有多余的空间。

    take(): 从队列中获取值,如果队列中没有值,线程会一直阻塞,直到队列中有值,并且该方法取得了该值。

    poll(long timeout, TimeUnit unit): 在给定的时间里,从队列中获取值,如果没有取到会抛出异常。

    remainingCapacity():获取队列中剩余的空间。

    remove(Object o): 从队列中移除指定的值。

    contains(Object o): 判断队列中是否拥有该值。

    drainTo(Collection c): 将队列中值,全部移除,并发设置到给定的集合中。

    说完了BlockingQueue常用的API,在说说其常用的workQueue类型:

    一般来说,workQueue有以下四种队列类型:

    SynchronousQueue:(同步队列)这个队列接收到任务的时候,会直接提交给线程处理,而不保留它(名字定义为 同步队列)。但有一种情况,假设所有线程都在工作怎么办?

    这种情况下,SynchronousQueue就会新建一个线程来处理这个任务。所以为了保证不出现(线程数达到了maximumPoolSize而不能新建线程)的错误,使用这个类型队列的时候,maximumPoolSize一般指定成Integer.MAX_VALUE,即无限大,去规避这个使用风险。

    LinkedBlockingQueue(链表阻塞队列):这个队列接收到任务的时候,如果当前线程数小于核心线程数,则新建线程(核心线程)处理任务;如果当前线程数等于核心线程数,则进入队列等待。由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中,这也就导致了maximumPoolSize的设定失效,因为总线程数永远不会超过corePoolSize

    ArrayBlockingQueue(数组阻塞队列):可以限定队列的长度(既然是数组,那么就限定了大小),接收到任务的时候,如果没有达到corePoolSize的值,则新建线程(核心线程)执行任务,如果达到了,则入队等候,如果队列已满,则新建线程(非核心线程)执行任务,又如果总线程数到了maximumPoolSize,并且队列也满了,则发生错误

    DelayQueue(延迟队列):队列内元素必须实现Delayed接口,这就意味着你传进去的任务必须先实现Delayed接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务


    说完了BlockingQueue,继续回到ThreadPoolExecutor的构造参数上面

    6:ThreadFactory threadFactory = > 创建线程的方式,这是一个接口,new它的时候需要实现他的Thread newThread(Runnable r)方法

    7:RejectedExecutionHandler handler = > 这个主要是用来抛异常的

    当线程无法执行新任务时(一般是由于线程池中的线程数量已经达到最大数或者线程池关闭导致的),默认情况下,当线程池无法处理新线程时,会抛出一个RejectedExecutionException。

    构造参数基本上就介绍完毕了。

    (如果看累了就先休息下。。。内容的确较多。。。)

        花了这么大篇幅去介绍ThreadPoolExecutor这个类的构造函数,可能你会觉得好累好空虚,好吧,其实我们的付出都是为打通线程池最后的壁垒做的必要准备,因为 千里之行、始于足下 我们始终要坚信 倘想达到最高处,就要从低处开始 。

        说完了这么多,我们知道了实例化一个线程池,只需要在构造参数里面去添加自己设置的属性值(设置正确即可使用,设置错误即抛异常),这样问题就来了:

    一个任务,它是如何进入线程池去执行任务?

    ThreadPoolExecutor这个类,里面有一个API,在上面也随口提到过,有一个执行的方法,先上图

    添加任务

    首先我们初始化一个线程池后,即可调用 execute这个方法,里面传入Runnable即可向线程池添加任务。

    问题又来了,既然线程池新添加了任务,那么线程池是如何处理这些批量任务?

    1:如果线程数量未达到corePoolSize,则新建一个线程(核心线程)执行任务

    2:如果线程数量达到了corePools,则将任务移入队列等待

    3:如果队列已满,新建线程(非核心线程)执行任务

    4:如果队列已满,总线程数又达到了maximumPoolSize,就会由RejectedExecutionHandler抛出异常

    6、四种线程池

    但是,实际上,Java已经为我们提供了四种线程池!

    好吧,在Java中,Executors这个类已经为我们提供了常用的四种线程池,分别为:

    A:newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

    newFixedThreadPool

    源码注释翻译

    创建一个线程池,使用固定数量的线程在共享的无界队列中操作。

    在任何时候,有最多  nThreads(就是我们传入参数的数量)的线程将处理任务。

    如果所有线程都处于活动状态时,提交额外的任务,他们会在队列中等待,直到有一个线程可用。

    如果在执行过程中出现故障,任何线程都会终止。如果需要执行后续任务,新的任务将取代它的位置。线程池中的线程会一直存在,直到它显式为止(调用shutdown)

    nThreads 就是传入线程池的数量  ,当nThreads  <= 0 就会抛异常IllegalArgumentException

    B:newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

     
    newCachedThreadPool

    源码注释翻译:

    创建一个线程池,根据需要创建新线程,但是将重写之前线程池的构造。

    这个线程池通常会提高性能去执行许多短期异步任务的程序。

    如果有可用线程,当线程池调用execute, 将重用之前的构造函数。

    如果没有现有的线程可用,那么就创建新的线程并添加到池中。

    线程没有使用60秒的时间被终止并从线程池里移除缓存。

    因此,一个闲置时间足够长的线程池不消耗任何资源。

    注意,线程池有类似的属性,但有一些不同的细节(例如,超时参数)可以使用@link ThreadPoolExecutor构造函数创建。

    C:newScheduledThreadPool 创建一个定长任务线程池,支持定时及周期性任务执行。

    newScheduledThreadPool

    源码注释翻译:

    创建一个线程池,它可以安排在 a 之后运行的命令给定延迟,或定期执行。

    corePoolSize (这个参数) 是指在池中保留的线程数,即使它们是空闲的。这个函数最终会返回一个新创建的调度线程池

    如果 corePoolSize < 0 ,则会抛出 IllegalArgumentException

    Ps:这个还支持多传入一个ThreadFactory

    D:newSingleThreadExecutor 创建一个单线程的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

     
     

    源码注释翻译:

    创建一个线程执行器,它使用单个运行中的线程操作在一个无界队列中。

    请注意,如果这个单独的线程终止是因为在执行前异常或者终止,若需要执行后续的任务,那么就需要一个新的去替代它。

    任务被保证按顺序的执行,并且在任何给定的时间内不超过一个任务将是活动的。

    不像其他等价 newFixedThreadPool(1) 这个返回的线程池对象是保证不运行重新配置以使用额外的线程。

    最终返回的是一个重新创建的单线程去执行。

    总结:

    Java为我们提供的四种线程池基本上就介绍完毕了。可以看到,这四种每一个具体的线程池都是 跟 ThreadPoolExecutor 配置有关的。因此,前面花大篇幅介绍ThreadPoolExecutor的构造参数在这里就起到了作用,整体来说,线程池的基本概念就结束了。

    下面上一份伪代码加深理解

     

    6.1、newCachedThreadPool

    6.1.1.作用:创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们,并在需要时使用提供的 ThreadFactory 创建新线程。

    6.1.2.特征: 
    (1)线程池中数量没有固定,可达到最大值(Interger. MAX_VALUE) 
    (2)线程池中的线程可进行缓存重复利用和回收(回收默认时间为1分钟) 
    (3)当线程池中,没有可用线程,会重新创建一个线程

    6.1.3.创建方式:

    ExecutorService pool = Executors.newSingleThreadExecutor();

    一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

    使用方式:

     1 public class Demo15 {
     2     public static void main(String[] args) {
     3         ExecutorService threadPool = Executors.newCachedThreadPool();
     4 
     5         //执行5次
     6         for (int i = 0; i <5 ; i++) {
     7             threadPool.execute(new CachDemo());
     8         }
     9         threadPool.shutdown();
    10 
    11     }
    12 
    13     static class CachDemo implements Runnable{
    14         @Override
    15         public void run() {
    16             String name = Thread.currentThread().getName();
    17                 try {
    18                     Thread.sleep(100);
    19                     System.out.println(name );
    20                     System.out.println("线程执行完了");
    21                 } catch (InterruptedException e) {
    22                     e.printStackTrace();
    23                 }
    24 
    25         }
    26     }
    27 }
    View Code

    输出结果如下:

    pool-1-thread-5
    线程执行完了
    pool-1-thread-4
    线程执行完了
    pool-1-thread-3
    线程执行完了
    pool-1-thread-2
    线程执行完了
    pool-1-thread-1
    线程执行完了

    从输出的结果我们可以看出,一直只有一个线程在运行。

    6.2、newFixedThreadPool

    6.2.1.作用:创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。在任意点,在大多数 nThreads 线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池中的线程将一直存在。

    6.2.2.特征: 
    (1)线程池中的线程处于一定的量,可以很好的控制线程的并发量 
    (2)线程可以重复被使用,在显示关闭之前,都将一直存在 
    (3)超出一定量的线程被提交时候需在队列中等待

    6.2.3.创建方式: 
    (1)Executors.newFixedThreadPool(int nThreads);//nThreads为线程的数量 
    (2)Executors.newFixedThreadPool(int nThreads,ThreadFactory threadFactory);//nThreads为线程的数量,threadFactory创建线程的工厂方式

    ExecutorService pool = Executors.newFixedThreadPool(10);

    创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

    使用方式:

     1 public class Demo14 {
     2     public static void main(String[] args) {
     3         ExecutorService threadPool = Executors.newFixedThreadPool(2);//线程池数量为2
     4 
     5         //执行5次
     6         for (int i = 0; i <5 ; i++) {
     7             threadPool.execute(new FixedDemo());
     8         }
     9         threadPool.shutdown();
    10 
    11     }
    12 
    13     static class FixedDemo implements Runnable{
    14         @Override
    15         public void run() {
    16             String name = Thread.currentThread().getName();
    17             for (int i = 0; i <2; i++) {
    18                 System.out.println(name +"——"+i);
    19             }
    20         }
    21     }
    22 }
    View Code

    输出结果如下:

    pool-1-thread-1——0
    pool-1-thread-1——1
    pool-1-thread-1——0
    pool-1-thread-1——1
    pool-1-thread-1——0
    pool-1-thread-1——1
    pool-1-thread-2——0
    pool-1-thread-2——1
    pool-1-thread-2——0
    pool-1-thread-2——1

    6.3、newSingleThreadExecutor

    6.3.1.作用:创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。(注意,如果因为在关闭前的执行期间出现失败而终止了此单个线程,那么如果需要,一个新线程将代替它执行后续的任务)。可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。与其他等效的 newFixedThreadPool(1) 不同,可保证无需重新配置此方法所返回的执行程序即可使用其他的线程。

    6.3.2.特征: 
    (1)线程池中最多执行1个线程,之后提交的线程活动将会排在队列中以此执行

    6.3.3.创建方式: 
    (1)Executors.newSingleThreadExecutor() ; 
    (2)Executors.newSingleThreadExecutor(ThreadFactory threadFactory);// threadFactory创建线程的工厂方式

    创建方式:

    ExecutorService pool = Executors.newCachedThreadPool();

    创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲的线程,当任务数增加时,此线程池又添加新线程来处理任务。

    使用方式如上2所示。

    6.4、newScheduleThreadPool

    6.4.1.作用: 创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。

    6.4.2.特征: 
    (1)线程池中具有指定数量的线程,即便是空线程也将保留 
    (2)可定时或者延迟执行线程活动

    6.4.3.创建方式: 
    (1)Executors.newScheduledThreadPool(int corePoolSize);// corePoolSize线程的个数 
    (2)newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory);// corePoolSize线程的个数,threadFactory创建线程的工厂

    创建方式:

    ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);

    此线程池支持定时以及周期性执行任务的需求。

    使用方式:

     1 public class Demo9ScheduledThreadPool {
     2 
     3     public static void main(String[] args) throws InterruptedException {
     4         ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
     5         System.out.println("程序开始:" + new Date());
     6         for (int i = 0; i < 3; i++) {
     7             //for (int i = 3; i > 0; i--) {
     8             // 第二个参数是延迟多久执行
     9             scheduledThreadPool.schedule(new Task(), i, TimeUnit.SECONDS);
    10         }
    11         Thread.sleep(500);
    12         // 关闭线程池
    13         scheduledThreadPool.shutdown();
    14     }
    15 
    16     static class Task implements Runnable {
    17         @Override
    18         public void run() {
    19             try {
    20                 String name = Thread.currentThread().getName();
    21 
    22                 System.out.println(name + ", 开始:" + new Date());
    23                 Thread.sleep(1000);
    24                 System.out.println(name + ", 结束:" + new Date());
    25 
    26             } catch (InterruptedException e) {
    27                 e.printStackTrace();
    28             }
    29         }
    30     }
    31 }
    View Code

    上面演示的是延迟0——3秒执行任务,如果想要执行周期性的任务可以用下面的方式,每秒执行一次

    程序开始:Sun Aug 09 20:22:51 CST 2020
    pool-1-thread-1, 开始:Sun Aug 09 20:22:51 CST 2020
    pool-1-thread-1, 结束:Sun Aug 09 20:22:52 CST 2020
    pool-1-thread-1, 开始:Sun Aug 09 20:22:52 CST 2020
    pool-1-thread-2, 开始:Sun Aug 09 20:22:53 CST 2020
    pool-1-thread-1, 结束:Sun Aug 09 20:22:53 CST 2020
    pool-1-thread-2, 结束:Sun Aug 09 20:22:54 CST 2020

    6.5、newSingleThreadScheduledExecutor

    作用: 创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。

    特征: 
    (1)线程池中最多执行1个线程,之后提交的线程活动将会排在队列中以此执行 
    (2)可定时或者延迟执行线程活动

    创建方式: 
    (1)Executors.newSingleThreadScheduledExecutor() ; 
    (2)Executors.newSingleThreadScheduledExecutor(ThreadFactory threadFactory) ;//threadFactory创建线程的工厂

    使用方式:

     1 public class Demo16 {
     2     public static void main(String[] args) {
     3         ExecutorService threadPool = Executors.newSingleThreadExecutor();
     4 
     5         //执行5次
     6         for (int i = 0; i <5 ; i++) {
     7             threadPool.execute(new CachDemo());
     8         }
     9         threadPool.shutdown();
    10     }
    11 
    12     static class CachDemo implements Runnable{
    13         @Override
    14         public void run() {
    15             String name = Thread.currentThread().getName();
    16                 try {
    17                     Thread.sleep(1000);
    18                     System.out.println(name );
    19                     System.out.println("线程执行完了");
    20                 } catch (InterruptedException e) {
    21                     e.printStackTrace();
    22                 }
    23 
    24         }
    25     }
    26 }
    View Code

    输出结果如下:

     1 pool-1-thread-1
     2 线程执行完了
     3 pool-1-thread-1
     4 线程执行完了
     5 pool-1-thread-1
     6 线程执行完了
     7 pool-1-thread-1
     8 线程执行完了
     9 pool-1-thread-1
    10 线程执行完了

    从始至终都只有一个线程操作。

    6.6.五种线程池的使用场景

    • newSingleThreadExecutor:一个单线程的线程池,可以用于需要保证顺序执行的场景,并且只有一个线程在执行。

    • newFixedThreadPool:一个固定大小的线程池,可以用于已知并发压力的情况下,对线程数做限制。

    • newCachedThreadPool:一个可以无限扩大的线程池,比较适合处理执行时间比较小的任务。

    • newScheduledThreadPool:可以延时启动,定时启动的线程池,适用于需要多个后台线程执行周期任务的场景。

    • newWorkStealingPool:一个拥有多个任务队列的线程池,可以减少连接数,创建当前可用cpu数量的线程来并行执行。

    6.7.线程池任务执行流程

    1. 当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
    2. 当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行
    3. 当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务
    4. 当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理
    5. 当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程
    6. 当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭

     

  • 相关阅读:
    扫面线模板
    (动态规划、栈)leetcode 84. Largest Rectangle in Histogram, 85. Maximal Rectangle
    tmux 常见命令汇总
    leetcode 221
    leetcode 319 29
    (贪心)leetcode 392. Is Subsequence, 771. Jewels and Stones, 463. Island Perimeter
    leetcode 982 668
    Python import 同文件夹下的py文件的函数,pycharm报错
    Windows里Anaconda-Navigator无法打开的解决方案
    Windows下 gpu版 Tensorflow 安装
  • 原文地址:https://www.cnblogs.com/aaaazzzz/p/12842390.html
Copyright © 2011-2022 走看看