  • Introduction to Dubbo Clusters

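    1 Introduction

    1.1 Overview

    To avoid a single point of failure, a service is usually deployed on multiple servers. A service consumer therefore has to decide two things on every call:

    • ① which service provider to call
    • ② what to do if the call to that provider fails

    ClusterInvoker chooses a service provider roughly as follows:

    • First, call Directory#list() to get the Invoker list (already filtered by the routing rules).
    • Then, call LoadBalance#select() to pick the final Invoker.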
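
The two steps above can be sketched as follows. This is a simplified, self-contained illustration and not Dubbo's code: SimpleDirectory, SimpleLoadBalance and SelectionFlowSketch are hypothetical names, providers are plain strings, the routing rules are modeled as a predicate, and round-robin is only an assumed load-balancing policy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for Directory: holds providers and applies a routing filter.
class SimpleDirectory {
    private final List<String> providers;
    private final Predicate<String> routeFilter;

    SimpleDirectory(List<String> providers, Predicate<String> routeFilter) {
        this.providers = providers;
        this.routeFilter = routeFilter;
    }

    // Step 1: list the providers that survive routing.
    List<String> list() {
        List<String> routed = new ArrayList<>();
        for (String provider : providers) {
            if (routeFilter.test(provider)) {
                routed.add(provider);
            }
        }
        return routed;
    }
}

// Hypothetical stand-in for LoadBalance: round-robin over the candidates.
class SimpleLoadBalance {
    private int counter = 0;

    // Step 2: pick one provider from the routed list.
    String select(List<String> candidates) {
        if (candidates.isEmpty()) {
            throw new IllegalStateException("no provider available");
        }
        String chosen = candidates.get(counter % candidates.size());
        counter++;
        return chosen;
    }
}

class SelectionFlowSketch {
    // List first, then select: the same order ClusterInvoker follows.
    static String choose(SimpleDirectory directory, SimpleLoadBalance loadBalance) {
        return loadBalance.select(directory.list());
    }
}
```

With providers "a:20880", "b:20880" and "c:20881" and a filter that keeps only port 20880, repeated calls to choose() alternate between the first two providers.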

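    1.2 Cluster and ClusterInvoker

    Dubbo defines two cluster abstractions:

    • Cluster interface: creates a ClusterInvoker
    • ClusterInvoker abstract class (AbstractClusterInvoker): implements the Invoker interface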


    For example, FailoverCluster creates a FailoverClusterInvoker:

    public class FailoverCluster implements Cluster {
        public final static String NAME = "failover";
    
        @Override
        public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
            return new FailoverClusterInvoker<T>(directory);
        }
    }
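
To make the factory role of Cluster more concrete, here is a minimal sketch under simplifying assumptions. MiniInvoker, MiniCluster and MiniClusterRegistry are hypothetical names; the registry only imitates the idea of looking a strategy up by a name such as "failover" and does not reproduce Dubbo's extension mechanism.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified Invoker: a call takes a request and returns a String.
interface MiniInvoker {
    String invoke(String request);
}

// Hypothetical Cluster: merges many provider invokers into a single invoker.
interface MiniCluster {
    MiniInvoker join(List<MiniInvoker> providers);
}

class MiniClusterRegistry {
    private final Map<String, MiniCluster> clusters = new HashMap<>();

    void register(String name, MiniCluster cluster) {
        clusters.put(name, cluster);
    }

    // Look the strategy up by name and let it build the clustered invoker.
    MiniInvoker join(String name, List<MiniInvoker> providers) {
        MiniCluster cluster = clusters.get(name);
        if (cluster == null) {
            throw new IllegalArgumentException("unknown cluster: " + name);
        }
        return cluster.join(providers);
    }
}
```

The caller ends up holding one invoker and does not need to know how many providers sit behind it; the fault-tolerance behavior lives entirely in the invoker that the chosen cluster returns.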
    

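    2 ClusterInvoker Analysis

    2.1 Abstract base class

    2.1.1 invoke

    This method performs the following steps:

    • Bind the attachments from RpcContext to the invocation.
    • Get the Invoker list from the service directory.
    • Execute the call by delegating to doInvoke().
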
    @Override
    public Result invoke(final Invocation invocation) throws RpcException {
        checkWhetherDestroyed();
        LoadBalance loadbalance = null;
    
        // Bind the attachments from RpcContext to the invocation.
        Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            ((RpcInvocation) invocation).addAttachments(contextAttachments);
        }
        // Get the Invoker list from the service directory
        List<Invoker<T>> invokers = list(invocation);
        if (invokers != null && !invokers.isEmpty()) {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                        .getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        // Execute the call; the concrete strategy is implemented by subclasses
        return doInvoke(invocation, invokers, loadbalance);
    }
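
The shape of invoke() is a template method: the base class fixes the common steps and leaves the fault-tolerance strategy to doInvoke(). A minimal sketch of that structure, assuming string providers and a plain map in place of RpcContext (TemplateClusterInvoker and FirstProviderInvoker are hypothetical names):

```java
import java.util.List;
import java.util.Map;

// Hypothetical base class: invoke() fixes the common steps, doInvoke() is the strategy hook.
abstract class TemplateClusterInvoker {
    private final List<String> directory;                 // stand-in for the service directory
    private final Map<String, String> contextAttachments; // stand-in for RpcContext attachments

    TemplateClusterInvoker(List<String> directory, Map<String, String> contextAttachments) {
        this.directory = directory;
        this.contextAttachments = contextAttachments;
    }

    final String invoke(Map<String, String> invocationAttachments) {
        // 1. Bind the context attachments to the invocation.
        invocationAttachments.putAll(contextAttachments);
        // 2. Get the candidate providers from the directory.
        List<String> providers = directory;
        // 3. Delegate to the concrete fault-tolerance strategy.
        return doInvoke(invocationAttachments, providers);
    }

    protected abstract String doInvoke(Map<String, String> attachments, List<String> providers);
}

// Hypothetical strategy: always use the first provider.
class FirstProviderInvoker extends TemplateClusterInvoker {
    FirstProviderInvoker(List<String> directory, Map<String, String> contextAttachments) {
        super(directory, contextAttachments);
    }

    @Override
    protected String doInvoke(Map<String, String> attachments, List<String> providers) {
        return providers.get(0) + "/" + attachments.get("traceId");
    }
}
```

Each concrete invoker discussed in section 2 overrides only doInvoke(), which is why their classes stay short.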
    

    2.1.2 list

    This method gets the Invoker list from the service directory. The code is trivial:

    protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
        List<Invoker<T>> invokers = directory.list(invocation);
        return invokers;
    }
    

    2.1.3 select

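    The main logic of select():

    • Sticky support: keep the consumer calling the same provider whenever possible, and switch only when that provider goes down.
    • doSelect(): pick a provider Invoker with the load-balancing algorithm.

    // invokers: all Invokers obtained from the service directory
    // selected: Invokers that have already been tried during this call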
    protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
        if (invokers == null || invokers.isEmpty())
            return null;
        // Get the name of the method being invoked
        String methodName = invocation == null ? "" : invocation.getMethodName();
        // Read the sticky setting for this method
        boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);
        {
            // If stickyInvoker is non-null but invokers no longer contains it, the provider it represents has gone down
            if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
                stickyInvoker = null;
            }
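            // With sticky == true and stickyInvoker != null, reuse stickyInvoker only if selected does not contain it.
            // If selected does contain it, that provider probably failed during this call (for example because of a network problem), even though it is not down (invokers still contains it)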
            if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
                // availablecheck: whether the availability check is enabled; if so, verify that stickyInvoker is available
                if (availablecheck && stickyInvoker.isAvailable()) {
                    return stickyInvoker;
                }
            }
        }
        // Reaching this point means stickyInvoker is null, unavailable, or cannot be reused for this call
        Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);
        if (sticky) {
            stickyInvoker = invoker;
        }
        return invoker;
    }
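
The sticky behavior can be isolated in a small sketch. This is an illustration under simplifying assumptions, not the Dubbo implementation: StickySelector is a hypothetical class, providers are strings, the availability check is left out, and fallbackIndex (assumed non-negative) stands in for whatever the load balancer would pick.

```java
import java.util.List;

// Hypothetical, simplified sticky selector over provider names.
class StickySelector {
    private String stickyProvider; // last provider chosen while sticky was on

    String select(List<String> providers, List<String> alreadyTried, boolean sticky, int fallbackIndex) {
        if (providers.isEmpty()) {
            return null;
        }
        // Forget the sticky provider once it disappears from the provider list.
        if (stickyProvider != null && !providers.contains(stickyProvider)) {
            stickyProvider = null;
        }
        // Reuse the sticky provider unless it was already tried in this call.
        if (sticky && stickyProvider != null && !alreadyTried.contains(stickyProvider)) {
            return stickyProvider;
        }
        // Otherwise fall back to the load balancer's choice and remember it.
        String chosen = providers.get(fallbackIndex % providers.size());
        if (sticky) {
            stickyProvider = chosen;
        }
        return chosen;
    }
}
```

Once a provider has been chosen with sticky enabled, later calls keep returning it regardless of fallbackIndex, until it either leaves the provider list or shows up in alreadyTried.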
    

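    doSelect() does the actual Invoker selection:

    • Use the load-balancing algorithm to pick an Invoker.
    • If that Invoker was already tried in this call, or fails the availability check, reselect (only once).
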
    private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
        if (invokers == null || invokers.isEmpty())
            return null;
        if (invokers.size() == 1)
            return invokers.get(0);
        if (loadbalance == null) {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
        }
        Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
    
        // If selected already contains the Invoker chosen by the load balancer, or that Invoker fails the availability check, reselect
        if ((selected != null && selected.contains(invoker))
                || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
            try {
                Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
                if (rinvoker != null) {
                    invoker = rinvoker;
                } else {
                    //Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
                    int index = invokers.indexOf(invoker);
                    try {
                        //Avoid collision
                        invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invokers.get(0);
                    } catch (Exception e) { /* log */ }
                }
            } catch (Throwable t) { /* log */ }
        }
        return invoker;
    }
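
A reduced sketch of the "reselect once, then fall back to the next index" idea. It rests on several assumptions: ReselectSketch is a hypothetical class, providers are strings, firstChoice is the index the load balancer would have picked, and the reselect step simply takes the first untried, available provider instead of running the load balancer again.

```java
import java.util.List;
import java.util.Set;

class ReselectSketch {
    static String doSelect(List<String> providers, List<String> alreadyTried,
                           Set<String> unavailable, int firstChoice) {
        if (providers.isEmpty()) {
            return null;
        }
        if (providers.size() == 1) {
            return providers.get(0);
        }
        String chosen = providers.get(firstChoice);
        if (!alreadyTried.contains(chosen) && !unavailable.contains(chosen)) {
            return chosen;
        }
        // Reselect once: prefer a provider that is untried and available.
        for (String candidate : providers) {
            if (!alreadyTried.contains(candidate) && !unavailable.contains(candidate)) {
                return candidate;
            }
        }
        // Nothing better found: take the provider after the first choice, wrapping around.
        int index = providers.indexOf(chosen);
        return providers.get((index + 1) % providers.size());
    }
}
```

The final fallback mirrors the index + 1 branch in the code above: even when every provider has been tried, the retry lands on a different provider than the one just chosen.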
    

    2.2 FailoverClusterInvoker

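    Straightforward: if a call fails, select a provider again and retry. The retry count is configurable; the default value of retries is 2, so up to 3 invocations are made in total (len = retries + 1). Business exceptions (isBiz()) are rethrown immediately without retrying.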

    public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
        private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);
        public FailoverClusterInvoker(Directory<T> directory) {
            super(directory);
        }
    
        @Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            List<Invoker<T>> copyinvokers = invokers;
            checkInvokers(copyinvokers, invocation);
            int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
            if (len <= 0) {
                len = 1;
            }
            // retry loop.
            RpcException le = null; // last exception.
            List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
            Set<String> providers = new HashSet<String>(len);
            for (int i = 0; i < len; i++) {
                //Reselect before retry to avoid a change of candidate `invokers`.
                //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
                if (i > 0) {
                    checkWhetherDestroyed();
                    copyinvokers = list(invocation);
                    // check again
                    checkInvokers(copyinvokers, invocation);
                }
                Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
                invoked.add(invoker);
                RpcContext.getContext().setInvokers((List) invoked);
                try {
                    Result result = invoker.invoke(invocation);
                    if (le != null && logger.isWarnEnabled()) {
                        // log (elided)
                    }
                    return result;
                } catch (RpcException e) {
                    if (e.isBiz()) { // biz exception.
                        throw e;
                    }
                    le = e;
                } catch (Throwable e) {
                    le = new RpcException(e.getMessage(), e);
                } finally {
                    providers.add(invoker.getUrl().getAddress());
                }
            }
            throw new RpcException(...);
        }
    
    }
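
The retry loop can be reduced to the following sketch. It is a simplified illustration, not the Dubbo code: FailoverSketch is a hypothetical class, providers are modeled as java.util.function.Function values, the next provider is taken in list order instead of through a load balancer, and IllegalArgumentException stands in for a business exception that must not be retried.

```java
import java.util.List;
import java.util.function.Function;

class FailoverSketch {
    // Makes at most retries + 1 attempts, moving to the next provider after each failure.
    static <R> R invoke(List<Function<String, R>> providers, String request, int retries) {
        if (providers.isEmpty()) {
            throw new IllegalStateException("no provider available");
        }
        int attempts = Math.max(retries + 1, 1);
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            Function<String, R> provider = providers.get(i % providers.size());
            try {
                return provider.apply(request);
            } catch (IllegalArgumentException e) {
                throw e; // stand-in for a business exception: fail immediately
            } catch (RuntimeException e) {
                last = e; // remember the failure and try the next provider
            }
        }
        throw new IllegalStateException("all " + attempts + " attempts failed", last);
    }
}
```

With retries = 2 and a first provider that always fails, the second attempt reaches the next provider and its result is returned; if every attempt fails, the last failure is attached as the cause of the final exception.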
    

    2.3 FailbackClusterInvoker

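    If the call fails, return an empty result to the service consumer and let a scheduled task retry the failed call in the background (every 5 seconds in the code below). This suits operations such as message notification: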

    public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {
        private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class);
    
        private static final long RETRY_FAILED_PERIOD = 5 * 1000;
        private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2,
                new NamedInternalThreadFactory("failback-cluster-timer", true));
    
        private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed = new ConcurrentHashMap<Invocation, AbstractClusterInvoker<?>>();
        private volatile ScheduledFuture<?> retryFuture;
    
        public FailbackClusterInvoker(Directory<T> directory) {
            super(directory);
        }
        // Record the failed invocation in the failed map; the retry task is started on first use
        private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
            if (retryFuture == null) {
                synchronized (this) {
                    if (retryFuture == null) {
                        retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
                            @Override
                            public void run() {
                                // collect retry statistics
                                try {
                                    retryFailed();
                                } catch (Throwable t) { // Defensive fault tolerance
                                    logger.error("Unexpected error occur at collect statistic", t);
                                }
                            }
                        }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
                    }
                }
            }
            failed.put(invocation, router);
        }
        // Retry the failed invocations
        void retryFailed() {
            if (failed.size() == 0) {
                return;
            }
            for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>(
                    failed).entrySet()) {
                Invocation invocation = entry.getKey();
                Invoker<?> invoker = entry.getValue();
                try {
                    invoker.invoke(invocation);
                    failed.remove(invocation);
                } catch (Throwable e) {
                    logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
                }
            }
        }
    
        @Override
        protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            try {
                checkInvokers(invokers, invocation);
                Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
                return invoker.invoke(invocation);
            } catch (Throwable e) {
                /* log */
                addFailed(invocation, this);
                return new RpcResult(); // ignore
            }
        }
    
    }
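
The record-then-retry mechanism can be shown without a timer. In this sketch FailbackSketch is a hypothetical class, providers are Function values, null plays the role of the empty result, and retryFailed() is called by hand; in a real setup it would be scheduled periodically, as the code above does with scheduleWithFixedDelay.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class FailbackSketch {
    // Failed requests waiting for a retry, keyed by request.
    private final Map<String, Function<String, String>> failed = new ConcurrentHashMap<>();

    // On failure, remember the call and return null as an "empty result".
    String invoke(Function<String, String> provider, String request) {
        try {
            return provider.apply(request);
        } catch (RuntimeException e) {
            failed.put(request, provider);
            return null;
        }
    }

    // Retry every recorded call; successful ones are removed, the rest wait for the next round.
    void retryFailed() {
        for (Map.Entry<String, Function<String, String>> entry : failed.entrySet()) {
            try {
                entry.getValue().apply(entry.getKey());
                failed.remove(entry.getKey());
            } catch (RuntimeException e) {
                // still failing: keep the entry for the next retry round
            }
        }
    }

    int pending() {
        return failed.size();
    }
}
```

Because the caller receives an empty result right away, this pattern only fits calls whose return value is not needed, which matches the notification use case mentioned above.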
    

    2.4 FailfastClusterInvoker

    Call once and fail fast: on failure, throw an exception immediately:

    public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {
        public FailfastClusterInvoker(Directory<T> directory) {
            super(directory);
        }
    
        @Override
        public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            checkInvokers(invokers, invocation);
            Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
            try {
                return invoker.invoke(invocation);
            } catch (Throwable e) {
                if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
                    throw (RpcException) e;
                }
                throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
            }
        }
    }
    
    

    2.5 FailsafeClusterInvoker

    Call once; on failure, only log the error and return an empty result instead of throwing an exception:

    public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> {
        private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class);
        public FailsafeClusterInvoker(Directory<T> directory) {
            super(directory);
        }
    
        @Override
        public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            try {
                checkInvokers(invokers, invocation);
                Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
                return invoker.invoke(invocation);
            } catch (Throwable e) {
                logger.error("Failsafe ignore exception: " + e.getMessage(), e);
                return new RpcResult(); // ignore
            }
        }
    }
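
Failfast and failsafe both make a single attempt and differ only in how they treat the failure. A side-by-side sketch, assuming Function-based providers, with the hypothetical class SingleAttemptSketch, null as the empty result, and System.err in place of a logger:

```java
import java.util.function.Function;

class SingleAttemptSketch {
    // Failfast: one attempt; a failure is wrapped with context and rethrown at once.
    static String failfast(Function<String, String> provider, String request) {
        try {
            return provider.apply(request);
        } catch (RuntimeException e) {
            throw new IllegalStateException("failfast: call failed: " + e.getMessage(), e);
        }
    }

    // Failsafe: one attempt; a failure is logged and swallowed, and an empty result is returned.
    static String failsafe(Function<String, String> provider, String request) {
        try {
            return provider.apply(request);
        } catch (RuntimeException e) {
            System.err.println("failsafe: ignoring exception: " + e.getMessage());
            return null;
        }
    }
}
```

Failfast tends to suit calls where the caller must learn about the failure right away, while failsafe suits calls whose failure can be tolerated.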
    
  • Original article: https://www.cnblogs.com/wolfdriver/p/10663493.html