zoukankan      html  css  js  c++  java
  • Dubbo源码分析(七)服务目录

     

    前言

    在上一章节的内容中,我们分析了服务引用的具体流程。在大多数情况下,为避免单点故障,我们的应用会部署在多台服务器上。对于我们的Dubbo而言,就会出现多个服务提供者。而且这些服务也并非是一成不变的,那么就有这样一个问题:
    有新的服务提供者加入或者禁用、修改已有的服务提供者,那么服务消费者怎么及时感知它们的变化呢?

    一、服务目录

    或许你还有印象 ,在服务引用的时候,我们曾经有用到它。这个就是服务目录。
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);

    那么,什么是服务目录呢?

    简单来说,服务目录中存储了一些和服务提供者有关的信息,通过服务目录,服务消费者可获取到服务提供者的信息,比如 ip、端口、服务协议等。

    服务目录在获取注册中心的服务配置信息后,会为每条配置信息生成一个 Invoker 对象,并把这个 Invoker 对象存储起来,这个 Invoker 才是服务目录最终持有的对象。多个服务提供者,就构成了服务目录中的一个Invoker 集合。

    假设我们有三个服务提供者,那么服务目录,RegistryDirectory对象中保存的Invoker如下:

     

    我们再看下这个类的继承关系:

     

    我们看到,RegistryDirectory实现了Directory接口和NotifyListener接口。那么,我们重点关注两个实现。

    1、获取Invocation集合

    上面我们看到,RegistryDirectory会保存服务提供者Invocation的集合。那么,就得有获取的方法。获取的方法很简单,就是从本地缓存中查询即可。

    public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
        public List<Invoker<T>> doList(Invocation invocation) {
            List<Invoker<T>> invokers = null;
            
            //方法名和List<Invoker<T>>的映射
            Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap;
            if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
                //获取请求的方法名,比如insertInfoUser
                String methodName = RpcUtils.getMethodName(invocation);
                //参数列表
                Object[] args = RpcUtils.getArguments(invocation);
                if (args != null && args.length > 0 && args[0] != null
                        && (args[0] instanceof String || args[0].getClass().isEnum())) {
                    invokers = localMethodInvokerMap.get(methodName + "." + args[0]);
                }
                if (invokers == null) {
                     // 通过方法名获取 Invoker 列表
                    invokers = localMethodInvokerMap.get(methodName);
                }
                if (invokers == null) {
                    // 通过星号 * 获取 Invoker 列表
                    invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
                }
                if (invokers == null) {
                    Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
                    if (iterator.hasNext()) {
                        invokers = iterator.next();
                    }
                }
            }
            return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
        }
    }
    

    2、动态更新

    服务目录并非是一成不变的,如果有的新的服务提供者加入,或者剔除已有的服务提供者,那么服务目录需要及时更新信息。所以它实现了NotifyListener接口,用于刷新Invoker集合。

    • 触发点

    既然是通知,首先我们就要弄清楚它是在哪里触发的。以zookeeper为例,在服务引用的时候,它会监听服务提供者数据节点的数据变化。

    public void childChanged(String parentPath, List<String> currentChilds) {
        ZookeeperRegistry.this.notify(url, listener, 
                toUrlsWithEmpty(url, parentPath, currentChilds));
    }
    

    如上代码,toUrlsWithEmpty方法会将当前最新的子节点数据转换成List<URL>对象,然后调用ZookeeperRegistry.this.notify,此方法最终将调用到服务目录中的刷新方法。

    • 刷新 Invoker 列表

    refreshInvoker 方法是保证 RegistryDirectory 随注册中心变化而变化的关键所在。

    private void refreshInvoker(List<URL> invokerUrls) {
        
        //如果invokerUrls中只要一个元素,且协议头为empty,就销毁所有的服务
        if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
                && "empty".equals(invokerUrls.get(0).getProtocol())) {
            this.forbidden = true; 
            this.methodInvokerMap = null; 
            //销毁所有服务引用
            destroyAllInvokers(); 
        } else {
            this.forbidden = false; 
            //原有的urlInvokerMap
            Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; 
            if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
                // 添加缓存 url 到 invokerUrls 中
                invokerUrls.addAll(this.cachedInvokerUrls);
            } else {
                this.cachedInvokerUrls = new HashSet<URL>();
                // 缓存 invokerUrls
                this.cachedInvokerUrls.addAll(invokerUrls);
            }
            if (invokerUrls.isEmpty()) {
                return;
            }
            //将invokerUrls转换为Invoker
            //这里会根据url中的协议名称调用对应的Protocol实现
            Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
            //将 newUrlInvokerMap 转成方法名到 Invoker 列表的映射
            Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); 
            if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
                logger.error(new IllegalStateException("......");
                return;
            }
            this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : 
                                                        newMethodInvokerMap;
            this.urlInvokerMap = newUrlInvokerMap;
            try {
                //销毁无用的服务引用Invoker
                destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap);
            } catch (Exception e) {
                logger.warn("destroyUnusedInvokers error. ", e);
            }
        }
    }
    

    如上代码,我们分为这样几个步骤来理解。

    • 从注册中心中,获取当前最新的invokerUrls
    • 调用toInvokers(invokerUrls)方法。此方法根据URL中的协议名称,调用对应的Protocol实现,来引用服务。比如DubboProtocol.refer ,它返回已经构建好的Invoker对象集合。
    • 调用toMethodInvokers(newUrlInvokerMap)方法,根据最新的InvokerMap,重置方法名和Invoker对象的映射关系。
    • 更新methodInvokerMap缓存。

    我们刚刚看到,上面获取Invocation集合的时候,就是从methodInvokerMap对象中获取数据。那么在这里改变这个对象的属性,就相当于更新了服务目录的Invoker信息。

    • 销毁

    在更新完服务目录Invoker之后,还要销毁无用的服务引用Invoker。

    private void destroyUnusedInvokers(Map<String, Invoker<T>> oldUrlInvokerMap, 
                        Map<String, Invoker<T>> newUrlInvokerMap) {
                        
        //如果新的Invoker为空,则销毁全部
        if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
            destroyAllInvokers();
            return;
        }
        
        List<String> deleted = null;
        if (oldUrlInvokerMap != null) {
            Collection<Invoker<T>> newInvokers = newUrlInvokerMap.values();
            //遍历旧的Invoker
            for (Map.Entry<String, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) {
                //判断新的newInvokers是否包含旧的Invoker对象
                if (!newInvokers.contains(entry.getValue())) {
                    if (deleted == null) {
                        deleted = new ArrayList<String>();
                    }
                    // 若不包含,则将旧的 Invoker 对应的 url 存入 deleted 列表中
                    deleted.add(entry.getKey());
                }
            }
        }
        if (deleted != null) {
            for (String url : deleted) {
                if (url != null) {
                    // 从 oldUrlInvokerMap 中移除 url 对应的 Invoker
                    Invoker<T> invoker = oldUrlInvokerMap.remove(url);
                    if (invoker != null) {
                        try {
                            //调用销毁方法
                            invoker.destroy();
                            if (logger.isDebugEnabled()) {
                                logger.debug("destory invoker[" + invoker.getUrl() + "] success. ");
                            }
                        } 
                    }
                }
            }
        }
    }
    

    以上代码看起来比较长,其实也很简单。就是通过对比两个InvokerMap,如果新的InvokerMap中不包含旧的InvokerMap中的节点,那么这个Invoker就是要被销毁的。

    我们在上一章节分析服务引用的时候,我们提到了可以对服务引用进行监听。那么这里,就是触发服务销毁监听方法的地方。

    public class ListenerInvokerWrapper<T> implements Invoker<T> {
    
        public void destroy() {
            try {
                invoker.destroy();
            } finally { 
                //自定义监听器
                if (listeners != null && !listeners.isEmpty()) {
                    for (InvokerListener listener : listeners) {
                        if (listener != null) {
                            try {
                                //调用监听器方法
                                listener.destroyed(invoker);
                            } catch (Throwable t) {
                                logger.error(t.getMessage(), t);
                            }
                        }
                    }
                }
            }
        }
    }
    

    服务目录是集群容错和负载均衡机制的基础部分,为什么这样说呢?

    当有多个服务提供者的时候:

    我们怎么选取其中的一个服务去调用,这是负载均衡机制
    当调用服务失败后,我们怎么处理当前的请求?抛出异常亦或是重试?这是集群容错机制

    有了服务目录,我们才能获取所有的服务提供者列表,并感知注册中心的数据变化,及时更新目录中的Invoker对象信息。

  • 相关阅读:
    mysql 的安装
    nginx的安装
    修改网站默认目录
    配置yum仓库 安装httpd服务
    安装 VMware Tools
    phpstrom + xdebug 断点调试
    公网IP访问服务器
    mysql in操作和find_in_set函数
    网页授权有时候获取不到openid 的坑
    解决Required Integer parameter 'id' is not present的一种方法
  • 原文地址:https://www.cnblogs.com/shoshana-kong/p/14701720.html
Copyright © 2011-2022 走看看