zoukankan      html  css  js  c++  java
  • Dubbo集群容错(三)集群

    官方介绍
    为了避免单点故障,现在的应用通常至少会部署在两台服务器上。对于一些负载比较高的服务,会部署更多的服务器。这样,在同一环境下的服务提供者数量会大于1。对于服务消费者来说,同一环境下出现了多个服务提供者。这时会出现一个问题,服务消费者需要决定选择哪个服务提供者进行调用。另外服务调用失败时的处理措施也是需要考虑的,是重试呢,还是抛出异常,亦或是只打印异常等。为了处理这些问题,Dubbo 定义了集群接口 Cluster 以及 Cluster Invoker。
     
    集群 Cluster 用途是将多个服务提供者合并为一个 Cluster Invoker,并将这个 Invoker 暴露给服务消费者。这样一来,服务消费者只需通过这个 Invoker 进行远程调用即可,至于具体调用哪个服务提供者,以及调用失败后如何处理等问题,现在都交给集群模块去处理。集群模块是服务提供者和服务消费者的中间层,为服务消费者屏蔽了服务提供者的情况,这样服务消费者就可以专心处理远程调用相关事宜。比如发请求,接受服务提供者返回的数据等。这就是集群的作用。
    总结:集群模块主要是屏蔽处理细节问题
     
    Dubbo 提供了多种集群实现,主要有:
    • Failfast Cluster:快速失败,只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录。
    • Failsafe Cluster:失败安全,出现异常时,直接忽略。通常用于写入审计日志等操作。
    • Failback Cluster:失败自动恢复,后台记录失败请求,定时重发。通常用于消息通知操作。
    • Failover Cluster: 失败重试,在调用失败时,会自动切换 Invoker 进行重试
    • Forking Cluster:并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过 forks="2" 来设置最大并行数。
    • Broadcast Cluster:广播调用所有提供者,逐个调用,任意一台报错则报错 [2]。通常用于通知所有提供者更新缓存或日志等本地资源信息。
     
    Cluster是一个接口,其实现类有如下,并且它还是一个SPI
    @SPI(FailoverCluster.NAME)    // failover
    public interface Cluster {
    
        @Adaptive
        <T> Invoker<T> join(Directory<T> directory) throws RpcException;
    }
    我们来看一下FailoverCluster,这个类代码就这几行,比较简单
    public class FailoverCluster implements Cluster {
        public final static String NAME = "failover";
    
        public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
            return new FailoverClusterInvoker<T>(directory);
        }
    }
     
    集群工作过程可分为两个阶段:第一个阶段是在服务消费者初始化期间,第二个阶段是在服务消费者进行远程调用时
     
    接下来我们看一下Cluster Invoker,即AbstractClusterInvoker,invoke方法中,负载均衡等操作会在此执行。
        public Result invoke(final Invocation invocation) throws RpcException {
    
            checkWhetherDestroyed();
    
            LoadBalance loadbalance;
            // 列举 Invoker
            List<Invoker<T>> invokers = list(invocation);
            if (invokers != null && invokers.size() > 0) {
                // 加载 LoadBalance
                loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                        .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
            } else {
                loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
            }
            RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
            // 调用 doInvoke 进行后续操作
            return doInvoke(invocation, invokers, loadbalance);
        }

    AbstractClusterInvoker 中的 list 方法做的事情很简单,只是简单的调用了 Directory 的 list 方法,没有其他更多的逻辑了。我们将重点放在doInvoke方法上

     
     
    FailoverClusterInvoker 在调用失败时,会自动切换 Invoker 进行重试。(FailoverClusterInvoker 这个类只有一个doInvoke方法)
    public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
    
        private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);
    
        public FailoverClusterInvoker(Directory<T> directory) {
            super(directory);
        }
    
        @SuppressWarnings({"unchecked", "rawtypes"})
        public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            List<Invoker<T>> copyinvokers = invokers;
            checkInvokers(copyinvokers, invocation);
            // 获取重试次数
            int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
            if (len <= 0) {
                len = 1;
            }
            // retry loop.
            RpcException le = null; // last exception.
            List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
            Set<String> providers = new HashSet<String>(len);
            // 循环调用,失败重试
            for (int i = 0; i < len; i++) {
                //Reselect before retry to avoid a change of candidate `invokers`.
                //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
                if (i > 0) {
                    checkWhetherDestroyed();
                    // 在进行重试前重新列举 Invoker,这样做的好处是,如果某个服务挂了,
                    // 通过调用 list 可得到最新可用的 Invoker 列表                
                    copyinvokers = list(invocation);
                    // 对 copyinvokers 进行判空检查
                    checkInvokers(copyinvokers, invocation);
                }
                // 通过负载均衡选择 Invoker
                Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
                // 添加到 invoker 到 invoked 列表中
                invoked.add(invoker);
                // 设置 invoked 到 RPC 上下文中
                RpcContext.getContext().setInvokers((List) invoked);
                try {
                    Result result = invoker.invoke(invocation);
                    if (le != null && logger.isWarnEnabled()) {
                        logger.warn("省略掉了...");
                    }
                    return result;
                } catch (RpcException e) {
                    if (e.isBiz()) { // biz exception.
                        throw e;
                    }
                    le = e;
                } catch (Throwable e) {
                    le = new RpcException(e.getMessage(), e);
                } finally {
                    providers.add(invoker.getUrl().getAddress());
                }
            }
            // 若重试失败,则抛出异常
            throw new RpcException("异常信息,省略...");
        }
    
    }

    如上,FailoverClusterInvoker 的 doInvoke 方法首先是获取重试次数,然后根据重试次数进行循环调用,失败后进行重试。在 for 循环内,首先是通过负载均衡组件选择一个 Invoker,然后再通过这个 Invoker 的 invoke 方法进行远程调用。如果失败了,记录下异常,并进行重试。重试时会再次调用父类的 list 方法列举 Invoker。整个流程大致如此,不是很难理解。下面我们看一下 select 方法的逻辑。

     
     
     
     
     
     
     
     
     
  • 相关阅读:
    学习subprocess模块...
    【排列组合】
    【约瑟夫问题】
    【craps赌博游戏】
    【洗扑克牌(乱数排列)】
    【最大访客数】
    【后序式的运算】
    【中序式转后序式】
    【python基础】之元组 集合 字典
    【费式数列(Fibonacci数列)】
  • 原文地址:https://www.cnblogs.com/caoxb/p/13140359.html
Copyright © 2011-2022 走看看