zoukankan      html  css  js  c++  java
  • 使用Golang的singleflight防止缓存击穿

    背景

    在使用缓存时,容易发生缓存击穿。

    缓存击穿:一个存在的key,在缓存过期的瞬间,同时有大量的请求过来,造成所有请求都去读dB,这些请求都会击穿到DB,造成瞬时DB请求量大、压力骤增。

    singleflight

    介绍

    import "golang.org/x/sync/singleflight"
    singleflight类的使用方法就新建一个singleflight.Group,使用其方法Do或者DoChan来包装方法,被包装的方法在对于同一个key,只会有一个协程执行,其他协程等待那个协程执行结束后,拿到同样的结果。

    • Group结构体
      代表一类工作,同一个group中,同样的key同时只能被执行一次。
    • Do方法
      func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)
      key:同一个key,同时只有一个协程执行。
      fn:被包装的函数。
      v:返回值,即执行的结果。其他等待的协程都会拿到。
      shared:表示是否有其他协程得到了这个结果v。
    • DoChan方法
      func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result
      与Do方法一样,只是返回的是一个channel,执行结果会发送到channel中,其他等待的协程都可以从channel中拿到结果。

    ref:https://godoc.org/golang.org/x/sync/singleflight

    示例

    • 使用Do方法来模拟,解决缓存击穿的问题
    func main() {
       var singleSetCache singleflight.Group
    
       getAndSetCache:=func (requestID int,cacheKey string) (string, error) {
          log.Printf("request %v start to get and set cache...",requestID)
          value,_, _ :=singleSetCache.Do(cacheKey, func() (ret interface{}, err error) {//do的入参key,可以直接使用缓存的key,这样同一个缓存,只有一个协程会去读DB
             log.Printf("request %v is setting cache...",requestID)
             time.Sleep(3*time.Second)
             log.Printf("request %v set cache success!",requestID)
             return "VALUE",nil
          })
          return value.(string),nil
       }
    
       cacheKey:="cacheKey"
       for i:=1;i<10;i++{//模拟多个协程同时请求
          go func(requestID int) {
             value,_:=getAndSetCache(requestID,cacheKey)
             log.Printf("request %v get value: %v",requestID,value)
          }(i)
       }
       time.Sleep(20*time.Second)
    }
    

    输出:

    2020/04/12 18:18:40 request 4 start to get and set cache...
    2020/04/12 18:18:40 request 4 is setting cache...
    2020/04/12 18:18:40 request 2 start to get and set cache...
    2020/04/12 18:18:40 request 7 start to get and set cache...
    2020/04/12 18:18:40 request 5 start to get and set cache...
    2020/04/12 18:18:40 request 1 start to get and set cache...
    2020/04/12 18:18:40 request 6 start to get and set cache...
    2020/04/12 18:18:40 request 3 start to get and set cache...
    2020/04/12 18:18:40 request 8 start to get and set cache...
    2020/04/12 18:18:40 request 9 start to get and set cache...
    2020/04/12 18:18:43 request 4 set cache success!
    2020/04/12 18:18:43 request 4 get value: VALUE
    2020/04/12 18:18:43 request 9 get value: VALUE
    2020/04/12 18:18:43 request 6 get value: VALUE
    2020/04/12 18:18:43 request 3 get value: VALUE
    2020/04/12 18:18:43 request 8 get value: VALUE
    2020/04/12 18:18:43 request 1 get value: VALUE
    2020/04/12 18:18:43 request 5 get value: VALUE
    2020/04/12 18:18:43 request 2 get value: VALUE
    2020/04/12 18:18:43 request 7 get value: VALUE
    

    可以看到确实只有一个协程执行了被包装的函数,并且其他协程都拿到了结果。

    源码分析

    看一下这个Do方法是怎么实现的。
    首先看一下Group的结构:

    type Group struct {
       mu sync.Mutex      
       m  map[string]*call //保存key对应的函数执行过程和结果的变量。
    }
    

    Group的结构非常简单,一个锁来保证并发安全,另一个map用来保存key对应的函数执行过程和结果的变量。
    看下call的结构:

    type call struct {
       wg sync.WaitGroup //用WaitGroup实现只有一个协程执行函数
       val interface{} //函数执行结果
       err error
       forgotten bool
       dups  int //含义是duplications,即同时执行同一个key的协程数量
       chans []chan<- Result
    }
    

    看下Do方法

    func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
       g.mu.Lock()//写Group的m字段时,加锁保证写安全。
       if g.m == nil {
          g.m = make(map[string]*call)
       }
       if c, ok := g.m[key]; ok {//如果key已经存在,说明已经有协程在执行,则dups++,并等待其执行完毕后,返回其执行结果,执行结果保存在对应的call的val字段里
          c.dups++
          g.mu.Unlock()
          c.wg.Wait()
          return c.val, c.err, true
       }
       //如果key不存在,则新建一个call,并使用WaitGroup来阻塞其他协程,同时在m字段里写入key和对应的call
       c := new(call)
       c.wg.Add(1)
       g.m[key] = c
       g.mu.Unlock()
    
       g.doCall(c, key, fn)//第一个进来的协程来执行这个函数
       return c.val, c.err, c.dups > 0
    }
    

    继续看下g.doCall里具体干了什么

    func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
       c.val, c.err = fn()//执行被包装的函数
       c.wg.Done()//执行完毕后,就可以通知其他协程可以拿结果了
    
       g.mu.Lock()
       if !c.forgotten {//其实这里是为了保证执行完毕之后,对应的key被删除,Group有一个方法Forget(key string),可以用来主动删除key,这里是判断那个方法是否被调用过,被调用过则字段forgotten会置为true,如果没有被调用过,则在这里把key删除。
          delete(g.m, key)
       }
       for _, ch := range c.chans {//将执行结果发送到channel里,这里是给DoChan方法使用的
          ch <- Result{c.val, c.err, c.dups > 0}
       }
       g.mu.Unlock()
    }
    

    由此看来,其实现是非常简单的。不得不赞叹一百来行代码就实现了功能。

    其他

    顺便附上DoChan方法的使用示例:

    func main() {
       var singleSetCache singleflight.Group
    
       getAndSetCache:=func (requestID int,cacheKey string) (string, error) {
          log.Printf("request %v start to get and set cache...",requestID)
          retChan:=singleSetCache.DoChan(cacheKey, func() (ret interface{}, err error) {
             log.Printf("request %v is setting cache...",requestID)
             time.Sleep(3*time.Second)
             log.Printf("request %v set cache success!",requestID)
             return "VALUE",nil
          })
    
          var ret singleflight.Result
    
          timeout := time.After(5 * time.Second)
    
          select {//加入了超时机制
          case <-timeout:
             log.Printf("time out!")
             return "",errors.New("time out")
          case ret =<- retChan://从chan中取出结果
             return ret.Val.(string),ret.Err
          }
          return "",nil
       }
    
       cacheKey:="cacheKey"
       for i:=1;i<10;i++{
          go func(requestID int) {
             value,_:=getAndSetCache(requestID,cacheKey)
             log.Printf("request %v get value: %v",requestID,value)
          }(i)
       }
       time.Sleep(20*time.Second)
    }
    

    看下DoChan的源码

    func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
       ch := make(chan Result, 1)
       g.mu.Lock()
       if g.m == nil {
          g.m = make(map[string]*call)
       }
       if c, ok := g.m[key]; ok {
          c.dups++
          c.chans = append(c.chans, ch)//可以看到,每个等待的协程,都有一个结果channel。从之前的g.doCall里也可以看到,每个channel都给塞了结果。为什么不所有协程共用一个channel?因为那样就得在channel里塞至少与协程数量一样的结果数量,但是你却无法保证用户一个协程只读取一次。
          g.mu.Unlock()
          return ch
       }
       c := &call{chans: []chan<- Result{ch}}
       c.wg.Add(1)
       g.m[key] = c
       g.mu.Unlock()
    
       go g.doCall(c, key, fn)
    
       return ch
    }
    
    作者:Chaunceeeeey
    出处:https://www.cnblogs.com/chaunceeeeey/
    版权:本文版权归作者和博客园共有
    转载:欢迎转载,但未经作者同意,必须保留此段声明;必须在文章中给出原文连接;否则必究法律责任
  • 相关阅读:
    python 并发编程 多线程 event
    python 并发编程 多线程 定时器
    python 并发编程 多线程 信号量
    linux top 查看CPU命令
    python 并发编程 多线程 GIL与多线程
    python 并发编程 多线程 死锁现象与递归锁
    python 并发编程 多线程 GIL与Lock
    python GIL全局解释器锁与互斥锁 目录
    python 并发编程 多线程 GIL全局解释器锁基本概念
    执行python程序 出现三部曲
  • 原文地址:https://www.cnblogs.com/chaunceeeeey/p/12740922.html
Copyright © 2011-2022 走看看