zoukankan      html  css  js  c++  java
  • 7.6 服务远程暴露

    为了安全:服务启动的ip全部使用10.10.10.10

    远程服务的暴露总体步骤:

    • 将ref封装为invoker
    • 将invoker转换为exporter
    • 启动netty
    • 注册服务到zookeeper
    • 订阅
    • 返回新的exporter实例

    7.4 服务远程暴露 - 创建Exporter与启动netty服务端中,实现了前三步,本节实现第四步:注册服务到zk。总体代码如下:RegistryProtocol.export(final Invoker<T> originInvoker)

    1         final Registry registry = getRegistry(originInvoker);//创建ZookeeperRegistry实例:创建CuratorClient,并启动会话。
    2         final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);//获取真正要注册在zk上的url
    3         registry.register(registedProviderUrl);//创建节点(即注册服务到zk上)

    说明:

    • 第一句代码用来创建ZookeeperRegistry实例:创建CuratorClient,并启动会话。
    • 第二句代码获取真正要注册在zk上的url
    • 第三句代码实现创建节点(即注册服务到zk上)

    一  创建ZookeeperRegistry实例

    1  RegistryProtocol.getRegistry(final Invoker<?> originInvoker)

     1     /**
     2      * 根据invoker的地址获取registry实例
     3      */
     4     private Registry getRegistry(final Invoker<?> originInvoker) {
     5         URL registryUrl = originInvoker.getUrl();
     6         if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
     7             String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);//zookeeper
     8             registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
     9         }
    10         return registryFactory.getRegistry(registryUrl);
    11     }

    首先对originInvoker中的url进行处理:

    • 将协议换成zookeeper
    • 去掉registry=zookeeper的参数

    来看一下originInvoker的url:(解码后的)

    registry://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&export=dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=2791&side=provider&timestamp=1507262031554&pid=2791&registry=zookeeper&timestamp=1507262031521

    说明:

    • 第一个红色部分代表协议:zookeeper
    • 第二个红色部分是export参数
    • 第三个红色部分是registry=zookeeper

    经过处理之后的registryUrl为:

    zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&export=dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=2791&side=provider&timestamp=1507262031554&pid=2791&timestamp=1507262031521

    之后使用注册工厂来创建注册中心。

    2  RegistryFactory$Adaptive.getRegistry(com.alibaba.dubbo.common.URL registryUrl)

     1 public class RegistryFactory$Adaptive implements com.alibaba.dubbo.registry.RegistryFactory {
     2     public com.alibaba.dubbo.registry.Registry getRegistry(com.alibaba.dubbo.common.URL arg0) {
     3         if (arg0 == null)
     4             throw new IllegalArgumentException("url == null");
     5         com.alibaba.dubbo.common.URL url = arg0;
     6         String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );//zookeeper
     7         if(extName == null)
     8             throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.registry.RegistryFactory) name from url(" + url.toString() + ") use keys([protocol])");
     9         com.alibaba.dubbo.registry.RegistryFactory extension = (com.alibaba.dubbo.registry.RegistryFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.registry.RegistryFactory.class).getExtension(extName);
    10         return extension.getRegistry(arg0);
    11     }
    12 }

    这里获取到的extension是ZookeeperRegistryFactory,之后,使用ZookeeperRegistryFactory进行Registry的创建。首先来看一下ZookeeperRegistryFactory的继承图:

    getRegistry方法在ZookeeperRegistryFactory的父类AbstractRegistryFactory中。

    3  AbstractRegistryFactory.getRegistry(URL registryUrl)

     1     public Registry getRegistry(URL url) {
     2         url = url.setPath(RegistryService.class.getName())
     3                 .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
     4                 .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
     5         String key = url.toServiceString();
     6         // 锁定注册中心获取过程,保证注册中心单一实例
     7         LOCK.lock();
     8         try {
     9             Registry registry = REGISTRIES.get(key);
    10             if (registry != null) {
    11                 return registry;
    12             }
    13             registry = createRegistry(url);
    14             if (registry == null) {
    15                 throw new IllegalStateException("Can not create registry " + url);
    16             }
    17             REGISTRIES.put(key, registry);
    18             return registry;
    19         } finally {
    20             // 释放锁
    21             LOCK.unlock();
    22         }
    23     }

    流程:

    • 先处理url,之后获取Registry的key,然后根据该key从Map<String, Registry> REGISTRIES注册中心集合缓存中获取Registry,如果有,直接返回,如果没有,创建Registry,之后存入缓存,最后返回。

    首先处理传入的registryUrl:

    • 设置:path=com.alibaba.dubbo.registry.RegistryService
    • 添加参数:interface=com.alibaba.dubbo.registry.RegistryService
    • 去除export参数

    最终得到的registryUrl如下:

    zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&interface=com.alibaba.dubbo.registry.RegistryService&pid=2791&timestamp=1507262031521

    之后,很具上述的registryUrl创建Registry的key,该{ key : Registry }最终会被存储在Map<String, Registry> REGISTRIES注册中心集合(该属性是ZookeeperRegistryFactory父类AbstractRegistryFactory的一个属性)中。

    根据registryUrl创建Registry的key:url.toServiceString()

     1     public String toServiceString() {
     2         return buildString(true, false, true, true);
     3     }
     4 
     5     private String buildString(boolean appendUser, boolean appendParameter, boolean useIP, boolean useService, String... parameters) {
     6         StringBuilder buf = new StringBuilder();
     7         if (protocol != null && protocol.length() > 0) {  //protocol://
     8             buf.append(protocol);
     9             buf.append("://");
    10         }
    11         if (appendUser && username != null && username.length() > 0) {  //protocol://username:password@host:port/group/interface{path}:version/parameters
    12             buf.append(username);
    13             if (password != null && password.length() > 0) {
    14                 buf.append(":");
    15                 buf.append(password);
    16             }
    17             buf.append("@");
    18         }
    19         String host;
    20         if (useIP) {
    21             host = getIp();
    22         } else {
    23             host = getHost();
    24         }
    25         if (host != null && host.length() > 0) {
    26             buf.append(host);
    27             if (port > 0) {
    28                 buf.append(":");
    29                 buf.append(port);
    30             }
    31         }
    32         String path;
    33         if (useService) {
    34             path = getServiceKey();
    35         } else {
    36             path = getPath();
    37         }
    38         if (path != null && path.length() > 0) {
    39             buf.append("/");
    40             buf.append(path);
    41         }
    42         if (appendParameter) {
    43             buildParameters(buf, true, parameters);
    44         }
    45         return buf.toString();
    46     }
    47 
    48     public String getServiceKey() {
    49         String inf = getServiceInterface();//先获取interface参数,如果没有的话,取path的值,这里都是com.alibaba.dubbo.registry.RegistryService
    50         if (inf == null) return null;
    51         StringBuilder buf = new StringBuilder();
    52         String group = getParameter(Constants.GROUP_KEY);
    53         if (group != null && group.length() > 0) {
    54             buf.append(group).append("/"); //interfacegroup
    55         }
    56         buf.append(inf);
    57         String version = getParameter(Constants.VERSION_KEY);
    58         if (version != null && version.length() > 0) {
    59             buf.append(":").append(version);
    60         }
    61         return buf.toString();
    62     }

    最终得到的应该是这样的形式:protocol://username:password@host:port/group/interface{path}:version?key1=value1&key2=value2...。

    这里key=zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService

    之后来到了真正创建Registry的地方。

     1 public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
     2     private ZookeeperTransporter zookeeperTransporter;
     3 
     4     public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
     5         this.zookeeperTransporter = zookeeperTransporter;
     6     }
     7 
     8     public Registry createRegistry(URL url) {
     9         return new ZookeeperRegistry(url, zookeeperTransporter);
    10     }
    11 }

    这里的zookeeperTransporter对象是一个com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter$Adaptive对象。

    在创建ZookeeperRegistry之前来看一下其继承图:

    new ZookeeperRegistry(registryUrl, ZookeeperTransporter$Adaptive对象)

     1     private final static int DEFAULT_ZOOKEEPER_PORT = 2181;
     2     private final static String DEFAULT_ROOT = "dubbo";
     3     private final String root;
     4     private final Set<String> anyServices = new ConcurrentHashSet<String>();
     5     private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>();
     6     private final ZookeeperClient zkClient;
     7 
     8     public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
     9         super(url);
    10         if (url.isAnyHost()) {
    11             throw new IllegalStateException("registry address == null");
    12         }
    13         String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);//dubbo
    14         if (!group.startsWith(Constants.PATH_SEPARATOR)) {
    15             group = Constants.PATH_SEPARATOR + group;
    16         }
    17         this.root = group;// /dubbo
    18         zkClient = zookeeperTransporter.connect(url);//创建zk客户端,启动会话
    19         zkClient.addStateListener(new StateListener() {//监听重新连接成功事件,重新连接成功后,之前已经完成注册和订阅的url要重新进行注册和订阅(因为临时节点可能已经跪了)
    20             public void stateChanged(int state) {
    21                 if (state == RECONNECTED) {
    22                     try {
    23                         recover();
    24                     } catch (Exception e) {
    25                         logger.error(e.getMessage(), e);
    26                     }
    27                 }
    28             }
    29         });
    30     }

    new FailbackRegistry(registryUrl)

     1     private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true));
     2     // 失败重试定时器,定时检查是否有请求失败,如有,无限次重试
     3     private final ScheduledFuture<?> retryFuture;
     4     private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();
     5     private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>();
     6     private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
     7     private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
     8     private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();
     9     private AtomicBoolean destroyed = new AtomicBoolean(false);
    10 
    11     public FailbackRegistry(URL url) {
    12         super(url);
    13         int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);//5*1000
    14         this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
    15             public void run() {
    16                 // 检测并连接注册中心
    17                 try {
    18                     retry();
    19                 } catch (Throwable t) { // 防御性容错
    20                     logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
    21                 }
    22             }
    23         }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
    24     }

    new AbstractRegistry(registryUrl)

     1     // URL地址分隔符,用于文件缓存中,服务提供者URL分隔
     2     private static final char URL_SEPARATOR = ' ';
     3     // URL地址分隔正则表达式,用于解析文件缓存中服务提供者URL列表
     4     private static final String URL_SPLIT = "\s+";
     5     // 本地磁盘缓存,其中特殊的key值.registies记录注册中心列表,其它均为notified服务提供者列表
     6     private final Properties properties = new Properties();
     7     // 文件缓存定时写入
     8     private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));
     9     //是否是同步保存文件
    10     private final boolean syncSaveFile;
    11     // 本地磁盘缓存文件
    12     private File file;
    13     private final AtomicLong lastCacheChanged = new AtomicLong();
    14     private final Set<URL> registered = new ConcurrentHashSet<URL>();//已经注册的url集合
    15     private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();//已经订阅的<URL, Set<NotifyListener>>
    16     private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>();//已经通知的<URL, Map<String, List<URL>>>
    17     private URL registryUrl;//注册url
    18     private AtomicBoolean destroyed = new AtomicBoolean(false);
    19 
    20     public AbstractRegistry(URL url) {
    21         setUrl(url);
    22         // 启动文件保存定时器
    23         syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
    24         String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getHost() + ".cache");
    25         File file = null;
    26         if (ConfigUtils.isNotEmpty(filename)) {
    27             file = new File(filename);
    28             if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
    29                 if (!file.getParentFile().mkdirs()) {//创建文件所在的文件夹 /Users/jigangzhao/.dubbo/
    30                     throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
    31                 }
    32             }
    33         }
    34         this.file = file;
    35         loadProperties();
    36         notify(url.getBackupUrls());
    37     }

    先简单的总结一下:父子三代分别做的事情:

    • AbstractRegistry主要用来维护缓存文件
    • FailbackRegistry主要用来做失败重试操作(包括:注册失败/反注册失败/订阅失败/反订阅失败/通知失败的重试);也提供了供ZookeeperRegistry使用的zk重连后的恢复工作的方法。
    • ZookeeperRegistry创建zk客户端,启动会话;并且调用FailbackRegistry实现zk重连后的恢复工作

    先看AbstractRegistry

    • 设置属性registryUrl=url:zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&interface=com.alibaba.dubbo.registry.RegistryService&pid=4685&timestamp=1507286468150
    • 创建文件/Users/jigangzhao/.dubbo/dubbo-registry-10.211.55.5.cache的文件夹/Users/jigangzhao/.dubbo
    • 设置属性file:/Users/jigangzhao/.dubbo/dubbo-registry-10.211.55.5.cache文件,该文件存储信息将是这样的:

      com.alibaba.dubbo.demo.DemoService=empty://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=5259&side=provider&timestamp=1507294508053

    • 如果file存在,将file中的内容写入properties属性;既然有读file,那么是什么时候写入file的呢?AbstractRegistry创建了一个含有一个名字为DubboSaveRegistryCache的后台线程的FixedThreadPool,只在在notify(URL url, NotifyListener listener, List<URL> urls)方法中会被调用,我们此处由于ConcurrentMap<URL, Set<NotifyListener>> subscribed为空,所以AbstractRegistry(URL url)中的notify(url.getBackupUrls())不会执行,此处也不会创建文件。
    • 最后是notify(url.getBackupUrls())(TODO 这里后续会写

    再来看FailbackRegistry:

    只做了一件事,启动了一个含有一个名为DubboRegistryFailedRetryTimer的后台线程的ScheduledThreadPool,线程创建5s后开始第一次执行retry(),之后每隔5s执行一次。来看一下retry()

      1     /**
      2      * 将所有注册失败的url(failedRegistered中的url)进行注册,之后从failedRegistered进行移除;
      3      * 将所有反注册失败的url(failedUnregistered中的url)进行反注册,之后从failedUnregistered进行移除;
      4      * 将所有订阅失败的url(failedSubscribed中的url)进行重新订阅,之后从failedSubscribed进行移除;
      5      * 将所有反订阅失败的url(failedUnsubscribed中的url)进行反订阅,之后从failedUnsubscribed进行移除;
      6      * 将所有通知失败的url(failedNotified中的url)进行通知,之后从failedNotified进行移除;
      7      */
      8     protected void retry() {
      9         if (!failedRegistered.isEmpty()) {
     10             Set<URL> failed = new HashSet<URL>(failedRegistered);
     11             if (failed.size() > 0) {
     12                 if (logger.isInfoEnabled()) {
     13                     logger.info("Retry register " + failed);
     14                 }
     15                 try {
     16                     for (URL url : failed) {
     17                         try {
     18                             doRegister(url);
     19                             failedRegistered.remove(url);
     20                         } catch (Throwable t) { // 忽略所有异常,等待下次重试
     21                             logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
     22                         }
     23                     }
     24                 } catch (Throwable t) { // 忽略所有异常,等待下次重试
     25                     logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
     26                 }
     27             }
     28         }
     29         if (!failedUnregistered.isEmpty()) {
     30             Set<URL> failed = new HashSet<URL>(failedUnregistered);
     31             if (failed.size() > 0) {
     32                 if (logger.isInfoEnabled()) {
     33                     logger.info("Retry unregister " + failed);
     34                 }
     35                 try {
     36                     for (URL url : failed) {
     37                         try {
     38                             doUnregister(url);
     39                             failedUnregistered.remove(url);
     40                         } catch (Throwable t) { // 忽略所有异常,等待下次重试
     41                             logger.warn("Failed to retry unregister  " + failed + ", waiting for again, cause: " + t.getMessage(), t);
     42                         }
     43                     }
     44                 } catch (Throwable t) { // 忽略所有异常,等待下次重试
     45                     logger.warn("Failed to retry unregister  " + failed + ", waiting for again, cause: " + t.getMessage(), t);
     46                 }
     47             }
     48         }
     49         if (!failedSubscribed.isEmpty()) {
     50             Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedSubscribed);
     51             for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(failed).entrySet()) {
     52                 if (entry.getValue() == null || entry.getValue().size() == 0) {
     53                     failed.remove(entry.getKey());
     54                 }
     55             }
     56             if (failed.size() > 0) {
     57                 if (logger.isInfoEnabled()) {
     58                     logger.info("Retry subscribe " + failed);
     59                 }
     60                 try {
     61                     for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
     62                         URL url = entry.getKey();
     63                         Set<NotifyListener> listeners = entry.getValue();
     64                         for (NotifyListener listener : listeners) {
     65                             try {
     66                                 doSubscribe(url, listener);//listener需要一个一个订阅,每订阅一个,就将该listener从当前的url监听列表中移除
     67                                 listeners.remove(listener);
     68                             } catch (Throwable t) { // 忽略所有异常,等待下次重试
     69                                 logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
     70                             }
     71                         }
     72                     }
     73                 } catch (Throwable t) { // 忽略所有异常,等待下次重试
     74                     logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
     75                 }
     76             }
     77         }
     78         if (!failedUnsubscribed.isEmpty()) {
     79             Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedUnsubscribed);
     80             for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(failed).entrySet()) {
     81                 if (entry.getValue() == null || entry.getValue().size() == 0) {
     82                     failed.remove(entry.getKey());
     83                 }
     84             }
     85             if (failed.size() > 0) {
     86                 if (logger.isInfoEnabled()) {
     87                     logger.info("Retry unsubscribe " + failed);
     88                 }
     89                 try {
     90                     for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
     91                         URL url = entry.getKey();
     92                         Set<NotifyListener> listeners = entry.getValue();
     93                         for (NotifyListener listener : listeners) {
     94                             try {
     95                                 doUnsubscribe(url, listener);//listener需要一个一个反订阅,每反订阅一个,就将该listener从当前的url监听列表中移除
     96                                 listeners.remove(listener);
     97                             } catch (Throwable t) { // 忽略所有异常,等待下次重试
     98                                 logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
     99                             }
    100                         }
    101                     }
    102                 } catch (Throwable t) { // 忽略所有异常,等待下次重试
    103                     logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
    104                 }
    105             }
    106         }
    107         if (!failedNotified.isEmpty()) {
    108             Map<URL, Map<NotifyListener, List<URL>>> failed = new HashMap<URL, Map<NotifyListener, List<URL>>>(failedNotified);
    109             for (Map.Entry<URL, Map<NotifyListener, List<URL>>> entry : new HashMap<URL, Map<NotifyListener, List<URL>>>(failed).entrySet()) {
    110                 if (entry.getValue() == null || entry.getValue().size() == 0) {
    111                     failed.remove(entry.getKey());
    112                 }
    113             }
    114             if (failed.size() > 0) {
    115                 if (logger.isInfoEnabled()) {
    116                     logger.info("Retry notify " + failed);
    117                 }
    118                 try {
    119                     for (Map<NotifyListener, List<URL>> values : failed.values()) {
    120                         for (Map.Entry<NotifyListener, List<URL>> entry : values.entrySet()) {
    121                             try {
    122                                 NotifyListener listener = entry.getKey();
    123                                 List<URL> urls = entry.getValue();
    124                                 listener.notify(urls);
    125                                 values.remove(listener);
    126                             } catch (Throwable t) { // 忽略所有异常,等待下次重试
    127                                 logger.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t);
    128                             }
    129                         }
    130                     }
    131                 } catch (Throwable t) { // 忽略所有异常,等待下次重试
    132                     logger.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t);
    133                 }
    134             }
    135         }
    136     }

    最后回到我们的主角:ZookeeperRegistry

    首先是为属性设置root=/dubbo,之后创建zk客户端,启动会话,最后创建了一个StateListener监听器,监听重新连接成功事件,重新连接成功后,之前已经完成注册和订阅的url要重新进行注册和订阅(因为临时节点可能已经跪了)。

    来看创建zk客户端,启动会话的代码,这是此处最核心的部分:

    ZookeeperTransporter$Adaptive.connect(com.alibaba.dubbo.common.URL registryUrl)

     1     public com.alibaba.dubbo.remoting.zookeeper.ZookeeperClient connect(com.alibaba.dubbo.common.URL arg0) {
     2         if (arg0 == null)
     3             throw new IllegalArgumentException("url == null");
     4         com.alibaba.dubbo.common.URL url = arg0;
     5         String extName = url.getParameter("client", url.getParameter("transporter", "zkclient"));//curator
     6         if(extName == null)
     7             throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter) name from url(" + url.toString() + ") use keys([client, transporter])");
     8         com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter extension = (com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter.class).getExtension(extName);
     9         return extension.connect(arg0);
    10     }

    这里创建的extension是CuratorZookeeperTransporter实例。

    1 public class CuratorZookeeperTransporter implements ZookeeperTransporter {
    2     public ZookeeperClient connect(URL url) {
    3         return new CuratorZookeeperClient(url);
    4     }
    5 }

    new CuratorZookeeperClient(registryUrl)

     1     private final CuratorFramework client;
     2 
     3     public CuratorZookeeperClient(URL url) {
     4         super(url);
     5         try {
     6             CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
     7                     .connectString(url.getBackupAddress())
     8                     .retryPolicy(new RetryNTimes(Integer.MAX_VALUE, 1000))
     9                     .connectionTimeoutMs(5000);
    10             String authority = url.getAuthority();
    11             if (authority != null && authority.length() > 0) {
    12                 builder = builder.authorization("digest", authority.getBytes());
    13             }
    14             client = builder.build();
    15             client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
    16                 public void stateChanged(CuratorFramework client, ConnectionState state) {
    17                     if (state == ConnectionState.LOST) {
    18                         CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
    19                     } else if (state == ConnectionState.CONNECTED) {
    20                         CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
    21                     } else if (state == ConnectionState.RECONNECTED) {
    22                         CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
    23                     }
    24                 }
    25             });
    26             client.start();
    27         } catch (Exception e) {
    28             throw new IllegalStateException(e.getMessage(), e);
    29         }
    30     }

    这里首先执行父类AbstractZookeeperClient的构造器来初始化一些参数,之后创建CuratorFramework客户端,然后添加了ConnectionStateListener监听器,监听连接断开/连接成功/重新连接成功事件,之后作出相应的操作(实际上这里只有重新连接成功事件会被处理,而处理器实际上就是ZookeeperRegistry构造器中的那个执行recover()的StateListener),

        protected void stateChanged(int state) {
            for (StateListener sessionListener : getSessionListeners()) {
                sessionListener.stateChanged(state);//此处查找实现类,只有ZookeeperRegistry构造器中的那个StateListener
            }
        }

    最后阻塞,直到创建会话完成。

    来看一下父类AbstractZookeeperClient

    1     private final URL url;
    2     private final Set<StateListener> stateListeners = new CopyOnWriteArraySet<StateListener>();
    3     private final ConcurrentMap<String, ConcurrentMap<ChildListener, TargetChildListener>> childListeners = new ConcurrentHashMap<String, ConcurrentMap<ChildListener, TargetChildListener>>();
    4     private volatile boolean closed = false;
    5 
    6     public AbstractZookeeperClient(URL url) {
    7         this.url = url;
    8     }

    说明:

    • 设置属性url=registryUrl:zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&interface=com.alibaba.dubbo.registry.RegistryService&pid=4685&timestamp=1507286468150
    • 创建了一个Set<StateListener> stateListeners,ZookeeperRegistry构造器中的那个执行recover()的StateListener就将会放在这里

    至此,一个完整的ZookeeperRegistry实例就创建完成了,来看一下属性:

    • ZookeeperClient zkClientCuratorZookeeperClient实例
      • CuratorFramework client:CuratorFrameworkImpl实例
      • String url:zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&interface=com.alibaba.dubbo.registry.RegistryService&pid=4685&timestamp=1507286468150
      • Set<StateListener> stateListeners:{ 监听了重连成功事件的执行recover()的StateListener }
    • String root="/dubbo"
    • URL registryUrl = zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&interface=com.alibaba.dubbo.registry.RegistryService&pid=4685&timestamp=1507286468150
    • Set<URL> registered:0//已经注册的url集合,此处为空
    • ConcurrentMap<URL, Set<NotifyListener>> subscribed:0//已经订阅的<URL, Set<NotifyListener>>
    • ConcurrentMap<URL, Map<String, List<URL>>> notified:0//已经通知的<URL, Map<String, List<URL>>>
    • Set<URL> failedRegistered:0//注册失败的url
    • Set<URL> failedUnregistered:0//反注册失败的url
    • ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed:0//订阅失败的url
    • ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed:0//反订阅失败的url
    • ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified:0//通知失败的url
    • ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners:0

    还有一个定时线程:DubboRegistryFailedRetryTimer每隔5s执行一次retry(),进行失败重试。

    最后,该ZookeeperRegistry会存储在ZookeeperRegistry的父类的static属性Map<String, Registry> REGISTRIES中:

    Map<String, Registry> REGISTRIES:{ "zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService" : ZookeeperRegistry实例 }

    二  获取真正要注册到zk的节点url

    1 final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
     1     /**
     2      * 1 获取originInvoker的export参数值:就是providerUrl
     3      * 2 去除providerUrl中所有参数名是"."开头的,然后去除参数monitor
     4      */
     5     private URL getRegistedProviderUrl(final Invoker<?> originInvoker) {
     6         URL providerUrl = getProviderUrl(originInvoker);
     7         //注册中心看到的地址
     8         final URL registedProviderUrl = providerUrl.removeParameters(getFilteredKeys(providerUrl)).removeParameter(Constants.MONITOR_KEY);
     9         return registedProviderUrl;
    10     }
    11 
    12     /**
    13      * 从invoker的URL中的Map<String, String> parameters中获取key为export的地址providerUrl:
    14      */
    15     private URL getProviderUrl(final Invoker<?> origininvoker) {
    16         String export = origininvoker.getUrl().getParameterAndDecoded(Constants.EXPORT_KEY);
    17         if (export == null || export.length() == 0) {
    18             throw new IllegalArgumentException("The registry export url is null! registry: " + origininvoker.getUrl());
    19         }
    20         URL providerUrl = URL.valueOf(export);
    21         return providerUrl;
    22     }
    23 
    24     //过滤URL中不需要输出的参数(以点号开头的)
    25     private static String[] getFilteredKeys(URL url) {
    26         Map<String, String> params = url.getParameters();
    27         if (params != null && !params.isEmpty()) {
    28             List<String> filteredKeys = new ArrayList<String>();
    29             for (Map.Entry<String, String> entry : params.entrySet()) {
    30                 if (entry != null && entry.getKey() != null && entry.getKey().startsWith(Constants.HIDE_KEY_PREFIX)) {
    31                     filteredKeys.add(entry.getKey());
    32                 }
    33             }
    34             return filteredKeys.toArray(new String[filteredKeys.size()]);
    35         } else {
    36             return new String[]{};
    37         }
    38     }

    最后得到的registedProviderUrl是:

    • dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=4758&side=provider&timestamp=1507289961588

    三  注册服务到zk

    registry.register(registedProviderUrl);//创建节点(即注册服务到zk上)

    这里的registry是ZookeeperRegistry。register(registedProviderUrl)方法在ZookeeperRegistry的父类FailbackRegistry中实现。

    1  FailbackRegistry.register(registedProviderUrl)

     1     @Override
     2     public void register(URL url) {
     3         if (destroyed.get()){
     4             return;
     5         }
     6         super.register(url);
     7         failedRegistered.remove(url);
     8         failedUnregistered.remove(url);
     9         try {
    10             // 向服务器端发送注册请求
    11             doRegister(url);
    12         } catch (Exception e) {
    13             Throwable t = e;
    14             // 如果开启了启动时检测check=true,则直接抛出异常,不会加入到failedRegistered中
    15             boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
    16                     && url.getParameter(Constants.CHECK_KEY, true)
    17                     && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
    18             boolean skipFailback = t instanceof SkipFailbackWrapperException;
    19             if (check || skipFailback) {
    20                 if (skipFailback) {
    21                     t = t.getCause();
    22                 }
    23                 throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
    24             } else {
    25                 logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
    26             }
    27             // 将失败的注册请求记录到失败列表,定时重试
    28             failedRegistered.add(url);
    29         }
    30     }

    首先调用父类AbstractRegistry的register(registedProviderUrl)将当前的registeredProviderUrl放到Set<URL> registered属性中,如下:

    1     public void register(URL url) {
    2         if (url == null) {
    3             throw new IllegalArgumentException("register url == null");
    4         }
    5         if (logger.isInfoEnabled()) {
    6             logger.info("Register: " + url);
    7         }
    8         registered.add(url);
    9     }

    之后,从failedRegistered和failedUnregistered两个url集合中删除该url。然后执行真正的服务注册(创建节点,doRegister(url)),如果在创建过程中抛出异常,如果url的协议不是consumer并且开启了check=true的属性并且当前存储的URL registryUrl也有check=true的话,那么直接抛出异常,不会将该url加入到failedRegistered集合;当然抛出的异常如果是SkipFailbackWrapperException,那么也会直接抛出异常,不会将该url加入到failedRegistered集合。否则,会将该url加入到failedRegistered集合,然后DubboRegistryFailedRetryTimer线程会每隔5s执行一次doRegister(url)。

    我们来看真正doRegister(url)。

    2  ZookeeperRegistry.doRegister(registedProviderUrl)

    1     protected void doRegister(URL url) {
    2         try {
    3             zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    4         } catch (Throwable e) {
    5             throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    6         }
    7     }

    首先是对入参registedProviderUrl进行一顿处理,

     1     private String toUrlPath(URL url) {
     2         return toCategoryPath(url) + Constants.PATH_SEPARATOR + URL.encode(url.toFullString());
     3     }
     4 
     5     private String toCategoryPath(URL url) {
     6         return toServicePath(url) + Constants.PATH_SEPARATOR + url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
     7     }
     8 
     9     private String toServicePath(URL url) {
    10         String name = url.getServiceInterface();
    11         if (Constants.ANY_VALUE.equals(name)) {
    12             return toRootPath();
    13         }
    14         return toRootDir() + URL.encode(name);// /dubbo/com.alibaba.dubbo.demo.DemoService
    15     }
    16 
    17     private String toRootDir() {
    18         if (root.equals(Constants.PATH_SEPARATOR)) {
    19             return root;
    20         }
    21         return root + Constants.PATH_SEPARATOR;// /dubbo/
    22     }
    23 
    24     private String toRootPath() {
    25         return root;
    26     }

    这里就体现了上边的ZookeeperRegistry的root属性的作用。最终实际上得到的是:/dubbo/interface/category/encode过的export,该节点也将是创建在zk上的节点。

    • /dubbo是根节点
    • /interface是服务接口
    • /category是providers/consumers/routers/configurators等

    最终得到的url是:

    • /dubbo/com.alibaba.dubbo.demo.DemoService/providers/dubbo%3A%2F%2F10.10.10.10%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D5148%26side%3Dprovider%26timestamp%3D1507291294629
    • 解码后:/dubbo/com.alibaba.dubbo.demo.DemoService/providers/dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=5148&side=provider&timestamp=1507291294629

    最后执行zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true))来创建节点,该方法由CuratorZookeeperClient的父类AbstractZookeeperClient来执行:

     1     public void create(String path, boolean ephemeral) {
     2         int i = path.lastIndexOf('/');
     3         if (i > 0) {
     4             create(path.substring(0, i), false);
     5         }
     6         if (ephemeral) {
     7             createEphemeral(path);
     8         } else {
     9             createPersistent(path);
    10         }
    11     }

    这里实际上是通过递归分别创建持久化的/dubbo,/dubbo/com.alibaba.dubbo.demo.DemoService以及/dubbo/com.alibaba.dubbo.demo.DemoService/providers节点;最后创建临时节点/dubbo/com.alibaba.dubbo.demo.DemoService/providers/dubbo%3A%2F%2F10.10.10.10%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D5148%26side%3Dprovider%26timestamp%3D1507291294629,而实际上,如果使用了curator的话,可以直接使用递归创建节点即可(结合zk的特性,只有最后一个字节点可以是临时节点,父节点一定是持久化节点),这里这样的写法应该是兼容不能递归创建节点的Zkclient客户端。值得注意的是,url.getParameter(Constants.DYNAMIC_KEY, true)为true则最终创建的节点是临时节点,否则是持久化节点。

    创建节点的操作是在CuratorZookeeperClient中进行的。

     1     public void createPersistent(String path) {
     2         try {
     3             client.create().forPath(path);
     4         } catch (NodeExistsException e) {
     5         } catch (Exception e) {
     6             throw new IllegalStateException(e.getMessage(), e);
     7         }
     8     }
     9 
    10     public void createEphemeral(String path) {
    11         try {
    12             client.create().withMode(CreateMode.EPHEMERAL).forPath(path);
    13         } catch (NodeExistsException e) {
    14         } catch (Exception e) {
    15             throw new IllegalStateException(e.getMessage(), e);
    16         }
    17     }

    到此为止,我们去zk上看一下节点的创建情况。

    或者从zkui上看一下:

    隐藏掉的是ip:10.10.10.10。

    到目前为止,我们再来看看ZookeeperRegistry的属性变化。相较于注册前:

    • Set<URL> registered:[ dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=5214&side=provider&timestamp=1507293238549 ]
  • 相关阅读:
    近期学习(3)
    近期学习(1)
    近期学习(2)
    今日练习
    《明朝那些事儿》
    记一次针对恶意攻击者的渗透测试
    Kali Linux使用问题记录
    MySQL floor()报错原理
    使用复合设计模式扩展持久化的CURD,Select能力
    c#/js代码命名规范及代码规范
  • 原文地址:https://www.cnblogs.com/java-zhao/p/7629027.html
Copyright © 2011-2022 走看看