漏桶算法
1、算法思想
与令牌桶是“反向”的算法,当有请求到来时先放到木桶中,worker以固定的速度从木桶中取出请求进行响应。如果木桶已经满了,直接返回请求频率超限的错误码或者页面
2、使用场景
流量最均匀的限流方式,一般用于流量“整形”,例如保护数据库的限流。先把对数据库的访问加入到木桶中,worker再以db能够承受的qps从木桶中取出请求,去访问数据库。不太适合电商抢购和微博出现热点事件等场景的限流,意识应对突发流量不是很灵活,二是为每个user_id/ip维护一个队列(木桶),worker从这些队列中拉取任务,资源的消耗会比较大。
3、go语言实现
通常使用队列来实现,在go语言中可以通过buffered channel来快速实现,任务加入channel,开启一定数量的worker从channel中获取任务执行。
代码如下:
package main import ( "fmt" "sync" "time" ) // 每个请求来了,把需要执行的业务逻辑封装成Task,放入木桶,等待worker取出执行 type Task struct { handler func() Result // worker从木桶中取出请求对象后要执行的业务逻辑函数 resChan chan Result // 等待worker执行并返回结果的channel taskID int } func handler() Result { time.Sleep(300 * time.Millisecond) return Result{} } func NewTask(id int) Task { return Task{ handler: handler, resChan: make(chan Result), taskID: id, } } // 漏桶 type LeakyBucket struct { BucketSize int // 木桶的大小 NumWorker int // 同时从木桶中获取任务执行的worker数量 bucket chan Task // 存方任务的木桶 } func NewLeakyBucket(bucketSize int, numWorker int) *LeakyBucket { return &LeakyBucket{ BucketSize: bucketSize, NumWorker: numWorker, bucket: make(chan Task, bucketSize), } } func (b *LeakyBucket) validate(task Task) bool { // 如果木桶已经满了,返回false select { case b.bucket <- task: default: fmt.Printf("request[id=%d] is refused ", task.taskID) return false } // 等待worker执行 <-task.resChan fmt.Printf("request[id=%d] is run ", task.taskID) return true } func (b *LeakyBucket) Start() { // 开启worker从木桶拉取任务执行 go func() { for i := 0; i < b.NumWorker; i++ { go func() { for { task := <-b.bucket result := task.handler() task.resChan <- result } }() } }() } // 封装业务逻辑的执行结果 type Result struct { } func main() { bucket := NewLeakyBucket(10, 4) bucket.Start() var wg sync.WaitGroup for i := 0; i < 20; i++ { wg.Add(1) go func(id int) { defer wg.Done() task := NewTask(id) bucket.validate(task) }(i) } wg.Wait() }
运行结果如下:
request[id=2] is refused
request[id=13] is refused
request[id=14] is refused
request[id=17] is refused
request[id=16] is refused
request[id=18] is refused
request[id=0] is run
request[id=3] is run
request[id=1] is run
request[id=19] is run
request[id=4] is run
request[id=6] is run
request[id=5] is run
request[id=11] is run
request[id=7] is run
request[id=8] is run
request[id=9] is run
request[id=10] is run
request[id=12] is run
request[id=15] is run
令牌桶算法
1、算法思想
想象有一个木桶,以固定的速度往木桶里面加入令牌,木桶满了则不能再加入令牌。服务收到请求时尝试从木桶中取出一个令牌,如果能够得到令牌则继续执行后续的业务逻辑;如果没有得到令牌,直接返回访问频率超限的错误码或页面等,不继续执行后续的业务逻辑
2、特点
由于木桶内只要有令牌,请求就可以被处理,所以令牌桶算法可以支持突发流量。同时由于往木桶添加令牌的速度是固定的,且木桶的容量有上限,所以单位时间内处理的请求数也能够得到控制,起到限流的目的。假设加入令牌的速度为1 token/10ms,桶的容量为500,在请求比较少的时候(小于10ms一个请求),木桶可以先“攒”一些令牌(最多500个)。当有突发流量时,一下把木桶内的令牌取空,也就是有500个在并发执行的业务逻辑,之后要等每10ms不中一个新的令牌才能接收一个新的请求。
2、参数设置
木桶的容量,考虑到业务逻辑的资源消耗和机器能承载并发处理多少业务逻辑。生成令牌的速度,太慢的话起不到“攒”令牌应对突发流量的效果。
3、适用场景
适合电商抢购或者微博出现热点事件这种场景,因为在限流的同时可以应对一定的突发流量。如果采用均匀速度处理请求的算法,在发生热点时间的时候,会造成大量的用户无法访问,对用户体验的损害比较大。
4、go语言实现
假设每100ms产生一个令牌,按user_id/IP记录访问,最近一次访问的时间戳 t_last 和令牌数,每次请求时如果 now - last > 100ms,增加(now - last)/100ms个令牌。然后,如果令牌数 > 0,令牌数 -1 继续执行后续的业务逻辑,否则返回请求频率超限的错误码或页面。
go run结果如下:
package main import ( "fmt" "sync" "time" ) // 并发访问同一个user_id/ip的记录需要上锁 var recordMu map[string]*sync.RWMutex func init() { recordMu = make(map[string]*sync.RWMutex) } func max(a, b int) int { if a > b { return a } return b } type TokenBucket struct { BucketSize int // 木桶内的容量:最多可以存放多少个令牌 TokenRate time.Duration // 多长时间生成一个令牌 records map[string]*record // 报错user_id/ip的访问记录 } // 上次访问时的时间戳和令牌数 type record struct { last time.Time token int } func NewTokenBucket(bucketSize int, tokenRate time.Duration) *TokenBucket { return &TokenBucket{ BucketSize: bucketSize, TokenRate: tokenRate, records: make(map[string]*record), } } func (t *TokenBucket) getUidOrIp() string { // 获取请求用户的user_id或者ip地址 return "127.0.0.1" } // 获取这个user_id/ip上次访问时的时间戳和令牌数 func (t *TokenBucket) getRecord(uidOrIp string) *record { if r, ok := t.records[uidOrIp]; ok { return r } return &record{} } // 保存user_id/ip最近一次请求时的时间戳和令牌数量 func (t *TokenBucket) storeRecord(uidOrIp string, r *record) { t.records[uidOrIp] = r } // 验证是否能获取一个令牌 func (t *TokenBucket) validate(uidOrIp string) bool { // 并发修改同一个用户的记录上写锁 rl, ok := recordMu[uidOrIp] if !ok { var mu sync.RWMutex rl = &mu recordMu[uidOrIp] = rl } rl.Lock() defer rl.Unlock() r := t.getRecord(uidOrIp) now := time.Now() if r.last.IsZero() { // 第一次访问初始化为最大令牌数 r.last, r.token = now, t.BucketSize } else { if r.last.Add(t.TokenRate).Before(now) { // 如果与上次请求的间隔超过了token rate // 则增加令牌,更新last r.token += max(int(now.Sub(r.last) / t.TokenRate), t.BucketSize) r.last = now } } var result bool if r.token > 0 { // 如果令牌数大于1,取走一个令牌,validate结果为true r.token-- result = true } // 保存最新的record t.storeRecord(uidOrIp, r) return result } // 返回是否被限流 func (t *TokenBucket) IsLimited() bool { return !t.validate(t.getUidOrIp()) } func main() { tokenBucket := NewTokenBucket(5, 100*time.Millisecond) for i := 0; i< 6; i++ { fmt.Println(tokenBucket.IsLimited()) } time.Sleep(100 * time.Millisecond) fmt.Println(tokenBucket.IsLimited()) }
go run结果如下:
false
false
false
false
false
true
false
滑动时间窗口算法
1、算法思想
滑动时间窗口算法,是从对普通时间窗口计数的优化。使用普通时间窗口时,我们会为每个user_id/ip维护一个KV:uidOrIp:timestamp_requestCount。假设限制1秒1000个请求,那么第100ms有一个请求,这个KV变成uidOrIp: timestamp_1,第200ms有一个请求,我们先比较距离记录的timestamp有没有超过1s,如果没有只更新count,此时KV变成 uidOrIp: timestamp_2。当第11--ms来一个请求时,更新记录中的timestamp并重置技术,KV变成uidOrIp: newtimestamp_1。
普通时间窗口有一个问题,假设有500个请求集中在前1s的后100ms,500个请求集中在后1s的前100ms,其实在这200ms就已经请求超限了,但是由于时间窗口每经过1s就会重置计数,就无法识别到此时请求超限。对于滑动时间窗口,我们可以把1ms的时间窗口划分为10个time slot,每个time slot统计某个100ms的请求数量。每经过100ms,有一个新的time slot加入窗口,早于当前时间100ms的time slot出窗口。窗口内最多维护10个time slot,储存空间的消耗同样是比较低的。
2、适用场景
与令牌桶一样,有应对突发流量的能力。
3、go语言实现
主要就是实现sliding window 算法。可以参考Bilibili开元的kratos框架里circuit breaker用循环列表保存time slot对象的实现,他们这个实现的好处是不用频繁的创建和销毁time slot对象。
下面给出一个简单的基本实现:
package main import ( "fmt" "sync" "time" ) var winMu map[string]*sync.RWMutex func init() { winMu = make(map[string]*sync.RWMutex) } type timeSlot struct { timestamp time.Time // 这个timeSlot的时间起点 count int // 落在这个timeSlot内的请求数 } func countReq(win []*timeSlot) int { var count int for _, ts := range win { count += ts.count } return count } type SlidingWindowLimiter struct { SlotDuration time.Duration // time slot的长度 WinDuration time.Duration // sliding window的长度 numSlots int // window内最多有多少个slot windows map[string][]*timeSlot maxReq int // win duration内允许的最大请求数 } func NewSliding(slotDuration time.Duration, winDuration time.Duration, maxReq int) *SlidingWindowLimiter { return &SlidingWindowLimiter{ SlotDuration: slotDuration, WinDuration: winDuration, numSlots: int(winDuration / slotDuration), windows: make(map[string][]*timeSlot), maxReq: maxReq, } } // 获取user_id/ip的时间窗口 func (l *SlidingWindowLimiter) getWindow(uidOrIp string) []*timeSlot { win, ok := l.windows[uidOrIp] if !ok { win = make([]*timeSlot, 0, l.numSlots) } return win } func (l *SlidingWindowLimiter) storeWindow(uidOrIp string, win []*timeSlot) { l.windows[uidOrIp] = win } func (l *SlidingWindowLimiter) validate(uidOrIp string) bool { // 同一user_id/ip并发安全 mu, ok := winMu[uidOrIp] if !ok { var m sync.RWMutex mu = &m winMu[uidOrIp] = mu } mu.Lock() defer mu.Unlock() win := l.getWindow(uidOrIp) now := time.Now() // 已经过期的time slot移出时间窗 timeoutOffset := -1 for i, ts := range win { if ts.timestamp.Add(l.WinDuration).After(now) { break } timeoutOffset = i } if timeoutOffset > -1 { win = win[timeoutOffset+1:] } // 判断请求是否超限 var result bool if countReq(win) < l.maxReq { result = true } // 记录这次的请求数 var lastSlot *timeSlot if len(win) > 0 { lastSlot = win[len(win)-1] if lastSlot.timestamp.Add(l.SlotDuration).Before(now) { lastSlot = &timeSlot{timestamp: now, count: 1} win = append(win, lastSlot) } else { lastSlot.count++ } } else { lastSlot = &timeSlot{timestamp: now, count: 1} win = append(win, lastSlot) } l.storeWindow(uidOrIp, win) return result } func (l *SlidingWindowLimiter) getUidOrIp() string { return "127.0.0.1" } func (l *SlidingWindowLimiter) IsLimited() bool { return !l.validate(l.getUidOrIp()) } func main() { limiter := NewSliding(100*time.Millisecond, time.Second, 10) for i := 0; i < 5; i++ { fmt.Println(limiter.IsLimited()) } time.Sleep(100 * time.Millisecond) for i := 0; i < 5; i++ { fmt.Println(limiter.IsLimited()) } fmt.Println(limiter.IsLimited()) for _, v := range limiter.windows[limiter.getUidOrIp()] { fmt.Println(v.timestamp, v.count) } fmt.Println("a thousand years later...") time.Sleep(time.Second) for i := 0; i < 7; i++ { fmt.Println(limiter.IsLimited()) } for _, v := range limiter.windows[limiter.getUidOrIp()] { fmt.Println(v.timestamp, v.count) } }
go run结果如下:
false
false
false
false
false
false
false
false
false
false
true
2021-06-10 19:44:26.4902949 +0800 CST m=+0.001994501 5
2021-06-10 19:44:26.5914698 +0800 CST m=+0.103169401 6
a thousand years later...
false
false
false
false
false
false
false
2021-06-10 19:44:27.6020188 +0800 CST m=+1.113718401 7