zoukankan      html  css  js  c++  java
  • dubbo——容错模式

    一、dubbo提供6种容错模式

    • failover:默认模式。调用失败时,自动切换重试,可通过retries属性设置重试次数。适用于读操作,每次读数据都相同。
    • failfast:快速失败模式。只调用一次,调用失败时立即报错。适用于写操作,不能重复写。
    • failsafe:安全失败模式。只调用一次,调用失败时忽略失败的调用,记录日志。
    • failback:失败恢复模式。在失败后自动恢复,后台记录失败的请求,定时重发。通常用于消息通知。
    • forking:并行调用多个服务器,只要有一个成功便返回。通常用于实时性要求较高的读操作,会浪费服务资源。
    • broadcast:广播调用所有的提供者,逐个调用,任意一台报错则报错。通常用于通知所有提供者更新缓存或日志等本地资源信息。

    二、FailOverClusterInvoker——失败重试

    /* org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker */
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            List<Invoker<T>> copyInvokers = invokers;
            //检查providers不能为空
            checkInvokers(copyInvokers, invocation);
            //获取调用的方法名
            String methodName = RpcUtils.getMethodName(invocation);
            //调用次数  1+2次重试
            int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
            if (len <= 0) {
                len = 1;
            }
            // retry loop.
            RpcException le = null; // last exception.
            List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
            Set<String> providers = new HashSet<String>(len);
            for (int i = 0; i < len; i++) {
                //失败重试,成功返回
                if (i > 0) {
                    //再次检查invokers.destroyed
                    checkWhetherDestroyed();
                    copyInvokers = list(invocation);
                    // check again
                    checkInvokers(copyInvokers, invocation);
                }
                //负载均衡策略,选取一个invoker
                Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
                //标记已经调用过的invoker,重试时,换其他invoker重试
                invoked.add(invoker);
                RpcContext.getContext().setInvokers((List) invoked);
                try {
                    Result result = invoker.invoke(invocation);
                    if (le != null && logger.isWarnEnabled()) {
                        logger.warn("Although retry the method " + methodName
                                + " in the service " + getInterface().getName()
                                + " was successful by the provider " + invoker.getUrl().getAddress()
                                + ", but there have been failed providers " + providers
                                + " (" + providers.size() + "/" + copyInvokers.size()
                                + ") from the registry " + directory.getUrl().getAddress()
                                + " on the consumer " + NetUtils.getLocalHost()
                                + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                                + le.getMessage(), le);
                    }
                    //成功返回
                    return result;
                    //catch报错,失败重试
                } catch (RpcException e) {
                    if (e.isBiz()) { // biz exception.
                        throw e;
                    }
                    le = e;
                } catch (Throwable e) {
                    le = new RpcException(e.getMessage(), e);
                } finally {
                    providers.add(invoker.getUrl().getAddress());
                }
            }
            throw new RpcException(le.getCode(), "Failed to invoke the method "
                    + methodName + " in the service " + getInterface().getName()
                    + ". Tried " + len + " times of the providers " + providers
                    + " (" + providers.size() + "/" + copyInvokers.size()
                    + ") from the registry " + directory.getUrl().getAddress()
                    + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                    + Version.getVersion() + ". Last error is: "
                    + le.getMessage(), le.getCause() != null ? le.getCause() : le);
        }

    三、FailfastClusterInvoker——失败抛错

    /* org.apache.dubbo.rpc.cluster.support.FailfastClusterInvoker#doInvoke */
        public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            //检查invokers不能为空
            checkInvokers(invokers, invocation);
            //负载均衡策略,选取一个invoker
            Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
            try {
                //调用
                return invoker.invoke(invocation);
            } catch (Throwable e) {
                //调用失败直接抛错
                if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
                    throw (RpcException) e;
                }
                throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
                        "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName()
                                + " select from all providers " + invokers + " for service " + getInterface().getName()
                                + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
                                + " use dubbo version " + Version.getVersion()
                                + ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
                        e.getCause() != null ? e.getCause() : e);
            }
        }

    四、FaisafeClusterInvoker——失败忽略,记录日志

    /* org.apache.dubbo.rpc.cluster.support.FailsafeClusterInvoker#doInvoke */
        public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            try {
                //检查invokers不能为空
                checkInvokers(invokers, invocation);
                //负载均衡策略,选取一个invoker
                Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
                //调用
                return invoker.invoke(invocation);
            } catch (Throwable e) {
                //报错忽略
                logger.error("Failsafe ignore exception: " + e.getMessage(), e);
                return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
            }
        }

    五、FailbackClusterInvoker——失败恢复,记录失败请求,定时重发

    /* org.apache.dubbo.rpc.cluster.support.FailbackClusterInvoker#doInvoke */
        protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            Invoker<T> invoker = null;
            try {
                //检查invokers不能为空
                checkInvokers(invokers, invocation);
                //负载均衡策略,选取一个invoker
                invoker = select(loadbalance, invocation, invokers, null);
                //调用
                return invoker.invoke(invocation);
            } catch (Throwable e) {
                //失败忽略
                logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
                        + e.getMessage() + ", ", e);
                //创建定时任务TimeTask实现
                addFailed(loadbalance, invocation, invokers, invoker);
                //返回记录失败日志
                return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
            }
        }

    六、ForkingClusterInvoker——并行调用,有一个调用成功便返回

    /* org.apache.dubbo.rpc.cluster.support.ForkingClusterInvoker#doInvoke */
    public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            try {
                //检查invokers不能为空
                checkInvokers(invokers, invocation);
                final List<Invoker<T>> selected;
                final int forks = getUrl().getParameter(FORKS_KEY, DEFAULT_FORKS);
                final int timeout = getUrl().getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
                if (forks <= 0 || forks >= invokers.size()) {
                    //并发数forks大于invokers.size()时,并发调用所有invoker
                    selected = invokers;
                } else {
                    //并发数forks小于invokers.size()时,调用负载均衡策略选取的forks个invoker
                    selected = new ArrayList<>();
                    for (int i = 0; i < forks; i++) {
                        Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
                        if (!selected.contains(invoker)) {
                            //Avoid add the same invoker several times.
                            selected.add(invoker);
                        }
                    }
                }
                RpcContext.getContext().setInvokers((List) selected);
                final AtomicInteger count = new AtomicInteger();
                final BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
                for (final Invoker<T> invoker : selected) {
                    executor.execute(() -> {
                        try {
                            //并发调用,报错忽略
                            Result result = invoker.invoke(invocation);
                            //创建一个队列,存储返回结果
                            ref.offer(result);
                        } catch (Throwable e) {
                            int value = count.incrementAndGet();
                            if (value >= selected.size()) {
                                ref.offer(e);
                            }
                        }
                    });
                }
                try {
                    //取队首元素(结果),设置超时时间
                    Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
                    if (ret instanceof Throwable) {
                        //正常的报错抛出错误
                        Throwable e = (Throwable) ret;
                        throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
                    }
                    //调用成功返回
                    return (Result) ret;
                } catch (InterruptedException e) {
                    //超时抛错
                    throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
                }
            } finally {
                // clear attachments which is binding to current thread.
                RpcContext.getContext().clearAttachments();
            }
        }

    七、BroadcastClusterInvoker——串行调用,所有调用成功则成功,有一台报错则抛错

    /* org.apache.dubbo.rpc.cluster.support.BroadcastClusterInvoker#doInvoke */
        public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            //检查invokers不能为空
            checkInvokers(invokers, invocation);
            RpcContext.getContext().setInvokers((List) invokers);
            RpcException exception = null;
            Result result = null;
            for (Invoker<T> invoker : invokers) {
                try {
                    //遍历调用
                    result = invoker.invoke(invocation);
                } catch (RpcException e) {
                    exception = e;
                    logger.warn(e.getMessage(), e);
                } catch (Throwable e) {
                    exception = new RpcException(e.getMessage(), e);
                    logger.warn(e.getMessage(), e);
                }
            }
            //有一个invoker调用抛错,则抛出错误
            if (exception != null) {
                throw exception;
            }
            return result;
        }

     

  • 相关阅读:
    多线程爬取斗图啦图片
    fiddler配置https
    Linux相关命令实例及解析
    htm、html、shtml网页区别
    什么是中间件?常见中间件有哪些?
    列举常见的关系型数据库和非关系型都有那些?
    什么是dao模式,dao模式的实现方法
    如何理解fine-grained和coarse-grained?
    .Net 理解持久层(Persistence Layer)
    web server与app server有什么不同
  • 原文地址:https://www.cnblogs.com/wqff-biubiu/p/12505063.html
Copyright © 2011-2022 走看看