zoukankan      html  css  js  c++  java
  • 线程

    线程池实现原理

    线程池介绍

    多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。
    假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。
    如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。

    一个线程池包括以下四个基本组成部分:

    1. 线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;
    2. 工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
    3. 任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
    4. 任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。

    Java 并发编程包

    ThreadPoolExecutor

    • corePoolSize
      线程池中保留的线程数量。
      在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。
      默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。
      如果设置了allowCoreThreadTimeOut,当线程池中空闲的线程超过keepAliveTime,那么这些线程将被销毁,线程池中将没有任何线程。

    • maximumPoolSize
      线程池中最大线程数。
      这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;

    • keepAliveTime
      表示线程没有任务执行时最多保持多久时间会终止。
      默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。
      但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;

    • workQueue
      一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:

    ArrayBlockingQueue;
    LinkedBlockingQueue;
    SynchronousQueue;

    ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。

    • threadFactory
      线程工厂,主要用来创建线程

    • handler
      表示当拒绝处理任务时的策略,有以下四种取值:

    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
    ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

    • beforeExecute & afterExecute
      Worker中,private void runTask(Runnable task)方法调用,是ThreadPoolExecutor类的一个方法,没有具体实现,用户可以根据自己需要重载这个方法和后面的afterExecute方法来进行一些统计信息,比如某个任务的执行时间等

    线程池状态

    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    任务的执行

    比较重要的成员变量:

    // 任务缓存队列,用来存放等待执行的任务
    private final BlockingQueue<Runnable> workQueue;
    
    //线程池的主要状态锁,对线程池状态(比如线程池大小、runState等)的改变都要使用这个锁
    private final ReentrantLock mainLock = new ReentrantLock();
    
    //用来存放工作集
    private final HashSet<Worker> workers = new HashSet<Worker>();  
    
    //线程存活时间  
    private volatile long  keepAliveTime;  
    
    //是否允许为核心线程设置存活时间
    private volatile boolean allowCoreThreadTimeOut; 
    
    //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
    private volatile int  corePoolSize;     
    
    //线程池最大能容忍的线程数
    private volatile int  maximumPoolSize; 
    
    //线程池中当前的线程数
    private volatile int   poolSize;     
    
    //任务拒绝策略
    private volatile RejectedExecutionHandler handler; 
     
    //线程工厂,用来创建线程
    private volatile ThreadFactory threadFactory;   
    
    //用来记录线程池中曾经出现过的最大线程数
    private int largestPoolSize;   
    
    //用来记录已经执行完毕的任务个数
    private long completedTaskCount;

    关于corePoolSize、maximumPoolSize、largestPoolSize三个变量
    corePoolSize在很多地方被翻译成核心池大小,其实我的理解这个就是线程池的大小。举个简单的例子:

    假如有一个工厂,工厂里面有10个工人,每个工人同时只能做一件任务。
    因此只要当10个工人中有工人是空闲的,来了任务就分配给空闲的工人做;
    当10个工人都有任务在做时,如果还来了任务,就把任务进行排队等待;
    
    如果说新任务数目增长的速度远远大于工人做任务的速度,那么此时工厂主管可能会想补救措施,比如重新招4个临时工人进来;  
    然后 就将任务也分配给这4个临时工人做;
    
    如果说着14个工人做任务的速度还是不够,此时工厂主管可能就要考虑不再接收新的任务或者抛弃前面的一些任务了。
    当这14个工人当中有人空闲时,而新任务增长的速度又比较缓慢,工厂主管可能就考虑辞掉4个临时工了,只保持原来的10个工人,毕竟请额外的工人是要花钱的。
    
    这个例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。
    也就是说corePoolSize就是线程池大小,maximumPoolSize在我看来是线程池的一种补救措施,即任务量突然过大时的一种补救措施。
    
    不过为了方便理解,在本文后面还是将corePoolSize翻译成核心池大小。
    largestPoolSize只是一个用来起记录作用的变量,用来记录线程池中曾经有过的最大线程数目,跟线程池的容量没有任何关系。

    空闲线程获取缓存队列中任务的方式
    这里有一个非常巧妙的设计方式,假如我们来设计线程池,可能会有一个任务分派线程,当发现有线程空闲时,就从任务缓存队列中取一个任务交给空闲线程执行。但是在这里,并没有采用这样的方式,因为这样会要额外地对任务分派线程进行管理,无形地会增加难度和复杂度,这里直接让执行完任务的线程去任务缓存队列里面取任务来执行。

    任务提交给线程池后执行的方式

    1. 首先,要清楚corePoolSize和maximumPoolSize的含义
    2. 其次,要知道Worker是用来起到什么作用的
    3. 要知道任务提交给线程池之后的处理策略,这里总结一下主要有4点:
    • 如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务
    • 如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
    • 如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理
    • 如果线程池中的线程数量大于corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。

    线程池中的线程初始化

    默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。
    在实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法办到:

    prestartCoreThread():初始化一个核心线程;
    prestartAllCoreThreads():初始化所有核心线程

    任务缓存队列及排队策略

    在前面我们多次提到了任务缓存队列,即workQueue,它用来存放等待执行的任务。
    workQueue的类型为BlockingQueue<Runnable>,通常可以取下面三种类型:

    1)ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;

    2)LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;

    3)synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。

    下面主要是不同队列策略表现:

    直接递交:一种比较好的默认选择是使用SynchronousQueue,这种策略会将提交的任务直接传送给工作线程,而不持有。如果当前没有工作线程来处理,即任务放入队列失败,则根据线程池的实现,会引发新的工作线程创建,因此新提交的任务会被处理。这种策略在当提交的一批任务之间有依赖关系的时候避免了锁竞争消耗。值得一提的是,这种策略最好是配合unbounded线程数来使用,从而避免任务被拒绝。同时我们必须要考虑到一种场景,当任务到来的速度大于任务处理的速度,将会引起无限制的线程数不断的增加。

    无界队列:使用无界队列如LinkedBlockingQueue没有指定最大容量的时候,将会引起当核心线程都在忙的时候,新的任务被放在队列上,因此,永远不会有大于corePoolSize的线程被创建,因此maximumPoolSize参数将失效。这种策略比较适合所有的任务都不相互依赖,独立执行。举个例子,如网页服务器中,每个线程独立处理请求。但是当任务处理速度小于任务进入速度的时候会引起队列的无限膨胀。

    有界队列:有界队列如ArrayBlockingQueue帮助限制资源的消耗,但是不容易控制。队列长度和maximumPoolSize这两个值会相互影响,使用大的队列和小maximumPoolSize会减少CPU的使用、操作系统资源、上下文切换的消耗,但是会降低吞吐量,如果任务被频繁的阻塞如IO线程,系统其实可以调度更多的线程。使用小的队列通常需要大maximumPoolSize,从而使得CPU更忙一些,但是又会增加降低吞吐量的线程调度的消耗。总结一下是IO密集型可以考虑多些线程来平衡CPU的使用,CPU密集型可以考虑少些线程减少线程调度的消耗。

     

    任务拒绝策略

    当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:

    • ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
    • ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
    • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    • ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务,这是一种feedback策略,可以降低任务提交的速度。

    线程池的关闭

    ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:

    shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
    shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务

    线程池容量的动态调整

    ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),

    setCorePoolSize:设置核心池大小
    setMaximumPoolSize:设置线程池最大能创建的线程数目大小

    当上述参数从小变大时,ThreadPoolExecutor进行线程赋值,还可能立即创建新的线程来执行任务。

    newFixedThreadPool

    创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,超出的线程会在队列中等待,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

    newSingleThreadExecutor

    将corePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue

    newCachedThreadPool

    将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。

    当提交到线程池的任务抛出异常,线程池并不会终止,线程池对这种情况做了专门的处理。

    java线程池(newFixedThreadPool)线程消失疑问?

    ReentrantLock 使用

    线程池的使用

    如何合理配置线程池的大小

    一般需要根据任务的类型来配置线程池大小:
    如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 N(CPU)+1
    如果是IO密集型任务,参考值可以设置为2*N(CPU)
    当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。

    线程结果监听

    判断线程执行是否结束,并在线程执行结束后执行相应的记录日志操作

     

    利用Hook嵌入你的行为

    ThreadPoolExecutor提供了protected类型可以被覆盖的钩子方法,允许用户在任务执行之前会执行之后做一些事情。我们可以通过它来实现比如初始化ThreadLocal、收集统计信息、如记录日志等操作。这类Hook如beforeExecute和afterExecute。另外还有一个Hook可以用来在任务被执行完的时候让用户插入逻辑,如rerminated。

    如果hook方法执行失败,则内部的工作线程的执行将会失败或被中断。

    beforeExecute & afterExecute

    预留抽象接口,留给用户自己实现,可以记录任务的一些日志信息等。

     

    Callable和Future

    Callable是类似于Runnable的接口,实现Callable接口的类和实现Runnable的类都是可被其它线程执行的任务。
    Callable和Runnable有几点不同:
    (1)Callable规定的方法是call(),而Runnable规定的方法是run().
    (2)Callable的任务执行后可返回值,而Runnable的任务是不能返回值的。
    (3)call()方法可抛出异常,而run()方法是不能抛出异常的。
    (4)运行Callable任务可拿到一个Future对象, Future表示异步计算的结果。

    它提供了检查计算是否完成的方法,以等待计算的完成,并检索计算的结果。 通过Future对象可了解任务执行情况,可取消任务的执行,还可获取任务执行的结果。

    实战案例

    邮件发送线程池

    http://jeyke.iteye.com/blog/1678864

    链接

  • 相关阅读:
    Ch1 机器学习基础
    信息论与编码课程设计
    实验4 数据库的安全性、完整性
    实验3 SQL语言—更新操作、视图、索引等操作
    实验2 SQL语言—SELECT查询操作
    实验1 数据库的定义和建立实验
    计算机网络|网络层作业
    信息安全从业者书单推荐
    jenkins异常 -- active (exited),无法启动
    性能测试 -- docker部署grafana
  • 原文地址:https://www.cnblogs.com/tonyq/p/7844097.html
Copyright © 2011-2022 走看看