zoukankan      html  css  js  c++  java
  • dubbo学习(七)集群

    部分描述来自官方文档,本文加以润色,半原创

    集群概述

    我们先来看一下集群的作用 :

    为了处理这些问题,Dubbo 定义了集群接口 Cluster 以及 Cluster Invoker。集群 Cluster 用途是将多个服务提供者合并为一个 Cluster Invoker,并将这个 Invoker 暴露给服务消费者。这样一来,服务消费者只需通过这个 Invoker 进行远程调用即可,至于具体调用哪个服务提供者,以及调用失败后如何处理等问题,现在都交给集群模块去处理。集群模块是服务提供者和服务消费者的中间层,为服务消费者屏蔽了服务提供者的情况,这样服务消费者就可以专心处理远程调用相关事宜。比如发请求,接受服务提供者返回的数据等。这就是集群的作用。

    集群容错包含多个组件 : Cluster , Cluster Invoker , Directory , Router 和 LoadBalance 等,工作过程如下 :

    1297993-20200416151244069-1990721919.jpg

    分为两个阶段 :

    1. 在服务消费者初始化期间,集群 Cluster 实现类为服务消费者创建 Cluster Invoker 实例,即上图中的 merge 操作。
    2. 在服务消费者进行远程调用时,经过路由,负载均衡等组件返回最终的 Invoker ,并调用最终的invoke 方法 。

    dubbo 提供以下几种对于 fail 的处理 。

    1297993-20200303161804627-848586633.png

    源码分析

    分析之前我们先要知道Cluster invoker 和 Cluster 的区别,后者是个接口,而前面只是 invoker 一种实现。 Cluster 接口,可以看到该接口只有一个方法,目的很明确就是生成 invoker .

    @SPI(FailoverCluster.NAME)
    public interface Cluster {
    
        /**
         * Merge the directory invokers to a virtual invoker.
         *
         * @param <T>
         * @param directory
         * @return cluster invoker
         * @throws RpcException
         */
        @Adaptive
        <T> Invoker<T> join(Directory<T> directory) throws RpcException;
    
    }
    
    

    下面看一下 Cluster 的实现类。

    public class FailoverCluster extends AbstractCluster {
    
        public final static String NAME = "failover";
    
        @Override
        public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
            return new FailoverClusterInvoker<>(directory);
        }
    
    }
    
    
    
    public abstract class AbstractCluster implements Cluster {
    
        // 最终返回的对象是 InterceptorInvokerNode ,该类是内部类,定义在下面  
        private <T> Invoker<T> buildClusterInterceptors(AbstractClusterInvoker<T> clusterInvoker, String key) {
            AbstractClusterInvoker<T> last = clusterInvoker;
            List<ClusterInterceptor> interceptors = ExtensionLoader.getExtensionLoader(ClusterInterceptor.class).getActivateExtension(clusterInvoker.getUrl(), key);
    
            if (!interceptors.isEmpty()) {
                for (int i = interceptors.size() - 1; i >= 0; i--) {
                    final ClusterInterceptor interceptor = interceptors.get(i);
                    final AbstractClusterInvoker<T> next = last;
                    last = new InterceptorInvokerNode<>(clusterInvoker, interceptor, next);
                }
            }
            return last;
        }
    
        // Directory 指的是上节的服务字典(保存着privider的各种信息)
        @Override
        public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
            return buildClusterInterceptors(doJoin(directory), directory.getUrl().getParameter(REFERENCE_INTERCEPTOR_KEY));
        }
    
        //子类实现
        protected abstract <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException;
    
        //内部类
        protected class InterceptorInvokerNode<T> extends AbstractClusterInvoker<T> {
    
            @Override
            public Result invoke(Invocation invocation) throws RpcException {
                Result asyncResult;
                try {
    
                    //before方法
                    interceptor.before(next, invocation);
                    //执行
                    asyncResult = interceptor.intercept(next, invocation);
                } catch (Exception e) {
                    // onError callback
                    if (interceptor instanceof ClusterInterceptor.Listener) {
                        ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
                        listener.onError(e, clusterInvoker, invocation);
                    }
                    throw e;
                } finally {
                    //最终会执行 after 方法
                    interceptor.after(next, invocation);
                }
    
                return asyncResult.whenCompleteWithContext((r, t) -> {
                    // onResponse callback
                    if (interceptor instanceof ClusterInterceptor.Listener) {
                        ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
                        if (t == null) {
                            listener.onMessage(r, clusterInvoker, invocation);
                        } else {
                            listener.onError(t, clusterInvoker, invocation);
                        }
                    }
                });
            }
    
    
            ...
        }
    
    
    }    
    

    可以看到 AbstractCluster 很巧妙地把返回 invoker 的操作留给子类实现,然后调用 buildClusterInterceptors 方法返回一个经过封装带有拦截器的 invoker 子类,我们可以看到该 invoker 的 invoke 方法出现的 before 和 after 钩子函数,就像 Spring 的切面编程,从而实现拦截器的功能。

    ok,下面就看一下 cluster Invoker 的实现。

    Cluster Invoker 源码分析

    前面说过,集群工作过程可分为两个阶段,第一个阶段是在服务消费者初始化期间,这个在服务引用那篇文章中分析过,就不赘述。第二个阶段是在服务消费者进行远程调用时,此时 AbstractClusterInvoker 的 invoke 方法会被调用。列举 Invoker,负载均衡等操作均会在此阶段被执行。因此下面先来看一下 invoke 方法的逻辑。

    public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
    
        @Override
        public Result invoke(final Invocation invocation) throws RpcException {
            checkWhetherDestroyed();
    
            // 绑定 attachments 到 invocation 中.
            // binding attachments into invocation.
            Map<String, Object> contextAttachments = RpcContext.getContext().getAttachments();
            if (contextAttachments != null && contextAttachments.size() != 0) {
                ((RpcInvocation) invocation).addAttachments(contextAttachments);
            }
    
            //可用的 invokers 取出来
            List<Invoker<T>> invokers = list(invocation);
            //负载均衡选择
            LoadBalance loadbalance = initLoadBalance(invokers, invocation);
            RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
            
            //子类方法实现
            return doInvoke(invocation, invokers, loadbalance);
        }
    
        ...
    
        protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
                                           LoadBalance loadbalance) throws RpcException;
    }    
    
    
    ================================================================
    
        // FailoverClusterInvoker 的 doInvoke 方法实现
    
        @Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            List<Invoker<T>> copyInvokers = invokers;
            checkInvokers(copyInvokers, invocation);
            String methodName = RpcUtils.getMethodName(invocation);
            int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
            if (len <= 0) {
                len = 1;
            }
            // retry loop.
            RpcException le = null; // last exception.
            List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
            Set<String> providers = new HashSet<String>(len);
            // for 重试 
            for (int i = 0; i < len; i++) {
                //Reselect before retry to avoid a change of candidate `invokers`.
                //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
                if (i > 0) {
                    checkWhetherDestroyed();
                    copyInvokers = list(invocation);
                    // check again
                    checkInvokers(copyInvokers, invocation);
                }
                
                //选择最终的 invoker , select 定义在父类中
                Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
                invoked.add(invoker);
                RpcContext.getContext().setInvokers((List) invoked);
                try {
                    //最终调用
                    Result result = invoker.invoke(invocation);
                    if (le != null && logger.isWarnEnabled()) {
                        logger.warn("Although retry the method " + methodName
                                + " in the service " + getInterface().getName()
                                + " was successful by the provider " + invoker.getUrl().getAddress()
                                + ", but there have been failed providers " + providers
                                + " (" + providers.size() + "/" + copyInvokers.size()
                                + ") from the registry " + directory.getUrl().getAddress()
                                + " on the consumer " + NetUtils.getLocalHost()
                                + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                                + le.getMessage(), le);
                    }
                    return result;
                } catch (RpcException e) {
                    if (e.isBiz()) { // biz exception.
                        throw e;
                    }
                    le = e;
                } catch (Throwable e) {
                    le = new RpcException(e.getMessage(), e);
                } finally {
                    //记录提供者信息,以便抛出异常可以排查
                    providers.add(invoker.getUrl().getAddress());
                }
            }
            throw new RpcException(le.getCode(), "Failed to invoke the method "
                    + methodName + " in the service " + getInterface().getName()
                    + ". Tried " + len + " times of the providers " + providers
                    + " (" + providers.size() + "/" + copyInvokers.size()
                    + ") from the registry " + directory.getUrl().getAddress()
                    + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                    + Version.getVersion() + ". Last error is: "
                    + le.getMessage(), le.getCause() != null ? le.getCause() : le);
        }
    
    
    

    这里我们知道了大概的思路,虽然 FailoverClusterInvoker 也叫 invoker , 但是最终可以远程调用的 invoker 是在 Directory (服务字典)中, FailoverClusterInvoker 只是做了个封装。

    我们看一下父类的 select 方法是如何实现获取最终的 invoker 的 。

        
        protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation,
                                    List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
    
            if (CollectionUtils.isEmpty(invokers)) {
                return null;
            }
            String methodName = invocation == null ? StringUtils.EMPTY_STRING : invocation.getMethodName();
    
            boolean sticky = invokers.get(0).getUrl()
                    .getMethodParameter(methodName, CLUSTER_STICKY_KEY, DEFAULT_CLUSTER_STICKY);
    
            //ignore overloaded method
            if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
                stickyInvoker = null;
            }
            //ignore concurrency problem
            if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
                if (availablecheck && stickyInvoker.isAvailable()) {
                    return stickyInvoker;
                }
            }
    
            Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);
    
            if (sticky) {
                stickyInvoker = invoker;
            }
            return invoker;
        }
    
        private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,
                                    List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
    
            if (CollectionUtils.isEmpty(invokers)) {
                return null;
            }
            if (invokers.size() == 1) {
                return invokers.get(0);
            }
    
            // 调用 loadbalance 的 select 方法
            Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
    
            //If the `invoker` is in the  `selected` or invoker is unavailable && availablecheck is true, reselect.
            if ((selected != null && selected.contains(invoker))
                    || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
                try {
    
                    //重新选择的逻辑
                    Invoker<T> rInvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
                    if (rInvoker != null) {
                        invoker = rInvoker;
                    } else {
                        //Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
                        int index = invokers.indexOf(invoker);
                        try {
                            //Avoid collision
                            invoker = invokers.get((index + 1) % invokers.size());
                        } catch (Exception e) {
                            logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
                        }
                    }
                } catch (Throwable t) {
                    logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
                }
            }
            return invoker;
        }
    

    其中 select 方法,主要的目的 :

    select 方法的主要逻辑集中在了对粘滞连接特性的支持上。首先是获取 sticky 配置,然后再检测 invokers 列表中是否包含 stickyInvoker,如果不包含,则认为该 stickyInvoker 不可用,此时将其置空。这里的 invokers 列表可以看做是存活着的服务提供者列表,如果这个列表不包含 stickyInvoker,那自然而然的认为 stickyInvoker 挂了,所以置空。如果 stickyInvoker 存在于 invokers 列表中,此时要进行下一项检测 — 检测 selected 中是否包含 stickyInvoker。如果包含的话,说明 stickyInvoker 在此之前没有成功提供服务(但其仍然处于存活状态)。此时我们认为这个服务不可靠,不应该在重试期间内再次被调用,因此这个时候不会返回该 stickyInvoker。如果 selected 不包含 stickyInvoker,此时还需要进行可用性检测,比如检测服务提供者网络连通性等。当可用性检测通过,才可返回 stickyInvoker,否则调用 doSelect 方法选择 Invoker。如果 sticky 为 true,此时会将 doSelect 方法选出的 Invoker 赋值给 stickyInvoker。

    接着是分析 doSelect 方法,我们先看一下方法上的注解 :

    Select a invoker using loadbalance policy. 
    
    a) Firstly, select an invoker using loadbalance. If this invoker is in previously selected list, or, if this invoker is unavailable, then continue step b (reselect), otherwise return the first selected invoker
    
    b) Reselection, the validation rule for reselection: selected > available. This rule guarantees that the selected invoker has the minimum chance to be one in the previously selected list, and also guarantees this invoker is available.
    
    
    

    上面的注解已经说明了该方法执行的逻辑,简单点说就是获取之前一直成功的 invoker ,否则进行 reselect , reselect 过程中会避开之前失败的节点。

    FailbackClusterInvoker

        @Override
        protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            Invoker<T> invoker = null;
            try {
                checkInvokers(invokers, invocation);
                invoker = select(loadbalance, invocation, invokers, null);
                return invoker.invoke(invocation);
            } catch (Throwable e) {
                logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
                        + e.getMessage() + ", ", e);
                //失败后,加入一个任务,用于定时重试
                addFailed(loadbalance, invocation, invokers, invoker);
                return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
            }
        }
    
    
        private void addFailed(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker) {
            //Timer类 
            if (failTimer == null) {
                synchronized (this) {
                    if (failTimer == null) {
                        failTimer = new HashedWheelTimer(
                                new NamedThreadFactory("failback-cluster-timer", true),
                                1,
                                TimeUnit.SECONDS, 32, failbackTasks);
                    }
                }
            }
            RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD);
            try {
                // 默认 5 秒执行一次
                failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS);
            } catch (Throwable e) {
                logger.error("Failback background works error,invocation->" + invocation + ", exception: " + e.getMessage());
            }
        }
    
    
         /**
         * RetryTimerTask 内部类 
         */
        private class RetryTimerTask implements TimerTask {
            private final Invocation invocation;
            private final LoadBalance loadbalance;
            private final List<Invoker<T>> invokers;
            private final int retries;
            private final long tick;
            private Invoker<T> lastInvoker;
            private int retryTimes = 0;
    
            RetryTimerTask(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker, int retries, long tick) {
                this.loadbalance = loadbalance;
                this.invocation = invocation;
                this.invokers = invokers;
                this.retries = retries;
                this.tick = tick;
                this.lastInvoker=lastInvoker;
            }
    
            @Override
            public void run(Timeout timeout) {
                try {
                    //重新选择,重新调用
                    Invoker<T> retryInvoker = select(loadbalance, invocation, invokers, Collections.singletonList(lastInvoker));
                    lastInvoker = retryInvoker;
                    retryInvoker.invoke(invocation);
                } catch (Throwable e) {
                    logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
                    if ((++retryTimes) >= retries) {
                        logger.error("Failed retry times exceed threshold (" + retries + "), We have to abandon, invocation->" + invocation);
                    } else {
                        //重新放入执行
                        rePut(timeout);
                    }
                }
            }
    
            private void rePut(Timeout timeout) {
                if (timeout == null) {
                    return;
                }
    
                Timer timer = timeout.timer();
                if (timer.isStop() || timeout.isCancelled()) {
                    return;
                }
    
                timer.newTimeout(timeout.task(), tick, TimeUnit.SECONDS);
            }
        }
       
    

    FailbackClusterInvoker 利用一个 Timer 去执行重试操作,逻辑没什么复杂的。其他的 invoker 不再分析了。

    使用示例

    下面展示failback 的集群失败策略。 provider 端

    public interface DemoService {
        String sayHello(String name)  throws Exception;
    }
    
    
    
    public class DemoServiceImpl implements DemoService {
        private static final Logger logger = LoggerFactory.getLogger(DemoServiceImpl.class);
    
        int count = 0 ;
    
        @Override
        public String sayHello(String name) throws Exception{
    
            logger.info("Hello " + name + ", request from consumer: " + RpcContext.getContext().getRemoteAddress());
            logger.info("我被调用啦 +  " + count++);
            try {
                Thread.sleep(8000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello " + name + ", response from provider: " + RpcContext.getContext().getLocalAddress();
        }
        ...
    }
    
    

    提供者的配置

    <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
           xmlns="http://www.springframework.org/schema/beans"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
           http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
    
        <dubbo:application name="demo-provider"/>
    
        <dubbo:registry address="zookeeper://127.0.0.1:2181"/>
    
        <dubbo:protocol name="dubbo"/>
    
        <bean id="demoService" class="org.apache.dubbo.demo.provider.DemoServiceImpl"/>
    
        <dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" cluster="failback"  />
    
    </beans>
    
    

    消费端

    public class Application {
    
        public static void main(String[] args) throws Exception {
            ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/dubbo-consumer.xml");
            context.start();
            try {
                DemoService demoService = context.getBean("demoService", DemoService.class);
                String s = demoService.sayHello("aaa");
                System.out.println("s : " + s);
                System.out.println("进入等待 : ");
                Thread.sleep(1000 * 50);
            } catch (Exception e) {
                Thread.sleep(1000 * 50);
    
            }
    
        }
    }
    
    

    然后看一下提供端的日志:

    [13/04/20 16:38:30:713 CST] DubboServerHandler-192.168.10.27:20880-thread-2  INFO provider.DemoServiceImpl: Hello aaa, request from consumer: /192.168.10.27:8693
    [13/04/20 16:38:30:713 CST] DubboServerHandler-192.168.10.27:20880-thread-2  INFO provider.DemoServiceImpl: 我被调用啦 +  0
    [13/04/20 16:38:37:544 CST] DubboServerHandler-192.168.10.27:20880-thread-3  INFO provider.DemoServiceImpl: Hello aaa, request from consumer: /192.168.10.27:8693
    [13/04/20 16:38:37:544 CST] DubboServerHandler-192.168.10.27:20880-thread-3  INFO provider.DemoServiceImpl: 我被调用啦 +  1
    [13/04/20 16:38:44:544 CST] DubboServerHandler-192.168.10.27:20880-thread-4  INFO provider.DemoServiceImpl: Hello aaa, request from consumer: /192.168.10.27:8693
    [13/04/20 16:38:44:544 CST] DubboServerHandler-192.168.10.27:20880-thread-4  INFO provider.DemoServiceImpl: 我被调用啦 +  2
    [13/04/20 16:38:51:545 CST] DubboServerHandler-192.168.10.27:20880-thread-5  INFO provider.DemoServiceImpl: Hello aaa, request from consumer: /192.168.10.27:8693
    [13/04/20 16:38:51:545 CST] DubboServerHandler-192.168.10.27:20880-thread-5  INFO provider.DemoServiceImpl: 我被调用啦 +  3
    

    我们发现共重试了3次,然后消费者就丢弃了这个调用,不再调用,这种适用于异步消息通知的调用。

    总结

    dubbo对于集群内调用失败的几种处置方式

    参考资料

    • https://dubbo.apache.org/zh-cn/blog/dubbo-cluster-error-handling.html
  • 相关阅读:
    如何处理iOS中照片的方向
    Builder Pattern 在 Objective-C 中的使用
    多线程(三)-- 线程安全问题
    多线程(二)--NSThread基本使用
    多线程 (一)
    报错:Request failed: unacceptable content-type: text/html
    Cocoapods简单安装和使用
    Objective
    Objective
    Python学习笔记(一)--注释
  • 原文地址:https://www.cnblogs.com/Benjious/p/12720165.html
Copyright © 2011-2022 走看看