zoukankan      html  css  js  c++  java
  • SOFA 源码分析 — 自定义线程池原理

    前言

    在 SOFA-RPC 的官方介绍里,介绍了自定义线程池,可以为指定服务设置一个独立的业务线程池,和 SOFARPC 自身的业务线程池是隔离的。多个服务可以共用一个独立的线程池。

    API使用方式如下:

    UserThreadPool threadPool = new UserThreadPool();
    threadPool.setCorePoolSize(10);
    threadPool.setMaximumPoolSize(100);
    threadPool.setKeepAliveTime(200);
    threadPool.setPrestartAllCoreThreads(false);
    threadPool.setAllowCoreThreadTimeOut(false);
    threadPool.setQueueSize(200);
    
    UserThreadPoolManager.registerUserThread("com.alipay.sofa.rpc.quickstart.HelloService", threadPool);
    

    如上为 HelloService 服务设置了一个自定义线程池。

    在 SOFABoot 中如下使用:

    <bean id="customExcutor" class="com.alipay.sofa.rpc.server.UserThreadPool" init-method="init">
        <property name="corePoolSize" value="10" />
        <property name="maximumPoolSize" value="10" />
        <property name="queueSize" value="0" />
    </bean>
    
    <bean id="helloService" class="com.alipay.sofa.rpc.quickstart.HelloService"/>
    
    <sofa:service ref="helloService" interface="XXXService">
        <sofa:binding.bolt>
            <sofa:global-attrs thread-pool-ref="customExcutor"/>
        </sofa:binding.bolt>
    </sofa:service>
    

    那么实现原理是什么呢?

    一起来看看。

    源码分析

    关键代码:

    
    UserThreadPoolManager.
            registerUserThread("com.alipay.sofa.rpc.quickstart.HelloService", threadPool);
    
    

    UserThreadPoolManager 是一个用户自定义线程池管理器。里面包含一个 Map, key 是接口名称,value 是线程池(一个 UserThreadPool对象)。

    看看这个 UserThreadPool。

    很简单的一个类,封装了 JDK 的线程池。并初始化了一些线程池参数,比如:

    • corePoolSize = 10
    • maximumPoolSize = 100
    • keepAliveTime = 300000(线程回收时间(毫秒))
    • queueSize = 0
    • threadPoolName = "SofaUserProcessor" 线程名字
    • boolean allowCoreThreadTimeOut 是否关闭核心线程池
    • boolean prestartAllCoreThreads 是否初始化核心线程池
    • volatile ThreadPoolExecutor executor

    初始化的时候,默认参数不变,核心线程数 10,最大 100,默认不关闭核心线程池,默认不初始化线程池。默认是 SynchronousQueue 队列,此队列性能最高,也可以设置成阻塞队列,或者优先级队列。当然,这些都是可以改的。

    这个线程池什么时候回起作用呢?

    先说结论:当 Netty 读取数据(channelRead 方法)后,通过层层调用,会调用 RpcRequestProcessor 类的 process 方法。该方法会拿到上下文的 UserProcessor 对象(bolt 的话,实现类是 BoltServerProcessor),UserProcessor 有一个内部接口 ExecutorSelector,线程池选择器,该选择器定义了一个 select 方法,返回的是线程池,如果用户自定义了线程池,就会返回自定义线程池(方式:UserThreadPoolManager.getUserThread(service)),如果没有,返回系统线程池。

    来看看具体代码。

    RpcHandler 我们很熟悉了,就是 Netty 的 handler ,ChannelRead 方法中,会调用 RpcCommandHandler 的 handleCommand 方法,该方法会提交到线程池执行。任务内容是执行 process 方法。

    通过调用,最后会执行 RpcRequestProcessor 的 process 方法。调用栈如下:

    105 行会有如下判断:

    // to check whether get executor using executor selector
    if (null == userProcessor.getExecutorSelector()) {
        executor = userProcessor.getExecutor();
    } else {
        // in case haven't deserialized in io thread
        // it need to deserialize clazz and header before using executor dispath strategy
        if (!deserializeRequestCommand(ctx, cmd, RpcDeserializeLevel.DESERIALIZE_HEADER)) {
            return;
        }
        //try get executor with strategy
        executor = userProcessor.getExecutorSelector().select(cmd.getRequestClass(),
            cmd.getRequestHeader());
    }
    

    尝试获取线线程池选择器,如果是 null, 使用系统线程池,如果不是 null,调用选择器的 select 方法得到线程池,随后,使用这个线程执行任务。

    // Till now, if executor still null, then try default
    if (executor == null) {
        executor = (this.getExecutor() == null ? defaultExecutor : this.getExecutor());
    }
    
    // use the final executor dispatch process task
    executor.execute(new ProcessTask(ctx, cmd));
    

    那么这个 select 方法是如何实现的呢?目前仅有一个实现,BoltServerProcessor 的内部类 UserThreadPoolSelector。该方法逻辑如下:
    从 Header 中获取服务名称,根据服务名称调用 UserThreadPoolManager.getUserThread(service) ,如果返回值不是 null ,说明用户设置自定义线程池了,就返回该线程池。如果是 null,返回系统线程池。

    而 BoltServerProcessor 的 getExecutorSelector 判断规则如下:

        @Override
        public ExecutorSelector getExecutorSelector() {
            return UserThreadPoolManager.hasUserThread() ? executorSelector : null;
        }
    
        public static boolean hasUserThread() {
            return userThreadMap != null && userThreadMap.size() > 0;
        }
    
        public BoltServerProcessor(BoltServer boltServer) {
            this.boltServer = boltServer;
            this.executorSelector = new UserThreadPoolSelector(); // 支持自定义业务线程池
        }
    

    可以看到,BoltServerProcessor 默认就会创建一个内部类对象,只要 UserThreadPoolManager 里面的 Map 不是空,就会尝试调用 select 方法,如果通过服务名称找到缓存中的自定义线程池,就直接返回了。非常完美。

    需要注意一点,系统线程池只有一个,默认核心线程池大小 20 ,最大 200。貌似这也是 tomcat 的默认配置,因此,并发很高的时候,可能就需要用户使用自定义线程池了,能够显著提高并发量。

    总结

    好了,关于自定线程池的原理探究的差不多了,这个功能挺有用的,当系统并发很高的时候,或者某个服务很慢,不能让该服务影响其他服务,就可以使用自定线程池,将这些慢服务和其他服务隔离开。

    原理则是通过 UserThreadPoolManager 与 Server 进行交互,当 Server 执行任务的时候,会从当前的上下文中,找到与调用服务对应的线程池,如果有的话,就返回 UserThreadPoolManager 管理的线程池,如果没有,返回框架线程池。

    具体判断的代码在 Bolt 模块 com.alipay.remoting.rpc.protocol.RpcRequestProcessor 的 process 方法中。

    bye!!!

    作者:莫那鲁道

    出处: 博客园:http://www.cnblogs.com/stateis0/
                个人博客: thinkinjava.cn

    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须在文章页面给出原文连接,否则保留追究法律责任的权利。
  • 相关阅读:
    LeetCode-102-二叉树的层序遍历
    LeetCode-086-分隔链表
    LeetCode-082-删除排序链表中的重复元素 II
    LeetCode-081-搜索旋转排序数组 II
    [leetcode]92. Reverse Linked List II反转链表2
    [leetcode]94. Binary Tree Inorder Traversal二叉树中序遍历
    [leetcode]100. Same Tree相同的树
    [leetcode]54. Spiral Matrix螺旋矩阵
    [leetcode]58. Length of Last Word最后一个词的长度
    [leetcode]41. First Missing Positive第一个未出现的正数
  • 原文地址:https://www.cnblogs.com/stateis0/p/8983550.html
Copyright © 2011-2022 走看看