什么是限流?为什么要限流
通俗的来讲,一根管子往池塘注水,池塘底部有一个口子往外出水,当注水的速度过快时,池塘的水会溢出,此时,我们的做法换根小管子注水或者把注水管子的口堵住一半,这就是限流,限流的目的就是为了防止池塘的水溢出,放在软件开发中,一台硬件的CPU和内存总归是有限的,能处理的请求量是有一个阈值的,就跟人的精力一样是有限的,超过这个限度系统就会异常,人就会生病。
明白了什么是限流,为什么要限流,那么互联网公司在各种业务大促中,为了保证系统不被流量压垮,会在系统流量到达设置的阈值时,拒绝后续的流量,限流会导致部分时间段(这个时间段是毫秒级的)系统不可用,不是完全不可用,一般衡量系统处理能力的指标是每秒的QPS或者TPS,假设系统每秒的阈值是1000,当这一秒内有1001个请求访问时,那最后一个请求就会被限流(拒绝处理)
限流常用的几种算法
在具体开发中,尤其是RPC框架中,限流是RPC的标配,一般业务开发人员很少做限流算法开发,这也导致大部分开发人员不是很了解限流算法的原理,这里分享几种常用的限流算法,指出他们的优缺点,并通过代码实现他们。
计数器限流
你要是仔细看了上面的内容,就会发现上面举例的每秒阈值1000的那个例子就是一个计数器限流的思想,计数器限流的本质是一定时间内,访问量到达设置的限制后,在这个时间段没有过去之前,超过阈值的访问量拒绝处理,举个例,你告诉老板我一个小时只处理10件事,这是你的处理能力,但领导半个小内就断续断续给你分派了10件事,这时已经到达你的极限了,在后面的半个小时内,领导再派出的活你是拒绝处理的,直到下一个小时的时间段开始。
首先我们定义一个计数限流的结构体,结构体中至少满足3个字段,阈值,单位时间,当前请求数,结构体如下
type CountLimiter struct {
count int64 //阈值
unitTime time.Duration //单位时间(每秒或者每分钟)
index *atomic.Int64 //计数累加
}
我们需要一个为这个结构体提供创建对象的方法,同时初始化各个字段,其中有些字段是可以从外部当作此参数传入的,完成之后同时启动一个定时器。
//创建一个计数器限流结构体
func NewCountLimiter(count int64, unitTime time.Duration) *CountLimiter {
countLimiter := &CountLimiter{
count: count,
unitTime: unitTime,
index: atomic.NewInt64(0),
}
//开启一个新的协程
go timer(countLimiter)
return countLimiter
}
这个定时器干嘛呢,需要在经过单位时间后把当前请求数清0,从而开启下一个单位时间内的请求统计。
//相当于一个定时器,每经过过单位时间后把index置为0,重新累加
func timer(limiter *CountLimiter) {
ticker := time.NewTicker(limiter.unitTime)
for {
<-ticker.C
limiter.index.Store(0)
}
}
最后最重要的是这个计数器限流对象需要提高一个判断当前请求是否限流的方法,返回值应该是一个bool值,true代表请求通过,false代表请求被限流。
//判断是否允许请求通过
func (cl *CountLimiter) IsAllow() bool {
//如果index累加已经超过阈值,不允许请求通过
if cl.index.Load() >= cl.count {
return false
}
//index加1
cl.index.Add(1)
return true
}
这样一个计数器限流就实现完成了,有没有什么问题呢?还是前面举的例子,每秒1000的阈值,假设在前100毫秒内,计数器index就累加到1000了,那么剩余的900毫秒内就无法处理任何请求了,这种限流很容易造成热点,再来分析一种情况,在一秒内最后100毫秒时间内突发请求800个,这时进入下一个单位时间内,在这个单位时间的前100毫秒内,突发请求700个,这时你会发现200毫秒处理了请求1500个,好像限流不起作用了,是的,这是一个边界问题,是计数器限流的缺点。,如下图,黄线是第一个单位时间内,红线是第二个单位时间内。
令牌桶限流
令牌桶限流-顾名思义,手中握有令牌才能通过,系统只处理含有令牌的请求,如果一个请求获取不到令牌,系统拒绝处理,再通俗一点,医院每天接待病人是有限的,只有挂了号才能看病,挂不上号,对不起,医院不给你看病。
令牌桶,有一个固定大小的容器,每隔一定的时间往桶内放入固定数量的定牌,当请求到来时去容器内先获取令牌,拿到了,开始处理,拿不到拒绝处理(或者短暂的等待,再此获取还是获取不到就放弃)
首先我们定义一个令牌桶结构体,根据令牌桶算法我们结构体中字段至少需要有桶容量,令牌容器,时间间隔,初始令牌数核心字段,代码如下:
type TokenBucket struct {
interval time.Duration //时间间隔
ticker *time.Ticker //定时器
cap int // 桶容量
avail int //桶内一开始令牌数
tokenArray []int //存储令牌的数组
intervalInToken int //时间间隔内放入令牌的个数
index int //数组放入令牌的下标处
mutex sync.Mutex
}
同样的,我们需要提供一个创建令牌桶对象的方法,并且初始化所有字段的值,一些字段需要根据外部传参来决定,同时开启一个新的协程定时放入一定数量的令牌
//创建一个令牌通,入参为令牌桶的容量
func NewTokenBucket(cap int) *TokenBucket {
if cap < 100{
return nil
}
tokenBucket := &TokenBucket{
interval: time.Second * 1,
cap: cap,
avail: 100,
tokenArray: make([]int, cap, cap),
intervalInToken: 100,
index: 0,
mutex: sync.Mutex{},
}
//开启一个协程往容器内定时放入令牌
go adjustTokenDaemon(tokenBucket)
return tokenBucket
}
这个方法的核心是初始化令牌桶的初始数量,然后启动定时器,定时调用放入令牌方法
//调整令牌桶令牌的方法
func adjustTokenDaemon(tokenBucket *TokenBucket) {
//如果桶内一开始的令牌小于初始令牌,开始放入初始令牌
for tokenBucket.index < tokenBucket.avail {
tokenBucket.tokenArray[tokenBucket.index] = 1
tokenBucket.index++
}
tokenBucket.ticker = time.NewTicker(tokenBucket.interval)
go func(t *time.Ticker) {
for {
<-t.C
putToken(tokenBucket)
}
}(tokenBucket.ticker)
}
往令牌容器中添加令牌,记得加锁,因为涉及到多协程操作,一个放令牌,一个取令牌,所以可能存在并发安全情况。
//放入令牌
func putToken(tokenBucket *TokenBucket) {
tokenBucket.mutex.Lock()
for i := 0; i < tokenBucket.intervalInToken; i++ {
//容器满了,无法放入令牌了,终止
if tokenBucket.index > tokenBucket.cap-1 {
break
}
tokenBucket.tokenArray[tokenBucket.index] = 1
tokenBucket.index++
}
defer tokenBucket.mutex.Unlock()
}
最后当有请求到来时,我们从令牌桶内取出一个令牌,如果取出成功,则代表请求通过,否则,请求失败,相当于限流了。
//从令牌桶弹出一个令牌,如果令牌通有令牌,返回true,否则返回false
func (tokenBucket *TokenBucket) PopToken() bool {
defer tokenBucket.mutex.Unlock()
tokenBucket.mutex.Lock()
if tokenBucket.index <= 0 {
return false
}
tokenBucket.tokenArray[tokenBucket.index-1] = 0
tokenBucket.index--
return true
}
上面代码就是令牌桶的限流的实现代码了,相对与计数器限流会比较复杂一些,令牌桶限流能够更方便的调整放入令牌的频率和每次获取令牌的个数,甚至可以用令牌桶思想来限制网关入口流量。
漏斗限流
漏斗限流,意思是说在一个漏斗容器中,当请求来临时就从漏斗顶部放入,漏斗底部会以一定的频率流出,当放入的速度大于流出的速度时,漏斗的空间会逐渐减少为0,这时请求会被拒绝,其实就是上面开始时池塘流水的例子。流入速率是随机的,流出速率是固定的,当漏斗满了之后,其实到了一个平滑的阶段,因为流出是固定的,所以你流入也是固定的,相当于请求是匀速通过的
首先定义漏斗限流的结构体,根据漏斗限流原理,需要字段流出速率,漏斗容量,定时器核心字段,这里容量不用具化的数据结构来表示了,采用双指针法,一个流入的指针,一个流出的指针,大家仔细看看设计。
//漏斗限流
type FunnelRateLimiter struct {
interval time.Duration //时间间隔
cap int //漏斗容量
rate int //漏斗流出速率 每秒流多少
head int //放入水的指针
tail int //漏水的指针
ticker *time.Ticker //定时器
}
创建漏斗限流的对象,并且初始化各个字段,同时开启定时器,模拟漏斗流水操作。
//创建漏斗限流结构体
func NewFunnelRateLimiter(cap int, rate int) *FunnelRateLimiter {
limiter := &FunnelRateLimiter{
interval: time.Second * 1,
cap: cap,
rate: rate,
head: 0,
tail: 0,
}
go leakRate(limiter)
return limiter
}
真实的漏斗流水,看流入的总容量减去流出的总容量是否大于流出速率,漏斗限流的核心是保证漏斗尽量空着,这样请求才能流入进来,所以大于的话就往出流走固定速率的请求,否则就把漏斗清空。
//模拟漏斗以一定的流速漏水
func leakRate(limiter *FunnelRateLimiter) {
limiter.ticker = time.NewTicker(limiter.interval)
for {
<-limiter.ticker.C
//根本没有流量,不需要漏(就是漏斗里没有请求,无法流出)
if limiter.tail >= limiter.head {
continue
}
//看漏斗里的剩余的请求是否大于流出的请求,如果大于,就流出这么多
//举个例子,每秒流出100,首先得保证漏斗里有100个
if (limiter.head - limiter.tail) > limiter.rate {
limiter.tail = limiter.tail + limiter.rate
} else {
//否则流出所有(漏斗里只有70个,就把70个流完)
limiter.tail = limiter.head
}
}
}
最后必须有一个判断请求是否允许通过的方法,实则就是判断漏斗容量是否还有空位,也就判断流入总量减去流出总量是否大于总的容量,大于的话代表漏斗已经装不下了,必须限流,否则,请求通过
//是否允许请求通过(主要看漏斗是否满了)
func (limiter *FunnelRateLimiter) IsAllow() bool {
if limiter.head-limiter.tail >= limiter.cap { //说明漏斗满了
return false
}
limiter.head++
return true
}
我们代码实现采用了双变量head,tail,开始都是0,每当有流量进入时,head变量加1,每过一定时间节点tail进行自加rate,当head的值大于减去tail大于cap,就代表漏斗满了,否则漏斗可以处理请求,通俗讲就相当于一个人(head)在前面跑,另一个人(tail)在后面追,当head跑的快时,他们之间的差距有可能达到cap,但是记住,tail不能追上head,最多持平,都是0.
RPC限流到底怎么做的?
微服务盛行的时代,一个Application可能对付发布多个服务(A,B两个服务),一个服务可能存在多个方法(A1,A2,B1,B2),而且一个Application通常会部署多台机器,我们通常的限流可能回对某个服务限流,也可能对某个服务下面的方法限流,一般情况下RPC的控制台中支持限流的可视化,可配置化。
从上图来看,浏览器触发配置中心的限流规则变更,配置中心通知监听了该规则的服务器,这个时候可能是客户端限流,也可能是服务端限流,取决于浏览器上的操作,假设是服务端限流,那么每个服务端启动一个限流算法(可能是上面算法中的任意一个),这个时候是每台机器都在限流,相当于单机限流,各不影响。
第一个问题:我们介绍了三种限流算法,比如计数器限流,会开启一个协程定时检测重置计数变量为0,如果一个应用有很多个服务,是否意味着要开启很多个协程,那么有人说协程轻量级的,没事,但要是Java中的线程呢,怎么解决,思路是延迟重置,服务开始时,设置计数阈值,同时记录当前时间,每当请求来临时,我们只允许在当前时间段内并且计数变量没有到达阈值的请求通过,否则拒绝,当过了当前时间段,我们重置计数变量,这样是不是就不用开启新的协程了,优化完的代码如下
//计数器限流,不用新开协程, 每次判断时,
// 先看当前时间和上次时间差是否大于1秒,如果大于则计数器清零0,重新开始,如果小与1秒,则判断计数器到达阈值,返回false,否则返回true
type CountLimiterNew struct {
count int64
lastTime int64
index *atomic.Int64
nano int64
}
func NewCountLimiterNew(count int64, lastTime int64) *CountLimiterNew {
countLimiterNew := &CountLimiterNew{
count: count,
lastTime: time.Now().UnixNano(),
index: atomic.NewInt64(0),
nano: 1000 * 1000 * 1000,
}
return countLimiterNew
}
func (cl *CountLimiterNew) IsAllowNew() bool {
//已经进入到下一秒中了
if time.Now().UnixNano()-cl.lastTime > cl.nano {
cl.lastTime = time.Now().UnixNano()
cl.index.Store(1)
return true
}
//当前这一秒钟计数器到达阈值了,进行限流
if cl.index.Load() > cl.count {
return false
}
//计数器加1
cl.index.Add(1)
return true
}
第二个问题:上面我们假设是服务端限流,那么到底该用服务端限流还是客户端限流,我们看这样一个示例,有一个A服务,部署了10台机器(相当于10个服务提供者),但调用A服务的有100个消费者(客户端),假设我们每台机器的阈值是1000,你怎么分给100个客户端呢?你也不了解他们的调用量,就比较麻烦,所以一般情况下都是在服务端限流,因为你自己的服务你最清楚。什么时候用客户端限流呢?当你明确的知道某一个客户端调用量非常大,影响了其它客户端的使用,这时你可以指定该客户端的限流规则
第三个问题:我们上面提到的都是单机限流,还是我们的A服务,部署了10台,但有一台机器是1核2G,其余是4核8G的,这时限流就麻烦了,不能用统一标准限流了,那么在分布式应用程序中,有没有分布式限流的方法呢?这里提供几种思路:
- Nginx 层限流,一般http服务需要经过网关,Nginx层相当于网关限流
- Redis限流,redis是线程安全的,redis支持LUA脚本
- 开源组件Hystrix、resilience4j、Sentinel
分布式限流可以单独写好几篇文章了,后面会单开几篇文章写分布式限流的
认真思考,仔细总结
除了上面的三种限流算法,还有很多它们的变种实现,比如滑动时间窗口算法,思考一下该如何实现呢?可以评论区里讨论