zoukankan      html  css  js  c++  java
  • Dubbo学习源码总结系列五--集群负载均衡

            Dubbo提供了哪些负载均衡机制?如何实现的?

            

            LoadBalance接口:可以看出,通过SPI机制默认为RandomLoadBalance,生成的适配器类执行select()方法。

     1 /**
     2  * LoadBalance. (SPI, Singleton, ThreadSafe)
     3  * <p>
     4  * <a href="http://en.wikipedia.org/wiki/Load_balancing_(computing)">Load-Balancing</a>
     5  *
     6  * @see com.alibaba.dubbo.rpc.cluster.Cluster#join(Directory)
     7  */
     8 @SPI(RandomLoadBalance.NAME)
     9 public interface LoadBalance {
    10 
    11     /**
    12      * select one invoker in list.
    13      *
    14      * @param invokers   invokers.
    15      * @param url        refer url
    16      * @param invocation invocation.
    17      * @return selected invoker.
    18      */
    19     @Adaptive("loadbalance")
    20     <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
    21 
    22 }

            实现类的基本类图如下所示:

     

            0、AbstractLoadBalance是LoadBalance接口的默认实现抽象类,为子类提供了实现框架。我们来看看此类具体的实现:

            (1)主要方法当然是select(),通过选择实现了负载均衡策略。实现主要是调用doSelect()方法,它是个抽象方法,留给具体子类实现不同的负载均衡策略;

            (2)getWeight()方法计算出invoker权重,计算公式为:weight = (int) (uptime(提供者正常运行时间) / warmup(升温时间) /weight(设定权重)))

     1     protected int getWeight(Invoker<?> invoker, Invocation invocation) {
     2         int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
     3         if (weight > 0) {
     4             long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
     5             if (timestamp > 0L) {
     6                 int uptime = (int) (System.currentTimeMillis() - timestamp);
     7                 int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
     8                 if (uptime > 0 && uptime < warmup) {
     9                     weight = calculateWarmupWeight(uptime, warmup, weight);
    10                 }
    11             }
    12         }
    13         return weight;
    14     }

      

            1、Random LoadBalance:随机,按权重设置随机概率。

            在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。   

            RandomLoadBalance子类,主要通过doSelect()实现按权重的随机算法,实现逻辑为:

            (1)计算总权重;

            (2)如果没有设置权重或者所有权重都一样,直接从invokers列表随机返回一个;

            (3)否则:使用总权重随机计算一个offset(偏移量),循环invokers列表,offset=offset -(当前invoker权重),即剩余权重,然后返回第一个大于offset权重的invoker;此算法兼顾了权重和轮询(千重相同则轮询,权重不同则从大到小的节点顺序轮询选中)两个因素。

            具体实现如下: 

     1     protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
     2         int length = invokers.size(); // Number of invokers
     3         int totalWeight = 0; // The sum of weights
     4         boolean sameWeight = true; // Every invoker has the same weight?
     5         for (int i = 0; i < length; i++) {
     6             int weight = getWeight(invokers.get(i), invocation);
     7             totalWeight += weight; // Sum
     8             if (sameWeight && i > 0
     9                     && weight != getWeight(invokers.get(i - 1), invocation)) {
    10                 sameWeight = false;
    11             }
    12         }
    13         if (totalWeight > 0 && !sameWeight) {
    14             // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
    15             int offset = random.nextInt(totalWeight);
    16             // Return a invoker based on the random value.
    17             for (int i = 0; i < length; i++) {
    18                 offset -= getWeight(invokers.get(i), invocation);
    19                 if (offset < 0) {
    20                     return invokers.get(i);
    21                 }
    22             }
    23         }
    24         // If all invokers have the same weight value or totalWeight=0, return evenly.
    25         return invokers.get(random.nextInt(length));
    26     }

             2、RoundRobin LoadBalance:轮循,按公约后的权重设置轮循比率。

            存在慢的提供者累积请求的问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。

            RoundRobinLoadBalance子类,用doSelect()实现了按公约后的权重设置轮训比率, 

     1     protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    //一个service接口的一个方法为一个key 2 String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); 3 int length = invokers.size(); // Number of invokers 4 int maxWeight = 0; // The maximum weight 5 int minWeight = Integer.MAX_VALUE; // The minimum weight 6 final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>(); 7 int weightSum = 0;
    //轮训计算总权重值、最大权重值、最小权重值 8 for (int i = 0; i < length; i++) { 9 int weight = getWeight(invokers.get(i), invocation); 10 maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight 11 minWeight = Math.min(minWeight, weight); // Choose the minimum weight 12 if (weight > 0) { 13 invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight)); 14 weightSum += weight; 15 } 16 }
    //给每个请求方法设置一个原子Integer 17 AtomicPositiveInteger sequence = sequences.get(key); 18 if (sequence == null) { 19 sequences.putIfAbsent(key, new AtomicPositiveInteger()); 20 sequence = sequences.get(key); 21 } 22 int currentSequence = sequence.getAndIncrement(); 23 if (maxWeight > 0 && minWeight < maxWeight) { 24 int mod = currentSequence % weightSum; 25 for (int i = 0; i < maxWeight; i++) { 26 for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) { 27 final Invoker<T> k = each.getKey(); 28 final IntegerWrapper v = each.getValue(); 29 if (mod == 0 && v.getValue() > 0) { 30 return k; 31 } 32 if (v.getValue() > 0) { 33 v.decrement(); 34 mod--; 35 } 36 } 37 } 38 } 39 // Round robin 40 return invokers.get(currentSequence % length); 41 }

             算法原理及实现讨论另外写了一篇博客,见:《负载均衡算法WeightedRoundRobin(加权轮询)简介及算法实现》

            3、LeastActive LoadBalance:最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差,即响应一次请求所花费的时长。

            使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。

            算法实现逻辑为:

            假如节点活跃数依次为{Node0=3ms,Node1=6ms,Node2=2ms,Node3=2ms,Node4=4ms},

            (1)没有设置权重,或者权重都一样的情况下,遍历所有节点,找出节点中最小活跃数的节点,结果为{Node2=2ms,Node3=2ms};

            (2)按照算法约束:相同活跃数的随机取,则从{Node2,Node3}中随机取出一个节点返回;

            (3)设置了权重,且权重不一样的情况下,从最小活跃数子集{Node2,Node3}中取出权重大的一个节点返回。具体实现与随机访问算法Random LoadBalance类似,构造一个考虑了权重和轮询(多个相同权重的节点轮询选择)两个因素的算法,使用总权重随机计算一个offset(偏移量),循环invokers列表,offset = offset -(当前invoker权重),即剩余权重,然后返回第一个大于offset权重的invoker。

            具体算法实现如下:        

     1     protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
     2         int length = invokers.size(); // Number of invokers
     3         int leastActive = -1; // 记录最少的活跃数
     4         int leastCount = 0; // 拥有最少活跃数,且活跃数相同的节点个数
     5         int[] leastIndexs = new int[length]; // 最少活跃数节点索引数组(数组内节点的活跃数相同)
     6         int totalWeight = 0; // The sum of weights
     7         int firstWeight = 0; // Initial value, used for comparision
     8         boolean sameWeight = true; // Every invoker has the same weight value?
     9         for (int i = 0; i < length; i++) {
    10             Invoker<T> invoker = invokers.get(i);
    11             int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); //从上下文记录中取得方法的活跃数 Active number
    12             int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // Weight
    13             if (leastActive == -1 || active < leastActive) { // 当找到一个更小的活跃数节点时,重置变量Restart, when find a invoker having smaller least active value.
    14                 leastActive = active; // Record the current least active value
    15                 leastCount = 1; // Reset leastCount, count again based on current leastCount
    16                 leastIndexs[0] = i; // Reset
    17                 totalWeight = weight; // Reset
    18                 firstWeight = weight; // Record the weight the first invoker
    19                 sameWeight = true; // Reset, every invoker has the same weight value?
    20             } else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating.
    21                 leastIndexs[leastCount++] = i; // Record index number of this invoker
    22                 totalWeight += weight; // Add this invoker's weight to totalWeight.
    23                 // If every invoker has the same weight?
    24                 if (sameWeight && i > 0
    25                         && weight != firstWeight) {
    26                     sameWeight = false;
    27                 }
    28             }
    29         }
    30         // assert(leastCount > 0)
    31         if (leastCount == 1) {
    32             // If we got exactly one invoker having the least active value, return this invoker directly.
    33             return invokers.get(leastIndexs[0]);
    34         }
    35         if (!sameWeight && totalWeight > 0) {
    36             // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
    //考虑了权重和轮询(多个相同权重的节点轮询选择)两个因素的算法,使用总权重随机计算一个offset(偏移量),循环invokers列表,offset = offset -(当前invoker权重),即剩余权重,然后返回第一个大于offset权重的invoker 37 int offsetWeight = random.nextInt(totalWeight); 38 // Return a invoker based on the random value. 39 for (int i = 0; i < leastCount; i++) { 40 int leastIndex = leastIndexs[i]; 41 offsetWeight -= getWeight(invokers.get(leastIndex), invocation); 42 if (offsetWeight <= 0) 43 return invokers.get(leastIndex); 44 } 45 } 46 // If all invokers have the same weight value or totalWeight=0, return evenly. 47 return invokers.get(leastIndexs[random.nextInt(leastCount)]); 48 }

      

            4、ConsistentHash LoadBalance:一致性哈希。适用场景为:相同参数的请求始终发送到同一个提供者。

    • 当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。
    • 具体算法原理和实现讨论另外写了一篇博客,见:《一致性哈希算法原理分析及实现》
    • 缺省只对第一个参数 Hash,如果要修改,请配置 <dubbo:parameter key="hash.arguments" value="0,1" />
    • 缺省用 160 份虚拟节点,如果要修改,请配置 <dubbo:parameter key="hash.nodes" value="320" />
      具体dubbo实现如下:
     1     private static final class ConsistentHashSelector<T> {
     2 
     3         private final TreeMap<Long, Invoker<T>> virtualInvokers;
     4 
     5         private final int replicaNumber;
     6 
     7         private final int identityHashCode;
     8 
     9         private final int[] argumentIndex;
    10 
    11         ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
    12             this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
    13             this.identityHashCode = identityHashCode;
    14             URL url = invokers.get(0).getUrl();
    //没有设置,默认虚拟节点(分片)数160个 15 this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160); 16 String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0")); 17 argumentIndex = new int[index.length]; 18 for (int i = 0; i < index.length; i++) { 19 argumentIndex[i] = Integer.parseInt(index[i]); 20 } 21 for (Invoker<T> invoker : invokers) { 22 String address = invoker.getUrl().getAddress(); 23 for (int i = 0; i < replicaNumber / 4; i++) { 24 byte[] digest = md5(address + i); 25 for (int h = 0; h < 4; h++) { 26 long m = hash(digest, h); 27 virtualInvokers.put(m, invoker); 28 } 29 } 30 } 31 } 32 33 public Invoker<T> select(Invocation invocation) { 34 String key = toKey(invocation.getArguments()); 35 byte[] digest = md5(key); 36 return selectForKey(hash(digest, 0)); 37 } 38 39 private String toKey(Object[] args) { 40 StringBuilder buf = new StringBuilder(); 41 for (int i : argumentIndex) { 42 if (i >= 0 && i < args.length) { 43 buf.append(args[i]); 44 } 45 } 46 return buf.toString(); 47 } 48 49 private Invoker<T> selectForKey(long hash) { 50 Invoker<T> invoker; 51 Long key = hash; 52 if (!virtualInvokers.containsKey(key)) { 53 SortedMap<Long, Invoker<T>> tailMap = virtualInvokers.tailMap(key); 54 if (tailMap.isEmpty()) { 55 key = virtualInvokers.firstKey(); 56 } else { 57 key = tailMap.firstKey(); 58 } 59 } 60 invoker = virtualInvokers.get(key); 61 return invoker; 62 } 63 64 private long hash(byte[] digest, int number) { 65 return (((long) (digest[3 + number * 4] & 0xFF) << 24) 66 | ((long) (digest[2 + number * 4] & 0xFF) << 16) 67 | ((long) (digest[1 + number * 4] & 0xFF) << 8) 68 | (digest[number * 4] & 0xFF)) 69 & 0xFFFFFFFFL; 70 } 71 72 private byte[] md5(String value) { 73 MessageDigest md5; 74 try { 75 md5 = MessageDigest.getInstance("MD5"); 76 } catch (NoSuchAlgorithmException e) { 77 throw new IllegalStateException(e.getMessage(), e); 78 } 79 md5.reset(); 80 byte[] bytes; 81 try { 82 bytes = value.getBytes("UTF-8"); 83 } catch (UnsupportedEncodingException e) { 84 throw new IllegalStateException(e.getMessage(), e); 85 } 86 md5.update(bytes); 87 return md5.digest(); 88 } 89 90 }

            配置:

            服务端服务级别:<dubbo:service interface="..." loadbalance="roundrobin" />

            服务端方法级别:<dubbo:service interface="..."><dubbo:method name="..." loadbalance="roundrobin"/></dubbo:service>

            客户端服务级别:<dubbo:reference interface="..." loadbalance="roundrobin" />

            客户端方法级别:<dubbo:reference interface="..."><dubbo:method name="..." loadbalance="roundrobin"/></dubbo:reference>

  • 相关阅读:
    Tensorflow中实现BN为什么需要加入这个额外依赖?见CS231N作业源码
    为何神经网络权重初始化要随机初始化,不能以0为初始化
    Batch Normalization&Dropout浅析
    Git版本回退和撤销修改的区别
    linux下安装git提示”无法打开锁文件 /var/lib/dpkg/lock
    数据特征选择法
    深度学习笔记整理
    全面掌握IO(输入/输出流)
    startActivity时报错Calling startActivity() from outside of an Activity context requires the FLAG_ACTIVI
    LitePal——Android数据库框架完整使用手册
  • 原文地址:https://www.cnblogs.com/markcd/p/8504693.html
Copyright © 2011-2022 走看看