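  • Dubbo Source Code Study Notes, Part 4: Cluster Fault Tolerance

            What cluster fault tolerance mechanisms does Dubbo provide, and how are they implemented?

            Dubbo provides six cluster fault tolerance mechanisms: Failover (on failure, switch automatically and try another server), Failfast (on failure, throw an exception immediately), Failsafe (on failure, ignore the exception), Failback (on failure, recover automatically: log the failure and retry on a timer), Forking (call several providers in parallel and return as soon as one succeeds), and Broadcast (call every provider; if any one of them reports an error, the call reports an error).

            Let's look at the base class first:

            1. AbstractClusterInvoker, the default base implementation of the cluster invokers, provides the implementation skeleton for its subclasses.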

            

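            Its main methods are the following.

            invoke() prepares the inputs for the subclass's doInvoke() method, namely the invoker list and the loadbalance instance. Its logic is:

            (1) Check whether the current invoker has been destroyed (if so, throw an exception right away);

            (2) Get the invoker list from the Directory (usually the service list returned by the registry) by calling directory.list();

            (3) Read the load balancing setting from the URL of the first invoker in the list, then load the matching LoadBalance extension through ExtensionLoader (if the list is empty or nothing is configured, the default loadbalance is used);

            (4) Attach an invocation ID to the invocation if the call is asynchronous (RpcUtils.attachInvocationIdIfAsync());

            (5) Call the subclass's doInvoke() method to perform the actual call.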

    public Result invoke(final Invocation invocation) throws RpcException {

        checkWhetherDestroyed();

        LoadBalance loadbalance;

        List<Invoker<T>> invokers = list(invocation);
        if (invokers != null && invokers.size() > 0) {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        } else {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        return doInvoke(invocation, invokers, loadbalance);
    }
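
            The split between invoke() and doInvoke() is a template method: the base class fixes the skeleton and each subclass supplies the strategy. Below is a minimal, self-contained sketch of that shape. It is a toy model, not Dubbo's code: providers are plain strings, and the names ClusterTemplateSketch and FirstProviderSketch are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the invoke() -> doInvoke() template; all names are hypothetical. */
abstract class ClusterTemplateSketch {
    private volatile boolean destroyed;
    private final List<String> directory; // stands in for Directory.list()

    ClusterTemplateSketch(List<String> directory) {
        this.directory = directory;
    }

    void destroy() {
        destroyed = true;
    }

    /** Fixed skeleton: check state, list the candidates, then delegate to the subclass. */
    final String invoke(String request) {
        if (destroyed) {
            throw new IllegalStateException("invoker is destroyed");
        }
        List<String> candidates = new ArrayList<>(directory);
        return doInvoke(request, candidates);
    }

    /** The fault tolerance strategy lives here. */
    protected abstract String doInvoke(String request, List<String> candidates);
}

/** Simplest possible strategy: always use the first candidate. */
final class FirstProviderSketch extends ClusterTemplateSketch {
    FirstProviderSketch(List<String> directory) {
        super(directory);
    }

    @Override
    protected String doInvoke(String request, List<String> candidates) {
        if (candidates.isEmpty()) {
            throw new IllegalStateException("no provider available");
        }
        return candidates.get(0) + " handled " + request;
    }
}
```

            Because invoke() is final, every strategy gets the destroyed check and a fresh candidate list without repeating that code.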

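            select() is called from the subclass's doInvoke() method and picks one invoker through the load balancing policy. Most of the work is done in doselect() and reselect(), and the logic is:

            a) First pick an invoker with the load balancing policy. If that invoker is already in the selected list (the list of invokers used earlier in this call), or if it is unavailable, go to step b and reselect; otherwise return it directly.

            b) reselect() first builds reselectInvokers, the invokers from the given list that are not in selected (not yet used), and picks one of them with the load balancing policy. If every given invoker is already in selected (all have been used), it collects the available invokers from selected and picks one of those with the load balancing policy. This keeps the chance of choosing the same invoker twice as low as possible.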

    /**
     * Select an invoker using loadbalance policy.</br>
     * a)Firstly, select an invoker using loadbalance. If this invoker is in previously selected list, or, if this invoker is unavailable, then continue step b (reselect), otherwise return the first selected invoker</br>
     * b)Reselection, the validation rule for reselection: selected > available. This rule guarantees that the selected invoker has the minimum chance to be one in the previously selected list, and also guarantees this invoker is available.
     *
     * @param loadbalance load balance policy
     * @param invocation
     * @param invokers invoker candidates
     * @param selected  exclude selected invokers or not
     * @return
     * @throws RpcException
     */
    protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
        if (invokers == null || invokers.size() == 0)
            return null;
        String methodName = invocation == null ? "" : invocation.getMethodName();

        boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);
        {
            //ignore overloaded method
            if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
                stickyInvoker = null;
            }
            //ignore concurrency problem
            if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
                if (availablecheck && stickyInvoker.isAvailable()) {
                    return stickyInvoker;
                }
            }
        }
        Invoker<T> invoker = doselect(loadbalance, invocation, invokers, selected);

        if (sticky) {
            stickyInvoker = invoker;
        }
        return invoker;
    }

             

    private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
        if (invokers == null || invokers.size() == 0)
            return null;
        if (invokers.size() == 1)
            return invokers.get(0);
        // If we only have two invokers, use round-robin instead.
        if (invokers.size() == 2 && selected != null && selected.size() > 0) {
            return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
        }
        Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);

        //If the `invoker` is in the  `selected` or invoker is unavailable && availablecheck is true, reselect.
        if ((selected != null && selected.contains(invoker))
                || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
            try {
                Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
                if (rinvoker != null) {
                    invoker = rinvoker;
                } else {
                    //Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
                    int index = invokers.indexOf(invoker);
                    try {
                        //Avoid collision
                        invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invoker;
                    } catch (Exception e) {
                        logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
                    }
                }
            } catch (Throwable t) {
                logger.error("clustor relselect fail reason is :" + t.getMessage() + " if can not slove ,you can set cluster.availablecheck=false in url", t);
            }
        }
        return invoker;
    }

            

    /**
     * Reselect, use invokers not in `selected` first, if all invokers are in `selected`, just pick an available one using loadbalance policy.
     *
     * @param loadbalance
     * @param invocation
     * @param invokers
     * @param selected
     * @return
     * @throws RpcException
     */
    private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
                                List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck)
            throws RpcException {

        //Allocating one in advance, this list is certain to be used.
        List<Invoker<T>> reselectInvokers = new ArrayList<Invoker<T>>(invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());

        //First, try picking a invoker not in `selected`.
        if (availablecheck) { // invoker.isAvailable() should be checked
            for (Invoker<T> invoker : invokers) {
                if (invoker.isAvailable()) {
                    if (selected == null || !selected.contains(invoker)) {
                        reselectInvokers.add(invoker);
                    }
                }
            }
            if (reselectInvokers.size() > 0) {
                return loadbalance.select(reselectInvokers, getUrl(), invocation);
            }
        } else { // do not check invoker.isAvailable()
            for (Invoker<T> invoker : invokers) {
                if (selected == null || !selected.contains(invoker)) {
                    reselectInvokers.add(invoker);
                }
            }
            if (reselectInvokers.size() > 0) {
                return loadbalance.select(reselectInvokers, getUrl(), invocation);
            }
        }
        // Just pick an available invoker using loadbalance policy
        {
            if (selected != null) {
                for (Invoker<T> invoker : selected) {
                    if ((invoker.isAvailable()) // available first
                            && !reselectInvokers.contains(invoker)) {
                        reselectInvokers.add(invoker);
                    }
                }
            }
            if (reselectInvokers.size() > 0) {
                return loadbalance.select(reselectInvokers, getUrl(), invocation);
            }
        }
        return null;
    }
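
            To see the select-then-reselect idea in isolation, here is a small self-contained sketch. It is a toy model under stated assumptions: providers are plain strings, the load balancer is any function that picks one element from a non-empty list, and the class name ReselectSketch is invented. It leaves out the sticky invoker and the two-invoker shortcut, and its final fallback (keep the first pick) is simpler than Dubbo's.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

/** Toy model of select-then-reselect; names and types are hypothetical. */
final class ReselectSketch {
    /**
     * @param candidates all known providers
     * @param used       providers already tried during this call
     * @param balancer   picks one element from a non-empty list
     * @param available  availability check for a provider
     */
    static String select(List<String> candidates, List<String> used,
                         Function<List<String>, String> balancer,
                         Predicate<String> available) {
        if (candidates.isEmpty()) {
            return null;
        }
        // Step a: trust the balancer unless its pick was already used or is down.
        String first = balancer.apply(candidates);
        if (!used.contains(first) && available.test(first)) {
            return first;
        }
        // Step b, part 1: prefer available providers that have not been used yet.
        List<String> fresh = new ArrayList<>();
        for (String c : candidates) {
            if (available.test(c) && !used.contains(c)) {
                fresh.add(c);
            }
        }
        if (!fresh.isEmpty()) {
            return balancer.apply(fresh);
        }
        // Step b, part 2: everything was used, so fall back to used providers that are up.
        List<String> usedButUp = new ArrayList<>();
        for (String u : used) {
            if (available.test(u) && !usedButUp.contains(u)) {
                usedButUp.add(u);
            }
        }
        // Simplification: keep the first pick when nothing better exists.
        return usedButUp.isEmpty() ? first : balancer.apply(usedButUp);
    }
}
```

            With candidates [a, b, c], a balancer that always takes the first element, and a already used, the sketch skips a and returns b.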

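            2. Failover Cluster: automatic failover. When a call fails, retry on another server.

            It is usually used for read operations, but retries add latency. Use retries="2" to set the number of retries (not counting the first call).

            The cluster is configured as follows (this is the default configuration):

            <dubbo:service cluster="failover" /> or <dubbo:reference cluster="failover" />

            The retry count can be configured in three ways: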

            <dubbo:service retries="2" />

            <dubbo:reference retries="2" />   

            <dubbo:reference>

                    <dubbo:method name="findFoo" retries="2" />

            </dubbo:reference>

            The source code in detail:

            The FailoverCluster class simply creates a FailoverClusterInvoker:

/**
 * {@link FailoverClusterInvoker}
 *
 */
public class FailoverCluster implements Cluster {

    public final static String NAME = "failover";

    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker<T>(directory);
    }

}

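            FailoverClusterInvoker extends AbstractClusterInvoker and implements the Failover mechanism in doInvoke(): when a call fails, it switches automatically and retries on another server.

            doInvoke() is called by invoke() in the parent class AbstractClusterInvoker. Its logic is:

            (1) Read the retry count from the retries parameter (the default is Constants.DEFAULT_RETRIES, which is 2 in Dubbo 2.x) and set the maximum number of loop iterations to len = retries + 1;

            (2) Loop at most len times;

            (3) On the first iteration, call the parent's select() method to pick an invoker with the load balancing policy, then run invoker.invoke(invocation). On success, return the result and exit. On a non-business exception, record it and continue the loop; a business exception is rethrown immediately;

            (4) On each later iteration, first re-check that the invoker has not been destroyed, re-list the invokers and validate them, then repeat step 3 until a call succeeds or the loop has run len times.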

    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyinvokers = invokers;
        checkInvokers(copyinvokers, invocation);
        // Read the retries parameter (Constants.DEFAULT_RETRIES if unset) and set the loop bound len = retries + 1.
        int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            // Before each retry, re-check that the invokers are still valid.
            if (i > 0) {
                checkWhetherDestroyed();
                copyinvokers = list(invocation);
                // check again
                checkInvokers(copyinvokers, invocation);
            }
            // Pick an invoker with the parent's select() and call it: return on success, record the exception and loop on failure.
            Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + invocation.getMethodName()
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers
                            + " (" + providers.size() + "/" + copyinvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
                + invocation.getMethodName() + " in the service " + getInterface().getName()
                + ". Tried " + len + " times of the providers " + providers
                + " (" + providers.size() + "/" + copyinvokers.size()
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
    }
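
            Stripped of logging and Dubbo types, the retry loop above comes down to a few lines. The sketch below is a toy model, not Dubbo's implementation: a provider is modeled as a java.util.function.Function, the class name FailoverSketch and the helper pick() are invented, and selection simply takes the first provider that has not been tried yet instead of going through a load balancer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Toy failover loop; names are hypothetical and providers are plain functions. */
final class FailoverSketch {
    /** Makes up to retries + 1 attempts, preferring providers that have not failed yet. */
    static <R> R invoke(List<Function<String, R>> providers, String request, int retries) {
        if (providers.isEmpty()) {
            throw new IllegalStateException("no provider available");
        }
        int attempts = Math.max(retries + 1, 1); // the first call is not a retry
        List<Function<String, R>> invoked = new ArrayList<>();
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            Function<String, R> provider = pick(providers, invoked);
            invoked.add(provider);
            try {
                return provider.apply(request); // success ends the loop
            } catch (RuntimeException e) {
                last = e; // remember the failure and try the next provider
            }
        }
        throw new IllegalStateException("Tried " + attempts + " times. Last error is: "
                + (last == null ? "" : last.getMessage()), last);
    }

    /** Stand-in for select(): first provider not yet invoked, else the first one. */
    private static <R> Function<String, R> pick(List<Function<String, R>> providers,
                                                List<Function<String, R>> invoked) {
        for (Function<String, R> p : providers) {
            if (!invoked.contains(p)) {
                return p;
            }
        }
        return providers.get(0);
    }
}
```

            With one failing and one healthy provider, retries = 2 reaches the healthy one on the second attempt, while retries = 0 gives up after the first failure.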

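            3. Failfast Cluster: fail fast. Only one call is made, and a failure is reported immediately. It is usually used for non-idempotent write operations, such as inserting a record.

            FailfastClusterInvoker extends AbstractClusterInvoker and implements the Failfast mechanism in doInvoke(): on failure, throw an exception immediately.

            doInvoke() is called by invoke() in the parent class AbstractClusterInvoker: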

    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        try {
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
                throw (RpcException) e;
            }
            throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
        }
    }
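
            The essential behavior is "one attempt, no retry, wrap anything that is not a business error". A minimal sketch of that rule follows. It is a toy model: FailfastSketch and BizException are invented names, and BizException merely stands in for the RpcException.isBiz() check in the listing above.

```java
import java.util.function.Function;

/** Toy failfast call; names are hypothetical and the provider is a plain function. */
final class FailfastSketch {
    /** Hypothetical marker for business errors, standing in for RpcException.isBiz(). */
    static final class BizException extends RuntimeException {
        BizException(String message) {
            super(message);
        }
    }

    /** Calls the provider exactly once and never retries. */
    static <R> R invoke(Function<String, R> provider, String request) {
        try {
            return provider.apply(request);
        } catch (BizException e) {
            throw e; // business errors pass through unchanged
        } catch (RuntimeException e) {
            // anything else is wrapped with context and reported immediately
            throw new IllegalStateException("Failfast invoke failed. Last error is: " + e.getMessage(), e);
        }
    }
}
```

            Because the provider is called only once, a non-idempotent write cannot be applied twice by the cluster layer.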

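            4. Failsafe Cluster: fail safe. When an exception occurs, it is simply ignored. It is usually used for operations such as writing audit logs.

            FailsafeClusterInvoker extends AbstractClusterInvoker and implements the Failsafe mechanism in doInvoke(): on failure, log the exception, ignore it, and return an empty result.

            doInvoke() is called by invoke() in the parent class AbstractClusterInvoker: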

    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            logger.error("Failsafe ignore exception: " + e.getMessage(), e);
            return new RpcResult(); // ignore
        }
    }
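
            The same idea in a self-contained form: the caller never sees the failure, only an empty result. This is a toy sketch with an invented class name (FailsafeSketch); it uses Optional.empty() where the listing above returns an empty RpcResult, and System.err where Dubbo uses its logger.

```java
import java.util.Optional;
import java.util.function.Function;

/** Toy failsafe call; names are hypothetical and the provider is a plain function. */
final class FailsafeSketch {
    /** Returns the provider's result, or an empty Optional if the call failed. */
    static <R> Optional<R> invoke(Function<String, R> provider, String request) {
        try {
            return Optional.ofNullable(provider.apply(request));
        } catch (RuntimeException e) {
            // log and swallow: the failure must not reach the caller
            System.err.println("Failsafe ignore exception: " + e.getMessage());
            return Optional.empty();
        }
    }
}
```

            This suits side effects such as audit logging, where losing one record is preferable to failing the main request.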

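            5. Failback Cluster: automatic recovery. Failed requests are recorded in the background and resent on a timer. It is usually used for message notification operations.

            FailbackClusterInvoker extends AbstractClusterInvoker and implements the Failback mechanism in doInvoke().

            doInvoke() is called by invoke() in the parent class AbstractClusterInvoker. Every failed invocation is recorded in the failed map, and a scheduled executor retries the failed invocations periodically: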

    private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
        if (retryFuture == null) {
            synchronized (this) {
                if (retryFuture == null) {
                    retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {

                        public void run() {
                            // collect retry statistics
                            try {
                                retryFailed();
                            } catch (Throwable t) { // Defensive fault tolerance
                                logger.error("Unexpected error occur at collect statistic", t);
                            }
                        }
                    }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
                }
            }
        }
        failed.put(invocation, router);
    }

    void retryFailed() {
        if (failed.size() == 0) {
            return;
        }
        for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>(
                failed).entrySet()) {
            Invocation invocation = entry.getKey();
            Invoker<?> invoker = entry.getValue();
            try {
                invoker.invoke(invocation);
                failed.remove(invocation);
            } catch (Throwable e) {
                logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
            }
        }
    }

    protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
                    + e.getMessage() + ", ", e);
            addFailed(invocation, this);
            return new RpcResult(); // ignore
        }
    }
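
            The record-and-retry pattern can be reproduced with nothing but java.util.concurrent. The sketch below is a toy model with invented names (FailbackSketch, pending()): requests are strings, providers are functions, and failure returns null where the listing returns an empty RpcResult. Unlike the listing, it starts the timer eagerly in the constructor rather than lazily on the first failure.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Toy failback invoker; names are hypothetical and providers are plain functions. */
final class FailbackSketch implements AutoCloseable {
    // failed request -> provider to retry it on
    private final Map<String, Function<String, String>> failed = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "failback-retry");
                t.setDaemon(true); // do not keep the JVM alive for retries
                return t;
            });

    FailbackSketch(long periodMillis) {
        scheduler.scheduleWithFixedDelay(this::retryFailed, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Returns null on failure and queues the request for a background retry. */
    String invoke(Function<String, String> provider, String request) {
        try {
            return provider.apply(request);
        } catch (RuntimeException e) {
            failed.put(request, provider);
            return null;
        }
    }

    /** Retries every recorded failure once; successful ones leave the map. */
    void retryFailed() {
        for (Map.Entry<String, Function<String, String>> entry : failed.entrySet()) {
            try {
                entry.getValue().apply(entry.getKey());
                failed.remove(entry.getKey());
            } catch (RuntimeException e) {
                // still failing: keep the entry for the next tick
            }
        }
    }

    int pending() {
        return failed.size();
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

            A caller would typically create one instance per service reference and close it on shutdown; the retry period plays the role of RETRY_FAILED_PERIOD in the listing.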

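            6. Forking Cluster: call several servers in parallel and return as soon as one succeeds. It is usually used for read operations with strict latency requirements, at the cost of more service resources. Use forks="2" to set the maximum number of parallel calls.

            ForkingClusterInvoker extends AbstractClusterInvoker and implements the Forking mechanism in doInvoke().

            doInvoke() is called by invoke() in the parent class AbstractClusterInvoker. The call logic is:

            (1) Use select() to pick up to forks invokers and add them to selected (if forks <= 0 or forks is at least the number of invokers, all invokers are used);

            (2) Loop over selected and submit one call per invoker to the thread pool. Each successful result is put into a blocking queue. An exception is put into the queue only when every selected invoker has failed, so the queue then holds the exception from the last invoker to fail;

            (3) Poll the blocking queue for the first element, waiting up to the configured timeout. If it is not an exception, return it directly; otherwise throw an exception.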

    public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        final List<Invoker<T>> selected;
        final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
        final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        if (forks <= 0 || forks >= invokers.size()) {
            selected = invokers;
        } else {
            selected = new ArrayList<Invoker<T>>();
            for (int i = 0; i < forks; i++) {
                // TODO. Add some comment here, refer chinese version for more details.
                Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
                if (!selected.contains(invoker)) {//Avoid add the same invoker several times.
                    selected.add(invoker);
                }
            }
        }
        RpcContext.getContext().setInvokers((List) selected);
        final AtomicInteger count = new AtomicInteger();
        final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
        for (final Invoker<T> invoker : selected) {
            executor.execute(new Runnable() {
                public void run() {
                    try {
                        Result result = invoker.invoke(invocation);
                        ref.offer(result);
                    } catch (Throwable e) {
                        int value = count.incrementAndGet();
                        if (value >= selected.size()) {
                            ref.offer(e);
                        }
                    }
                }
            });
        }
        try {
            Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
            if (ret instanceof Throwable) {
                Throwable e = (Throwable) ret;
                throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
            }
            return (Result) ret;
        } catch (InterruptedException e) {
            throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
        }
    }
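
            The "first result wins" pattern in the listing rests on three pieces: a thread pool, a blocking queue, and a failure counter. Here is a self-contained sketch of just those pieces. It is a toy model with an invented class name (ForkingSketch): providers are functions that return non-null strings, every provider is forked (there is no forks limit), and, unlike the listing, a timeout is reported as an exception rather than returning whatever poll() yields.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Toy forking call; names are hypothetical and providers are plain functions. */
final class ForkingSketch {
    /** Calls all providers in parallel and returns the first successful result. */
    static String invoke(List<Function<String, String>> providers, String request, long timeoutMillis) {
        ExecutorService executor = Executors.newCachedThreadPool();
        BlockingQueue<Object> results = new LinkedBlockingQueue<>();
        AtomicInteger failures = new AtomicInteger();
        try {
            for (Function<String, String> provider : providers) {
                executor.execute(() -> {
                    try {
                        results.offer(provider.apply(request));
                    } catch (RuntimeException e) {
                        // publish an error only once every fork has failed
                        if (failures.incrementAndGet() >= providers.size()) {
                            results.offer(e);
                        }
                    }
                });
            }
            Object first = results.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (first == null) {
                throw new IllegalStateException("no provider answered within " + timeoutMillis + " ms");
            }
            if (first instanceof RuntimeException) {
                throw new IllegalStateException("all providers failed. Last error is: "
                        + ((RuntimeException) first).getMessage(), (RuntimeException) first);
            }
            return (String) first;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            throw new IllegalStateException("interrupted while waiting for providers", e);
        } finally {
            executor.shutdownNow(); // slower forks are abandoned
        }
    }
}
```

            A single failing provider does not fail the call as long as another one answers; the error surfaces only when the failure counter reaches the number of forks.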
  • Original article: https://www.cnblogs.com/markcd/p/8504674.html