RateLimiter
是Guava
包提供的限流器,采用了令牌桶算法,特点是均匀地向桶中添加令牌,每次消费时也必须持有令牌,否则就需要等待。应用场景之一是限制消息消费的速度,避免消息消费过快而对下游的数据库造成较大的压力。
本文主要介绍RateLimiter
的源码,包括基本限流器SmoothBursty
,以及带预热效果的SmoothWarmingUp
。
RateLimiter
作为限流器的顶层类,只有两个属性:
private final SleepingStopwatch stopwatch;
private volatile Object mutexDoNotUseDirectly;
stopwatch
用来计算时间间隔,以及实现了当拿不到令牌时将线程阻塞的功能;mutexDoNotUseDirectly
主要用来进行线程同步。
RateLimiter
作为一个抽象类,本身不能直接实例化,可以使用静态工厂方法来创建:
public static RateLimiter create(double permitsPerSecond); //①
public static RateLimiter create(double permitsPerSecond, Duration warmupPeriod); //②
public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) //③
RateLimiter
对外提供了3个构造器,分成两类,构造器①是第一类,底层会创建基本限流器SmoothBursty
;构造器②和③是第二类,底层会创建带预热效果的SmoothWarmingUp
。参数permitsPerSecond
表示每秒产生多少个令牌,参数warmupPeriod
是限流器预热阶段的时间,即限流器产生令牌从最慢到最快所需要的时间,参数unit
是预热的时间单位。
SmoothRateLimiter
新增了4个属性:
//桶中存储的令牌数
double storedPermits;
//桶中允许的最大令牌数
double maxPermits;
//稳定状态下产生令牌是速度,值为1000000/permitsPerSecond,单位是微秒/个
double stableIntervalMicros;
//下一次请求需要等待的时间
private long nextFreeTicketMicros = 0L;
这其中比较有意思的是nextFreeTicketMicros
字段,它表示下一次获取令牌的请求到来时需要等待的时间,该字段可以实现上一次获取令牌的请求预支的等待时间由下一次请求来兑现。举例来说,如果桶中当前有10个令牌,一个请求来获取20个令牌,假设1秒产生一个令牌,那么本次请求预支了10个令牌,可以直接返回,但是下一个请求就需要等待10秒的时间,这10秒就是上一次请求预支的令牌耗时。
接下来先介绍SmoothBursty
的构造过程:
public static RateLimiter create(double permitsPerSecond) {
return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}
构造SmoothBursty
时传入了两个参数,stopwatch
好理解,第二个参数意思是当限流器长时间没用时,令牌桶内最多存储多少秒的令牌,这里限定了最多只存储1秒钟的令牌,也就是permitsPerSecond
个。
我们继续分析setRate
方法的实现:
public final void setRate(double permitsPerSecond) {
checkArgument(
permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
synchronized (mutex()) {
doSetRate(permitsPerSecond, stopwatch.readMicros());
}
}
setRate
方法先校验permitsPerSecond
必须为整数,然后在同步块中执行doSetRate
方法。mutex
方法通过双重检测的方式实例化mutexDoNotUseDirectly
字段,详细代码略去,doSetRate
是抽象方法,具体的实现在抽象子类SmoothRateLimiter
中:
final void doSetRate(double permitsPerSecond, long nowMicros) {
resync(nowMicros);
//计算产生单个令牌的时间间隔
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
//stableIntervalMicros字段赋值
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}
doSetRate
方法主要是设置了stableIntervalMicros
和nextFreeTicketMicros
字段,调用的两个方法resync
和doSetRate
我们接着分析。resync
方法主要用来设置storedPermits
和nextFreeTicketMicros
这俩字段,代码如下:
void resync(long nowMicros) {
// nowMicros单位是微秒,初始化过程中,代码执行到这里时,nowMicros已经大于0, nextFreeTicketMicros取默认值0
if (nowMicros > nextFreeTicketMicros) {
//计算超过的这些时间里产生了多少新的令牌
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
//重新计算当前令牌桶内持有的令牌数,此时maxPermits=0,storedPermits赋值后仍然是0
storedPermits = min(maxPermits, storedPermits + newPermits);
//更新下次获取令牌的时间为当前时间
nextFreeTicketMicros = nowMicros;
}
}
此方法会根据当前的时间决定是否进行字段赋值,如果当前时间已经超过了nextFreeTicketMicros
的值,那么就重新计算storedPermits
和nextFreeTicketMicros
字段,其中计算storedPermits
的代码虽然容易理解,但是思路挺巧妙。一般来说,令牌桶算法的令牌需要以固定的速率进行添加,那么很自然想到可以起一个任务,按照一定的速度产生令牌,但是起一个新任务会占用一定的资源,从而加重系统的负担,此处的实现是利用时间差来计算这段时间产生的令牌数,以简单的计算完成了原本独立任务需要做的事情,开销大大减少了。coolDownIntervalMicros
方法是抽象方法,在SmoothBursty
和SmoothWarmingUp
有不同的实现,在SmoothBursty
的实现是直接返回stableIntervalMicros
字段,这个字段目前还没设置过值,取默认值0.0,这里double
的除零操作并不会抛异常,而是会返回无穷大。
我们接着看一下doSetRate
方法,这也是个抽象方法,在SmoothBursty
中的实现如下:
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
//初始化的过程中,maxPermits是默认值0
double oldMaxPermits = this.maxPermits;
//最大令牌数,maxBurstSeconds的值固定是1,最大令牌数与permitsPerSecond相等
maxPermits = maxBurstSeconds * permitsPerSecond;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = maxPermits;
} else {
//在初始化阶段,storedPermits赋值为0
storedPermits =
(oldMaxPermits == 0.0)
? 0.0 // initial state
: storedPermits * maxPermits / oldMaxPermits;
}
}
maxPermits
在此之前并没有设置过值,因此默认是0.0,这里只是将storedPermits
初始化成了0。不过这里的代码也说明,在执行期间maxPermits
是可以在其他地方被修改的,如果出现了更改,就会等比例修改storedPermits
的值。
到这里SmoothBursty
的初始化过程就结束了,大体上是将内部的字段赋予初始值。我们接下来看看SmoothBursty
的使用:
public double acquire() {
return acquire(1);
}
public double acquire(int permits) {
long microsToWait = reserve(permits);
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
acquire
方法用于从令牌桶中获取令牌,参数permits
表示需要获取的令牌数量,如果当前没办法拿到需要的令牌,线程会阻塞一段时间,该方法返回等待的时间,reserve
的实现如下:
final long reserve(int permits) {
//参数校验,略
checkPermits(permits);
synchronized (mutex()) {
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}
final long reserveAndGetWaitLength(int permits, long nowMicros) {
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
//返回等待时间,如果不需要等待,返回0
return max(momentAvailable - nowMicros, 0);
}
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
//再次执行resync方法,但由于stableIntervalMicros已经完成初始化,因此更新之后的storedPermits也不再是0
resync(nowMicros);
long returnValue = nextFreeTicketMicros;
//取可用的令牌与需要的令牌两者的最小值
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
//计算该次请求超出的令牌数,这生成多余令牌需要的时间会算在下一次请求上
double freshPermits = requiredPermits - storedPermitsToSpend;
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
//扣减令牌桶库存
this.storedPermits -= storedPermitsToSpend;
//注意这里返回的是上次的时间
return returnValue;
}
reserve
的核心逻辑在reserveEarliestAvailable
方法中,该方法的主要思想是:检查当前令牌桶内令牌数是否满足需求,如果满足则不需要额外的等待时间,否则需要将额外等待时间追加到nextFreeTicketMicros
。需要注意的是方法返回的不是更新过后的nextFreeTicketMicros
,而是上一次请求更新过后的时间,这个时间就是当前线程需要阻塞的时间,也就是说,当前请求所需要等待的时间是由下次请求完成的,下次请求需要的等待时间由下下次请求完成,以此类推。当前请求的令牌数超过令牌桶中的令牌数越多,下次请求需要等待的时间就越长。并且这里并没有对requiredPermits
的上限做检查,这就允许预支令牌,即假设桶的上限是100个令牌,那么一次请求可以允许超过100个令牌,只是生成多余令牌的时间需要算到下一个请求上。同时这里的逻辑也说明,获取令牌是直接成功的,如果获取的令牌数过多,会将storedPermits
扣成0,但不会扣成负数。
到这里SmoothBursty
的初始化以及获取令牌的所有逻辑就介绍完了,接下来看看另一个类SmoothWarmingUp
的源码。RateLimiter
使用了模板方法模式,SmoothWarmingUp
和SmoothBursty
共用了很多代码,只是在一些特定方法上各自的实现细节不同。
static RateLimiter create(
double permitsPerSecond,
long warmupPeriod,
TimeUnit unit,
double coldFactor,
SleepingStopwatch stopwatch) {
RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}
我们之前介绍的另外两个构造器,底层调用的是这个包级的create
方法,该方法的5个参数中,只有coldFactor
是新出现的,字面意思是冷启动因子,源码写死了是3.0,该值表示在预热阶段开始时,以多大的速率产生令牌,冷启动速率是稳定速率的三分之一,在预热阶段,该值会均匀减少,冷启动阶段结束后恢复到正常速率。
setRate
方法底层也是调用doSetRate
方法,这里重点关注SmoothWarmingUp
中重载的部分:
final void doSetRate(double permitsPerSecond, long nowMicros) {
resync(nowMicros);
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}
void resync(long nowMicros) {
if (nowMicros > nextFreeTicketMicros) {
//这里coolDownIntervalMicros返回无穷大,则newPermits赋值为0,与SmoothBursty中的无穷大正好相反
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
}
}
//与SmoothBursty中的实现不同
double coolDownIntervalMicros() {
//maxPermits=0,此方法返回无穷大
return warmupPeriodMicros / maxPermits;
}
//此方法的实现和SmoothBursty中的很不一样
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
//maxPermits初始化时候是0
double oldMaxPermits = maxPermits;
//设置冷启动生成令牌的间隔是正常值的3倍(codeFactor固定为3)
double coldIntervalMicros = stableIntervalMicros * coldFactor;
thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros; //公式①
//这里的计算说明maxPermits可能超过permitsPerSecond
maxPermits = thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros); //公式②
//slope是梯形部分斜线的斜率
slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = 0.0;
} else {
storedPermits =
(oldMaxPermits == 0.0)
? maxPermits // storedPermit赋值为maxPermits
: storedPermits * maxPermits / oldMaxPermits;
}
}
doSetRate
的代码不容易理解,源码中利用图示介绍了几个变量之间的关系(但是本人仍然不是很理解,因此只能将结论放在这里,无法进行更多解释),如图所示,源码注释中说明了如下的两个等式:
- 梯形的面积等于预热时间
warmupPeriodMicros
warmupPeriodMicros = 0.5 * (coldIntervalMicros + stableIntervalMicros) * (maxPermits - thresholdPermits)
由此可以得到公式②:
maxPermits = thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
这里计算maxPermits
的逻辑与SmoothBursty
不一样,SmoothBursty
里它的值等于permitsPerSecond
,而这里的maxPermits
是可以超过permitsPerSecond
的。
- 左边矩形的面积是梯形面积的一半,由此可知:
warmupPeriodMicros * 0.5 = thresholdPermits * stableIntervalMicros
计算可得公式①:
thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros
SmoothWarmingUp
的初始化逻辑到这里就结束了,接下来介绍下它获取令牌的流程,acquire
方法的其他部分上文已经介绍过,此处重点介绍storedPermitsToWaitTime
方法:
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros);
long returnValue = nextFreeTicketMicros;
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
double freshPermits = requiredPermits - storedPermitsToSpend;
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
//存储的令牌数量超出thresholdPermits的部分,这部分反映在梯形区域
double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
long micros = 0;
//availablePermitsAboveThreshold > 0.0表示存储的令牌数已经超过thresholdPermits,到了梯形区域
if (availablePermitsAboveThreshold > 0.0) {
//permitsAboveThresholdToTake表示梯形区域的高
double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
//length计算的是梯形的上底+下底
double length = permitsToTime(availablePermitsAboveThreshold)
+ permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
//梯形区域的面积,即生产梯形区域的令牌数所需要的时间
//由此处可知,只要存储的令牌数超过了左侧矩形区域达到了梯形区域,就需要梯形区域的等待时间
micros = (long) (permitsAboveThresholdToTake * length / 2.0);
//扣除掉需要消耗的梯形区域的令牌数,表示还需要从左侧矩形区域取得的令牌数量
permitsToTake -= permitsAboveThresholdToTake;
}
//等待时间=梯形区域的时间+矩形区域的时间,按照代码的逻辑,只要梯形区域的令牌数小于permitsToTake,就要取左侧的令牌,需要额外的等待时间
micros += (long) (stableIntervalMicros * permitsToTake);
return micros;
}
//由前文可知,slope = =y/x = 产生令牌间隔/令牌数,permits * slope表示产生令牌间隔的增量,加上stableIntervalMicros表示梯形的底
private double permitsToTime(double permits) {
return stableIntervalMicros + permits * slope;
}
此处的storedPermitsToWaitTime
与SmoothBursty
中的实现大不相同,SmoothBursty
由于不需要预热,可以直接获取桶中的令牌,因此直接返回了0,而此处存在预热阶段,不能直接获取到令牌,因此计算逻辑稍微复杂些,总体来说,就是求图中阴影部分的面积。并且比较有意思的地方是,只要桶里有存储的有令牌,那么获取令牌的时候总会有额外的等待时间,总共有3中情况:
- 只需要左侧区域固定速率的等待时间
即存储的令牌数比较少,还没有到底梯形区域的时候的等待时间,此区间内生产令牌速度恒定,等待时间与能获取到的令牌数成正比。 - 只需要梯形区域的等待时间
即存储的令牌较多,已经到达了梯形区域,并且梯形部分代表的令牌数满足本次获取的数量要求,那么久只需要梯形范围内的等待时间。 - 需要两个区域内的等待时间之和
即梯形区域内的令牌数量小于请求的令牌数,需要从左侧区域拿一些令牌,不同区域的等待时间计算有区别。
同时,也可以看出,当每次使用间隔比较久的时候(哪怕只要桶中产生了一个还没用掉的令牌),此次请求就需要作额外的等待。总结来说,对于使用频率不是很高的场景,SmoothWarmingUp
要求每次请求都需要经历冷启动过程,这也是为了防止长时间没有请求时,下游的各种连接断开,瞬时高并发可能造成下游服务崩溃的问题;而对于高并发场景,由于相邻请求间隔非常短,可以维持storedPermits=0,那么此时SmoothWarmingUp
就没什么额外等待时间了,性能上与SmoothBursty
一致。