zoukankan      html  css  js  c++  java
  • Dubbo学习源码总结系列五--集群负载均衡

            Dubbo提供了哪些负载均衡机制?如何实现的?

            

            LoadBalance接口:可以看出,通过SPI机制默认为RandomLoadBalance,生成的适配器类执行select()方法。

     1 /**
     2  * LoadBalance. (SPI, Singleton, ThreadSafe)
     3  * <p>
     4  * <a href="http://en.wikipedia.org/wiki/Load_balancing_(computing)">Load-Balancing</a>
     5  *
     6  * @see com.alibaba.dubbo.rpc.cluster.Cluster#join(Directory)
     7  */
     8 @SPI(RandomLoadBalance.NAME)
     9 public interface LoadBalance {
    10 
    11     /**
    12      * select one invoker in list.
    13      *
    14      * @param invokers   invokers.
    15      * @param url        refer url
    16      * @param invocation invocation.
    17      * @return selected invoker.
    18      */
    19     @Adaptive("loadbalance")
    20     <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
    21 
    22 }

            实现类的基本类图如下所示:

     

            0、AbstractLoadBalance是LoadBalance接口的默认实现抽象类,为子类提供了实现框架。我们来看看此类具体的实现:

            (1)主要方法当然是select(),通过选择实现了负载均衡策略。实现主要是调用doSelect()方法,它是个抽象方法,留给具体子类实现不同的负载均衡策略;

            (2)getWeight()方法计算出invoker权重,计算公式为:weight = (int) (uptime(提供者正常运行时间) / warmup(升温时间) /weight(设定权重)))

     1     protected int getWeight(Invoker<?> invoker, Invocation invocation) {
     2         int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
     3         if (weight > 0) {
     4             long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
     5             if (timestamp > 0L) {
     6                 int uptime = (int) (System.currentTimeMillis() - timestamp);
     7                 int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
     8                 if (uptime > 0 && uptime < warmup) {
     9                     weight = calculateWarmupWeight(uptime, warmup, weight);
    10                 }
    11             }
    12         }
    13         return weight;
    14     }

      

            1、Random LoadBalance:随机,按权重设置随机概率。

            在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。   

            RandomLoadBalance子类,主要通过doSelect()实现按权重的随机算法,实现逻辑为:

            (1)计算总权重;

            (2)如果没有设置权重或者所有权重都一样,直接从invokers列表随机返回一个;

            (3)否则:使用总权重随机计算一个offset(偏移量),循环invokers列表,offset=offset -(当前invoker权重),即剩余权重,然后返回第一个大于offset权重的invoker;此算法兼顾了权重和轮询(千重相同则轮询,权重不同则从大到小的节点顺序轮询选中)两个因素。

            具体实现如下: 

     1     protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
     2         int length = invokers.size(); // Number of invokers
     3         int totalWeight = 0; // The sum of weights
     4         boolean sameWeight = true; // Every invoker has the same weight?
     5         for (int i = 0; i < length; i++) {
     6             int weight = getWeight(invokers.get(i), invocation);
     7             totalWeight += weight; // Sum
     8             if (sameWeight && i > 0
     9                     && weight != getWeight(invokers.get(i - 1), invocation)) {
    10                 sameWeight = false;
    11             }
    12         }
    13         if (totalWeight > 0 && !sameWeight) {
    14             // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
    15             int offset = random.nextInt(totalWeight);
    16             // Return a invoker based on the random value.
    17             for (int i = 0; i < length; i++) {
    18                 offset -= getWeight(invokers.get(i), invocation);
    19                 if (offset < 0) {
    20                     return invokers.get(i);
    21                 }
    22             }
    23         }
    24         // If all invokers have the same weight value or totalWeight=0, return evenly.
    25         return invokers.get(random.nextInt(length));
    26     }

             2、RoundRobin LoadBalance:轮循,按公约后的权重设置轮循比率。

            存在慢的提供者累积请求的问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。

            RoundRobinLoadBalance子类,用doSelect()实现了按公约后的权重设置轮训比率, 

     1     protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    //一个service接口的一个方法为一个key 2 String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); 3 int length = invokers.size(); // Number of invokers 4 int maxWeight = 0; // The maximum weight 5 int minWeight = Integer.MAX_VALUE; // The minimum weight 6 final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>(); 7 int weightSum = 0;
    //轮训计算总权重值、最大权重值、最小权重值 8 for (int i = 0; i < length; i++) { 9 int weight = getWeight(invokers.get(i), invocation); 10 maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight 11 minWeight = Math.min(minWeight, weight); // Choose the minimum weight 12 if (weight > 0) { 13 invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight)); 14 weightSum += weight; 15 } 16 }
    //给每个请求方法设置一个原子Integer 17 AtomicPositiveInteger sequence = sequences.get(key); 18 if (sequence == null) { 19 sequences.putIfAbsent(key, new AtomicPositiveInteger()); 20 sequence = sequences.get(key); 21 } 22 int currentSequence = sequence.getAndIncrement(); 23 if (maxWeight > 0 && minWeight < maxWeight) { 24 int mod = currentSequence % weightSum; 25 for (int i = 0; i < maxWeight; i++) { 26 for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) { 27 final Invoker<T> k = each.getKey(); 28 final IntegerWrapper v = each.getValue(); 29 if (mod == 0 && v.getValue() > 0) { 30 return k; 31 } 32 if (v.getValue() > 0) { 33 v.decrement(); 34 mod--; 35 } 36 } 37 } 38 } 39 // Round robin 40 return invokers.get(currentSequence % length); 41 }

             算法原理及实现讨论另外写了一篇博客,见:《负载均衡算法WeightedRoundRobin(加权轮询)简介及算法实现》

            3、LeastActive LoadBalance:最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差,即响应一次请求所花费的时长。

            使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。

            算法实现逻辑为:

            假如节点活跃数依次为{Node0=3ms,Node1=6ms,Node2=2ms,Node3=2ms,Node4=4ms},

            (1)没有设置权重,或者权重都一样的情况下,遍历所有节点,找出节点中最小活跃数的节点,结果为{Node2=2ms,Node3=2ms};

            (2)按照算法约束:相同活跃数的随机取,则从{Node2,Node3}中随机取出一个节点返回;

            (3)设置了权重,且权重不一样的情况下,从最小活跃数子集{Node2,Node3}中取出权重大的一个节点返回。具体实现与随机访问算法Random LoadBalance类似,构造一个考虑了权重和轮询(多个相同权重的节点轮询选择)两个因素的算法,使用总权重随机计算一个offset(偏移量),循环invokers列表,offset = offset -(当前invoker权重),即剩余权重,然后返回第一个大于offset权重的invoker。

            具体算法实现如下:        

     1     protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
     2         int length = invokers.size(); // Number of invokers
     3         int leastActive = -1; // 记录最少的活跃数
     4         int leastCount = 0; // 拥有最少活跃数,且活跃数相同的节点个数
     5         int[] leastIndexs = new int[length]; // 最少活跃数节点索引数组(数组内节点的活跃数相同)
     6         int totalWeight = 0; // The sum of weights
     7         int firstWeight = 0; // Initial value, used for comparision
     8         boolean sameWeight = true; // Every invoker has the same weight value?
     9         for (int i = 0; i < length; i++) {
    10             Invoker<T> invoker = invokers.get(i);
    11             int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); //从上下文记录中取得方法的活跃数 Active number
    12             int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // Weight
    13             if (leastActive == -1 || active < leastActive) { // 当找到一个更小的活跃数节点时,重置变量Restart, when find a invoker having smaller least active value.
    14                 leastActive = active; // Record the current least active value
    15                 leastCount = 1; // Reset leastCount, count again based on current leastCount
    16                 leastIndexs[0] = i; // Reset
    17                 totalWeight = weight; // Reset
    18                 firstWeight = weight; // Record the weight the first invoker
    19                 sameWeight = true; // Reset, every invoker has the same weight value?
    20             } else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating.
    21                 leastIndexs[leastCount++] = i; // Record index number of this invoker
    22                 totalWeight += weight; // Add this invoker's weight to totalWeight.
    23                 // If every invoker has the same weight?
    24                 if (sameWeight && i > 0
    25                         && weight != firstWeight) {
    26                     sameWeight = false;
    27                 }
    28             }
    29         }
    30         // assert(leastCount > 0)
    31         if (leastCount == 1) {
    32             // If we got exactly one invoker having the least active value, return this invoker directly.
    33             return invokers.get(leastIndexs[0]);
    34         }
    35         if (!sameWeight && totalWeight > 0) {
    36             // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
    //考虑了权重和轮询(多个相同权重的节点轮询选择)两个因素的算法,使用总权重随机计算一个offset(偏移量),循环invokers列表,offset = offset -(当前invoker权重),即剩余权重,然后返回第一个大于offset权重的invoker 37 int offsetWeight = random.nextInt(totalWeight); 38 // Return a invoker based on the random value. 39 for (int i = 0; i < leastCount; i++) { 40 int leastIndex = leastIndexs[i]; 41 offsetWeight -= getWeight(invokers.get(leastIndex), invocation); 42 if (offsetWeight <= 0) 43 return invokers.get(leastIndex); 44 } 45 } 46 // If all invokers have the same weight value or totalWeight=0, return evenly. 47 return invokers.get(leastIndexs[random.nextInt(leastCount)]); 48 }

      

            4、ConsistentHash LoadBalance:一致性哈希。适用场景为:相同参数的请求始终发送到同一个提供者。

    • 当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。
    • 具体算法原理和实现讨论另外写了一篇博客,见:《一致性哈希算法原理分析及实现》
    • 缺省只对第一个参数 Hash,如果要修改,请配置 <dubbo:parameter key="hash.arguments" value="0,1" />
    • 缺省用 160 份虚拟节点,如果要修改,请配置 <dubbo:parameter key="hash.nodes" value="320" />
      具体dubbo实现如下:
     1     private static final class ConsistentHashSelector<T> {
     2 
     3         private final TreeMap<Long, Invoker<T>> virtualInvokers;
     4 
     5         private final int replicaNumber;
     6 
     7         private final int identityHashCode;
     8 
     9         private final int[] argumentIndex;
    10 
    11         ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
    12             this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
    13             this.identityHashCode = identityHashCode;
    14             URL url = invokers.get(0).getUrl();
    //没有设置,默认虚拟节点(分片)数160个 15 this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160); 16 String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0")); 17 argumentIndex = new int[index.length]; 18 for (int i = 0; i < index.length; i++) { 19 argumentIndex[i] = Integer.parseInt(index[i]); 20 } 21 for (Invoker<T> invoker : invokers) { 22 String address = invoker.getUrl().getAddress(); 23 for (int i = 0; i < replicaNumber / 4; i++) { 24 byte[] digest = md5(address + i); 25 for (int h = 0; h < 4; h++) { 26 long m = hash(digest, h); 27 virtualInvokers.put(m, invoker); 28 } 29 } 30 } 31 } 32 33 public Invoker<T> select(Invocation invocation) { 34 String key = toKey(invocation.getArguments()); 35 byte[] digest = md5(key); 36 return selectForKey(hash(digest, 0)); 37 } 38 39 private String toKey(Object[] args) { 40 StringBuilder buf = new StringBuilder(); 41 for (int i : argumentIndex) { 42 if (i >= 0 && i < args.length) { 43 buf.append(args[i]); 44 } 45 } 46 return buf.toString(); 47 } 48 49 private Invoker<T> selectForKey(long hash) { 50 Invoker<T> invoker; 51 Long key = hash; 52 if (!virtualInvokers.containsKey(key)) { 53 SortedMap<Long, Invoker<T>> tailMap = virtualInvokers.tailMap(key); 54 if (tailMap.isEmpty()) { 55 key = virtualInvokers.firstKey(); 56 } else { 57 key = tailMap.firstKey(); 58 } 59 } 60 invoker = virtualInvokers.get(key); 61 return invoker; 62 } 63 64 private long hash(byte[] digest, int number) { 65 return (((long) (digest[3 + number * 4] & 0xFF) << 24) 66 | ((long) (digest[2 + number * 4] & 0xFF) << 16) 67 | ((long) (digest[1 + number * 4] & 0xFF) << 8) 68 | (digest[number * 4] & 0xFF)) 69 & 0xFFFFFFFFL; 70 } 71 72 private byte[] md5(String value) { 73 MessageDigest md5; 74 try { 75 md5 = MessageDigest.getInstance("MD5"); 76 } catch (NoSuchAlgorithmException e) { 77 throw new IllegalStateException(e.getMessage(), e); 78 } 79 md5.reset(); 80 byte[] bytes; 81 try { 82 bytes = value.getBytes("UTF-8"); 83 } catch (UnsupportedEncodingException e) { 84 throw new IllegalStateException(e.getMessage(), e); 85 } 86 md5.update(bytes); 87 return md5.digest(); 88 } 89 90 }

            配置:

            服务端服务级别:<dubbo:service interface="..." loadbalance="roundrobin" />

            服务端方法级别:<dubbo:service interface="..."><dubbo:method name="..." loadbalance="roundrobin"/></dubbo:service>

            客户端服务级别:<dubbo:reference interface="..." loadbalance="roundrobin" />

            客户端方法级别:<dubbo:reference interface="..."><dubbo:method name="..." loadbalance="roundrobin"/></dubbo:reference>

  • 相关阅读:
    PAT 甲级 1027 Colors in Mars
    PAT 甲级 1026 Table Tennis(模拟)
    PAT 甲级 1025 PAT Ranking
    PAT 甲级 1024 Palindromic Number
    PAT 甲级 1023 Have Fun with Numbers
    PAT 甲级 1021 Deepest Root (并查集,树的遍历)
    Java实现 蓝桥杯VIP 算法训练 无权最长链
    Java实现 蓝桥杯VIP 算法训练 无权最长链
    Java实现 蓝桥杯 算法提高 抽卡游戏
    Java实现 蓝桥杯 算法提高 抽卡游戏
  • 原文地址:https://www.cnblogs.com/markcd/p/8504693.html
Copyright © 2011-2022 走看看