zoukankan      html  css  js  c++  java
  • 线程池中的阻塞队列选择

    转载:https://zhuanlan.zhihu.com/p/32867181

    自从最近的某年某月某天起,线上服务开始变得不那么稳定。在高峰期,时常有几台机器的内存持续飙升,并且无法回收,导致服务不可用。

    例如GC时间采样曲线:

    和内存使用曲线:

    图中所示,18:50-19:00的阶段,已经处于服务不可用的状态了。上游服务的超时异常会增加,该台机器会触发熔断。熔断触发后,改台机器的流量会打到其他机器,其他机器发生类似的情况的可能性会提高,极端情况会引起所有服务宕机,曲线掉底。

    因为线上内存过大,如果采用 jmap dump的方式,这个任务可能需要很久才可以执行完,同时把这么大的文件存放起来导入工具也是一件很难的事情。再看JVM启动参数,也很久没有变更过 Xms, Xmx, -XX:NewRatio, -XX:SurvivorRatio, 虽然没有仔细分析程序使用内存情况,但看起来也无大碍。

    于是开始找代码,某年某天某月~ 嗯,注意到一段这样的代码提交:

    private static ExecutorService executor = Executors.newFixedThreadPool(15);
    public static void push2Kafka(Object msg) {
        executor.execute(new WriteTask(msg,  false));    
    }

    相关代码的完整功能是,每次线上调用,都会把计算结果的日志打到 Kafka,Kafka消费方再继续后续的逻辑。内存被耗尽可能有一个原因是,因为使用了 newFixedThreadPool 线程池,而它的工作机制是,固定了N个线程,而提交给线程池的任务队列是不限制大小的,如果Kafka发消息被阻塞或者变慢,那么显然队列里面的内容会越来越多,也就会导致这样的问题。

    为了验证这个想法,做了个小实验,把 newFixedThreadPool 线程池的线程个数调小一点,例如 1。果然压测了一下,很快就复现了内存耗尽,服务不可用的悲剧。

    最后的修复策略是使用了自定义的线程池参数,而非 Executors 默认实现解决了问题。下面就把线程池相关的原理和参数总结一下,避免未来踩坑。

    1. Java线程池

    虽然Java线程池理论,以及构造线程池的各种参数,以及 Executors 提供的默认实现之前研读过,不过线上还没有发生过线程池误用引发的事故,所以有必要把这些参数再仔细琢磨一遍。

    优先补充一些线程池的工作理论,有助于展开下面的内容。线程池顾名思义,就是由很多线程构成的池子,来一个任务,就从池子中取一个线程,处理这个任务。这个理解是我在第一次接触到这个概念时候的理解,虽然整体基本切入到核心,但是实际上会比这个复杂。例如线程池肯定不会无限扩大的,否则资源会耗尽;当线程数到达一个阶段,提交的任务会被暂时存储在一个队列中,如果队列内容可以不断扩大,极端下也会耗尽资源,那选择什么类型的队列,当队列满如何处理任务,都有涉及很多内容。线程池总体的工作过程如下图:

    线程池内的线程数的大小相关的概念有两个,一个是核心池大小,还有最大池大小。如果当前的线程个数比核心池个数小,当任务到来,会优先创建一个新的线程并执行任务。当已经到达核心池大小,则把任务放入队列,为了资源不被耗尽,队列的最大容量可能也是有上限的,如果达到队列上限则考虑继续创建新线程执行任务,如果此刻线程的个数已经到达最大池上限,则考虑把任务丢弃。

    在 java.util.concurrent 包中,提供了 ThreadPoolExecutor 的实现。

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
    } 

    既然有了刚刚对线程池工作原理对概述,这些参数就很容易理解了:

    corePoolSize- 核心池大小,既然如前原理部分所述。需要注意的是在初创建线程池时线程不会立即启动,直到有任务提交才开始启动线程并逐渐时线程数目达到corePoolSize。若想一开始就创建所有核心线程需调用prestartAllCoreThreads方法。

    maximumPoolSize-池中允许的最大线程数。需要注意的是当核心线程满且阻塞队列也满时才会判断当前线程数是否小于最大线程数,并决定是否创建新线程。

    keepAliveTime - 当线程数大于核心时,多于的空闲线程最多存活时间

    unit - keepAliveTime 参数的时间单位。

    workQueue - 当线程数目超过核心线程数时用于保存任务的队列。主要有3种类型的BlockingQueue可供选择:无界队列,有界队列和同步移交。将在下文中详细阐述。从参数中可以看到,此队列仅保存实现Runnable接口的任务。 别看这个参数位置很靠后,但是真的很重要,因为楼主的坑就因这个参数而起,这些细节有必要仔细了解清楚。

    threadFactory - 执行程序创建新线程时使用的工厂。

    handler - 阻塞队列已满且线程数达到最大值时所采取的饱和策略。java默认提供了4种饱和策略的实现方式:中止、抛弃、抛弃最旧的、调用者运行。将在下文中详细阐述。

    2. 可选择的阻塞队列BlockingQueue详解

    在重复一下新任务进入时线程池的执行策略: 
    如果运行的线程少于corePoolSize,则 Executor始终首选添加新的线程,而不进行排队。(如果当前运行的线程小于corePoolSize,则任务根本不会存入queue中,而是直接运行) 
    如果运行的线程大于等于 corePoolSize,则 Executor始终首选将请求加入队列,而不添加新的线程。 
    如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。 
    主要有3种类型的BlockingQueue:

    无界队列

    队列大小无限制,常用的为无界的LinkedBlockingQueue,使用该队列做为阻塞队列时要尤其当心,当任务耗时较长时可能会导致大量新任务在队列中堆积最终导致OOM。阅读代码发现,Executors.newFixedThreadPool 采用就是 LinkedBlockingQueue,而楼主踩到的就是这个坑,当QPS很高,发送数据很大,大量的任务被添加到这个无界LinkedBlockingQueue 中,导致cpu和内存飙升服务器挂掉。

    有界队列

    常用的有两类,一类是遵循FIFO原则的队列如ArrayBlockingQueue与有界的LinkedBlockingQueue,另一类是优先级队列如PriorityBlockingQueue。PriorityBlockingQueue中的优先级由任务的Comparator决定。 
    使用有界队列时队列大小需和线程池大小互相配合,线程池较小有界队列较大时可减少内存消耗,降低cpu使用率和上下文切换,但是可能会限制系统吞吐量。

    在我们的修复方案中,选择的就是这个类型的队列,虽然会有部分任务被丢失,但是我们线上是排序日志搜集任务,所以对部分对丢失是可以容忍的。

    同步移交队列

    如果不希望任务在队列中等待而是希望将任务直接移交给工作线程,可使用SynchronousQueue作为等待队列。SynchronousQueue不是一个真正的队列,而是一种线程之间移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接收这个元素。只有在使用无界线程池或者有饱和策略时才建议使用该队列。

    3. 可选择的饱和策略RejectedExecutionHandler详解

    JDK主要提供了4种饱和策略供选择。4种策略都做为静态内部类在ThreadPoolExcutor中进行实现。

    3.1 AbortPolicy中止策略

    该策略是默认饱和策略。

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                throw new RejectedExecutionException("Task " + r.toString() +
                                                     " rejected from " +
                                                     e.toString());
     } 

    使用该策略时在饱和时会抛出RejectedExecutionException(继承自RuntimeException),调用者可捕获该异常自行处理。

    3.2 DiscardPolicy抛弃策略

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }

    如代码所示,不做任何处理直接抛弃任务

    3.3 DiscardOldestPolicy抛弃旧任务策略

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    e.getQueue().poll();
                    e.execute(r);
                }
    } 

    如代码,先将阻塞队列中的头元素出队抛弃,再尝试提交任务。如果此时阻塞队列使用PriorityBlockingQueue优先级队列,将会导致优先级最高的任务被抛弃,因此不建议将该种策略配合优先级队列使用。

    3.4 CallerRunsPolicy调用者运行

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    r.run();
                }
    } 

    既不抛弃任务也不抛出异常,直接运行任务的run方法,换言之将任务回退给调用者来直接运行。使用该策略时线程池饱和后将由调用线程池的主线程自己来执行任务,因此在执行任务的这段时间里主线程无法再提交新任务,从而使线程池中工作线程有时间将正在处理的任务处理完成。

    4. Java提供的四种常用线程池解析

    既然楼主踩坑就是使用了 JDK 的默认实现,那么再来看看这些默认实现到底干了什么,封装了哪些参数。简而言之 Executors 工厂方法Executors.newCachedThreadPool() 提供了无界线程池,可以进行自动线程回收;Executors.newFixedThreadPool(int) 提供了固定大小线程池,内部使用无界队列;Executors.newSingleThreadExecutor() 提供了单个后台线程。

    详细介绍一下上述四种线程池。

    4.1 newCachedThreadPool

    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
    } 

    在newCachedThreadPool中如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。 
    初看该构造函数时我有这样的疑惑:核心线程池为0,那按照前面所讲的线程池策略新任务来临时无法进入核心线程池,只能进入 SynchronousQueue中进行等待,而SynchronousQueue的大小为1,那岂不是第一个任务到达时只能等待在队列中,直到第二个任务到达发现无法进入队列才能创建第一个线程? 
    这个问题的答案在上面讲SynchronousQueue时其实已经给出了,要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接收这个元素。因此即便SynchronousQueue一开始为空且大小为1,第一个任务也无法放入其中,因为没有线程在等待从SynchronousQueue中取走元素。因此第一个任务到达时便会创建一个新线程执行该任务。

    4.2 newFixedThreadPool

     public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
     }

    看代码一目了然了,线程数量固定,使用无限大的队列。再次强调,楼主就是踩的这个无限大队列的坑。

    4.3 newScheduledThreadPool

    创建一个定长线程池,支持定时及周期性任务执行。

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    在来看看ScheduledThreadPoolExecutor()的构造函数

     public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue());
        } 

    ScheduledThreadPoolExecutor的父类即ThreadPoolExecutor,因此这里各参数含义和上面一样。值得关心的是DelayedWorkQueue这个阻塞对列,在上面没有介绍,它作为静态内部类就在ScheduledThreadPoolExecutor中进行了实现。简单的说,DelayedWorkQueue是一个无界队列,它能按一定的顺序对工作队列中的元素进行排列。

    4.4 newSingleThreadExecutor

    创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
            return new DelegatedScheduledExecutorService
                (new ScheduledThreadPoolExecutor(1));
     } 

    首先new了一个线程数目为 1 的ScheduledThreadPoolExecutor,再把该对象传入DelegatedScheduledExecutorService中,看看DelegatedScheduledExecutorService的实现代码:

    DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
                super(executor);
                e = executor;
    } 

    在看看它的父类

    DelegatedExecutorService(ExecutorService executor) { 
               e = executor; 
    } 

    其实就是使用装饰模式增强了ScheduledExecutorService(1)的功能,不仅确保只有一个线程顺序执行任务,也保证线程意外终止后会重新创建一个线程继续执行任务。

  • 相关阅读:
    PAT 甲级 1115 Counting Nodes in a BST (30 分)
    PAT 甲级 1114 Family Property (25 分)
    PAT 甲级 1114 Family Property (25 分)
    Python Ethical Hacking
    Python Ethical Hacking
    Python Ethical Hacking
    Python Ethical Hacking
    Python Ethical Hacking
    Python Ethical Hacking
    Python Ethical Hacking
  • 原文地址:https://www.cnblogs.com/brithToSpring/p/14053364.html
Copyright © 2011-2022 走看看