RateLimiter是Guava的concurrent包下的一个用于限制访问频率的类.
1.限流
每个API接口都是有访问上限的,当访问频率或者并发量超过其承受范围时候,我们就必须考虑限流来保证接口的可用性或者降级可用性.即接口也需要安装上保险丝,以防止非预期的请求对系统压力过大而引起的系统瘫痪.
通常的策略就是拒绝多余的访问,或者让多余的访问排队等待服务,或者引流.
如果要准确的控制QPS,简单的做法是维护一个单位时间内的Counter,如判断单位时间已经过去,则将Counter重置零.此做法被认为没有很好的处理单位时间的边界,比如在前一秒的最后一毫秒里和下一秒的第一毫秒都触发了最大的请求数,将目光移动一下,就看到在两毫秒内发生了两倍的QPS.
2.限流算法
常用的更平滑的限流算法有两种:漏桶算法和令牌桶算法.
两种限流的祖师级算法确有其独到之处,其他实现比如滑动时间窗或者三色速率标记法,其实是“漏桶”与“令牌桶”的变种。要么将“漏桶”容积换成了单位时间,要么是按规则将请求标记颜色进行处理,底层还是“令牌”的思想。
很多传统的服务提供商如华为中兴都有类似的专利,参考: http://www.google.com/patents/CN1536815A?cl=zh
2.1计数器
计数器法是限流算法里最简单也是最容易实现的一种算法。比如我们规定,对于A接口来说,我们1分钟的访问次数不能超过100个。那么我们我们可以设置一个计数器counter,其有效时间为1分钟(即每分钟计数器会被重置为0),每当一个请求过来的时候,counter就加1,如果counter的值大于100,就说明请求数过多;
这个算法虽然简单,但是有一个十分致命的问题,那就是临界问题。
如上图所示,在1:00前一刻到达100个请求,1:00计数器被重置,1:00后一刻又到达100个请求,显然计数器不会超过100,所有请求都不会被拦截;然而这一时间段内请求数已经达到200,远超100。
2.2 漏桶算法
漏桶(Leaky Bucket)算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水(接口有响应速率),当水流入速度过大会直接溢出(访问频率超过接口响应速率),然后就拒绝请求,而当入小于出的情况下,漏桶不起任何作用。可以看出漏桶算法能强行限制数据的传输速率.示意图如下:
注意:在我们的应用中,漏桶算法强制限定流量速率后,多出的(溢出的)流量可以被利用起来,并非完全丢弃,我们可以把它收集到一个队列里面,做流量队列,尽量做到合理利用所有资源。
漏桶算法:水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出(拒绝服务),可以看出漏桶算法能强行限制数据的传输速率
流入:以任意速率往桶中放入水滴。
流出:以固定速率从桶中流出水滴。
用白话具体说明:假设漏斗总支持并发100个最大请求,如果超过100个请求,那么会提示系统繁忙,请稍后再试,数据输出那可以设置1个线程池,处理线程数5个,每秒处理20个请求。
缺点:因为当流出速度固定,大规模持续突发量,无法多余处理,浪费网络带宽
优点:无法击垮服务
示例:
可见这里有两个变量,一个是桶的大小,支持流量突发增多时可以存多少的水(burst),另一个是水桶漏洞的大小(rate),伪代码如下:
double rate; // leak rate in calls/s double burst; // bucket size in calls long refreshTime; // time for last water refresh double water; // water count at refreshTime refreshWater() { long now = getTimeOfDay(); //水随着时间流逝,不断流走,最多就流干到0. water = max(0, water- (now - refreshTime)*rate); refreshTime = now; } bool permissionGranted() { refreshWater(); if (water < burst) { // 水桶还没满,继续加1 water ++; return true; } else { return false; } }
因为漏桶的漏出速率是固定的参数,所以,即使网络中不存在资源冲突(没有发生拥塞),漏桶算法也不能使流突发(burst)到端口速率.因此,漏桶算法对于存在突发特性的流量来说缺乏效率.
2.3 令牌桶算法
令牌桶算法(Token Bucket)和 Leaky Bucket 效果一样但方向相反的算法,更加容易理解.随着时间流逝,系统会按恒定1/QPS时间间隔(如果QPS=100,则间隔是10ms)往桶里加入Token(想象和漏洞漏水相反,有个水龙头在不断的加水),如果桶已经满了,令牌就溢出了。如果桶未满,令牌可以积累。新请求来临时,会各自拿走一个Token,如果没有Token可拿了就阻塞或者拒绝服务.
令牌桶的另外一个好处是可以方便的改变速度. 一旦需要提高速率,则按需提高放入桶中的令牌的速率. 一般会定时(比如100毫秒)往桶中增加一定数量的令牌, 有些变种算法则实时的计算应该增加的令牌的数量.
令牌桶算法:一个存放固定容量令牌的桶,按照固定速率(每秒/或者可以自定义时间)往桶里添加令牌,然后每次获取一个令牌,当桶里没有令牌可取时,则拒绝服务
令牌桶分为2个动作,动作1(固定速率往桶中存入令牌)、动作2(客户端如果想访问请求,先从桶中获取token)
流入:以固定速率从桶中流入水滴
流出:按照任意速率从桶中流出水滴
技术上使用Google开源工具包Guava提供了限流工具类RateLimiter,该类基于令牌桶算法来完成限流,非常易于使用
优点:支持大的并发,有效利用网络带宽
漏桶和令牌桶的区别:
并不能说明令牌桶一定比漏洞好,她们使用场景不一样。
- 令牌桶算法,放在服务端,用来保护服务端(自己),主要用来对调用者频率进行限流,为的是不让自己被压垮。所以如果自己本身有处理能力的时候,如果流量突发(实际消费能力强于配置的流量限制=桶大小),那么实际处理速率可以超过配置的限制(桶大小)。
- 而漏桶算法,放在调用方,这是用来保护他人,也就是保护他所调用的系统。主要场景是,当调用的第三方系统本身没有保护机制,或者有流量限制的时候,我们的调用速度不能超过他的限制,由于我们不能更改第三方系统,所以只有在主调方控制。这个时候,即使流量突发,也必须舍弃。因为消费能力是第三方决定的。
2.4、滑动时间窗
滑动窗口,又称rolling window。为了解决这个问题,我们引入了滑动窗口算法。如果学过TCP网络协议的话,那么一定对滑动窗口这个名词不会陌生。下面这张图,很好地解释了滑动窗口算法:
在上图中,整个红色的矩形框表示一个时间窗口,在我们的例子中,一个时间窗口就是一分钟。然后我们将时间窗口进行划分,比如图中,我们就将滑动窗口 划成了6格,所以每格代表的是10秒钟。每过10秒钟,我们的时间窗口就会往右滑动一格。每一个格子都有自己独立的计数器counter,比如当一个请求 在0:35秒的时候到达,那么0:30~0:39对应的counter就会加1。
那么滑动窗口怎么解决刚才的临界问题的呢?我们可以看上图,0:59到达的100个请求会落在灰色的格子中,而1:00到达的请求会落在橘黄色的格 子中。当时间到达1:00时,我们的窗口会往右移动一格,那么此时时间窗口内的总请求数量一共是200个,超过了限定的100个,所以此时能够检测出来触 发了限流。
我再来回顾一下刚才的计数器算法,我们可以发现,计数器算法其实就是滑动窗口算法。只是它没有对时间窗口做进一步地划分,所以只有1格。
由此可见,当滑动窗口的格子划分的越多,那么滑动窗口的滚动就越平滑,限流的统计就会越精确。
2.5、三色速率标记法
3.RateLimiter简介(guava的令牌桶实现)
Google开源工具包Guava提供了限流工具类RateLimiter,该类基于令牌桶算法(Token Bucket)来完成限流,非常易于使用.RateLimiter经常用于限制对一些物理资源或者逻辑资源的访问速率.它支持两种获取permits接口,一种是如果拿不到立刻返回false,一种会阻塞等待一段时间看能不能拿到.
RateLimiter和Java中的信号量(java.util.concurrent.Semaphore)类似,Semaphore通常用于限制并发量.
源码注释中的一个例子,比如我们有很多任务需要执行,但是我们不希望每秒超过两个任务执行,那么我们就可以使用RateLimiter:
final RateLimiter rateLimiter = RateLimiter.create(2.0); void submitTasks(List<Runnable> tasks, Executor executor) { for (Runnable task : tasks) { rateLimiter.acquire(); // may wait executor.execute(task); } }
另外一个例子,假如我们会产生一个数据流,然后我们想以每秒5kb的速度发送出去.我们可以每获取一个令牌(permit)就发送一个byte的数据,这样我们就可以通过一个每秒5000个令牌的RateLimiter来实现:
final RateLimiter rateLimiter = RateLimiter.create(5000.0); void submitPacket(byte[] packet) { rateLimiter.acquire(packet.length); networkService.send(packet); }
另外,我们也可以使用非阻塞的形式达到降级运行的目的,即使用非阻塞的tryAcquire()方法:
if(limiter.tryAcquire()) { //未请求到limiter则立即返回false doSomething(); }else{ doSomethingElse(); }
4.RateLimiter主要接口
RateLimiter其实是一个abstract类,但是它提供了几个static方法用于创建RateLimiter:
/** * 创建一个稳定输出令牌的RateLimiter,保证了平均每秒不超过permitsPerSecond个请求 * 当请求到来的速度超过了permitsPerSecond,保证每秒只处理permitsPerSecond个请求 * 当这个RateLimiter使用不足(即请求到来速度小于permitsPerSecond),会囤积最多permitsPerSecond个请求 */ public static RateLimiter create(double permitsPerSecond); /** * 创建一个稳定输出令牌的RateLimiter,保证了平均每秒不超过permitsPerSecond个请求 * 还包含一个热身期(warmup period),热身期内,RateLimiter会平滑的将其释放令牌的速率加大,直到起达到最大速率 * 同样,如果RateLimiter在热身期没有足够的请求(unused),则起速率会逐渐降低到冷却状态 * * 设计这个的意图是为了满足那种资源提供方需要热身时间,而不是每次访问都能提供稳定速率的服务的情况(比如带缓存服务,需要定期刷新缓存的) * 参数warmupPeriod和unit决定了其从冷却状态到达最大速率的时间 */ public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit);
- create(double permitsPerSecond):根据指定的稳定吞吐率创建RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少查询)
- create(double permitsPerSecond, long warmupPeriod, TimeUnit unit):根据指定的稳定吞吐率和预热期来创建RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少个请求量),在这段预热时间内,RateLimiter每秒分配的许可数会平稳地增长直到预热期结束时达到其最大速率。(只要存在足够请求数来使其饱和)
提供了两个获取令牌的方法,不带参数表示获取一个令牌.如果没有令牌则一直等待,返回等待的时间(单位为秒),没有被限流则直接返回0.0:
public double acquire(); public double acquire(int permits);
两个方法区别:第一个方法固定速率生成令牌数,第二个方法有预热时间段,在预热阶段内不超过设定的令牌数,超过预热期后以固定时间速率生成令牌,当出现突然出现大量数据。
明显区别:使用第一种方法可能会使消费方(后台服务)没有消费完成,一直往系统塞数据导致服务器不可用,使用第二种方法将流量比较平滑的过渡,从而降低后台服务down掉的风险(就是预热期内设置的令牌数少,不容易一下子把系统攻破)
尝试获取令牌,分为待超时时间和不带超时时间两种:
public boolean tryAcquire(); //尝试获取一个令牌,立即返回 public boolean tryAcquire(int permits); public boolean tryAcquire(long timeout, TimeUnit unit); //尝试获取permits个令牌,带超时时间 public boolean tryAcquire(int permits, long timeout, TimeUnit unit);
5.RateLimiter设计
考虑一下RateLimiter是如何设计的,并且为什么要这样设计.
RateLimiter的主要功能就是提供一个稳定的速率,实现方式就是通过限制请求流入的速度,比如计算请求等待合适的时间阈值.
实现QPS速率的最简单的方式就是记住上一次请求的最后授权时间,然后保证1/QPS秒内不允许请求进入.比如QPS=5,如果我们保证最后一个被授权请求之后的200ms的时间内没有请求被授权,那么我们就达到了预期的速率.如果一个请求现在过来但是最后一个被授权请求是在100ms之前,那么我们就要求当前这个请求等待100ms.按照这个思路,请求15个新令牌(许可证)就需要3秒.
有一点很重要:上面这个设计思路的RateLimiter记忆非常的浅,它的脑容量非常的小,只记得上一次被授权的请求的时间.如果RateLimiter的一个被授权请求q之前很长一段时间没有被使用会怎么样?这个RateLimiter会立马忘记过去这一段时间的利用不足,而只记得刚刚的请求q.
过去一段时间的利用不足意味着有过剩的资源是可以利用的.这种情况下,RateLimiter应该加把劲(speed up for a while)将这些过剩的资源利用起来.比如在向网络中发生数据的场景(限流),过去一段时间的利用不足可能意味着网卡缓冲区是空的,这种场景下,我们是可以加速发送来将这些过程的资源利用起来.
另一方面,过去一段时间的利用不足可能意味着处理请求的服务器对即将到来的请求是准备不足的(less ready for future requests),比如因为很长一段时间没有请求当前服务器的cache是陈旧的,进而导致即将到来的请求会触发一个昂贵的操作(比如重新刷新全量的缓存).
为了处理这种情况,RateLimiter中增加了一个维度的信息,就是过去一段时间的利用不足(past underutilization),代码中使用storedPermits变量表示.当没有利用不足这个变量为0,最大能达到maxStoredPermits(maxStoredPermits表示完全没有利用).因此,请求的令牌可能从两个地方来:
1.过去剩余的令牌(stored permits, 可能没有)
2.现有的令牌(fresh permits,当前这段时间还没用完的令牌)
我们将通过一个例子来解释它是如何工作的:
对一个每秒产生一个令牌的RateLimiter,每有一个没有使用令牌的一秒,我们就将storedPermits加1,如果RateLimiter在10秒都没有使用,则storedPermits变成10.0.这个时候,一个请求到来并请求三个令牌(acquire(3)),我们将从storedPermits中的令牌为其服务,storedPermits变为7.0.这个请求之后立马又有一个请求到来并请求10个令牌,我们将从storedPermits剩余的7个令牌给这个请求,剩下还需要三个令牌,我们将从RateLimiter新产生的令牌中获取.我们已经知道,RateLimiter每秒新产生1个令牌,就是说上面这个请求还需要的3个请求就要求其等待3秒.
想象一个RateLimiter每秒产生一个令牌,现在完全没有使用(处于初始状态),限制一个昂贵的请求acquire(100)过来.如果我们选择让这个请求等待100秒再允许其执行,这显然很荒谬.我们为什么什么也不做而只是傻傻的等待100秒,一个更好的做法是允许这个请求立即执行(和acquire(1)没有区别),然后将随后到来的请求推迟到正确的时间点.这种策略,我们允许这个昂贵的任务立即执行,并将随后到来的请求推迟100秒.这种策略就是让任务的执行和等待同时进行.
一个重要的结论:RateLimiter不会记最后一个请求,而是即下一个请求允许执行的时间.这也可以很直白的告诉我们到达下一个调度时间点的时间间隔.然后定一个一段时间未使用的Ratelimiter也很简单:下一个调度时间点已经过去,这个时间点和现在时间的差就是Ratelimiter多久没有被使用,我们会将这一段时间翻译成storedPermits.所有,如果每秒钟产生一个令牌(rate==1),并且正好每秒来一个请求,那么storedPermits就不会增长.
6.RateLimiter主要源码
RateLimiter定义了两个create函数用于构建不同形式的RateLimiter:
1.public static RateLimiter create(double permitsPerSecond)
用于创建SmoothBursty类型的RateLimiter
2.public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit)
用于创建
源码下面以acquire为例子,分析一下RateLimiter如何实现限流:
public double acquire() { return acquire(1); } public double acquire(int permits) { long microsToWait = reserve(permits); stopwatch.sleepMicrosUninterruptibly(microsToWait); return 1.0 * microsToWait / SECONDS.toMicros(1L); } final long reserve(int permits) { checkPermits(permits); synchronized (mutex()) { //应对并发情况需要同步 return reserveAndGetWaitLength(permits, stopwatch.readMicros()); } } final long reserveAndGetWaitLength(int permits, long nowMicros) { long momentAvailable = reserveEarliestAvailable(permits, nowMicros); return max(momentAvailable - nowMicros, 0); }
下面方法来自RateLimiter的具体实现类SmoothRateLimiter:
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) { resync(nowMicros); //补充令牌 long returnValue = nextFreeTicketMicros; //这次请求消耗的令牌数目 double storedPermitsToSpend = min(requiredPermits, this.storedPermits); double freshPermits = requiredPermits - storedPermitsToSpend; long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) + (long) (freshPermits * stableIntervalMicros); this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros; this.storedPermits -= storedPermitsToSpend; return returnValue; } private void resync(long nowMicros) { // if nextFreeTicket is in the past, resync to now if (nowMicros > nextFreeTicketMicros) { storedPermits = min(maxPermits, storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros); nextFreeTicketMicros = nowMicros; } }
另外,对于storedPermits的使用,RateLimiter存在两种策略,二者区别主要体现在使用storedPermits时候需要等待的时间。这个逻辑由storedPermitsToWaitTime函数实现:
/** * Translates a specified portion of our currently stored permits which we want to * spend/acquire, into a throttling time. Conceptually, this evaluates the integral * of the underlying function we use, for the range of * [(storedPermits - permitsToTake), storedPermits]. * * <p>This always holds: {@code 0 <= permitsToTake <= storedPermits} */ abstract long storedPermitsToWaitTime(double storedPermits, double permitsToTake);
存在两种策略就是为了应对我们上面讲到的,存在资源使用不足大致分为两种情况: (1).资源确实使用不足,这些剩余的资源我们私海可以使用的; (2).提供资源的服务过去还没准备好,比如服务刚启动等;
为此,RateLimiter实际上由两种实现策略,其实现分别见SmoothBursty和SmoothWarmingUp。二者主要的区别就是storedPermitsToWaitTime实现以及maxPermits数量的计算。
6.1 SmoothBursty
SmoothBursty使用storedPermits不需要额外等待时间。并且默认maxBurstSeconds未1,因此maxPermits为permitsPerSecond,即最多可以存储1秒的剩余令牌,比如QPS=5,则maxPermits=5.
下面这个RateLimiter的入口就是用来创建SmoothBursty类型的RateLimiter,
/** * This implements a "bursty" RateLimiter, where storedPermits are translated to * zero throttling. The maximum number of permits that can be saved (when the RateLimiter is * unused) is defined in terms of time, in this sense: if a RateLimiter is 2qps, and this * time is specified as 10 seconds, we can save up to 2 * 10 = 20 permits. */ static final class SmoothBursty extends SmoothRateLimiter { /** The work (permits) of how many seconds can be saved up if this RateLimiter is unused? */ final double maxBurstSeconds; SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) { super(stopwatch); this.maxBurstSeconds = maxBurstSeconds; } void doSetRate(double permitsPerSecond, double stableIntervalMicros) { double oldMaxPermits = this.maxPermits; maxPermits = maxBurstSeconds * permitsPerSecond; System.out.println("maxPermits=" + maxPermits); if (oldMaxPermits == Double.POSITIVE_INFINITY) { // if we don't special-case this, we would get storedPermits == NaN, below storedPermits = maxPermits; } else { storedPermits = (oldMaxPermits == 0.0) ? 0.0 // initial state : storedPermits * maxPermits / oldMaxPermits; } } long storedPermitsToWaitTime(double storedPermits, double permitsToTake) { return 0L; } }
一个简单的使用示意图及解释,下面私海一个QPS=4的SmoothBursty:
(1).t=0,这时候storedPermits=0,请求1个令牌,等待时间=0;
(2).t=1,这时候storedPermits=3,请求3个令牌,等待时间=0;
(3).t=2,这时候storedPermits=4,请求10个令牌,等待时间=0,超前使用了2个令牌;
(4).t=3,这时候storedPermits=0,请求1个令牌,等待时间=0.5;
代码的输出:
maxPermits=4.0, storedPermits=7.2E-4, stableIntervalMicros=250000.0, nextFreeTicketMicros=1472 acquire(1), sleepSecond=0.0 maxPermits=4.0, storedPermits=3.012212, stableIntervalMicros=250000.0, nextFreeTicketMicros=1004345 acquire(3), sleepSecond=0.0 maxPermits=4.0, storedPermits=4.0, stableIntervalMicros=250000.0, nextFreeTicketMicros=2004668 acquire(10), sleepSecond=0.0 maxPermits=4.0, storedPermits=0.0, stableIntervalMicros=250000.0, nextFreeTicketMicros=3504668 acquire(1), sleepSecond=0.499591
6.2 SmoothWarmingUp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
|
maxPermits等于热身(warmup)期间能产生的令牌数,比如QPS=4,warmup为2秒,则maxPermits=8.halfPermits为maxPermits的一半.
参考注释中的神图:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
|
下面是我们QPS=4,warmup为2秒时候对应的图。
maxPermits=8,halfPermits=4,和SmoothBursty相同的请求序列:
(1).t=0,这时候storedPermits=8,请求1个令牌,使用1个storedPermits消耗时间=1×(0.75+0.625)/2=0.6875秒;
(2).t=1,这时候storedPermits=8,请求3个令牌,使用3个storedPermits消耗时间=3×(0.75+0.375)/2=1.6875秒(注意已经超过1秒了,意味着下次产生新Permit时间为2.6875);
(3).t=2,这时候storedPermits=5,请求10个令牌,使用5个storedPermits消耗时间=1×(0.375+0.25)/2+4*0.25=1.3125秒,再加上额外请求的5个新产生的Permit需要消耗=5*0.25=1.25秒,即总共需要耗时2.5625秒,则下一次产生新的Permit时间为2.6875+2.5625=5.25,注意当前请求私海2.6875才返回的,之前一直阻塞;
(4).t=3,因为前一个请求阻塞到2.6875,实际这个请求3.6875才到达RateLimiter,请求1个令牌,storedPermits=0,下一次产生新Permit时间为5.25,因此总共需要等待5.25-3.6875=1.5625秒;
实际执行结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
|
7.其他限流器
ASP.NET版本的一个比较成熟限流器:WebApiThrottle 参考:http://www.cnblogs.com/mushroom/archive/2015/07/21/4659200.html
参考: https://github.com/springside/springside4/wiki/Rate-Limiter
https://en.wikipedia.org/wiki/Token_bucket
https://en.wikipedia.org/wiki/Leaky_bucket
转自:http://m.blog.csdn.net/tianyaleixiaowu/article/details/74942405