zoukankan      html  css  js  c++  java
  • Dubbo 集群容错模式详解

    参考:

    https://blog.csdn.net/u011642663/article/details/81913325

    https://blog.csdn.net/u011642663/article/details/81949941

      

    Dubbo 的集群容错模式:Failover Cluster

    失败自动切换,当出现失败,重试其它服务器,通常用于读操作(推荐使用),缺点:重试会带来更长延迟

    本文简单介绍 Dubbo 中的 Failover Cluster(失败自动切换)。

    1 简介

    调用实例失败后,继续调用其他实例。假如有 3 个实例:A, B, C,当调用 A 失败后,再调用 B,如果还是失败,则调用 C。

    2 如何使用

    <dubbo:service cluster="failover" retries="2"/>

    <dubbo:reference cluster="failover" retries="2"/>

    3.实现逻辑

    1. 根据负载均衡算法选中被调用实例

    2. 执行选中的实例;将实例保存到已经调用的列表中

    3. 执行成功则返回;执行不成功则选下个调用实例(排除已经调用的实例)

    4 源代码

    public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);

       public FailoverClusterInvoker(Directory<T> directory) {
    super(directory);
       }

    @Override
       @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyinvokers = invokers;
           checkInvokers(copyinvokers, invocation);
           // 获取调用次数的配置 retries
           int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
           if (len <= 0) {
    len = 1;
           }
    // retry loop.
           RpcException le = null; // last exception.
           // 记录已经被调用的实例
           List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
           Set<String> providers = new HashSet<String>(len);
           for (int i = 0; i < len; i++) {
    //Reselect before retry to avoid a change of candidate `invokers`.
               //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
               if (i > 0) {
    checkWhetherDestroyed();
                   copyinvokers = list(invocation);
                   // check again
                   checkInvokers(copyinvokers, invocation);
               }
    // 选择这次要调用的实例
               Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
               // 添加到已经调用的实例列表中
               invoked.add(invoker);
               // 保存到上下文
               RpcContext.getContext().setInvokers((List) invoked);
               try {
    // 执行调用
                   Result result = invoker.invoke(invocation);
                   if (le != null && logger.isWarnEnabled()) {
    logger.warn("Although retry the method " + invocation.getMethodName()
    + " in the service " + getInterface().getName()
    + " was successful by the provider " + invoker.getUrl().getAddress()
    + ", but there have been failed providers " + providers
    + " (" + providers.size() + "/" + copyinvokers.size()
    + ") from the registry " + directory.getUrl().getAddress()
    + " on the consumer " + NetUtils.getLocalHost()
    + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                               + le.getMessage(), le);
                   }
    return result;
               } catch (RpcException e) {
    if (e.isBiz()) { // biz exception.
                       throw e;
                   }
    le = e;
               } catch (Throwable e) {
    le = new RpcException(e.getMessage(), e);
               } finally {
    // 保存已经调用的实例的 url 地址
                   providers.add(invoker.getUrl().getAddress());
               }
    }
    // 重试了 retries 次后如果没有调用成功,则报错
           throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
                   + invocation.getMethodName() + " in the service " + getInterface().getName()
    + ". Tried " + len + " times of the providers " + providers
    + " (" + providers.size() + "/" + copyinvokers.size()
    + ") from the registry " + directory.getUrl().getAddress()
    + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                   + Version.getVersion() + ". Last error is: "
                   + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
       }

    }

    Dubbo 的集群容错模式:Failback Cluster

    失败自动恢复,后台记录失败请求,定时重发,通常用于消息通知操作。 缺点:不可靠,重启丢失

    本文简单介绍 Dubbo 中的 Failback Cluster(失败自动回复,定时重发)。

    简介

    调用实例发生异常后,一段时间后重新再调用,直到调用成功。

    如何使用

    <dubbo:service cluster="failback" />

    <dubbo:reference cluster="failback" />

    实现逻辑

    1. 根据负载均衡算法选中被调用实例
    2. 执行选中的实例
    3. 执行成功则返回;执行有异常则定时重新发送请求,默认发送定时时间为 5 秒

    实现代码

    public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {
    
        private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class);
    
        private static final long RETRY_FAILED_PERIOD = 5 * 1000;
    
        /**
         * Use {@link NamedInternalThreadFactory} to produce {@link org.apache.dubbo.common.threadlocal.InternalThread}
         * which with the use of {@link org.apache.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}.
         */
        private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2,
                new NamedInternalThreadFactory("failback-cluster-timer", true));
    
        private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed = new ConcurrentHashMap<Invocation, AbstractClusterInvoker<?>>();
        private volatile ScheduledFuture<?> retryFuture;
    
        public FailbackClusterInvoker(Directory<T> directory) {
            super(directory);
        }
    
        private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
            if (retryFuture == null) {
                synchronized (this) {
                    // 锁定当前对象
                    if (retryFuture == null) {
                        // 双重检查是否初始化实例 retryFuture
                        retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
    
                            @Override
                            public void run() {
                                // collect retry statistics
                                try {
                                    retryFailed();
                                } catch (Throwable t) { // Defensive fault tolerance
                                    logger.error("Unexpected error occur at collect statistic", t);
                                }
                            }
                            // 5s 发送重试一次
                        }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
                    }
                }
            }
            failed.put(invocation, router);
        }
    
        // 重新调用
        void retryFailed() {
            // 没有调用失败的实例则不进行重试
            if (failed.size() == 0) {
                return;
            }
            // 遍历所有调用失败的实例,进行重试调用
            for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>(
                    failed).entrySet()) {
                Invocation invocation = entry.getKey();
                Invoker<?> invoker = entry.getValue();
                try {
                    invoker.invoke(invocation);
                    failed.remove(invocation);
                } catch (Throwable e) {
                    logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
                }
            }
        }
    
        @Override
        protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            try {
                checkInvokers(invokers, invocation);
                // 根据负载均衡算法选中调用实例
                Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
                // 执行调用实例
                return invoker.invoke(invocation);
            } catch (Throwable e) {
                logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
                        + e.getMessage() + ", ", e);
                // 调用出现异常,则定时重试直到成功
                addFailed(invocation, this);
                return new RpcResult(); // ignore
            }
        }
    
    }

    Dubbo 的集群容错模式:Failfast Cluster

    快速失败,只发起一次调用,失败立即报错,通常用于非幂等性的写操作   缺点:如果有机器正在重启,可能会出现调用失败

    本文简单介绍 Dubbo 中的 Failfast Cluster。

    1 简介

    调用实例失败后,如果有报错,则直接抛出异常。

    2 如何使用

    <dubbo:service cluster="failfast" /> 

    <dubbo:reference cluster="failfast" />

     3.实现逻辑

    1. 根据负载均衡算法选中被调用实例

    2. 执行选中的实例

    3. 执行成功则返回;执行有异常则直接抛出异常,不进行重试等操作

     

    4.实现代码

    public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {

    public FailfastClusterInvoker(Directory<T> directory) {
    super(directory);
       }

    @Override
       public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    checkInvokers(invokers, invocation);
           // 选择调用实例
           Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
           try {
    // 执行调用
               return invoker.invoke(invocation);
           } catch (Throwable e) {
    // 如果有异常则直接抛出异常
               if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
                   throw (RpcException) e;
               }
    throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
           }
    }
    }

    Dubbo 的集群容错模式:Failsafe Cluster

    失败安全,出现异常时,直接忽略,通常用于写入审计日志等操作   缺点:调用信息丢失

    本文简单介绍 Dubbo 中的 Failsafe Cluster(安全失败)。

    简介

    调用实例失败后,如果有报错,则忽略掉异常,返回一个正常的空结果。

    如何使用

    <dubbo:service cluster="failsafe" />

    <dubbo:reference cluster="failsafe" />

    实现逻辑

    1. 根据负载均衡算法选中被调用实例
    2. 执行选中的实例
    3. 执行成功则返回;执行有异常则 catch 异常,然后返回一个正常的空结果

    源代码

    public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> {
        private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class);
    
        public FailsafeClusterInvoker(Directory<T> directory) {
            super(directory);
        }
    
        @Override
        public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            try {
                checkInvokers(invokers, invocation);
                // 根据负载均衡算法选中调用实例
                Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
                // 执行调用实例
                return invoker.invoke(invocation);
            } catch (Throwable e) {
                // 有异常时不抛出异常,返回一个 RpcResult 对象
                logger.error("Failsafe ignore exception: " + e.getMessage(), e);
                return new RpcResult(); // ignore
            }
        }
    }

    Dubbo 的集群容错模式:Forking Cluster

    并行调用多个服务器,只要一个成功即返回,通常用于实时性要求较高的读操作  缺点:需要浪费更多服务资源【适用于高德上同时打多个车,打到一个即取消其他;或者利用第三方回调,第一个回调当做成功处理,其他认为是异常】

    本文简单介绍 Dubbo 中的 Forking Cluster(并行调用多个服务器,只要一个成功就返回)

    简介

    并行调用多个实例,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过 forks=”2” 来设置最大并行数。通过 timeout=”1000” 来设置调用超时时间

    如何使用

    <dubbo:service cluster="forking" forks="2" timeout="1000" />

    <dubbo:reference cluster="forking" forks="2" timeout="1000" />

    实现逻辑

    1. 计算目前需要的并发数,通过负载均衡算法选中被调用实例列表
    2. 并发地调用实例列表,并将处理结果成功的放到阻塞队列中
    3. 获取处理结果队列中的第一个结果,判断是否是异常,是异常则抛出,不是异常则返回结果

    源代码

    public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {
    
        /**
         * Use {@link NamedInternalThreadFactory} to produce {@link org.apache.dubbo.common.threadlocal.InternalThread}
         * which with the use of {@link org.apache.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}.
         */
        // 使用 Cached 线程池
        private final ExecutorService executor = Executors.newCachedThreadPool(
                new NamedInternalThreadFactory("forking-cluster-timer", true));
    
        public ForkingClusterInvoker(Directory<T> directory) {
            super(directory);
        }
    
        @Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            try {
                checkInvokers(invokers, invocation);
                final List<Invoker<T>> selected;
                // 并行数量,默认是 2
                final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
                // 调用超时, 默认是 1s
                final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                if (forks <= 0 || forks >= invokers.size()) {
                    // 当并行数配置超过调用实例数量,则默认为调用实例数
                    selected = invokers;
                } else {
                    selected = new ArrayList<Invoker<T>>();
                    for (int i = 0; i < forks; i++) {
                        // TODO. Add some comment here, refer chinese version for more details.
                        // 通过负载均衡算法选中实例
                        Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
                        if (!selected.contains(invoker)) {//Avoid add the same invoker several times.
                            // 添加到选中的实例列表中
                            selected.add(invoker);
                        }
                    }
                }
                // 将选中的实例添加到上下文中
                RpcContext.getContext().setInvokers((List) selected);
                // 记录调用异常的次数
                final AtomicInteger count = new AtomicInteger();
                // 使用阻塞队列来存放调用实例的执行结果
                final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
                // 遍历所有被选中的调用者,一个子线程执行一个调用实例
                for (final Invoker<T> invoker : selected) {
                    executor.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                // 执行调用实例
                                Result result = invoker.invoke(invocation);
                                // 将结果放到阻塞队列中
                                ref.offer(result);
                            } catch (Throwable e) {
                                // 记录调用异常的次数
                                int value = count.incrementAndGet();
                                if (value >= selected.size()) {
                                    // 根据队列先入先出原则,因为该算法是只要有一个调用成功就好,
                                    // 所以只有当全部的调用实例都失败,才记录到队列中,因为下面取结果会判断是不是异常
                                    ref.offer(e);
                                }
                            }
                        }
                    });
                }
                try {
                    // 取出结果
                    Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
                    // 没有一个实例执行成功,抛出异常
                    if (ret instanceof Throwable) {
                        Throwable e = (Throwable) ret;
                        throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
                    }
                    return (Result) ret;
                } catch (InterruptedException e) {
                    throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
                }
            } finally {
                // 清除上下文信息
                // clear attachments which is binding to current thread.
                RpcContext.getContext().clearAttachments();
            }
        }
    }

    Dubbo 的集群容错模式:Broadcast Cluster

    广播调用所有提供者,逐个调用,任意一台报错则报错,通常用于更新提供方本地状态   缺点:速度慢,任意一台报错则报错

    本文简单介绍 Dubbo 中的 Broadcast Cluster(广播调用所有提供者,逐个调用,任何一台报错则报错)。

    简介

    广播调用所有提供者,逐个调用,任意一台报错则报错。通常用于通知所有提供者更新缓存或日志等本地资源信息

    如何使用

    <dubbo:service cluster="broadcast" />

    <dubbo:reference cluster="broadcast" />

    实现逻辑

    1. 循环调用所有的实例
    2. 如果有发生异常则记录异常保存
    3. 只要有异常,则抛出异常,如果没有则返回执行结果

    源代码

    public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {
    
        private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class);
    
        public BroadcastClusterInvoker(Directory<T> directory) {
            super(directory);
        }
    
        @Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            checkInvokers(invokers, invocation);
            RpcContext.getContext().setInvokers((List) invokers);
            RpcException exception = null;
            Result result = null;
            // 广播到所有的被调用实例
            for (Invoker<T> invoker : invokers) {
                try {
                    result = invoker.invoke(invocation);
                } catch (RpcException e) {
                    // 记录异常
                    exception = e;
                    logger.warn(e.getMessage(), e);
                } catch (Throwable e) {
                    // 记录异常
                    exception = new RpcException(e.getMessage(), e);
                    logger.warn(e.getMessage(), e);
                }
            }
            // 只要有一个实例有异常则报错
            if (exception != null) {
                throw exception;
            }
            return result;
        }
    
    }

    Dubbo 的集群容错模式:Available Cluster

    本文简单介绍 Dubbo 中的 Available Cluster(可用的实例)。

    简介

    调用目前可用的实例(只调用一个),如果当前没有可用的实例,则抛出异常。

    如何使用

    <dubbo:service cluster="available" />

    <dubbo:reference cluster="available" />

    实现逻辑

    1. 遍历所有的实例
    2. 遍历到第一个可用的实例,调用该实例
    3. 如果没有可用的实例,则抛出异常

    源代码

    public class AvailableClusterInvoker<T> extends AbstractClusterInvoker<T> {
    
        public AvailableClusterInvoker(Directory<T> directory) {
            super(directory);
        }
    
        @Override
        public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            // 调用所有可用的实例
            for (Invoker<T> invoker : invokers) {
                if (invoker.isAvailable()) {
                    // 实例可用则调用
                    return invoker.invoke(invocation);
                }
            }
            // 如果没有可用的实例,则抛出异常
            throw new RpcException("No provider available in " + invokers);
        }
    
    }

    Dubbo 的集群容错模式:Mergeable Cluster

    本文简单介绍 Dubbo 中的 Mergeable Cluster(结果可合并)。

    简介

    该集群容错模式下,可以合并结果集,一般和 group 一起使用,具体使用规则参考 分组聚合

    实现逻辑

    1. 如果没有配置 merger,则不合并结果集,直接调用实例执行后返回
    2. 异步调用所有的实例
    3. 获取要调用的方法的返回类型,根据返回类型进行调用,如果是 . 开头的,则调用方法;否则如果有配置自定义合并器,则调用自动以合并器,如果没有则调用默认的合并器

    源代码

    public class MergeableClusterInvoker<T> implements Invoker<T> {
    
        private static final Logger log = LoggerFactory.getLogger(MergeableClusterInvoker.class);
        private final Directory<T> directory;
        private ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("mergeable-cluster-executor", true));
    
        public MergeableClusterInvoker(Directory<T> directory) {
            this.directory = directory;
        }
    
        @Override
        @SuppressWarnings("rawtypes")
        public Result invoke(final Invocation invocation) throws RpcException {
            List<Invoker<T>> invokers = directory.list(invocation);
    
            // 获取 merger 配置参数值
            String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY);
            if (ConfigUtils.isEmpty(merger)) { // If a method doesn't have a merger, only invoke one Group
                // 没有配置 merger,则默认所有调用实例都是一个组的
                for (final Invoker<T> invoker : invokers) {
                    if (invoker.isAvailable()) {
                        // 调用可用的实例
                        return invoker.invoke(invocation);
                    }
                }
                // 如果都是不可用的,则调用第一个实例
                return invokers.iterator().next().invoke(invocation);
            }
    
            Class<?> returnType;
            try {
                returnType = getInterface().getMethod(
                        invocation.getMethodName(), invocation.getParameterTypes()).getReturnType();
            } catch (NoSuchMethodException e) {
                returnType = null;
            }
    
            // 异步调用所有的实例,并把对象存储到 results 中
            Map<String, Future<Result>> results = new HashMap<String, Future<Result>>();
            for (final Invoker<T> invoker : invokers) {
                Future<Result> future = executor.submit(new Callable<Result>() {
                    @Override
                    public Result call() throws Exception {
                        return invoker.invoke(new RpcInvocation(invocation, invoker));
                    }
                });
                results.put(invoker.getUrl().getServiceKey(), future);
            }
    
            Object result = null;
    
            List<Result> resultList = new ArrayList<Result>(results.size());
    
            // 获取超时时间
            int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            // 遍历获取执行实例的结果
            for (Map.Entry<String, Future<Result>> entry : results.entrySet()) {
                Future<Result> future = entry.getValue();
                try {
                    Result r = future.get(timeout, TimeUnit.MILLISECONDS);
                    if (r.hasException()) {
                        // 调用的服务内部有异常,不抛出,记录
                        log.error("Invoke " + getGroupDescFromServiceKey(entry.getKey()) + 
                                        " failed: " + r.getException().getMessage(), 
                                r.getException());
                    } else {
                        // 保存执行结果
                        resultList.add(r);
                    }
                } catch (Exception e) {
                    // 有异常则抛出
                    throw new RpcException("Failed to invoke service " + entry.getKey() + ": " + e.getMessage(), e);
                }
            }
    
            if (resultList.isEmpty()) {
                return new RpcResult((Object) null);
            } else if (resultList.size() == 1) {
                return resultList.iterator().next();
            }
    
            if (returnType == void.class) {
                return new RpcResult((Object) null);
            }
    
    
            if (merger.startsWith(".")) {
                // . 开头,则需要调用自定义的方法来进行合并
                merger = merger.substring(1);
                Method method;
                try {
                    method = returnType.getMethod(merger, returnType);
                } catch (NoSuchMethodException e) {
                    throw new RpcException("Can not merge result because missing method [ " + merger + " ] in class [ " + 
                            returnType.getClass().getName() + " ]");
                }
                if (!Modifier.isPublic(method.getModifiers())) {
                    method.setAccessible(true);
                }
                result = resultList.remove(0).getValue();
                try {
                    // 调用方法进行合并
                    if (method.getReturnType() != void.class
                            && method.getReturnType().isAssignableFrom(result.getClass())) {
                        for (Result r : resultList) {
                            result = method.invoke(result, r.getValue());
                        }
                    } else {
                        for (Result r : resultList) {
                            method.invoke(result, r.getValue());
                        }
                    }
                } catch (Exception e) {
                    throw new RpcException("Can not merge result: " + e.getMessage(), e);
                }
            } else {
                Merger resultMerger;
                if (ConfigUtils.isDefault(merger)) {
                    // 执行默认的合并器
                    resultMerger = MergerFactory.getMerger(returnType);
                } else {
                    // 执行自定义的 Merger 合并器来合并
                    resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger);
                }
                if (resultMerger != null) {
                    List<Object> rets = new ArrayList<Object>(resultList.size());
                    for (Result r : resultList) {
                        rets.add(r.getValue());
                    }
                    // 调用合并器 merge 方法
                    result = resultMerger.merge(
                            rets.toArray((Object[]) Array.newInstance(returnType, 0)));
                } else {
                    throw new RpcException("There is no merger to merge result.");
                }
            }
            return new RpcResult(result);
        }
    
        @Override
        public Class<T> getInterface() {
            return directory.getInterface();
        }
    
        @Override
        public URL getUrl() {
            return directory.getUrl();
        }
    
        @Override
        public boolean isAvailable() {
            return directory.isAvailable();
        }
    
        @Override
        public void destroy() {
            directory.destroy();
        }
    
        private String getGroupDescFromServiceKey(String key) {
            int index = key.indexOf("/");
            if (index > 0) {
                return "group [ " + key.substring(0, index) + " ]";
            }
            return key;
        }
    }
  • 相关阅读:
    yii2手动添加图片处理插件Imagine
    ElementUI组件库常见方法及问题汇总(持续更新)
    JS如何使用Math.atan2获取两点之间角度的实践案例
    给HTML拍个照(如何将html元素转成图片)
    Canvas知识点汇总
    如何修改Vue打包后文件的接口地址配置
    Angular6 基础(数据绑定、生命周期、父子组件通讯、响应式编程)
    前端使用express+node实现接口模拟及websocket通讯
    前端如何使用proxyTable和nginx解决跨域问题
    微信小程序入门案例
  • 原文地址:https://www.cnblogs.com/xuwc/p/14032932.html
Copyright © 2011-2022 走看看