zoukankan      html  css  js  c++  java
  • motan源码分析五:cluster相关

    上一章我们分析了客户端调用服务端相关的源码,但是到了cluster里面的部分我们就没有分析了,本章将深入分析cluster和它的相关支持类。

    1.clustersupport的创建过程,上一章的ReferConfig的initRef()方法中调用了相关的创建代码:

            for(Iterator iterator = protocols.iterator(); iterator.hasNext();)
            {
                ProtocolConfig protocol = (ProtocolConfig)iterator.next();
                LoggerUtil.info((new StringBuilder("ProtocolConfig's")).append(protocol.getName()).toString());
                Map params = new HashMap();
                params.put(URLParamType.nodeType.getName(), "referer");
                params.put(URLParamType.version.getName(), URLParamType.version.getValue());
                params.put(URLParamType.refreshTimestamp.getName(), String.valueOf(System.currentTimeMillis()));
                collectConfigParams(params, new AbstractConfig[] {
                    protocol, basicReferer, extConfig, this
                });
                collectMethodConfigParams(params, getMethods());
                URL refUrl = new URL(protocol.getName(), localIp, 0, interfaceClass.getName(), params);
                ClusterSupport clusterSupport = createClusterSupport(refUrl, configHandler, registryUrls);//创建clustersupport
                clusterSupports.add(clusterSupport);
                clusters.add(clusterSupport.getCluster());//获取对应的cluster
                proxy = proxy != null ? proxy : refUrl.getParameter(URLParamType.proxy.getName(), URLParamType.proxy.getValue());
            }
    
        private ClusterSupport createClusterSupport(URL refUrl, ConfigHandler configHandler, List registryUrls)
        {
            List regUrls = new ArrayList();
            if(StringUtils.isNotBlank(directUrl) || "injvm".equals(refUrl.getProtocol()))
            {
                URL regUrl = new URL("local", "127.0.0.1", 0, com/weibo/api/motan/registry/RegistryService.getName());
                if(StringUtils.isNotBlank(directUrl))
                {
                    StringBuilder duBuf = new StringBuilder(128);
                    String dus[] = MotanConstants.COMMA_SPLIT_PATTERN.split(directUrl);
                    String as[];
                    int j = (as = dus).length;
                    for(int i = 0; i < j; i++)
                    {
                        String du = as[i];
                        if(du.contains(":"))
                        {
                            String hostPort[] = du.split(":");
                            URL durl = refUrl.createCopy();
                            durl.setHost(hostPort[0].trim());
                            durl.setPort(Integer.parseInt(hostPort[1].trim()));
                            durl.addParameter(URLParamType.nodeType.getName(), "service");
                            duBuf.append(StringTools.urlEncode(durl.toFullStr())).append(",");
                        }
                    }
    
                    if(duBuf.length() > 0)
                    {
                        duBuf.deleteCharAt(duBuf.length() - 1);
                        regUrl.addParameter(URLParamType.directUrl.getName(), duBuf.toString());
                    }
                }
                regUrls.add(regUrl);
            } else//走注册中心的方式
            {
                if(registryUrls == null || registryUrls.isEmpty())
                    throw new IllegalStateException(String.format("No registry to reference %s on the consumer %s , please config <motan:registry address="..." /> in your spring config.", new Object[] {
                        interfaceClass, "127.0.0.1"
                    }));
                URL url;
                for(Iterator iterator = registryUrls.iterator(); iterator.hasNext(); regUrls.add(url.createCopy()))
                    url = (URL)iterator.next();
    
            }
            URL url;
            for(Iterator iterator1 = regUrls.iterator(); iterator1.hasNext(); url.addParameter(URLParamType.embed.getName(), StringTools.urlEncode(refUrl.toFullStr())))
                url = (URL)iterator1.next();
    
            return configHandler.buildClusterSupport(interfaceClass, regUrls);//调用simpleconfighandler的创建clustersupport方法
        }
    
        public <T> ClusterSupport<T> buildClusterSupport(Class<T> interfaceClass, List<URL> registryUrls) {
            ClusterSupport<T> clusterSupport = new ClusterSupport<T>(interfaceClass, registryUrls);//创建cluster支持类,将业务接口和注册中心信息传递进去
            clusterSupport.init();//初始化
    
            return clusterSupport;
        }

    2.clustersupport的init和prepare方法

        public void init() {
    
            prepareCluster();
    
            URL subUrl = toSubscribeUrl(url);
            for (URL ru : registryUrls) {//循环注册中心的url
    
                String directUrlStr = ru.getParameter(URLParamType.directUrl.getName());
                // 如果有directUrl,直接使用这些directUrls进行初始化,不用到注册中心discover
                if (StringUtils.isNotBlank(directUrlStr)) {
                    List<URL> directUrls = parseDirectUrls(directUrlStr);
                    if (!directUrls.isEmpty()) {
                        notify(ru, directUrls);
                        LoggerUtil.info("Use direct urls, refUrl={}, directUrls={}", url, directUrls);
                        continue;
                    }
                }
    
                // client 注册自己,同时订阅service列表
                Registry registry = getRegistry(ru);//获取zookeeper的注册中心
                registry.subscribe(subUrl, this);//注册自己并订阅服务
            }
    
            boolean check = Boolean.parseBoolean(url.getParameter(URLParamType.check.getName(), URLParamType.check.getValue()));
            if (!CollectionUtil.isEmpty(cluster.getReferers()) || !check) {
                cluster.init();//初始化集群
                if (CollectionUtil.isEmpty(cluster.getReferers()) && !check) {
                    LoggerUtil.warn(String.format("refer:%s", this.url.getPath() + "/" + this.url.getVersion()), "No services");
                }
                return;
            }
    
            throw new MotanFrameworkException(String.format("ClusterSupport No service urls for the refer:%s, registries:%s",
                    this.url.getIdentity(), registryUrls), MotanErrorMsgConstant.SERVICE_UNFOUND);
        }
    
        private void prepareCluster() {
            String clusterName = url.getParameter(URLParamType.cluster.getName(), URLParamType.cluster.getValue());//集群名称
            String loadbalanceName = url.getParameter(URLParamType.loadbalance.getName(), URLParamType.loadbalance.getValue());//负载均衡名称
            String haStrategyName = url.getParameter(URLParamType.haStrategy.getName(), URLParamType.haStrategy.getValue());//ha高可用名称
    
            cluster = ExtensionLoader.getExtensionLoader(Cluster.class).getExtension(clusterName);//获取具体的集群对象
            LoadBalance<T> loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(loadbalanceName);//获取具体的负载均衡方式,目前motan支持6种负载方式
            HaStrategy<T> ha = ExtensionLoader.getExtensionLoader(HaStrategy.class).getExtension(haStrategyName);//获取高可用的方式,目前支持两种failfast和failover方式
            cluster.setLoadBalance(loadBalance);
            cluster.setHaStrategy(ha);
            cluster.setUrl(url);
        }
    

    3.负载均衡,motan支持6种方式,分别是:轮训、随机、hash、本地服务优先、权重可配置、低并发优先,具体代码可见com.weibo.api.motan.cluster.loadbalance目录,本文我们主要看一下轮训的方式:

    public class RoundRobinLoadBalance<T> extends AbstractLoadBalance<T> {
    
        private AtomicInteger idx = new AtomicInteger(0);
    
        @Override
        protected Referer<T> doSelect(Request request) {
            List<Referer<T>> referers = getReferers();//获取所有服务器的引用
    
            int index = idx.incrementAndGet();//自增
            for (int i = 0; i < referers.size(); i++) {
                Referer<T> ref = referers.get((i + index) % referers.size());//利用自增数去模,达到轮训的目的
                if (ref.isAvailable()) {
                    return ref;
                }
            }
            return null;
        }
    
        @Override
        protected void doSelectToHolder(Request request, List<Referer<T>> refersHolder) {
            List<Referer<T>> referers = getReferers();
    
            int index = idx.incrementAndGet();
            for (int i = 0; i < referers.size(); i++) {
                Referer<T> referer = referers.get((i + index) % referers.size());
                if (referer.isAvailable()) {
                    refersHolder.add(referer);
                }
            }
        }
    }
    

    4.motan支持failfast和failover两种方式,failfast只调用一次,如果失败则直接返回失败,failover循环调用若干次,直到成功或循环结束后

        public Response call(Request request, LoadBalance<T> loadBalance) {
    
            List<Referer<T>> referers = selectReferers(request, loadBalance);//获取所有的引用
            if (referers.isEmpty()) {
                throw new MotanServiceException(String.format("FailoverHaStrategy No referers for request:%s, loadbalance:%s", request,
                        loadBalance));
            }
            URL refUrl = referers.get(0).getUrl();
            // 先使用method的配置
            int tryCount =
                    refUrl.getMethodParameter(request.getMethodName(), request.getParamtersDesc(), URLParamType.retries.getName(),
                            URLParamType.retries.getIntValue());//获取重试次数
            // 如果有问题,则设置为不重试
            if (tryCount < 0) {
                tryCount = 0;
            }
    
            for (int i = 0; i <= tryCount; i++) {
                Referer<T> refer = referers.get(i % referers.size());//循环调用
                try {
                    request.setRetries(i);
                    return refer.call(request);
                } catch (RuntimeException e) {
                    // 对于业务异常,直接抛出
                    if (ExceptionUtil.isBizException(e)) {
                        throw e;//业务异常退出调用
                    } else if (i >= tryCount) {
                        throw e;
                    }
                    LoggerUtil.warn(String.format("FailoverHaStrategy Call false for request:%s error=%s", request, e.getMessage()));
                }
            }
    
            throw new MotanFrameworkException("FailoverHaStrategy.call should not come here!");
        }
    

    本章知识点总结:

    1.一个cluster有一个cluster的支持类,有一个ha,有一个loadbalance;

    2.motan支持6种负载均衡方式;

    3.motan支持failover的ha方式;

  • 相关阅读:
    sql
    Java后台通过传参得到对应的坐标值的方法
    运行第一个大型项目工程时出现的问题
    对sqlserver存储过程合游标循环中的一点心得
    (基于Java)算法之动态规划——矩阵连乘问题
    算法之线性时间选择(最坏情况下)
    算法之快速排序
    算法之合并排序
    算法之二分搜索法
    设计模式之装饰者模式
  • 原文地址:https://www.cnblogs.com/mantu/p/5885996.html
Copyright © 2011-2022 走看看