zoukankan      html  css  js  c++  java
  • 【Dubbo 源码解析】05_Dubbo 服务发现&引用

    Dubbo 服务发现&引用

    Dubbo 引用的服务消费者最终会构造成一个 Spring 的 Bean,具体是通过 ReferenceBean 来实现的。它是一个 FactoryBean,所有的服务消费者 Bean 都通过它来生产。

    ReferenceBean#getObject() --> ReferenceConfig#get()

    ReferenceConfig 最终会创建一个动态代理类返回:

    private T createProxy(Map<String, String> map) {
        ......
        // assemble URL from register center's configuration
        // 从注册中心的配置组装 URL(服务发现)
        List<URL> us = loadRegistries(false);
        if (us != null && !us.isEmpty()) {
            for (URL u : us) {
                URL monitorUrl = loadMonitor(u);
                if (monitorUrl != null) {
                    map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                }
                urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
            }
        }
        ......
        if (urls.size() == 1) {
            // 创建 Invoker
            invoker = refprotocol.refer(interfaceClass, urls.get(0));
        } else {
            List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
            URL registryURL = null;
            for (URL url : urls) {
                invokers.add(refprotocol.refer(interfaceClass, url));
                if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                    registryURL = url; // use last registry url
                }
            }
            if (registryURL != null) { // registry url is available
                // use AvailableCluster only when register's cluster is available
                URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                // 当服务提供者有多个时,就创建一个 ClusterInvoker
                invoker = cluster.join(new StaticDirectory(u, invokers));
            } else { // not a registry url
                invoker = cluster.join(new StaticDirectory(invokers));
            }
        }
        ......
        // create service proxy
        return (T) proxyFactory.getProxy(invoker);
    }

     

    服务发现

    dubbo 的服务发现,是通过从注册中心订阅服务提供者组装成 URL,然后通过 URL 创建出 Invoker 来实现的。

    服务引用

    Dubbo 的服务引用,实际上是为引用的接口创建一个 Proxy,这个 Proxy 的功能就是去执行 refprotocol.refer(interfaceClass, url) 创建出来的 Invoker。 当服务提供者有多个时,就创建一个 ClusterInvoker。Cluster 是一个 SPI 扩展点,默认使用的是 failover=com.alibaba.dubbo.rpc.cluster.support.FailoverCluster。

    所以,Consumer 端服务调用的逻辑被封装在 refprotocol.refer(interfaceClass, url) 创建出来的 Invoker 上。通过之前 Dubbo Protocol & Filter 的学习,我们可以知道 refprotocol 是一个 Wrapped Protocol,refer() 方法创建出来的 Invoker 是被 Filter 包裹的一个 DubboInvoker。

    综上,Consumer 端服务调用的逻辑是:

    1. 执行 FailoverClusterInvoker#invoke(Invocation invocation) (负载均衡。当有多个 provider 时走这一步,没有的话,就跳过)

    2. 执行 Filter#invoke(Invoker<?> invoker, Invocation invocation) (所有 group="provider" 的 Filter)

    3. 执行 DubboInvoker#invoke(Invocation inv)

     

    服务消费端创建 tcp 连接

    refprotocol.refer(interfaceClass, url) 被调用时会去创建与 provider 的 tcp 连接。

    DubboProtocol#refer(Class<T> serviceType, URL url) 源码如下:

    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);
        // create rpc invoker. 同时,创建一条与 provider 的 tcp 连接
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }

    Dubbo 服务引用时,首先创建一条与 provider 的 tcp 连接 ExchangeClient,然后再创建一个 DubboInvoker。

    getClients(url) 最终会调用 initClient(URL url) 来创建一条新连接:

    private ExchangeClient initClient(URL url) {
        ......
        // 设置 codec = "dubbo"
        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        // enable heartbeat by default
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        ......
        ExchangeClient client;
        try {
            // connection should be lazy
            if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
                client = new LazyConnectExchangeClient(url, requestHandler);
            } else {
                // 创建一条新连接
                client = Exchangers.connect(url, requestHandler);
            }
        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
        }
        return client;
    }

    新连接创建过程:

    Exchangers.connect(url, requestHandler) --> ExchangeClient#connect(URL url, ExchangeHandler handler) --> Exchanger$Adaptive#connect(URL url, ExchangeHandler handler) --> HeaderExchanger#connect(URL url, ExchangeHandler handler) --> 返回 new HeaderExchangeClient(client, true)

    // HeaderExchanger.class
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        // 创建新连接;启动 dubbo 心跳
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    Transporters#connect(URL url, ChannelHandler... handlers) --> Transporter$Adaptive#connect(URL url, ChannelHandler handler) --> NettyTransporter#connect(URL url, ChannelHandler listener) --> 返回 new NettyClient(url, listener)

    编解码最终使用的是:com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec

     

    附:

    Dubbo 心跳(HeartBeatTask)的作用是:当检测到心跳超时的时候,自动重连

     

     

    官方如是说:

    引用服务

    1. 直连引用服务:

    在没有注册中心,直连提供者的情况下 [3]ReferenceConfig 解析出的 URL 的格式为:dubbo://service-host/com.foo.FooService?version=1.0.0

    基于扩展点自适应机制,通过 URL 的 dubbo:// 协议头识别,直接调用 DubboProtocolrefer() 方法,返回提供者引用。

    2. 从注册中心发现引用服务:

    在有注册中心,通过注册中心发现提供者地址的情况下 [4]ReferenceConfig 解析出的 URL 的格式为:registry://registry-host/org.apache.dubbo.registry.RegistryService?refer=URL.encode("consumer://consumer-host/com.foo.FooService?version=1.0.0")

    基于扩展点自适应机制,通过 URL 的 registry:// 协议头识别,就会调用 RegistryProtocolrefer()方法,基于 refer 参数中的条件,查询提供者 URL,如: dubbo://service-host/com.foo.FooService?version=1.0.0

    基于扩展点自适应机制,通过提供者 URL 的 dubbo:// 协议头识别,就会调用 DubboProtocolrefer()方法,得到提供者引用。

    然后 RegistryProtocol 将多个提供者引用,通过 Cluster 扩展点,伪装成单个提供者引用返回。

    服务消费者消费一个服务的详细过程


    上图是服务消费的主过程:

    首先 ReferenceConfig 类的 init 方法调用 Protocolrefer 方法生成 Invoker 实例(如上图中的红色部分),这是服务消费的关键。接下来把 Invoker 转换为客户端需要的接口(如:HelloWorld)。

    关于每种协议如 RMI/Dubbo/Web service 等它们在调用 refer 方法生成 Invoker 实例的细节和上一章节所描述的类似。

      

    如果想了解更多Dubbo源码的知识,请移步 Dubbo源码解读——通向高手之路 的视频讲解:
    http://edu.51cto.com/sd/2e565
  • 相关阅读:
    字符串 CSV解析 表格 逗号分隔值 通讯录 电话簿 MD
    Context Application 使用总结 MD
    RxJava RxPermissions 动态权限 简介 原理 案例 MD
    Luban 鲁班 图片压缩 MD
    FileProvider N 7.0 升级 安装APK 选择文件 拍照 临时权限 MD
    组件化 得到 DDComponent JIMU 模块 插件 MD
    gradlew 命令行 build 调试 构建错误 Manifest merger failed MD
    protobuf Protocol Buffers 简介 案例 MD
    ORM数据库框架 SQLite 常用数据库框架比较 MD
    [工具配置]requirejs 多页面,多入口js文件打包总结
  • 原文地址:https://www.cnblogs.com/kevin-yuan/p/10346558.html
Copyright © 2011-2022 走看看