zoukankan      html  css  js  c++  java
  • dubbo负载均衡是如何实现的?

    dubbo的负载均衡全部由AbstractLoadBalance的子类来实现

    RandomLoadBalance 随机

    在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。

    1. 获取invoker的数量
    2. 获取第一个invoker的权重,并复制给firstWeight
    3. 循环invoker集合,把它们的权重全部相加,并复制给totalWeight,如果权重不相等,那么sameWeight为false
    4. 如果invoker集合的权重并不是全部相等的,那么获取一个随机数在1到totalWeight之间,赋值给offset属性
    5. 循环遍历invoker集合,获取权重并与offset相减,当offset减到小于零,那么就返回这个inovker
    6. 如果权重相等,那么直接在invoker集合里面取一个随机数返回
        @Override
        protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
            int length = invokers.size(); // Number of invokers
            boolean sameWeight = true; // Every invoker has the same weight?
            int firstWeight = getWeight(invokers.get(0), invocation);
            int totalWeight = firstWeight; // The sum of weights
            for (int i = 1; i < length; i++) {
                int weight = getWeight(invokers.get(i), invocation);
                totalWeight += weight; // Sum
                if (sameWeight && weight != firstWeight) {
                    sameWeight = false;
                }
            }
            if (totalWeight > 0 && !sameWeight) {
                // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
                int offset = ThreadLocalRandom.current().nextInt(totalWeight);
                // Return a invoker based on the random value.
                for (int i = 0; i < length; i++) {
                    offset -= getWeight(invokers.get(i), invocation);
                    if (offset < 0) {
                        return invokers.get(i);
                    }
                }
            }
            // If all invokers have the same weight value or totalWeight=0, return evenly.
            return invokers.get(ThreadLocalRandom.current().nextInt(length));
        }
    

    RoundRobinLoadBalance 轮询

    存在慢的提供者累积请求的问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。

    在老的版本上,dubbo会求出最大权重和最小权重,如果权重相等,那么就直接按取模的方式,每次取完后值加一;如果权重不相等,顺序根据权重分配。

    在新的版本上,对这个类进行了重构。

    1. 从methodWeightMap这个实例中根据ServiceKey+MethodName的方式获取里面的一个map实例,如果没有则说明第一次进到该方法,则实例化一个放入到methodWeightMap中,并把获取到的实例命名为map
    2. 遍历所有的invokers
    3. 拿到当前的invoker的identifyString作为key,去map里获取weightedRoundRobin实例,如果map里没有则添加一个
    4. 如果weightedRoundRobin的权重和当前invoker的权重不同,说明权重变了,需要重新设置
    5. 获取当前invoker所对应的weightedRoundRobin实例中的current,并加上当前invoker的权重
    6. 设置weightedRoundRobin最后的更新时间
    7. maxCurrent一开始是设置的0,如果当前的weightedRoundRobin的current值大于maxCurrent则进行赋值
    8. 遍历完后会得到最大的权重的invoker的selectedInvoker和这个invoker所对应的weightedRoundRobin赋值给了selectedWRR,还有权重之和totalWeight
    9. 然后把selectedWRR里的current属性减去totalWeight,并返回selectedInvoker

    这样看显然是不够清晰的,我们来举个例子:

    假定有3台dubbo provider:
    
    10.0.0.1:20884, weight=2
    10.0.0.1:20886, weight=3
    10.0.0.1:20888, weight=4
    
    totalWeight=9;
    
    那么第一次调用的时候:
    10.0.0.1:20884, weight=2    selectedWRR -> current = 2
    10.0.0.1:20886, weight=3    selectedWRR -> current = 3
    10.0.0.1:20888, weight=4    selectedWRR -> current = 4
     
    selectedInvoker-> 10.0.0.1:20888 
    调用 selectedWRR.sel(totalWeight); 
    10.0.0.1:20888, weight=4    selectedWRR -> current = -5
    返回10.0.0.1:20888这个实例
    
    那么第二次调用的时候:
    10.0.0.1:20884, weight=2    selectedWRR -> current = 4
    10.0.0.1:20886, weight=3    selectedWRR -> current = 6
    10.0.0.1:20888, weight=4    selectedWRR -> current = -1
    
    selectedInvoker-> 10.0.0.1:20886 
    调用 selectedWRR.sel(totalWeight); 
    10.0.0.1:20886 , weight=4   selectedWRR -> current = -3
    返回10.0.0.1:20886这个实例
    
    那么第三次调用的时候:
    10.0.0.1:20884, weight=2    selectedWRR -> current = 6
    10.0.0.1:20886, weight=3    selectedWRR -> current = 0
    10.0.0.1:20888, weight=4    selectedWRR -> current = 3
    
    selectedInvoker-> 10.0.0.1:20884
    调用 selectedWRR.sel(totalWeight); 
    10.0.0.1:20884, weight=2    selectedWRR -> current = -3
    返回10.0.0.1:20884这个实例
    
        protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
            String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
            ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
            if (map == null) {
                methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
                map = methodWeightMap.get(key);
            }
            int totalWeight = 0;
            long maxCurrent = Long.MIN_VALUE;
            long now = System.currentTimeMillis();
            Invoker<T> selectedInvoker = null;
            WeightedRoundRobin selectedWRR = null;
            for (Invoker<T> invoker : invokers) {
                String identifyString = invoker.getUrl().toIdentityString();
                WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
                int weight = getWeight(invoker, invocation);
                if (weight < 0) {
                    weight = 0;
                }
                if (weightedRoundRobin == null) {
                    weightedRoundRobin = new WeightedRoundRobin();
                    weightedRoundRobin.setWeight(weight);
                    map.putIfAbsent(identifyString, weightedRoundRobin);
                    weightedRoundRobin = map.get(identifyString);
                }
                if (weight != weightedRoundRobin.getWeight()) {
                    //weight changed
                    weightedRoundRobin.setWeight(weight);
                }
                long cur = weightedRoundRobin.increaseCurrent();
                weightedRoundRobin.setLastUpdate(now);
                if (cur > maxCurrent) {
                    maxCurrent = cur;
                    selectedInvoker = invoker;
                    selectedWRR = weightedRoundRobin;
                }
                totalWeight += weight;
            }
            if (!updateLock.get() && invokers.size() != map.size()) {
                if (updateLock.compareAndSet(false, true)) {
                    try {
                        // copy -> modify -> update reference
                        ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<String, WeightedRoundRobin>();
                        newMap.putAll(map);
                        Iterator<Entry<String, WeightedRoundRobin>> it = newMap.entrySet().iterator();
                        while (it.hasNext()) {
                            Entry<String, WeightedRoundRobin> item = it.next();
                            if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) {
                                it.remove();
                            }
                        }
                        methodWeightMap.put(key, newMap);
                    } finally {
                        updateLock.set(false);
                    }
                }
            }
            if (selectedInvoker != null) {
                selectedWRR.sel(totalWeight);
                return selectedInvoker;
            }
            // should not happen here
            return invokers.get(0);
        }
    

    LeastActiveLoadBalance 最少活跃调用数

    使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。

    1. 遍历所有的invoker
    2. 获取当前invoker的活跃数,调用的是RpcStatus的getStatus方法,过滤器里面会记录每个方法的活跃数
    3. 获取当前invoker的权重
    4. 如果是第一次进来或者是当前invoker的活跃数比最小的活跃数还小
    5. 那么把leastActive设置为当前invoker的活跃数,设置leastCount为1,leastIndexes数组的第一个位置设置为1,记录一下totalWeight和firstWeight
    6. 如果不满足第4点的条件,那么判断当前invoker的活跃数和最小的活跃数是否相等
    7. 如果满足第6点,那么把当前的权重加入到totalWeight中,并把leastIndexes数组中记录一下最小活跃数相同的下标;再看一下是否所有的权重相同
    8. 如果invoker集合中只有一个invoker活跃数是最小的,那么直接返回
    9. 如果权重不相等,随机权重后,判断在哪个 Invoker 的权重区间中
    10. 权重相等,直接随机选择 Invoker 即可
    
    最小活跃数算法实现:
    假定有3台dubbo provider:
    
    10.0.0.1:20884, weight=2,active=2
    10.0.0.1:20886, weight=3,active=4
    10.0.0.1:20888, weight=4,active=3
    active=2最小,且只有一个2,所以选择10.0.0.1:20884
    
    假定有3台dubbo provider:
    
    10.0.0.1:20884, weight=2,active=2
    10.0.0.1:20886, weight=3,active=2
    10.0.0.1:20888, weight=4,active=3
    active=2最小,且有2个,所以从[10.0.0.1:20884,10.0.0.1:20886 ]中选择;
    接下来的算法与随机算法类似:
    
    假设offset=1(即random.nextInt(5)=1)
    1-2=-1<0?是,所以选中 10.0.0.1:20884, weight=2
    假设offset=4(即random.nextInt(5)=4)
    4-2=2<0?否,这时候offset=2, 2-3<0?是,所以选中 10.0.0.1:20886, weight=3
    
     1: public class LeastActiveLoadBalance extends AbstractLoadBalance {
     2: 
     3:     public static final String NAME = "leastactive";
     4: 
     5:     private final Random random = new Random();
     6: 
     7:     @Override
     8:     protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
     9:         int length = invokers.size(); // 总个数
    10:         int leastActive = -1; // 最小的活跃数
    11:         int leastCount = 0; // 相同最小活跃数的个数
    12:         int[] leastIndexes = new int[length]; // 相同最小活跃数的下标
    13:         int totalWeight = 0; // 总权重
    14:         int firstWeight = 0; // 第一个权重,用于于计算是否相同
    15:         boolean sameWeight = true; // 是否所有权重相同
    16:         // 计算获得相同最小活跃数的数组和个数
    17:         for (int i = 0; i < length; i++) {
    18:             Invoker<T> invoker = invokers.get(i);
    19:             int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // 活跃数
    20:             int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // 权重
    21:             if (leastActive == -1 || active < leastActive) { // 发现更小的活跃数,重新开始
    22:                 leastActive = active; // 记录最小活跃数
    23:                 leastCount = 1; // 重新统计相同最小活跃数的个数
    24:                 leastIndexes[0] = i; // 重新记录最小活跃数下标
    25:                 totalWeight = weight; // 重新累计总权重
    26:                 firstWeight = weight; // 记录第一个权重
    27:                 sameWeight = true; // 还原权重相同标识
    28:             } else if (active == leastActive) { // 累计相同最小的活跃数
    29:                 leastIndexes[leastCount++] = i; // 累计相同最小活跃数下标
    30:                 totalWeight += weight; // 累计总权重
    31:                 // 判断所有权重是否一样
    32:                 if (sameWeight && weight != firstWeight) {
    33:                     sameWeight = false;
    34:                 }
    35:             }
    36:         }
    37:         // assert(leastCount > 0)
    38:         if (leastCount == 1) {
    39:             // 如果只有一个最小则直接返回
    40:             return invokers.get(leastIndexes[0]);
    41:         }
    42:         if (!sameWeight && totalWeight > 0) {
    43:             // 如果权重不相同且权重大于0则按总权重数随机
    44:             int offsetWeight = random.nextInt(totalWeight);
    45:             // 并确定随机值落在哪个片断上
    46:             for (int i = 0; i < leastCount; i++) {
    47:                 int leastIndex = leastIndexes[i];
    48:                 offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
    49:                 if (offsetWeight <= 0) {
    50:                     return invokers.get(leastIndex);
    51:                 }
    52:             }
    53:         }
    54:         // 如果权重相同或权重为0则均等随机
    55:         return invokers.get(leastIndexes[random.nextInt(leastCount)]);
    56:     }
    57: 
    58: }
    

    ConsistentHashLoadBalance 一致性 Hash

    相同参数的请求总是发到同一提供者。当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。

    1. 基于 invokers 集合,根据对象内存地址来计算定义哈希值
    2. 获得 ConsistentHashSelector 对象。若为空,或者定义哈希值变更(说明 invokers 集合发生变化),进行创建新的 ConsistentHashSelector 对象
    3. 调用ConsistentHashSelector对象的select方法
     1: public class ConsistentHashLoadBalance extends AbstractLoadBalance {
     2: 
     3:     /**
     4:      * 服务方法与一致性哈希选择器的映射
     5:      *
     6:      * KEY:serviceKey + "." + methodName
     7:      */
     8:     private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();
     9: 
    10:     @SuppressWarnings("unchecked")
    11:     @Override
    12:     protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    13:         String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
    14:         // 基于 invokers 集合,根据对象内存地址来计算定义哈希值
    15:         int identityHashCode = System.identityHashCode(invokers);
    16:         // 获得 ConsistentHashSelector 对象。若为空,或者定义哈希值变更(说明 invokers 集合发生变化),进行创建新的 ConsistentHashSelector 对象
    17:         ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
    18:         if (selector == null || selector.identityHashCode != identityHashCode) {
    19:             selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode));
    20:             selector = (ConsistentHashSelector<T>) selectors.get(key);
    21:         }
    22:         return selector.select(invocation);
    23:     }
    24: }
    

    ConsistentHashSelector 一致性哈希选择器

    ConsistentHashSelector ,是 ConsistentHashLoadBalance 的内部类,一致性哈希选择器,基于 Ketama 算法。

    /**
     * 虚拟节点与 Invoker 的映射关系
     */
    private final TreeMap<Long, Invoker<T>> virtualInvokers;
    /**
     * 每个Invoker 对应的虚拟节点数
     */
    private final int replicaNumber;
    /**
     * 定义哈希值
     */
    private final int identityHashCode;
    /**
     * 取值参数位置数组
     */
    private final int[] argumentIndex;
    
      1: ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
      2:     this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
      3:     // 设置 identityHashCode
      4:     this.identityHashCode = identityHashCode;
      5:     URL url = invokers.get(0).getUrl();
      6:     // 初始化 replicaNumber
      7:     this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
      8:     // 初始化 argumentIndex
      9:     String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
     10:     argumentIndex = new int[index.length];
     11:     for (int i = 0; i < index.length; i++) {
     12:         argumentIndex[i] = Integer.parseInt(index[i]);
     13:     }
     14:     // 初始化 virtualInvokers
     15:     for (Invoker<T> invoker : invokers) {
     16:         String address = invoker.getUrl().getAddress();
     17:         // 每四个虚拟结点为一组,为什么这样?下面会说到
     18:         for (int i = 0; i < replicaNumber / 4; i++) {
     19:             // 这组虚拟结点得到惟一名称
     20:             byte[] digest = md5(address + i);
     21:             // Md5是一个16字节长度的数组,将16字节的数组每四个字节一组,分别对应一个虚拟结点,这就是为什么上面把虚拟结点四个划分一组的原因
     22:             for (int h = 0; h < 4; h++) {
     23:                 // 对于每四个字节,组成一个long值数值,做为这个虚拟节点的在环中的惟一key
     24:                 long m = hash(digest, h);
     25:                 virtualInvokers.put(m, invoker);
     26:             }
     27:         }
     28:     }
     29: }
    
    public Invoker<T> select(Invocation invocation) {
        // 基于方法参数,获得 KEY
        String key = toKey(invocation.getArguments());
        // 计算 MD5 值
        byte[] digest = md5(key);
        // 计算 KEY 值
        return selectForKey(hash(digest, 0));
    }
    
    private String toKey(Object[] args) {
        StringBuilder buf = new StringBuilder();
        for (int i : argumentIndex) {
            if (i >= 0 && i < args.length) {
                buf.append(args[i]);
            }
        }
        return buf.toString();
    }
    
    private Invoker<T> selectForKey(long hash) {
        // 得到大于当前 key 的那个子 Map ,然后从中取出第一个 key ,就是大于且离它最近的那个 key
        Map.Entry<Long, Invoker<T>> entry = virtualInvokers.tailMap(hash, true).firstEntry();
        // 不存在,则取 virtualInvokers 第一个
    	if (entry == null) {
    		entry = virtualInvokers.firstEntry();
    	}
    	// 存在,则返回
    	return entry.getValue();
    }
    
  • 相关阅读:
    Validation failed for one or more entities. See 'EntityValidationErrors' property for more details
    Visual Studio断点调试, 无法监视变量, 提示无法计算表达式
    ASP.NET MVC中MaxLength特性设置无效
    项目从.NET 4.5迁移到.NET 4.0遇到的问题
    发布网站时应该把debug设置false
    什么时候用var关键字
    扩展方法略好于帮助方法
    在基类构造器中调用虚方法需谨慎
    ASP.NET MVC中商品模块小样
    ASP.NET MVC中实现属性和属性值的组合,即笛卡尔乘积02, 在界面实现
  • 原文地址:https://www.cnblogs.com/luozhiyun/p/10963116.html
Copyright © 2011-2022 走看看