zoukankan      html  css  js  c++  java
  • Sentinel-FlowSlot

    FlowSlot流控规则

    这里面主要是根据qps或者线程数配置,来做限流。

        private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                              boolean prioritized) {
            // 根据配置获取关联的node
            Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
            if (selectedNode == null) {
                return true;
            }
    
            return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
        }

    限流规则的实现有这么几个

    都实现了TrafficShapingController接口

    DefaultController快速失败

    这个策略就是说当发现如果加上请求数使得当前的QPS超过阈值的时候,则快速失败。不过有个前提在node的优先级高且当前是检验QPS的时候,如果计算得到的等待时间不超过500ms,则当前的qps计算在未来的样本窗内,并sleep相应的等待时间,依然放行。

        public boolean canPass(Node node, int acquireCount, boolean prioritized) {
            int curCount = avgUsedTokens(node);
            if (curCount + acquireCount > count) { // 被流控
                // 如果流控策略是基于QPS 并且优先级较高
                if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
                    long currentTime;
                    long waitInMs;
                    currentTime = TimeUtil.currentTimeMillis();
                    /**
                     * 计算要等待的时间
                     */
                    waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
                    if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                        // 添加占用未来时间对应的样本窗的pass配额
                        node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                        // 统计 OccupiedPass
                        node.addOccupiedPass(acquireCount);
                        sleep(waitInMs);
    
                        // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                        throw new PriorityWaitException(waitInMs);
                    }
                }
                return false;
            }
            return true;
        }

    那么等待时间是如何计算的呢?

    @Override
        public long tryOccupyNext(long currentTime, int acquireCount, double threshold) {
            double maxCount = threshold * IntervalProperty.INTERVAL / 1000;
            // 0
            long currentBorrow = rollingCounterInSecond.waiting();
            if (currentBorrow >= maxCount) {
                return OccupyTimeoutProperty.getOccupyTimeout();
            }
    
            // 500
            int windowLength = IntervalProperty.INTERVAL / SampleCountProperty.SAMPLE_COUNT;
            /**
             * 这个算法  是获取当前时间对应的样本窗的结束时间 往前推一个单位时间窗 来获取到一个能统计到当前时间对应的样本窗的最早时间
             */
            long earliestTime = currentTime - currentTime % windowLength + windowLength - IntervalProperty.INTERVAL;
    
            int idx = 0;
            /*
             * Note: here {@code currentPass} may be less than it really is NOW, because time difference
             * since call rollingCounterInSecond.pass(). So in high concurrency, the following code may
             * lead more tokens be borrowed.
             */
            // 当前时间单位窗口的统计数
            long currentPass = rollingCounterInSecond.pass();
            /**
             * 这块逻辑很好理解:
             * 从最早时间 到 当前时间为止
             * 初始等待时间为  当前时间对应的样本窗,剩余的时间长度
             * 如果去掉最早样本窗的pass数之后,发现仍然大于阈值 ,则等待时间数 + 样本时间窗长度
             * 直到等待时间超过500ms   也就是说最大的等待时间不超过500ms
             */
            while (earliestTime < currentTime) {
                long waitInMs = idx * windowLength + windowLength - currentTime % windowLength;
                if (waitInMs >= OccupyTimeoutProperty.getOccupyTimeout()) {
                    break;
                }
                long windowPass = rollingCounterInSecond.getWindowPass(earliestTime);
                if (currentPass + currentBorrow + acquireCount - windowPass <= maxCount) {
                    return waitInMs;
                }
                earliestTime += windowLength;
                currentPass -= windowPass;
                idx++;
            }
    
            return OccupyTimeoutProperty.getOccupyTimeout();
        }

    RateLimiterController排队等待

    这里边利用的漏斗算法实现,假设阈值5qps,那么允许1秒内通过5个请求,这样如果需要处理2个请求,消耗的时间为  2/5*1000ms

    上次通过的时间+消耗的时间 = 如果pass,则需要当前时间达到的这个期望时间。

    接下来对期望时间和当前时间比较,如果当前时间大,说明早就可以通过了,并重置通过时间

    如果期望时间大,那么就算一下离期望时间还有多少时间,这个多少时间就是等待时间。

    如果这个等待时间超过了配置的500ms,则不通过,如果不超过500ms,则sleep等待时间,并最后pass

    @Override
        public boolean canPass(Node node, int acquireCount, boolean prioritized) {
            // Pass when acquire count is less or equal than 0.
            if (acquireCount <= 0) {
                return true;
            }
            // Reject when count is less or equal than 0.
            // Otherwise,the costTime will be max of long and waitTime will overflow in some cases.
            if (count <= 0) {
                return false;
            }
    
            long currentTime = TimeUtil.currentTimeMillis();
            // Calculate the interval between every two requests.
            // 计算acquireCount通过需要耗时
            long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
    
            // Expected pass time of this request.
            // 计算期望的pass时间点
            long expectedTime = costTime + latestPassedTime.get();
    
            if (expectedTime <= currentTime) { // 当前时间超过了通过点 pass
                // Contention may exist here, but it's okay.
                latestPassedTime.set(currentTime);
                return true;
            } else {  //  没有到达pass通过的时间点
                // Calculate the time to wait.
                long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
                if (waitTime > maxQueueingTimeMs) {  //等待时间超过了配置的maxQueueingTimeMs
                    return false;
                } else {
                    long oldTime = latestPassedTime.addAndGet(costTime);
                    try {
                        // 二次判断
                        waitTime = oldTime - TimeUtil.currentTimeMillis();
                        if (waitTime > maxQueueingTimeMs) {
                            latestPassedTime.addAndGet(-costTime);
                            return false;
                        }
                        // in race condition waitTime may <= 0
                        // 等待,后续在处理
                        if (waitTime > 0) {
                            Thread.sleep(waitTime);
                        }
                        return true;
                    } catch (InterruptedException e) {
                    }
                }
            }
            return false;
        }

     WarmUpController预热流控

    在初始化的时候,warmUp斜率模板已经订好了coldFactor     warningToken   maxToken  slope  这些属性也就定义好了这个warmup的整个过程。

    public class WarmUpController implements TrafficShapingController {
    
        // FlowRule中设定的阈值
        protected double count;
        /**
         * coldFactor:冷却因子,默认为3,表示倍数,即系统最"冷"时(令牌桶饱和时),令牌生成时间间隔是正常情况下的多少倍
         */
        private int coldFactor;
       // warningToken:预警值,表示进入预热或预热完毕
        protected int warningToken = 0;
        /**
         * maxToken:最大可用token值,计算公式:warningToken+(2*时间*阈值)/(1+因子),默认情况下为warningToken的2倍
         */
        private int maxToken;
        /**
         * slope:斜度,(coldFactor-1)/count/(maxToken-warningToken),用于计算token生成的时间间隔,
         * 进而计算当前token生成速度,最终比较token生成速度与消费速度,决定是否限流
         */
        protected double slope;
    
        /**
         * 令牌桶中令牌的数量
         */
        protected AtomicLong storedTokens = new AtomicLong(0);
    
        // 最后一次添加令牌的时间戳
        protected AtomicLong lastFilledTime = new AtomicLong(0);
    
        public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
            construct(count, warmUpPeriodInSec, coldFactor);
        }
    
        public WarmUpController(double count, int warmUpPeriodInSec) {
            construct(count, warmUpPeriodInSec, 3);
        }
    
        private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
    
            /**
             * 假设配置的阈值是QPS=5  一秒内的允许通过5个 count=5  预热时间为10s 即warmUpPeriodInSec =10
             * 则 coldInterval = coldFactor * stableInterval = 3 * 0.2s = 0.6s
             *  warmUpPeriodInSec = 梯形面积 = 0.5 * (coldInterval+stableInterval)*(maxToken - warningToken)  而 maxToken = 2 * warningToken
             * 计算得到   10 = 0.4 * 1 * warningToken   warningToken = 25    maxToken =50
             *
             */
            if (coldFactor <= 1) {
                throw new IllegalArgumentException("Cold factor should be larger than 1");
            }
    
            this.count = count;
    
            this.coldFactor = coldFactor;
    
            // thresholdPermits = 0.5 * warmupPeriod / stableInterval.
            // warningToken = 100;
            warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
            // / maxPermits = thresholdPermits + 2 * warmupPeriod /
            // (stableInterval + coldInterval)
            // maxToken = 200
            maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
    
            // slope
            // slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits
            // - thresholdPermits);
            slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
    
        }

    这个参考guava的RateLimiter的实现,不过相对guava的预热机制比较简单。sentinel将前一个样本窗口的qps用来初始化定义的warmUp斜率模板,计算得出当前的token数,根据这个token数,再来计算得出一个生成token数速率,

    当前的qps为消耗速度,如果消耗比生成快,那么就流控,如果消耗速率慢,那么直接比较qps是否达到阈值。

    @Override
        public boolean canPass(Node node, int acquireCount, boolean prioritized) {
            // 获取当前1s的QPS
            long passQps = (long) node.passQps();
            // 获取上一窗口通过的qps
            long previousQps = (long) node.previousPassQps();
            // 生成和滑落token
            syncToken(previousQps);
    
            // 开始计算它的斜率
            // 如果进入了警戒线,开始调整他的qps
            long restToken = storedTokens.get();
            if (restToken >= warningToken) {
                // 如果令牌桶中的token数量大于警戒值,说明还未预热结束,需要判断token的生成速度和消费速度
                long aboveToken = restToken - warningToken;
                // 消耗的速度要比warning快,但是要比慢
                // current interval = restToken*slope+1/count
                double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
                if (passQps + acquireCount <= warningQps) {
                    return true;
                }
            } else {
                if (passQps + acquireCount <= count) {
                    return true;
                }
            }
    
            return false;
        }

    WarmUpRateLimiterController预热等待排队流控

    这个流控规则跟warmupController的区别在于,通过计算得到的waringQPS可以算到通过相应的请求数需要的消耗时间了。

    有了这个消耗时间也就计算得到了期望时间,参考RateLimiterController  就可以实现排队等待了。

    参考 :https://blog.csdn.net/qq_33811736/article/details/119453868 

  • 相关阅读:
    ROS与Arduino学习(六)Logging日志
    ROS与Arduino学习(五)客户端与服务器
    ROS与Arduino学习(四)用ROS中的Cmake编译程序
    ROS与Arduino学习(三)订阅与发布
    ROS与Arduino学习(二)HelloWorld
    /dev/ttyUSB0 permission denied 解决办法:永久有可操作权限
    ROS与Arduino学习(一)准备工作
    E: Sub-process /usr/bin/dpkg returned an error code (1)解决方案
    ROS学习之URDF
    PIC16F684汇编学习笔记
  • 原文地址:https://www.cnblogs.com/gaojy/p/15302859.html
Copyright © 2011-2022 走看看