zoukankan      html  css  js  c++  java
  • Dubbo(六):zookeeper注册中心的应用

      Dubbo中有一个非常本质和重要的功能,那就是服务的自动注册与发现,而这个功能是通过注册中心来实现的。而dubbo中考虑了外部许多的注册组件的实现,zk,redis,etcd,consul,eureka...

      各自实现方式各有不同,但是对外表现都是一致的:都实现了 Registry 接口!

      今天我们就来看看最常用的注册中心 Zookeeper 的接入实现吧!

    1. dubbo在 zookeeper 中的服务目录划分

      注册中心的作用主要分发服务的发布,与服务订阅,及服务推送,服务查询!而zk中,则以服务主单位进行目录划分的。

      整个zookeeper的路径概览:

        /dubbo: root 目录, 持久化目录 (可通过 group=xxx 自定义root目录)
        /dubbo/xxx.xx.XxxService: 每个服务一个路径,持久化目录
        /dubbo/xxx.xx.XxxService/configurators: 配置存放路径,默认为空
        /dubbo/xxx.xx.XxxService/providers: 服务提供者目录,所有提供者以其子路径形式存储,持久化目录。各服务提供者以临时目录形式存在,路径样例如: dubbo%3A%2F%2F192.168.56.1%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-api-provider%26default%3Dtrue%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%2CsayHelloAsync%26pid%3D112756%26release%3D%26side%3Dprovider%26timestamp%3D1588548412331
        /dubbo/xxx.xx.XxxService/consumers: 服务消费者目录, 持久化路径。所有消费者以其子路径形式存储,路径样例如下:consumer%3A%2F%2F192.168.1.4%2Forg.apache.dubbo.rpc.service.GenericService%3Fapplication%3Ddubbo-demo-api-consumer%26category%3Dconsumers%26check%3Dfalse%26dubbo%3D2.0.2%26generic%3Dtrue%26interface%3Dorg.apache.dubbo.demo.DemoService%26pid%3D139620%26side%3Dconsumer%26sticky%3Dfalse%26timestamp%3D1588596195728
        /dubbo/xxx.xx.XxxService/routers: 路由配置信息,默认为空

    2. zookeeper 组件的接入

      Zookeeper 是在本地服务通过socket端口暴露之后,再调用 RegistryFactoryWrapper 进行获取的。

      注册时的URL如下: registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F192.168.56.1%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-api-provider%26bind.ip%3D192.168.56.1%26bind.port%3D20880%26default%3Dtrue%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%2CsayHelloAsync%26pid%3D135556%26release%3D%26side%3Dprovider%26timestamp%3D1588518573613&pid=135556&registry=zookeeper&timestamp=1588518545808

      zk的调用分两种情况,一是作为服务提供者时会在 ServiceConfig#doExportUrlsFor1Protocol 中,进行远程服务暴露时会拉起。二是在消费者在进行远程调用时会 ReferenceConfig#createProxy 时拉取以便获取提供者列表。我们以ServiceConfig为例看看其调用zk的过程:

        // ServiceConfig#doExportUrlsFor1Protocol 
        // 此处 PROTOCOL 是一个SPI类,默认实例是DubboProtocol,但其在处理具体协议时会根据协议类型做出相应选择
        // 此处 协议为 registry, 所以会选择 RegistryProtocol 进行export() 处理
        Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
    
        // 而在RegistryProtocol中,又有几个重要属性,是dubbo进行依赖注入完成的
        // org.apache.dubbo.registry.integration.RegistryProtocol    
        public void setCluster(Cluster cluster) {
            this.cluster = cluster;
        }
        public void setProtocol(Protocol protocol) {
            this.protocol = protocol;
        }
        public void setRegistryFactory(RegistryFactory registryFactory) {
            this.registryFactory = registryFactory;
        }
    
    
        // org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper#export
        @Override
        public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            // 检查是否是要进行服务注册,即检查协议字段是否是 registry, 即前缀是 registry://
            // 注册完成后即返回
            if (UrlUtils.isRegistry(invoker.getUrl())) {
                // ProtocolFilterWrapper.export()
                return protocol.export(invoker);
            }
            return new ListenerExporterWrapper<T>(protocol.export(invoker),
                    Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                            .getActivateExtension(invoker.getUrl(), EXPORTER_LISTENER_KEY)));
        }
        // org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper#export
        @Override
        public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            // 同样判断是否是注册请求
            if (UrlUtils.isRegistry(invoker.getUrl())) {
                // RegistryProtocol.export()
                return protocol.export(invoker);
            }
            return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
        }
        // org.apache.dubbo.registry.integration.RegistryProtocol#export
        @Override
        public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
            URL registryUrl = getRegistryUrl(originInvoker);
            // url to export locally
            URL providerUrl = getProviderUrl(originInvoker);
    
            // Subscribe the override data
            // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
            //  the same service. Because the subscribed is cached key with the name of the service, it causes the
            //  subscription information to cover.
            final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
            final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
            overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    
            providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
            //export invoker
            final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
    
            // url to registry
            // 获取registry实例,如 zk client
            final Registry registry = getRegistry(originInvoker);
            final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
    
            // decide if we need to delay publish
            boolean register = providerUrl.getParameter(REGISTER_KEY, true);
            if (register) {
                // 注册服务地址到注册中心
                register(registryUrl, registeredProviderUrl);
            }
    
            // register stated url on provider model
            registerStatedUrl(registryUrl, registeredProviderUrl, register);
    
            // Deprecated! Subscribe to override rules in 2.6.x or before.
            // 订阅目录
            registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    
            exporter.setRegisterUrl(registeredProviderUrl);
            exporter.setSubscribeUrl(overrideSubscribeUrl);
    
            notifyExport(exporter);
            //Ensure that a new exporter instance is returned every time export
            return new DestroyableExporter<>(exporter);
        }
        /**
         * Get an instance of registry based on the address of invoker
         *
         * @param originInvoker
         * @return
         */
        protected Registry getRegistry(final Invoker<?> originInvoker) {
            URL registryUrl = getRegistryUrl(originInvoker);
            // RegistryFactory 又是通过 SPI 机制生成的    
            // 会根据具体的注册中心的类型创建调用具体实例,如此处为: zookeeper, 所以会调用 ZookeeperRegistryFactory.getRegistry()
            return registryFactory.getRegistry(registryUrl);
        }
        // 所有 RegistryFactory 都会被包装成 RegistryFactoryWrapper, 以便修饰
        // org.apache.dubbo.registry.RegistryFactoryWrapper#getRegistry
        @Override
        public Registry getRegistry(URL url) {
            // 对于zk, 会调用 ZookeeperRegistryFactory
            return new ListenerRegistryWrapper(registryFactory.getRegistry(url),
                    Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(RegistryServiceListener.class)
                            .getActivateExtension(url, "registry.listeners")));
        }
        // org.apache.dubbo.registry.support.AbstractRegistryFactory#getRegistry(org.apache.dubbo.common.URL)
        @Override
        public Registry getRegistry(URL url) {
            if (destroyed.get()) {
                LOGGER.warn("All registry instances have been destroyed, failed to fetch any instance. " +
                        "Usually, this means no need to try to do unnecessary redundant resource clearance, all registries has been taken care of.");
                return DEFAULT_NOP_REGISTRY;
            }
    
            url = URLBuilder.from(url)
                    .setPath(RegistryService.class.getName())
                    .addParameter(INTERFACE_KEY, RegistryService.class.getName())
                    .removeParameters(EXPORT_KEY, REFER_KEY)
                    .build();
            String key = createRegistryCacheKey(url);
            // Lock the registry access process to ensure a single instance of the registry
            LOCK.lock();
            try {
                Registry registry = REGISTRIES.get(key);
                if (registry != null) {
                    return registry;
                }
                //create registry by spi/ioc
                // 调用子类方法创建 registry 实例,此处为 ZookeeperRegistry.createRegistry
                registry = createRegistry(url);
                if (registry == null) {
                    throw new IllegalStateException("Can not create registry " + url);
                }
                REGISTRIES.put(key, registry);
                return registry;
            } finally {
                // Release the lock
                LOCK.unlock();
            }
        }
        // org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory#createRegistry
        @Override
        public Registry createRegistry(URL url) {
            // 最终将Zk组件接入到应用中了,后续就可以使用zk提供的相应功能了
            return new ZookeeperRegistry(url, zookeeperTransporter);
        }

      至此,zookeeper被接入了。我们先来看下 zookeeper 注册中心构造方法实现:

        // org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#ZookeeperRegistry
        public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
            // 抽象父类处理
            super(url);
            if (url.isAnyHost()) {
                throw new IllegalStateException("registry address == null");
            }
            // group=dubbo
            String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
            if (!group.startsWith(PATH_SEPARATOR)) {
                group = PATH_SEPARATOR + group;
            }
            this.root = group;
            zkClient = zookeeperTransporter.connect(url);
            zkClient.addStateListener((state) -> {
                if (state == StateListener.RECONNECTED) {
                    logger.warn("Trying to fetch the latest urls, in case there're provider changes during connection loss.
    " +
                            " Since ephemeral ZNode will not get deleted for a connection lose, " +
                            "there's no need to re-register url of this instance.");
                    ZookeeperRegistry.this.fetchLatestAddresses();
                } else if (state == StateListener.NEW_SESSION_CREATED) {
                    logger.warn("Trying to re-register urls and re-subscribe listeners of this instance to registry...");
                    try {
                        ZookeeperRegistry.this.recover();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                } else if (state == StateListener.SESSION_LOST) {
                    logger.warn("Url of this instance will be deleted from registry soon. " +
                            "Dubbo client will try to re-register once a new session is created.");
                } else if (state == StateListener.SUSPENDED) {
    
                } else if (state == StateListener.CONNECTED) {
    
                }
            });
        }
        // org.apache.dubbo.registry.support.FailbackRegistry#FailbackRegistry
        public FailbackRegistry(URL url) {
            super(url);
            // retry.period=5000
            this.retryPeriod = url.getParameter(REGISTRY_RETRY_PERIOD_KEY, DEFAULT_REGISTRY_RETRY_PERIOD);
    
            // since the retry task will not be very much. 128 ticks is enough.
            // 当连接失败时,使用后台定时重试
            retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128);
        }
        // org.apache.dubbo.registry.support.AbstractRegistry#AbstractRegistry
        public AbstractRegistry(URL url) {
            setUrl(url);
            // 默认会开启file.cache 选项,所以一般会要求存在该文件
            if (url.getParameter(REGISTRY__LOCAL_FILE_CACHE_ENABLED, true)) {
                // Start file save timer
                syncSaveFile = url.getParameter(REGISTRY_FILESAVE_SYNC_KEY, false);
                // 默认路径 $USER_HOME/.dubbo/dubbo-registry-dubbo-demo-api-provider-127.0.0.1-2181.cache
                String defaultFilename = System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(APPLICATION_KEY) + "-" + url.getAddress().replaceAll(":", "-") + ".cache";
                String filename = url.getParameter(FILE_KEY, defaultFilename);
                File file = null;
                if (ConfigUtils.isNotEmpty(filename)) {
                    file = new File(filename);
                    if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
                        if (!file.getParentFile().mkdirs()) {
                            throw new IllegalArgumentException("Invalid registry cache file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
                        }
                    }
                }
                this.file = file;
                // When starting the subscription center,
                // we need to read the local cache file for future Registry fault tolerance processing.
                // 如果文件存在先从其中加载原有配置
                loadProperties();
                notify(url.getBackupUrls());
            }
        }
        // 如果存在注册中心缓存文件,则从其中加载各属性值
        // 其值为 xxx.xxxService=xxx 格式
        private void loadProperties() {
            if (file != null && file.exists()) {
                InputStream in = null;
                try {
                    in = new FileInputStream(file);
                    properties.load(in);
                    if (logger.isInfoEnabled()) {
                        logger.info("Load registry cache file " + file + ", data: " + properties);
                    }
                } catch (Throwable e) {
                    logger.warn("Failed to load registry cache file " + file, e);
                } finally {
                    if (in != null) {
                        try {
                            in.close();
                        } catch (IOException e) {
                            logger.warn(e.getMessage(), e);
                        }
                    }
                }
            }
        }
        // org.apache.dubbo.registry.support.AbstractRegistry#notify(java.util.List<org.apache.dubbo.common.URL>)
        protected void notify(List<URL> urls) {
            if (CollectionUtils.isEmpty(urls)) {
                return;
            }
    
            for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
                URL url = entry.getKey();
    
                if (!UrlUtils.isMatch(url, urls.get(0))) {
                    continue;
                }
    
                Set<NotifyListener> listeners = entry.getValue();
                if (listeners != null) {
                    for (NotifyListener listener : listeners) {
                        try {
                            notify(url, listener, filterEmpty(url, urls));
                        } catch (Throwable t) {
                            logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
                        }
                    }
                }
            }
        }

      该构造函数的主要作用是建立与zkServer的连接实例,添加状态监听,以及本地缓存文件的处理。

    3. Zookeeper 服务提供者注册

      上一节我们看到在初始化 zookeeper 时,是在export()过程中通过 getRegistry() 实现的。同样,在export()过程中,在获取了注册中心实例后,还需要将服务地址注册上去,才算功成。服务注册的时序图如下:

        // org.apache.dubbo.registry.integration.RegistryProtocol#export
        @Override
        public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
            URL registryUrl = getRegistryUrl(originInvoker);
            // url to export locally
            URL providerUrl = getProviderUrl(originInvoker);
    
            // Subscribe the override data
            // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
            //  the same service. Because the subscribed is cached key with the name of the service, it causes the
            //  subscription information to cover.
            final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
            final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
            overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    
            providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
            //export invoker
            final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
    
            // url to registry
            // 1. 获取注册中心实例
            final Registry registry = getRegistry(originInvoker);
            final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
    
            // decide if we need to delay publish
            // 2. 将 registeredProviderUrl 注册上去
            boolean register = providerUrl.getParameter(REGISTER_KEY, true);
            if (register) {
                register(registryUrl, registeredProviderUrl);
            }
    
            // register stated url on provider model
            registerStatedUrl(registryUrl, registeredProviderUrl, register);
    
            // Deprecated! Subscribe to override rules in 2.6.x or before.
            registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    
            exporter.setRegisterUrl(registeredProviderUrl);
            exporter.setSubscribeUrl(overrideSubscribeUrl);
    
            notifyExport(exporter);
            //Ensure that a new exporter instance is returned every time export
            return new DestroyableExporter<>(exporter);
        }
        private void register(URL registryUrl, URL registeredProviderUrl) {
            // 此处获取 registry 实际是registry的一个 wrapper, registryUrl 以 zookeeper:// 开头
            Registry registry = registryFactory.getRegistry(registryUrl);
            // 此处的 registeredProviderUrl 如: dubbo://192.168.56.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-provider&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=135556&release=&side=provider&timestamp=1588518573613
            registry.register(registeredProviderUrl);
        }
        // org.apache.dubbo.registry.ListenerRegistryWrapper#register
        @Override
        public void register(URL url) {
            try {
                // 调用实际的 registry
                registry.register(url);
            } finally {
                if (CollectionUtils.isNotEmpty(listeners)) {
                    RuntimeException exception = null;
                    for (RegistryServiceListener listener : listeners) {
                        if (listener != null) {
                            try {
                                listener.onRegister(url);
                            } catch (RuntimeException t) {
                                logger.error(t.getMessage(), t);
                                exception = t;
                            }
                        }
                    }
                    if (exception != null) {
                        throw exception;
                    }
                }
            }
        }
        // org.apache.dubbo.registry.support.FailbackRegistry#register
        @Override
        public void register(URL url) {
            if (!acceptable(url)) {
                logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
                return;
            }
            super.register(url);
            removeFailedRegistered(url);
            removeFailedUnregistered(url);
            try {
                // Sending a registration request to the server side
                // 调用zk进行url注册
                doRegister(url);
            } catch (Exception e) {
                Throwable t = e;
    
                // If the startup detection is opened, the Exception is thrown directly.
                boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                        && url.getParameter(Constants.CHECK_KEY, true)
                        && !CONSUMER_PROTOCOL.equals(url.getProtocol());
                boolean skipFailback = t instanceof SkipFailbackWrapperException;
                if (check || skipFailback) {
                    if (skipFailback) {
                        t = t.getCause();
                    }
                    throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
                } else {
                    logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
                }
    
                // Record a failed registration request to a failed list, retry regularly
                addFailedRegistered(url);
            }
        }
        // org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doRegister
        @Override
        public void doRegister(URL url) {
            try {
                // 创建zk node, dynamic=true, 默认创建的节点为临时节点
                // url地址如:  dubbo://192.168.56.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-provider&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=34676&release=&side=provider&timestamp=1588639020690
                // 此地址通过 toUrlPath 转换为 zookeeper 的目录地址,比如 providers 目录,consumers 目录...
                zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
            } catch (Throwable e) {
                throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
        // 将提供者信息转移为zk目录
        private String toUrlPath(URL url) {
            // 找到分类目录,最下级为当前提供者的所有url信息encode后的值
            // 即 /dubbo/interfaceName/providers/xxxx
            return toCategoryPath(url) + PATH_SEPARATOR + URL.encode(url.toFullString());
        }
        private String toCategoryPath(URL url) {
            // category='', 默认为 providers
            return toServicePath(url) + PATH_SEPARATOR + url.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
        }
        private String toServicePath(URL url) {
            String name = url.getServiceInterface();
            if (ANY_VALUE.equals(name)) {
                return toRootPath();
            }
            // /dubbo/interfaceName
            return toRootDir() + URL.encode(name);
        }

      服务注册主要就分两步:1. 获取registry实例(通过SPI机制); 2. 将服务的url转换为对应的路径,以临时节点的形式create到zkServer; 以服务名作为父路径,以自身服务url作为叶子路径,可以很方便在相应路径上找到所有的提供者或消费者。

    4. zookeeper 服务下线处理

      当应用要关闭,或者注册失败时,需要进行服务下线。当然,如果应用没有及时做下线处理,zk会通过其自身的临时节点过期机制,也会将该服务做下线处理。从而避免消费者或管理台看到无效的服务存在。

      应用服务的主动下线操作是由 ShutdownHookCallbacks 来实现的,其时序图如下:

        // org.apache.dubbo.config.bootstrap.DubboBootstrap#DubboBootstrap
        private DubboBootstrap() {
            configManager = ApplicationModel.getConfigManager();
            environment = ApplicationModel.getEnvironment();
            // 在 DubboBootstrap 创建时就创建几个关闭钩子
            DubboShutdownHook.getDubboShutdownHook().register();
            // 将 DubboBootstrap 的销毁动作添加到 DubboShutdownHook 的执行队列中,以便在关闭时一起调用
            ShutdownHookCallbacks.INSTANCE.addCallback(new ShutdownHookCallback() {
                @Override
                public void callback() throws Throwable {
                    DubboBootstrap.this.destroy();
                }
            });
        }
        /**
         * Register the ShutdownHook
         */
        public void register() {
            if (registered.compareAndSet(false, true)) {
                DubboShutdownHook dubboShutdownHook = getDubboShutdownHook();
                Runtime.getRuntime().addShutdownHook(dubboShutdownHook);
                dispatch(new DubboShutdownHookRegisteredEvent(dubboShutdownHook));
            }
        }
        // org.apache.dubbo.config.bootstrap.DubboBootstrap#destroy
        public void destroy() {
            if (destroyLock.tryLock()) {
                try {
                    // DubboShutdownHook 实现的destroy方法
                    DubboShutdownHook.destroyAll();
    
                    if (started.compareAndSet(true, false)
                            && destroyed.compareAndSet(false, true)) {
    
                        unregisterServiceInstance();
                        unexportMetadataService();
                        unexportServices();
                        unreferServices();
    
                        destroyRegistries();
                        DubboShutdownHook.destroyProtocols();
                        destroyServiceDiscoveries();
    
                        clear();
                        shutdown();
                        release();
                    }
                } finally {
                    destroyLock.unlock();
                }
            }
        }
        // org.apache.dubbo.config.DubboShutdownHook#destroyAll
        public static void destroyAll() {
            if (destroyed.compareAndSet(false, true)) {
                AbstractRegistryFactory.destroyAll();
                destroyProtocols();
            }
        }
        // org.apache.dubbo.registry.support.AbstractRegistryFactory#destroyAll
        /**
         * Close all created registries
         */
        public static void destroyAll() {
            if (!destroyed.compareAndSet(false, true)) {
                return;
            }
    
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Close all registries " + getRegistries());
            }
            // Lock up the registry shutdown process
            LOCK.lock();
            try {
                // 获取所有注册的地址,一一进行destroy()操作
                for (Registry registry : getRegistries()) {
                    try {
                        registry.destroy();
                    } catch (Throwable e) {
                        LOGGER.error(e.getMessage(), e);
                    }
                }
                REGISTRIES.clear();
            } finally {
                // Release the lock
                LOCK.unlock();
            }
        }
        // org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#destroy
        @Override
        public void destroy() {
            super.destroy();
            try {
                // (解绑)销毁动作只需执行一次,一次会将所有需要解绑的地址全部操作完成
                zkClient.close();
            } catch (Exception e) {
                logger.warn("Failed to close zookeeper client " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
    
        // org.apache.dubbo.registry.support.FailbackRegistry#destroy
        @Override
        public void destroy() {
            super.destroy();
            retryTimer.stop();
        }
        // org.apache.dubbo.registry.support.AbstractRegistry#destroy
        @Override
        public void destroy() {
            if (logger.isInfoEnabled()) {
                logger.info("Destroy registry:" + getUrl());
            }
            Set<URL> destroyRegistered = new HashSet<>(getRegistered());
            if (!destroyRegistered.isEmpty()) {
                for (URL url : new HashSet<>(getRegistered())) {
                    if (url.getParameter(DYNAMIC_KEY, true)) {
                        try {
                            // 此处会将在该 register 中注册的所有地址进行解绑,所以,当前实例只需调用一次destroy()即可
                            unregister(url);
                            if (logger.isInfoEnabled()) {
                                logger.info("Destroy unregister url " + url);
                            }
                        } catch (Throwable t) {
                            logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                        }
                    }
                }
            }
            Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<>(getSubscribed());
            if (!destroySubscribed.isEmpty()) {
                for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
                    URL url = entry.getKey();
                    for (NotifyListener listener : entry.getValue()) {
                        try {
                            unsubscribe(url, listener);
                            if (logger.isInfoEnabled()) {
                                logger.info("Destroy unsubscribe url " + url);
                            }
                        } catch (Throwable t) {
                            logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                        }
                    }
                }
            }
            AbstractRegistryFactory.removeDestroyedRegistry(this);
        }
        // org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doUnregister
        @Override
        public void doUnregister(URL url) {
            try {
                // 注册地址如: /dubbo/org.apache.dubbo.demo.DemoService/providers/dubbo%3A%2F%2F192.168.56.1%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-api-provider%26default%3Dtrue%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%2CsayHelloAsync%26pid%3D135556%26release%3D%26side%3Dprovider%26timestamp%3D1588518573613
                zkClient.delete(toUrlPath(url));
            } catch (Throwable e) {
                throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }

      其实就是一个关闭钩子,进行路径的主动删除操作。

    5. zookeeper 消费者服务订阅

      前面两个操作,服务的注册与解绑,可以针对提供者,也可以针对消费者。但是对服务订阅则更多是针对消费者!因其要时刻关注提供者的变化,接收注册中心的消息推送。其以 ReferenceConfig.get() 入口。所以,我们以消费者的视图进行解析zk的订阅过程:

        // org.apache.dubbo.registry.integration.RegistryProtocol#refer
        @Override
        @SuppressWarnings("unchecked")
        public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            url = getRegistryUrl(url);
            Registry registry = registryFactory.getRegistry(url);
            if (RegistryService.class.equals(type)) {
                return proxyFactory.getInvoker((T) registry, type, url);
            }
    
            // group="a,b" or group="*"
            Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
            String group = qs.get(GROUP_KEY);
            if (group != null && group.length() > 0) {
                if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                    return doRefer(getMergeableCluster(), registry, type, url);
                }
            }
            return doRefer(cluster, registry, type, url);
        }
    
        // org.apache.dubbo.registry.integration.RegistryProtocol#doRefer
        private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
            RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
            directory.setRegistry(registry);
            directory.setProtocol(protocol);
            // all attributes of REFER_KEY
            Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
            URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
            if (directory.isShouldRegister()) {
                directory.setRegisteredConsumerUrl(subscribeUrl);
                registry.register(directory.getRegisteredConsumerUrl());
            }
            directory.buildRouterChain(subscribeUrl);
            // 注册消费者监听到 zk 中
            // 添加路径 providers, configurators, routers, 以便在同时监听几个目录
            directory.subscribe(toSubscribeUrl(subscribeUrl));
    
            Invoker<T> invoker = cluster.join(directory);
            List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
            if (CollectionUtils.isEmpty(listeners)) {
                return invoker;
            }
    
            RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl);
            for (RegistryProtocolListener listener : listeners) {
                listener.onRefer(this, registryInvokerWrapper);
            }
            return registryInvokerWrapper;
        }
        private static URL toSubscribeUrl(URL url) {
            return url.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY);
        }
    
        // org.apache.dubbo.registry.ListenerRegistryWrapper#subscribe
        @Override
        public void subscribe(URL url, NotifyListener listener) {
            try {
                registry.subscribe(url, listener);
            } finally {
                if (CollectionUtils.isNotEmpty(listeners)) {
                    RuntimeException exception = null;
                    for (RegistryServiceListener registryListener : listeners) {
                        if (registryListener != null) {
                            try {
                                registryListener.onSubscribe(url);
                            } catch (RuntimeException t) {
                                logger.error(t.getMessage(), t);
                                exception = t;
                            }
                        }
                    }
                    if (exception != null) {
                        throw exception;
                    }
                }
            }
        }
        // org.apache.dubbo.registry.support.FailbackRegistry#subscribe
        @Override
        public void subscribe(URL url, NotifyListener listener) {
            super.subscribe(url, listener);
            removeFailedSubscribed(url, listener);
            try {
                // Sending a subscription request to the server side
                doSubscribe(url, listener);
            } catch (Exception e) {
                Throwable t = e;
    
                List<URL> urls = getCacheUrls(url);
                if (CollectionUtils.isNotEmpty(urls)) {
                    notify(url, listener, urls);
                    logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
                } else {
                    // If the startup detection is opened, the Exception is thrown directly.
                    boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                            && url.getParameter(Constants.CHECK_KEY, true);
                    boolean skipFailback = t instanceof SkipFailbackWrapperException;
                    if (check || skipFailback) {
                        if (skipFailback) {
                            t = t.getCause();
                        }
                        throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
                    } else {
                        logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
                    }
                }
    
                // Record a failed registration request to a failed list, retry regularly
                addFailedSubscribed(url, listener);
            }
        }
    
        // org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doSubscribe
        @Override
        public void doSubscribe(final URL url, final NotifyListener listener) {
            try {
                if (ANY_VALUE.equals(url.getServiceInterface())) {
                    String root = toRootPath();
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                    ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> {
                        for (String child : currentChilds) {
                            child = URL.decode(child);
                            if (!anyServices.contains(child)) {
                                anyServices.add(child);
                                subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
                                        Constants.CHECK_KEY, String.valueOf(false)), k);
                            }
                        }
                    });
                    zkClient.create(root, false);
                    List<String> services = zkClient.addChildListener(root, zkListener);
                    if (CollectionUtils.isNotEmpty(services)) {
                        for (String service : services) {
                            service = URL.decode(service);
                            anyServices.add(service);
                            subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
                                    Constants.CHECK_KEY, String.valueOf(false)), listener);
                        }
                    }
                } else {
                    // configurators
                    List<URL> urls = new ArrayList<>();
                    // /dubbo/org.apache.dubbo.demo.DemoService/configurators 
                    // url 样例: provider://192.168.56.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-provider&bind.ip=192.168.56.1&bind.port=20880&category=configurators&check=false&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=34676&release=&side=provider&timestamp=1588639020690
                    // 它会经过 toCategoriesPath() 转换为多个控制目录,依次注册
                    for (String path : toCategoriesPath(url)) {
                        ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                        // listener 是 ConcurrentHashMap 中的 ConcurrentHashMap, 所以迭代也是两层的
                        // 当目录发生变更时,运行 notify() 方法
                        ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
                        // 与提供者的服务注册不一样,消费者的订阅路径是 持久化的
                        zkClient.create(path, false);
                        List<String> children = zkClient.addChildListener(path, zkListener);
                        if (children != null) {
                            // 在添加监听的同时,也返回了当前目录下的所有路径,解析该值即可以得到相应的提供者等信息
                            // toUrlsWithEmpty 将消费者和提供者参数进行整合,以便排出优先级进行语义覆盖
                            urls.addAll(toUrlsWithEmpty(url, path, children));
                        }
                    }
                    // 自身注册完成后,主动触发一次 notify() 以刷新信息
                    notify(url, listener, urls);
                }
            } catch (Throwable e) {
                throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
        private String[] toCategoriesPath(URL url) {
            String[] categories;
            if (ANY_VALUE.equals(url.getParameter(CATEGORY_KEY))) {
                categories = new String[]{PROVIDERS_CATEGORY, CONSUMERS_CATEGORY, ROUTERS_CATEGORY, CONFIGURATORS_CATEGORY};
            } else {
                categories = url.getParameter(CATEGORY_KEY, new String[]{DEFAULT_CATEGORY});
            }
            String[] paths = new String[categories.length];
            for (int i = 0; i < categories.length; i++) {
                // servicePath: /dubbo/interfaceName
                paths[i] = toServicePath(url) + PATH_SEPARATOR + categories[i];
            }
            return paths;
        }
        
        // 处理在添加监听时获取到的childPath信息,作为首次订阅时的依据
        private List<URL> toUrlsWithEmpty(URL consumer, String path, List<String> providers) {
            List<URL> urls = toUrlsWithoutEmpty(consumer, providers);
            if (urls == null || urls.isEmpty()) {
                // 针对url未匹配情况,添加一个  empty:// url 返回
                int i = path.lastIndexOf(PATH_SEPARATOR);
                String category = i < 0 ? path : path.substring(i + 1);
                URL empty = URLBuilder.from(consumer)
                        .setProtocol(EMPTY_PROTOCOL)
                        .addParameter(CATEGORY_KEY, category)
                        .build();
                urls.add(empty);
            }
            return urls;
        }
        private List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) {
            List<URL> urls = new ArrayList<>();
            if (CollectionUtils.isNotEmpty(providers)) {
                for (String provider : providers) {
                    // 包含 :// 
                    if (provider.contains(PROTOCOL_SEPARATOR_ENCODED)) {
                        // 解码提供者url
                        URL url = URLStrParser.parseEncodedStr(provider);
                        // 判断消费者与提供者是否匹配,或者提供者是否被禁用
                        if (UrlUtils.isMatch(consumer, url)) {
                            urls.add(url);
                        }
                    }
                }
            }
            return urls;
        }
        // org.apache.dubbo.common.utils.UrlUtils#isMatch
        public static boolean isMatch(URL consumerUrl, URL providerUrl) {
            String consumerInterface = consumerUrl.getServiceInterface();
            String providerInterface = providerUrl.getServiceInterface();
            //FIXME accept providerUrl with '*' as interface name, after carefully thought about all possible scenarios I think it's ok to add this condition.
            // 接口名称判定
            if (!(ANY_VALUE.equals(consumerInterface)
                    || ANY_VALUE.equals(providerInterface)
                    || StringUtils.isEquals(consumerInterface, providerInterface))) {
                return false;
            }
            // category 类型判定
            if (!isMatchCategory(providerUrl.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY),
                    consumerUrl.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY))) {
                return false;
            }
            // 启用禁用标识判定
            if (!providerUrl.getParameter(ENABLED_KEY, true)
                    && !ANY_VALUE.equals(consumerUrl.getParameter(ENABLED_KEY))) {
                return false;
            }
    
            String consumerGroup = consumerUrl.getParameter(GROUP_KEY);
            String consumerVersion = consumerUrl.getParameter(VERSION_KEY);
            String consumerClassifier = consumerUrl.getParameter(CLASSIFIER_KEY, ANY_VALUE);
    
            String providerGroup = providerUrl.getParameter(GROUP_KEY);
            String providerVersion = providerUrl.getParameter(VERSION_KEY);
            String providerClassifier = providerUrl.getParameter(CLASSIFIER_KEY, ANY_VALUE);
            // 其他判定
            return (ANY_VALUE.equals(consumerGroup) || StringUtils.isEquals(consumerGroup, providerGroup) || StringUtils.isContains(consumerGroup, providerGroup))
                    && (ANY_VALUE.equals(consumerVersion) || StringUtils.isEquals(consumerVersion, providerVersion))
                    && (consumerClassifier == null || ANY_VALUE.equals(consumerClassifier) || StringUtils.isEquals(consumerClassifier, providerClassifier));
        }
        

      事实上,其步骤与提供者是类似的:1. 获取registry实例; 2. 注册自身消费标识到consumers目录; 3. 订阅providers,configurators,routers 子目录变更; 另外,在做订阅的同时,也拉取了提供者服务列表达到初始化的作用。

    6. zookeeper 服务信息通知notify

      当注册中心发现服务提供者发生了变化时,将会将该信息推送给订阅了相应路径的客户端。这个客户端首先会被前面设置的 zkListener 处理。

        // ZookeeperRegistry.doSubscribe
        @Override
        public void doSubscribe(final URL url, final NotifyListener listener) {
            ...
                    List<URL> urls = new ArrayList<>();
                    for (String path : toCategoriesPath(url)) {
                        ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                        // 服务回调将由 ZookeeperRegistry.this.notify() 处理
                        ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
                        zkClient.create(path, false);
                        List<String> children = zkClient.addChildListener(path, zkListener);
                        if (children != null) {
                            urls.addAll(toUrlsWithEmpty(url, path, children));
                        }
                    }
                    notify(url, listener, urls);
            ...
        }
        // org.apache.dubbo.registry.support.FailbackRegistry#notify            
        @Override
        protected void notify(URL url, NotifyListener listener, List<URL> urls) {
            if (url == null) {
                throw new IllegalArgumentException("notify url == null");
            }
            if (listener == null) {
                throw new IllegalArgumentException("notify listener == null");
            }
            try {
                doNotify(url, listener, urls);
            } catch (Exception t) {
                // Record a failed registration request to a failed list, retry regularly
                addFailedNotified(url, listener, urls);
                logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }
        }
        // org.apache.dubbo.registry.support.FailbackRegistry#doNotify
        protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
            super.notify(url, listener, urls);
        }
        // org.apache.dubbo.registry.support.AbstractRegistry#notify(org.apache.dubbo.common.URL, org.apache.dubbo.registry.NotifyListener, java.util.List<org.apache.dubbo.common.URL>)
        /**
         * Notify changes from the Provider side.
         *
         * @param url      consumer side url
         * @param listener listener
         * @param urls     provider latest urls
         */
        protected void notify(URL url, NotifyListener listener, List<URL> urls) {
            if (url == null) {
                throw new IllegalArgumentException("notify url == null");
            }
            if (listener == null) {
                throw new IllegalArgumentException("notify listener == null");
            }
            if ((CollectionUtils.isEmpty(urls))
                    && !ANY_VALUE.equals(url.getServiceInterface())) {
                logger.warn("Ignore empty notify urls for subscribe url " + url);
                return;
            }
            if (logger.isInfoEnabled()) {
                logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
            }
            // keep every provider's category.
            Map<String, List<URL>> result = new HashMap<>();
            for (URL u : urls) {
                if (UrlUtils.isMatch(url, u)) {
                    String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
                    List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
                    categoryList.add(u);
                }
            }
            if (result.size() == 0) {
                return;
            }
            Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
            for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
                String category = entry.getKey();
                List<URL> categoryList = entry.getValue();
                categoryNotified.put(category, categoryList);
                // 再交由最原始传入的 listener 进行处理, 即 RegistryDirectory.notify()
                listener.notify(categoryList);
                // We will update our cache file after each notification.
                // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
                saveProperties(url);
            }
        }
        // org.apache.dubbo.registry.integration.RegistryDirectory#notify
        @Override
        public synchronized void notify(List<URL> urls) {
            Map<String, List<URL>> categoryUrls = urls.stream()
                    .filter(Objects::nonNull)
                    .filter(this::isValidCategory)
                    .filter(this::isNotCompatibleFor26x)
                    .collect(Collectors.groupingBy(this::judgeCategory));
    
            List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
            this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
    
            List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
            // 添加到所有提供者列表中,以便在后续请求可以将最新的地址信息给consumer使用
            toRouters(routerURLs).ifPresent(this::addRouters);
    
            // providers
            List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
            /**
             * 3.x added for extend URL address
             */
            ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class);
            List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
            if (supportedListeners != null && !supportedListeners.isEmpty()) {
                for (AddressListener addressListener : supportedListeners) {
                    providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this);
                }
            }
            refreshOverrideAndInvoker(providerURLs);
        }

      

    7. zookeeper 服务解除事件订阅

      除了服务下线外,还需要解除订阅关系。以便zookeeper不用再维护其监听推送。

        // org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doUnsubscribe
        @Override
        public void doUnsubscribe(URL url, NotifyListener listener) {
            ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
            if (listeners != null) {
                ChildListener zkListener = listeners.get(listener);
                if (zkListener != null) {
                    if (ANY_VALUE.equals(url.getServiceInterface())) {
                        String root = toRootPath();
                        zkClient.removeChildListener(root, zkListener);
                    } else {
                        // 将路径列举出来,一个个解除监听
                        for (String path : toCategoriesPath(url)) {
                            zkClient.removeChildListener(path, zkListener);
                        }
                    }
                }
            }
        }
        // 解除监听操作同样是由关闭钩子 触发的
        // org.apache.dubbo.registry.support.AbstractRegistry#destroy
        @Override
        public void destroy() {
            if (logger.isInfoEnabled()) {
                logger.info("Destroy registry:" + getUrl());
            }
            Set<URL> destroyRegistered = new HashSet<>(getRegistered());
            if (!destroyRegistered.isEmpty()) {
                for (URL url : new HashSet<>(getRegistered())) {
                    if (url.getParameter(DYNAMIC_KEY, true)) {
                        try {
                            unregister(url);
                            if (logger.isInfoEnabled()) {
                                logger.info("Destroy unregister url " + url);
                            }
                        } catch (Throwable t) {
                            logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                        }
                    }
                }
            }
            Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<>(getSubscribed());
            if (!destroySubscribed.isEmpty()) {
                for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
                    URL url = entry.getKey();
                    for (NotifyListener listener : entry.getValue()) {
                        try {
                            // 解除监听操作
                            unsubscribe(url, listener);
                            if (logger.isInfoEnabled()) {
                                logger.info("Destroy unsubscribe url " + url);
                            }
                        } catch (Throwable t) {
                            logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                        }
                    }
                }
            }
            AbstractRegistryFactory.removeDestroyedRegistry(this);
        }
        // org.apache.dubbo.registry.support.FailbackRegistry#unsubscribe
        @Override
        public void unsubscribe(URL url, NotifyListener listener) {
            super.unsubscribe(url, listener);
            removeFailedSubscribed(url, listener);
            try {
                // Sending a canceling subscription request to the server side
                // 调用 ZookeeperRegistry.doUnregister(), 删除路径监听
                doUnsubscribe(url, listener);
            } catch (Exception e) {
                Throwable t = e;
    
                // If the startup detection is opened, the Exception is thrown directly.
                boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                        && url.getParameter(Constants.CHECK_KEY, true);
                boolean skipFailback = t instanceof SkipFailbackWrapperException;
                if (check || skipFailback) {
                    if (skipFailback) {
                        t = t.getCause();
                    }
                    throw new IllegalStateException("Failed to unsubscribe " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
                } else {
                    logger.error("Failed to unsubscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
                }
    
                // Record a failed registration request to a failed list, retry regularly
                addFailedUnsubscribed(url, listener);
            }
        }

      和前面的下线操作意义相同,只是一个是 delete 路径,一个是解除变更监听。

      dubbo 中以服务为单位进行注册通知管理,避免了一个因应用提供者提供许多服务,而导致消费者必须处理大量数据的情况。各消费者只关注与自己相关的服务路径,从而最小化影响。通过临时节点和心跳机制,保证了各服务的准确体现。这是zk比较擅长的。虽然该点功能小,却意义重大。

  • 相关阅读:
    php基础之简单运算
    选择平淡
    php基础之控制结构
    关于三元运算符的初步应用及理解
    VS2015 遇到异常。这可能是由某个扩展导致的
    C#中如何去除窗体默认的关闭按钮
    (转载)SQL基础--> 约束(CONSTRAINT)
    SQL Server安装后设置SQL Server验证登录
    附加数据库 对于 ""失败,无法打开物理文件 操作系统错误 5:拒绝访问 SQL Sever
    SQL Server数据库操作(二)
  • 原文地址:https://www.cnblogs.com/yougewe/p/12831556.html
Copyright © 2011-2022 走看看