zoukankan      html  css  js  c++  java
  • 线程池最佳实践

    简单演示一下如何使用线程池

    private static final int CORE_POOL_SIZE = 5;
        private static final int MAX_POOL_SIZE = 10;
        private static final int QUEUE_CAPACITY = 100;
        private static final Long KEEP_ALIVE_TIME = 1L;
    
        public static void main(String[] args) {
    
            //使用阿里巴巴推荐的创建线程池的方式
            //通过ThreadPoolExecutor构造函数自定义参数创建
            ThreadPoolExecutor executor = new ThreadPoolExecutor(
                    CORE_POOL_SIZE,
                    MAX_POOL_SIZE,
                    KEEP_ALIVE_TIME,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                    new ThreadPoolExecutor.CallerRunsPolicy());
    
            for (int i = 0; i < 10; i++) {
                executor.execute(() -> {
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("CurrentThread name:" + Thread.currentThread().getName() + "date:" + Instant.now());
                });
            }
            //终止线程池
            executor.shutdown();
            try {
                executor.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Finished all threads");
        }

    1. 使用 ThreadPoolExecutor 的构造函数声明线程池

    1. 线程池必须手动通过 ThreadPoolExecutor 的构造函数来声明,避免使用Executors 类的 newFixedThreadPool 和 newCachedThreadPool ,因为可能会有 OOM 的风险。

    Executors 返回线程池对象的弊端如下:

    • FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。
    • CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。

    说白了就是:使用有界队列,控制线程创建数量。

    除了避免 OOM 的原因之外,不推荐使用 Executors 提供的两种快捷的线程池的原因还有:

    1. 实际使用中需要根据自己机器的性能、业务场景来手动配置线程池的参数比如核心线程数、使用的任务队列、饱和策略等等。
    2. 我们应该显示地给我们的线程池命名,这样有助于我们定位问题。

    2.监测线程池运行状态

    你可以通过一些手段来检测线程池的运行状态比如 SpringBoot 中的 Actuator 组件。

    除此之外,我们还可以利用 ThreadPoolExecutor 的相关 API做一个简陋的监控。从下图可以看出, ThreadPoolExecutor提供了获取线程池当前的线程数和活跃线程数、已经执行完成的任务数、正在排队中的任务数等等。

    下面是一个简单的 Demo。printThreadPoolStatus()会每隔一秒打印出线程池的线程数、活跃线程数、完成的任务数、以及队列中的任务数。

        /**
         * 打印线程池的状态
         *
         * @param threadPool 线程池对象
         */
        public static void printThreadPoolStatus(ThreadPoolExecutor threadPool) {
            ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1, createThreadFactory("print-thread-pool-status", false));
            scheduledExecutorService.scheduleAtFixedRate(() -> {
                log.info("=========================");
                log.info("ThreadPool Size: [{}]", threadPool.getPoolSize());
                log.info("Active Threads: {}", threadPool.getActiveCount());
                log.info("Number of Tasks : {}", threadPool.getCompletedTaskCount());
                log.info("Number of Tasks in Queue: {}", threadPool.getQueue().size());
                log.info("=========================");
            }, 0, 1, TimeUnit.SECONDS);
        }

    3.建议不同类别的业务用不同的线程池

    很多人在实际项目中都会有类似这样的问题:我的项目中多个业务需要用到线程池,是为每个线程池都定义一个还是说定义一个公共的线程池呢?

    一般建议是不同的业务使用不同的线程池,配置线程池的时候根据当前业务的情况对当前线程池进行配置,因为不同的业务的并发以及对资源的使用情况都不同,重心优化系统性能瓶颈相关的业务。

    我们再来看一个真实的事故案例! (本案例来源自:《线程池运用不当的一次线上事故》 ,很精彩的一个案例)

    上面的代码可能会存在死锁的情况,为什么呢?画个图给大家捋一捋。

    试想这样一种极端情况:

    假如我们线程池的核心线程数为 n,父任务(扣费任务)数量为 n,父任务下面有两个子任务(扣费任务下的子任务),其中一个已经执行完成,另外一个被放在了任务队列中。由于父任务把线程池核心线程资源用完,所以子任务因为无法获取到线程资源无法正常执行,一直被阻塞在队列中。父任务等待子任务执行完成,而子任务等待父任务释放线程池资源,这也就造成了 "死锁"。

    解决方法也很简单,就是新增加一个用于执行子任务的线程池专门为其服务。

    4.别忘记给线程池命名

    初始化线程池的时候需要显示命名(设置线程池名称前缀),有利于定位问题。

    默认情况下创建的线程名字类似 pool-1-thread-n 这样的,没有业务含义,不利于我们定位问题。

    给线程池里的线程命名通常有下面两种方式:

    **1.利用 guava 的 ThreadFactoryBuilder **

    ThreadFactory threadFactory = new ThreadFactoryBuilder()
                            .setNameFormat(threadNamePrefix + "-%d")
                            .setDaemon(true).build();
    ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory)

     2.自己实现 ThreadFactor

    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * @author zfang
     * @date 2021/6/11 15:04
     */
    public class NamingThreadFactory implements ThreadFactory {
    
        private final AtomicInteger threadNum = new AtomicInteger();
        private final ThreadFactory delegate;
        private final String name;
    
        /**
         * 创建一个带名字的线程池生产工厂
         */
        public NamingThreadFactory(ThreadFactory delegate, String name) {
            this.delegate = delegate;
            this.name = name; // TODO consider uniqueness
        }
    
        @Override
        public Thread newThread(Runnable r) {
            Thread t = delegate.newThread(r);
            t.setName(name + " [#" + threadNum.incrementAndGet() + "]");
            return t;
        }
    }

     shutdown

        public static void shutdownThreadPool(ExecutorService executor, Logger logger) {
            executor.shutdown();//停止接收新任务,原来的任务继续执行
            int retry = 3;
            while (retry > 0) {
                retry--;
                try {
                    //当前线程阻塞,直到:
                    //等所有已提交的任务(包括正在跑的和队列中等待的)执行完;
                    //或者 等超时时间到了(timeout 和 TimeUnit设定的时间);
                    //或者 线程被中断,抛出InterruptedException
                    if (executor.awaitTermination(100, TimeUnit.MILLISECONDS)) {
                        return;
                    }
                } catch (InterruptedException e) {
                    executor.shutdownNow();
                    Thread.interrupted();
                } catch (Throwable ex) {
                    if (logger != null) {
                        logger.error("ThreadPoolManager shutdown executor has error : ", ex);
                    }
                }
            }
            //立即停止线程池,正在跑的和正在等待的任务都停下了
            executor.shutdownNow();
        }
  • 相关阅读:
    汉字乱码、加密后结果字符串不一致
    msgpack和TParams互相转换
    unigui监听会话开始和结束
    System.JSON.Builders.pas
    保证最终一致性的模式
    使用 Delta Sharing 协议进行数据共享
    dremio 16 升级问题
    graylog 4.0 运行
    supabase 开源firebase 可选工具
    cube.js 最新playground 说明
  • 原文地址:https://www.cnblogs.com/xiaomaoyvtou/p/13375538.html
Copyright © 2011-2022 走看看