zoukankan      html  css  js  c++  java
  • golang官方包限流器使用和原理(golang.org/x/time/rate)

    限流器模型

    golang.org/x/time/rate 限流器目前提供了一种令牌桶算法的的限流器。

    • 请求需要拿到令牌才能接着往下执行, 逻辑上有一个令牌桶,桶的最大容量是固定的。

    • 当桶内令牌数 小于 桶的最大容量时, 以固定的频率向桶内增加令牌直至令牌数满。

    • 每个请求理论上消耗一个令牌(实际上提供了的方法每次可以消耗大于一个的令牌)

    • 限流器初始化时 令牌桶是满的

    简单例子

    package main
    
    import (
        "fmt"
        "golang.org/x/time/rate"
        "time"
    )
    
    func main() {
        limiter := rate.NewLimiter(rate.Every(time.Millisecond * 31), 2)
        //time.Sleep(time.Second)
        for i := 0; i < 10; i++ {
            var ok bool
            if limiter.Allow() {
                ok = true
            }
            time.Sleep(time.Millisecond * 20)
            fmt.Println(ok, limiter.Burst())
        }
    }
    

    输出

    true 2
    true 2
    true 2
    true 2
    false 2
    true 2
    true 2
    false 2
    true 2
    true 2
    

    可以看出 一开始桶内令牌数是满的。 由于生成令牌的间隔比请求的间隔多了11ms, 所以到后面每两个请求后就会失败一次。

    限流器实现原理

    限流器的数据结构和New函数如下:

    type Limiter struct {
        limit Limit
        burst int
    
        mu     sync.Mutex
        tokens float64
        // last is the last time the limiter's tokens field was updated
        last time.Time
        // lastEvent is the latest time of a rate-limited event (past or future)
        lastEvent time.Time
    }
    
    func NewLimiter(r Limit, b int) *Limiter {
        return &Limiter{
            limit: r,
            burst: b,
        }
    }
    

    可见, 虽然在逻辑上, 令牌桶在没满的情况下, 是不断往里面以一定时间间隔添加令牌, 但代码实现上并没有这样的一个协程。 实际上桶剩余令牌的更新是推迟在每次消费的时候进行计算的。

    核心函数

    func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation
    

    当调用 limiter.Allow()的时候, 底层调用实际即使该函数, 入参now=当前时间, n=1, maxFutrueReserve=0

    maxFutrueReserve在Wait()方法内调用时, 值会大于0。

    函数前半段: 当limit变量无限大(该变量是新增令牌的时间间隔的倒数, 无限大即表示无间隔), 直接返回TRUE即可

        lim.mu.Lock()
    
        if lim.limit == Inf {
            lim.mu.Unlock()
            return Reservation{
                ok:        true,
                lim:       lim,
                tokens:    n,
                timeToAct: now,
            }
        }
    

    之调用lim.advance(now) 函数, 获取上次令牌桶更新时间和此时桶内令牌数应该是多少(这一步就是上文提到的桶剩余令牌的延迟计算)

    • 先检查传入的时间是否先于数据结构内记录的上次令牌数更新时间(并发的情况下, now变量的初始化未在锁内执行, 这种情况很有可能发生), 如果是则将上次令牌时更新时间替换为当前时间
    • 获取上次令牌数更新时间到现在的时间间隔, 如果超过让令牌桶加到满的时间间隔,就使用让令牌桶加到满的时间间隔。这个变量用于计算增加令牌桶的令牌数, 由于有容量限制,过大没有意义
    • 计算令牌桶此时的个数, 返回
    // advance calculates and returns an updated state for lim resulting from the passage of time.
    // lim is not changed.
    func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
        last := lim.last
        if now.Before(last) {
            last = now
        }
    
        // Avoid making delta overflow below when last is very old.
        maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
        elapsed := now.Sub(last)
        if elapsed > maxElapsed {
            elapsed = maxElapsed
        }
    
        // Calculate the new number of tokens, due to time that passed.
        delta := lim.limit.tokensFromDuration(elapsed)
        tokens := lim.tokens + delta
        if burst := float64(lim.burst); tokens > burst {
            tokens = burst
        }
    
        return now, last, tokens
    }
    

    函数后半段:

    • 计算这次消耗的令牌数, 但是不是直接简单比较消耗后剩余令牌数大于0, 而是计算还需要多少时间令牌桶能生成满足需要的令牌数(如果消耗后令牌数仍大于0 则需要时间就是0)。 将该值和入参maxFutureReserve比较再得出结果。即当前令牌数可能是负的。 这么做的原因是限流器还提供了Wait方法, 当前令牌不足时可以阻塞等待至有令牌为止(提前消费令牌 然后阻塞一段时间), 而不是像Allow方法一样立即返回False
       // Calculate the remaining number of tokens resulting from the request.
        tokens -= float64(n)
    
        // Calculate the wait duration
        var waitDuration time.Duration
        if tokens < 0 {
            waitDuration = lim.limit.durationFromTokens(-tokens)
        }
    
        // Decide result
        ok := n <= lim.burst && waitDuration <= maxFutureReserve
    
        // Prepare reservation
        r := Reservation{
            ok:    ok,
            lim:   lim,
            limit: lim.limit,
        }
        if ok {
            r.tokens = n
          	r.timeToAct = now.Add(waitDuration)
        }
    
        // Update state
        if ok {
            lim.last = now
            lim.tokens = tokens
            lim.lastEvent = r.timeToAct
        } else {
            lim.last = last
        }
    
        lim.mu.Unlock()
        return r
    }
    
  • 相关阅读:
    嵌入式工程师为何不用学习C++语言?
    汽车电子基础知识
    为什么寄存器比存储器快?
    数字信号和模拟信号
    JLink和JTag的区别
    C++中static关键字作用总结
    所谓高情商,就是会说话
    汽车电子缩略语及术语
    卷积
    算法整理
  • 原文地址:https://www.cnblogs.com/Me1onRind/p/13191506.html
Copyright © 2011-2022 走看看