zoukankan      html  css  js  c++  java
  • Golang实现请求限流的几种办法

    简单的并发控制

    利用 channel 的缓冲设定,我们就可以来实现并发的限制。我们只要在执行并发的同时,往一个带有缓冲的 channel 里写入点东西(随便写啥,内容不重要)。让并发的 goroutine在执行完成后把这个 channel 里的东西给读走。这样整个并发的数量就讲控制在这个 channel的缓冲区大小上。

    比如我们可以用一个 bool 类型的带缓冲 channel 作为并发限制的计数器。

    1
    chLimit := make(chan bool, 1)

    然后在并发执行的地方,每创建一个新的 goroutine,都往 chLimit 里塞个东西。

    1
    2
    3
    4
    5
    for i, sleeptime := range input {
      chs[i] = make(chan string, 1)
      chLimit <- true
      go limitFunc(chLimit, chs[i], i, sleeptime, timeout)
    }

    这里通过 go 关键字并发执行的是新构造的函数。他在执行完后,会把 chLimit的缓冲区里给消费掉一个。

    1
    2
    3
    4
    limitFunc := func(chLimit chan bool, ch chan string, task_id, sleeptime, timeout int) {
      Run(task_id, sleeptime, timeout, ch)
      <-chLimit
    }

    这样一来,当创建的 goroutine 数量到达 chLimit 的缓冲区上限后。主 goroutine 就挂起阻塞了,直到这些 goroutine 执行完毕,消费掉了 chLimit 缓冲区中的数据,程序才会继续创建新的 goroutine 。我们并发数量限制的目的也就达到了。

    例子

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    package main
      
    import (
      "fmt"
      "time"
    )
      
    func Run(task_id, sleeptime, timeout int, ch chan string) {
      ch_run := make(chan string)
      go run(task_id, sleeptime, ch_run)
      select {
      case re := <-ch_run:
        ch <- re
      case <-time.After(time.Duration(timeout) * time.Second):
        re := fmt.Sprintf("task id %d , timeout", task_id)
        ch <- re
      }
    }
      
    func run(task_id, sleeptime int, ch chan string) {
      
      time.Sleep(time.Duration(sleeptime) * time.Second)
      ch <- fmt.Sprintf("task id %d , sleep %d second", task_id, sleeptime)
      return
    }
      
    func main() {
      input := []int{3, 2, 1}
      timeout := 2
      chLimit := make(chan bool, 1)
      chs := make([]chan string, len(input))
      limitFunc := func(chLimit chan bool, ch chan string, task_id, sleeptime, timeout int) {
        Run(task_id, sleeptime, timeout, ch)
        <-chLimit
      }
      startTime := time.Now()
      fmt.Println("Multirun start")
      for i, sleeptime := range input {
        chs[i] = make(chan string, 1)
        chLimit <- true
        go limitFunc(chLimit, chs[i], i, sleeptime, timeout)
      }
      
      for _, ch := range chs {
        fmt.Println(<-ch)
      }
      endTime := time.Now()
      fmt.Printf("Multissh finished. Process time %s. Number of task is %d", endTime.Sub(startTime), len(input))
    }

    运行结果:

    Multirun start      task id 0 , timeout      task id 1 , timeout      task id 2 , sleep 1 second      Multissh finished. Process time 5s. Number of task is 3

    如果修改并发限制为2:

    1
    chLimit := make(chan bool, 2)

    运行结果:

    Multirun start     task id 0 , timeout     task id 1 , timeout     task id 2 , sleep 1 second     Multissh finished. Process time 3s. Number of task is 3

    使用计数器实现请求限流

    限流的要求是在指定的时间间隔内,server 最多只能服务指定数量的请求。实现的原理是我们启动一个计数器,每次服务请求会把计数器加一,同时到达指定的时间间隔后会把计数器清零;这个计数器的实现代码如下所示:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    type RequestLimitService struct {
      Interval time.Duration
      MaxCount int
      Lock   sync.Mutex
      ReqCount int
    }
      
    func NewRequestLimitService(interval time.Duration, maxCnt int) *RequestLimitService {
      reqLimit := &RequestLimitService{
        Interval: interval,
        MaxCount: maxCnt,
      }
      
      go func() {
        ticker := time.NewTicker(interval)
        for {
          <-ticker.C
          reqLimit.Lock.Lock()
          fmt.Println("Reset Count...")
          reqLimit.ReqCount = 0
          reqLimit.Lock.Unlock()
        }
      }()
      
      return reqLimit
    }
      
    func (reqLimit *RequestLimitService) Increase() {
      reqLimit.Lock.Lock()
      defer reqLimit.Lock.Unlock()
      
      reqLimit.ReqCount += 1
    }
      
    func (reqLimit *RequestLimitService) IsAvailable() bool {
      reqLimit.Lock.Lock()
      defer reqLimit.Lock.Unlock()
      
      return reqLimit.ReqCount < reqLimit.MaxCount
    }

    在服务请求的时候, 我们会对当前计数器和阈值进行比较,只有未超过阈值时才进行服务:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    var RequestLimit = NewRequestLimitService(10 * time.Second, 5)
      
    func helloHandler(w http.ResponseWriter, r *http.Request) {
      if RequestLimit.IsAvailable() {
        RequestLimit.Increase()
        fmt.Println(RequestLimit.ReqCount)
        io.WriteString(w, "Hello world! ")
      } else {
        fmt.Println("Reach request limiting!")
        io.WriteString(w, "Reach request limit! ")
      }
    }
      
    func main() {
      fmt.Println("Server Started!")
      http.HandleFunc("/", helloHandler)
      http.ListenAndServe(":8000", nil)
    }

    完整代码 url

    使用golang官方包实现httpserver频率限制

    使用golang来编写httpserver时,可以使用官方已经有实现好的包:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    import(
      "fmt"
      "net"
      "golang.org/x/net/netutil"
    )
      
    func main() {
      l, err := net.Listen("tcp", "127.0.0.1:0")
      if err != nil {
        fmt.Fatalf("Listen: %v", err)
      }
      defer l.Close()
      l = LimitListener(l, max)
       
      http.Serve(l, http.HandlerFunc())
       
      //bla bla bla.................
    }

    源码[url] ( https://github.com/golang/net/blob/master/netutil/listen.go ),基本思路就是为连接数计数,通过make chan来建立一个最大连接数的channel, 每次accept就+1,close时候就-1. 当到达最大连接数时,就等待空闲连接出来之后再accept。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    // Copyright 2013 The Go Authors. All rights reserved.
    // Use of this source code is governed by a BSD-style
    // license that can be found in the LICENSE file.
      
    // Package netutil provides network utility functions, complementing the more
    // common ones in the net package.
    package netutil // import "golang.org/x/net/netutil"
      
    import (
      "net"
      "sync"
    )
      
    // LimitListener returns a Listener that accepts at most n simultaneous
    // connections from the provided Listener.
    func LimitListener(l net.Listener, n int) net.Listener {
      return &limitListener{
        Listener: l,
        sem:   make(chan struct{}, n),
        done:   make(chan struct{}),
      }
    }
      
    type limitListener struct {
      net.Listener
      sem    chan struct{}
      closeOnce sync.Once   // ensures the done chan is only closed once
      done   chan struct{} // no values sent; closed when Close is called
    }
      
    // acquire acquires the limiting semaphore. Returns true if successfully
    // accquired, false if the listener is closed and the semaphore is not
    // acquired.
    func (l *limitListener) acquire() bool {
      select {
      case <-l.done:
        return false
      case l.sem <- struct{}{}:
        return true
      }
    }
    func (l *limitListener) release() { <-l.sem }
      
    func (l *limitListener) Accept() (net.Conn, error) {
      //如果sem满了,就会阻塞在这
      acquired := l.acquire()
      // If the semaphore isn't acquired because the listener was closed, expect
      // that this call to accept won't block, but immediately return an error.
      c, err := l.Listener.Accept()
      if err != nil {
        if acquired {
          l.release()
        }
        return nil, err
      }
      return &limitListenerConn{Conn: c, release: l.release}, nil
    }
      
    func (l *limitListener) Close() error {
      err := l.Listener.Close()
      l.closeOnce.Do(func() { close(l.done) })
      return err
    }
      
    type limitListenerConn struct {
      net.Conn
      releaseOnce sync.Once
      release   func()
    }
      
    func (l *limitListenerConn) Close() error {
      err := l.Conn.Close()
      //close时释放占用的sem
      l.releaseOnce.Do(l.release)
      return err
    }

    使用Token Bucket(令牌桶算法)实现请求限流

    在开发高并发系统时有三把利器用来保护系统:缓存、降级和限流!为了保证在业务高峰期,线上系统也能保证一定的弹性和稳定性,最有效的方案就是进行服务降级了,而限流就是降级系统最常采用的方案之一。

    这里为大家推荐一个开源库 https://github.com/didip/tollbooth ,但是,如果您想要一些简单的、轻量级的或者只是想要学习的东西,实现自己的中间件来处理速率限制并不困难。今天我们就来聊聊如何实现自己的一个限流中间件

    首先我们需要安装一个提供了 Token bucket (令牌桶算法)的依赖包,上面提到的toolbooth 的实现也是基于它实现的:

    1
    $ go get golang.org/x/time/rate

    Demo代码的实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    package main
      
    import (
      "net/http"
      "golang.org/x/time/rate"
    )
      
    var limiter = rate.NewLimiter(2, 5)
    func limit(next http.Handler) http.Handler {
      return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        if limiter.Allow() == false {
          http.Error(w, http.StatusText(429), http.StatusTooManyRequests)
          return
        }
        next.ServeHTTP(w, r)
      })
    }
      
    func main() {
      mux := http.NewServeMux()
      mux.HandleFunc("/", okHandler)
      // Wrap the servemux with the limit middleware.
      http.ListenAndServe(":4000", limit(mux))
    }
      
    func okHandler(w http.ResponseWriter, r *http.Request) {
      w.Write([]byte("OK"))
    }

    算法描述:用户配置的平均发送速率为r,则每隔1/r秒一个令牌被加入到桶中(每秒会有r个令牌放入桶中),桶中最多可以存放b个令牌。如果令牌到达时令牌桶已经满了,那么这个令牌会被丢弃;

    实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    203
    204
    205
    206
    207
    208
    209
    210
    211
    212
    213
    214
    215
    216
    217
    218
    219
    220
    221
    222
    223
    224
    225
    226
    227
    228
    229
    230
    231
    232
    233
    234
    235
    236
    237
    238
    239
    240
    241
    242
    243
    244
    245
    246
    247
    248
    249
    250
    251
    252
    253
    254
    255
    256
    257
    258
    259
    260
    261
    262
    263
    264
    265
    266
    267
    268
    269
    270
    271
    272
    273
    274
    275
    276
    277
    278
    279
    280
    281
    282
    283
    284
    285
    286
    287
    288
    289
    290
    291
    292
    293
    294
    295
    296
    297
    298
    299
    300
    301
    302
    303
    304
    305
    306
    307
    308
    309
    310
    311
    312
    313
    314
    315
    316
    317
    318
    319
    320
    321
    322
    323
    324
    325
    326
    327
    328
    329
    330
    331
    332
    333
    334
    335
    336
    337
    338
    339
    340
    341
    342
    343
    344
    345
    346
    347
    348
    349
    350
    351
    352
    353
    // Copyright 2015 The Go Authors. All rights reserved.
    // Use of this source code is governed by a BSD-style
    // license that can be found in the LICENSE file.
    // Package rate provides a rate limiter.
    package rate
      
    import (
      "fmt"
      "math"
      "sync"
      "time"
      
      "golang.org/x/net/context"
    )
      
    // Limit defines the maximum frequency of some events.
    // Limit is represented as number of events per second.
    // A zero Limit allows no events.
    type Limit float64
      
    // Inf is the infinite rate limit; it allows all events (even if burst is zero).
    const Inf = Limit(math.MaxFloat64)
      
    // Every converts a minimum time interval between events to a Limit.
    func Every(interval time.Duration) Limit {
      if interval <= 0 {
        return Inf
      }
      return 1 / Limit(interval.Seconds())
    }
      
    // A Limiter controls how frequently events are allowed to happen.
    // It implements a "token bucket" of size b, initially full and refilled
    // at rate r tokens per second.
    // Informally, in any large enough time interval, the Limiter limits the
    // rate to r tokens per second, with a maximum burst size of b events.
    // As a special case, if r == Inf (the infinite rate), b is ignored.
    // See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets.
    //
    // The zero value is a valid Limiter, but it will reject all events.
    // Use NewLimiter to create non-zero Limiters.
    //
    // Limiter has three main methods, Allow, Reserve, and Wait.
    // Most callers should use Wait.
    //
    // Each of the three methods consumes a single token.
    // They differ in their behavior when no token is available.
    // If no token is available, Allow returns false.
    // If no token is available, Reserve returns a reservation for a future token
    // and the amount of time the caller must wait before using it.
    // If no token is available, Wait blocks until one can be obtained
    // or its associated context.Context is canceled.
    //
    // The methods AllowN, ReserveN, and WaitN consume n tokens.
    type Limiter struct {
      //maximum token, token num per second
      limit Limit
      //burst field, max token num
      burst int
      mu  sync.Mutex
      //tokens num, change
      tokens float64
      // last is the last time the limiter's tokens field was updated
      last time.Time
      // lastEvent is the latest time of a rate-limited event (past or future)
      lastEvent time.Time
    }
      
    // Limit returns the maximum overall event rate.
    func (lim *Limiter) Limit() Limit {
      lim.mu.Lock()
      defer lim.mu.Unlock()
      return lim.limit
    }
      
    // Burst returns the maximum burst size. Burst is the maximum number of tokens
    // that can be consumed in a single call to Allow, Reserve, or Wait, so higher
    // Burst values allow more events to happen at once.
    // A zero Burst allows no events, unless limit == Inf.
    func (lim *Limiter) Burst() int {
      return lim.burst
    }
      
    // NewLimiter returns a new Limiter that allows events up to rate r and permits
    // bursts of at most b tokens.
    func NewLimiter(r Limit, b int) *Limiter {
      return &Limiter{
        limit: r,
        burst: b,
      }
    }
      
    // Allow is shorthand for AllowN(time.Now(), 1).
    func (lim *Limiter) Allow() bool {
      return lim.AllowN(time.Now(), 1)
    }
      
    // AllowN reports whether n events may happen at time now.
    // Use this method if you intend to drop / skip events that exceed the rate limit.
    // Otherwise use Reserve or Wait.
    func (lim *Limiter) AllowN(now time.Time, n int) bool {
      return lim.reserveN(now, n, 0).ok
    }
      
    // A Reservation holds information about events that are permitted by a Limiter to happen after a delay.
    // A Reservation may be canceled, which may enable the Limiter to permit additional events.
    type Reservation struct {
      ok   bool
      lim  *Limiter
      tokens int
      //This is the time to action
      timeToAct time.Time
      // This is the Limit at reservation time, it can change later.
      limit Limit
    }
      
    // OK returns whether the limiter can provide the requested number of tokens
    // within the maximum wait time. If OK is false, Delay returns InfDuration, and
    // Cancel does nothing.
    func (r *Reservation) OK() bool {
      return r.ok
    }
      
    // Delay is shorthand for DelayFrom(time.Now()).
    func (r *Reservation) Delay() time.Duration {
      return r.DelayFrom(time.Now())
    }
      
    // InfDuration is the duration returned by Delay when a Reservation is not OK.
    const InfDuration = time.Duration(1<<63 - 1)
      
    // DelayFrom returns the duration for which the reservation holder must wait
    // before taking the reserved action. Zero duration means act immediately.
    // InfDuration means the limiter cannot grant the tokens requested in this
    // Reservation within the maximum wait time.
    func (r *Reservation) DelayFrom(now time.Time) time.Duration {
      if !r.ok {
        return InfDuration
      }
      delay := r.timeToAct.Sub(now)
      if delay < 0 {
        return 0
      }
      return delay
    }
      
    // Cancel is shorthand for CancelAt(time.Now()).
    func (r *Reservation) Cancel() {
      r.CancelAt(time.Now())
      return
    }
      
    // CancelAt indicates that the reservation holder will not perform the reserved action
    // and reverses the effects of this Reservation on the rate limit as much as possible,
    // considering that other reservations may have already been made.
    func (r *Reservation) CancelAt(now time.Time) {
      if !r.ok {
        return
      }
      r.lim.mu.Lock()
      defer r.lim.mu.Unlock()
      if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) {
        return
      }
      // calculate tokens to restore
      // The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved
      // after r was obtained. These tokens should not be restored.
      restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
      if restoreTokens <= 0 {
        return
      }
      // advance time to now
      now, _, tokens := r.lim.advance(now)
      // calculate new number of tokens
      tokens += restoreTokens
      if burst := float64(r.lim.burst); tokens > burst {
        tokens = burst
      }
      // update state
      r.lim.last = now
      r.lim.tokens = tokens
      if r.timeToAct == r.lim.lastEvent {
        prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))
        if !prevEvent.Before(now) {
          r.lim.lastEvent = prevEvent
        }
      }
      return
    }
      
    // Reserve is shorthand for ReserveN(time.Now(), 1).
    func (lim *Limiter) Reserve() *Reservation {
      return lim.ReserveN(time.Now(), 1)
    }
      
    // ReserveN returns a Reservation that indicates how long the caller must wait before n events happen.
    // The Limiter takes this Reservation into account when allowing future events.
    // ReserveN returns false if n exceeds the Limiter's burst size.
    // Usage example:
    //  r, ok := lim.ReserveN(time.Now(), 1)
    //  if !ok {
    //   // Not allowed to act! Did you remember to set lim.burst to be > 0 ?
    //  }
    //  time.Sleep(r.Delay())
    //  Act()
    // Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events.
    // If you need to respect a deadline or cancel the delay, use Wait instead.
    // To drop or skip events exceeding rate limit, use Allow instead.
    func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation {
      r := lim.reserveN(now, n, InfDuration)
      return &r
    }
      
    // Wait is shorthand for WaitN(ctx, 1).
    func (lim *Limiter) Wait(ctx context.Context) (err error) {
      return lim.WaitN(ctx, 1)
    }
      
    // WaitN blocks until lim permits n events to happen.
    // It returns an error if n exceeds the Limiter's burst size, the Context is
    // canceled, or the expected wait time exceeds the Context's Deadline.
    func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
      if n > lim.burst {
        return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.burst)
      }
      // Check if ctx is already cancelled
      select {
      case <-ctx.Done():
        return ctx.Err()
      default:
      }
      // Determine wait limit
      now := time.Now()
      waitLimit := InfDuration
      if deadline, ok := ctx.Deadline(); ok {
        waitLimit = deadline.Sub(now)
      }
      // Reserve
      r := lim.reserveN(now, n, waitLimit)
      if !r.ok {
        return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
      }
      // Wait
      t := time.NewTimer(r.DelayFrom(now))
      defer t.Stop()
      select {
      case <-t.C:
        // We can proceed.
        return nil
      case <-ctx.Done():
        // Context was canceled before we could proceed. Cancel the
        // reservation, which may permit other events to proceed sooner.
        r.Cancel()
        return ctx.Err()
      }
    }
      
    // SetLimit is shorthand for SetLimitAt(time.Now(), newLimit).
    func (lim *Limiter) SetLimit(newLimit Limit) {
      lim.SetLimitAt(time.Now(), newLimit)
    }
      
    // SetLimitAt sets a new Limit for the limiter. The new Limit, and Burst, may be violated
    // or underutilized by those which reserved (using Reserve or Wait) but did not yet act
    // before SetLimitAt was called.
    func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit) {
      lim.mu.Lock()
      defer lim.mu.Unlock()
      now, _, tokens := lim.advance(now)
      lim.last = now
      lim.tokens = tokens
      lim.limit = newLimit
    }
      
    // reserveN is a helper method for AllowN, ReserveN, and WaitN.
    // maxFutureReserve specifies the maximum reservation wait duration allowed.
    // reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
    func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
      lim.mu.Lock()
      defer lim.mu.Unlock()
      if lim.limit == Inf {
        return Reservation{
          ok:    true,
          lim:    lim,
          tokens:  n,
          timeToAct: now,
        }
      }
      now, last, tokens := lim.advance(now)
      // Calculate the remaining number of tokens resulting from the request.
      tokens -= float64(n)
      // Calculate the wait duration
      var waitDuration time.Duration
      if tokens < 0 {
        waitDuration = lim.limit.durationFromTokens(-tokens)
      }
      // Decide result
      ok := n <= lim.burst && waitDuration <= maxFutureReserve
      // Prepare reservation
      r := Reservation{
        ok:  ok,
        lim:  lim,
        limit: lim.limit,
      }
      if ok {
        r.tokens = n
        r.timeToAct = now.Add(waitDuration)
      }
      // Update state
      if ok {
        lim.last = now
        lim.tokens = tokens
        lim.lastEvent = r.timeToAct
      } else {
        lim.last = last
      }
      return r
    }
      
    // advance calculates and returns an updated state for lim resulting from the passage of time.
    // lim is not changed.
    func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
      last := lim.last
      if now.Before(last) {
        last = now
      }
      // Avoid making delta overflow below when last is very old.
      maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
      elapsed := now.Sub(last)
      if elapsed > maxElapsed {
        elapsed = maxElapsed
      }
      // Calculate the new number of tokens, due to time that passed.
      delta := lim.limit.tokensFromDuration(elapsed)
      tokens := lim.tokens + delta
      if burst := float64(lim.burst); tokens > burst {
        tokens = burst
      }
      return now, last, tokens
    }
      
    // durationFromTokens is a unit conversion function from the number of tokens to the duration
    // of time it takes to accumulate them at a rate of limit tokens per second.
    func (limit Limit) durationFromTokens(tokens float64) time.Duration {
      seconds := tokens / float64(limit)
      return time.Nanosecond * time.Duration(1e9*seconds)
    }
      
    // tokensFromDuration is a unit conversion function from a time duration to the number of tokens
    // which could be accumulated during that duration at a rate of limit tokens per second.
    func (limit Limit) tokensFromDuration(d time.Duration) float64 {
      return d.Seconds() * float64(limit)
    }

    虽然在某些情况下使用单个全局速率限制器非常有用,但另一种常见情况是基于IP地址或API密钥等标识符为每个用户实施速率限制器。我们将使用IP地址作为标识符。简单实现代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    package main
    import (
      "net/http"
      "sync"
      "time"
      "golang.org/x/time/rate"
    )
    // Create a custom visitor struct which holds the rate limiter for each
    // visitor and the last time that the visitor was seen.
    type visitor struct {
      limiter *rate.Limiter
      lastSeen time.Time
    }
    // Change the the map to hold values of the type visitor.
    var visitors = make(map[string]*visitor)
    var mtx sync.Mutex
    // Run a background goroutine to remove old entries from the visitors map.
    func init() {
      go cleanupVisitors()
    }
    func addVisitor(ip string) *rate.Limiter {
      limiter := rate.NewLimiter(2, 5)
      mtx.Lock()
      // Include the current time when creating a new visitor.
      visitors[ip] = &visitor{limiter, time.Now()}
      mtx.Unlock()
      return limiter
    }
    func getVisitor(ip string) *rate.Limiter {
      mtx.Lock()
      v, exists := visitors[ip]
      if !exists {
        mtx.Unlock()
        return addVisitor(ip)
      }
      // Update the last seen time for the visitor.
      v.lastSeen = time.Now()
      mtx.Unlock()
      return v.limiter
    }
    // Every minute check the map for visitors that haven't been seen for
    // more than 3 minutes and delete the entries.
    func cleanupVisitors() {
      for {
        time.Sleep(time.Minute)
        mtx.Lock()
        for ip, v := range visitors {
          if time.Now().Sub(v.lastSeen) > 3*time.Minute {
            delete(visitors, ip)
          }
        }
        mtx.Unlock()
      }
    }
    func limit(next http.Handler) http.Handler {
      return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        limiter := getVisitor(r.RemoteAddr)
        if limiter.Allow() == false {
          http.Error(w, http.StatusText(429), http.StatusTooManyRequests)
          return
        }
        next.ServeHTTP(w, r)
      })
    }
  • 相关阅读:
    Linux 下编译hello world 的C 语言程序
    C语言实现二维数组操作--元素个数确定
    Linux Eclipse安装和配置命令行(jre、jdk)
    段错误bug的调试
    fopen与open的区别
    同样的c代码,为何在windows下和linux下执行结果不一样?
    VIM快捷键
    浅谈C中的wprintf和宽字符显示
    Know More About Oracle Row Lock
    【教学视频】Maclean教你一步一步使用Vbox在Linux 5上安装Oracle 11gR2 RAC
  • 原文地址:https://www.cnblogs.com/ExMan/p/12621386.html
Copyright © 2011-2022 走看看