zoukankan      html  css  js  c++  java
  • 【Dubbo 源码解析】05_Dubbo 服务发现&引用

    Dubbo 服务发现&引用

    Dubbo 引用的服务消费者最终会构造成一个 Spring 的 Bean,具体是通过 ReferenceBean 来实现的。它是一个 FactoryBean,所有的服务消费者 Bean 都通过它来生产。

    ReferenceBean#getObject() --> ReferenceConfig#get()

    ReferenceConfig 最终会创建一个动态代理类返回:

    private T createProxy(Map<String, String> map) {
        ......
        // assemble URL from register center's configuration
        // 从注册中心的配置组装 URL(服务发现)
        List<URL> us = loadRegistries(false);
        if (us != null && !us.isEmpty()) {
            for (URL u : us) {
                URL monitorUrl = loadMonitor(u);
                if (monitorUrl != null) {
                    map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                }
                urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
            }
        }
        ......
        if (urls.size() == 1) {
            // 创建 Invoker
            invoker = refprotocol.refer(interfaceClass, urls.get(0));
        } else {
            List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
            URL registryURL = null;
            for (URL url : urls) {
                invokers.add(refprotocol.refer(interfaceClass, url));
                if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                    registryURL = url; // use last registry url
                }
            }
            if (registryURL != null) { // registry url is available
                // use AvailableCluster only when register's cluster is available
                URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                // 当服务提供者有多个时,就创建一个 ClusterInvoker
                invoker = cluster.join(new StaticDirectory(u, invokers));
            } else { // not a registry url
                invoker = cluster.join(new StaticDirectory(invokers));
            }
        }
        ......
        // create service proxy
        return (T) proxyFactory.getProxy(invoker);
    }

     

    服务发现

    dubbo 的服务发现,是通过从注册中心订阅服务提供者组装成 URL,然后通过 URL 创建出 Invoker 来实现的。

    服务引用

    Dubbo 的服务引用,实际上是为引用的接口创建一个 Proxy,这个 Proxy 的功能就是去执行 refprotocol.refer(interfaceClass, url) 创建出来的 Invoker。 当服务提供者有多个时,就创建一个 ClusterInvoker。Cluster 是一个 SPI 扩展点,默认使用的是 failover=com.alibaba.dubbo.rpc.cluster.support.FailoverCluster。

    所以,Consumer 端服务调用的逻辑被封装在 refprotocol.refer(interfaceClass, url) 创建出来的 Invoker 上。通过之前 Dubbo Protocol & Filter 的学习,我们可以知道 refprotocol 是一个 Wrapped Protocol,refer() 方法创建出来的 Invoker 是被 Filter 包裹的一个 DubboInvoker。

    综上,Consumer 端服务调用的逻辑是:

    1. 执行 FailoverClusterInvoker#invoke(Invocation invocation) (负载均衡。当有多个 provider 时走这一步,没有的话,就跳过)

    2. 执行 Filter#invoke(Invoker<?> invoker, Invocation invocation) (所有 group="provider" 的 Filter)

    3. 执行 DubboInvoker#invoke(Invocation inv)

     

    服务消费端创建 tcp 连接

    refprotocol.refer(interfaceClass, url) 被调用时会去创建与 provider 的 tcp 连接。

    DubboProtocol#refer(Class<T> serviceType, URL url) 源码如下:

    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);
        // create rpc invoker. 同时,创建一条与 provider 的 tcp 连接
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }

    Dubbo 服务引用时,首先创建一条与 provider 的 tcp 连接 ExchangeClient,然后再创建一个 DubboInvoker。

    getClients(url) 最终会调用 initClient(URL url) 来创建一条新连接:

    private ExchangeClient initClient(URL url) {
        ......
        // 设置 codec = "dubbo"
        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        // enable heartbeat by default
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        ......
        ExchangeClient client;
        try {
            // connection should be lazy
            if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
                client = new LazyConnectExchangeClient(url, requestHandler);
            } else {
                // 创建一条新连接
                client = Exchangers.connect(url, requestHandler);
            }
        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
        }
        return client;
    }

    新连接创建过程:

    Exchangers.connect(url, requestHandler) --> ExchangeClient#connect(URL url, ExchangeHandler handler) --> Exchanger$Adaptive#connect(URL url, ExchangeHandler handler) --> HeaderExchanger#connect(URL url, ExchangeHandler handler) --> 返回 new HeaderExchangeClient(client, true)

    // HeaderExchanger.class
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        // 创建新连接;启动 dubbo 心跳
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    Transporters#connect(URL url, ChannelHandler... handlers) --> Transporter$Adaptive#connect(URL url, ChannelHandler handler) --> NettyTransporter#connect(URL url, ChannelHandler listener) --> 返回 new NettyClient(url, listener)

    编解码最终使用的是:com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec

     

    附:

    Dubbo 心跳(HeartBeatTask)的作用是:当检测到心跳超时的时候,自动重连

     

     

    官方如是说:

    引用服务

    1. 直连引用服务:

    在没有注册中心,直连提供者的情况下 [3]ReferenceConfig 解析出的 URL 的格式为:dubbo://service-host/com.foo.FooService?version=1.0.0

    基于扩展点自适应机制,通过 URL 的 dubbo:// 协议头识别,直接调用 DubboProtocolrefer() 方法,返回提供者引用。

    2. 从注册中心发现引用服务:

    在有注册中心,通过注册中心发现提供者地址的情况下 [4]ReferenceConfig 解析出的 URL 的格式为:registry://registry-host/org.apache.dubbo.registry.RegistryService?refer=URL.encode("consumer://consumer-host/com.foo.FooService?version=1.0.0")

    基于扩展点自适应机制,通过 URL 的 registry:// 协议头识别,就会调用 RegistryProtocolrefer()方法,基于 refer 参数中的条件,查询提供者 URL,如: dubbo://service-host/com.foo.FooService?version=1.0.0

    基于扩展点自适应机制,通过提供者 URL 的 dubbo:// 协议头识别,就会调用 DubboProtocolrefer()方法,得到提供者引用。

    然后 RegistryProtocol 将多个提供者引用,通过 Cluster 扩展点,伪装成单个提供者引用返回。

    服务消费者消费一个服务的详细过程


    上图是服务消费的主过程:

    首先 ReferenceConfig 类的 init 方法调用 Protocolrefer 方法生成 Invoker 实例(如上图中的红色部分),这是服务消费的关键。接下来把 Invoker 转换为客户端需要的接口(如:HelloWorld)。

    关于每种协议如 RMI/Dubbo/Web service 等它们在调用 refer 方法生成 Invoker 实例的细节和上一章节所描述的类似。

      

    如果想了解更多Dubbo源码的知识,请移步 Dubbo源码解读——通向高手之路 的视频讲解:
    http://edu.51cto.com/sd/2e565
  • 相关阅读:
    MySQL隐式类型转换导致索引失效
    解决MySQL报错[Err] 1093
    二:C#对象、集合、DataTable与Json内容互转示例;
    一:Newtonsoft.Json 支持序列化与反序列化的.net 对象类型;
    Newtonsoft.Json 概述
    为什么Elasticsearch查询变得这么慢了?
    Elasticsearch 5.x 字段折叠的使用
    Elasticsearch 删除数据
    Linux环境下安装 ElasticHD
    ElasticHD Windows环境下安装
  • 原文地址:https://www.cnblogs.com/kevin-yuan/p/10346558.html
Copyright © 2011-2022 走看看