zoukankan      html  css  js  c++  java
  • 搞懂Dubbo服务发布与服务注册

    一.前言

      本文讲服务发布与服务注册,服务提供者本地发布服务,然后向注册中心注册服务,将服务实现类以服务接口的形式提供出去,以便服务消费者从注册中心查阅并调用服务。

      本文源码分析基于org.apache.dubbo:dubbo:2.7.2,服务端代码例子是上文的例子

      如果没有Dubbo SPI的基础知识,建议先看Dubbo SPI,否则源码怎么跳转的将毫无头绪

      Dubbo SPI:https://www.cnblogs.com/GrimMjx/p/10970643.html

    二.服务发布

    调用顺序

      首先讲一下大致的服务发布的调用顺序图,蓝色方法不分析,主要是起一个netty服务

    -org.apache.dubbo.config.spring.ServiceBean#onApplicationEvent
      -org.apache.dubbo.config.ServiceConfig#export
        -org.apache.dubbo.config.ServiceConfig#doExport
          -org.apache.dubbo.config.ServiceConfig#doExportUrls
            -org.apache.dubbo.config.ServiceConfig#doExportUrlsFor1Protocol
              -org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory#getInvoker
                -org.apache.dubbo.registry.integration.RegistryProtocol#export
                  -org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export
                    -org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#createServer
                      -org.apache.dubbo.remoting.exchange.Exchangers#bind(org.apache.dubbo.common.URL, org.apache.dubbo.remoting.exchange.ExchangeHandler)
                        -org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#bind
                          -org.apache.dubbo.remoting.Transporters#bind(org.apache.dubbo.common.URL, org.apache.dubbo.remoting.ChannelHandler...)
                            -org.apache.dubbo.remoting.transport.netty.NettyTransporter#bind

                              ...

    源码分析

    1.org.apache.dubbo.config.spring.ServiceBean#onApplicationEvent,这个方法就不多说了,注释写的很详细了

     1 /**
     2  * ServiceBean实现了ApplicationListener接口
     3  * 在IOC的容器的启动过程,当所有的bean都已经处理完成之后,spring ioc容器会发布ContextRefreshedEvent事件。
     4  * 此处就是接收到事件处理的逻辑,开始服务发布之旅
     5  *
     6  * @param event
     7  */
     8 @Override
     9 public void onApplicationEvent(ContextRefreshedEvent event) {
    10     // 是否已发布  &&  是否已经被取消发布
    11     if (!isExported() && !isUnexported()) {
    12         if (logger.isInfoEnabled()) {
    13             logger.info("The service ready on spring started. service: " + getInterface());
    14         }
    15         // 发布
    16         export();
    17     }
    18 }

    2.org.apache.dubbo.config.ServiceConfig#export,这里的checkAndUpdateSubConfigs主要做的就是检测标签合法,检测各种对象是否为空,为空则创建。之后判断是否可以发布和是否需要延迟发布,需要则延迟再doExport

     1 public synchronized void export() {
     2     // 检测<dubbo:service>的interface是否合法
     3     // 检查provider为空
     4     // 检查各种对象是否为空,为空则创建
     5     checkAndUpdateSubConfigs();
     6     if (!shouldExport()) {
     7         return;
     8     }
     9     // 是否延迟?
    10     if (shouldDelay()) {
    11         // 延迟
    12         DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
    13     } else {
    14         doExport();
    15     }
    16 }

    3.org.apache.dubbo.config.ServiceConfig#doExport,注释写的很清楚,还是没有走到核心逻辑

     1 protected synchronized void doExport() {
     2     // 是否已经被取消发布
     3     if (unexported) {
     4         throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
     5     }
     6     // 是否已经发布,注意这个变量volatile修饰
     7     if (exported) {
     8         return;
     9     }
    10     exported = true;
    11     if (StringUtils.isEmpty(path)) {
    12         path = interfaceName;
    13     }
    14     doExportUrls();
    15 }

    4.org.apache.dubbo.config.ServiceConfig#doExportUrls,先加载要注册的url,然后遍历所有协议,发布服务并注册

     1 @SuppressWarnings({"unchecked", "rawtypes"})
     2 private void doExportUrls() {
     3     // 加载要注册的url
     4     // registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-server&dubbo=2.5.3&pid=10516&registry=zookeeper&timestamp=1559889491339
     5     List<URL> registryURLs = loadRegistries(true);
     6     // for循环每个协议,发布服务并注册到注册中心
     7     for (ProtocolConfig protocolConfig : protocols) {
     8         String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
     9         ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
    10         ApplicationModel.initProviderModel(pathKey, providerModel);
    11         doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    12     }
    13 }

    5.org.apache.dubbo.config.ServiceConfig#doExportUrlsFor1Protocol,看方法名字也知道,单一协议多导出服务。这个方法做的事情比较多,我这边拆成4个部分,第一部分、第二部分和第三部分都是在做填充map的事情,第三部分最后生成导出url。第四部分开始发布服务,里面会判断到底是发布服务并且注册到注册中心呢还是仅发布服务。

      1 private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
      2     /** 第一部分开始 **/
      3     String name = protocolConfig.getName();
      4     // 为空或者为空字符串,默认dubbo协议
      5     if (StringUtils.isEmpty(name)) {
      6         name = DUBBO;
      7     }
      8 
      9     Map<String, String> map = new HashMap<String, String>();
     10     map.put(SIDE_KEY, PROVIDER_SIDE);
     11 
     12     appendRuntimeParameters(map);
     13     appendParameters(map, metrics);
     14     appendParameters(map, application);
     15     appendParameters(map, module);
     16     // remove 'default.' prefix for configs from ProviderConfig
     17     // appendParameters(map, provider, Constants.DEFAULT_KEY);
     18     appendParameters(map, provider);
     19     appendParameters(map, protocolConfig);
     20     appendParameters(map, this);
     21     // 上面的代码就是将版本,方法,各种配置放到map里去
     22     // 这里给出debug的时候的map对象
     23     // map.toString() = {side=provider, application=dubbo-server, dubbo=2.5.3, pid=10554, interface=com.grimmjx.edu.HelloService, timeout=100, anyhost=true, timestamp=1559890675368}
     24     /** 第一部分结束 **/
     25 
     26     /** 第二部分开始 **/
     27     // 这段if里做的事情主要是检测<dubbo:method> 标签中的配置信息,填充map
     28     if (CollectionUtils.isNotEmpty(methods)) {
     29         for (MethodConfig method : methods) {
     30             // 添加MethodConfig到map中,key=方法名.属性 value=属性值
     31             // ex   sayHello.retries:2
     32             appendParameters(map, method, method.getName());
     33             String retryKey = method.getName() + ".retry";
     34             if (map.containsKey(retryKey)) {
     35                 String retryValue = map.remove(retryKey);
     36                 if ("false".equals(retryValue)) {
     37                     map.put(method.getName() + ".retries", "0");
     38                 }
     39             }
     40             
     41             // 获取ArgumentConfig列表
     42             List<ArgumentConfig> arguments = method.getArguments();
     43             if (CollectionUtils.isNotEmpty(arguments)) {
     44                 for (ArgumentConfig argument : arguments) {
     45                     // convert argument type
     46                     if (argument.getType() != null && argument.getType().length() > 0) {
     47                         Method[] methods = interfaceClass.getMethods();
     48                         // visit all methods
     49                         if (methods != null && methods.length > 0) {
     50                             for (int i = 0; i < methods.length; i++) {
     51                                 String methodName = methods[i].getName();
     52                                 // target the method, and get its signature
     53                                 if (methodName.equals(method.getName())) {
     54                                     Class<?>[] argtypes = methods[i].getParameterTypes();
     55                                     // one callback in the method
     56                                     if (argument.getIndex() != -1) {
     57                                         if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
     58                                             // 添加ArgumentConfig信息到map中
     59                                             appendParameters(map, argument, method.getName() + "." + argument.getIndex());
     60                                         } else {
     61                                             throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
     62                                         }
     63                                     } else {
     64                                         // multiple callbacks in the method
     65                                         for (int j = 0; j < argtypes.length; j++) {
     66                                             Class<?> argclazz = argtypes[j];
     67                                             if (argclazz.getName().equals(argument.getType())) {
     68                                                 appendParameters(map, argument, method.getName() + "." + j);
     69                                                 if (argument.getIndex() != -1 && argument.getIndex() != j) {
     70                                                     throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
     71                                                 }
     72                                             }
     73                                         }
     74                                     }
     75                                 }
     76                             }
     77                         }
     78                     } else if (argument.getIndex() != -1) {
     79                         appendParameters(map, argument, method.getName() + "." + argument.getIndex());
     80                     } else {
     81                         throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
     82                     }
     83 
     84                 }
     85             }
     86         } // end of methods for
     87     }
     88     /** 第二部分结束 **/
     89 
     90     /** 第三部分开始 **/
     91     if (ProtocolUtils.isGeneric(generic)) {
     92         map.put(GENERIC_KEY, generic);
     93         map.put(METHODS_KEY, ANY_VALUE);
     94     } else {
     95         String revision = Version.getVersion(interfaceClass, version);
     96         if (revision != null && revision.length() > 0) {
     97             map.put(REVISION_KEY, revision);
     98         }
     99 
    100         // 生成包装类
    101         String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
    102         // 添加方法到map中
    103         if (methods.length == 0) {
    104             logger.warn("No method found in service interface " + interfaceClass.getName());
    105             map.put(METHODS_KEY, ANY_VALUE);
    106         } else {
    107             map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
    108         }
    109     }
    110 
    111     // 添加token
    112     if (!ConfigUtils.isEmpty(token)) {
    113         if (ConfigUtils.isDefault(token)) {
    114             map.put(TOKEN_KEY, UUID.randomUUID().toString());
    115         } else {
    116             map.put(TOKEN_KEY, token);
    117         }
    118     }
    119 
    120     // export service
    121     // 此处map.toString() = {side=provider, application=dubbo-server, methods=hello, dubbo=2.5.3, pid=10554, interface=com.grimmjx.edu.HelloService, timeout=100, anyhost=true, timestamp=1559890675368}
    122     String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
    123     Integer port = this.findConfigedPorts(protocolConfig, name, map);
    124     // 此处url=dubbo://192.168.5.16:20880/com.grimmjx.edu.HelloService?anyhost=true&application=dubbo-server&dubbo=2.5.3&interface=com.grimmjx.edu.HelloService&methods=hello&pid=11917&side=provider&timeout=100&timestamp=1559973693109
    125     URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
    126     if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) {
    127         url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
    128                 .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
    129     }
    130     /** 第三部分结束 **/
    131 
    132     /** 第四部分开始 **/
    133     // 开始发布服务
    134     String scope = url.getParameter(SCOPE_KEY);
    135     // don't export when none is configured
    136     // //配置为none不暴露
    137     if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
    138 
    139         // export to local if the config is not remote (export to remote only when config is remote)
    140         // 配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)
    141         if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
    142             exportLocal(url);
    143         }
    144         // export to remote if the config is not local (export to local only when config is local)
    145         // 如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露远程服务)
    146         if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
    147             if (!isOnlyInJvm() && logger.isInfoEnabled()) {
    148                 logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
    149             }
    150             if (CollectionUtils.isNotEmpty(registryURLs)) {
    151                 // 发布服务
    152                 for (URL registryURL : registryURLs) {
    153                     //if protocol is only injvm ,not register
    154                     if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
    155                         continue;
    156                     }
    157                     url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
    158                     URL monitorUrl = loadMonitor(registryURL);
    159                     if (monitorUrl != null) {
    160                         url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
    161                     }
    162                     if (logger.isInfoEnabled()) {
    163                         logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
    164                     }
    165 
    166                     // For providers, this is used to enable custom proxy to generate invoker
    167                     String proxy = url.getParameter(PROXY_KEY);
    168                     if (StringUtils.isNotEmpty(proxy)) {
    169                         registryURL = registryURL.addParameter(PROXY_KEY, proxy);
    170                     }
    171 
    172                     // 生成Invoker
    173                     // Invoker是十分重要的对象,可向它发起invoke调用
    174                     Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
    175                     // 持有this和invoker
    176                     // 此处的invoker.getUrl()=registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-server&dubbo=2.5.3&export=dubbo%3A%2F%2F192.168.5.16%3A20880%2Fcom.grimmjx.edu.HelloService%3Fanyhost%3Dtrue%26application%3Ddubbo-server%26dubbo%3D2.5.3%26interface%3Dcom.grimmjx.edu.HelloService%26methods%3Dhello%26pid%3D10738%26side%3Dprovider%26timeout%3D100%26timestamp%3D1559895851366&pid=10738&registry=zookeeper&timestamp=1559895851283
    177                     DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    178 
    179                     // 发布,并生成Exporter
    180                     Exporter<?> exporter = protocol.export(wrapperInvoker);
    181                     exporters.add(exporter);
    182                 }
    183 
    184                 // 没有注册中心,仅发布服务
    185             } else {
    186                 Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
    187                 DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    188 
    189                 Exporter<?> exporter = protocol.export(wrapperInvoker);
    190                 exporters.add(exporter);
    191             }
    192             /**
    193              * @since 2.7.0
    194              * ServiceData Store
    195              */
    196             MetadataReportService metadataReportService = null;
    197             if ((metadataReportService = getMetadataReportService()) != null) {
    198                 metadataReportService.publishProvider(url);
    199             }
    200         }
    201     }
    202     this.urls.add(url);
    203     /** 第四部分结束 **/
    204 }

    6.org.apache.dubbo.registry.integration.RegistryProtocol#export,为什么是RegistryProtocol?注释里写的很清楚了,这里的url是registry://开头的。

     1 public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
     2     // 获取注册地址
     3     URL registryUrl = getRegistryUrl(originInvoker);
     4     // url to export locally
     5     // 获取provider url
     6     URL providerUrl = getProviderUrl(originInvoker);
     7     // Subscribe the override data
     8     // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
     9     //  the same service. Because the subscribed is cached key with the name of the service, it causes the
    10     //  subscription information to cover.
    11     final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
    12     final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    13     overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    14     // 要注册的url
    15     providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
    16     //export invoker
    17     // 导出服务
    18     final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
    19     // url to registry
        .... 38 }

      之后上面调用顺序图的蓝色部分的代码不做分析,主要是创建一个NettyServer(默认)。可自行研究

    三.服务注册

    调用顺序

      首先讲一下大致的服务注册的调用顺序图,我们只分析红色部分。

    -org.apache.dubbo.registry.integration.RegistryProtocol#register

      -org.apache.dubbo.registry.support.AbstractRegistryFactory#getRegistry

        -org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory#createRegistry

          -org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#ZookeeperRegistry

      -org.apache.dubbo.registry.support.FailbackRegistry#register

        -org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doRegister

    源码分析

    1.org.apache.dubbo.registry.support.AbstractRegistryFactory#getRegistry,具体还要看下一步

     1 @Override
     2 public Registry getRegistry(URL url) {
     3     url = URLBuilder.from(url)
     4             .setPath(RegistryService.class.getName())
     5             .addParameter(INTERFACE_KEY, RegistryService.class.getName())
     6             .removeParameters(EXPORT_KEY, REFER_KEY)
     7             .build();
     8     String key = url.toServiceStringWithoutResolving();
     9     // Lock the registry access process to ensure a single instance of the registry
    10     LOCK.lock();
    11     try {
    12         // 缓存中获取
    13         Registry registry = REGISTRIES.get(key);
    14         if (registry != null) {
    15             return registry;
    16         }
    17         
    18         //create registry by spi/ioc
    19         // 用Dubbo SPI创建Registry
    20         registry = createRegistry(url);
    21         if (registry == null) {
    22             throw new IllegalStateException("Can not create registry " + url);
    23         }
    24         // 写入缓存
    25         REGISTRIES.put(key, registry);
    26         return registry;
    27     } finally {
    28         // Release the lock
    29         LOCK.unlock();
    30     }
    31 }

    2.org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory#createRegistry,创建一个ZooKeeperRegistry实例并返回

    1 @Override
    2 public Registry createRegistry(URL url) {
    3     return new ZookeeperRegistry(url, zookeeperTransporter);
    4 }

    3.org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#ZookeeperRegistry,主要做的就是利用zk创建Zk客户端

     1 public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
     2     super(url);
     3     if (url.isAnyHost()) {
     4         throw new IllegalStateException("registry address == null");
     5     }
     6     String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
     7     if (!group.startsWith(PATH_SEPARATOR)) {
     8         group = PATH_SEPARATOR + group;
     9     }
    10     this.root = group;
    11     zkClient = zookeeperTransporter.connect(url);
    12     zkClient.addStateListener(state -> {
    13         if (state == StateListener.RECONNECTED) {
    14             try {
    15                 recover();
    16             } catch (Exception e) {
    17                 logger.error(e.getMessage(), e);
    18             }
    19         }
    20     });
    21 }

    4.org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doRegister,连接好zk后,是不是就要创建服务提供者的节点了?所以这一步就是注册服务

    1 @Override
    2 public void doRegister(URL url) {
    3     try {
    4         // toUrlPath(url) = /dubbo/com.grimmjx.edu.HelloService/providers/dubbo%3A%2F%2F192.168.5.16%3A20880%2Fcom.grimmjx.edu.HelloService%3Fanyhost%3Dtrue%26application%3Ddubbo-server%26dubbo%3D2.5.3%26interface%3Dcom.grimmjx.edu.HelloService%26methods%3Dhello%26pid%3D11917%26side%3Dprovider%26timeout%3D100%26timestamp%3D1559973693109
    5         zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
    6     } catch (Throwable e) {
    7         throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    8     }
    9 }

    这一步之后,我们用zkCli来连接zk看一下节点数据

  • 相关阅读:
    4.24成果(冲刺1.7)
    4.23成果(冲刺1.6)
    4.22成果(冲刺1.5)
    4.21成果(冲刺1.4)
    4.20成果(冲刺1.3)
    4.19成果(冲刺1.2)
    4.18成果(冲刺1.1)
    计划会议
    需求评审
    电梯演讲
  • 原文地址:https://www.cnblogs.com/GrimMjx/p/10990970.html
Copyright © 2011-2022 走看看