zoukankan      html  css  js  c++  java
  • 高并发之限流RateLimiter(二)

    Guava RateLimiter提供了令牌桶算法实现:平滑突发限流(SmoothBursty)和平滑预热限流(SmoothWarmingUp)实现。

    SmoothBursty:令牌生成速度恒定

     @Test
        public void testAcquire() {
            // acquire(i); 获取令牌,返回阻塞的时间,支持预消费.
            RateLimiter limiter = RateLimiter.create(1);
    
            for (int i = 1; i < 10; i++) {
                double waitTime = limiter.acquire();
                System.out.println("cutTime=" + longToDate(System.currentTimeMillis()) + " acq:" + i + " waitTime:" + waitTime);
            }
        }
    
        public static String longToDate(long lo){
            Date date = new Date(lo);
            SimpleDateFormat sd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            return sd.format(date);
        }

    输出结果:

    cutTime=2019-03-29 09:31:42 acq:1 waitTime:0.0
    cutTime=2019-03-29 09:31:43 acq:2 waitTime:0.989135
    cutTime=2019-03-29 09:31:44 acq:3 waitTime:0.998023
    cutTime=2019-03-29 09:31:45 acq:4 waitTime:0.999573
    cutTime=2019-03-29 09:31:46 acq:5 waitTime:0.999359
    cutTime=2019-03-29 09:31:47 acq:6 waitTime:0.999566
    cutTime=2019-03-29 09:31:48 acq:7 waitTime:0.998763
    cutTime=2019-03-29 09:31:49 acq:8 waitTime:0.999163
    cutTime=2019-03-29 09:31:50 acq:9 waitTime:1.000036

    说明:每秒1个令牌生成一个令牌,从输出可看出很平滑,这种实现将突发请求速率平均成固定请求速率。

    下面demo是突发请求:

    @Test
        public void testAcquire2() {
            // 请求突发
            RateLimiter limiter = RateLimiter.create(5);
    
            for (int i = 1; i < 5; i++) {
                double waitTime = 0;
                if(i == 2){
                    waitTime = limiter.acquire(10);
                }else{
                    waitTime = limiter.acquire(1);
                }
    
                System.out.println("cutTime=" + longToDate(System.currentTimeMillis()) + " acq:" + i + " waitTime:" + waitTime);
            }
        }

    输出:

    cutTime=2019-03-29 09:53:55 acq:1 waitTime:0.0
    cutTime=2019-03-29 09:53:56 acq:2 waitTime:0.188901
    cutTime=2019-03-29 09:53:58 acq:3 waitTime:1.99789
    cutTime=2019-03-29 09:53:58 acq:4 waitTime:0.198832

    说明:

    i=1,消费i个令牌,此时还剩4个令牌;

    i=2,突发10个请求,令牌桶算法也允许了这种突发(允许消费未来的令牌);

    i=3,上次请求消费了,所以需要等待2s;

    下面看源码:


    简单介绍下:Stopwatch

    public final class Stopwatch {
        private final Ticker ticker;//计时器,用于获取当前时间
        private boolean isRunning;//计时器是否运行中的状态标记
        private long elapsedNanos;//用于标记从计时器开启到调用统计的方法时过去的时间
        private long startTick;//计时器开启的时刻时间
    
        private long elapsedNanos() {
            return this.isRunning ? this.ticker.read() - this.startTick + this.elapsedNanos : this.elapsedNanos;
        }
        public long elapsed(TimeUnit desiredUnit) {
            return desiredUnit.convert(this.elapsedNanos(), TimeUnit.NANOSECONDS);
        }
     }

    TimeUnit:

    MILLISECONDS {
            public long toNanos(long d)   { return x(d, C2/C0, MAX/(C2/C0)); }
            public long toMicros(long d)  { return x(d, C2/C1, MAX/(C2/C1)); }
            public long toMillis(long d)  { return d; }
            public long toSeconds(long d) { return d/(C3/C2); }
            public long toMinutes(long d) { return d/(C4/C2); }
            public long toHours(long d)   { return d/(C5/C2); }
            public long toDays(long d)    { return d/(C6/C2); }
            public long convert(long d, TimeUnit u) { return u.toMillis(d); }
            int excessNanos(long d, long m) { return 0; }
        },
     
     MICROSECONDS {
            public long toNanos(long d)   { return x(d, C1/C0, MAX/(C1/C0)); }
            public long toMicros(long d)  { return d; }
            public long toMillis(long d)  { return d/(C2/C1); }
            public long toSeconds(long d) { return d/(C3/C1); }
            public long toMinutes(long d) { return d/(C4/C1); }
            public long toHours(long d)   { return d/(C5/C1); }
            public long toDays(long d)    { return d/(C6/C1); }
            public long convert(long d, TimeUnit u) { return u.toMicros(d); }
            int excessNanos(long d, long m) { return (int)((d*C1) - (m*C2)); }
        },
        
    NANOSECONDS {
            public long toNanos(long d)   { return d; }
            public long toMicros(long d)  { return d/(C1/C0); }
            public long toMillis(long d)  { return d/(C2/C0); }
            public long toSeconds(long d) { return d/(C3/C0); }
            public long toMinutes(long d) { return d/(C4/C0); }
            public long toHours(long d)   { return d/(C5/C0); }
            public long toDays(long d)    { return d/(C6/C0); }
            public long convert(long d, TimeUnit u) { return u.toNanos(d); }
            int excessNanos(long d, long m) { return (int)(d - (m*C2)); }
    },

    其中:

    static final long C0 = 1L;
    static final long C1 = C0 * 1000L;
    static final long C2 = C1 * 1000L;
    static final long C3 = C2 * 1000L;
    static final long C4 = C3 * 60L;
    static final long C5 = C4 * 60L;
    static final long C6 = C5 * 24L;
    @Test
    public void stopwatch1() {
        Stopwatch stopwatch = Stopwatch.createStarted();
    
        doSomething();
        stopwatch.stop(); // optional
        long millis = stopwatch.elapsed(MILLISECONDS);
        System.out.println("time: " + stopwatch);
    }
    
    @Test
    public void stopwatch2() {
        Stopwatch stopwatch = Stopwatch.createStarted();
        //doSomething();
        stopwatch.stop();
        long millis = stopwatch.elapsed(MILLISECONDS);
        System.out.println("time: " + stopwatch);
    
        stopwatch.reset().start();
        //doSomething();
        stopwatch.stop();
        millis = stopwatch.elapsed(MILLISECONDS);
        System.out.println("time: " + stopwatch);
    }
    
    public static void doSomething(){
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    stopwatch1结果:

    time: 100.8 ms

    执行过程:

    使用stopwatch对程序运行时间进行调试,首先调用StopWatch.createStarted()创建并启动一个stopwatch实例,调用stopwatch.stop()停止计时,此时会更新stopwatch的elapsedNanos时间,为stopwatch开始启动到结束计时的时间,再次调用stopwatch.elapsed(),获取stopwatch在start-stop时间段,时间流逝的长度。

    RateLimiter.class

    public static RateLimiter create(double permitsPerSecond) {
            return create(permitsPerSecond, RateLimiter.SleepingStopwatch.createFromSystemTimer());//Stopwatch类稍后
        }
    
        @VisibleForTesting
        static RateLimiter create(double permitsPerSecond, RateLimiter.SleepingStopwatch stopwatch) {
            RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0D);
            rateLimiter.setRate(permitsPerSecond);
            return rateLimiter;
        }
        
        public final void setRate(double permitsPerSecond) {
            Preconditions.checkArgument(permitsPerSecond > 0.0D && !Double.isNaN(permitsPerSecond), "rate must be positive");
            synchronized(this.mutex()) {
                this.doSetRate(permitsPerSecond, this.stopwatch.readMicros());
            }
        }
        
        abstract void doSetRate(double var1, long var3);
    说明:this.stopwatch.readMicros());源码最终调用的是
    NANOSECONDS {
    public long toNanos(long d) { return d; }
    public long toMicros(long d) { return d/(C1/C0); } //return (stopwatch中的elapsedNanos,表示时间差)/(1L * 1000L/1L)
    }

    SmoothRateLimiter

    final void doSetRate(double permitsPerSecond, long nowMicros) {
        this.resync(nowMicros);
        double stableIntervalMicros = (double)TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;
        this.stableIntervalMicros = stableIntervalMicros;
        this.doSetRate(permitsPerSecond, stableIntervalMicros);
    }
    abstract void doSetRate(double var1, double var3);
    
    void resync(long nowMicros) {
        if (nowMicros > this.nextFreeTicketMicros) {
            //相当于(double)(nowMicros - this.nextFreeTicketMicros) * (permitsPerSecond double)TimeUnit.SECONDS.toMicros(1L)) //令牌生成速率:xx/单位时间
            double newPermits = (double)(nowMicros - this.nextFreeTicketMicros) / this.coolDownIntervalMicros();
            this.storedPermits = Math.min(this.maxPermits, this.storedPermits + newPermits);
            this.nextFreeTicketMicros = nowMicros;
        }
    }

    说明:

    nowMicros:表示用于标记从计时器开启到调用统计的方法时过去的时间
    coolDownIntervalMicros:添加令牌时间间隔
    stableIntervalMicros:添加令牌时间间隔 = (double)TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;(1秒/每秒的令牌数)
    newPermits:时间段内新生令牌数
    storedPermits:当前令牌数

    nextFreeTicketMicros:

    下一次请求可以获取令牌的起始时间,由于RateLimiter允许预消费,上次请求预消费令牌后,下次请求需要等待相应的时间到nextFreeTicketMicros时刻才可以获取令牌

    SmoothBursty

    static final class SmoothBursty extends SmoothRateLimiter {
            final double maxBurstSeconds;
    
            SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
                super(stopwatch, null);
                this.maxBurstSeconds = maxBurstSeconds;//在RateLimiter未使用时,最多存储几秒的令牌
            }
    
            void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
                double oldMaxPermits = this.maxPermits;
                this.maxPermits = this.maxBurstSeconds * permitsPerSecond;
                if (oldMaxPermits == 1.0D / 0.0) { //相当于oldMaxPermits ==Double.POSITIVE_INFINITY ,Double.POSITIVE_INFINITY 表示无穷大
                    
                    this.storedPermits = this.maxPermits;
                } else {
                    this.storedPermits = oldMaxPermits == 0.0D ? 0.0D : this.storedPermits * this.maxPermits / oldMaxPermits;
                }
    
            }
    
            long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
                return 0L;
            }
    
            double coolDownIntervalMicros() {
                return this.stableIntervalMicros;
            }
        }

    参数说明:

    maxBurstSeconds:在RateLimiter未使用时,最多存储几秒的令牌
    permitsPerSecond: 速率=令牌数/每秒
    maxPermits :最大存储令牌数 = maxBurstSeconds * permitsPerSecond
    storedPermits: 当前存储令牌数

    RateLimiter几个常用接口分析

    1、acquire() 函数主要用于获取permits个令牌,并计算需要等待多长时间,进而挂起等待,并将该值返回

    RateLimiter.calss

    @CanIgnoreReturnValue
    public double acquire() {
      return acquire(1);
    }
    
    /**
    * 获取令牌,返回阻塞的时间
    **/
    @CanIgnoreReturnValue
    public double acquire(int permits) {
      long microsToWait = reserve(permits);
      
      //获取等待时间后,阻塞线程
      stopwatch.sleepMicrosUninterruptibly(microsToWait);
      return 1.0 * microsToWait / SECONDS.toMicros(1L);
    }
    
    final long reserve(int permits) {
      checkPermits(permits);
      synchronized (mutex()) {
        return reserveAndGetWaitLength(permits, stopwatch.readMicros());
      }
    }
    
    final long reserveAndGetWaitLength(int permits, long nowMicros) {
        long momentAvailable = this.reserveEarliestAvailable(permits, nowMicros);
        return Math.max(momentAvailable - nowMicros, 0L);
    }
    abstract long reserveEarliestAvailable(int var1, long var2);

    SmoothRateLimiter.class

    final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
            this.resync(nowMicros);
            long returnValue = this.nextFreeTicketMicros;//resync()方法后,如果nowMicros > this.nextFreeTicketMicros,等于nowMicros
            
            double storedPermitsToSpend = Math.min((double)requiredPermits, this.storedPermits);
            //freshPermits从令牌桶中获取令牌后还需要的令牌数量
            double freshPermits = (double)requiredPermits - storedPermitsToSpend;
            
            //平滑这里this.storedPermitsToWaitTime()直接返回0L + 还需要令牌数量/速率(需要的时间)
            long waitMicros = this.storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) + (long)(freshPermits * this.stableIntervalMicros);
            
            //如果超前消费,将导致下次请求等待时间=LongMath.saturatedAdd(this.nextFreeTicketMicros, waitMicros);
            this.nextFreeTicketMicros = LongMath.saturatedAdd(this.nextFreeTicketMicros, waitMicros);
            this.storedPermits -= storedPermitsToSpend;
            return returnValue;
        }

    2、tryAcquire()

    函数可以尝试在timeout时间内获取令牌,如果可以则挂起等待相应时间并返回true,否则立即返回false

     public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
            long timeoutMicros = Math.max(unit.toMicros(timeout), 0L);//超时时间
            checkPermits(permits);
            long microsToWait;
            synchronized(this.mutex()) {
                long nowMicros = this.stopwatch.readMicros();
                if (!this.canAcquire(nowMicros, timeoutMicros)) {
                    return false;
                }
                //获取需要阻塞时间
                microsToWait = this.reserveAndGetWaitLength(permits, nowMicros);
            }
    
            this.stopwatch.sleepMicrosUninterruptibly(microsToWait);
            return true;
        }
    
        private boolean canAcquire(long nowMicros, long timeoutMicros) {
            //下一次请求可以获取令牌的起始时间
            return this.queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
        }
    canAcquire用于判断timeout时间内是否可以获取令牌,通过判断当前时间+超时时间是否大于nextFreeTicketMicros 来决定是否能够拿到足够的令牌数,如果可以获取到,则过程同acquire,线程sleep等待,如果通过canAcquire在此超时时间内不能回去到令牌,则可以快速返回,不需要等待timeout后才知道能否获取到令牌。

    SmoothWarmingUp:令牌生成速度缓慢提升直到维持在一个稳定值

    SmoothWarmingUp创建方式:RateLimiter.create(doublepermitsPerSecond, long warmupPeriod, TimeUnit unit)

    permitsPerSecond表示每秒新增的令牌数,warmupPeriod表示在从冷启动速率过渡到平均速率的时间间隔。

    @Test
        public void acquire1() {
            RateLimiter limiter = RateLimiter.create(5, 1000, TimeUnit.MILLISECONDS);
            for (int i = 1; i < 6; i++) {
                System.out.println(limiter.acquire());
            }
    
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            for (int i = 1; i < 6; i++) {
                System.out.println(limiter.acquire());
            }
        }

     结果:

    0.0
    0.518741
    0.357811
    0.219877
    0.199584
    0.0
    0.361189
    0.220761
    0.19938
    0.199856

    速率是梯形上升速率的,也就是说冷启动时会以一个比较大的速率慢慢到平均速率;然后趋于平均速率(梯形下降到平均速率)。可以通过调节warmupPeriod参数实现一开始就是平滑固定速率。

    参考:

    https://www.cnblogs.com/xuwc/p/9123078.html

    https://www.cnblogs.com/xuwc/p/9123078.html

  • 相关阅读:
    小儿吃鸡蛋食积案
    女子经前胀痛脸上红斑案
    小儿外感咳嗽咽痛案
    顽固偏头痛案
    交通心肾治失眠
    小儿扁桃体高热兼咳嗽案
    过敏疾患与太少两感相合
    经前乳胀案
    女子黃带案
    小儿外感后频繁眨眼案
  • 原文地址:https://www.cnblogs.com/xiaozhuanfeng/p/10617005.html
Copyright © 2011-2022 走看看