zoukankan      html  css  js  c++  java
  • 线程池的使用

    线程池的使用

    1、线程池的使用场景

    • 等待返回任务的结果的多步骤的处理场景, 批量并发执行任务,总耗时是单个步骤耗时最长的那个,提供整体的执行效率,

    • 最终一致性,异步执行任务,无需等待,快速返回

    2、线程池的关键参数说明

    一般情况下我们是通过ThreadPoolExecutor来构造我们的线程池对象的。

    * 阿里巴巴的开发规范文档是禁止直接使用Executors静态工厂类来创建线程池的,原因是

    【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样
    的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
    说明: Executors 返回的线程池对象的弊端如下:
    (1) FixedThreadPool 和 SingleThreadPool :
    允许的请求队列长度为 Integer.MAX_VALUE ,可能会堆积大量的请求,从而导致 OOM 。
    (2) CachedThreadPool 和 ScheduledThreadPool :
    允许的创建线程数量为 Integer.MAX_VALUE ,可能会创建大量的线程,从而导致 OOM 。

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
           
      }
    

    参数说明:

    • corePoolSize:核心线程数,线程池最低的线程数
    • maximumPoolSize:允许的最大的线程数
    • keepAliveTime:当前线程数超过corePoolSize的时候,空闲线程保留的时间
    • unit: keepAliveTime线程保留的时间的单位
    • workQueue: 任务缓冲区
    • threadFactory: 线程的构造工厂
    • handler: 线程池饱含时候的处理策略

    3、线程池的分类

    Java通过Executors提供四种线程池,分别为:

    • newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
    • newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
    • newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
    • newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
    3.1、newCachedThreadPool
    public static ExecutorService newCachedThreadPool(){
        return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.MILLISECONDS,new SynchronousQueue<Runnable>());
    }
    

    它是一个可以无限扩大的线程池;

    • 它比较适合处理执行时间比较小的任务;

    • corePoolSize为0,maximumPoolSize为无限大,意味着线程数量可以无限大;

    • keepAliveTime为60S,意味着线程空闲时间超过60S就会被杀死;

    • 采用SynchronousQueue装等待的任务,这个阻塞队列没有存储空间,这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。

    3.2、newFixedThreadPool
    public static ExecutorService newFixedThreadPool(int nThreads){
        return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
    }
    
    • 它是一种固定大小的线程池;corePoolSize和maximunPoolSize都为用户设定的线程数量nThreads;
    • keepAliveTime为0,意味着一旦有多余的空闲线程,就会被立即停止掉;但这里keepAliveTime无效;
    • 阻塞队列采用了LinkedBlockingQueue,它是一个无界队列;由于阻塞队列是一个无界队列,因此永远不可能拒绝任务;
    • 由于采用了无界队列,实际线程数量将永远维持在nThreads,因此maximumPoolSize和keepAliveTime将无效。
    3.3、ScheduledThreadPool
    public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue());
        }
    
    • 定时任务的使用
    3.4、SingleThreadExecutor
    public static ExecutorService newSingleThreadExecutor(){
        return new ThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
    }
    
    • 它只会创建一条工作线程处理任务;
    • 采用的阻塞队列为LinkedBlockingQueue;
    3.5、总结
    线程池 特点 建议使用场景
    newCachedThreadPool 1、线程数无上限
    2、空闲线程存活60s
    3、阻塞队列
    1、任务执行时间短
    2、任务要求响应时间短
    newFixedThreadPool 1、线程数固定
    2、无界队列
    1、任务比较平缓
    2、控制最大的线程数
    newScheduledThreadPool 核心线程数量固定、非核心线程数量无限制(闲置时马上回收) 执行定时 / 周期性 任务
    newSingleThreadExecutor 只有一个核心线程(保证所有任务按照指定顺序在一个线程中执行,不需要处理线程同步的问题) 不适合并发但可能引起IO阻塞性及影响UI线程响应的操作,如数据库操作,文件操作等

    4、使用线程池容易出现的问题

    现象 原因
    整个系统影响缓慢,大部分504 1、为设置最大的线程数,任务积压过多,线程数用尽
    oom 1、队列无界或者size设置过大
    使用线程池对效率并没有明显的提升 1、线程池的参数设置过小,线程数过小或者队列过小,或者是服务器的cpu核数太低

    5、线程池的监控

    5.1、为什么要对线程池进行监控

    • 线程池中线程数和队列的类型及长度对线程会造成很大的影响,而且会争夺系统稀有资源,线程数。设置不当,或是没有最大的利用系统资源,提高系统的整体运行效率,或是导致整个系统的故障。典型的场景是线程数被占满,其他的请求无响应。或是任务积压过多,直接oom
    • 方便的排查线程中的故障以及优化线程池的使用

    5.2、监控的原理

    • 另起一个定时单线程数的线程池newSingleThreadScheduledExecutor

    • 调用scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)定时执行监控任务;

    • 定时任务内 通过ThreadPoolExecutor对象获取监控的对象信息,比如t线程池需要执行的任务数、线程池在运行过程中已完成的任务数、曾经创建过的最大线程数、线程池里的线程数量、线程池里活跃的线程数量、当前排队线程数

    • 根据预设的日志或报警策略,进行规则控制

    5.3、实现的细节

    定义线程池并启动监控

     /**
         * 定义线程池的队列的长度
         */
        private final Integer queueSize = 1000;
    
        /**
         * 定义一个定长的线程池
         */
        private ExecutorService executorService;
    
        @PostConstruct
        private void initExecutorService() {
            log.info(
                    "executorService init with param: threadcount:{} ,queuesize:{}",
                    systemConfig.getThreadCount(),
                    systemConfig.getThreadQueueSize());
            executorService =
                    new ThreadPoolExecutor(
                            systemConfig.getThreadCount(),
                            systemConfig.getThreadCount(),
                            0,
                            TimeUnit.MILLISECONDS,
                            new ArrayBlockingQueue(systemConfig.getThreadQueueSize()),
                            new BasicThreadFactory.Builder()
                                    .namingPattern("async-sign-thread-%d")
                                    .build(),
                            (r, executor) -> log.error("the async executor pool is full!!"));
    
            /** 启动线程池的监控 */
            ThreadPoolMonitoring threadPoolMonitoring = new ThreadPoolMonitoring();
            threadPoolMonitoring.init();
        }
    

    线程池的监控

    /**
         * 功能说明:线程池监控
         *
         * @params
         * @return <br>
         *     修改历史<br>
         *     [2019年06月14日 10:20:10 10:20] 创建方法by fengqingyang
         */
        public class ThreadPoolMonitoring {
            /** 用于周期性监控线程池的运行状态 */
            private final ScheduledExecutorService scheduledExecutorService =
                    Executors.newSingleThreadScheduledExecutor(
                            new BasicThreadFactory.Builder()
                                    .namingPattern("async thread executor monitor")
                                    .build());
    
            /**
             * 功能说明:自动运行监控
             *
             * @return <br>
             *     修改历史<br>
             *     [2019年06月14日 10:26:51 10:26] 创建方法by fengqingyang
             * @params
             */
            public void init() {
                scheduledExecutorService.scheduleAtFixedRate(
                        () -> {
                            try {
                                ThreadPoolExecutor threadPoolExecutor =
                                        (ThreadPoolExecutor) executorService;
                                /** 线程池需要执行的任务数 */
                                long taskCount = threadPoolExecutor.getTaskCount();
                                /** 线程池在运行过程中已完成的任务数 */
                                long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
                                /** 曾经创建过的最大线程数 */
                                long largestPoolSize = threadPoolExecutor.getLargestPoolSize();
                                /** 线程池里的线程数量 */
                                long poolSize = threadPoolExecutor.getPoolSize();
                                /** 线程池里活跃的线程数量 */
                                long activeCount = threadPoolExecutor.getActiveCount();
                                /** 当前排队线程数 */
                                int queueSize = threadPoolExecutor.getQueue().size();
                                log.info(
                                        "async-executor monitor. taskCount:{}, completedTaskCount:{}, largestPoolSize:{}, poolSize:{}, activeCount:{},queueSize:{}",
                                        taskCount,
                                        completedTaskCount,
                                        largestPoolSize,
                                        poolSize,
                                        activeCount,
                                        queueSize);
    
                                /** 超过阀值的80%报警 */
                                if (activeCount >= systemConfig.getThreadCount() * 0.8) {
                                    log.error(
                                            "async-executor monitor. taskCount:{}, completedTaskCount:{}, largestPoolSize:{}, poolSize:{}, activeCount:{},queueSize:{}",
                                            taskCount,
                                            completedTaskCount,
                                            largestPoolSize,
                                            poolSize,
                                            activeCount,
                                            queueSize);
                                    ;
                                }
                            } catch (Exception ex) {
                                log.error("ThreadPoolMonitoring service error,{}", ex.getMessage());
                            }
                        },
                        0,
                        30,
                        TimeUnit.SECONDS);
            }
        }
    

    6、需要注意的事项

    • 线程数要合理设置,一般建议值是核数的2倍。
    • 线程池队列的类型和长度要根据业特性合理设置
    • 不同的业务需要线程池隔离,避免相互影响
    • 未每个线程池增加特有的命名规范以及关键的日志,方便出问题排查和优化

    7、后续

    更多精彩,敬请关注, 程序员导航网 https://chenzhuofan.top

    .
  • 相关阅读:
    Promise.all和Promise.race区别,和使用场景
    使用Promise解决多层异步调用的简单学习【转】
    前端性能优化-缓存
    Node.js机制及原理理解初步【转】
    微信小程序 canvas 字体自动换行(支持换行符)
    百度地图-鼠标悬停样式
    文件I/O相关函数
    获取系统限制信息
    标准C头文件
    数据库系统小结:(不包括详细知识点,更像一个大纲)
  • 原文地址:https://www.cnblogs.com/chen-xing/p/11153678.html
Copyright © 2011-2022 走看看