  • Dubbo service reference and cluster fault tolerance

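    Service reference boils down to two things:

    • Convert the Spring schema tag information into a bean, then use that bean's information to connect to ZooKeeper, subscribe to its node information, and create an Invoker.

    • Use that Invoker to create a dynamic proxy object.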

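    Sequence diagram:

    The end result is a dynamic proxy object for the interface being called.

    When a method is called on the proxy object, execution enters the logic of the InvokerInvocationHandler class.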
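
    To make the proxy step concrete, here is a minimal sketch built on the JDK's java.lang.reflect.Proxy. It is not Dubbo's code: SimpleInvoker, GreetingService, ForwardingHandler and createProxy are hypothetical names, and ForwardingHandler only plays the role that InvokerInvocationHandler plays in Dubbo.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

class ProxySketch {
    /** Hypothetical stand-in for Dubbo's Invoker: receives a method name and its arguments. */
    interface SimpleInvoker {
        Object invoke(String methodName, Object[] args);
    }

    /** Hypothetical business interface that the consumer wants to call. */
    interface GreetingService {
        String greet(String name);
    }

    /** Plays the role of InvokerInvocationHandler: forwards every proxy call to the invoker. */
    static class ForwardingHandler implements InvocationHandler {
        private final SimpleInvoker invoker;

        ForwardingHandler(SimpleInvoker invoker) {
            this.invoker = invoker;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // Methods declared on Object (toString, hashCode, equals) are answered locally.
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            // Everything else is handed to the invoker, which would perform the remote call.
            return invoker.invoke(method.getName(), args);
        }
    }

    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> type, SimpleInvoker invoker) {
        return (T) Proxy.newProxyInstance(
                type.getClassLoader(), new Class<?>[] {type}, new ForwardingHandler(invoker));
    }

    public static void main(String[] args) {
        // A fake invoker standing in for the remote call.
        SimpleInvoker invoker = (methodName, arguments) -> methodName + ":" + arguments[0];
        GreetingService service = createProxy(GreetingService.class, invoker);
        System.out.println(service.greet("dubbo")); // prints "greet:dubbo"
    }
}
```

    The caller only ever sees GreetingService; whatever sits behind the invoker (clustering, routing, load balancing) stays hidden behind that single invoke call.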

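    While tracing the source, I found that a consumer-side invoke passes through a whole chain of Invoker implementations, and I kept wondering what all these Invokers are for.

    The creation of the Invoker is the natural entry point, which means starting from the ReferenceConfig class.

    From there we arrive at the RegistryProtocol.doRefer method: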

    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
        if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false)));
        }
    
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, 
                Constants.PROVIDERS_CATEGORY 
                + "," + Constants.CONFIGURATORS_CATEGORY 
                + "," + Constants.ROUTERS_CATEGORY));
        
        return cluster.join(directory);
    }

    The key is this line:
    cluster.join(directory);

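    When join is executed, the SPI mechanism looks up the Cluster extension instance, which by default is FailoverCluster.
    However, debugging shows that the first object instantiated is MockClusterWrapper, not FailoverCluster.
    After consulting the article "The mock mechanism in Dubbo" and cross-checking the source, my summary is as follows.
    In Dubbo's configuration file classpath:/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.cluster.Cluster, the key failover maps to the FailoverCluster class.
    But ExtensionLoader does something special when it instantiates an extension: once the instance is created, it is automatically wrapped in the Wrapper classes known to the current ExtensionLoader. The MockClusterWrapper registered under the key mock in that file is such a Wrapper, so the instantiated FailoverCluster ends up wrapped in a MockClusterWrapper. In short:
    Cluster$Adaptive -> look up the extension whose key is failover -> FailoverCluster -> wrapped from the outside in MockClusterWrapper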
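
    The wrapping step can be illustrated with a heavily simplified loader. This is a sketch under assumptions, not ExtensionLoader itself: SimpleExtensionLoader and its register method are hypothetical, the Cluster interface is reduced to a String-based join, and a wrapper is recognised purely by having a public constructor that takes the extension interface.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class WrapperSketch {
    /** Hypothetical, string-based stand-in for the Cluster extension point. */
    interface Cluster {
        String join(String directory);
    }

    /** Ordinary extension, registered under the key "failover". */
    static class FailoverCluster implements Cluster {
        public FailoverCluster() {}

        @Override
        public String join(String directory) {
            return "FailoverClusterInvoker(" + directory + ")";
        }
    }

    /** Wrapper: its constructor takes another Cluster and decorates it. */
    static class MockClusterWrapper implements Cluster {
        private final Cluster cluster;

        public MockClusterWrapper(Cluster cluster) {
            this.cluster = cluster;
        }

        @Override
        public String join(String directory) {
            return "MockClusterInvoker(" + cluster.join(directory) + ")";
        }
    }

    /** Hypothetical loader: named extensions go into a map, wrappers into a list. */
    static class SimpleExtensionLoader {
        private final Map<String, Class<? extends Cluster>> named = new LinkedHashMap<>();
        private final List<Class<? extends Cluster>> wrappers = new ArrayList<>();

        void register(String key, Class<? extends Cluster> type) {
            if (isWrapper(type)) {
                wrappers.add(type);
            } else {
                named.put(key, type);
            }
        }

        private static boolean isWrapper(Class<?> type) {
            try {
                type.getConstructor(Cluster.class);
                return true;
            } catch (NoSuchMethodException e) {
                return false;
            }
        }

        Cluster getExtension(String key) {
            try {
                // Instantiate the named extension, then wrap it in every known wrapper.
                Cluster instance = named.get(key).getConstructor().newInstance();
                for (Class<? extends Cluster> wrapper : wrappers) {
                    instance = wrapper.getConstructor(Cluster.class).newInstance(instance);
                }
                return instance;
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot create extension " + key, e);
            }
        }
    }

    public static void main(String[] args) {
        SimpleExtensionLoader loader = new SimpleExtensionLoader();
        loader.register("failover", FailoverCluster.class);
        loader.register("mock", MockClusterWrapper.class);
        // prints "MockClusterInvoker(FailoverClusterInvoker(directory))"
        System.out.println(loader.getExtension("failover").join("directory"));
    }
}
```

    Asking for "failover" therefore returns a MockClusterWrapper whose delegate is the FailoverCluster, which matches what the debugger showed.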
     
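    So the sequence diagram looks like this:
     
    Cluster fault tolerance diagram from the official site:
     
    Following the sequence diagram above, the source code reads as follows: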
    MockClusterInvoker.java
    public Result invoke(Invocation invocation) throws RpcException {
            Result result = null;
    
            String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
            if (value.length() == 0 || value.equalsIgnoreCase("false")) {
                //no mock
                // from this line on we enter the cluster: cluster -> AbstractClusterInvoker
                result = this.invoker.invoke(invocation);
            } else if (value.startsWith("force")) {
                if (logger.isWarnEnabled()) {
                    logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
                }
                //force:direct mock
                result = doMockInvoke(invocation, null);
            } else {
                //fail-mock
                try {
                    result = this.invoker.invoke(invocation);
                } catch (RpcException e) {
                    if (e.isBiz()) {
                        throw e;
                    } else {
                        if (logger.isWarnEnabled()) {
                            logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
                        }
                        result = doMockInvoke(invocation, e);
                    }
                }
            }
            return result;
        }
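
    The three branches of MockClusterInvoker.invoke (no mock, force mock, fail mock) can be isolated in a small sketch. The names below (RemoteException, Call, invokeWithMock) are hypothetical stand-ins, and the mock values passed in main are illustrative strings rather than a statement about the full configuration syntax.

```java
class MockModeSketch {
    /** Hypothetical stand-in for RpcException; biz marks a business-level error. */
    static class RemoteException extends RuntimeException {
        final boolean biz;

        RemoteException(String message, boolean biz) {
            super(message);
            this.biz = biz;
        }
    }

    /** A call that either returns a result or throws RemoteException. */
    interface Call {
        String invoke();
    }

    static String invokeWithMock(String mockValue, Call remote, Call mock) {
        String value = mockValue == null ? "false" : mockValue.trim();
        if (value.isEmpty() || value.equalsIgnoreCase("false")) {
            // no mock: go straight to the cluster invoker
            return remote.invoke();
        }
        if (value.startsWith("force")) {
            // force mock: the remote side is never contacted
            return mock.invoke();
        }
        try {
            // fail mock: try the remote side first
            return remote.invoke();
        } catch (RemoteException e) {
            if (e.biz) {
                throw e; // business errors are not masked by the mock
            }
            return mock.invoke();
        }
    }

    public static void main(String[] args) {
        Call ok = () -> "remote";
        Call broken = () -> {
            throw new RemoteException("timeout", false);
        };
        Call mock = () -> "mock";
        System.out.println(invokeWithMock("false", ok, mock));                // prints "remote"
        System.out.println(invokeWithMock("force:return null", ok, mock));    // prints "mock"
        System.out.println(invokeWithMock("fail:return null", broken, mock)); // prints "mock"
    }
}
```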

      AbstractClusterInvoker.java

    public Result invoke(final Invocation invocation) throws RpcException {
            checkWhetherDestroyed();
    
            // binding attachments into invocation.
            Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
            if (contextAttachments != null && contextAttachments.size() != 0) {
                ((RpcInvocation) invocation).addAttachments(contextAttachments);
            }
             
            // select the list of available invokers
            List<Invoker<T>> invokers = list(invocation);
            // initialize the load balancing strategy
            LoadBalance loadbalance = initLoadBalance(invokers, invocation);
            RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
            return doInvoke(invocation, invokers, loadbalance);
        }
    
    
       protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
            // -> AbstractDirectory.java
            return directory.list(invocation);
        }

     AbstractDirectory.java

    public List<Invoker<T>> list(Invocation invocation) throws RpcException {
            if (destroyed) {
                throw new RpcException("Directory already destroyed .url: " + getUrl());
            }
            
            // template method, implemented by subclasses
            // -> RegistryDirectory.java or StaticDirectory.java
            List<Invoker<T>> invokers = doList(invocation);
            List<Router> localRouters = this.routers; // local reference
            if (localRouters != null && !localRouters.isEmpty()) {
                for (Router router : localRouters) {
                    try {
                        if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
                            // Once the invokers are returned we reach the Router and routing begins. This is step 6 in the
                            // sequence diagram, and the class at this point is MockInvokersSelector, an implementation of the
                            // Router interface. The official diagram also shows that Router comes in two kinds, Script and
                            // Condition, that is, script routing and condition routing.
                            invokers = router.route(invokers, getConsumerUrl(), invocation);
                        }
                    } catch (Throwable t) {
                        logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
                    }
                }
            }
            return invokers;
        }

        RegistryDirectory.java

    public List<Invoker<T>> doList(Invocation invocation) {
            if (forbidden) {
                // 1. No service provider 2. Service providers are disabled
                throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,
                        "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " + NetUtils.getLocalHost()
                                + " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist).");
            }
            List<Invoker<T>> invokers = null;
            Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
            if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
                String methodName = RpcUtils.getMethodName(invocation);
                Object[] args = RpcUtils.getArguments(invocation);
                if (args != null && args.length > 0 && args[0] != null
                        && (args[0] instanceof String || args[0].getClass().isEnum())) {
                    invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter
                }
                if (invokers == null) {
                    invokers = localMethodInvokerMap.get(methodName);
                }
                if (invokers == null) {
                    invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
                }
                if (invokers == null) {
                    Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
                    if (iterator.hasNext()) {
                        invokers = iterator.next();
                    }
                }
            }
            return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
        }

    MockInvokersSelector.java

    public <T> List<Invoker<T>> route(final List<Invoker<T>> invokers,
                                          URL url, final Invocation invocation) throws RpcException {
            if (invocation.getAttachments() == null) {
                return getNormalInvokers(invokers);
            } else {
                String value = invocation.getAttachments().get(Constants.INVOCATION_NEED_MOCK);
                if (value == null) {
                    // get the invokers that can execute normally and return them
                    return getNormalInvokers(invokers);
                } else if (Boolean.TRUE.toString().equalsIgnoreCase(value)) {
                    return getMockedInvokers(invokers);
                }
            }
            return invokers;
        }
    
    
        private <T> List<Invoker<T>> getNormalInvokers(final List<Invoker<T>> invokers) {
            if (!hasMockProviders(invokers)) {
                return invokers;
            } else {
                List<Invoker<T>> sInvokers = new ArrayList<Invoker<T>>(invokers.size());
                for (Invoker<T> invoker : invokers) {
                    if (!invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)) {
                        sInvokers.add(invoker);
                    }
                }
                return sInvokers;
            }
        }
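    The two keywords that appeared above, Directory and Router, really just do two things:
    Directory finds all the invokers in the current cluster.
    Router takes all the invokers from the previous step and picks out those that can execute normally.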
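
    As a rough sketch of those two steps, assuming a Provider class that stands in for an Invoker and carries only a protocol and an address (both the class and the addresses are made up for illustration), the Directory step lists everything and the Router step drops entries whose protocol is "mock", in the spirit of getNormalInvokers:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RouteSketch {
    /** Hypothetical stand-in for an Invoker: just a protocol and an address. */
    static class Provider {
        final String protocol;
        final String address;

        Provider(String protocol, String address) {
            this.protocol = protocol;
            this.address = address;
        }

        @Override
        public String toString() {
            return protocol + "://" + address;
        }
    }

    /** Directory step: return every provider currently known for the service. */
    static List<Provider> listAll() {
        return Arrays.asList(
                new Provider("dubbo", "10.0.0.1:20880"),
                new Provider("mock", "10.0.0.2:20880"),
                new Provider("dubbo", "10.0.0.3:20880"));
    }

    /** Router step: keep only the providers whose protocol is not "mock". */
    static List<Provider> normalProviders(List<Provider> all) {
        List<Provider> normal = new ArrayList<>(all.size());
        for (Provider provider : all) {
            if (!"mock".equals(provider.protocol)) {
                normal.add(provider);
            }
        }
        return normal;
    }

    public static void main(String[] args) {
        // prints "[dubbo://10.0.0.1:20880, dubbo://10.0.0.3:20880]"
        System.out.println(normalProviders(listAll()));
    }
}
```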
    Back to AbstractClusterInvoker.java:
     
          ......
            // select the list of available invokers
            List<Invoker<T>> invokers = list(invocation);
            // initialize the load balancing strategy
            LoadBalance loadbalance = initLoadBalance(invokers, invocation);
            RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
            return doInvoke(invocation, invokers, loadbalance);
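    The steps above have already picked out the invokers that can execute normally. But suppose two providers form the cluster and both are healthy: which one actually gets invoked?
    According to the official documentation:
    When a cluster invocation fails, Dubbo provides several fault tolerance strategies; the default is failover retry.
    So at this point we arrive at the FailoverClusterInvoker class. If you configure Failfast Cluster (fail fast), Failsafe Cluster (fail safe), Failback Cluster (automatic recovery after failure), Forking Cluster (invoke several servers in parallel and return as soon as one succeeds), or Broadcast Cluster (invoke every provider one by one; if any of them reports an error, the call fails), execution goes to the corresponding class instead.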
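
    Before reading the real class, the difference between three of these strategies can be sketched in a few lines. This is an illustration only: providers are plain strings, the remote call is a java.util.function.Function, the provider list is assumed to be non-empty, and failover simply walks the list in order instead of asking a load balancer.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class ClusterStrategySketch {
    /** Failover: keep trying other providers until one succeeds, at most retries + 1 attempts. */
    static String failover(List<String> providers, Function<String, String> call, int retries) {
        RuntimeException last = null;
        int attempts = Math.max(1, retries + 1);
        for (int i = 0; i < attempts; i++) {
            String provider = providers.get(i % providers.size());
            try {
                return call.apply(provider);
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to the next provider
            }
        }
        throw new IllegalStateException("all " + attempts + " attempts failed", last);
    }

    /** Failfast: a single attempt; any failure propagates immediately. */
    static String failfast(List<String> providers, Function<String, String> call) {
        return call.apply(providers.get(0));
    }

    /** Failsafe: a single attempt; a failure is swallowed and an empty result is returned. */
    static String failsafe(List<String> providers, Function<String, String> call) {
        try {
            return call.apply(providers.get(0));
        } catch (RuntimeException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        List<String> providers = Arrays.asList("a", "b");
        // Provider "a" is always down, provider "b" answers.
        Function<String, String> call = provider -> {
            if (provider.equals("a")) {
                throw new RuntimeException("a is down");
            }
            return "ok from " + provider;
        };
        System.out.println(failover(providers, call, 2)); // prints "ok from b"
        System.out.println(failsafe(providers, call));    // prints "null"
    }
}
```

    With the same failing provider, failfast would throw straight away, which is the behaviour one usually wants for non-idempotent writes.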
    FailoverClusterInvoker.java
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            List<Invoker<T>> copyinvokers = invokers;
            checkInvokers(copyinvokers, invocation);
            String methodName = RpcUtils.getMethodName(invocation);
            int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
            if (len <= 0) {
                len = 1;
            }
            // retry loop.
            RpcException le = null; // last exception.
            List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
            Set<String> providers = new HashSet<String>(len);
            for (int i = 0; i < len; i++) {
                //Reselect before retry to avoid a change of candidate `invokers`.
                //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
                if (i > 0) {
                    checkWhetherDestroyed();
                    copyinvokers = list(invocation);
                    // check again
                    checkInvokers(copyinvokers, invocation);
                }
                // select an Invoker via the load balancing algorithm, then invoke it
                Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
                invoked.add(invoker);
                RpcContext.getContext().setInvokers((List) invoked);
                try {
                    Result result = invoker.invoke(invocation);
                    if (le != null && logger.isWarnEnabled()) {
                        logger.warn("Although retry the method " + methodName
                                + " in the service " + getInterface().getName()
                                + " was successful by the provider " + invoker.getUrl().getAddress()
                                + ", but there have been failed providers " + providers
                                + " (" + providers.size() + "/" + copyinvokers.size()
                                + ") from the registry " + directory.getUrl().getAddress()
                                + " on the consumer " + NetUtils.getLocalHost()
                                + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                                + le.getMessage(), le);
                    }
                    return result;
                } catch (RpcException e) {
                    if (e.isBiz()) { // biz exception.
                        throw e;
                    }
                    le = e;
                } catch (Throwable e) {
                    le = new RpcException(e.getMessage(), e);
                } finally {
                    providers.add(invoker.getUrl().getAddress());
                }
            }
            throw new RpcException(le.getCode(), "Failed to invoke the method "
                    + methodName + " in the service " + getInterface().getName()
                    + ". Tried " + len + " times of the providers " + providers
                    + " (" + providers.size() + "/" + copyinvokers.size()
                    + ") from the registry " + directory.getUrl().getAddress()
                    + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                    + Version.getVersion() + ". Last error is: "
                    + le.getMessage(), le.getCause() != null ? le.getCause() : le);
        }
    
        protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
            if (invokers == null || invokers.isEmpty()) {
                return null;
            }
            String methodName = invocation == null ? "" : invocation.getMethodName();
    
            boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);
            {
                //ignore overloaded method
                if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
                    stickyInvoker = null;
                }
                //ignore concurrency problem
                if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
                    if (availablecheck && stickyInvoker.isAvailable()) {
                        return stickyInvoker;
                    }
                }
            }
            Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);
    
            if (sticky) {
                stickyInvoker = invoker;
            }
            return invoker;
        }
    
        private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
            if (invokers == null || invokers.isEmpty())
                return null;
            if (invokers.size() == 1)
                return invokers.get(0);
            Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
    
            //If the `invoker` is in the  `selected` or invoker is unavailable && availablecheck is true, reselect.
            if ((selected != null && selected.contains(invoker))
                    || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
                try {
                    Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
                    if (rinvoker != null) {
                        invoker = rinvoker;
                    } else {
                        //Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
                        int index = invokers.indexOf(invoker);
                        try {
                            //Avoid collision
                            invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invokers.get(0);
                        } catch (Exception e) {
                            logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
                        }
                    }
                } catch (Throwable t) {
                    logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
                }
            }
            return invoker;
        }
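    Across this whole cluster fault tolerance architecture, Dubbo really does just three things:
    1. Directory finds all the invokers in the current cluster.
    2. Router takes all the invokers from the previous step and picks out those that can execute normally.
    3. LoadBalance takes the healthy invokers from the previous step and, according to the configured load balancing strategy, picks the one invoker to execute.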
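
    Step 3 can be sketched as follows. The sketch uses a round-robin balancer purely because it is deterministic; it is a stand-in, not a claim about which strategy Dubbo uses by default. The alreadyTried list plays the role of the selected/invoked list in the source above, and filtering before selection is a simplification of the select-then-reselect logic in doSelect.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class LoadBalanceSketch {
    /** Round-robin balancer over provider names; a hypothetical stand-in for LoadBalance. */
    static class RoundRobin {
        private final AtomicInteger counter = new AtomicInteger();

        String select(List<String> candidates) {
            int index = Math.floorMod(counter.getAndIncrement(), candidates.size());
            return candidates.get(index);
        }
    }

    /** Pick one candidate, preferring those not yet tried during this call. */
    static String select(RoundRobin loadBalance, List<String> candidates, List<String> alreadyTried) {
        if (candidates.isEmpty()) {
            return null;
        }
        if (candidates.size() == 1) {
            return candidates.get(0);
        }
        List<String> fresh = new ArrayList<>(candidates);
        fresh.removeAll(alreadyTried);
        // Fall back to the full list when every candidate has been tried already.
        return loadBalance.select(fresh.isEmpty() ? candidates : fresh);
    }

    public static void main(String[] args) {
        RoundRobin loadBalance = new RoundRobin();
        List<String> candidates = Arrays.asList("a", "b", "c");
        List<String> tried = new ArrayList<>();
        // Simulate three consecutive attempts of one failing call.
        for (int i = 0; i < 3; i++) {
            tried.add(select(loadBalance, candidates, tried));
        }
        // Each attempt lands on a provider that has not been tried before.
        System.out.println(tried);
    }
}
```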
     
  • Original article: https://www.cnblogs.com/laowz/p/10165713.html