zoukankan      html  css  js  c++  java
  • 二.Go微服务--令牌桶

    1. 令牌桶

    1.1 原理

    1. 我们以 r/s 的速度向桶内放置令牌,桶的容量为 b , 如果桶满了令牌将会丢弃
    2. 当请求到达时,我们向桶内获取令牌,如果令牌足够,我们就通过转发请求
    3. 如果桶内的令牌数量不够,那么这个请求会被缓存等待令牌足够时转发,或者是被直接丢弃掉

    由于桶的存在,所以令牌桶算法不仅可以限流还可以应对突发流量的情况

    举个例子:假设我们桶的容量是 100,速度是 10 rps,那么在我们桶满的情况下,如果突然来 100 个请求是可以满足的,但是后续的请求就会被限制到 10 rps

    存在下面两种特殊情况

    • 如果桶的容量为 0,那么相当于禁止请求,因为所有的令牌都被丢弃了
    • 如果令牌放置速率为无穷大,那么相当于没有限制

    令牌桶最常见的实现就是 Go 官方的 golang.org/x/time/rate

    1.2 使用方法

    方法如下

    type Limiter struct {
    	// contains filtered or unexported fields
    }
    
    // 构建一个限流器,r 是每秒放入的令牌数量,b 是桶的大小
    func NewLimiter(r Limit, b int) *Limiter
    
    // 分别返回 b 和 r 的值
    func (lim *Limiter) Burst() int
    func (lim *Limiter) Limit() Limit
    
    // token 消费方法
    func (lim *Limiter) Allow() bool
    func (lim *Limiter) AllowN(now time.Time, n int) bool
    func (lim *Limiter) Reserve() *Reservation
    func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation
    func (lim *Limiter) Wait(ctx context.Context) (err error)
    func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)
    
    // 动态流控
    func (lim *Limiter) SetBurst(newBurst int)
    func (lim *Limiter) SetBurstAt(now time.Time, newBurst int)
    func (lim *Limiter) SetLimit(newLimit Limit)
    func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit)
    

    1.2.1 初始化令牌桶

    直接调用 NewLimiter(r Limit, b int) 即可, r 表示每秒产生 token 的速度, b 表示桶的大小

    1.2.2 Token 消费

    总共有三种 token 消费的方式,最常用的是使用 Wait 阻塞等待

    Allow

    Allow 就是 AllowN(now,1) 的别名, AllowN 表示截止到 now 这个时间点,是否存在 n 个 token,如果存在那么就返回 true 反之返回 false,如果我们限流比较严格,没有资源就直接丢弃可以使用这个方法

    func (lim *Limiter) Allow() bool
    func (lim *Limiter) AllowN(now time.Time, n int) bool
    

    Reserve
    同理 Reserve 也是 ReserveN(now, 1) 的别名, ReserveN 其实和 AllowN 类似,表示截止到 now 这个时间点,是否存在 n 个 token,只是 AllowN 直接返回 true or false,但是 ReserveN 返回一个 Reservation 对象

    func (lim *Limiter) Reserve() *Reservation
    func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation
    

    Reservation 有 5 个方法,通过调用 OK 我们可以知道是否通过等待可以获取到 N 个 token,如果可以通过 Delay 方法我们可以得知需要等待的时间,如果我们不想等了可以调用 Cancel 方法归还 token

    type Reservation
        func (r *Reservation) Cancel()
        func (r *Reservation) CancelAt(now time.Time)
        func (r *Reservation) Delay() time.Duration
        func (r *Reservation) DelayFrom(now time.Time) time.Duration
        func (r *Reservation) OK() bool
    

    Wait
    Wait 是最常用的WaitWaitN(ctx, 1) 的别名, WaitN(ctx, n) 表示如果存在 n 个令牌就直接转发,不存在我们就等,等待存在为止,传入的 ctx 的 Deadline 就是等待的 Deadline

    func (lim *Limiter) Wait(ctx context.Context) (err error)
    func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)
    

    1.2.3 动态流控

    通过调用 SetBurstSetLimit 可以动态的设置桶的大小和 token 生产速率,其中 SetBurstAtSetLimitAt 会将传入的时间 now 设置为流控最后的更新时间

    func (lim *Limiter) SetBurst(newBurst int)
    func (lim *Limiter) SetBurstAt(now time.Time, newBurst int)
    func (lim *Limiter) SetLimit(newLimit Limit)
    func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit)
    

    1.3 基于ip的gin限流中间件

    主要就是使用了 sync.map 来为每一个 ip 创建一个 limiter,当然这个 key 也可以是其他的值,例如用户名等

    func NewLimiter(r rate.Limit, b int, t time.Duration) gin.HandlerFunc {
    	limiters := &sync.Map{}
    
    	return func(c *gin.Context) {
    		// 获取限速器
    		// key 除了 ip 之外也可以是其他的,例如 header,user name 等
    		key := c.ClientIP()
    		l, _ := limiters.LoadOrStore(key, rate.NewLimiter(r, b))
    
    		// 这里注意不要直接使用 gin 的 context 默认是没有超时时间的
    		ctx, cancel := context.WithTimeout(c, t)
    		defer cancel()
    
    		if err := l.(*rate.Limiter).Wait(ctx); err != nil {
    			// 这里先不处理日志了,如果返回错误就直接 429
    			c.AbortWithStatusJSON(http.StatusTooManyRequests, gin.H{"error": err})
    		}
    		c.Next()
    	}
    }
    

    使用的时候只需要 use 一下中间件就可以了

    func main() {
    	e := gin.Default()
    	// 新建一个限速器,允许突发 10 个并发,限速 3rps,超过 500ms 就不再等待
    	e.Use(NewLimiter(3, 10, 500*time.Millisecond))
    	e.GET("ping", func(c *gin.Context) {
    		c.String(http.StatusOK, "pong")
    	})
    	e.Run(":8080")
    }
    

    我们使用 go-stress-testing 来压测一下,20 个并发

     ~/gopath/bin/go-stress-testing -c 20 -n 1 -u http://127.0.0.1:8080/ping
    
    开始启动  并发数:20 请求数:1 请求参数:
    
    ─────┬───────┬───────┬───────┬────────┬────────┬────────┬────────┬────────┬────────┬────────
      耗时│  并发数│  成功数│  失败数│   qps  │ 最长耗时│  最短耗时│ 平均耗时│  下载字节│ 字节每秒│ 错误码
    ─────┼───────┼───────┼───────┼────────┼────────┼────────┼────────┼────────┼────────┼────────
       1s│     20│     11│      9│   63.79│  438.48│   45.37│  313.53│     152│     259│200:11;429:9
    
    
    *************************  结果 stat  ****************************
    处理协程数量: 20
    请求总数(并发数*请求数 -c * -n): 20 总请求时间: 0.586 秒 successNum: 11 failureNum: 9
    *************************  结果 end   ****************************
    

    可以发现总共成功了 11 个请求,失败了 9 个,这是因为我们桶的大小是 10 ,所以前 10 个请求都很快就结束了,第 11 个请求等待 333.3 ms 就可以完成,小于超时时间 500ms,所以可以放行,但是后面的请求确是等不了了,所以就都失败了,并且可以看到最后一个成功的请求的耗时为 336.83591ms 而其他的请求耗时都很短

    [GIN-debug] Listening and serving HTTP on :8080
    [GIN] 2021/03/29 - 13:15:55 | 200 |     1.48104ms |       127.0.0.1 | GET      "/ping"
    [GIN] 2021/03/29 - 13:15:55 | 429 |    1.107689ms |       127.0.0.1 | GET      "/ping"
    [GIN] 2021/03/29 - 13:15:55 | 429 |    1.746222ms |       127.0.0.1 | GET      "/ping"
    [GIN] 2021/03/29 - 13:15:55 | 429 |      866.35µs |       127.0.0.1 | GET      "/ping"
    [GIN] 2021/03/29 - 13:15:55 | 429 |    1.870403ms |       127.0.0.1 | GET      "/ping"
    [GIN] 2021/03/29 - 13:15:55 | 429 |    2.231912ms |       127.0.0.1 | GET      "/ping"
    [GIN] 2021/03/29 - 13:15:55 | 429 |    1.832506ms |       127.0.0.1 | GET      "/ping"
    [GIN] 2021/03/29 - 13:15:55 | 429 |     613.741µs |       127.0.0.1 | GET      "/ping"
    [GIN] 2021/03/29 - 13:15:55 | 200 |    1.454753ms |       127.0.0.1 | GET      "/ping"
    [GIN] 2021/03/29 - 13:15:55 | 200 |     1.37802ms |       127.0.0.1 | GET      "/ping"
    [GIN] 2021/03/29 - 13:15:55 | 200 |    1.428062ms |       127.0.0.1 | GET      "/ping"
    [GIN] 2021/03/29 - 13:15:55 | 200 |      40.782µs |       127.0.0.1 | GET      "/ping"
    [GIN] 2021/03/29 - 13:15:55 | 200 |    1.046146ms |       127.0.0.1 | GET      "/ping"
    [GIN] 2021/03/29 - 13:15:55 | 429 |      1.7624ms |       127.0.0.1 | GET      "/ping"
    [GIN] 2021/03/29 - 13:15:55 | 429 |    1.803124ms |       127.0.0.1 | GET      "/ping"
    [GIN] 2021/03/29 - 13:15:55 | 200 |       41.67µs |       127.0.0.1 | GET      "/ping"
    [GIN] 2021/03/29 - 13:15:55 | 200 |     1.42315ms |       127.0.0.1 | GET      "/ping"
    [GIN] 2021/03/29 - 13:15:55 | 200 |    1.371483ms |       127.0.0.1 | GET      "/ping"
    [GIN] 2021/03/29 - 13:15:55 | 200 |     731.091µs |       127.0.0.1 | GET      "/ping"
    [GIN] 2021/03/29 - 13:15:55 | 200 |   336.83591ms |       127.0.0.1 | GET      "/ping"
    

    1.3 完整代码

    1. demo.main

      package main
      
      import (
      	"context"
      	"fmt"
      	"net/http"
      	"sync"
      	"time"
      
      	"github.com/gin-gonic/gin"
      	"golang.org/x/time/rate"
      )
      
      // NewLimiter, 定义中间件
      func NewLimiter(r rate.Limit, b int, t time.Duration) gin.HandlerFunc {
      	limiters := &sync.Map{}
      
      	return func(c *gin.Context) {
      		// 获取限速器
      		// key 除了 ip 之外也可以是其他的,例如 header,user name 等
      		key := c.ClientIP()
      		l, _ := limiters.LoadOrStore(key, rate.NewLimiter(r, b))
      
      		// 这里注意不要直接使用 gin 的 context 默认是没有超时时间的
      		ctx, cancel := context.WithTimeout(c, t)
      		defer cancel()
      
      		if err := l.(*rate.Limiter).Wait(ctx); err != nil {
      			// 这里先不处理日志了,如果返回错误就直接 429
      			c.AbortWithStatusJSON(http.StatusTooManyRequests, gin.H{"error": err})
      		}
      		c.Next()
      	}
      }
      
      func main() {
      	e := gin.Default()
      	// 新建一个限速器,允许突发 10 个并发,限速 3rps,超过 500ms 就不再等待
      	e.Use(NewLimiter(3, 10, 500*time.Millisecond))
      
      	e.GET("ping", func(c *gin.Context) {
      		c.String(http.StatusOK, "pong")
      	})
      
      	err := e.Run(":8080")
      	if err != nil {
      		fmt.Print("start server err:", err.Error())
      	}
      }
      
    2. 下载go-stress-test

      wget https://github.91chifun.workers.dev/https://github.com//link1st/go-stress-testing/releases/download/v1.0.3/go-stress-testing-linux
      
    3. 将gostress-tesing添加环境变量

      mv go-stress-testing-linux /usr/local/bin/go-stress-testing
      
    4. 启动测试

       go-stress-testing -c 20 -n 1 -u http://172.20.80.1:8080/ping
      

    2. 参考

    1. https://lailin.xyz/post/go-training-week6-2-token-bucket-1.html
    2. https://github.com/link1st/go-stress-testing#11-go-stress-testing
    ♥永远年轻,永远热泪盈眶♥
  • 相关阅读:
    广义表的创建和遍历
    dev c++ Boost库的安装
    NAT模式
    vmware桥接模式
    smb与samba
    利用Linux的Samba服务模拟NT域
    使用samba进行共享文件操作步骤
    安装chrome
    使用虚拟机上网第二步
    TCP协议三次握手过程分析
  • 原文地址:https://www.cnblogs.com/failymao/p/15213080.html
Copyright © 2011-2022 走看看