  • Ribbon source code (4): load-balancing algorithms

      The load-balancing algorithm module takes the server list from the load balancer and selects one server according to the configured algorithm.

    IRule

      The load-balancing rule interface:

    public interface IRule{
        public Server choose(Object key); // select a server
        public void setLoadBalancer(ILoadBalancer lb); // set the load balancer
        public ILoadBalancer getLoadBalancer(); // get the load balancer
    }

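      An IRule is attached to a BaseLoadBalancer through its setRule method or its constructor. Passing null falls back to RoundRobinRule, and the rule is linked back to the load balancer: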

        public void setRule(IRule rule) {
            if (rule != null) {
                this.rule = rule;
            } else {
                /* default rule */
                this.rule = new RoundRobinRule();
            }
            if (this.rule.getLoadBalancer() != this) {
                this.rule.setLoadBalancer(this);
            }
        }
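      The wiring in setRule can be tried out without Ribbon on the classpath. The sketch below is only an illustration: Rule, Balancer and FirstServerRule are hypothetical, simplified stand-ins (servers are plain strings), not Ribbon's real types.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-ins for IRule and BaseLoadBalancer; not the real Ribbon API.
class RuleWiringSketch {
    interface Rule {
        String choose(Object key);
        void setLoadBalancer(Balancer lb);
        Balancer getLoadBalancer();
    }

    // Trivial default rule for the sketch: always returns the first server, if any.
    static class FirstServerRule implements Rule {
        private Balancer lb;
        public String choose(Object key) {
            return lb.servers.isEmpty() ? null : lb.servers.get(0);
        }
        public void setLoadBalancer(Balancer lb) { this.lb = lb; }
        public Balancer getLoadBalancer() { return lb; }
    }

    static class Balancer {
        final List<String> servers;
        private Rule rule;

        Balancer(List<String> servers) { this.servers = servers; }

        // Same shape as setRule above: use a default for null, then back-link the rule.
        void setRule(Rule rule) {
            this.rule = (rule != null) ? rule : new FirstServerRule();
            if (this.rule.getLoadBalancer() != this) {
                this.rule.setLoadBalancer(this);
            }
        }

        String chooseServer(Object key) { return rule.choose(key); }
    }

    public static void main(String[] args) {
        Balancer lb = new Balancer(Arrays.asList("a:80", "b:80"));
        lb.setRule(null); // null falls back to the default rule
        System.out.println(lb.chooseServer(null));
    }
}
```

      The back-link matters because a rule has no server list of its own; it asks its load balancer for one on every call to choose.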

    RandomRule

      Generates a random number and uses it to select a server from the load balancer.

        public Server choose(ILoadBalancer lb, Object key) {
            if (lb == null) {
                return null;
            }
            Server server = null;
            while (server == null) {
                if (Thread.interrupted()) {
                    return null;
                }
                List<Server> upList = lb.getReachableServers();
                List<Server> allList = lb.getAllServers();
                int serverCount = allList.size();
                if (serverCount == 0) {
                    return null;
                }
                int index = rand.nextInt(serverCount);
                server = upList.get(index);
                if (server == null) {
                    Thread.yield();
                    continue;
                }
                if (server.isAlive()) {
                    return (server);
                }
                server = null;
                Thread.yield();
            }
            return server;
        }
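      One detail in the listing above is worth noticing: the index is drawn from allList.size() but applied to upList, so it can run past the end of upList whenever some servers are unreachable. The sketch below is not Ribbon's code; it shows, under that reading, a random pick whose index is bounded by the same list it indexes. The class name and the use of plain strings for servers are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

class RandomPickSketch {
    // Picks a uniformly random element; the bound comes from the list being indexed.
    static String pick(List<String> reachable, Random rand) {
        if (reachable == null || reachable.isEmpty()) {
            return null; // nothing to choose from
        }
        return reachable.get(rand.nextInt(reachable.size()));
    }

    public static void main(String[] args) {
        List<String> reachable = Arrays.asList("a:80", "b:80", "c:80");
        System.out.println(pick(reachable, new Random()));
    }
}
```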

    RoundRobinRule

      Selects servers from the load balancer in round-robin order, giving up after 10 attempts if no live server is found.

        public Server choose(ILoadBalancer lb, Object key) {
            if (lb == null) {
                log.warn("no load balancer");
                return null;
            }
            Server server = null;
            int count = 0;
            while (server == null && count++ < 10) {
                List<Server> reachableServers = lb.getReachableServers();
                List<Server> allServers = lb.getAllServers();
                int upCount = reachableServers.size();
                int serverCount = allServers.size();
    
                if ((upCount == 0) || (serverCount == 0)) {
                    log.warn("No up servers available from load balancer: " + lb);
                    return null;
                }
                int nextServerIndex = incrementAndGetModulo(serverCount);
                server = allServers.get(nextServerIndex);
                if (server == null) {
                    /* Transient. */
                    Thread.yield();
                    continue;
                }
                if (server.isAlive() && (server.isReadyToServe())) {
                    return (server);
                }
                server = null;
            }
            if (count >= 10) {
                log.warn("No available alive servers after 10 tries from load balancer: "
                        + lb);
            }
            return server;
        }
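      The listing calls incrementAndGetModulo without showing it. A thread-safe cyclic counter of that kind can be written with a compare-and-set loop, as in the sketch below. This is one plausible implementation given as an assumption, not a quotation of Ribbon's source; the class and field names are made up for the example.

```java
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinCounterSketch {
    private final AtomicInteger counter = new AtomicInteger(0);

    // Advances the shared counter with a CAS loop and returns the new position in [0, modulo).
    int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = counter.get();
            int next = (current + 1) % modulo;
            // Retry if another thread moved the counter between get() and compareAndSet().
            if (counter.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    public static void main(String[] args) {
        RoundRobinCounterSketch c = new RoundRobinCounterSketch();
        for (int i = 0; i < 5; i++) {
            System.out.print(c.incrementAndGetModulo(3) + " "); // prints "1 2 0 1 2 "
        }
        System.out.println();
    }
}
```

      Keeping the stored value already reduced modulo the server count means the counter never overflows, at the cost of a retry loop under contention.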

    BestAvailableRule

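      Selects the server with the fewest concurrent requests whose circuit breaker has not tripped. It relies on LoadBalancerStats for per-server state and falls back to the parent rule when no stats or no candidate is available.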

        public Server choose(Object key) {
            if (loadBalancerStats == null) {
                return super.choose(key);
            }
            List<Server> serverList = getLoadBalancer().getAllServers();
            int minimalConcurrentConnections = Integer.MAX_VALUE;
            long currentTime = System.currentTimeMillis();
            Server chosen = null;
            for (Server server: serverList) {
                ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);
                if (!serverStats.isCircuitBreakerTripped(currentTime)) {
                    int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);
                    if (concurrentConnections < minimalConcurrentConnections) {
                        minimalConcurrentConnections = concurrentConnections;
                        chosen = server;
                    }
                }
            }
            if (chosen == null) {
                return super.choose(key);
            } else {
                return chosen;
            }
        }
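      The selection loop above boils down to "minimum over the servers that are not tripped". The following sketch reproduces that logic on plain data so it can be run in isolation. Stat is a hypothetical snapshot class standing in for what Ribbon reads from ServerStats.

```java
import java.util.Arrays;
import java.util.List;

class LeastConnectionsSketch {
    // Hypothetical per-server snapshot; in Ribbon this information comes from ServerStats.
    static class Stat {
        final String server;
        final int activeRequests;
        final boolean circuitTripped;

        Stat(String server, int activeRequests, boolean circuitTripped) {
            this.server = server;
            this.activeRequests = activeRequests;
            this.circuitTripped = circuitTripped;
        }
    }

    // Returns the non-tripped server with the fewest active requests, or null if none qualifies.
    static String choose(List<Stat> stats) {
        String chosen = null;
        int minimal = Integer.MAX_VALUE;
        for (Stat s : stats) {
            if (s.circuitTripped) {
                continue; // tripped servers are never candidates
            }
            if (s.activeRequests < minimal) {
                minimal = s.activeRequests;
                chosen = s.server;
            }
        }
        return chosen; // a null result is where the rule would fall back to its parent
    }

    public static void main(String[] args) {
        List<Stat> stats = Arrays.asList(
                new Stat("a:80", 5, false),
                new Stat("b:80", 1, true),  // fewest requests, but tripped
                new Stat("c:80", 2, false));
        System.out.println(choose(stats));
    }
}
```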

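    WeightedResponseTimeRule

      Selects servers in proportion to their response times. A timer task periodically reads each server's average response time from the load balancer and converts it into a weight: the weight is the sum of all average response times minus the server's own, so slower servers get smaller weights. The weights are stored as a cumulative list.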

        class DynamicServerWeightTask extends TimerTask {
            public void run() {
                ServerWeight serverWeight = new ServerWeight();
                try {
                    serverWeight.maintainWeights();
                } catch (Exception e) {
                    logger.error("Error running DynamicServerWeightTask for {}", name, e);
                }
            }
        }
        class ServerWeight {
            public void maintainWeights() {
                ILoadBalancer lb = getLoadBalancer();
                if (lb == null) {
                    return;
                }
                if (!serverWeightAssignmentInProgress.compareAndSet(false,  true))  {
                    return; 
                }
                try {
                    logger.info("Weight adjusting job started");
                    AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
                    LoadBalancerStats stats = nlb.getLoadBalancerStats();
                    if (stats == null) {
                        return;
                    }
                    double totalResponseTime = 0;
                    // find maximal 95% response time
                    for (Server server : nlb.getAllServers()) {
                        // this will automatically load the stats if not in cache
                        ServerStats ss = stats.getSingleServerStat(server);
                        totalResponseTime += ss.getResponseTimeAvg();
                    }
                    // weight for each server is (sum of responseTime of all servers - responseTime)
                    // so that the longer the response time, the less the weight and the less likely to be chosen
                    Double weightSoFar = 0.0;
                    // create new list and hot swap the reference
                    List<Double> finalWeights = new ArrayList<Double>();
                    for (Server server : nlb.getAllServers()) {
                        ServerStats ss = stats.getSingleServerStat(server);
                        double weight = totalResponseTime - ss.getResponseTimeAvg();
                        weightSoFar += weight;
                        finalWeights.add(weightSoFar);   
                    }
                    setWeights(finalWeights);
                } catch (Exception e) {
                    logger.error("Error calculating server weights", e);
                } finally {
                    serverWeightAssignmentInProgress.set(false);
                }
    
            }
        }
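      A small worked example makes the weight formula in maintainWeights concrete. The sketch below applies the same arithmetic (weight = total of all averages minus the server's own average, accumulated into a running sum) to three made-up response times; the class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

class CumulativeWeightsSketch {
    // Mirrors the arithmetic of maintainWeights on a plain array of average response times.
    static List<Double> cumulativeWeights(double[] avgResponseTimes) {
        double total = 0;
        for (double t : avgResponseTimes) {
            total += t;
        }
        List<Double> result = new ArrayList<>();
        double weightSoFar = 0.0;
        for (double t : avgResponseTimes) {
            weightSoFar += total - t; // slower server => smaller individual weight
            result.add(weightSoFar);
        }
        return result;
    }

    public static void main(String[] args) {
        // Averages 10, 20, 30 ms: total 60, individual weights 50, 40, 30.
        System.out.println(cumulativeWeights(new double[] {10, 20, 30})); // prints [50.0, 90.0, 120.0]
    }
}
```

      The last entry of the list (120 here) is the total weight, which is why the selection code can read it as the upper bound for its random draw.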

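      A server is then chosen according to these weights. Until weights are available, the rule falls back to round robin: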

        public Server choose(ILoadBalancer lb, Object key) {
            if (lb == null) {
                return null;
            }
            Server server = null;
            while (server == null) {
                // get hold of the current reference in case it is changed from the other thread
                List<Double> currentWeights = accumulatedWeights;
                if (Thread.interrupted()) {
                    return null;
                }
                List<Server> allList = lb.getAllServers();
                int serverCount = allList.size();
                if (serverCount == 0) {
                    return null;
                }
                int serverIndex = 0;
                // last one in the list is the sum of all weights
                double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1); 
                // No server has been hit yet and total weight is not initialized
                // fallback to use round robin
                if (maxTotalWeight < 0.001d) {
                    server =  super.choose(getLoadBalancer(), key);
                    if(server == null) {
                        return server;
                    }
                } else {
                    // generate a random weight between 0 (inclusive) to maxTotalWeight (exclusive)
                    double randomWeight = random.nextDouble() * maxTotalWeight;
                    // pick the server index based on the randomIndex
                    int n = 0;
                    for (Double d : currentWeights) {
                        if (d >= randomWeight) {
                            serverIndex = n;
                            break;
                        } else {
                            n++;
                        }
                    }
                    server = allList.get(serverIndex);
                }
    
                if (server == null) {
                    /* Transient. */
                    Thread.yield();
                    continue;
                }
    
                if (server.isAlive()) {
                    return (server);
                }
    
                // Next.
                server = null;
            }
            return server;
        }
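      The inner loop of choose maps a random number onto the cumulative weight list: the first entry that is greater than or equal to the draw determines the server index. The sketch below isolates that step so it can be exercised with fixed draws. The names are illustrative, and the weights are the made-up values 50, 90, 120.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

class WeightedPickSketch {
    // Returns the index of the first cumulative weight >= randomWeight, defaulting to 0 as in the listing.
    static int pickIndex(List<Double> cumulativeWeights, double randomWeight) {
        int n = 0;
        for (double d : cumulativeWeights) {
            if (d >= randomWeight) {
                return n;
            }
            n++;
        }
        return 0;
    }

    public static void main(String[] args) {
        List<Double> weights = Arrays.asList(50.0, 90.0, 120.0);
        double maxTotalWeight = weights.get(weights.size() - 1);
        // Draw in [0, maxTotalWeight): index 0 covers [0, 50], index 1 (50, 90], index 2 (90, 120).
        double randomWeight = new Random().nextDouble() * maxTotalWeight;
        System.out.println(pickIndex(weights, randomWeight));
    }
}
```

      With these weights, index 0 is chosen for about 50/120 of the draws, index 1 for 40/120 and index 2 for 30/120, which matches the individual weights.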

    AvailabilityFilteringRule

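      Uses RoundRobinRule to pick a server and filters the result through AvailabilityPredicate, which rejects servers whose circuit breaker has tripped or whose concurrent request count has reached the configured limit.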

        public Server choose(Object key) {
            int count = 0;
            Server server = roundRobinRule.choose(key);
            while (count++ <= 10) {
                if (predicate.apply(new PredicateKey(server))) {
                    return server;
                }
                server = roundRobinRule.choose(key);
            }
            return super.choose(key);
        }

      AvailabilityPredicate:

        public boolean apply(@Nullable PredicateKey input) {
            LoadBalancerStats stats = getLBStats();
            if (stats == null) {
                return true;
            }
            return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
        }
        private boolean shouldSkipServer(ServerStats stats) {        
            if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped()) 
                    || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {
                return true;
            }
            return false;
        }

      Configuration properties used by AvailabilityFilteringRule:

    Property                                                            Description                                         Default
    niws.loadbalancer.availabilityFilteringRule.filterCircuitTripped    Filter out servers with a tripped circuit breaker   true
    niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit  Maximum number of active connections                Integer.MAX_VALUE
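      To see how the two settings interact, the sketch below restates the condition from shouldSkipServer with both values passed in as ordinary parameters. It does not read any real configuration; the method signature is an assumption made so the condition can be run standalone.

```java
class AvailabilityFilterSketch {
    // Same condition as shouldSkipServer in the listing, with the two settings passed in explicitly.
    static boolean shouldSkip(boolean filterCircuitTripped, int activeConnectionsLimit,
                              boolean circuitTripped, int activeRequests) {
        return (filterCircuitTripped && circuitTripped)
                || activeRequests >= activeConnectionsLimit;
    }

    public static void main(String[] args) {
        // Default settings: a tripped server is skipped, a busy one is not.
        System.out.println(shouldSkip(true, Integer.MAX_VALUE, true, 0));      // true
        System.out.println(shouldSkip(true, Integer.MAX_VALUE, false, 1000));  // false
        // Lowering the connection limit starts rejecting busy servers.
        System.out.println(shouldSkip(true, 100, false, 1000));                // true
        // Disabling circuit filtering lets tripped servers through.
        System.out.println(shouldSkip(false, Integer.MAX_VALUE, true, 0));     // false
    }
}
```

      With the defaults, the connection limit is effectively inactive, so the rule only filters on circuit-breaker state unless the limit is lowered.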

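    ZoneAvoidanceRule

      Although this rule extends PredicateBasedRule, it is used differently from the rules above. Its main role is to provide static zone-selection methods for ZoneAwareLoadBalancer, so it is not a general-purpose rule.

      The static method getAvailableZones iterates over all zones and, zone by zone, checks the instance count and the proportion of circuit-tripped servers to decide whether to include that zone.

      The static method createSnapshot returns the LoadBalancerStats data as a map keyed by zone.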
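      The zone check described above can be pictured as a filter over a per-zone snapshot map. The sketch below is deliberately simplified and covers only the two criteria named in the text (instance count and circuit-tripped ratio); the real method may take further factors into account. ZoneInfo, availableZones and maxTrippedRatio are hypothetical names chosen for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class ZoneFilterSketch {
    // Hypothetical per-zone summary; stands in for the snapshot built from LoadBalancerStats.
    static class ZoneInfo {
        final int instanceCount;
        final int trippedCount;

        ZoneInfo(int instanceCount, int trippedCount) {
            this.instanceCount = instanceCount;
            this.trippedCount = trippedCount;
        }
    }

    // Keeps zones that have instances and whose tripped ratio is below maxTrippedRatio (an assumed threshold).
    static Set<String> availableZones(Map<String, ZoneInfo> snapshot, double maxTrippedRatio) {
        Set<String> zones = new TreeSet<>();
        for (Map.Entry<String, ZoneInfo> e : snapshot.entrySet()) {
            ZoneInfo z = e.getValue();
            if (z.instanceCount == 0) {
                continue; // empty zone
            }
            double trippedRatio = (double) z.trippedCount / z.instanceCount;
            if (trippedRatio >= maxTrippedRatio) {
                continue; // too many tripped servers
            }
            zones.add(e.getKey());
        }
        return zones;
    }

    public static void main(String[] args) {
        Map<String, ZoneInfo> snapshot = new LinkedHashMap<>();
        snapshot.put("zone-a", new ZoneInfo(4, 0));
        snapshot.put("zone-b", new ZoneInfo(0, 0)); // no instances
        snapshot.put("zone-c", new ZoneInfo(4, 4)); // everything tripped
        snapshot.put("zone-d", new ZoneInfo(4, 1));
        System.out.println(availableZones(snapshot, 0.5)); // prints [zone-a, zone-d]
    }
}
```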

    Class diagram (figure not included)

    Predicate

      Predicates are used to filter servers. Ribbon provides three: AvailabilityPredicate, ZoneAvoidancePredicate, and ZoneAffinityPredicate. PredicateKey carries the input to the filter.

  • Original article: https://www.cnblogs.com/zhangwanhua/p/7994758.html