zoukankan      html  css  js  c++  java
  • dubbo(2.5.3)源码之Directory与LoadBalance

    Directory:

    集群目录服务Directory, 代表多个Invoker, 可以看成List<Invoker>,它的值可能是动态变化的比如注册中心推送变更。集群选择调用服务时通过目录服务找到所有服务

    StaticDirectory: 静态目录服务, 它的所有Invoker通过构造函数传入, 服务消费方引用服务的时候, 服务对多注册中心的引用,将Invokers集合直接传入 StaticDirectory构造器,再由Cluster伪装成一个InvokerStaticDirectory的list方法直接返回所有invoker集合;

    RegistryDirectory: 注册目录服务, 它的Invoker集合是从注册中心获取的, 它实现了NotifyListener接口实现了回调接口notify(List<Url>)

    通俗的来说,就是一个缓存和更新缓存的过程

    Directory目录服务的更新过程

    RegistryProtocol.doRefer方法,也就是消费端在初始化的时候,这里涉及到了RegistryDirectory这个类。然后执行cluster.join(directory)方法。这些代码在上篇博客有分析过。

    cluster.join其实就是将Directory中的多个Invoker伪装成一个Invoker, 对上层透明,包含集群的容错机制

    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);//对多个invoker进行组装
        directory.setRegistry(registry); //ZookeeperRegistry
        directory.setProtocol(protocol); //protocol=Protocol$Adaptive
        //url=consumer://192.168.111....
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
        //会把consumer://192...  注册到注册中心
        if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            //zkClient.create()
            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false)));
        }
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, 
                Constants.PROVIDERS_CATEGORY 
                + "," + Constants.CONFIGURATORS_CATEGORY 
                + "," + Constants.ROUTERS_CATEGORY));
        //Cluster$Adaptive
        return cluster.join(directory);
    }
    

      directory.subscribe:

    订阅节点的变化,

    1. zookeeper上指定节点发生变化以后,会通知到RegistryDirectory的notify方法

    2. url转化为invoker对象

    调用过程中invokers的使用

    再调用过程中,AbstractClusterInvoker.invoke方法中:其中list(invocation) 就是获取directory中所缓存的 invoker。调用AbstrctDirectory的list方法,再转由调用RegisteryDirectory的doList,拿到成员变量methodInvokerMap里的值。

    public Result invoke(final Invocation invocation) throws RpcException {
    
        checkWhetherDestroyed();
    
        LoadBalance loadbalance;
        
        List<Invoker<T>> invokers = list(invocation); 
        if (invokers != null && invokers.size() > 0) {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        } else {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        return doInvoke(invocation, invokers, loadbalance);
    }

    负载均衡LoadBalance: 

      LoadBalance负载均衡, 负责从多个 Invokers中选出具体的一个Invoker用于本次调用,调用过程中包含了负载均衡的算法

      在AbstractClusterInvoker.invoke中代码如下,通过名称获得指定的扩展点。RandomLoadBalance:

    public Result invoke(final Invocation invocation) throws RpcException {
    
        checkWhetherDestroyed();
    
        LoadBalance loadbalance;
        
        List<Invoker<T>> invokers = list(invocation);
        if (invokers != null && invokers.size() > 0) {//默认拓展点是随机算法@SPI(RandomLoadBalance.NAME)
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        } else {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        return doInvoke(invocation, invokers, loadbalance);
    }

    AbstractClusterInvoker.doselect

      调用LoadBalance.select方法,讲invokers按照指定算法进行负载

    private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
        if (invokers == null || invokers.size() == 0)
            return null;
        if (invokers.size() == 1)
            return invokers.get(0);
        // 如果只有两个invoker,退化成轮循
        if (invokers.size() == 2 && selected != null && selected.size() > 0) {
            return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
        }
        Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
        
        //如果 selected中包含(优先判断) 或者 不可用&&availablecheck=true 则重试.
        if( (selected != null && selected.contains(invoker))
                ||(!invoker.isAvailable() && getUrl()!=null && availablecheck)){
            try{
                Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
                if(rinvoker != null){
                    invoker =  rinvoker;
                }else{
                    //看下第一次选的位置,如果不是最后,选+1位置.
                    int index = invokers.indexOf(invoker);
                    try{
                        //最后在避免碰撞
                        invoker = index <invokers.size()-1?invokers.get(index+1) :invoker;
                    }catch (Exception e) {
                        logger.warn(e.getMessage()+" may because invokers list dynamic change, ignore.",e);
                    }
                }
            }catch (Throwable t){
                logger.error("clustor relselect fail reason is :"+t.getMessage() +" if can not slove ,you can set cluster.availablecheck=false in url",t);
            }
        }
        return invoker;
    }
    

      通过调用 AbstrctLoadBalance 的loadbalance.select(invokers, getUrl(), invocation) 转向具体的实现类,这里就是随机算法负载的 doSelect:

    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size(); // 总个数
        int totalWeight = 0; // 总权重
        boolean sameWeight = true; // 权重是否都一样
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            totalWeight += weight; // 累计总权重
            if (sameWeight && i > 0
                    && weight != getWeight(invokers.get(i - 1), invocation)) {
                sameWeight = false; // 计算所有权重是否一样
            }
        }
        if (totalWeight > 0 && ! sameWeight) {
            // 如果权重不相同且权重大于0则按总权重数随机
            int offset = random.nextInt(totalWeight);
            // 并确定随机值落在哪个片断上
            for (int i = 0; i < length; i++) {
                offset -= getWeight(invokers.get(i), invocation);
                if (offset < 0) {
                    return invokers.get(i);
                }
            }
        }
        // 如果权重相同或权重为0则均等随机
        return invokers.get(random.nextInt(length));
    }
    

     配置权重可以在配置文件中再service中可以配置weight 来确定随机的倾向

    Random LoadBalance

    • 随机,按权重设置随机概率。
    • 在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。

    RoundRobin LoadBalance

    • 轮循,按公约后的权重设置轮循比率。
    • 存在慢的提供者累积请求问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。

    LeastActive LoadBalance

    • 最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。
    • 使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。

    ConsistentHash LoadBalance

    • 一致性Hash,相同参数的请求总是发到同一提供者。
    • 当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。
    • 缺省只对第一个参数Hash,如果要修改,请配置<dubbo:parameter key="hash.arguments" value="0,1" />
    • 缺省用160份虚拟节点,如果要修改,请配置<dubbo:parameter key="hash.nodes" value="320" />

    配置方法:

    服务端服务级别:
    <dubbo:service interface="..." loadbalance="roundrobin" />
    服务端方法级别:
    <dubbo:service interface="..."> 
        <dubbo:method name="..." loadbalance="roundrobin"/> 
    </dubbo:service>
    
    客户端服务级别:
    <dubbo:reference interface="..." loadbalance="roundrobin" /> 
    客户端方法级别:
    <dubbo:reference interface="..."> 
        <dubbo:method name="..." loadbalance="roundrobin"/> 
    </dubbo:reference>
  • 相关阅读:
    JAVA微信公众号网页开发——获取公众号关注的所有用户
    删除mysl
    sql语言(mysql)
    mycat读写分离
    mysql双主双从技术
    实用的10个日志处理案例
    ansible基本操作
    MySQL改密
    mysql源码包安装
    ftp搭建mysql服务器
  • 原文地址:https://www.cnblogs.com/wuzhenzhao/p/10076176.html
Copyright © 2011-2022 走看看