zoukankan      html  css  js  c++  java
  • Java线程池

    Java线程池

    基本概念

    特点:

    • 管理线程,避免增加和销毁线程的资源消耗。
    • 提高响应速度。任务到达无需等待线程创建即可立即执行。
    • 重复利用。
    • 避免了线程数量膨胀导致的过分调度问题。
    • 提供更多功能。可以添加更多功能,如延时定时线程池等。

    线程池核心设计与实现

    • 线程池的实现类为ThreadPoolExecutor
    • 顶层接口为Executor:将任务提交和任务执行进行解耦。(用户只需提供Runnable对象,由Executor完成线程的调度和任务的执行)
    • ExecutorService:增加了为一批异步任务生成Future的方法。提供线程管控的能力(停止线程池的运行)
    • AbstractExecutorService:上层的抽象类,将执行任务的流程串联起来,下层只要执行一个方法。
    • ThreadPoolExecutor:管理线程和任务。

    线程池运行机制

    • 线程池的运行由两部分组成:
      • 任务管理(生产者)
      • 线程管理(消费者)
    • 任务被提交后,有三种处理:
      • 直接申请线程执行该任务
      • 缓冲到队列中等待线程执行
      • 拒绝该任务
    • 线程管理:
      • 根据任务请求进行线程的分配
      • 线程执行完任务后获取新任务去执行
      • 线程获取不到任务时,会被回收

    生命周期管理

    • 线程池的运行状态由两个变量组成:
      • runState(运行状态)
      • workerCount(线程数量)
    • 使用一个AtomicInteger变量ctl对两个变量一起维护:private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    • ctl的高3位保存runState,低29位保存workerCount

    任务执行机制

    任务调度

    • 提交的任务如何执行是由任务调度决定的。
    • 是由任务的调度是由execute()方法完成。

    流程:

    1. 检查线程池运行状态。若不是RUNNING,则直接拒绝。
    2. workerCount<corePoolSize,则创建并启动一个线程并执行新提交的任务。
    3. workerCount>=corePoolSize,且线程池的阻塞队列未满,则任务添加到阻塞队列中。
    4. workerCount>=corePoolSize并且worker<maximumPoolSize,则线程池的阻塞队列已满,创建并启动一个线程来执行新提交的任务。
    5. workerCount>=maximumPoolSize,并且线程池内的阻塞队列已满,则根据拒绝策略来处理该任务。(默认抛出异常)

    任务调度流程:
    任务调度流程

    任务缓冲

    • 线程池目的是:将任务和线程两者解耦。
    • 实现的手段是使用生产者消费者模式,通过阻塞队列实现的。
    • 阻塞队列缓存任务,工作线程从阻塞队列中获取任务。

    阻塞队列(Blocking Queue):

    • 队列为空时,获取元素的线程会等待队列变为非空。
    • 队列满时,存入任务的线程会等待队列可用。
    • 阻塞队列是存放任务的容器,线程只能从容器中取出任务。

    阻塞队列:

    常见的阻塞队列:

    名称 描述
    ArrayBlockingQueue 一个用数组实现的有界阻塞队列,按照FIFO对任务排序,支持公平锁和非公平锁
    LinkedBlockingQueue 基于链表结构的阻塞队列,按照FIFO排序任务.可以设置容量,不设置时为无界阻塞队列,最大长度为Integer.MAX_VALUE,newFixedThreadPool使用该队列.
    DelayQueue 延迟获取的无界队列,指定多久才能从队列中获取当前元素,只有延时期后才能从队列中获取元素,newScheduledThreadPool线程池使用该队列.
    PriorityBlockingQueue 支持线程优先级的无界队列,默认自然序排列,可自定义实现compareTo()方法指定元素的排序规则,不保证同一优先级的先后顺序
    SynchronousQueue 不存储元素的阻塞队列,每个插入操作必须等到另外一个线程调用移出操作,否则插入操作处于阻塞状态。支持公平锁和非公平锁。newCachedThreadPool线程池使用该队列,新任务到来时创建线程,有空闲线程则重复使用,线程空闲60秒会被回收。
    LinkedTransferQueue 链表结构组成的无界阻塞队列,额外有transfer()tryTransfer()方法
    LinkedBlockingDeque 链表结构组成的双向阻塞队列,队列的头尾均可添加和移出元素,可减低锁的竞争

    任务申请

    两种任务执行方式:

    • 任务直接由新创建的线程执行
    • 线程从任务队列中获取任务执行(主要)

    tryTask()方法实现线程从阻塞队列中获取任务:

    1. 获取线程池状态及线程数量
    2. 线程池是否停止,若停止,则返回null(线程空闲)
    3. 若没有停止,则检查线程数是否过多。若过多,则返回null。
    4. 若没有过多,则检查该线程是否为可回收线程,若是,则限时获取任务。
    5. 若否,则阻塞获取任务。
    6. 任务申请完成。

    任务拒绝

    当线程池的任务缓存队列已满,并且线程池中的线程数达到最大值时,需要拒绝新提交的任务。

    可用实现接口RejectExecutionHadler来定制拒绝策略,或者使用四种已有策略:


    线程的管理

    Worker线程

    为了管理线程的状态并维护线程的生命周期,设计工作线程Worker

    final Thread thread; // 当前Worker持有的线程
    Runnable firstTask; // 初始化线程的任务(可为null)
    volatile long completedTasks; // 完成的任务数
    

    Worker执行任务的模型:

    注:

    • firstTask为null,则线程启动时执行任务队列中的任务。若不为null,则执行firstTask这个任务。
    • 线程池使用Hash表持有线程的引用。通过添加引用,删除引用控制线程的生命周期。
    • Worker通过继承AQS实现独占锁。没有使用ReentrantLock,为了表示不可重入特定反应形成现在的执行状态。

    线程的状态:

    • lock方法获取独占锁后,当前线程正在执行任务。
    • 正在执行任务,不应该中断线程。
    • 若线程非独占锁状态,则没有处理任务,可以对线程进行中断。
    • 线程池执行shutdown()方法或tryTerminate()方法时调用interruptIdleWorkers()方法中断空闲线程。从而调用tryLock()判断线程池中是否有空闲线程。若是空闲的,则可以回收。

    Worker线程增加

    • 通过调用addWorker()方法增加线程.
    • 有两个参数:
      • firstTaks:新增线程执行的第一个任务(可为null)
      • core:true:判断当前活动线程数是否少于corePoolSize;false:判断当前活动线程数是否小于maximumPoolSize.

    新增线程流程:

    Worker线程回收

    • 线程的回收依赖于JVM自动回收.
    • 线程池根据当前线程池的状态维护线程引用,防止线程被JVM回收.
    • 当决定回收指定线程时,只需将其引用消除.
    • 核心线程可以无限等待获取任务,非核心线程限时获取任务.
    • 当Worker无法获取到任务时,循环结束,Worker主动消除自身在线程池的引用.

    线程的销毁:

    Worker线程执行任务

    • Worker中的run()方法会调用runWorker()方法.
    • 流程:
      1. 不断循环通过getTask()方法从阻塞队列中获取任务.
      2. 若线程池正在停止,则保证当前线程时中断状态,否则保证当前线程不是中断状态.
      3. 执行任务.
      4. getTask()结果为null,则跳出循环,执行processWorkerExit()方法,销毁线程.

    执行任务流程:

    线程池的创建

    可以通过ThreadPoolExecutor来创建。

    构造函数:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue)
    

    参数:

    • corePoolSize:线程池核心线程数最大值
    • maximumPoolSize:线程池最大线程数大小
    • keepAliveTime:线程池非核心线程空闲的存活时间大小
    • unit:线程空闲存活时间单位
    • workQueue:存放任务的阻塞队列
    • threadFactory:用于设置创建线程的工厂。
    • handler:线程池饱和策略事件。

    任务执行

    线程池执行流程(对应于execute()方法)

    1. 提交一个任务,线程池里存活的核心线程数小于corePoolSie时,线程池创建一个核心线程处理提交的任务。
    2. 若线程池核心线程数已满(线程数等于corePoolSie),新提交的任务被放在任务队列workQueue排队等待执行。
    3. 当线程池里面的存活线程数达到corePoolSize时,并且任务队列workQueue已满,则判断是否到达maximumPoolSize(即是否达到最大线程数)。若没有,则创建非核心线程执行提交的任务。
    4. 若线程数达到maxmumPoolSize时,新的任务采用拒绝策略处理。

    四种拒绝策略

    • AbortPolicy:抛出异常(默认)
    • DiscardPolicy:抛弃任务,无异常。
    • DiscardOldsPolicy:抛弃队列中最老的任务,将当前任务提交给线程池。
    • CallerRunsPolicy:交给线程池调用所在的线程处理。

    线程池的异常处理

    • 使用try catch捕获异常.
    • 通过Future对象的get()方法接收抛出的异常,再处理.
    • 为工作者线程设置UncaughtExceptionHandler,在UncaughtExceptionHandler方法中处理异常.

    示例:

    // 1. 通过try...catch捕获异常
    try {
        System.out.println(3/0);
    } catch (Exception e){
        System.out.println(e.getMessage());
    }
    
    // 2. sumbit执行,Future.get接收异常
    Future<?> future = threadPool.submit(() -> {
        System.out.println(3 / 0); // 产生异常
    });
    
    try {
        future.get(); // 接收异常
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
    
    // 3. 设置Thread.UncaughtExceptionHandler处理未检测的异常
    ExecutorService threadPool = Executors.newFixedThreadPool(1, r -> {
        Thread t = new Thread(r);
        t.setUncaughtExceptionHandler((t1, e) -> {
            System.out.println(t1.getName() + " 抛出异常 " + e);
        });
        return t;
    });
    
    // 4. 重写ThreadPoolExecutor.afterExecute方法,处理传递的异常引用
    class ExtendExecutor extends ThreadPoolExecutor {
        public ExtendExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
    
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            if (t == null && r instanceof Future<?>){
                try {
                    Object result = ((Future<?>) r).get();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
            if (t != null)
                System.out.println(t);
        }
    }
    

    常用的线程池

    • newFixedThreadPool:固定数目线程的线程池
    • newCachedThreadPool:可缓存线程的线程池
    • newSingleThreadExecutor:单线程的线程池
    • newScheduledThreadPool:定时及周期执行的线程池

    newFixedThreadPool

    特点:

    • 核心线程和最大线程数大小一致
    • 没有空闲时间,keepAliveTime为0
    • 阻塞队列为无界队列LinkedBlockingQueue

    工作机制:

    1. 任务被提交.
    2. 检查线程池中线程数是否小于coreSize.若小于,则创建核心线程执行任务.
    3. 若不小于,则将任务加入阻塞队列,等待被取出执行.
    4. 若队列容量一直增加,可能导致OOM.

    异常:
    使用无界队列可能导致内存飙升.
    若一个线程获取任务后,任务的执行时间比较长,导致队列的任务堆积过多,内存不足后抛出OOM.

    使用场景:
    适用于CPU密集型任务,确保CPU被工作线程使用的情况下,尽可能少的分配线程,适用与执行长期任务.

    newCachedThreadPool

    特点:

    • 核心线程数为0
    • 最大线程数为Integer.MAX_VALUE
    • 阻塞队列是SynchronousQueue
    • 非核心线程空闲存活时间为60秒

    提交的任务速度大于处理任务的速度时,每次提交一个任务,就会创建一个线程.
    空闲60秒的线程会被终止,长时间保持空闲的CachedThreadPool不会占用资源.

    工作机制:

    1. 提交任务.
    2. 若没有核心线程,则任务直接加到SynchronousQueue队列.
    3. 判断是否有空闲线程,若有,则取出任务执行.
    4. 若没有,则新建一个线程执行.
    5. 执行完任务的线程,还可以存活60秒.若存活期间接收到任务,可以继续存活;否则被销毁.

    使用场景:
    并发执行大量短期的小任务.

    newSingleThreadExecutor

    特点:

    • 核心线程数为1
    • 最大线程数为1
    • 阻塞队列为LinkedBlockingQueue
    • keepAliveTime为0

    工作机制:

    1. 提交任务.
    2. 线程池是否有一个线程,若没有,则新建线程执行.
    3. 若有,则将任务加到阻塞队列.
    4. 唯一的线程执行完一个任务后,再从队列中取出任务.

    使用场景:
    串行执行任务的场景,一个任务接着一个任务地执行.

    newScheduledThreadPool

    特点:

    • 最大线程数为Integer.MAX_VALUE
    • 阻塞队列是DelayedWorkQueue
    • keepAliveTime为0
    • scheduleAtFixedRate():以某种速率周期性执行
    • scheduleWithFixedDelay():在某个延迟后执行

    工作机制:

    1. 添加一个任务.
    2. 线程池中的线程从DelayQueue中取任务.
    3. 线程从DelayQueue中获取time大于等于当前时间的任务.
    4. 执行完后修改这个任务的time为下次被执行的时间.
    5. 将该任务放回DelayQueue队列中.

    使用场景:
    周期性执行任务的场景,需要限制线程数量的场景.


    线程池状态

    • RUNNING
    • SHUTDOWN
    • STOP
    • TIDYING
    • TERMINATED
    运行状态 描述
    RUNNING 能够接受新提交的任务,也能处理阻塞队列中的任务
    SHUTDOWN 关闭状态。不再接受新提交的任务,但可以继续处理阻塞队列中保存的任务
    STOP 不能接受新任务,不处理队列中的任务,会中断正在处理任务的线程
    TIDYING 所有任务都已终止,workCount为0
    TERMINATED terminated()执行完后进入该状态

    线程池生命周期:

    参考:

  • 相关阅读:
    Elasticsearch的CURD、复杂查询、聚合函数、映射mappings
    Python操作Elasticsearch对象
    Python连接Elasticsearch
    Elasticsearch的分析过程,内置字符过滤器、分析器、分词器、分词过滤器(真是变态多啊!美滋滋)
    Elasticsearch的数据组织
    Elasticsearch背景初识
    Nginx之负载均衡
    linux常用命令
    百度地图API,定位您的当前位置
    使用gulp自动构建项目
  • 原文地址:https://www.cnblogs.com/truestoriesavici01/p/13217341.html
Copyright © 2011-2022 走看看