zoukankan      html  css  js  c++  java
  • SOFA 源码分析 —— 服务发布过程

    前言

    SOFA 包含了 RPC 框架,底层通信框架是 bolt ,基于 Netty 4,今天将通过 SOFA—RPC 源码中的例子,看看他是如何发布一个服务的。

    示例代码

    下面的代码在 com.alipay.sofa.rpc.quickstart.QuickStartServer 类下。

    ServerConfig serverConfig = new ServerConfig()
        .setProtocol("bolt") // 设置一个协议,默认bolt
        .setPort(9696) // 设置一个端口,默认12200
        .setDaemon(false); // 非守护线程
    
    ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()
        .setInterfaceId(HelloService.class.getName()) // 指定接口
        .setRef(new HelloServiceImpl()) // 指定实现
        .setServer(serverConfig); // 指定服务端
    
    providerConfig.export(); // 发布服务
    

    首先,创建一个 ServerConfig ,包含了端口,协议等基础信息,当然,这些都是手动设定的,在该类加载的时候,会自动加载很多配置文件中的服务器默认配置。比如 RpcConfigs 类,RpcRuntimeContext 上下文等。

    然后呢,创建一个 ProviderConfig,也是个 config,不过多继承了一个 AbstractInterfaceConfig 抽象类,该类是接口级别的配置,而 ServerConfig 是 服务器级别的配置。虽然都继承了 AbstractIdConfig。

    ProviderConfig 包含了接口名称,接口指定实现类,还有服务器的配置。

    最后,ProviderConfig 调用 export 发布服务。

    展示给我的 API 很简单,但内部是如何实现的呢?

    在看源码之前,我们思考一下:如果我们自己来实现,怎么弄?

    RPC 框架简单一点来说,就是使用动态代理和 Socket。

    SOFA 使用 Netty 来做网络通信框架,我们之前也写过一个简单的 Netty RPC,主要是通过 handler 的 channelRead 方法来实现。

    SOFA 是这么操作的吗?

    一起来看看。

    # 源码分析

    上面的示例代码其实就是 3 个步骤,创建 ServerConfig,创建 ProviderConfig,调用 export 方法。

    先看第一步,还是有点意思的。

    虽然是空构造方法,但 ServerConfig 的属性都是自动初始化的,而他的父类 AbstractIdConfig 更有意思了,父类有 1 个地方值得注意:

    static {
        RpcRuntimeContext.now();
    }
    

    熟悉类加载的同学都知道,这是为了主动加载 RpcRuntimeContext ,看名字是 RPC 运行时上下文,所谓上下文,大约就是我们人类聊天中的 "老地方" 的意思。

    这个上下文会在静态块中加载 Module(基于扩展点实现),注册 JVM 关闭钩子(类似 Tomcat)。还有很多配置信息。

    然后呢?创建 ProviderConfig 对象。这个类比上面的那个类多继承了一个 AbstractInterfaceConfig,接口级别的配置。比如有些方法我不想发布啊,比如权重啊,比如超时啊,比如具体的实现类啊等等,当然还需要一个 ServerConfig 的属性(注册到 Server 中啊喂)。

    最后就是发布了。export 方法。

    ProviderCofing 拥有一个 export 方法,但并不是直接就在这里发布的,因为他是一个 config,不适合在config 里面做这些事情,违背单一职责。

    SOFA 使用了一个 Bootstrap 类来进行操作。和大部分服务器类似,这里就是启动服务器的地方。因为这个类会多线程使用,比如并发的发布服务。而不是一个一个慢慢的发布服务。所以他不是单例的,而是和 Config 一起使用的,并缓存在 map 中。

    ProviderBootstrap 目前有 3 个实现:Rest,Bolt,Dubbo。Bolt 是他的默认实现。

    export 方法默认有个实现(Dubbo 的话就要重写了)。主要逻辑是执行 doExport 方法,其中包括延迟加载逻辑。

    而 doExport 方法中,就是 SOFA 发布服务的逻辑所在了。

    楼主将方法的异常处理逻辑去除,整体如下:

     private void doExport() {
            if (exported) {
                return;
            }
            String key = providerConfig.buildKey();
            String appName = providerConfig.getAppName();
            // 检查参数
            checkParameters();
            // 注意同一interface,同一uniqleId,不同server情况
            AtomicInteger cnt = EXPORTED_KEYS.get(key); // 计数器
            if (cnt == null) { // 没有发布过
                cnt = CommonUtils.putToConcurrentMap(EXPORTED_KEYS, key, new AtomicInteger(0));
            }
            int c = cnt.incrementAndGet();
            int maxProxyCount = providerConfig.getRepeatedExportLimit();
            if (maxProxyCount > 0) {
              // 超过最大数量,直接抛出异常
            }
            // 构造请求调用器
            providerProxyInvoker = new ProviderProxyInvoker(providerConfig);
            // 初始化注册中心
            if (providerConfig.isRegister()) {
                List<RegistryConfig> registryConfigs = providerConfig.getRegistry();
                if (CommonUtils.isNotEmpty(registryConfigs)) {
                    for (RegistryConfig registryConfig : registryConfigs) {
                        RegistryFactory.getRegistry(registryConfig); // 提前初始化Registry
                    }
                }
            }
            // 将处理器注册到server
            List<ServerConfig> serverConfigs = providerConfig.getServer();
            for (ServerConfig serverConfig : serverConfigs) {
                Server server = serverConfig.buildIfAbsent();
                // 注册序列化接口
                server.registerProcessor(providerConfig, providerProxyInvoker);
                if (serverConfig.isAutoStart()) {
                    server.start();
                }
            }
    
            // 注册到注册中心
            providerConfig.setConfigListener(new ProviderAttributeListener());
            register();
    
            // 记录一些缓存数据
            RpcRuntimeContext.cacheProviderConfig(this);
            exported = true;
        }
    

    主要逻辑如下:

    1. 根据 providerConfig 创建一个 key 和 AppName。
    2. 检验同一个服务多次发布的次数。
    3. 创建一个 ProviderProxyInvoker, 其中包含了过滤器链,而过滤器链的最后一链就是对接口实现类的调用。
    4. 初始化注册中心,创建 Server(会有多个Server,因为可能配置了多个协议)。
    5. 将 config 和 invoker 注册到 Server 中。内部是将其放进了一个 Map 中。
    6. 启动 Server。启动 Server 其实就是启动 Netty 服务,并创建一个 RpcHandler,也就是 Netty 的 Handler,这个 RpcHandler 内部含有一个数据结构,包含接口级别的 invoker。所以,当请求进入的时候,RpcHandler 的 channelRead 方法会被调用,然后间接的调用 invoker 方法。
    7. 成功启动后,注册到注册中心。将数据缓存到 RpcRuntimeContext 的一个 Set 中。

    一起来详细看看。

    Invoker 怎么构造的?很简单,最主要的就是过滤器。关于过滤器,我们之前已经写过一篇文章了。不再赘述。

    关键看看 Server 是如何构造的。

    关键代码 serverConfig.buildIfAbsent(),类似 HashMap 的 putIfAbsent。如果不存在就创建。

    Server 接口目前有 2 个实现,bolt 和 rest。当然,Server 也是基于扩展的,所以,不用怕,可以随便增加实现。

    关键代码在 ServerFactory 的 getServer 中,其中会获取扩展点的 Server,然后,执行 Server 的 init 方法,我们看看默认 bolt 的 init 方法。

        @Override
        public void init(ServerConfig serverConfig) {
            this.serverConfig = serverConfig;
            // 启动线程池
            bizThreadPool = initThreadPool(serverConfig);
            boltServerProcessor = new BoltServerProcessor(this);
        }
    

    保存了 serverConfig 的引用,启动了一个业务线程池,创建了一个 BoltServerProcessor 对象。

    第一:这个线程池会在 Bolt 的 RpcHandler 中被使用,也就是说,复杂业务都是在这个线程池执行,不会影响 Netty 的 IO 线程。

    第二:BoltServerProcessor 非常重要,他的构造方法包括了当前的 BoltServer,所以他俩是互相依赖的。关键点来了:

    BoltServerProcessor 实现了 UserProcessor 接口,而 Bolt 的 RpcHandler 持有一个 Map<String, UserProcessor<?>>,所以,当 RpcHandler 被执行 channelRead 方法的时候,一定会根据接口名称找到对应的 UserProcessor,并执行他的 handlerRequest 方法。

    那么,RpcHandler 是什么时候创建并放置到 RpcHandler 中的呢?

    具体是这样的:在 server.start() 执行的时候,该方法会初始化 Netty 的 Server,在 SOFA 中,叫 RpcServer,将 BoltServerProcessor 放置到名叫 userProcessors 的 Map 中。然后,当 RpcServer 启动的时候,也就是 start 方法,会执行一个 init 方法,该方法内部就是设置 Netty 各种属性的地方,包括 Hander,其中有 2 行代码对我们很重要:

    final RpcHandler rpcHandler = new RpcHandler(true, this.userProcessors);
    pipeline.addLast("handler", rpcHandler);
    

    创建了一个 RpcHandler,并添加到 pipeline 中,这个 Handler 的构造参数就是包含所有 BoltServerProcessor 的 Map。

    所以,总的流程就是:

    每个接口都会创建一个 providerConfig 对象,这个对象会创建对应的 invoker 对象(包含过滤器链),这两个对象都会放到 BoltServer 的 invokerMap 中,而 BoltServer 还包含其他对象,比如 BoltServerProcessor(继承 UserProcessor), RpcServer(依赖 RpcHandler)。当初始化 BoltServerProcessor 的时候,会传入 this(BoltServer),当初始化 RpcServer 的时候,会传入 BoltServerProcessor 到 RpcServer 的 Map 中。在 RpcHandler 初始化的时候,又会将 RpcServer 的 Map 传进自己的内部。完成最终的依赖。
    当请求进入,RpcHandler 调用对应的 UserProcessor 的 handlerRequest 方法,而该方法中,会调用对应的 invoker,invoker 调用过滤器链,知道调用真正的实现类。

    而大概的 UML 图就是下面这样的:

    image.png

    红色部分是 RPC 的核心,包含 Solt 的 Server,实现 UserProcessor 接口的 BoltServerProcessor,业务线程池,存储所有接口实现的 Map。

    绿色部分是 Bolt 的接口和类,只要实现了 UserProcessor 接口,就能将具体实现替换,也既是处理具体数据的逻辑。

    最后,看看关键类 BoltServerProcessor ,他是融合 RPC 和 Bolt 的胶水类。

    该类会注册一个序列化器替代 Bolt 默认的。handleRequest 方法是这个类的核心方法。有很多逻辑,主要看这里:

    // 查找服务
    Invoker invoker = boltServer.findInvoker(serviceName);
    // 真正调用
    response = doInvoke(serviceName, invoker, request);
    
    /**
     * 找到服务端Invoker
     *
     * @param serviceName 服务名
     * @return Invoker对象
     */
    public Invoker findInvoker(String serviceName) {
        return invokerMap.get(serviceName);
    }
    

    根据服务名称,从 Map 中找到服务,然后调用 invoker 的 invoker 方法。

    再看看 Netty 到 BoltServerProcessor 的 handlerRequest 的调用链,使用 IDEA 的 Hierarchy 功能,查看该方法,最后停留在 ProcessTast 中,一个 Runnable.

    image.png

    根据经验,这个类肯定是被放到线程池了。什么时候放的呢?看看他的构造方法的 Hierarchy。

    image.png

    从图中可以看到 ,Bolt 的 RpcHandler 的 channelRead 最终会调用 ProcessTask 的 构造方法。

    那么 BoltServer 的用户线程池什么时候使用呢?还是使用 IDEA 的 Hierarchy 功能。

    image.png

    其实也是在这个过程中,当用户没有设置线程池,则使用系统线程池。

    总结

    好了,关于 SOFA 的服务发布和服务的接收过程,就介绍完了,可以说,整个框架还是非常轻量级的。基本操作就是:内部通过在 Netty的 Handler 中保存一个存储服务实现的 Map 完成远程调用。

    其实和我们之前用 Netty 写的小 demo 类似。

    作者:莫那鲁道

    出处: 博客园:http://www.cnblogs.com/stateis0/
                个人博客: thinkinjava.cn

    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须在文章页面给出原文连接,否则保留追究法律责任的权利。
  • 相关阅读:
    [原]将Oracle 中的blob导出到文件中
    [原]unique index和non unique index的区别
    lsattr/chattr
    [原]说不清楚是Oracle的Bug还是TSM的Bug
    [摘]Oracle限制某个数据库帐号只能在特定机器上连入数据库
    [原]复制Oracle Home所需要注意的几个问题
    [原]Oracle Data Guard 折腾记(二)
    配置vsFTP的虚拟用户认证
    [原]给Oracle 11g Interval分区进行重命名
    [摘]如何抓住蝴蝶效应中的那只蝴蝶
  • 原文地址:https://www.cnblogs.com/stateis0/p/8975294.html
Copyright © 2011-2022 走看看