zoukankan      html  css  js  c++  java
  • 十一. Go并发编程singleflight

    一.前言

    1.1 为什么需要Singleflight?

    很多程序员可能还是第一次听说,本人第一次听说这个的时候以为翻译过来就是程序设计中被称为的是 "单例模式"。 google之后二者天壤之别。

    一般情况下我们在写一写对外的服务的时候都会有一层 cache 作为缓存,用来减少底层数据库的压力,但是在遇到例如 redis 抖动或者其他情况可能会导致大量的 cache miss 出现。

    1.2 使用场景

    如下图所示,可能存在来自桌面端和移动端的用户有 1000 的并发请求,他们都访问的获取文章列表的接口,获取前 20 条信息,如果这个时候我们服务直接去访问 redis 出现 cache miss 那么我们就会去请求 1000 次数据库,这时可能会给数据库带来较大的压力(这里的 1000 只是一个例子,实际上可能远大于这个值)导致我们的服务异常或者超时。

    这时候就可以使用singleflight 库了,直译过来就是 单飞.

    这个库的主要作用就是: 将一组相同的请求合并成一个请求,实际上只会去请求一次,然后对所有的请求返回相同的结果。 如下图所示

    二. slingleFligh 库(使用教程)

    2.1 函数签名

    主要是一个Group结构体, 结构体包含三个方法,相见代码注释

    type Group
        // Do 执行函数, 对同一个 key 多次调用的时候,在第一次调用没有执行完的时候
    	// 只会执行一次 fn 其他的调用会阻塞住等待这次调用返回
    	// v, err 是传入的 fn 的返回值
    	// shared 表示是否真正执行了 fn 返回的结果,还是返回的共享的结果
        func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)
    
    	// DoChan 和 Do 类似,只是 DoChan 返回一个 channel,也就是同步与异步的区别
    	func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result
    
        // Forget 用于通知 Group 删除某个 key 这样后面继续这个 key 的调用的时候就不会在阻塞等待了
    	func (g *Group) Forget(key string)
    

    2.2 使用示例

    先使用一个普通的例子,这时一个获取文章详情的函数,我们在函数里面使用一个 count 模拟不同并发下的耗时的不同,并发越多请求耗时越多。

    func getArticle(id int) (article string, err error) {
    	// 假设这里会对数据库进行调用, 模拟不同并发下耗时不同
    	// 使用原子操作保证并发安全
    	atomic.AddInt32(&count, 1)
    	time.Sleep(time.Duration(count) * time.Millisecond)
    
    	return fmt.Sprintf("article: %d", id), nil
    }
    

    使用 singlefight 的时候就只需要 new(singleflight.Group) 然后调用以下相对应的Do方法

    func singleflightGetArticle(sg *singleflight.Group, id int) (string, error) {
    	
    	v, err, _ := sg.Do(fmt.Sprintf("%d", id), func() (interface{}, error) {
    		return getArticle(id)
    	})
    
    	return v.(string), err
    }
    

    3. 测试

    写一个简单的测试代码,模拟启动1000个goroutine 去并发调用这两个方法

    package main
    
    import (
    	"fmt"
    	"sync"
    	"sync/atomic"
    	"time"
    
    	"golang.org/x/sync/singleflight"
    )
    
    var count int32
    
    func getArticle(id int) (article string, err error) {
    	// 假设这里会对数据库进行调用, 模拟不同并发下耗时不同
    	atomic.AddInt32(&count, 1)
    	time.Sleep(time.Duration(count) * time.Millisecond)
    
    	return fmt.Sprintf("article: %d", id), nil
    }
    
    func singleFlightGetArticle(sg *singleflight.Group, id int) (string, error) {
    	v, err, _ := sg.Do(fmt.Sprintf("%d", id), func() (interface{}, error) {
    		return getArticle(id)
    	})
    
    	return v.(string), err
    }
    
    func main() {
    	// 使用一个定时器,
    	time.AfterFunc(1*time.Second, func() {
    		atomic.AddInt32(&count, -count)
    	})
    
    	var (
    		wg  sync.WaitGroup
    		now = time.Now()
    		n   = 1000
    		sg  = &singleflight.Group{}
    	)
    
    	for i := 0; i < n; i++ {
    		wg.Add(1)
    		go func() {
    			res, _ := singleFlightGetArticle(sg, 1)
    			// res, _ := getArticle(1)
    			if res != "article: 1" {
    				panic("err")
    			}
    			wg.Done()
    		}()
    	}
    
    	wg.Wait()
    	fmt.Printf("同时发起 %d 次请求,耗时: %s", n, time.Since(now))
    }
    
    

    调用 getArticle 方法的耗时,花费了 1s 多

    go run demo.go
    同时发起 1000 次请求,耗时: 1.0157721s
    

    切换到singleflight方法测试发现花费20ms

    go run demo.go
    同时发起 1000 次请求,耗时: 21.1962ms
    

    测试发现,使用singleflight这种方式的确在这种场景下能减少执行执行,提高效率。

    3.实现原理

    这是非内置库,仓库golang.org/x/sync/singleflight 源码解析如下

    Group
    是一个结构体

    type Group struct {
    	mu sync.Mutex       // protects m
    	m  map[string]*call // lazily initialized
    }
    

    Group 结构体由一个互斥锁和一个 map 组成,可以看到注释 map 是懒加载的,所以 Group 只要声明就可以使用,不用进行额外的初始化零值就可以直接使用。call 保存了当前调用对应的信息,map 的键就是我们调用 Do 方法传入的 key

    call 也是一个结构体,结构体如下

    type call struct {
    	wg sync.WaitGroup                          
    
    	// 函数的返回值,在 wg 返回前只会写入一次
    	val interface{}
    	err error
    
    	// 使用调用了 Forgot 方法
    	forgotten bool
    
        // 统计调用次数以及返回的 channel
    	dups  int
    	chans []chan<- Result
    }
    

    Do
    Do方法用来执行传入函数

    func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
    	g.mu.Lock()
    
        // 前面提到的懒加载
        if g.m == nil {
    		g.m = make(map[string]*call)
    	}
    
        // 会先去看 key 是否已经存在
    	if c, ok := g.m[key]; ok {
           	// 如果存在就会解锁
    		c.dups++
    		g.mu.Unlock()
    
            // 然后等待 WaitGroup 执行完毕,只要一执行完,所有的 wait 都会被唤醒
    		c.wg.Wait()
    
            // 这里区分 panic 错误和 runtime 的错误,避免出现死锁,后面可以看到为什么这么做
    		if e, ok := c.err.(*panicError); ok {
    			panic(e)
    		} else if c.err == errGoexit {
    			runtime.Goexit()
    		}
    		return c.val, c.err, true
    	}
    
        // 如果我们没有找到这个 key 就 new call
    	c := new(call)
    
        // 然后调用 waitgroup 这里只有第一次调用会 add 1,其他的都会调用 wait 阻塞掉
        // 所以这要这次调用返回,所有阻塞的调用都会被唤醒
    	c.wg.Add(1)
    	g.m[key] = c
    	g.mu.Unlock()
    
        // 然后我们调用 doCall 去执行
    	g.doCall(c, key, fn)
    	return c.val, c.err, c.dups > 0
    }
    

    doCall
    这个方法种有个技巧值得学习,使用了两个 defer 巧妙的将 runtime 的错误和我们传入 function 的 panic 区别开来避免了由于传入的 function panic 导致的死锁。

    func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
    	normalReturn := false
    	recovered := false
    
        // 第一个 defer 检查 runtime 错误
    	defer func() {
            ...
    	}()
    
        // 使用一个匿名函数来执行
    	func() {
    		defer func() {
    			if !normalReturn {
                    // 如果 panic 了我们就 recover 掉,然后 new 一个 panic 的错误
                    // 后面在上层重新 panic
    				if r := recover(); r != nil {
    					c.err = newPanicError(r)
    				}
    			}
    		}()
    
    		c.val, c.err = fn()
    
            // 如果 fn 没有 panic 就会执行到这一步,如果 panic 了就不会执行到这一步
            // 所以可以通过这个变量来判断是否 panic 了
    		normalReturn = true
    	}()
    
        // 如果 normalReturn 为 false 就表示,我们的 fn panic 了
        // 如果执行到了这一步,也说明我们的 fn  recover 住了,不是直接 runtime exit
    	if !normalReturn {
    		recovered = true
    	}
    }
    

    第一个defer函数如下

    defer func() {
    	// 如果既没有正常执行完毕,又没有 recover 那就说明需要直接退出了
    	if !normalReturn && !recovered {
    		c.err = errGoexit
    	}
    
    	c.wg.Done()
    	g.mu.Lock()
    	defer g.mu.Unlock()
    
         // 如果已经 forgot 过了,就不要重复删除这个 key 了
    	if !c.forgotten {
    		delete(g.m, key)
    	}
    
    	if e, ok := c.err.(*panicError); ok {
    		// 如果返回的是 panic 错误,为了避免 channel 死锁,我们需要确保这个 panic 无法被恢复
    		if len(c.chans) > 0 {
    			go panic(e)
    			select {} // Keep this goroutine around so that it will appear in the crash dump.
    		} else {
    			panic(e)
    		}
    	} else if c.err == errGoexit {
    		// 已经准备退出了,也就不用做其他操作了
    	} else {
    		// 正常情况下向 channel 写入数据
    		for _, ch := range c.chans {
    			ch <- Result{c.val, c.err, c.dups > 0}
    		}
    	}
    }()
    

    DoChan

    Do chan 和 Do 类似,其实就是一个是同步等待,一个是异步返回,主要实现上就是,如果调用 DoChan 会给 call.chans 添加一个 channel 这样等第一次调用执行完毕之后就会循环向这些 channel 写入数据

    func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
    	ch := make(chan Result, 1)
    	g.mu.Lock()
    	if g.m == nil {
    		g.m = make(map[string]*call)
    	}
    	if c, ok := g.m[key]; ok {
    		c.dups++
    		c.chans = append(c.chans, ch)
    		g.mu.Unlock()
    		return ch
    	}
    	c := &call{chans: []chan<- Result{ch}}
    	c.wg.Add(1)
    	g.m[key] = c
    	g.mu.Unlock()
    
    	go g.doCall(c, key, fn)
    
    	return ch
    }
    

    Forget
    forget 用于手动释放某个 key 下次调用就不会阻塞等待了

    func (g *Group) Forget(key string) {
    	g.mu.Lock()
    	if c, ok := g.m[key]; ok {
    		c.forgotten = true
    	}
    	delete(g.m, key)
    	g.mu.Unlock()
    }
    

    三.注意事项

    3.1 一个阻塞,全员等待

    使用 singleflight 我们比较常见的是直接使用 Do 方法,但是这个极端情况下会导致整个程序 hang 住,如果我们的代码出点问题,有一个调用 hang 住了,那么会导致所有的请求都 hang 住

    还是之前的例子, 加入一个 select 模拟阻塞

    package main
    
    import (
    	"fmt"
    	"sync"
    	"sync/atomic"
    	"time"
    
    	"golang.org/x/sync/singleflight"
    )
    
    var count2 int32
    
    func getArticle2(id int) (article string, err error) {
    	// 假设这里会对数据库进行调用, 模拟不同并发下耗时不同
    	atomic.AddInt32(&count2, 1)
    	time.Sleep(time.Duration(count2) * time.Millisecond)
    
    	return fmt.Sprintf("article: %d", id), nil
    }
    
    func singleFlightGetArticle2(sg *singleflight.Group, id int) (string, error) {
    	v, err, _ := sg.Do(fmt.Sprintf("%d", id), func() (interface{}, error) {
    		// 模拟阻塞
    		select {}
    		return getArticle2(id)
    	})
    
    	return v.(string), err
    }
    
    func main() {
    	// 使用一个定时器,
    	time.AfterFunc(1*time.Second, func() {
    		atomic.AddInt32(&count2, -count2)
    	})
    
    	var (
    		wg  sync.WaitGroup
    		now = time.Now()
    		n   = 1000
    		sg  = &singleflight.Group{}
    	)
    
    	for i := 0; i < n; i++ {
    		wg.Add(1)
    		go func() {
    			res, _ := singleFlightGetArticle2(sg, 1)
    			if res != "article: 1" {
    				panic("err")
    			}
    			wg.Done()
    		}()
    	}
    
    	wg.Wait()
    	fmt.Printf("同时发起 %d 次请求,耗时: %s", n, time.Since(now))
    }
    

    执行就会发现死锁

    fatal error: all goroutines are asleep - deadlock!
    
    goroutine 1 [select (no cases)]:
    

    这时候DoChan就能派上用场了,结合 select 做超时控制

    func singleflightGetArticle(ctx context.Context, sg *singleflight.Group, id int) (string, error) {
    	result := sg.DoChan(fmt.Sprintf("%d", id), func() (interface{}, error) {
    		// 模拟出现问题,hang 住
    		select {}
    		return getArticle(id)
    	})
    
    	select {
    	case r := <-result:
    		return r.Val.(string), r.Err
    	case <-ctx.Done():
    		return "", ctx.Err()
    	}
    }
    

    调用的时候传入一个含 超时的 context 即可

    func main() {
    	// 使用一个定时器,
    	time.AfterFunc(1*time.Second, func() {
    		atomic.AddInt32(&count2, -count2)
    	})
    
    	var (
    		wg     sync.WaitGroup
    		now    = time.Now()
    		n      = 1000
    		sg     = &singleflight.Group{}
    		// 超时控制
    		ctx, _ = context.WithTimeout(context.Background(), 1*time.Second)
    	)
    
    	for i := 0; i < n; i++ {
    		wg.Add(1)
    		go func() {
    			res, _ := singleFlightGetArticle2(ctx, sg, 1)
    			if res != "article: 1" {
    				panic("err")
    			}
    			wg.Done()
    		}()
    
    	}
    
    	wg.Wait()
    	fmt.Printf("同时发起 %d 次请求,耗时: %s", n, time.Since(now))
    }
    

    执行时就会返回超时错误.

    ❯ go run demo2.go
    panic: context deadline exceeded
    

    3.2 一个出错,全部出错

    本身不是什么问题,因为singleflight就是这么设计的。 但是实际使用的时候,可能并不是想要这样的效果。 比如 如果一次调用要1s. 我们的数据库请求或者下游服务服务可以支撑10rps(即每秒可以支持10次的请求)的请求的时候会导致我们的错误阈值提高。 因为我们可以1s尝试10次,但是用了singleflight之后只能尝试一次。只要出错这段时间内的所有请求都会收到影响。 那这种情况该如何解决呢?

    我们可以启动一个Goroutine定时forget一下,相当于将rps 从1rps提高到10rps

    go func() {
           time.Sleep(100 * time.Millisecond)
           // logging
           g.Forget(key)
       }()
    

    四. 使用场景

    如开头场景所讲, singleflige 可以有效解决在使用 Redis 对数据库中的数据进行缓存,发生缓存击穿时,大量的流量都会打到数据库上进而影响服务的尾延时。

    使用singleflight能有效解决这个问题,限制对同一个键值对的多次重复请求,减少对下游的瞬时流量

    通过一段代码可以查看如何解决这种缓存击穿的问题

    
    type service struct {
        requestGroup singleflight.Group
    }
    
    func (s *service) handleRequest(ctx context.Context, request Request) (Response, error) {
        // request.Hash就是传入的建,请求的哈希在业务上一般表示相同的请求,所以上述代码使用它作为请求的键
        v, err, _ := requestGroup.Do(request.Hash(), func() (interface{}, error) {
            rows, err := // select * from tables
            if err != nil {
                return nil, err
            }
            return rows, nil
        })
        if err != nil {
            return nil, err
        }
        return Response{
            rows: rows,
        }, nil
    }
    

    五. 总结

    golang/sync/singleflight.Group.Dogolang/sync/singleflight.Group.DoChan 分别提供了同步和异步的调用方式,这让我们使用起来也更加灵活。

    当我们需要减少对下游的相同请求时,可以使用 golang/sync/singleflight.Group 来增加吞吐量和服务质量,不过在使用的过程中我们也需要注意以下的几个问题:

    • golang/sync/singleflight.Group.Dogolang/sync/singleflight.Group.DoChan 一个用于同步阻塞调用传入的函数,一个用于异步调用传入的参数并通过 Channel 接收函数的返回值;
    • golang/sync/singleflight.Group.Forget 可以通知 golang/sync/singleflight.Group 在持有的映射表中删除某个键,接下来对该键的调用就不会等待前面的函数返回了;
    • 一旦调用的函数返回了错误,所有在等待的 Goroutine 也都会接收到同样的错误;

    六.参考

    1. https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-sync-primitives/#singleflight
    2. https://lailin.xyz/post/go-training-week5-singleflight.html
    3. https://pkg.go.dev/golang.org/x/sync/singleflight
    ♥永远年轻,永远热泪盈眶♥
  • 相关阅读:
    『Argparse』命令行解析
    『MXNet』专题汇总
    用.NET开发通用Windows App
    ASP.NET 5探险(6):升级ASP.NET 5到beta6
    使用ASP.NET MVC、Rabbit WeixinSDK和Azure快速开发部署微信后台
    Visual Studio 2015将在7月20号RTM
    VS2015上又一必备免费插件:Refactoring Essentials
    ASP.NET 5探险(5):利用AzureAD实现单点登录
    Visual Studio Code升级到0.5,提供对ES6的更好支持
    ASP.NET 5探险(4):如何把ASP.NET 5从beta4升级到beta5
  • 原文地址:https://www.cnblogs.com/failymao/p/15613749.html
Copyright © 2011-2022 走看看