zoukankan      html  css  js  c++  java
  • Dubbo(四):服务路由的实现

      上一篇中,我们介绍了dubbo的负载均衡实现,见识了几种常用的负载均衡算法。就单个功能而言,似乎dubbo并没有太多的突出之处。事实上,一个成功的产品不必每个地方都要打破常规。更重要的是其全局优化的架构设计,以及如何使用现有的优秀解决方案为己服务。

      本篇将介绍另一种集群环境中的高可用实现:路由服务的实现。它将从另一个角度补充dubbo的集群功能完整性。

    1. 路由出现的时机?

      服务路由是什么?服务路由包含一条路由规则,路由规则决定了服务消费者的调用目标,即规定了服务消费者可调用哪些服务提供者。

      服务路由是什么派上用场的呢?实际上,它是在进行消费都调用提供者的第一步操作。集群的几个策略的先后为: 服务路由 -> 负载均衡 -> 集群容错(重试);

      其调用入口框架是在 org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker 中的:

        @Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            List<Invoker<T>> copyInvokers = invokers;
            checkInvokers(copyInvokers, invocation);
            String methodName = RpcUtils.getMethodName(invocation);
            int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
            if (len <= 0) {
                len = 1;
            }
            // retry loop.
            RpcException le = null; // last exception.
            List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
            Set<String> providers = new HashSet<String>(len);
            for (int i = 0; i < len; i++) {
                //Reselect before retry to avoid a change of candidate `invokers`.
                //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
                if (i > 0) {
                    checkWhetherDestroyed();
                    // 服务路由,入口,由父类中调用
                    copyInvokers = list(invocation);
                    // check again
                    checkInvokers(copyInvokers, invocation);
                }
                // 负载均衡入口
                Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
                invoked.add(invoker);
                RpcContext.getContext().setInvokers((List) invoked);
                try {
                    Result result = invoker.invoke(invocation);
                    if (le != null && logger.isWarnEnabled()) {
                        logger.warn("Although retry the method " + methodName
                                + " in the service " + getInterface().getName()
                                + " was successful by the provider " + invoker.getUrl().getAddress()
                                + ", but there have been failed providers " + providers
                                + " (" + providers.size() + "/" + copyInvokers.size()
                                + ") from the registry " + directory.getUrl().getAddress()
                                + " on the consumer " + NetUtils.getLocalHost()
                                + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                                + le.getMessage(), le);
                    }
                    return result;
                } catch (RpcException e) {
                    if (e.isBiz()) { // biz exception.
                        throw e;
                    }
                    // 集群容错,进行重试调用
                    le = e;
                } catch (Throwable e) {
                    le = new RpcException(e.getMessage(), e);
                } finally {
                    providers.add(invoker.getUrl().getAddress());
                }
            }
            throw new RpcException(le.getCode(), "Failed to invoke the method "
                    + methodName + " in the service " + getInterface().getName()
                    + ". Tried " + len + " times of the providers " + providers
                    + " (" + providers.size() + "/" + copyInvokers.size()
                    + ") from the registry " + directory.getUrl().getAddress()
                    + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                    + Version.getVersion() + ". Last error is: "
                    + le.getMessage(), le.getCause() != null ? le.getCause() : le);
        }
        // org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker#list
        protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
            // 直接调用对应的路径服务的 list() 方法进行路由。
            return directory.list(invocation);
        }
        // org.apache.dubbo.rpc.cluster.directory.AbstractDirectory#list
        @Override
        public List<Invoker<T>> list(Invocation invocation) throws RpcException {
            if (destroyed) {
                throw new RpcException("Directory already destroyed .url: " + getUrl());
            }
    
            return doList(invocation);
        }
        // org.apache.dubbo.registry.integration.RegistryDirectory#doList
        @Override
        public List<Invoker<T>> doList(Invocation invocation) {
            if (forbidden) {
                // 1. No service provider 2. Service providers are disabled
                throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " +
                        getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +
                        NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() +
                        ", please check status of providers(disabled, not registered or in blacklist).");
            }
    
            if (multiGroup) {
                return this.invokers == null ? Collections.emptyList() : this.invokers;
            }
    
            List<Invoker<T>> invokers = null;
            try {
                // Get invokers from cache, only runtime routers will be executed.
                invokers = routerChain.route(getConsumerUrl(), invocation);
            } catch (Throwable t) {
                logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
            }
    
            return invokers == null ? Collections.emptyList() : invokers;
        }
        
        // org.apache.dubbo.rpc.cluster.RouterChain#route
        public List<Invoker<T>> route(URL url, Invocation invocation) {
            List<Invoker<T>> finalInvokers = invokers;
            // 根据注册的 routers 依次调用,过滤 finalInvokers 之后返回
            for (Router router : routers) {
                finalInvokers = router.route(finalInvokers, url, invocation);
            }
            return finalInvokers;
        }

    2. dubbo提供了哪些路由策略?

      Dubbo 目前提供了三种服务路由实现,分别为条件路由 ConditionRouter、脚本路由 ScriptRouter 和标签路由 TagRouter。

      router 的创建时机:每次url发生变更后(如后台修改),都会触发一次路由信息重建。

        // org.apache.dubbo.registry.integration.RegistryDirectory#notify
        @Override
        public synchronized void notify(List<URL> urls) {
            Map<String, List<URL>> categoryUrls = urls.stream()
                    .filter(Objects::nonNull)
                    .filter(this::isValidCategory)
                    .filter(this::isNotCompatibleFor26x)
                    .collect(Collectors.groupingBy(this::judgeCategory));
    
            List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
            this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
    
            List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
            // 从url中取出相应的路由服务类,添加 routerChain 中,备用
            toRouters(routerURLs).ifPresent(this::addRouters);
    
            // providers
            List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
            /**
             * 3.x added for extend URL address
             */
            ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class);
            List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
            if (supportedListeners != null && !supportedListeners.isEmpty()) {
                for (AddressListener addressListener : supportedListeners) {
                    providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this);
                }
            }
            refreshOverrideAndInvoker(providerURLs);
        }
        // org.apache.dubbo.registry.integration.RegistryDirectory#toRouters
        /**
         * @param urls
         * @return null : no routers ,do nothing
         * else :routers list
         */
        private Optional<List<Router>> toRouters(List<URL> urls) {
            if (urls == null || urls.isEmpty()) {
                return Optional.empty();
            }
    
            List<Router> routers = new ArrayList<>();
            for (URL url : urls) {
                if (EMPTY_PROTOCOL.equals(url.getProtocol())) {
                    continue;
                }
                String routerType = url.getParameter(ROUTER_KEY);
                if (routerType != null && routerType.length() > 0) {
                    url = url.setProtocol(routerType);
                }
                try {
                    // 根据router工厂类进行创建router, 该工厂类使用 SPI 机制进行生成,实现 RouterFactory
                    // file=org.apache.dubbo.rpc.cluster.router.file.FileRouterFactory
                    // script=org.apache.dubbo.rpc.cluster.router.script.ScriptRouterFactory
                    // condition=org.apache.dubbo.rpc.cluster.router.condition.ConditionRouterFactory
                    // service=org.apache.dubbo.rpc.cluster.router.condition.config.ServiceRouterFactory
                    // app=org.apache.dubbo.rpc.cluster.router.condition.config.AppRouterFactory
                    // tag=org.apache.dubbo.rpc.cluster.router.tag.TagRouterFactory
                    // mock=org.apache.dubbo.rpc.cluster.router.mock.MockRouterFactory
                    Router router = ROUTER_FACTORY.getRouter(url);
                    if (!routers.contains(router)) {
                        routers.add(router);
                    }
                } catch (Throwable t) {
                    logger.error("convert router url to router error, url: " + url, t);
                }
            }
    
            return Optional.of(routers);
        }

      所以,整体上整个router的创建,依赖于url中的router参数,用该参数找到对应的router工厂类,然后调用其 getRouter()方法生成具体的router. 我们简单看看router的工厂类一般是什么样的?

    2.1. 路由工厂类的构建

      

    /**
     * Application level router factory
     */
    @Activate(order = 200)
    public class AppRouterFactory implements RouterFactory {
        public static final String NAME = "app";
    
        private volatile Router router;
    
        @Override
        public Router getRouter(URL url) {
            // 一个工厂类中,只有一个单例的router
            if (router != null) {
                return router;
            }
            // 双重锁 懒加载
            synchronized (this) {
                if (router == null) {
                    router = createRouter(url);
                }
            }
            return router;
        }
    
        private Router createRouter(URL url) {
            
            return new AppRouter(url);
        }
    }
    
    // 可缓存路由
    /**
     * Service level router factory
     */
    @Activate(order = 300)
    public class ServiceRouterFactory extends CacheableRouterFactory {
    
        public static final String NAME = "service";
    
        @Override
        protected Router createRouter(URL url) {
            return new ServiceRouter(url);
        }
    
    }
    
    // 条件路径工厂类
    public class ConditionRouterFactory implements RouterFactory {
    
        public static final String NAME = "condition";
    
        @Override
        public Router getRouter(URL url) {
            // 直接new对象返回
            return new ConditionRouter(url);
        }
    
    }
    
    // 文件路由工厂类,事实上它并不一个单纯的路由工厂类,它需要依赖于别的路由工厂
    public class FileRouterFactory implements RouterFactory {
    
        public static final String NAME = "file";
    
        private RouterFactory routerFactory;
        // 将别的路由工厂注入进来
        public void setRouterFactory(RouterFactory routerFactory) {
            this.routerFactory = routerFactory;
        }
    
        @Override
        public Router getRouter(URL url) {
            try {
                // Transform File URL into Script Route URL, and Load
                // file:///d:/path/to/route.js?router=script ==> script:///d:/path/to/route.js?type=js&rule=<file-content>
                String protocol = url.getParameter(ROUTER_KEY, ScriptRouterFactory.NAME); // Replace original protocol (maybe 'file') with 'script'
                String type = null; // Use file suffix to config script type, e.g., js, groovy ...
                String path = url.getPath();
                if (path != null) {
                    int i = path.lastIndexOf('.');
                    if (i > 0) {
                        type = path.substring(i + 1);
                    }
                }
                String rule = IOUtils.read(new FileReader(new File(url.getAbsolutePath())));
    
                // FIXME: this code looks useless
                boolean runtime = url.getParameter(RUNTIME_KEY, false);
                URL script = URLBuilder.from(url)
                        .setProtocol(protocol)
                        .addParameter(TYPE_KEY, type)
                        .addParameter(RUNTIME_KEY, runtime)
                        .addParameterAndEncoded(RULE_KEY, rule)
                        .build();
                // 将重新组装后的url,传递委托给注入的路由工厂进行处理
                return routerFactory.getRouter(script);
            } catch (IOException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
        }
    
    }
        
    @Activate
    public class MockRouterFactory implements RouterFactory {
        public static final String NAME = "mock";
    
        @Override
        public Router getRouter(URL url) {
            return new MockInvokersSelector();
        }
    
    }
    // 脚本路由工厂
    /**
     * ScriptRouterFactory
     * <p>
     * Example URLS used by Script Router Factory:
     * <ol>
     * <li> script://registryAddress?type=js&rule=xxxx
     * <li> script:///path/to/routerfile.js?type=js&rule=xxxx
     * <li> script://D:path	o
    outerfile.js?type=js&rule=xxxx
     * <li> script://C:/path/to/routerfile.js?type=js&rule=xxxx
     * </ol>
     * The host value in URL points out the address of the source content of the Script Router,Registry、File etc
     *
     */
    public class ScriptRouterFactory implements RouterFactory {
    
        public static final String NAME = "script";
    
        @Override
        public Router getRouter(URL url) {
            // 直接new对象返回
            return new ScriptRouter(url);
        }
    
    }
    
    // 标签路由工厂,可缓存路由(使用一个ConcurrentHashMap集合容器进行保存已创建的router)
    @Activate(order = 100)
    public class TagRouterFactory extends CacheableRouterFactory {
    
        public static final String NAME = "tag";
        // getRouter() 由 父类统一进行框架搭建,子类只需实现 createRouter() 即可
        @Override
        protected Router createRouter(URL url) {
            return new TagRouter(url);
        }
    }
    
    /**
     * If you want to provide a router implementation based on design of v2.7.0, please extend from this abstract class.
     * For 2.6.x style router, please implement and use RouterFactory directly.
     */
    public abstract class CacheableRouterFactory implements RouterFactory {
        private ConcurrentMap<String, Router> routerMap = new ConcurrentHashMap<>();
    
        @Override
        public Router getRouter(URL url) {
            return routerMap.computeIfAbsent(url.getServiceKey(), k -> createRouter(url));
        }
    
        protected abstract Router createRouter(URL url);
    }

      可以看出这些个工厂类,基本都是使用new的方法就返回了对应的路由实例类。那么是否有必要都在这些类外面包一个工厂类进行创建呢?直接创建不好吗?事实上,这只是个一种工厂模式的最佳实践,是为了更好的隐藏创建逻辑。

    2.2. 条件路由 ConditionRouter 详解

      路由功能的实现,主要分为规则解析和规则应用两个部分!

        // 构造方法,主要是解析一些参数
        public ConditionRouter(URL url) {
            this.url = url;
            // priority=1
            this.priority = url.getParameter(PRIORITY_KEY, 0);
            // force=false
            this.force = url.getParameter(FORCE_KEY, false);
            // enabled=true
            this.enabled = url.getParameter(ENABLED_KEY, true);
            // rule=xxx
            // init 方法中详细解析路由规则
            init(url.getParameterAndDecoded(RULE_KEY));
        }
        // 解析条件规则 host = 10.20.153.10 => host = 10.20.153.11
        public void init(String rule) {
            try {
                if (rule == null || rule.trim().length() == 0) {
                    throw new IllegalArgumentException("Illegal route rule!");
                }
                // 规则如: host = 10.20.153.10 => host = 10.20.153.11
                rule = rule.replace("consumer.", "").replace("provider.", "");
                int i = rule.indexOf("=>");
                // 如果没有=>, 则全部路由到 该规则指定的host中
                String whenRule = i < 0 ? null : rule.substring(0, i).trim();
                String thenRule = i < 0 ? rule.trim() : rule.substring(i + 2).trim();
                Map<String, MatchPair> when = StringUtils.isBlank(whenRule) || "true".equals(whenRule) ? new HashMap<String, MatchPair>() : parseRule(whenRule);
                Map<String, MatchPair> then = StringUtils.isBlank(thenRule) || "false".equals(thenRule) ? null : parseRule(thenRule);
                // NOTE: It should be determined on the business level whether the `When condition` can be empty or not.
                this.whenCondition = when;
                this.thenCondition = then;
            } catch (ParseException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
        }
        // 解析条件规则键值对
        private static Map<String, MatchPair> parseRule(String rule)
                throws ParseException {
            Map<String, MatchPair> condition = new HashMap<String, MatchPair>();
            if (StringUtils.isBlank(rule)) {
                return condition;
            }
            // Key-Value pair, stores both match and mismatch conditions
            MatchPair pair = null;
            // Multiple values
            Set<String> values = null;
            // ROUTE_PATTERN = Pattern.compile("([&!=,]*)\s*([^&!=,\s]+)");
            final Matcher matcher = ROUTE_PATTERN.matcher(rule);
            while (matcher.find()) { // Try to match one by one
                String separator = matcher.group(1);
                String content = matcher.group(2);
                // Start part of the condition expression.
                if (StringUtils.isEmpty(separator)) {
                    pair = new MatchPair();
                    condition.put(content, pair);
                }
                // The KV part of the condition expression
                // &host=xxx
                else if ("&".equals(separator)) {
                    if (condition.get(content) == null) {
                        pair = new MatchPair();
                        condition.put(content, pair);
                    } else {
                        pair = condition.get(content);
                    }
                }
                // The Value in the KV part.
                else if ("=".equals(separator)) {
                    if (pair == null) {
                        throw new ParseException("Illegal route rule ""
                                + rule + "", The error char '" + separator
                                + "' at index " + matcher.start() + " before ""
                                + content + "".", matcher.start());
                    }
    
                    values = pair.matches;
                    values.add(content);
                }
                // The Value in the KV part.
                else if ("!=".equals(separator)) {
                    if (pair == null) {
                        throw new ParseException("Illegal route rule ""
                                + rule + "", The error char '" + separator
                                + "' at index " + matcher.start() + " before ""
                                + content + "".", matcher.start());
                    }
    
                    values = pair.mismatches;
                    values.add(content);
                }
                // The Value in the KV part, if Value have more than one items.
                else if (",".equals(separator)) { // Should be separated by ','
                    if (values == null || values.isEmpty()) {
                        throw new ParseException("Illegal route rule ""
                                + rule + "", The error char '" + separator
                                + "' at index " + matcher.start() + " before ""
                                + content + "".", matcher.start());
                    }
                    values.add(content);
                } else {
                    throw new ParseException("Illegal route rule "" + rule
                            + "", The error char '" + separator + "' at index "
                            + matcher.start() + " before "" + content + "".", matcher.start());
                }
            }
            return condition;
        }

      2. 接下来是如何使用这些配置好的规则

      路由服务由routerChain进行统一调用:

        // org.apache.dubbo.rpc.cluster.RouterChain#route
        /**
         *
         * @param url
         * @param invocation
         * @return
         */
        public List<Invoker<T>> route(URL url, Invocation invocation) {
            List<Invoker<T>> finalInvokers = invokers;
            for (Router router : routers) {
                finalInvokers = router.route(finalInvokers, url, invocation);
            }
            return finalInvokers;
        }
        // 以下是条件路由的route()实现:
        @Override
        public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation)
                throws RpcException {
            if (!enabled) {
                return invokers;
            }
    
            if (CollectionUtils.isEmpty(invokers)) {
                return invokers;
            }
            try {
                // 如果不符合路由条件,直接返回所有原样 invokers 即可 
                if (!matchWhen(url, invocation)) {
                    return invokers;
                }
                List<Invoker<T>> result = new ArrayList<Invoker<T>>();
                if (thenCondition == null) {
                    logger.warn("The current consumer in the service blacklist. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey());
                    return result;
                }
                for (Invoker<T> invoker : invokers) {
                    // 否则依次匹配每个候选 invokers, 符合条件的才返回
                    // 具体匹配实现如下:
                    if (matchThen(invoker.getUrl(), url)) {
                        result.add(invoker);
                    }
                }
                if (!result.isEmpty()) {
                    return result;
                } else if (force) {
                    logger.warn("The route result is empty and force execute. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey() + ", router: " + url.getParameterAndDecoded(RULE_KEY));
                    return result;
                }
            } catch (Throwable t) {
                logger.error("Failed to execute condition router rule: " + getUrl() + ", invokers: " + invokers + ", cause: " + t.getMessage(), t);
            }
            return invokers;
        }
        // 路由源地址检测,检查要调用的服务地址是否命中了条件路由的规则
        boolean matchWhen(URL url, Invocation invocation) {
            // whenCondition 为空,代表拦截所有路径
            return CollectionUtils.isEmptyMap(whenCondition) || matchCondition(whenCondition, url, null, invocation);
        }
        // 路由目的地址匹配检测,与路由源地址匹配模式相同,仅将 whenCondition 换为 thenCondition
        private boolean matchThen(URL url, URL param) {
            return CollectionUtils.isNotEmptyMap(thenCondition) && matchCondition(thenCondition, url, param, null);
        }
        
        private boolean matchCondition(Map<String, MatchPair> condition, URL url, URL param, Invocation invocation) {
            Map<String, String> sample = url.toMap();
            boolean result = false;
            for (Map.Entry<String, MatchPair> matchPair : condition.entrySet()) {
                String key = matchPair.getKey();
                String sampleValue;
                //get real invoked method name from invocation
                if (invocation != null && (METHOD_KEY.equals(key) || METHODS_KEY.equals(key))) {
                    sampleValue = invocation.getMethodName();
                } else if (ADDRESS_KEY.equals(key)) {
                    sampleValue = url.getAddress();
                } else if (HOST_KEY.equals(key)) {
                    sampleValue = url.getHost();
                } else {
                    sampleValue = sample.get(key);
                    // 为什么要获取两次 sample.get(key); ?
                    if (sampleValue == null) {
                        sampleValue = sample.get(key);
                    }
                }
                if (sampleValue != null) {
                    // 依次调用 MatchPair.isMatch() 方法,进行验证
                    // 只要有一次验证不通过,则当前 invocation 即不符合路由条件了
                    if (!matchPair.getValue().isMatch(sampleValue, param)) {
                        return false;
                    } else {
                        result = true;
                    }
                } else {
                    //not pass the condition
                    if (!matchPair.getValue().matches.isEmpty()) {
                        return false;
                    } else {
                        result = true;
                    }
                }
            }
            return result;
        }
            // 在 MatchPair 中实现具体的判定是否当前地址是否匹配路由信息
            private boolean isMatch(String value, URL param) {
                // 只有相等匹配情况,直接取 matches 进行校验即可
                if (!matches.isEmpty() && mismatches.isEmpty()) {
                    for (String match : matches) {
                        // 简单正则匹配检测, 主要处理 * 规则
                        if (UrlUtils.isMatchGlobPattern(match, value, param)) {
                            return true;
                        }
                    }
                    return false;
                }
                // 只有不相等匹配情况, 直接取出 mismatches 校验,反向输出即可
                if (!mismatches.isEmpty() && matches.isEmpty()) {
                    for (String mismatch : mismatches) {
                        if (UrlUtils.isMatchGlobPattern(mismatch, value, param)) {
                            return false;
                        }
                    }
                    return true;
                }
                // 相等和不相等两种条件都存在时,优先使用 mismatches 进行配置,然后使用 matches 匹配,即 mismatches 优先级高于 matches
                if (!matches.isEmpty() && !mismatches.isEmpty()) {
                    //when both mismatches and matches contain the same value, then using mismatches first
                    for (String mismatch : mismatches) {
                        if (UrlUtils.isMatchGlobPattern(mismatch, value, param)) {
                            return false;
                        }
                    }
                    for (String match : matches) {
                        if (UrlUtils.isMatchGlobPattern(match, value, param)) {
                            return true;
                        }
                    }
                    return false;
                }
                return false;
            }
        }

    2.3. 脚本路由的实现 ScriptRouter

      构造方法中主要解析一些必要参数,以及根据类型获取操作系统的脚本解析引擎,非常重要。

        public ScriptRouter(URL url) {
            this.url = url;
            this.priority = url.getParameter(PRIORITY_KEY, SCRIPT_ROUTER_DEFAULT_PRIORITY);
            // 获取解析引擎,根据 type=javascript 等返回
            engine = getEngine(url);
            // 获取 rule=xxxx, 规则
            rule = getRule(url);
            try {
                // 有 GroovyScriptEngineImpl, NashornScriptEngine
                Compilable compilable = (Compilable) engine;
                function = compilable.compile(rule);
            } catch (ScriptException e) {
                logger.error("route error, rule has been ignored. rule: " + rule +
                        ", url: " + RpcContext.getContext().getUrl(), e);
            }
        }

      而实际路由的方法,也是直接调用脚本引擎进行脚本解析而得:

        // org.apache.dubbo.rpc.cluster.router.script.ScriptRouter#route
        @Override
        public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
            try {
                // 将参数信息封装为 Bindings, 统一传入脚本引擎
                Bindings bindings = createBindings(invokers, invocation);
                if (function == null) {
                    return invokers;
                }
                // 调用脚本引擎的 function.eval() 方法,即将参数传入规则脚本中,得到invokers
                // 并通过 getRoutedInvokers 将结果转换成 List<Invoker<T>> 类型返回
                return getRoutedInvokers(function.eval(bindings));
            } catch (ScriptException e) {
                logger.error("route error, rule has been ignored. rule: " + rule + ", method:" +
                        invocation.getMethodName() + ", url: " + RpcContext.getContext().getUrl(), e);
                return invokers;
            }
        }
        /**
         * create bindings for script engine
         */
        private <T> Bindings createBindings(List<Invoker<T>> invokers, Invocation invocation) {
            Bindings bindings = engine.createBindings();
            // create a new List of invokers
            bindings.put("invokers", new ArrayList<>(invokers));
            bindings.put("invocation", invocation);
            bindings.put("context", RpcContext.getContext());
            return bindings;
        }

      上面的实现看起来还是有点抽象。我们拿出一个dubbo中的单测试样例,看一下脚本路由的使用方式:

        
        @Test
        public void testRoutePickInvokers() {
            // rule 写法,即是 javascript 的语法,不过它需要调用一些java的方法,以便识别java中传递过来的参数以及返回结果的对接
            // 该js代码脱离了java引擎应该是不可被解析的
            String rule = "var result = new java.util.ArrayList(invokers.size());" +
                    "for (i=0;i<invokers.size(); i++){ " +
                    // 获取 isAvailable() 属性进行判断是否可将该invoker列入候选列表
                    "if (invokers.get(i).isAvailable()) {" +
                    "result.add(invokers.get(i)) ;" +
                    "}" +
                    "} ; " +
                    "return result;";
            // 定义一个 route函数,并立即调用它,从而达到返回脚本结果的效果
            String script = "function route(invokers,invocation,context){" + rule + "} route(invokers,invocation,context)";
            Router router = new ScriptRouterFactory().getRouter(getRouteUrl(script));
    
            List<Invoker<String>> invokers = new ArrayList<Invoker<String>>();
            // 模型invoker 不可用
            Invoker<String> invoker1 = new MockInvoker<String>(false);
            Invoker<String> invoker2 = new MockInvoker<String>(true);
            Invoker<String> invoker3 = new MockInvoker<String>(true);
            invokers.add(invoker1);
            invokers.add(invoker2);
            invokers.add(invoker3);
            List<Invoker<String>> filteredInvokers = router.route(invokers, invokers.get(0).getUrl(), new RpcInvocation());
            Assertions.assertEquals(2, filteredInvokers.size());
            Assertions.assertEquals(invoker2, filteredInvokers.get(0));
            Assertions.assertEquals(invoker3, filteredInvokers.get(1));
        }

      所以,其实脚本路由可以写得非常灵活多变,但是维护成本有点高,它不像条件路由那样简洁明了。需要进行反复自测试后才可配置在正式环境中。

    2.4. 标签路由 TagRouter

      大概就是根据tag=xxx 选择相应的路由地址。该router还未正式发布,不过可以看一下其大概实现:

        @Override
        public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
            if (CollectionUtils.isEmpty(invokers)) {
                return invokers;
            }
    
            // since the rule can be changed by config center, we should copy one to use.
            final TagRouterRule tagRouterRuleCopy = tagRouterRule;
            if (tagRouterRuleCopy == null || !tagRouterRuleCopy.isValid() || !tagRouterRuleCopy.isEnabled()) {
                return filterUsingStaticTag(invokers, url, invocation);
            }
    
            List<Invoker<T>> result = invokers;
            // 从url中取出 dubbo.tag=xxx 值 
            String tag = StringUtils.isEmpty(invocation.getAttachment(TAG_KEY)) ? url.getParameter(TAG_KEY) :
                    invocation.getAttachment(TAG_KEY);
    
            // if we are requesting for a Provider with a specific tag
            if (StringUtils.isNotEmpty(tag)) {
                List<String> addresses = tagRouterRuleCopy.getTagnameToAddresses().get(tag);
                // filter by dynamic tag group first
                if (CollectionUtils.isNotEmpty(addresses)) {
                    result = filterInvoker(invokers, invoker -> addressMatches(invoker.getUrl(), addresses));
                    // if result is not null OR it's null but force=true, return result directly
                    if (CollectionUtils.isNotEmpty(result) || tagRouterRuleCopy.isForce()) {
                        return result;
                    }
                } else {
                    // dynamic tag group doesn't have any item about the requested app OR it's null after filtered by
                    // dynamic tag group but force=false. check static tag
                    result = filterInvoker(invokers, invoker -> tag.equals(invoker.getUrl().getParameter(TAG_KEY)));
                }
                // If there's no tagged providers that can match the current tagged request. force.tag is set by default
                // to false, which means it will invoke any providers without a tag unless it's explicitly disallowed.
                if (CollectionUtils.isNotEmpty(result) || isForceUseTag(invocation)) {
                    return result;
                }
                // FAILOVER: return all Providers without any tags.
                else {
                    List<Invoker<T>> tmp = filterInvoker(invokers, invoker -> addressNotMatches(invoker.getUrl(),
                            tagRouterRuleCopy.getAddresses()));
                    return filterInvoker(tmp, invoker -> StringUtils.isEmpty(invoker.getUrl().getParameter(TAG_KEY)));
                }
            } else {
                // List<String> addresses = tagRouterRule.filter(providerApp);
                // return all addresses in dynamic tag group.
                List<String> addresses = tagRouterRuleCopy.getAddresses();
                if (CollectionUtils.isNotEmpty(addresses)) {
                    result = filterInvoker(invokers, invoker -> addressNotMatches(invoker.getUrl(), addresses));
                    // 1. all addresses are in dynamic tag group, return empty list.
                    if (CollectionUtils.isEmpty(result)) {
                        return result;
                    }
                    // 2. if there are some addresses that are not in any dynamic tag group, continue to filter using the
                    // static tag group.
                }
                return filterInvoker(result, invoker -> {
                    String localTag = invoker.getUrl().getParameter(TAG_KEY);
                    return StringUtils.isEmpty(localTag) || !tagRouterRuleCopy.getTagNames().contains(localTag);
                });
            }
        }

      其配置格式大致如下:

            String serviceStr = "---
    " +
                    "force: false
    " +
                    "runtime: true
    " +
                    "enabled: false
    " +
                    "priority: 1
    " +
                    "key: demo-provider
    " +
                    "tags:
    " +
                    "  - name: tag1
    " +
                    "    addresses: ["30.5.120.37:20881"]
    " +
                    "  - name: tag2
    " +
                    "    addresses: ["30.5.120.37:20880"]
    " +
                    "...";

    2.5. AppRouter + ServiceRouter

      这两个路由服务实际上不是独立的路由实现类,它是包装了 ConditionRouter 的实现,来完成特殊的业务逻辑。

        // org.apache.dubbo.rpc.cluster.router.condition.config.AppRouter#AppRouter
        public AppRouter(URL url) {
            // 将 application=xxx   作为路由key
            super(url, url.getParameter(CommonConstants.APPLICATION_KEY));
            this.priority = APP_ROUTER_DEFAULT_PRIORITY;
        }
        // org.apache.dubbo.rpc.cluster.router.condition.config.ListenableRouter#ListenableRouter
        public ListenableRouter(URL url, String ruleKey) {
            super(url);
            this.force = false;
            // 初始化路由服务
            this.init(ruleKey);
        }
        private synchronized void init(String ruleKey) {
            if (StringUtils.isEmpty(ruleKey)) {
                return;
            }
            // +   .condition-router
            String routerKey = ruleKey + RULE_SUFFIX;
            ruleRepository.addListener(routerKey, this);
            String rule = ruleRepository.getRule(routerKey, DynamicConfiguration.DEFAULT_GROUP);
            if (StringUtils.isNotEmpty(rule)) {
                this.process(new ConfigChangedEvent(routerKey, DynamicConfiguration.DEFAULT_GROUP, rule));
            }
        }
        
        @Override
        public synchronized void process(ConfigChangedEvent event) {
            if (logger.isInfoEnabled()) {
                logger.info("Notification of condition rule, change type is: " + event.getChangeType() +
                        ", raw rule is:
     " + event.getContent());
            }
            if (event.getChangeType().equals(ConfigChangeType.DELETED)) {
                routerRule = null;
                conditionRouters = Collections.emptyList();
            } else {
                try {
                    routerRule = ConditionRuleParser.parse(event.getContent());
                    generateConditions(routerRule);
                } catch (Exception e) {
                    logger.error("Failed to parse the raw condition rule and it will not take effect, please check " +
                            "if the condition rule matches with the template, the raw rule is:
     " + event.getContent(), e);
                }
            }
        }
        // 进行路由服务调用时,仅把功能委托给 conditionRouters 即可
        @Override
        public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
            if (CollectionUtils.isEmpty(invokers) || conditionRouters.size() == 0) {
                return invokers;
            }
    
            // We will check enabled status inside each router.
            for (Router router : conditionRouters) {
                invokers = router.route(invokers, url, invocation);
            }
    
            return invokers;
        }

      ServiceRouter 的实现也大致一样,只是取的 routerKey 不同而已。

        // org.apache.dubbo.rpc.cluster.router.condition.config.ServiceRouter#ServiceRouter
        public ServiceRouter(URL url) {
            // 与 AppRouter 的差别在于 routerKey 取值不同
            super(url, DynamicConfiguration.getRuleKey(url));
            this.priority = SERVICE_ROUTER_DEFAULT_PRIORITY;
        }
        // org.apache.dubbo.common.config.configcenter.DynamicConfiguration#getRuleKey
        /**
         * The format is '{interfaceName}:[version]:[group]'
         *
         * @return
         */
        static String getRuleKey(URL url) {
            return url.getColonSeparatedKey();
        }
        // org.apache.dubbo.common.URL#getColonSeparatedKey
        /**
         * The format is "{interface}:[version]:[group]"
         *
         * @return
         */
        public String getColonSeparatedKey() {
            StringBuilder serviceNameBuilder = new StringBuilder();
            serviceNameBuilder.append(this.getServiceInterface());
            append(serviceNameBuilder, VERSION_KEY, false);
            append(serviceNameBuilder, GROUP_KEY, false);
            return serviceNameBuilder.toString();
        }
        

      服务路由的出发点,是为了让用户能够更灵活地配置一些特殊的调用场景,如跨机房调用,或者应用一些异常情况比如某实例不希望再被调用。总之,应用场景总是有的,否则就是在玩自嗨。

      了解其运行原理,让我们更清楚,我们到底在路由什么!

  • 相关阅读:
    『软件介绍』SQLServer2008 基本操作
    PCA的数学原理
    PCA的数学原理
    Oracle数据处理
    UVa 11995
    Unreal Engine 4 C++ 为编辑器中Actor创建自己定义图标
    codecombat之边远地区的森林1-11关及地牢38关代码分享
    初识ecside
    how tomcat works读书笔记 七 日志记录器
    HDU 1754(线段树区间最值)
  • 原文地址:https://www.cnblogs.com/yougewe/p/12814068.html
Copyright © 2011-2022 走看看