zoukankan      html  css  js  c++  java
  • dubbo学习(六)服务字典

    文章部分描述来自官方文档,本文仅对其进行润色。

    概述

    我们上一节在分析服务导入的时候,createProxy 方法中,调用栈出现了 RegistryDirectory 这个类,且在这个类中进行了很多复杂的操作,最后还创建了一个 invoker ,那么这个 RegistryDirectory 到底是什么呢? 我们需要知道 invoker 是对一个 service 的某一个服务方法的抽象 。

    服务目录是什么

    来自官方的这两段话可以帮助我们理解

    服务目录中存储了一些和服务提供者有关的信息,通过服务目录,服务消费者可获取到服务提供者的信息,比如 ip、端口、服务协议等。通过这些信息,服务消费者就可通过 Netty 等客户端进行远程调用。在一个服务集群中,服务提供者数量并不是一成不变的,如果集群中新增了一台机器,相应地在服务目录中就要新增一条服务提供者记录。或者,如果服务提供者的配置修改了,服务目录中的记录也要做相应的更新。

    实际上服务目录在获取注册中心的服务配置信息后,会为每条配置信息生成一个 Invoker 对象,并把这个 Invoker 对象存储起来,这个 Invoker 才是服务目录最终持有的对象。Invoker 有什么用呢?看名字就知道了,这是一个具有远程调用功能的对象。讲到这大家应该知道了什么是服务目录了,它可以看做是 Invoker 集合,且这个集合中的元素会随注册中心的变化而进行动态调整。

    源码分析

    知道了服务字典的功能,我们看一下服务字典说涉及到的类

    1297993-20200414152004182-1051672435.png

    我们先看一下接口定义

    public interface Directory<T> extends Node {
    
        /**
         * get service type.
         *
         * @return service type.
         */
        Class<T> getInterface();
    
        /**
         * list invokers.
         *
         * @return invokers
         */
        List<Invoker<T>> list(Invocation invocation) throws RpcException;
    
        List<Invoker<T>> getAllInvokers();
    
    }
    
    
    public interface Node {
    
        /**
         * get url.
         *
         * @return url.
         */
        URL getUrl();
    
        /**
         * is available.
         *
         * @return available.
         */
        boolean isAvailable();
    
        /**
         * destroy.
         */
        void destroy();
    
    }
    
    

    RegistryDirectory 和 StaticDirectory 都继承 AbstractDirectory ,并使用模板模式。

    public abstract class AbstractDirectory<T> implements Directory<T> {
    
        // logger
        private static final Logger logger = LoggerFactory.getLogger(AbstractDirectory.class);
        //某个 service 的 url 
        private final URL url;
    
        private volatile boolean destroyed = false;
    
        private volatile URL consumerUrl;
        //路由相关
        protected RouterChain<T> routerChain;
    
    
    
        @Override
        public List<Invoker<T>> list(Invocation invocation) throws RpcException {
            if (destroyed) {
                throw new RpcException("Directory already destroyed .url: " + getUrl());
            }
    
            return doList(invocation);
        }
    
        ...
    
        //交由子类实现
        protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException;
    
    

    StaticDirectory

    StaticDirectory 即静态服务目录,顾名思义,它内部存放的 Invoker 是不会变动的。所以,理论上它和不可变 List 的功能很相似。下面我们来看一下这个类的实现。

    public class StaticDirectory<T> extends AbstractDirectory<T> {
        ....
        @Override
        protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
            List<Invoker<T>> finalInvokers = invokers;
            if (routerChain != null) {
                try {
                    //调用父类的路由信息查找对应的 invoker 
                    finalInvokers = routerChain.route(getConsumerUrl(), invocation);
                } catch (Throwable t) {
                    logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
                }
            }
            return finalInvokers == null ? Collections.emptyList() : finalInvokers;
        }
    
    
    }
    
    

    RegistryDirectory

    RegistryDirectory 是一种动态服务目录,实现了 NotifyListener 接口。当注册中心服务配置发生变化后,RegistryDirectory 可收到与当前服务相关的变化。收到变更通知后,RegistryDirectory 可根据配置变更信息刷新 Invoker 列表。 RegistryDirectory 中有几个比较重要的逻辑 :

    • Invoker 的列举逻辑
    • 接收服务配置变更的逻辑
    • Invoker 列表的刷新逻辑。

    后面两个逻辑可以查看上一节的方法调用栈。

    1297993-20200410152801789-1266734247.png

        @Override
        public List<Invoker<T>> doList(Invocation invocation) {
            if (forbidden) {
                // 1. No service provider 2. Service providers are disabled
                throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " +
                        getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +
                        NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() +
                        ", please check status of providers(disabled, not registered or in blacklist).");
            }
    
            if (multiGroup) {
                return this.invokers == null ? Collections.emptyList() : this.invokers;
            }
    
            List<Invoker<T>> invokers = null;
            try {
                // Get invokers from cache, only runtime routers will be executed.
                invokers = routerChain.route(getConsumerUrl(), invocation);
            } catch (Throwable t) {
                logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
            }
    
            return invokers == null ? Collections.emptyList() : invokers;
        }
    

    我们看一下 RD 的类结构,可以看到两个listener 内部类 1297993-20200414154647178-911565026.png

        private static class ReferenceConfigurationListener extends AbstractConfiguratorListener {
            private RegistryDirectory directory;
            private URL url;
    
            ReferenceConfigurationListener(RegistryDirectory directory, URL url) {
                this.directory = directory;
                this.url = url;
                this.initWith(DynamicConfiguration.getRuleKey(url) + CONFIGURATORS_SUFFIX);
            }
    
            @Override
            protected void notifyOverrides() {
                // to notify configurator/router changes
                directory.refreshInvoker(Collections.emptyList());
            }
        }
    
        private static class ConsumerConfigurationListener extends AbstractConfiguratorListener {
            List<RegistryDirectory> listeners = new ArrayList<>();
    
            ConsumerConfigurationListener() {
                this.initWith(ApplicationModel.getApplication() + CONFIGURATORS_SUFFIX);
            }
    
            void addNotifyListener(RegistryDirectory listener) {
                this.listeners.add(listener);
            }
    
            @Override
            protected void notifyOverrides() {
                listeners.forEach(listener -> listener.refreshInvoker(Collections.emptyList()));
            }
        }
    
    

    可以看到更新的操作最终都是调用了 refreshInvoker 方法 。

        private void refreshInvoker(List<URL> invokerUrls) {
            Assert.notNull(invokerUrls, "invokerUrls should not be null");
            // invokerUrls 仅有一个元素,且 url 协议头为 empty,此时表示禁用所有服务
            if (invokerUrls.size() == 1
                    && invokerUrls.get(0) != null
                    && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
                this.forbidden = true; // Forbid to access
                this.invokers = Collections.emptyList();
                routerChain.setInvokers(this.invokers);
                //删除所有的 invokers
                destroyAllInvokers(); // Close all invokers
            } else {
                this.forbidden = false; // Allow to access
                Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
                if (invokerUrls == Collections.<URL>emptyList()) {
                    invokerUrls = new ArrayList<>();
                }
                if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
                    invokerUrls.addAll(this.cachedInvokerUrls);
                } else {
                    this.cachedInvokerUrls = new HashSet<>();
                    this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
                }
                if (invokerUrls.isEmpty()) {
                    return;
                }
                //<url,invoker>之间的映射
                Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
    
                /**
                 * If the calculation is wrong, it is not processed.
                 *
                 * 1. The protocol configured by the client is inconsistent with the protocol of the server.
                 *    eg: consumer protocol = dubbo, provider only has other protocol services(rest).
                 * 2. The registration center is not robust and pushes illegal specification data.
                 *
                 */
                if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
                    logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls
                            .toString()));
                    return;
                }
    
                List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
                // pre-route and build cache, notice that route cache should build on original Invoker list.
                // toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.
                routerChain.setInvokers(newInvokers);
                this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
                this.urlInvokerMap = newUrlInvokerMap;
    
                try {
                    //删除没有使用的 invoker
                    destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
                } catch (Exception e) {
                    logger.warn("destroyUnusedInvokers error. ", e);
                }
            }
        }
    

    看完了底层更新的刷新 invoker 操作,我们看一下 notify 方法。

        @Override
        public synchronized void notify(List<URL> urls) {
            Map<String, List<URL>> categoryUrls = urls.stream()
                    .filter(Objects::nonNull)
                    .filter(this::isValidCategory)
                    .filter(this::isNotCompatibleFor26x)
                    .collect(Collectors.groupingBy(this::judgeCategory));
    
            List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
            this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
    
            List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
            toRouters(routerURLs).ifPresent(this::addRouters);
    
            // providers
            List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
            /**
             * 3.x added for extend URL address
             */
            ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class);
            List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
            if (supportedListeners != null && !supportedListeners.isEmpty()) {
                for (AddressListener addressListener : supportedListeners) {
                    providerURLs = addressListener.notify(providerURLs, getUrl(),this);
                }
            }
            refreshOverrideAndInvoker(providerURLs);
        }
    
    
    
        private void refreshOverrideAndInvoker(List<URL> urls) {
            // mock zookeeper://xxx?mock=return null
            //重写 url 
            overrideDirectoryUrl();
            //刷新invoker 
            refreshInvoker(urls);
        }
    
    
        private void overrideDirectoryUrl() {
            // merge override parameters
            this.overrideDirectoryUrl = directoryUrl;
            List<Configurator> localConfigurators = this.configurators; // local reference
            doOverrideUrl(localConfigurators);
    
            // CONSUMER_CONFIGURATION_LISTENER 是刚才我们前面说的内部类 
            List<Configurator> localAppDynamicConfigurators = CONSUMER_CONFIGURATION_LISTENER.getConfigurators(); // local reference
            doOverrideUrl(localAppDynamicConfigurators);
            if (serviceConfigurationListener != null) {
                List<Configurator> localDynamicConfigurators = serviceConfigurationListener.getConfigurators(); // local reference
                doOverrideUrl(localDynamicConfigurators);
            }
        }
    
        private void doOverrideUrl(List<Configurator> configurators) {
            if (CollectionUtils.isNotEmpty(configurators)) {
                for (Configurator configurator : configurators) {
                    this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
                }
            }
        }
    
    
    

    总结

    文章分析了服务目录的源码。

    参考资料

    • http://dubbo.apache.org/zh-cn/docs/source_code_guide/directory.html
  • 相关阅读:
    2、MapStruct 深入理解
    1、MapStruct的应用
    Spring中的注解
    java中的三个内置注解
    依赖注入集合属性
    List Set Map的特点
    为类类型的属性依赖注入值
    java常用专业术语
    Bean的作用域范围
    Bean的生命周期
  • 原文地址:https://www.cnblogs.com/Benjious/p/12698695.html
Copyright © 2011-2022 走看看