zoukankan      html  css  js  c++  java
  • dubbo源码分析三:consumer注册及生成代理对象

    本章我们将分析一下consumer向注册中心注册,并获取服务端相应的信息,根据这些信息生产代理对象的过程和源码。

    1.类图 

    上图展示了部分消费者注册及生成代理对象过程中需要使用到的类和接口,其中:

        spring适配涉及到的类:DubboNamespaceHandler、DubboBeanDefinitionParser、ReferenceBean;

        配置信息存储:ReferenceConfig、RegistryConfig、MonitorConfig、ProtocolConfig、ConsumerConfig等;

        应用协议:Protocol、DubboProtocol、HessianProtocol、ThriftProtocol、RmiProtocol、AbstractProxyProtocol、AbstractProtocol等;

        注册相关:RegistryProtocol、RegistryFactory、Registry、ZookeeperRegistryFactory、ZookeeperRegistry等

        代理和集群相关:Proxy、JdcProxyFactory、AbstractProxyFactory、InvokerInvocationHandler、Cluster、FailoverCluster、FailoverClusterInvoker、AbstractClusterInvoker、Invoker等;

    2.时序图 

        我们通过时序图来分析一下在consumer注册及生产代理对象的过程中,上面的类是如何串联在一起的:

        a.spring容器通过DubboBeanDefinitionParser类的对象来解析xml文件中的标签,生成ReferenceConfig等配置对象;

        b.ReferenceConfig的init()等方法被调用;

        c.通过spi机制确定Protocol接口的实现对象为RegistryProtocol的对象,调用它的refer()方法;

        d.通过spi机制确定RegistryFactory接口的实现对象为ZookeeperRegistryFactory,调用它的getRegistry()方法,生产ZookeeperRegistry对象;

        e.调用RegistryProtocol对象的doRefer()方法后,并调用FailoverCluster的join()方法,生成FailoverClusterInvoker的对象;

        f.调用JdkProxyFactory的getProxy()方法,生成consumer使用接口的代理对象。

    3.核心代码  

      1 private void init() {
      2     if (initialized) {//判断是否初始化完毕
      3         return;
      4     }
      5     initialized = true;
      6     if (interfaceName == null || interfaceName.length() == 0) {
      7         throw new IllegalStateException("<dubbo:reference interface="" /> interface not allow null!");
      8     }
      9     // 获取消费者全局配置
     10     checkDefault();//检查
     11     appendProperties(this);
     12     if (getGeneric() == null && getConsumer() != null) {
     13         setGeneric(getConsumer().getGeneric());
     14     }
     15     if (ProtocolUtils.isGeneric(getGeneric())) {
     16         interfaceClass = GenericService.class;
     17     } else {
     18         try {
     19             interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
     20                     .getContextClassLoader());
     21         } catch (ClassNotFoundException e) {
     22             throw new IllegalStateException(e.getMessage(), e);
     23         }
     24         checkInterfaceAndMethods(interfaceClass, methods);
     25     }
     26     String resolve = System.getProperty(interfaceName);
     27     String resolveFile = null;
     28     if (resolve == null || resolve.length() == 0) {
     29         resolveFile = System.getProperty("dubbo.resolve.file");
     30         if (resolveFile == null || resolveFile.length() == 0) {
     31             File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");
     32             if (userResolveFile.exists()) {
     33                 resolveFile = userResolveFile.getAbsolutePath();
     34             }
     35         }
     36         if (resolveFile != null && resolveFile.length() > 0) {
     37             Properties properties = new Properties();
     38             FileInputStream fis = null;
     39             try {
     40                 fis = new FileInputStream(new File(resolveFile));
     41                 properties.load(fis);
     42             } catch (IOException e) {
     43                 throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e);
     44             } finally {
     45                 try {
     46                     if(null != fis) fis.close();
     47                 } catch (IOException e) {
     48                     logger.warn(e.getMessage(), e);
     49                 }
     50             }
     51             resolve = properties.getProperty(interfaceName);
     52         }
     53     }
     54     if (resolve != null && resolve.length() > 0) {
     55         url = resolve;
     56         if (logger.isWarnEnabled()) {
     57             if (resolveFile != null && resolveFile.length() > 0) {
     58                 logger.warn("Using default dubbo resolve file " + resolveFile + " replace " + interfaceName + "" + resolve + " to p2p invoke remote service.");
     59             } else {
     60                 logger.warn("Using -D" + interfaceName + "=" + resolve + " to p2p invoke remote service.");
     61             }
     62         }
     63     }
     64     if (consumer != null) {
     65         if (application == null) {
     66             application = consumer.getApplication();
     67         }
     68         if (module == null) {
     69             module = consumer.getModule();
     70         }
     71         if (registries == null) {
     72             registries = consumer.getRegistries();//获取注册信息
     73         }
     74         if (monitor == null) {
     75             monitor = consumer.getMonitor();//获取监控信息
     76         }
     77     }
     78     if (module != null) {
     79         if (registries == null) {
     80             registries = module.getRegistries();
     81         }
     82         if (monitor == null) {
     83             monitor = module.getMonitor();
     84         }
     85     }
     86     if (application != null) {
     87         if (registries == null) {
     88             registries = application.getRegistries();
     89         }
     90         if (monitor == null) {
     91             monitor = application.getMonitor();
     92         }
     93     }
     94     checkApplication();
     95     checkStubAndMock(interfaceClass);
     96     Map<String, String> map = new HashMap<String, String>();
     97     Map<Object, Object> attributes = new HashMap<Object, Object>();
     98     map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
     99     map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
    100     map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
    101     if (ConfigUtils.getPid() > 0) {
    102         map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
    103     }
    104     if (! isGeneric()) {
    105         String revision = Version.getVersion(interfaceClass, version);
    106         if (revision != null && revision.length() > 0) {
    107             map.put("revision", revision);
    108         }
    109  
    110         String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
    111         if(methods.length == 0) {
    112             logger.warn("NO method found in service interface " + interfaceClass.getName());
    113             map.put("methods", Constants.ANY_VALUE);
    114         }
    115         else {
    116             map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
    117         }
    118     }
    119     map.put(Constants.INTERFACE_KEY, interfaceName);
    120     appendParameters(map, application);
    121     appendParameters(map, module);
    122     appendParameters(map, consumer, Constants.DEFAULT_KEY);
    123     appendParameters(map, this);
    124     String prifix = StringUtils.getServiceKey(map);
    125     if (methods != null && methods.size() > 0) {
    126         for (MethodConfig method : methods) {
    127             appendParameters(map, method, method.getName());
    128             String retryKey = method.getName() + ".retry";
    129             if (map.containsKey(retryKey)) {
    130                 String retryValue = map.remove(retryKey);
    131                 if ("false".equals(retryValue)) {
    132                     map.put(method.getName() + ".retries", "0");
    133                 }
    134             }
    135             appendAttributes(attributes, method, prifix + "." + method.getName());
    136             checkAndConvertImplicitConfig(method, map, attributes);
    137         }
    138     }
    139     //attributes通过系统context进行存储.
    140     StaticContext.getSystemContext().putAll(attributes);
    141     ref = createProxy(map);//获取代理对象
    142 }
    143  
    144 private T createProxy(Map<String, String> map) {
    145     URL tmpUrl = new URL("temp", "localhost", 0, map);
    146     final boolean isJvmRefer;
    147     if (isInjvm() == null) {
    148         if (url != null && url.length() > 0) { //指定URL的情况下,不做本地引用
    149             isJvmRefer = false;
    150         } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
    151             //默认情况下如果本地有服务暴露,则引用本地服务.
    152             isJvmRefer = true;
    153         } else {
    154             isJvmRefer = false;
    155         }
    156     } else {
    157         isJvmRefer = isInjvm().booleanValue();
    158     }
    159      
    160     if (isJvmRefer) {
    161         URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
    162         invoker = refprotocol.refer(interfaceClass, url);//获取invoker对象
    163         if (logger.isInfoEnabled()) {
    164             logger.info("Using injvm service " + interfaceClass.getName());
    165         }
    166     } else {
    167         if (url != null && url.length() > 0) { // 用户指定URL,指定的URL可能是对点对直连地址,也可能是注册中心URL
    168             String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
    169             if (us != null && us.length > 0) {
    170                 for (String u : us) {
    171                     URL url = URL.valueOf(u);
    172                     if (url.getPath() == null || url.getPath().length() == 0) {
    173                         url = url.setPath(interfaceName);
    174                     }
    175                     if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
    176                         urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
    177                     } else {
    178                         urls.add(ClusterUtils.mergeUrl(url, map));
    179                     }
    180                 }
    181             }
    182         } else { // 通过注册中心配置拼装URL
    183             List<URL> us = loadRegistries(false);
    184             if (us != null && us.size() > 0) {
    185                 for (URL u : us) {
    186                     URL monitorUrl = loadMonitor(u);
    187                     if (monitorUrl != null) {
    188                         map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
    189                     }
    190                     urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
    191                 }
    192             }
    193             if (urls == null || urls.size() == 0) {
    194                 throw new IllegalStateException("No such any registry to reference " + interfaceName  + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", 
    195                 please config <dubbo:registry address="..." /> to your spring config.");
    196             }
    197         }
    198         for(URL invo : urls){
    199             System.out.println("invoker's url : "+invo.toFullString());
    200         }
    201         if (urls.size() == 1) {
    202             invoker = refprotocol.refer(interfaceClass, urls.get(0));//获取invoker对象
    203         } else {
    204             List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
    205             URL registryURL = null;
    206             for (URL url : urls) {
    207                 invokers.add(refprotocol.refer(interfaceClass, url));
    208                 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
    209                     registryURL = url; // 用了最后一个registry url
    210                 }
    211             }
    212             if (registryURL != null) { // 有 注册中心协议的URL
    213                 // 对有注册中心的Cluster 只用 AvailableCluster
    214                 URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
    215                 invoker = cluster.join(new StaticDirectory(u, invokers));
    216             }  else { // 不是 注册中心的URL
    217                 invoker = cluster.join(new StaticDirectory(invokers));
    218             }
    219              
    220         }
    221     }
    222     System.out.println("real invoker's url :"+invoker.getUrl().toFullString());
    223     Boolean c = check;
    224     if (c == null && consumer != null) {
    225         c = consumer.isCheck();
    226     }
    227     if (c == null) {
    228         c = true; // default true
    229     }
    230     if (c && ! invoker.isAvailable()) {
    231         throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " +
    232                                         (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() 
    233                                          + 
    234                                          " use dubbo version " + Version.getVersion());
    235     }
    236     if (logger.isInfoEnabled()) {
    237         logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
    238     }
    239     // 创建服务代理
    240     return (T) proxyFactory.getProxy(invoker);
    241 }
    ReferenceConfig

    创建代理的过程:

    1.      获取消费者配置

    2.      获取配置的注册中心,通过配置中心配置拼装URL,线上应该是个配置中心集群

    3.      遍历注册中心List<URL>集合

    加载监控中心URL,如果配置了监控中心在注册中心url加上MONITOR_KEY

    根据配置的引用服务参数给注册中URL上加上REFER_KEY

    4.      遍历注册中心List<URL>集合,这里注册中心url包含了monitorUrl和referUrl

    protocol.refer(interface,url)调用protocol引用服务返回invoker可执行对象(这个invoker并不是简单的DubboInvoker, 而是由RegistryProtocol构建基于目录服务的集群策略Invoker, 这个invoker可以通过目录服务list出真正可调用的远程服务invoker)

    对于注册中心Url设置集群策略为AvailableCluster, 由AvailableCluster将所有对象注册中调用的invoker伪装成一个invoker

    5.      通过代理工厂创建远程服务代理返回给使用着proxyFactory.getProxy(invoker);

    procotol.refer(interface, url) 引用服务的过程

    1.      经过ProtocolListenerWrapper, ProtocolFilterWrapper由于是注册中心url调用RegistryProtocol.refer

    2.      获取注册中心协议zookeeper, Redis, 还是dubbo, 并根据注册中心协议通过注册器工厂RegistryFactory.getRegistry(url)获取注册器Registry用来跟注册中心交互

    3.      根据配置的group分组

    4.      创建注册服务目录RegistryDirectory并设置注册器

    5.      构建订阅服务的subscribeUrl

    6.      通过注册器registry向注册中心注册subscribeUrl消费端url

    7.      目录服务registryDirectory.subscribe(subscribeUrl)订阅服务(这里我们以开源版本zookeeper为注册中心为例来讲解, dubbo协议的注册中心有点不一样)

    其实内部也是通过注册器registry.subscribe(url,this) 这里this就是registryDirectory它实现了NotifyListener。

    服务提供者在向zookeeper注册服务/dubbo/com.alibaba.dubbo.demo.DemoService/providers/节点下写下自己的URL地址

    服务消费者向zookeepr注册服务/dubbo/com.alibaba.dubbo.demo.DemoService/consumers/节点下写下自己的URL地址

    服务消费者向zookeeper 订阅服务/dubbo/com.alibaba.dubbo.demo.DemoService/ providers /节点下所有服务提供者URL地址

    Zookeeper通过watcher机制实现对节点的监听,节点数据变化通过节点上的watcher回调客户端, 重新生成对服务的refer

    在订阅的过程中通过获取/dubbo/com.alibaba.dubbo.demo.DemoService/providers /下的所有服务提供者的urls(类似dubbo://10.33. 37.8:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&owner=william&pid=7356&side=consumer&timestamp=1416971340626),主动回调NotifyListener来根据urls生成对服务提供者的引用生成可执行invokers,供目录服务持有着,

    看下如下RegistryDirectory.notify(urls)方法中的代码实现

    8.      通过cluster.join(directory) 合并invoker并提供集群调用策略

    DubboProtocol.refer过程:

    1.      经过ProtocolListenerWrapper, ProtocolFilterWrapper构建监听器链和过滤器链。

    2.      DubboProtocol根据url获取ExchangeClient对象,如果是share存在就返回不存在创建新对象不是share直接创建。ExchangeClient是底层通信的客户端,对于通信层的创建功能不在这里讲解。

    3.      创建DubboInvoker, 这个invoker对象包含对远程服务提供者的长链接,是真正执行远程服务调用的可执行对象

    4.      将创建的invoker返回给目录服务

    官方文档的应用服务的序列图

    引用服务活动图:

     
     
  • 相关阅读:
    《C++ 并发编程》- 第1章 你好,C++的并发世界
    30分钟,让你成为一个更好的程序员
    程序员技术练级攻略
    谈新技术学习方法-如何学习一门新技术新编程语言
    计算机科学中最重要的32个算法
    程序员学习能力提升三要素
    一位在MIT教数学的老师总结了十条经验
    学习算法之路
    十个顶级的C语言资源助你成为优秀的程序员
    Linux中LoadAverage分析
  • 原文地址:https://www.cnblogs.com/wxd0108/p/6638594.html
Copyright © 2011-2022 走看看