zoukankan      html  css  js  c++  java
  • 多线程编程学习十一(ThreadPoolExecutor 详解).

    一、ThreadPoolExecutor 参数说明

        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler)
    
    • corePoolSize:核心线程池的大小。当提交一个任务到线程池时,核心线程池会创建一个核心线程来执行任务,即使其他核心线程能够执行新任务也会创建线程,等到需要执行的任务数大于核心线程池基本大小时就不再创建。如果调用了线程池的 prestartAllCoreThreads() 方法,核心线程池会提前创建并启动所有核心线程。

    • workQueue:任务队列。当核心线程池中没有线程时,所提交的任务会被暂存在队列中。Java 提供了多种阻塞队列

    • maximumPoolSize:线程池允许创建的最大线程数。如果队列也满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的空闲线程执行任务。值得注意的是,如果使用了无界的任务队列则这个参数不起作用。

    • keepAliveTime:当线程池中的线程数大于 corePoolSize 时,keepAliveTime 为多余的空闲线程等待新任务的最长时间,超过这个时间后多余的线程将被终止。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。值得注意的是,如果使用了无界的任务队列则这个参数不起作用。

    • TimeUnit:线程活动保持时间的单位。

    • threadFactory:创建线程的工厂。可以通过线程工厂给每个创建出来的线程设置符合业务的名字。

      // 依赖 guava
      new ThreadFactoryBuilder().setNameFormat("xx-task-%d").build();
      
    • handler:饱和策略。当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。Java 提供了以下4种策略:

      • AbortPolicy:默认。直接抛出异常。
      • CallerRunsPolicy:只用调用者所在线程来运行任务。
      • DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
      • DiscardPolicy:不处理,丢弃掉。

    tips: 一般我们称核心线程池中的线程为核心线程,这部分线程不会被回收;超过任务队列后,创建的线程为空闲线程,这部分线程会被回收(回收时间即 keepAliveTime)

    二、常见的 ThreadPoolExecutor 介绍

    Executors 是创建 ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 的工厂类。

    Java 提供了多种类型的 ThreadPoolExecutor,比较常见的有 FixedThreadPool、SingleThreadExecutor、CachedThreadPool等。

    FixedThreadPool

        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    

    FixedThreadPool 被称为可重用固定线程数的线程池。可以看到 corePoolSize 和 maximumPoolSize 都被设置成了 nThreads;keepAliveTime设置为0L,意味着多余的空闲线程会被立即终止;使用了阻塞队列 LinkedBlockingQueue 作为线程的工作队列(队列的容量为 Integer.MAX_VALUE)。

    FixedThreadPool 所存在的问题是,由于队列的容量为 Integer.MAX_VALUE,基本可以认为是无界的,所以 maximumPoolSize 和 keepAliveTime 参数都不会生效,饱和拒绝策略也不会执行,会造成任务大量堆积在阻塞队列中。

    FixedThreadPool 适用于为了满足资源管理的需求,而需要限制线程数量的应用场景。

    SingleThreadExecutor

        public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    

    SingleThreadExecutor 是使用单个线程的线程池。可以看到 corePoolSize 和 maximumPoolSize 被设置为1,其他参数与 FixedThreadPool 相同,所以所带来的风险也和 FixedThreadPool 一致,就不赘述了。

    SingleThreadExecutor 适用于需要保证顺序的执行各个任务。

    CachedThreadPool

        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    

    CachedThreadPool 是一个会根据需要创建新线程的线程池。可以看到 corePoolSize 被设置为 0,所以创建的线程都为空闲线程;maximumPoolSize 被设置为 Integer.MAX_VALUE(基本可认为无界),意味着可以创建无限数量的空闲线程;keepAliveTime 设置为60L,意味着空闲线程等待新任务的最长时间为60秒;使用没有容量的 SynchronousQueue 作为线程池的工作队列。

    CachedThreadPool 所存在的问题是, 如果主线程提交任务的速度高于maximumPool 中线程处理任务的速度时,CachedThreadPool 会不断创建新线程。极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。

    CachedThreadPool 适用于执行很多的短期异步任务的小程序,或者是负载较轻的服务器。

    三、自建 ThreadPoolExecutor 线程池

    鉴于上面提到的风险,我们更提倡使用 ThreadPoolExecutor 去创建线程池,而不用 Executors 工厂去创建。

    以下是一个 ThreadPoolExecutor 创建线程池的 Demo 实例:

    public class Pool {
    
        static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("pool-task-%d").build();
        static ExecutorService executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2,
                200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024),
                threadFactory, new ThreadPoolExecutor.AbortPolicy());
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // 1. 无返回值的任务执行 -> Runnable
            executor.execute(() -> System.out.println("Hello World"));
            // 2. 有返回值的任务执行 -> Callable
            Future<String> future = executor.submit(() -> "Hello World");
            // get 方法会阻塞线程执行等待返回结果
            String result = future.get();
            System.out.println(result);
    
            // 3. 监控线程池
            monitor();
    
            // 4. 关闭线程池
            shutdownAndAwaitTermination();
    
            monitor();
        }
    
        private static void monitor() {
            ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
            System.out.println("【线程池任务】线程池中曾经创建过的最大线程数:" + threadPoolExecutor.getLargestPoolSize());
            System.out.println("【线程池任务】线程池中线程数:" + threadPoolExecutor.getPoolSize());
            System.out.println("【线程池任务】线程池中活动的线程数:" + threadPoolExecutor.getActiveCount());
            System.out.println("【线程池任务】队列中等待执行的任务数:" + threadPoolExecutor.getQueue().size());
            System.out.println("【线程池任务】线程池已执行完任务数:" + threadPoolExecutor.getCompletedTaskCount());
        }
    
        /**
         * 关闭线程池
         * 1. shutdown、shutdownNow 的原理都是遍历线程池中的工作线程,然后逐个调用线程的 interrupt 方法来中断线程。
         * 2. shutdownNow:将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。
         * 3. shutdown:将线程池的状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程。
         */
        private static void shutdownAndAwaitTermination() {
            // 禁止提交新任务
            executor.shutdown();
            try {
                // 等待现有任务终止
                if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                    // 取消当前正在执行的任务
                    executor.shutdownNow();
                    // 等待一段时间让任务响应被取消
                    if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                        System.err.println("Pool did not terminate");
                    }
                }
            } catch (InterruptedException ie) {
                // 如果当前线程也中断,则取消
                executor.shutdownNow();
                // 保留中断状态
                Thread.currentThread().interrupt();
            }
        }
    }
    
    

    创建线程池需要注意以下几点:

    1. CPU 密集型任务应配置尽可能小的线程,如配置 Ncpu+1 个线程。
    2. IO 密集型任务(数据库读写等)应配置尽可能多的线程,如配置 Ncpu*2 个线程。
    3. 优先级不同的任务可以使用优先级队列 PriorityBlockingQueue 来处理。
    4. 建议使用有界队列。可以避免创建数量非常多的线程,甚至拖垮系统。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点儿,比如几千。

    四、ScheduledThreadPoolExecutor

    ScheduledThreadPoolExecutor 继承自 ThreadPoolExecutor。它主要用来在给定的延迟之后运行任务,或者定期执行任务。

        public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory);
        }
    

    ScheduledThreadPoolExecutor 的功能与 Timer 类似,但功能更强大、更灵活。Timer 对应的是单个后台线程,而ScheduledThreadPoolExecutor 可以在构造函数中指定多个对应的后台线程数。

    Java 提供了多种类型的 ScheduledThreadPoolExecutor ,可以通过 Executors 创建,比较常见的有 ScheduledThreadPool、SingleThreadScheduledExecutor 等。适用于需要多个后台线程执行周期任务,同时为了满足资源管理的需求而需要限制后台线程数量的应用场景。

    public class ScheduleTaskTest {
    
        static ThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").build();
        static ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5, threadFactory);
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // 1. 延迟 3 秒后执行 Runnable 方法
            scheduledExecutorService.schedule(() -> System.out.println("Hello World"), 3000, TimeUnit.MILLISECONDS);
    
            // 2. 延迟 3 秒后执行 Callable 方法
            ScheduledFuture<String> scheduledFuture = scheduledExecutorService.schedule(() -> "Hello ScheduledFuture", 3000, TimeUnit.MILLISECONDS);
            System.out.println(scheduledFuture.get());
    
            // 3. 延迟 1 秒后开始每隔 3 秒周期执行。
            //    如果中间任务遇到异常,则禁止后续执行。
            //    固定的频率来执行某项任务,它不受任务执行时间的影响。到时间,就执行。
            scheduledExecutorService.scheduleAtFixedRate(() -> System.out.println("Hello ScheduleAtFixedRate"), 1, 3000, TimeUnit.MILLISECONDS);
    
            // 4. 延迟 1 秒后,每个任务结束延迟 3 秒后再执行下个任务。
            //    如果中间任务遇到异常,则禁止后续执行。
            //    受任务执行时间的影响,等待任务执行结束后才开始计算延迟。
            scheduledExecutorService.scheduleWithFixedDelay(() -> System.out.println("Hello ScheduleWithFixedDelay"), 1, 3000, TimeUnit.MILLISECONDS);
        }
    }
    

    ScheduledThreadPoolExecutor 的执行步骤大抵如下:

    1. 当调用 ScheduledThreadPoolExecutor 的 scheduleAtFixedRate() 方法或者 scheduleWithFixedDelay()方 法时,会向 DelayedWorkQueue 队列添加 ScheduledFutureTask 任务。
    2. 线程池中的线程从 DelayedWorkQueue队列中获取执行时间已到达的 ScheduledFutureTask,然后执行任务。
    3. 线程修改 ScheduledFutureTask 任务的执行时间为下次将要被执行的时间。
    4. 线程把修改后的 ScheduledFutureTask 重新放回队列。
  • 相关阅读:
    SCI写作经典替换词,瞬间高大上!(转)
    最佳化常用测试函数 Optimization Test functions
    算法复杂度速查表
    VS 代码行统计
    CPLEX IDE 菜单栏语言设置( 中文 英文 韩文 等多国语言 设置)
    如何从PDF文件中提取矢量图
    Matlab无法打开M文件的错误( Undefined function or method 'uiopen' for input arguments of type 'char)
    visual studio 资源视图 空白 解决方案
    MFC DialogBar 按钮灰色不响应
    嗨翻C语言笔记(二)
  • 原文地址:https://www.cnblogs.com/jmcui/p/11552583.html
Copyright © 2011-2022 走看看