zoukankan      html  css  js  c++  java
  • ElasticSearch 线程池类型分析之 ExecutorScalingQueue

    ElasticSearch 线程池类型分析之 ExecutorScalingQueue

    ElasticSearch 线程池类型分析之SizeBlockingQueue这篇文章中分析了ES的fixed类型的线程池。本文分析scaling类型的线程池,以及该线程池所使用的任务队列:ExecutorScalingQueue
    从ThreadPool类中可看出,scaling线程池主要用来执行ES的系统操作:FLUSH、FORCE_MERGE、REFRESH、SNAPSHOT...而fixed类型的线程池则执行用户发起的操作:SEARCH、INDEX、GET、WRITE。系统操作有什么特点呢?系统操作请求量小、可容忍一定的延时。从线程池的角度看,执行系统操作的任务不会被线程池的拒绝策略拒绝,而这正是由ExecutorScalingQueue任务队列和ForceQueuePolicy拒绝策略实现的。

    1,执行FLUSH、REFRESH这些操作的线程池是如何创建的?

    org.elasticsearch.common.util.concurrent.EsExecutors.newScaling

        public static EsThreadPoolExecutor newScaling(String name, int min, int max, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) {
            ExecutorScalingQueue<Runnable> queue = new ExecutorScalingQueue<>();
            EsThreadPoolExecutor executor = new EsThreadPoolExecutor(name, min, max, keepAliveTime, unit, queue, threadFactory, new ForceQueuePolicy(), contextHolder);
            queue.executor = executor;
            return executor;
        }
    

    线程池对象是 EsThreadPoolExecutor、任务队列是 ExecutorScalingQueue、拒绝策略是 ForceQueuePolicy

    2,ForceQueuePolicy 的任务拒绝处理逻辑是什么?

    ForceQueuePolicy和ExecutorScalingQueue都是org.elasticsearch.common.util.concurrent.EsExecutors.EsExecutors 的内部类。EsExecutors是一个工具类,用来创建ThreadPoolExecutor对象。

    org.elasticsearch.common.util.concurrent.EsExecutors.newScaling
    org.elasticsearch.common.util.concurrent.EsExecutors.newFixed
    org.elasticsearch.common.util.concurrent.EsExecutors.newAutoQueueFixed
    再加上 private static final ExecutorService DIRECT_EXECUTOR_SERVICE = new AbstractExecutorService()... ES中所有的线程池对象都由EsExecutors创建了。

    当向 EsThreadPoolExecutor 提交任务时,如果触发了拒绝策略,则会执行如下的rejectedExecution方法:将任务再添加到任务队列中。

        /**
         * A handler for rejected tasks that adds the specified element to this queue,
         * waiting if necessary for space to become available.
         */
        static class ForceQueuePolicy implements XRejectedExecutionHandler {
    
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                try {
                    // force queue policy should only be used with a scaling queue
                    assert executor.getQueue() instanceof ExecutorScalingQueue;
                    //将被"拒绝"的任务再put到任务队列中
                    executor.getQueue().put(r);
                } catch (final InterruptedException e) {
                    // a scaling queue never blocks so a put to it can never be interrupted
                    throw new AssertionError(e);
                }
            }
            //因为任务不会被拒绝,所以这里的被拒绝的任务计数总是返回0
            @Override
            public long rejected() {
                return 0;
            }
    
        }
    

    3, 任务队列

    ExecutorScalingQueue 继承了LinkedTransferQueue,所以是一个无界队列。它和 SizeBlockingQueue 所不同的是:SizeBlockingQueue的容量是有限制的,而ExecutorScalingQueue没有长度限制,这意味着可以将任意多个任务提交到 ExecutorScalingQueue中排队等待,这与它一起搭配使用的拒绝策略ForceQueuePolicy是吻合的。同时,这也表明FLUSH、REFRESH、SNAPSHOT等这些操作都不会被拒绝,不过这些操作的执行频率都很低
    试想,对于SEARCH(搜索请求)、INDEX(索引文档请求)、WRITE(添加文档请求)这些由用户触发的操作,可能QPS会非常大,而REFRESH(刷新段segment)、FLUSH这样的操作是系统层面的操作,执行频率很低。因此分开交由不同的线程池处理是非常有必要的,这样就可以为线程池配置不同的特点(有界队列、无界队列)的任务队列以及拒绝处理策略了。

    在任务入队列时,ExecutorScalingQueue的offer方法先判断线程池中是否有空闲线程,若有空闲线程,tryTransfer方法会立即成功返回true,任务直接交由线程处理而不需要入队列再排队等待了
    这里也可以看出: LinkedBlockingQueue 与 LinkedTransferQueue 的区别,我想这也是为什么ES选择LinkedTransferQueue作为任务队列的原因之一吧。若线程池中没有空闲的线程,再判断线程池中当前已有线程数量是否达到了最大线程数量(max pool size),若未达到,则新建线程来处理任务,否则任务就进入队列排队等待处理,而由于ExecutorScalingQueue是个无界队列,没有长度限制,而REFRESH这样的操作又没有低响应时间要求,因此长时间排队也能够接受。

            /**
             * ExecutorScalingQueue 必须与 ForceQueuePolicy 拒绝策略搭配使用.
             *
             * 采用 ExecutorScalingQueue 作为任务队列的线程池它的 core pool size 和 max pool size 可以不相等
             * 当不断地向线程池提交任务,线程的个数达到了core pool size但尚未达到 max pool size时, left大于0成立,返回false
             * 触发 ThreadPoolExecutor#execute方法中if语句 workQueue.offer(command) 为false,从而导致if语句不成立
             * 于是执行 addWorker 方法创建新线程来执行任务,如果 addWorker 不小心失败了,会执行 rejected(command),但是这个任务是不能
             * 被拒绝的,因为我们只是想让 线程池 优先创建 max pool size个线程来处理任务.
             * 于是采用 ForceQueuePolicy 保证任务一定是提交到队列里,从而保证任务"不被拒绝"
             * @param e
             * @return
             */
        static class ExecutorScalingQueue<E> extends LinkedTransferQueue<E> {
    
            ThreadPoolExecutor executor;
    
            ExecutorScalingQueue() {
            }
    
            @Override
            public boolean offer(E e) {
                // first try to transfer to a waiting worker thread
                //如果线程池中有空闲的线程,tryTransfer会立即成功,直接将任务交由线程处理(省去了任务的排队过程)
                if (!tryTransfer(e)) {
                    // check if there might be spare capacity in the thread
                    // pool executor
                    int left = executor.getMaximumPoolSize() - executor.getCorePoolSize();
                    if (left > 0) {
                        //线程池当前已有的线程数量尚未达到 max pool size, 返回false, 触发ThreadPoolExecutor的addWorker方法被调用,从而创建新线程
                        // reject queuing the task to force the thread pool
                        // executor to add a worker if it can; combined
                        // with ForceQueuePolicy, this causes the thread
                        // pool to always scale up to max pool size and we
                        // only queue when there is no spare capacity
                        return false;
                    } else {
                        //线程池当前已有的线程数量 已经是 max pool size了, 任务入队列排队等待
                        return super.offer(e);
                    }
                } else {
                    return true;
                }
            }
        }
    

    总结:

    本文分析了 ES中FLUSH、FORCE_MERGE、REFRESH、SNAPSHOT...操作所使用的线程池及其任务队列、拒绝策略。理解线程池的实现原理有助于各种操作的调优,有时候写数据到ES或执行大量的查询请求时,可能会发现ES的日志里面有一些操作被拒绝的提示,这时,就能针对性地去调整线程池的配置了。
    不管是refresh刷新segment,还是 snapshot 快照备份,这些操作可理解为"系统操作",这与用户操作(search、get)是有区别的:write/get 需要良好的响应时间,这意味着任务不能长时间排队太久。write/get 请求量可能非常大、QPS非常高,需要一些限制,所以这也是为什么它们的任务队列容量是固定的,当wirte/get的请求量大到处理不过来时,就会触发拒绝策略,任务被拒绝执行了。而对于refresh这类操作,执行不是太频繁,有些系统操作还很重要,这种任务提交时就不能被拒绝,因此ForcePolicy是一个很好的选择。从这里也可以看出:在一个大系统里面,有各种类型的操作,因此有必要使用多个线程池来分别处理这些操作。而如何协调统一管理多个线程池(EsExecutors类、ExecutorBuilder类),及时回收空闲线程,设置合适的任务队列长度(各种类型的任务队列:ExecutorScalingQueue、SizeBlockingQueue、ResizableBlockingQueue),将所有的任务处理操作都统一到一套代码流程逻辑(AbstractRunnable类、EsThreadPoolExecutor类的doExecute()方法)下执行,这些都需要很强的编码能力。
    最后,提一下search操作,很特殊。ES主要是用来做搜索的,那么负责执行search操作的线程池是如何实现的呢?它又采用了什么任务队列呢?它的拒绝策略又是什么呢?提前透露一下:search操作的线程池的任务队列可动态调整任务队列的长度,并且以一种十分巧妙的方式统计每个任务的执行时间。读完源码之后,感叹这些代码的设计思路是那么优美。

    参考文章:
    ElasticSearch 线程池类型分析之SizeBlockingQueue

    ES index操作 剖析

    原文:https://www.cnblogs.com/hapjin/p/11005676.html

  • 相关阅读:
    spark streaming 概述
    spark sql 的性能调优
    LeetCode 106. Construct Binary Tree from Inorder and Postorder Traversal (用中序和后序树遍历来建立二叉树)
    LeetCode 105. Construct Binary Tree from Preorder and Inorder Traversal (用先序和中序树遍历来建立二叉树)
    LeetCode 90. Subsets II (子集合之二)
    LeetCode 88. Merge Sorted Array(合并有序数组)
    LeetCode 81. Search in Rotated Sorted Array II(在旋转有序序列中搜索之二)
    LeetCode 80. Remove Duplicates from Sorted Array II (从有序序列里移除重复项之二)
    LeetCode 79. Word Search(单词搜索)
    LeetCode 78. Subsets(子集合)
  • 原文地址:https://www.cnblogs.com/hapjin/p/11005676.html
Copyright © 2011-2022 走看看