zoukankan      html  css  js  c++  java
  • dubbo源码—service reference

    service reference

    在编写好服务之后,dubbo会将服务export出去,这个时候就可以编写consumer来调用这个服务了。dubbo作为一个rpc框架,使用者使用远程服务和使用本地服务是类似的,不用关心远程服务在哪里,怎么引用的,因为dubbo包含了自动发现和引用服务的功能。

    dubbo引用服务主要工作:

    • 创建proxy和Invoker(DubboInvoker里面会启动NettyClient)
    • 将consumer注册到注册中心
    • 订阅configurators、providers、routers

    通过Java代码引用

    ReferenceConfig<TestDubboService> reference = new ReferenceConfig();
    reference.setApplication(new ApplicationConfig(appName));
    reference.setRegistry(new RegistryConfig(dubboRegistry));
    reference.setInterface(TestDubboService.class);
    reference.setTimeout(timeout);
    TestDubboService service = (TestDubboService)reference.get();
    

    使用Java代码配置很明显,直接使用ReferenceConfig.get获取一个proxy

    通过spring配置引用

    <dubbo:reference interface="com.test.service.TestDubboService" id="testDubboService" />

    spring解析该dubbo自定义标签的时候(请提前学习spring如何解析自定义标签),会初始化ReferenceBean,该bean是一个factoryBean并且继承自ReferenceConfig,在getBean方法中调用了ReferenceConfig.get,接下来的方式就和上面“使用Java代码引用”一致了。所以dubbo引用服务的工作就主要在于如何创建proxy。

    ReferenceConfig初始化

    ReferenceConfig的主要作用是配置并引用远程服务,创建远程服务的本地代理。ReferenceBean继承自ReferenceConfig,ReferenceConfig是一个FactoryBean ,实现了getObject方法,在spring容器初始化完成的时候会初始化配置为非lazyInit的bean,也就会调用ReferenceBean.getObject方法,里面会调用ReferenceConfig.get方法,从而触发ServiceConfig的初始化方法ServiceConfig.init。

    inti方法的主要逻辑是:

    1. 判断是否已经初始化,get方法是同步方法,所以只需直接判断标志位initialized即可
    2. 判断配置的interface是否正确
    3. 判断-D参数配置或者配置文件是否配置是直连提供者
    4. 配置application、module、registries、monitor等
    5. 检查stub和mock配置(类似provider端的检查)
    6. 搜集需要配置到URL中的参数,先将参数收集到map中,URL参数在refer的过程中极其重要,dubbo中的所有配置几乎都是靠URL传递,从URL中获取或者设置到URL中
    7. 创建远程服务的本地代理,createProxy

    如何创建proxy

    由于是远程服务,consumer需要有一个代理来处理consumer发起的远程过程调用。dubbo通过远程调用的可执行体Invoker的代理来实现。接下来主要就是先创建Invoker,然后创建Invoker的proxy。

    创建Invoker调用堆栈如下

    createProxy的主要功能:

    1. 判断ReferenceConfig中是否配置了url,如果配置了url,则不从registry中获取,直接使用配置的url。
    2. 没有配置url,从registry配置拼装url
    3. 根据上面配置好的URL来refer对应的服务,创建远程服务的可执行体Invoker
    4. 将所有的invokers聚合成一个可执行实体MockClusterInvoker
    5. 给MockClusterInvoker创建一个代理类,这个代理类就是我们在consumer端使用的远程服务代理,该代理实现了对应的service接口,对应的InvocationHandler就是作为代理类构造方法入参的MockClusterInvoker,在后面一节分析中会说明consumer怎么发起调用

    下面主要说说Invoker的创建过程:

    1. 和provider一样,会调用ProtocolFilterWrapper#refer和ProtocolListenerWrapper#refer,分别构造filter链和调用对应的listener.referred
    2. RegistryProtocol#refer会判断是否配置了group
    3. 根据上面的调用堆栈会调用ZookeeperRegistry#doSubscribe,该方法中会订阅providers、configurators等,并通过notify方法来调用到AbstractRegistry#notify,里面会针对每一个category调用对应listener.notify,consumer端的listener是RegistryDirectory,所以这里会调用RegistryDirectory#notify
    4. RegistryDirectory#notify,这个方法也是registry对应节点变化后监听的listener,会对每一种监听的节点类型做处理,这里先只看provider的处理,调用refreshInvoker方法
    5. refreshInvoker方法就是将配置好的url转换为Invoker,如果转化后的invoker至少有一个,并且少于原来的invoker(缓存的invoker),则会把废弃的invoker销毁掉(destroy)

    这里具体说明一下consumer订阅,ZookeeperRegistry#doSubscribe中会将url中配置的category取出来拼接成registry的目录节点形式,然后订阅这些节点

    // RegistryProtocol类
    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
      // ... 省略中间代码
      // 这里调用的是RegistryDirectory.subscribe方法
      // 在这里将consumer端想要订阅的category添加到url,包括providers,configurators,routers
      directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, 
                                                    Constants.PROVIDERS_CATEGORY 
                                                    + "," + Constants.CONFIGURATORS_CATEGORY 
                                                    + "," + Constants.ROUTERS_CATEGORY));
      return cluster.join(directory);
    }
    
    // ZookeeperRegistry在doSubscribe调用自己的下面这个方法,将URL中的category转化为registry中的目录对应的url
    private String[] toCategoriesPath(URL url) {
      String[] categroies;
      if (Constants.ANY_VALUE.equals(url.getParameter(Constants.CATEGORY_KEY))) {
        // 如果配置的category是*,则取所有的category:providers,consumer,routers,configurators
        categroies = new String[] {Constants.PROVIDERS_CATEGORY, Constants.CONSUMERS_CATEGORY, 
                                   Constants.ROUTERS_CATEGORY, Constants.CONFIGURATORS_CATEGORY};
      } else {
        categroies = url.getParameter(Constants.CATEGORY_KEY, new String[] {Constants.DEFAULT_CATEGORY});
      }
      String[] paths = new String[categroies.length];
      for (int i = 0; i < categroies.length; i ++) {
        // 将category拼接成registry中的目录形式,类似:/dubbo/com.test.service.TestDubboService/providers
        paths[i] = toServicePath(url) + Constants.PATH_SEPARATOR + categroies[i];
      }
      return paths;
    }
    
    
    protected void doSubscribe(final URL url, final NotifyListener listener) {
      try {
        if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
    		// ... 省略中间代码
        } else {
          List<URL> urls = new ArrayList<URL>();
          for (String path : toCategoriesPath(url)) {
            ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
            if (listeners == null) {
              // 如果之前该路径没有添加过listener,则创建一个map来放置listener
              zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
              listeners = zkListeners.get(url);
            }
            ChildListener zkListener = listeners.get(listener);
            if (zkListener == null) {
              // 如果没有添加过对于子节点的listener,则创建
              listeners.putIfAbsent(listener, new ChildListener() {
                public void childChanged(String parentPath, List<String> currentChilds) {
                  ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                }
              });
              zkListener = listeners.get(listener);
            }
            zkClient.create(path, false);
            // 添加listener到该目录及其子节点
            List<String> children = zkClient.addChildListener(path, zkListener);
            if (children != null) {
              urls.addAll(toUrlsWithEmpty(url, path, children));
            }
          }
          // 这个方法本身会导致监听的目录及其子节点变化,直接调用notify
          notify(url, listener, urls);
        }
      } catch (Throwable e) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
      }
    }
    

    上面是consumer的订阅部分的源码,在consumer订阅的时候会调用FailbackRegistry#notify,接下来就是将url转换为Invoker,接下来的调用链路可以参考上面方法调用堆栈的图,转化的主要代码为:

    com.alibaba.dubbo.registry.integration.RegistryDirectory#refreshInvoker
    com.alibaba.dubbo.registry.integration.RegistryDirectory#toInvokers
    

    这两个方法中的源代码注释较为详细了就不再赘述了。

    在toInvokers方法中会调用DubboProtocol#refer,在该方法中启动NettyClient。

    总结

    provider提供服务后,consumer端就可以找到并引用该服务,接下来就可以像使用本地服务一样使用该服务了,发起远程该过程调用。

  • 相关阅读:
    关于深拷贝和浅拷贝的理解
    Oracle数据库入门
    单例模式入门
    oracle存储过程 (还没动手实践、剩余内容找时间在处理、游标还没接触)
    Oracle用户名及默认密码
    oracle数据库怎么创建数据库实例
    Java 强、弱、软、虚,你属于哪一种?
    内存溢出和内存泄漏的区别
    aop中execution 表达式
    JPA 中@Enumerated
  • 原文地址:https://www.cnblogs.com/sunshine-2015/p/8253974.html
Copyright © 2011-2022 走看看