zoukankan      html  css  js  c++  java
  • Golang协程池(workpool)实现

    背景

    因与工作相关,所以本文中的数据都进行了更改,但逻辑是一样的。

    笔者的服务ServerA会请求服务ServerH获取一些数据,但ServerH的接口有个N秒内只能请求M次的限制,并返回false。而笔者的服务瞬时请求量远超M次,所以采用了协程池在收到103错误时,停止worker的运行N秒,然后再启动。

    协程池的概念

    协程池的相关概念:要有一个一定数量大小的池子(pool),池子里存储需要执行的任务(task),还要有若干个工作协程(worker)。

    协程池要有启动,停止,睡眠的功能。

    下面是从零开始记录一下思想过程和遇到的问题。

    基础版

    在此版本里,除了睡眠的功能,已经实现了一个基本的协程池。

    // workpool.go
    package workpool
    
    import (
    	"context"
    	"sync"
    )
    
    type TaskFunc func()
    
    type Task struct {
    	f TaskFunc
    }
    
    type WorkPool struct {
    	pool        chan *Task
    	workerCount int
    
    	stopCtx        context.Context
    	stopCancelFunc context.CancelFunc
    	wg             sync.WaitGroup
    }
    
    func (t *Task) Execute() {
    	t.f()
    }
    
    func New(workerCount, poolLen int) *WorkPool {
    	return &WorkPool{
    		workerCount: workerCount,
    		pool:        make(chan *Task, poolLen),
    	}
    }
    
    func (w *WorkPool) PushTask(t *Task) {
    	w.pool <- t
    }
    
    func (w *WorkPool) PushTaskFunc(f TaskFunc) {
    	w.pool <- &Task{
    		f: f,
    	}
    }
    
    func (w *WorkPool) work() {
    	for {
    		select {
    		case <-w.stopCtx.Done():
    			w.wg.Done()
    			return
    		case t := <-w.pool:
    			t.Execute()
    		}
    	}
    }
    
    func (w *WorkPool) Start() *WorkPool {
    	w.wg.Add(w.workerCount)
    	w.stopCtx, w.stopCancelFunc = context.WithCancel(context.Background())
    	for i := 0; i < w.workerCount; i++ {
    		go w.work()
    	}
    	return w
    }
    
    func (w *WorkPool) Stop() {
    	w.stopCancelFunc()
    	w.wg.Wait()
    }
    

    看起来没什么毛病,还挺简洁。其实不然...

    下面的程序是创建一个容量为50的workpool,并将通过3个worker输出100个数字。

    // workpool_test.go
    package workpool
    
    import (
    	"fmt"
    	"sync"
    	"testing"
    )
    
    func TestWorkPool_Start(t *testing.T) {
    	wg := sync.WaitGroup{}
    	wp := New(3, 50).Start()
    	lenth := 100
    	wg.Add(lenth)
    	for i := 0; i < lenth; i++ {
    		wp.PushTaskFunc(func() {
    			defer wg.Done()
    			fmt.Print(i, " ")
    		})
    	}
    	wg.Wait()
    }
    
    

    运行后输出结果如下:

    50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 51 51 51 51 69 72 78 78 80 81 81 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 84 84 84 84 50 84
    100 100 100 100 100 100 100 100 100 100 50 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 84 100 100 100 
    

    这和想象中的输出 0-99 相差甚远。

    其原因在于闭包函数对于外部变量是引用的,所以在函数执行的时候,i的值早就已经改变了。下面是一个关于闭包的简单例子。

    x := 1
    f := func() {
    	println(x)
    }
    x = 2
    x = 3
    f() // 3
    

    可以将 f() 的调用时机对应为协程池中的 t.Execute()。

    解决闭包引用问题

    既然是因为闭包引用导致的问题,那就不使用闭包了呗。

    可以把参数传到函数内,但是因为并不知道将要执行的函数需要的参数个数及类型,所以只能是使用不定长的interface{}TaskFunc,在使用的时候进行断言。

    以下仅列出改动部分:

    // workpool.go
    type TaskFunc func(args ...interface{})
    
    type Task struct {
    	f    TaskFunc
    	args []interface{}
    }
    
    func (t *Task) Execute() {
    	t.f(t.args...)
    }
    
    func (w *WorkPool) PushTaskFunc(f TaskFunc, args ...interface{}) {
    	w.pool <- &Task{
    		f:    f,
    		args: args,
    	}
    }
    

    以下是测试程序:

    // workpool_test.go
    package workpool
    
    import (
    	"fmt"
    	"sync"
    	"testing"
    )
    
    func TestWorkPool_Start(t *testing.T) {
    	wg := sync.WaitGroup{}
    	wp := New(3, 50).Start()
    	lenth := 100
    	wg.Add(lenth)
    	for i := 0; i < lenth; i++ {
    		wp.PushTaskFunc(func(args ...interface{}) {
    			defer wg.Done()
    			fmt.Print(args[0].(int), " ")
    		}, i)
    	}
    	wg.Wait()
    }
    

    输出内容如下:

    0 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 2 1 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 26 48 49 51 52 53 54 55 56 50 58 59 57 61 62 63 64 65 66 25 68 6
    9 70 71 72 73 67 75 76 77 74 79 78 81 82 83 84 60 86 87 88 89 90 91 92 85 94 95 96 97 98 99 80 93 
    

    虽然顺序是错乱的,但这是正常情况,闭包引用问题已解决。

    添加睡眠功能

    基于开头的应用场景,在任意一个被worker执行的任务收到ServerH的103错误后,要停止所有worker一段时间,因为再一直请求也没有意义。

    这个版本已经与笔者正在使用的相差无几了

    // workpool.go
    package workpool
    
    import (
    	"context"
    	"fmt"
    	"sync"
    	"sync/atomic"
    	"time"
    )
    
    type Flag int64
    
    const (
    	FLAG_OK    Flag = 1 << iota
    	FLAG_RETRY Flag = 1 << iota
    )
    
    type TaskFunc func(w *WorkPool, args ...interface{}) Flag
    
    type Task struct {
    	f    TaskFunc
    	args []interface{}
    }
    
    type WorkPool struct {
    	pool        chan *Task
    	workerCount int
    
            // stop相关
    	stopCtx        context.Context
    	stopCancelFunc context.CancelFunc
    	wg             sync.WaitGroup
    
            // sleep相关
    	sleepCtx        context.Context
    	sleepCancelFunc context.CancelFunc
    	sleepSeconds    int64
    	sleepNotify     chan bool
    }
    
    func (t *Task) Execute(w *WorkPool) Flag {
    	return t.f(w, t.args...)
    }
    
    func New(workerCount, poolLen int) *WorkPool {
    	return &WorkPool{
    		workerCount: workerCount,
    		pool:        make(chan *Task, poolLen),
    		sleepNotify: make(chan bool),
    	}
    }
    
    func (w *WorkPool) PushTask(t *Task) {
    	w.pool <- t
    }
    
    func (w *WorkPool) PushTaskFunc(f TaskFunc, args ...interface{}) {
    	w.pool <- &Task{
    		f:    f,
    		args: args,
    	}
    }
    
    func (w *WorkPool) work(i int) {
    	for {
    		select {
    		case <-w.stopCtx.Done():
    			w.wg.Done()
    			return
    		case <-w.sleepCtx.Done():
    			time.Sleep(time.Duration(w.sleepSeconds) * time.Second)
    		case t := <-w.pool:
    			flag := t.Execute(w)
    			if flag&FLAG_RETRY != 0 {
    				w.PushTask(t)
    				fmt.Printf("work %v PushTask,pool length %v
    ", i, len(w.pool))
    			}
    		}
    	}
    }
    
    func (w *WorkPool) Start() *WorkPool {
    	fmt.Printf("workpool run %d worker
    ", w.workerCount)
    	w.wg.Add(w.workerCount + 1)
    	w.stopCtx, w.stopCancelFunc = context.WithCancel(context.Background())
    	w.sleepCtx, w.sleepCancelFunc = context.WithCancel(context.Background())
    	go w.sleepControl()
    	for i := 0; i < w.workerCount; i++ {
    		go w.work(i)
    	}
    	return w
    }
    
    func (w *WorkPool) Stop() {
    	w.stopCancelFunc()
    	w.wg.Wait()
    }
    
    func (w *WorkPool) sleepControl() {
    	fmt.Println("sleepControl start...")
    	for {
    		select {
    		case <-w.stopCtx.Done():
    			w.wg.Done()
    			return
    		case <-w.sleepNotify:
    			fmt.Printf("receive sleep notify start...
    ")
    			w.sleepCtx, w.sleepCancelFunc = context.WithCancel(context.Background())
    			w.sleepCancelFunc()
    			fmt.Printf("sleepControl will star sleep %v s
    ", w.sleepSeconds)
    			time.Sleep(time.Duration(w.sleepSeconds) * time.Second)
    			w.sleepSeconds = 0
    			fmt.Println("sleepControl was end sleep")
    		}
    	}
    }
    
    
    func (w *WorkPool) SleepNotify(seconds int64) {
    	// 因为需要CAS操作,所以sleepSeconds没有采用time.Duration类型
    	// 成功设置后才发出通知
    	if atomic.CompareAndSwapInt64(&w.sleepSeconds, 0, seconds) {
    		fmt.Printf("sleepSeconds set %v
    ", seconds)
    		w.sleepNotify <- true
    	}
    }
    
    

    下面的测试程序中,模拟了一下ServerH,其使用场景与笔者工作中大同小异。

    // workpool_test.go
    package workpool
    
    import (
    	"fmt"
    	"sync"
    	"testing"
    	"time"
    )
    
    // 这里模拟ServerH服务的限流操作
    var serverh = &server{max: 10, interval: 5}
    
    type server struct {
    	count    int
    	max      int
    	lasttime time.Time
    	interval int64
    	mu       sync.Mutex
    }
    
    func (s *server) Access(i int) bool {
    	now := time.Now()
    
    	s.mu.Lock()
    	defer s.mu.Unlock()
    
    	time.Sleep(100 * time.Millisecond)
    
    	if s.lasttime.Unix() <= 0 || s.count >= s.max {
    		if now.After(s.lasttime) {
    			s.count = 1
    			s.lasttime = time.Unix(now.Unix()+s.interval, 0)
    			return true
    		}
    		fmt.Printf("Access false,i=%d 
    ", i)
    		return false
    	} else {
    		s.count++
    		fmt.Printf("Access true,i=%d s.count %d
    ", i, s.count)
    		return true
    	}
    }
    
    // 这里是笔者服务的逻辑
    func TestWorkPool_Start(t *testing.T) {
    	wp := New(3, 100).Start()
    	for i := 0; i < 100; i++ {
    		time.Sleep(100 * time.Millisecond)
    		wp.PushTaskFunc(func(w *WorkPool, args ...interface{}) Flag {
    			if !serverh.Access(args[0].(int)) {
                                    // 发送睡眠5秒的通知
    				w.SleepNotify(5) 
                                    // 此次未执行成功,要将该任务放回协程池
    				return FLAG_RETRY 
    			}
    			return FLAG_OK
    		}, i)
    	}
    	time.Sleep(100 * time.Second)
    }
    
    

    输出内容如下:

    workpool run 3 worker
    sleepControl start...
    Access true,i=1 s.count 2
    Access true,i=2 s.count 3
    Access true,i=3 s.count 4
    Access true,i=4 s.count 5
    Access true,i=5 s.count 6
    Access true,i=6 s.count 7
    Access true,i=7 s.count 8
    Access true,i=8 s.count 9
    Access true,i=9 s.count 10
    Access false,i=10 
    sleepSeconds set 5
    work 1 PushTask,pool length 0
    receive sleep notify start...
    sleepControl will star sleep 5 s
    Access false,i=10 
    work 0 PushTask,pool length 1
    Access false,i=10 
    work 0 PushTask,pool length 2
    Access false,i=11 
    work 2 PushTask,pool length 3
    Access false,i=12 
    work 1 PushTask,pool length 5
    Access false,i=13 
    work 0 PushTask,pool length 6
    Access false,i=14 
    work 0 PushTask,pool length 7
    Access false,i=10 
    work 1 PushTask,pool length 8
    Access false,i=15 
    work 1 PushTask,pool length 9
    Access false,i=11 
    work 0 PushTask,pool length 11
    Access false,i=12 
    work 0 PushTask,pool length 11
    Access false,i=16 
    work 0 PushTask,pool length 12
    sleepControl was end sleep
    Access true,i=17 s.count 2
    Access true,i=14 s.count 3
    Access true,i=18 s.count 4
    Access true,i=10 s.count 5
    Access true,i=15 s.count 6
    Access true,i=20 s.count 7
    Access true,i=19 s.count 8
    Access true,i=12 s.count 9
    Access true,i=11 s.count 10
    Access false,i=21 
    sleepSeconds set 5
    work 0 PushTask,pool length 53
    receive sleep notify start...
    sleepControl will star sleep 5 s
    Access false,i=16 
    work 1 PushTask,pool length 54
    Access false,i=22 
    work 2 PushTask,pool length 55
    Access false,i=23 
    work 0 PushTask,pool length 57
    Access false,i=24 
    ...........
    

    待补充

    重试次数的逻辑

  • 相关阅读:
    Android App Bundle 使用指南
    Homebrew国内源
    Mac下配置环境变量不生效问题
    CocosCreator1.x配置打包Android App Bundle
    Android读取Json文件的工具类
    Cocos Creator 坐标转换
    XCode真机调试不了,提示"Please reconnect the device"
    Canvas: trying to draw too large(134374464bytes) bitmap.
    Modbus主从关系几点记录
    当前时间加上几天
  • 原文地址:https://www.cnblogs.com/flhs/p/13209057.html
Copyright © 2011-2022 走看看