zoukankan      html  css  js  c++  java
  • Spring Cloud-Ribbon负载均衡策略类IRule(五)

    IRule

    IRule

    AbstractloadBalancerRule

    负载均衡策略抽象类 负责获得负载均衡器 保存在内部 通过负载均衡器维护的信息 作为分配的依据

    public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAware {
    
        private ILoadBalancer lb;
            
        @Override
        public void setLoadBalancer(ILoadBalancer lb){
            this.lb = lb;
        }
        
        @Override
        public ILoadBalancer getLoadBalancer(){
            return lb;
        }      
    }

    RandomRule

    随机选择一个服务的策略

    public class RandomRule extends AbstractLoadBalancerRule {
    
        @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE")
        public Server choose(ILoadBalancer lb, Object key) {
            if (lb == null) {
                return null;
            }
            Server server = null;
    
            while (server == null) {
                if (Thread.interrupted()) {
                    return null;
                }
                //通过负载均衡器获得可用服务
                List<Server> upList = lb.getReachableServers();
                //通过负载均衡器获得所有服务
                List<Server> allList = lb.getAllServers();
    
                int serverCount = allList.size();
                //没有服务返回空
                if (serverCount == 0) {
                    /*
                     * No servers. End regardless of pass, because subsequent passes
                     * only get more restrictive.
                     */
                    return null;
                }
                 //通过ThreadLocalRandom.current().nextInt(serverCount); 获得一个随机数
                int index = chooseRandomInt(serverCount);
                //获得一个随机的服务
                server = upList.get(index);
    
                if (server == null) {
                    /**
                     * 线程让步  将线程的cpu执行时间让步出来  可以理解为本来是排队有序的做一件事情
                     * 然后轮到那个人的时候他突然说 大家一起竞赛吧 谁先抢到就是谁的  也包括自己 线程优先级越高 获得的机率越大
                     */
                    Thread.yield();
                    continue;
                }
                //判断服务是否有效
                if (server.isAlive()) {
                    return (server);
                }
    
                // Shouldn't actually happen.. but must be transient or a bug.
                server = null;
                Thread.yield();
            }
    
            return server;
    
        }

    RoundRobinRule

    public class RoundRobinRule extends AbstractLoadBalancerRule {
    
        private AtomicInteger nextServerCyclicCounter;
        public Server choose(ILoadBalancer lb, Object key) {
            if (lb == null) {
                log.warn("no load balancer");
                return null;
            }
    
            Server server = null;
            int count = 0;
            while (server == null && count++ < 10) {
                //获得所有有效服务
                List<Server> reachableServers = lb.getReachableServers();
                //获得所有服务
                List<Server> allServers = lb.getAllServers();
                int upCount = reachableServers.size();
                int serverCount = allServers.size();
    
                if ((upCount == 0) || (serverCount == 0)) {
                    log.warn("No up servers available from load balancer: " + lb);
                    return null;
                }
    
                //获得线性轮训 当前轮到的服务下标
                int nextServerIndex = incrementAndGetModulo(serverCount);
                //去除服务
                server = allServers.get(nextServerIndex);
    
                if (server == null) {
                   //让出cpu执行时间
                    Thread.yield();
                    continue;
                }
    
                if (server.isAlive() && (server.isReadyToServe())) {
                    return (server);
                }
    
                // Next.
                server = null;
            }
    
            if (count >= 10) {
                log.warn("No available alive servers after 10 tries from load balancer: "
                        + lb);
            }
            return server;
        }
        private int incrementAndGetModulo(int modulo) {
            //死循环
            for (;;) {
                //cas AtomicInteger 类 保证++的原子性
                int current = nextServerCyclicCounter.get();
                //线性轮训算法 
                int next = (current + 1) % modulo;
                //compareAndSet的作用是防止多线程下还没执行到这一句 current被修改   如果被修改返回false 重新开始
                if (nextServerCyclicCounter.compareAndSet(current, next))
                    return next;
            }
        }

    RetryRule

    /**
     * 具有重试机制的Rule
     */
    public class RetryRule extends AbstractLoadBalancerRule {
        //内部默认维护一个线性轮训的Rule
        IRule subRule = new RoundRobinRule();
        long maxRetryMillis = 500;
    
        public RetryRule(IRule subRule) {
            this.subRule = (subRule != null) ? subRule : new RoundRobinRule();
        }
    
        public RetryRule(IRule subRule, long maxRetryMillis) {
            this.subRule = (subRule != null) ? subRule : new RoundRobinRule();
            this.maxRetryMillis = (maxRetryMillis > 0) ? maxRetryMillis : 500;
        }
        public IRule getRule() {
            return subRule;
        }
    
        /**
         * 内部找到就返回 找不到就重试
         * @param lb
         * @param key
         * @return
         */
        public Server choose(ILoadBalancer lb, Object key) {
            long requestTime = System.currentTimeMillis();
            //尝试结束时间  maxRetryMillis阈值 可配置
            long deadline = requestTime + maxRetryMillis;
    
            Server answer = null;
    
            answer = subRule.choose(key);
    
            if (((answer == null) || (!answer.isAlive()))
                    && (System.currentTimeMillis() < deadline)) {
    
                InterruptTask task = new InterruptTask(deadline
                        - System.currentTimeMillis());
                /**
                 *   new Tread().interrupt()给线程增加一个中断标志 但是并不会影响线程执行  但是如果这个时候对线程执行sleep和 wait 底层会将中断状态重置为false并抛出异常InterruptedException 所以我们可以根据捕获这个异常判断线程是否中断
                 *   Thread.interrupted()判断线程的中断状态 并重置线程的中断状态为false
                 *   new Tread().isInterrupted();仅仅判断线程是否中断不会重置
                 */
    
                while (!Thread.interrupted()) {
                    answer = subRule.choose(key);
    
                    if (((answer == null) || (!answer.isAlive()))
                            && (System.currentTimeMillis() < deadline)) {
                        /* pause and retry hoping it's transient */
                        Thread.yield();
                    } else {
                        break;
                    }
                }
    
                task.cancel();
            }
    
            if ((answer == null) || (!answer.isAlive())) {
                return null;
            } else {
                return answer;
            }
        }
    /**
     * RoundRobinRule的扩展
     * 内部根据实例运行情况来进行权重 并根据权重挑选实例
     */
    public class WeightedResponseTimeRule extends RoundRobinRule {
        
    
        void initialize(ILoadBalancer lb) {
            if (this.serverWeightTimer != null) {
                this.serverWeightTimer.cancel();
            }
    
            this.serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-" + this.name, true);
            //开启一个定时任务为实例进行统计 用于计算权重  默认30秒执行一次
            this.serverWeightTimer.schedule(new WeightedResponseTimeRule.DynamicServerWeightTask(), 0L, (long)this.serverWeightTaskTimerInterval);
            WeightedResponseTimeRule.ServerWeight sw = new WeightedResponseTimeRule.ServerWeight();
            sw.maintainWeights();
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                public void run() {
                    WeightedResponseTimeRule.logger.info("Stopping NFLoadBalancer-serverWeightTimer-" + WeightedResponseTimeRule.this.name);
                    WeightedResponseTimeRule.this.serverWeightTimer.cancel();
                }
            }));
        }
    
        @SuppressWarnings({"RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE"})
        public Server choose(ILoadBalancer lb, Object key) {
            if (lb == null) {
                return null;
            } else {
                Server server = null;
    
                while(server == null) {
                    //获得权重
                    List<Double> currentWeights = this.accumulatedWeights;
                    if (Thread.interrupted()) {
                        return null;
                    }
                    //获得所有服务
                    List<Server> allList = lb.getAllServers();
                    int serverCount = allList.size();
                    if (serverCount == 0) {
                        return null;
                    }
    
                    int serverIndex = 0;
                    //获得最后一个权重
                    double maxTotalWeight = currentWeights.size() == 0 ? 0.0D : (Double)currentWeights.get(currentWeights.size() - 1);
    
                    //如果权重大于0.01
                    if (maxTotalWeight >= 0.001D && serverCount == currentWeights.size()) {
                        //通过随机数计算一个权重
                        double randomWeight = this.random.nextDouble() * maxTotalWeight;
                        int n = 0;
    
                        for(Iterator var13 = currentWeights.iterator(); var13.hasNext(); ++n) {
                            Double d = (Double)var13.next();
                            //如果实例在那个权重区间 则定位此服务索引
                            if (d >= randomWeight) {
                                serverIndex = n;
                                break;
                            }
                        }
                        //返回对应实例
                        server = (Server)allList.get(serverIndex);
                    } else {
                        //如果实例的权重小于0.0.1 则采用父类的线性轮训算法
                        server = super.choose(this.getLoadBalancer(), key);
                        if (server == null) {
                            return server;
                        }
                    }
    
                    if (server == null) {
                        Thread.yield();
                    } else {
                        if (server.isAlive()) {
                            return server;
                        }
    
                        server = null;
                    }
                }
    
                return server;
            }
        }
    
        void setWeights(List<Double> weights) {
            this.accumulatedWeights = weights;
        }
    
        public void initWithNiwsConfig(IClientConfig clientConfig) {
            super.initWithNiwsConfig(clientConfig);
            this.serverWeightTaskTimerInterval = (Integer)clientConfig.get(WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY, 30000);
        }
    
        class ServerWeight {
            ServerWeight() {
            }
    
            public void maintainWeights() {
                ILoadBalancer lb = WeightedResponseTimeRule.this.getLoadBalancer();
                if (lb != null) {
                    if (WeightedResponseTimeRule.this.serverWeightAssignmentInProgress.compareAndSet(false, true)) {
                        try {
                            WeightedResponseTimeRule.logger.info("Weight adjusting job started");
                            AbstractLoadBalancer nlb = (AbstractLoadBalancer)lb;
                            //获得统计信息
                            LoadBalancerStats stats = nlb.getLoadBalancerStats();
                            if (stats != null) {
                                //保存所有实例的的平均响应时间总和
                                double totalResponseTime = 0.0D;
    
                                ServerStats ss;
                                for(Iterator var6 = nlb.getAllServers().iterator(); var6.hasNext(); totalResponseTime += ss.getResponseTimeAvg()) {
                                    Server server = (Server)var6.next();
                                    ss = stats.getSingleServerStat(server);
                                }
    
                                Double weightSoFar = 0.0D;
                                //用于保存权重  下标对应实例在负载均衡器中的位置
                                List<Double> finalWeights = new ArrayList();
                                Iterator var20 = nlb.getAllServers().iterator();
    
                                while(var20.hasNext()) {
                                    Server serverx = (Server)var20.next();
                                    //如果服务的状态不再快照汇总 则这里加载
                                    ServerStats ssx = stats.getSingleServerStat(serverx);
                                    //计算权重 平均响应时间总和-实例的响应平均响应时间+weightSoFar
                                    double weight = totalResponseTime - ssx.getResponseTimeAvg();
                                    //每次都会累加
                                    weightSoFar = weightSoFar + weight;
                                    //保存权重
                                    finalWeights.add(weightSoFar);
                                }
    
                                WeightedResponseTimeRule.this.setWeights(finalWeights);
                                return;
                            }
                        } catch (Exception var16) {
                            WeightedResponseTimeRule.logger.error("Error calculating server weights", var16);
                            return;
                        } finally {
                            WeightedResponseTimeRule.this.serverWeightAssignmentInProgress.set(false);
                        }
    
                    }
                }
            }
        }
        //负责权重计算的定时任务
        class DynamicServerWeightTask extends TimerTask {
            DynamicServerWeightTask() {
            }
    
            public void run() {
                WeightedResponseTimeRule.ServerWeight serverWeight = WeightedResponseTimeRule.this.new ServerWeight();
    
                try {
                    //计算权重
                    serverWeight.maintainWeights();
                } catch (Exception var3) {
                    WeightedResponseTimeRule.logger.error("Error running DynamicServerWeightTask for {}", WeightedResponseTimeRule.this.name, var3);
                }
    
            }
        }
    }

    WeightedResponseTimeRule

    ClientConfigEnabledRoundRobinRule

    不怎么使用 也是线性轮训  用于继承扩展

    public class ClientConfigEnabledRoundRobinRule extends AbstractLoadBalancerRule {
    
        RoundRobinRule roundRobinRule = new RoundRobinRule();
    
        @Override
        public void initWithNiwsConfig(IClientConfig clientConfig) {
            roundRobinRule = new RoundRobinRule();
        }
    
        @Override
        public void setLoadBalancer(ILoadBalancer lb) {
            super.setLoadBalancer(lb);
            roundRobinRule.setLoadBalancer(lb);
        }
        
        @Override
        public Server choose(Object key) {
            if (roundRobinRule != null) {
                return roundRobinRule.choose(key);
            } else {
                throw new IllegalArgumentException(
                        "This class has not been initialized with the RoundRobinRule class");
            }
        }
    
    }

    BestAvailableRule

    选出最空闲的服务实例

    /**
     *该策略是选择 最空闲的那一个
     */
    public class BestAvailableRule extends ClientConfigEnabledRoundRobinRule {
    
        private LoadBalancerStats loadBalancerStats;
    
        @Override
        public Server choose(Object key) {
            if (loadBalancerStats == null) {
                return super.choose(key);
            }
            //取得所有服务实例
            List<Server> serverList = getLoadBalancer().getAllServers();
            int minimalConcurrentConnections = Integer.MAX_VALUE;
            long currentTime = System.currentTimeMillis();
            Server chosen = null;
            //遍历所有服务实例
            for (Server server: serverList) {
                ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);
                if (!serverStats.isCircuitBreakerTripped(currentTime)) {
                    int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);
                    //取得最空闲的服务
                    if (concurrentConnections < minimalConcurrentConnections) {
                        minimalConcurrentConnections = concurrentConnections;
                        chosen = server;
                    }
                }
            }
            //如果没有找到 继续延用父类的线性轮训
            if (chosen == null) {
                return super.choose(key);
            } else {
                return chosen;
            }
        }
    
    
    }

    PredicateBasedRule

    先过滤清单再轮训

    public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {
        //内部使用PredicateBasedRule 实现服务的过滤
        public abstract AbstractServerPredicate getPredicate();
        
        @Override
        public Server choose(Object key) {
            ILoadBalancer lb = getLoadBalancer();
            /**
             * 基于Predicate实现服务的过滤
             * Predicate是Google Guava Collection的集合工具
             * 可以帮助我们让集合操作代码更为简短精练并大大增强代码的可读 性
             */
            Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
            if (server.isPresent()) {
                return server.get();
            } else {
                return null;
            }
        }
    }
    public abstract class AbstractServerPredicate implements Predicate<PredicateKey> {
        public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
            if (loadBalancerKey == null) {
                return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));
            } else {
                List<Server> results = Lists.newArrayList();
                for (Server server : servers) {
                    //过滤服务
                    if (this.apply(new PredicateKey(loadBalancerKey, server))) {
                        results.add(server);
                    }
                }
                return results;
            }
        }
    
        public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers) {
            List<Server> eligible = getEligibleServers(servers);
            if (eligible.size() == 0) {
                return Optional.absent();
            }
            return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
        }
    
    
    }

    AvailabilityFilteringRule

    public class AvailabilityFilteringRule extends PredicateBasedRule {
        private AbstractServerPredicate predicate;
        public AvailabilityFilteringRule() {
            super();
            //初始化 下面predicate.apply的比较策略
            predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, null))
                    .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
                    .build();
        }
        @Override
        public void initWithNiwsConfig(IClientConfig clientConfig) {
            //初始化下面 predicate.apply的比较策略
            predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, clientConfig))
                    .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
                    .build();
        }
        @Override
        public Server choose(Object key) {
            int count = 0;
            Server server = roundRobinRule.choose(key);
            while (count++ <= 10) {
                /**
                 * 优化父类 先过滤再遍历的额外开销
                 * 一边遍历 判断是否故障或者超过最大并发阀值 是否故障, 即断路器是否生效已断开。
                 * 实例的并发请求数大于阙值,默认值为 232 -1, 该配置可通过参数<clientName>. <nameSpace>.ActiveConnectionsLimit 来修改。
                 */
                if (predicate.apply(new PredicateKey(server))) {
                    return server;
                }
                server = roundRobinRule.choose(key);
            }
            return super.choose(key);
        }
    
        @Override
        public AbstractServerPredicate getPredicate() {
            return predicate;
        }
    }
  • 相关阅读:
    【51nod 1331】狭窄的通道
    【noip 2016】提高组
    【noip 2016】普及组
    【bzoj 4764】弹飞大爷
    线筛
    高斯消元
    网络流
    平衡二叉树 treap

    双向广搜
  • 原文地址:https://www.cnblogs.com/LQBlog/p/10084581.html
Copyright © 2011-2022 走看看