zoukankan      html  css  js  c++  java
  • 【springcloud】2.eureka源码分析之令牌桶-限流算法

    国际惯例原理图

    代码实现

    package Thread;
    
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicLong;
    
    /**
     * @ProjectName: cutter-point
     * @Package: Thread
     * @ClassName: RateLimiter
     * @Author: xiaof
     * @Description: 令牌桶,限流
     * @Date: 2019/6/21 11:41
     * @Version: 1.0
     */
    public class RateLimiter {
    
        //限流消费的令牌
        private final AtomicInteger consumedTokens = new AtomicInteger();
    
        private final AtomicLong lastRefrushTokenTime = new AtomicLong(0);
    
        //限流类型,是秒,还是分
        private final long rateType;
    
        public RateLimiter(TimeUnit averageRateUnit) {
            switch (averageRateUnit) {
                case SECONDS:
                    rateType = 1000;
                    break;
                case MINUTES:
                    rateType = 60 * 1000;
                    break;
                default:
                    throw new IllegalArgumentException("TimeUnit of " + averageRateUnit + " is not supported");
            }
        }
    
        //请求令牌,判断是否可以获取到新的令牌
        public boolean acquire(int bucketSize, long avgRate) {
            return acquire(bucketSize, avgRate, System.currentTimeMillis());
        }
    
        public boolean acquire(int bucketSize, long avgRate, long curentTimeMillis) {
    
            if(bucketSize <= 0 || avgRate <= 0) {
                return true;//如果这2个参数,任意一个为0 ,我们就认为没有上限
            }
    
            //刷新令牌桶
            refillToken(bucketSize, avgRate, curentTimeMillis);
            //开始消费令牌
            return consumToken(bucketSize);
        }
    
        private void refillToken(int bucketSize, long avgRate, long currentTimeMillis) {
            //获取上次最后以后更新令牌时间
            long freshTime = lastRefrushTokenTime.get();
            //获取当前间隔时间
            long timeDelta = currentTimeMillis - freshTime;
    
            //计算这次需要填充的token数
            long newToken = timeDelta * avgRate /rateType;
            if(newToken > 0) {
                //新的更新时间
                long newFillTime = freshTime == 0 ? currentTimeMillis : freshTime + timeDelta;
                //用cas操作,以保证只有一个线程注入新令牌
                if(lastRefrushTokenTime.compareAndSet(freshTime, newFillTime)) {
                    //死循环,直到设置成功新的令牌
                    while(true) {
                        //1.获取当前消费的令牌
                        int currentConsumToken = consumedTokens.get();
                        //2.获取消费的令牌容量,跟桶极限大小比较,取小的那个
                        int realConsumTokens = Math.min(currentConsumToken, bucketSize);
                        //3.计算填充之后剩余的被消费的容量,计算新增容量,用来填充被消费的令牌数
                        //剩余的消费容量,但是不能比0还小,这个要取值
                        int newConsumSize = (int) Math.max(0, realConsumTokens - newToken);
                        //然后设置进去
                        if(consumedTokens.compareAndSet(currentConsumToken, newConsumSize)) {
                            return;
                        }
                    }
                }
            }
        }
    
        //消费令牌
        private boolean consumToken(int bucketSize) {
            while (true) {
                int currentLevel = consumedTokens.get();
                //如果超出负载
                if (currentLevel >= bucketSize) {
                    return false;
                }
                //每次消费一个
                if (consumedTokens.compareAndSet(currentLevel, currentLevel + 1)) {
                    return true;
                }
            }
        }
    
        public void reset() {
            consumedTokens.set(0);
            lastRefrushTokenTime.set(0);
        }
    
    }

    到这里可能有的人不清楚怎么用,来我们测试一波

    我们假设有100个用户同时请求,然后令牌恢复速率调成10,然后速率单位改为秒,也就是1秒恢复10个令牌

    这样同时100个请求过来,马上令牌就会被用完,那么就会被限流,比如我们拦截器这个时候可以返回404,或者503

    public static void main(String args[]) {
    
            RateLimiter rateLimiter = new RateLimiter(TimeUnit.SECONDS);
    
            final int bucketSize = 10;
            //回复令牌生产速率
            final long avgRate = 10;
    
            //判断是否流量达到上限
            ExecutorService pool = Executors.newCachedThreadPool();
            for(int i = 0; i < 100; ++i) {
                pool.submit(new Runnable() {
                    @Override
                    public void run() {
                        while(true) {
                            try {
                                Thread.sleep(((int) (Math.random() * 10)) * 1000) ;
    
                                if(!rateLimiter.acquire(bucketSize, avgRate)) {
                                    System.err.println(Thread.currentThread().getName() + "已经限流成功----response.setStatus(404)");
                                } else {
                                    System.out.println(Thread.currentThread().getName() + "正常执行");
                                }
    
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                });
            }
    
            while(true) {
    
            }
    
        }

    效果展示

  • 相关阅读:
    使用regsrv32.exe绕过应用程序白名单(多种方法)
    使用rundll32.exe绕过应用程序白名单(多种方法)
    Cobalt Strike 3.13的新功能
    kindeditor<=4.1.5上传漏洞复现
    Winrar目录穿越漏洞复现
    HTTP返回码总结
    如何将Debug文件夹下的资源打包成一个EXE文件直接执行
    我用VS2012在Nuget中安装Signalr之后报错
    System.Drawing.Color
    Asp.Net MVC及Web API框架配置会碰到的几个问题及解决方案
  • 原文地址:https://www.cnblogs.com/cutter-point/p/11064362.html
Copyright © 2011-2022 走看看