zoukankan      html  css  js  c++  java
  • dubbo注册中心分析

    一、注册中心概述

    1.1 注册中心基本介绍

    Dubbo通过注册中心实现了分布式环境中各个服务的注册与发现,是各个分布式节点的纽带。其主要作用如下:

    • 动态加入。一个服务通过注册中心可以动态的把自己暴露给其他消费者。
    • 动态发现。消费者可以动态感知新的配置、路由规则和新的服务提供者。
    • 动态调整。注册中心支持参数的动态调整,新参数自动更新到所有相关服务节点。
    • 统一配置。避免本地配置导致每个服务配置不一样。

    注册中心代码结构如下图,以zookeeper:

    E:读书笔记dubbodubbo-registy-image

    dubbo-registry-api:包含了注册中心的所有API和抽象实现类。

    dubbo-registry-zookeeper:使用Zookeeper作为注册中心的实现。

    dubbo-registry-redis:使用redis作为注册中心的实现。

    dubbo-registry-nacos:使用nacos作为注册中心的实现。

    dubbo-registry-default:Dubbo基于内存的默认实现

    dubbo-registry-multicast:multicast模式的服务注册中心。

    1.2 注册中心工作流程

    注册中心工作流程比较简单:

    • 服务提供者启动时,会向注册中心写入自己的元数据信息,同时会订阅配置元数据信息。
    • 消费者启动时,会向注册中心写入自己的元数据信息,并订阅服务提供者、路由和配置元数据信息。
    • 服务治理中心(dubbo-admin)启动时,会同时订阅所有消费者、服务提供者、路由和配置元数据信息。
    • 当有服务提供者离开或有新的服务提供者加入时,注册中心服务提供者目录发生变化,变化的消息通知到消费者和服务治理中心。
    • 当消费者方发起调用时,会异步将调用,统计信息等上报给监控中心。

    二、Zookeeper注册中心

    2.1 注册中心的数据目录

    Dubbo使用Zookeeper作为注册中心时,生成的目录结构如下:

    根节点是注册中心分组,下面是多个服务接口,分组值来自于dubbo:registry中的group属性,默认是/dubbo。

    服务接口下面包含四个子目录,分别是configurators、consumers、providers、routers。

    configurators中包含了多个用于服务动态配置的URL元数据信息。

    consumers中包含多个用于消费者URL元数据信息,如下的子目录名称:

    consumer://192.168.231.1/com.alibaba.dubbo.demo.EchoService?application=echo-consumer&category=consumers&check=false&dubbo=2.0.2&interface=com.alibaba.dubbo.demo.EchoService&methods=echo,sayByy&pid=3896&qos.port=33333&side=consumer×tamp=1632326685656
    

    providers中包含多个服务提供者URL元数据信息,如下的子目录:

    dubbo://192.168.231.1:20880/com.alibaba.dubbo.demo.EchoService?anyhost=true&application=echo-provider&bean.name=com.alibaba.dubbo.demo.EchoService&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.EchoService&methods=echo,sayByy&pid=18332&side=provider×tamp=1632324633507
    

    routers下面包含了多个用于消费者路由策略URL元数据信息。

    三、Zookeeper注册中心源码分析

    2.6.X的Dubbo源码中有的注册中心有5种不同的实现,并且,如果这些注册中心不能满足需求,那么用户可以基于RegistryFactory和Registry自行扩展。我们在这里分析Zookeeper的实现。

    3.1 ZooKeeperRegistry继承关系

    如下图是ZooKeeperRegistry注册中心的继承关系:

    3.2 AbstractRegistry源码分析

    3.2.1 RegistryService接口

    RegistryService接口定义了注册中心基本的功能,如下:

    public interface RegistryService {
    
        /**
        注册数据,如:提供者服务、消费者地址、路由规则、配置规则等数据。
        注册时需要满足下面的约定:
        1. 当 URL 设置 check=false 参数时。 注册失败时不抛出异常,后台重试。 否则,将抛出异常。
        2、URL设置dynamic=false参数时,需要持久保存,否则注册者异常退出时自动删除。
        3、当URL设置category=routers时,表示分类存储,默认分类为providers,数据可以通过分类部分通知。
        4、重新启动注册中心时,网络抖动,数据不会丢失,包括断开连接时自动删除数据。
        5、允许URL相同但参数不同的URL共存,不能相互覆盖。
        参数:注册信息,不能为空,如 dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
        **/
        void register(URL url);
    
        /**
        注销数据:
        注销数据需要满足下面约定:
        1、如果是dynamic=false的持久化存储数据,找不到注册数据,则抛出IllegalStateException,否则忽略。
        2、url全部匹配才能注销。
        参数:注册信息,不能为空,如 dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
        **/
        void unregister(URL url);
    
        /**
        订阅符合条件的注册数据并在注册数据发生变化时自动推送
        订阅需要满足以下约定:
        1. 当 URL 设置 check=false 参数时。注册失败时不抛出异常,后台重试。
        2. 当URL设置category=routers时,只通知指定的分类数据。多个分类用逗号分隔,并允许星号匹配,表示订阅了所有分类数据。
        3. 允许interface、group、version、classifier作为条件查询,例如:interface=com.alibaba.foo.BarService&version=1.0.0
        4. 并且查询条件允许星号匹配,订阅all的所有版本所有接口的数据包,例如:interface=*&group=*&version=*&classifier=* 
        5. 当注册中心重启和网络抖动时,需要自动恢复订阅请求。
        6.允许URL相同但参数不同的URL共存,不能相互覆盖。
        7.当第一次通知完成后返回,订阅过程必须阻塞。
        参数:url 订阅条件,不允许为空,如consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
             listener:变更事件的监听器,不允许为空
        **/
        void subscribe(URL url, NotifyListener listener);
        
        /**
        退订
        需要满足以下约定: 
        1. 如果不订阅,直接忽略。 
        2. 通过完整的 URL 匹配取消订阅。
        参数:url 订阅条件,不允许为空,如consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
             listener:变更事件的监听器,不允许为空
        **/
        void unsubscribe(URL url, NotifyListener listener);
    
        /**
        查询符合条件的注册数据。 对应订阅的push模式,这是pull模式,只返回一个结果。
        **/
        List<url> lookup(URL url);
    
    }
    

    Node接口比较简单,这里就不说,在接下里的分析中会用到。

    3.2.2 AbstractRegistry初始化

    AbstractRegistry实现了Registry接口,在Registry接口中并没有定义新的方法,所以AbstractRegistry只是实现了一些上面的方法。

    AbstractRegistry的重要参数:

    
        private final Properties properties = new Properties();
        // 文件缓存定时写入
        private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));
        //是否是同步保存文件
        private final boolean syncSaveFile;
        //使用原子变量来保证文件版本的一致性
        private final AtomicLong lastCacheChanged = new AtomicLong();
    
        /*记录已经注册服务的URL集合,注册的URL不仅仅可以是服务提供者的,也可以是服务消费者的。*/
        private final Set<url> registered = new ConcurrentHashSet<url>();
        
        /**消费者url订阅的监听器集合/
        private final ConcurrentMap<url, set<notifylistener="">> subscribed = new ConcurrentHashMap<url, set<notifylistener="">>();
        
        /*某个消费者被通知的服务URL集合,最外部URL的key是消费者的URL,value是一个map集合,里面的map中的key为分类名(providers、consumes、routes、configurators)四种,value是该类下的服务url集合。*/
        private final ConcurrentMap<url, map<string,="" list<url="">>> notified = new ConcurrentHashMap<url, map<string,="" list<url="">>>();
        /*注册中心URL*/
        private URL registryUrl;
        /*本地磁盘缓存文件,缓存注册中心的数据*/
        private File file;
    

    AbstractRegistry的的构造函数如下。

    public AbstractRegistry(URL url) {
            //1. 设置url,即给registryUrl赋值
            setUrl(url);
            //2. 配置中心的URL中是否配置了同步保存文件属性,否则默认为false
            syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
            //3. 配置信息本地缓存的文件名
            String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");
            File file = null;
            if (ConfigUtils.isNotEmpty(filename)) {
                file = new File(filename);
                //不存在目录,则创建。
                if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
                    if (!file.getParentFile().mkdirs()) {
                        throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
                    }
                }
            }
            this.file = file;
            //4. 磁盘中的注册信息缓存文件存在,则先加载到properties对象中
            loadProperties();
            //
            notify(url.getBackupUrls());
    }
    
    
    private void loadProperties() {
            if (file != null && file.exists()) {
                //当本地存在配置缓存文件时
                InputStream in = null;
                try {
                    in = new FileInputStream(file);
                    //读取配置文件的内容,并加载为properties的键值对存储
                    properties.load(in);
                    if (logger.isInfoEnabled()) {
                        logger.info("Load registry store file " + file + ", data: " + properties);
                    }
                } catch (Throwable e) {
                    logger.warn("Failed to load registry store file " + file, e);
                } finally {
                    if (in != null) {
                        try {
                            in.close();
                        } catch (IOException e) {
                            logger.warn(e.getMessage(), e);
                        }
                    }
                }
            }
    }
    

    3.2.2 注册与取消注册

    这里采用ConcurrentHashSet来存储注册的服务,就是registered属性

    @Override
    public void register(URL url) {
        if (url == null) {
            throw new IllegalArgumentException("register url == null");
        }
        if (logger.isInfoEnabled()) {
            logger.info("Register: " + url);
        }
        //将url添加到registered中
        registered.add(url);
    }
    
    @Override
    public void unregister(URL url) {
        if (url == null) {
            throw new IllegalArgumentException("unregister url == null");
        }
        if (logger.isInfoEnabled()) {
            logger.info("Unregister: " + url);
        }
       //将url从registered中移除
        registered.remove(url);
    }
    

    3.2.3 订阅与取消订阅

    通过消费者url从subscribed变量中获取该消费者的所有监听器集合,然后将该监听器放入到集合中,取消同理。

    
    @Override
    public void subscribe(URL url, NotifyListener listener) {
        if (url == null) {
            throw new IllegalArgumentException("subscribe url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("subscribe listener == null");
        }
        if (logger.isInfoEnabled()) {
            logger.info("Subscribe: " + url);
        }
        Set<notifylistener> listeners = subscribed.get(url);
        if (listeners == null) {
            subscribed.putIfAbsent(url, new ConcurrentHashSet<notifylistener>());
            listeners = subscribed.get(url);
        }
        listeners.add(listener);
    }
    
    @Override
    public void unsubscribe(URL url, NotifyListener listener) {
        if (url == null) {
            throw new IllegalArgumentException("unsubscribe url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("unsubscribe listener == null");
        }
        if (logger.isInfoEnabled()) {
            logger.info("Unsubscribe: " + url);
        }
        Set<notifylistener> listeners = subscribed.get(url);
        if (listeners != null) {
            listeners.remove(listener);
        }
    }
    

    3.2.4 服务的恢复

    服务恢复包括了注册服务的恢复和订阅服务的恢复,以为在内存中缓存了注册的服务和订阅的服务,因此恢复时会重新拉取这些数据,分别调用发布和订阅的方法来重新将其录入到注册中心中。

    protected void recover() throws Exception {
        //注册服务的恢复
        //1. 获取的已经注册的服务的值,即registered
        Set<url> recoverRegistered = new HashSet<url>(getRegistered());
        //2. 如果不是空,就将每一个服务重新注册到注册中心
        if (!recoverRegistered.isEmpty()) {
            if (logger.isInfoEnabled()) {
                logger.info("Recover register url " + recoverRegistered);
            }
            for (URL url : recoverRegistered) {
                //3. 调用register(URL url) 方法,将服务注册到注册中心
                register(url);
            }
        }
        // 订阅服务的恢复
        //1. 获取subscribed的值。
        Map<url, set<notifylistener="">> recoverSubscribed = new HashMap<url, set<notifylistener="">>(getSubscribed());
        //2. 如果不是null,则将订阅注册中心的服务
        if (!recoverSubscribed.isEmpty()) {
            if (logger.isInfoEnabled()) {
                logger.info("Recover subscribe url " + recoverSubscribed.keySet());
            }
            for (Map.Entry<url, set<notifylistener="">> entry : recoverSubscribed.entrySet()) {
                URL url = entry.getKey();
                for (NotifyListener listener : entry.getValue()) {
                    //3. 调用subscribe 方法,订阅注册中心的服务。
                    subscribe(url, listener);
                }
            }
        }
    }
    

    这里可能大家会有点困惑,因为在AbstractRegistry的实现中,订阅和发布就是单纯的将URL添加到相应的集合中去。这里的逻辑不就相当于再添加一次吗?其实在AbstractRegsitry的具体实现中,发布意味着还需要向注册中心真正的通过RPC建立联系。而不仅仅是将地址加入对应的集合中。

    3.2.5 通知

    notify是指将一组URL推送给订阅了该URL的订阅端。在推送的时候,会将url根据cateogry分组,之后再分别推送不同的分组。

    protected void notify(List<url> urls) {
        if (urls == null || urls.isEmpty()) return;
        // 遍历订阅URL的监听器集合,通知他们
        for (Map.Entry<url, set<notifylistener="">> entry : getSubscribed().entrySet()) {
            URL url = entry.getKey();
    
            if (!UrlUtils.isMatch(url, urls.get(0))) {
                continue;
            }
    
            Set<notifylistener> listeners = entry.getValue();
            if (listeners != null) {
                // 遍历监听器集合,通知他们
                for (NotifyListener listener : listeners) {
                    try {
                        notify(url, listener, filterEmpty(url, urls));
                    } catch (Throwable t) {
                        logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
                    }
                }
            }
        }
    }
    
    /**
         * 通知监听器,URL 变化结果
         * @param url
         * @param listener
         * @param urls
         */
    protected void notify(URL url, NotifyListener listener, List<url> urls) {
        if (url == null) {
            throw new IllegalArgumentException("notify url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("notify listener == null");
        }
        if ((urls == null || urls.isEmpty())
            && !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
            logger.warn("Ignore empty notify urls for subscribe url " + url);
            return;
        }
        if (logger.isInfoEnabled()) {
            logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
        }
        Map<string, list<url="">> result = new HashMap<string, list<url="">>();
        // 将urls进行分类
        for (URL u : urls) {
            if (UrlUtils.isMatch(url, u)) {
                // 按照url中key为category对应的值进行分类,如果没有该值,就找key为providers的值进行分类
                String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
                List<url> categoryList = result.get(category);
                if (categoryList == null) {
                    categoryList = new ArrayList<url>();
                    // 分类结果放入result
                    result.put(category, categoryList);
                }
                categoryList.add(u);
            }
        }
        if (result.size() == 0) {
            return;
        }
        // 获得某一个消费者被通知的url集合(通知的 URL 变化结果)
        Map<string, list<url="">> categoryNotified = notified.get(url);
        if (categoryNotified == null) {
            // 添加该消费者对应的url
            notified.putIfAbsent(url, new ConcurrentHashMap<string, list<url="">>());
            categoryNotified = notified.get(url);
        }
        // 处理通知监听器URL 变化结果
        for (Map.Entry<string, list<url="">> entry : result.entrySet()) {
            String category = entry.getKey();
            List<url> categoryList = entry.getValue();
            // 把分类标实和分类后的列表放入notified的value中
            // 覆盖到 `notified`
            // 当某个分类的数据为空时,会依然有 urls 。其中 `urls[0].protocol = empty` ,通过这样的方式,处理所有服务提供者为空的情况。
            categoryNotified.put(category, categoryList);
            // 变更保存到文件
            saveProperties(url);
            //监听器通知
            listener.notify(categoryList);
        }
    }
    

    3.3 缓存机制

    缓存的存在就是用空间换时间,如果没次运行都先从注册中心获取移除可调用服务列表,则会让注册中心承受巨大的流量压力。另外没次额外的网络请求也会让整个系统性能下降,因此Dubbo在AbstractRegistry类中实现了缓存机制。在启动时会先加载本地的配置文件,当注册中心变更时,会同时将变更保存到本地文件中。

    本地配置文件加载到内存中是通过loadProperties()方法实现的,前面已经介绍过,接下来看一下本地缓存文件的更新,当接受到变更通知时,最后会执行saveProperties(url),该方法定义如下

    private void saveProperties(URL url) {
        if (file == null) {
            return;
        }
    
        try {
            StringBuilder buf = new StringBuilder();
    
            //读取到最新的订阅信息
            Map<string, list<url="">> categoryNotified = notified.get(url);
            if (categoryNotified != null) {
                for (List<url> us : categoryNotified.values()) {
                    for (URL u : us) {
                        if (buf.length() > 0) {
                            buf.append(URL_SEPARATOR);
                        }
                        buf.append(u.toFullString());
                    }
                }
            }
            properties.setProperty(url.getServiceKey(), buf.toString());
            long version = lastCacheChanged.incrementAndGet();
            //同步/异步保存文件标识
            if (syncSaveFile) {
                doSaveProperties(version);
            } else {
                registryCacheExecutor.execute(new SaveProperties(version));
            }
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        }
    }
    

    先看异步保存的线程

    private class SaveProperties implements Runnable {
        private long version;
    
        private SaveProperties(long version) {
            this.version = version;
        }
    
        @Override
        public void run() {
            doSaveProperties(version);
        }
    }
    

    同步更新和异步更新本地缓存文件最后都调用doSaveProperties方法进行保存过程:

    public void doSaveProperties(long version) {
        //先检查一下自己是不是老版本
        if (version < lastCacheChanged.get()) {
            return;
        }
        if (file == null) {
            return;
        }
        // Save
        try {
            File lockfile = new File(file.getAbsolutePath() + ".lock");
            if (!lockfile.exists()) {
                lockfile.createNewFile();
            }
            RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
            try {
                FileChannel channel = raf.getChannel();
                try {
                    FileLock lock = channel.tryLock();
                    if (lock == null) {
                        throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties");
                    }
                    // Save
                    try {
                        if (!file.exists()) {
                            file.createNewFile();
                        }
                        FileOutputStream outputFile = new FileOutputStream(file);
                        try {
                            properties.store(outputFile, "Dubbo Registry Cache");
                        } finally {
                            outputFile.close();
                        }
                    } finally {
                        lock.release();
                    }
                } finally {
                    channel.close();
                }
            } finally {
                raf.close();
            }
        } catch (Throwable e) {
            if (version < lastCacheChanged.get()) {
                return;
            } else {
                registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet()));
            }
            logger.warn("Failed to save registry store file, cause: " + e.getMessage(), e);
        }
    }
    

    总结:当A线程进入原子操作区间时,先对辅助文件加锁,然后操纵主文件,操作结束后,释放辅助文件锁。

    如此一来,当A线程没有退出原子区域时候,B线程是无法进入进入原子区域的,因为获取不到文件锁。

    这样就可以保证对主文件的操作安全性。

    也许你会问:“为什么不直接对主文件加锁呢?” 答案是,对主文件加锁了,又如何操作主文件呢,因为文件锁的机制是会对文件的操作屏蔽的。

    文件锁

    3.4 FailbackRegistry源码分析

    FailbackRegistry抽象类继承了AbstractRegistry抽象类,并再次基础上增加了失败重试的机制作为抽象能力。ZookeeperRegistry和RedisRegistry继承该抽象方法后,直接可以使用。FailbackRegistry实现了subscribe、unsubscribe等通用方法,里面调用了未实现的模板方法,会由子类实现。通用方法会调用这些模板方法,如果捕获到异常,则会把URL添加到对应的重试集合中,以供定时器重试。

    3.4.1 类的属性和构造方法

    FailbackRegistry中定义了一些与重试相关的属性

    // Scheduled executor service 重试线程池,用于每个一段时间进行重试
    private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true));
    
    // Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry
    //失败重试定时器,定期检查是否有失败请求,有则无限次重试
    private final ScheduledFuture<!--?--> retryFuture;
    
    //发起注册失败的URL集合
    private final Set<url> failedRegistered = new ConcurrentHashSet<url>();
    //取消注册失败的URL集合
    private final Set<url> failedUnregistered = new ConcurrentHashSet<url>();
    //发起订阅失败的监听器集合
    private final ConcurrentMap<url, set<notifylistener="">> failedSubscribed = new ConcurrentHashMap<url, set<notifylistener="">>();
    //取消订阅的监听器集合
    private final ConcurrentMap<url, set<notifylistener="">> failedUnsubscribed = new ConcurrentHashMap<url, set<notifylistener="">>();
    //通知失败的URL集合
    private final ConcurrentMap<url, map<notifylistener,="" list<url="">>> failedNotified = new ConcurrentHashMap<url, map<notifylistener,="" list<url="">>>();
    //重试线程等待的时间(ms)
    private final int retryPeriod;
    

    构造函数如下:

    public FailbackRegistry(URL url) {
        //先初始化父类的构造函数
        super(url);
        //从url中获取失败重试的间隔时间,默认是5秒
        this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
        //定时重试机制
        this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                // Check and connect to the registry
                try {
                    retry();
                } catch (Throwable t) { // Defensive fault tolerance
                    logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
                }
            }
        }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
    }
    

    定时任务调用retry();方法执行重试逻辑

    // Retry the failed actions
    protected void retry() {
        //对注册失败的URL重新注册
        if (!failedRegistered.isEmpty()) {
            Set<url> failed = new HashSet<url>(failedRegistered);
            if (failed.size() > 0) {
                if (logger.isInfoEnabled()) {
                    logger.info("Retry register " + failed);
                }
                try {
                    for (URL url : failed) {
                        try {
                            //遍历URL重新进行注册
                            doRegister(url);
                            failedRegistered.remove(url);
                        } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                            logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                        }
                    }
                } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                    logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                }
            }
        }
        //对取消注册失败的URL重新取消注册
        if (!failedUnregistered.isEmpty()) {
            Set<url> failed = new HashSet<url>(failedUnregistered);
            if (!failed.isEmpty()) {
                if (logger.isInfoEnabled()) {
                    logger.info("Retry unregister " + failed);
                }
                try {
                    for (URL url : failed) {
                        try {
                            //取消注册
                            doUnregister(url);
                            failedUnregistered.remove(url);
                        } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                            logger.warn("Failed to retry unregister  " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                        }
                    }
                } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                    logger.warn("Failed to retry unregister  " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                }
            }
        }
        //重试发起订阅失败的监听集合
        if (!failedSubscribed.isEmpty()) {
            Map<url, set<notifylistener="">> failed = new HashMap<url, set<notifylistener="">>(failedSubscribed);
            for (Map.Entry<url, set<notifylistener="">> entry : new HashMap<url, set<notifylistener="">>(failed).entrySet()) {
                if (entry.getValue() == null || entry.getValue().size() == 0) {
                    failed.remove(entry.getKey());
                }
            }
            if (failed.size() > 0) {
                if (logger.isInfoEnabled()) {
                    logger.info("Retry subscribe " + failed);
                }
                try {
                    for (Map.Entry<url, set<notifylistener="">> entry : failed.entrySet()) {
                        URL url = entry.getKey();
                        Set<notifylistener> listeners = entry.getValue();
                        for (NotifyListener listener : listeners) {
                            try {
                                //重新订阅
                                doSubscribe(url, listener);
                                listeners.remove(listener);
                            } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                                logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                            }
                        }
                    }
                } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                    logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                }
            }
        }
        //重试取消订阅失败的集合
        if (!failedUnsubscribed.isEmpty()) {
            Map<url, set<notifylistener="">> failed = new HashMap<url, set<notifylistener="">>(failedUnsubscribed);
            for (Map.Entry<url, set<notifylistener="">> entry : new HashMap<url, set<notifylistener="">>(failed).entrySet()) {
                if (entry.getValue() == null || entry.getValue().isEmpty()) {
                    failed.remove(entry.getKey());
                }
            }
            if (failed.size() > 0) {
                if (logger.isInfoEnabled()) {
                    logger.info("Retry unsubscribe " + failed);
                }
                try {
                    for (Map.Entry<url, set<notifylistener="">> entry : failed.entrySet()) {
                        URL url = entry.getKey();
                        Set<notifylistener> listeners = entry.getValue();
                        for (NotifyListener listener : listeners) {
                            try {
                                //重新取消订阅
                                doUnsubscribe(url, listener);
                                listeners.remove(listener);
                            } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                                logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                            }
                        }
                    }
                } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                    logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                }
            }
        }
        //重试通知失败的URL集合
        if (!failedNotified.isEmpty()) {
            Map<url, map<notifylistener,="" list<url="">>> failed = new HashMap<url, map<notifylistener,="" list<url="">>>(failedNotified);
            for (Map.Entry<url, map<notifylistener,="" list<url="">>> entry : new HashMap<url, map<notifylistener,="" list<url="">>>(failed).entrySet()) {
                if (entry.getValue() == null || entry.getValue().size() == 0) {
                    failed.remove(entry.getKey());
                }
            }
            if (failed.size() > 0) {
                if (logger.isInfoEnabled()) {
                    logger.info("Retry notify " + failed);
                }
                try {
                    for (Map<notifylistener, list<url="">> values : failed.values()) {
                        for (Map.Entry<notifylistener, list<url="">> entry : values.entrySet()) {
                            try {
                                //重新通知
                                NotifyListener listener = entry.getKey();
                                List<url> urls = entry.getValue();
                                listener.notify(urls);
                                values.remove(listener);
                            } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                                logger.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                            }
                        }
                    }
                } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                    logger.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                }
            }
        }
    }
    

    FailbackRegistry继承了AbstractRegistry类,重写了注册,订阅、查询和通知等方法,并添加了重试机制。此外,还添加了四个为实现的抽象像模板。

    // ==== Template method ====
    
    protected abstract void doRegister(URL url);
    
    protected abstract void doUnregister(URL url);
    
    protected abstract void doSubscribe(URL url, NotifyListener listener);
    
    protected abstract void doUnsubscribe(URL url, NotifyListener listener);
    

    3.4.2 模板模式调用

    以订阅为例,FailbackRegistry重写了subscribe方法,但只实现了订阅的大体逻辑及一场处理等通用的东西。具体订阅,交给继承的子类实现。这就是模板模式的具体实现,代码如下

    public void subscribe(URL url, NotifyListener listener) {
        super.subscribe(url, listener);
        removeFailedSubscribed(url, listener);
        try {
            // 此处调用模板方法,由子类自行实现
            doSubscribe(url, listener);
        } catch (Exception e) {
           .....
    }
    

    3.5 Dubbo操作Zookeeper

    Dubbo中通过ZookeeperClient的实现类来对zookeeper进行操作,下面是ZookeeperClient接口

    public interface ZookeeperClient {
    
        void create(String path, boolean ephemeral);
    
        void delete(String path);
    
        List<string> getChildren(String path);
    
        List<string> addChildListener(String path, ChildListener listener);
    
        void removeChildListener(String path, ChildListener listener);
    
        void addStateListener(StateListener listener);
    
        void removeStateListener(StateListener listener);
    
        boolean isConnected();
    
        void close();
    
        URL getUrl();
    
    }
    

    ZookeeperClient有两个实现类,分别是CuratorZookeeperClient和ZkclientZookeeperClient,ZkclientZookeeperClient是官方提供的包,CuratorZookeeperClient是通过Apache的Curator框架实现的,默认使用CuratorZookeeperClient。

    在ZkclientZookeeperClient中,Dubbo定义了一个包装类ZkClientWrapper,在这个包装类里面对zookeeper进行操作。源码比较简单,有兴趣的可以看看。

    在ZookeeperRegistry的构造函数参数中传入了一个ZookeeperTransporter接口,该接口如下:

    @SPI("curator")
    public interface ZookeeperTransporter {
    
        @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
        ZookeeperClient connect(URL url);
    
    }
    

    通过这个个接口能获取到ZookeeperClient的实例,该接口也有两个实现CuratorZookeeperTransporter和ZkclientZookeeperTransporter,ZookeeperRegistry正是通过connect(URL url);方法获取到ZookeeperClient实例的。

    3.6 ZookeeperRegistry源码分析

    3.6.1 构造方法

    public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        //如果不进行配置,默认dubbo根目录就是/dubbo
        String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(Constants.PATH_SEPARATOR)) {
            group = Constants.PATH_SEPARATOR + group;
        }
        this.root = group;
        //获取ZookeeperClient实例
        zkClient = zookeeperTransporter.connect(url);
        //zookeeper添加重连回调,会触发recover方法,进行失败任务重试
        //为什么FailbackRegistry都是用线程安全的集合,因为在这里存在线程竞争资源
        zkClient.addStateListener(new StateListener() {
            @Override
            public void stateChanged(int state) {
                if (state == RECONNECTED) {
                    try {
                        recover();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        });
    }
    

    3.6.2 注册与取消注册

    对于zookeeper来说就是创建目录和删除目录呗,这里实现了FailbackRegistry的模板方法,如下:

    @Override
    /**
         * 注册的逻辑,就是在zookeeper创建节点,节点路径为toUrlPath(url)
         * 具体格式为 /{group}/{interfaceName}/{category}/{url.toFullString}
         * DYNAMIC_KEY表示是否创建永久节点,true表示不是,断开连接后会消失,所以需要进行recover
         * @param url
         */
    protected void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
    /**
         * 取消注册,就是删除那个节点
         * @param url
         */
    @Override
    protected void doUnregister(URL url) {
        try {
            zkClient.delete(toUrlPath(url));
        } catch (Throwable e) {
            throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
    

    需要注意下节点的路径生成格式,也就是toUrlPath(url)方法,格式为 /{group}/{interfaceName}/{category}/{url.toFullString},
    group一般不配置的话为dubbo,
    interfaceName对应具体接口,
    category开始就讲过,分为consumers,configuators,routers,providers,对于registry来讲category=providers
    url.toFullString就是我们的url配置。

    3.6.3 订阅和取消订阅

    订阅对于消费者来讲就是获取providers和routers,用于得到服务提供者的路由信息。

    对于提供者来讲,configurations,通过新的配置重新暴露服务。

    在ZookeeperRegistry,我们只关注如何进行订阅。doSubscribe方法支持订阅全局和订阅特定接口;如果interface=*,即订阅全局,对于新增和已存在的所有接口的改动都会触发回调;如果interface=特定接口,那么只有这个接口的子节点改变时,才触发回调。

    @Override
    protected void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            //如果interface=*,需要订阅所有接口
            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                String root = toRootPath();
                ConcurrentMap<notifylistener, childlistener=""> listeners = zkListeners.get(url);
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<notifylistener, childlistener="">());
                    listeners = zkListeners.get(url);
                }
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, new ChildListener() {
                        @Override
                        public void childChanged(String parentPath, List<string> currentChilds) {
                            for (String child : currentChilds) {
                                child = URL.decode(child);
                                //如果有新的服务加入
                                if (!anyServices.contains(child)) {
                                    anyServices.add(child);
                                    //如果consumer的interface为*,会订阅每一个url,会触发另一个分支的逻辑
                                    //这里是用来对/dubbo下面提供者新增时的回调,相当于增量
                                    subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
                                                                               Constants.CHECK_KEY, String.valueOf(false)), listener);
                                }
                            }
                        }
                    });
                    zkListener = listeners.get(listener);
                }
                zkClient.create(root, false);
                List<string> services = zkClient.addChildListener(root, zkListener);
                if (services != null && !services.isEmpty()) {
                    for (String service : services) {
                        service = URL.decode(service);
                        anyServices.add(service);
                        //如果consumer的interface为*,会订阅每一个url,会触发另一个分支的逻辑
                         //这里的逻辑只执行一次,一次全量
                        subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
                                                                     Constants.CHECK_KEY, String.valueOf(false)), listener);
                    }
                }
            } else {
                //这边是针对明确interface的订阅逻辑
                List<url> urls = new ArrayList<url>();
                //针对每种category路径进行监听
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<notifylistener, childlistener=""> listeners = zkListeners.get(url);
                    if (listeners == null) {
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<notifylistener, childlistener="">());
                        listeners = zkListeners.get(url);
                    }
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {
                        //封装回调逻辑
                        listeners.putIfAbsent(listener, new ChildListener() {
                            @Override
                            public void childChanged(String parentPath, List<string> currentChilds) {
                                ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                            }
                        });
                        zkListener = listeners.get(listener);
                    }
                    //创建节点
                    zkClient.create(path, false);
                    //增加回调
                    List<string> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                //如果有子节点,直接进行触发一次,对应AbstractRegsitry的lookup方法
                //意思就是第一次订阅,如果订阅目录存在子节点,直接会触发一次
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
    

    这边需要注意一点的是,每次进行订阅,最重要的第一次,会使用当前订阅节点的子节点数据触发一次notify,执行对应监听器逻辑,这个在后面RegistryDirectory中会用到这个特性。

    第48行 toCategoriesPath 方法中的逻辑,是根据url中的category参数中的信息,来生成categoryUrl。如category中的参数包含多个值,则会生成多个categoryUrl。如url 中”category=consumers,routers”,则会生成如”/dubbo/org.apache.dubbo.demo.DemoService/consumers”,”/dubbo/org.apache.dubbo.demo.DemoService/routers”等的categoryURL。

    变量zkListeners的声明为:

    private final ConcurrentMap<url, concurrentmap<notifylistener,="" childlistener="">> zkListeners = new ConcurrentHashMap<>();
    

    变量zkListeners中存放的是URL和事件监听器的映射关系。其中,NotifyListener类型的监听器是用于消费端订阅URL信息后,执行事件推送,而ChildListener是用于监听某个节点下子节点列表的变化信息。我们先来看一下ChildListener监听器的如下实现。

    public void childChanged(String path, List<string> children) {
    	notify(url, listener, toUrlsWithEmpty(url, path, children));
    }
    

    以上的代码,即是监听到path的子节点列表变化后,将要执行的逻辑。其中,children列表是path的子节点列表。在toUrlWithEmpty的逻辑中,是根据当前的url获取应该向url推送的通知中应该包含的列表类型。其中,主要的逻辑是在UrlUtils的isMatch方法。在这个方法中,提供了判断两个url是否匹配的相关逻辑,简而言之是根据interface、group、version等参数来进行判断。逻辑比较简单,大家可自行查阅。

    zkClient.create(path, false);
    List<string> children = zkClient.addChildListener(path, zkListener);
    

    如上的两句代码,则是执行了节点的创建,并获取了节点下的子节点信息,同时建立了对此路径下子节点变化的事件监听。获取到子节点信息之后,进一步可以提取出相关的URL信息列表,向执行订阅的URL进行推送。

    取消订阅没什么好讲的,删除订阅数据即可。

    讲了这么多,对于lookup方法,使用消费者查找提供者的逻辑其实也很简单。使用消费者url构造出zk中provider的目录,然后返回所有子节点即可。

    /**
         * 查找消费者url 对应 提供者url实现
         * 这边的url为消费者url
         * @param url
         * @return
         */
    @Override
    public List<url> lookup(URL url) {
        if (url == null) {
            throw new IllegalArgumentException("lookup url == null");
        }
        try {
            List<string> providers = new ArrayList<string>();
            //返回inteface下面所有category的url
            for (String path : toCategoriesPath(url)) {
                List<string> children = zkClient.getChildren(path);
                if (children != null) {
                    providers.addAll(children);
                }
            }
            //返回匹配的url
            return toUrlsWithoutEmpty(url, providers);
        } catch (Throwable e) {
            throw new RpcException("Failed to lookup " + url + " from zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
    

    3.7 Registry工场方法分析

    3.7.1 RegistryFactory

    在dubbo中,注册中心相关的代码在dubbo-registry模块下,子模块dubbo-registry-api中定义了注册中心相关的基础代码,而在dubbo-registry-xxx模块中则定义了具体的注册中心类型实现代码,例如dubbo-registry-zookeeper模块则存放了zookeeper注册中心的实现代码。

    首先来看一下RegistryFactory接口。

    @SPI("dubbo")
    public interface RegistryFactory {
        @Adaptive({"protocol"})
        Registry getRegistry(URL url);
    }
    

    我们现在先不分析@SPI注解和@Adaptive注解。除此之外,可以发现RegistryFactory接口的定义非常简单,就只有一个getRegistry方法。其中,URL为dubbo中封装的统一的资源定位符,在其中定义了协议protocol、用户名username、密码password、host主机、path路径等等属性。

    可以发现,getRegistry是一个典型的工厂方法。给定url,生成具体的注册中心对象。在dubbo中,实现了AbstractRegistryFactory,在其中实现了getRegistry方法。

    3.7.2 AbstractRegistryFactory

    AbstractRegistryFactory实现了RegistryFactory中的getRegistry方法,其实现如下:

    @Override
    public Registry getRegistry(URL url) {
        url = url.setPath(RegistryService.class.getName())
            .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
            .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
        String key = url.toServiceStringWithoutResolving();
        // Lock the registry access process to ensure a single instance of the registry
        LOCK.lock();
        try {
            Registry registry = REGISTRIES.get(key);
            if (registry != null) {
                return registry;
            }
            registry = createRegistry(url);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + url);
            }
            REGISTRIES.put(key, registry);
            return registry;
        } finally {
            // Release the lock
            LOCK.unlock();
        }
    }
    
    protected abstract Registry createRegistry(URL url);
    

    从以上的代码结构上,我们可以很容易的发现,这里非常典型的应用了抽象工厂模式。方法createRegistry使用了abstract来进行修饰,从设计上此方法的逻辑就是交给抽象类AbstractRegistryFactory的具体子类来进行实现。事实上,dubbo之所以可以支持像redis、zookeeper、consul等等这么多的注册中心,就是得益于此处良好的设计。想要支持新的注册中心类型,只要实现对应的createRegistry方法即可。

    看一下以上代码的逻辑。其中,toServiceStringWithoutResolving是将URL对象转换为字符串完整表示。然后作为key值,来获取对应的注册中心。由于逻辑中存在着先判断后进行操作的逻辑,为了保证并发下的安全性,因此使用了加锁操作。

    3.7.3 ZookeeperRegistryFactory

    上面看过了AbstractRegistryFactory的定义之后,再来看一下ZooKeeperRegistryFactory。其createRegistry函数定义如下:

    public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
    
        private ZookeeperTransporter zookeeperTransporter;
    
        public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
            this.zookeeperTransporter = zookeeperTransporter;
        }
    
        @Override
        public Registry createRegistry(URL url) {
            return new ZookeeperRegistry(url, zookeeperTransporter);
        }
    
    }
    

    我们可以发现,RegistryFactory经过层层抽象之后,终于在这里看到了真正构建了注册中心对象的代码。

    从接口RegistryFactory,到抽象类AbstractRegistryFactory,再到具体的实现类ZookeeperRegistryFactory、RedisRegistryFactory,有着非常清晰的类继承关系。AbstractRegistryFactory作为抽象工厂类,为dubbo的扩展中心提供了非常简单和优秀的扩展特性。
    </url,></notifylistener,></notifylistener,></notifylistener,></notifylistener,></notifylistener,></notifylistener,></url,></url,></url,></url,></url,></url,></url,></url,></url,></url,></url,></url,></url,></url,></url,></url,></url,></url,></url,></url,></string,></string,></string,></string,></string,></string,></url,></url,></url,></url,></url,></url,></url,></url,></dubbo:registry>

  • 相关阅读:
    转贴:Asp.Net 学习资源列表
    实现简单的写字板
    android绘图—Paint path 旋转
    Eclipse Android编程快捷键
    android Bitmap学习总结
    各种颜色对应的十六进制数
    Android surfaceView 与View 的区别
    SQLite设置_id自增的方法
    数据库表外键设置
    android自定义View的用法
  • 原文地址:https://www.cnblogs.com/ChenBingJie123/p/15414796.html
Copyright © 2011-2022 走看看