zoukankan      html  css  js  c++  java
  • Dubbo系列之 (三)Registry注册中心-注册(2)

    辅助链接

    Dubbo系列之 (一)SPI扩展

    Dubbo系列之 (二)Registry注册中心-注册(1)

    Dubbo系列之 (三)Registry注册中心-注册(2)

    引导

    本章主要介绍下AbstractRegistry、FailbackRegistry的作用和源码。

    AbstractRegistry

    首先,直接引出这个类的作用,该类主要把服务提供者信息缓存本地文件上,文件目录是:当前用户目录下的/.dubbo/dubbo-registry-${application}-${hos}-${port}.cache。
    在解读源码前,先阅读下AbstractRegistry类的成员变量,从成员变量中可以看到这个类是怎么完成数据的本地化存储的。

        // URL 地址分隔符
        private static final char URL_SEPARATOR = ' ';
    
        //URL地址正则表达式,任何空白符
        private static final String URL_SPLIT = "\s+";
        
        // 参数保存到本地文件的最大重试次数
        private static final int MAX_RETRY_TIMES_SAVE_PROPERTIES = 3;
    
        // 需要保存的参数
        private final Properties properties = new Properties();
        
        // 保存线程,可以看出是否异步保存
        private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));
        
        // 是否同步保存
        private boolean syncSaveFile;
    
        // 上一次保存的版本,每次保存更新+1
        private final AtomicLong lastCacheChanged = new AtomicLong();
    
        // 保存重试的次数
        private final AtomicInteger savePropertiesRetryTimes = new AtomicInteger();
    
        // 服务注册的URL保存在这里
        private final Set<URL> registered = new ConcurrentHashSet<>();
    
        // 订阅的URL,key:消费端订阅者URL,values: 通知监听器
        private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<>();
    
        // 订阅的URL,key:消费端订阅者URL,values: Map ,key:服务提供者的名字(默认为providers,configurations,routers),和服务提供者URL
        private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<>();
    
        //当前注册URL,于指定的注册中心连接的URL
        private URL registryUrl;
        
        //本地文件
        private File file;
    

    入口,构造函数

    public AbstractRegistry(URL url) {
            // 保存与注册中心连接的url.
            setUrl(url);
    
            //判断是否需要缓存本地文件,默认需要,文件地址
            if (url.getParameter(REGISTRY__LOCAL_FILE_CACHE_ENABLED, true)) {
                // Start file save timer
                // 是否同步保存,默认是异步
                syncSaveFile = url.getParameter(REGISTRY_FILESAVE_SYNC_KEY, false);
    
                //文件名和路径一般在,当前用户目录下的/.dubbo/dubbo-registry-${application}-${hos}-${port}.cache
                //例如dubbo-registry-dubbo-demo-annotation-provider-106.52.187.48-2181.cache
                String defaultFilename = System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(APPLICATION_KEY) + "-" + url.getAddress().replaceAll(":", "-") + ".cache";
                String filename = url.getParameter(FILE_KEY, defaultFilename);
                File file = null;
                //创建文件
                if (ConfigUtils.isNotEmpty(filename)) {
                    file = new File(filename);
                    if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
                        if (!file.getParentFile().mkdirs()) {
                            throw new IllegalArgumentException("Invalid registry cache file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
                        }
                    }
                }
    
                this.file = file;
                //在启动订阅中心时,我们需要读取本地缓存文件,以便将来进行注册表容错处理。 其实就是把本地文件file的内容 放入参数properties里
                loadProperties();
                // 进行通知url.getBackupUrls(),第一个参数就是url 自己本身
                notify(url.getBackupUrls());
            }
        }
    

    上面的注释已经非常的清晰了,这里就不在描述,需要关注的是notify()这个函数,所以当每个服务注册和订阅时,首次创建注册中心都会进行notify操作。具体来看下notify方法。

    protected void notify(List<URL> urls) {
    // 这里是注册中心链接的url,里面包括了服务提供方的信息(key:interface等)
            if (CollectionUtils.isEmpty(urls)) {
                return;
            }
            // 这里循环所有的订阅URL
            for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
                URL url = entry.getKey();
    
                // 查看订阅的url 是否是订阅当前的注册服务。不是的话,轮训下一个
                if (!UrlUtils.isMatch(url, urls.get(0))) {
                    continue;
                }
    
                // 这里订阅的URL的通知监听器
                Set<NotifyListener> listeners = entry.getValue();
                if (listeners != null) {
                    // 然后进行依次遍历通知
                    for (NotifyListener listener : listeners) {
                        try {
    
                            notify(url, listener, filterEmpty(url, urls));
                        } catch (Throwable t) {
                            logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
                        }
                    }
                }
            }
        }
    

    接下来看下具体的notify(URL url, NotifyListener listener, List urls)

    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
            if (url == null) {
                throw new IllegalArgumentException("notify url == null");
            }
            if (listener == null) {
                throw new IllegalArgumentException("notify listener == null");
            }
            if ((CollectionUtils.isEmpty(urls))
                    && !ANY_VALUE.equals(url.getServiceInterface())) {
                logger.warn("Ignore empty notify urls for subscribe url " + url);
                return;
            }
            if (logger.isInfoEnabled()) {
                logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
            }
    
            //result,key: providers,configurators,routers ,values:urls.
            Map<String, List<URL>> result = new HashMap<>();
            for (URL u : urls) {
                if (UrlUtils.isMatch(url, u)) { // 这里再一次判断,订阅URL 和服务提供者URL 是否匹配
                    String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
                    List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
                    categoryList.add(u);
                }
            }
            if (result.size() == 0) {
                return;
            }
    
            Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
            for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
                String category = entry.getKey();
                List<URL> categoryList = entry.getValue();
                categoryNotified.put(category, categoryList);
    
                //监听通知
                listener.notify(categoryList);
                // 我们将在每次通知后更新缓存文件。
                // 当我们的注册表由于网络抖动而出现订阅失败时,我们至少可以返回现有的缓存URL。
                saveProperties(url);
            }
        }
    

    接着看下saveProperties,

    private void saveProperties(URL url) {
            if (file == null) {
                return;
            }
    
            try {
                StringBuilder buf = new StringBuilder();
                // 得到该订阅URL 的所有服务提供者URLS,并放入buf中
                Map<String, List<URL>> categoryNotified = notified.get(url);
                if (categoryNotified != null) {
                    for (List<URL> us : categoryNotified.values()) {
                        for (URL u : us) {
                            if (buf.length() > 0) {
                                buf.append(URL_SEPARATOR);
                            }
                            buf.append(u.toFullString());
                        }
                    }
                }
                // key 服务接口,value :提供者URL.
                properties.setProperty(url.getServiceKey(), buf.toString());
                long version = lastCacheChanged.incrementAndGet(); // 新增一个版本
                if (syncSaveFile) { //同步保存
                    doSaveProperties(version); 
                } else { // 异步保存
                    registryCacheExecutor.execute(new SaveProperties(version));
                }
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    

    从上面可以知道,把消费端的订阅的服务信息存入了file文件中,doSaveProperties就是文件操作,不进行分析。再一次强调下,消费端订阅时,会订阅某个具体服务下3个节点(providers,configurations,routers)。

    FailbackRegistry

    接着,FailbackRegistry继承自AbstractRegistry。
    其构造函数如下,可以得知除了调用AbstractRegistry构造方法外,并且创建一个HashedWheelTimer类型的定时器。

    public FailbackRegistry(URL url) {
            super(url);
            this.retryPeriod = url.getParameter(REGISTRY_RETRY_PERIOD_KEY, DEFAULT_REGISTRY_RETRY_PERIOD);
    
            // since the retry task will not be very much. 128 ticks is enough.
            //集运时间轮转的重试线程器
            retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128);
        }
    

    并且FailbackRegistry 成员记录一组注册失败和订阅失败的集合,然后通过retryTimer定式扫描这些失败集合,重新发起订阅和注册。之后会单独拿一章节来讲解这个时间轮算法。
    下面是失败集合:

    // 这里是注册失败的urls
        private final ConcurrentMap<URL, FailedRegisteredTask> failedRegistered = new ConcurrentHashMap<URL, FailedRegisteredTask>();
    
    // 这里是取消注册失败的urls
        private final ConcurrentMap<URL, FailedUnregisteredTask> failedUnregistered = new ConcurrentHashMap<URL, FailedUnregisteredTask>();
    
    // 这里是订阅失败的urls
        private final ConcurrentMap<Holder, FailedSubscribedTask> failedSubscribed = new ConcurrentHashMap<Holder, FailedSubscribedTask>();
    
    // 这里是取消订阅失败的urls
        private final ConcurrentMap<Holder, FailedUnsubscribedTask> failedUnsubscribed = new ConcurrentHashMap<Holder, FailedUnsubscribedTask>();
    
    // 这里是通知notify()方法失败异常时的url集合,会进行重新通知
        private final ConcurrentMap<Holder, FailedNotifiedTask> failedNotified = new ConcurrentHashMap<Holder, FailedNotifiedTask>();
    

    本章的内容比较简单,主要是接上一章节Dubbo系列之 (二)Registry注册中心-注册(1)的内容,使其完整。目前我们已经完成大部分dubbo是如何与注册中心交互的,接下来的章节我们讲继续分享dubbo服务的导出和订阅等内容。

  • 相关阅读:
    深入理解C++右值引用
    并发编程的原子性和顺序性
    LLVM简介
    APK及相关的Android路径
    UE4资源移动与跨项目迁移
    OpenGL简介
    IDEA无限试用插件
    使用idea搭建springcloud
    .NET Core 微服务架构 Steeltoe 使用(基于 Spring Cloud)
    微服务:注册中心ZooKeeper、Eureka、Consul 、Nacos对比
  • 原文地址:https://www.cnblogs.com/liferecord/p/13497411.html
Copyright © 2011-2022 走看看