思路:
总体使用生产者消费者模式。使用两个有缓冲区的channel来实现协程的并发控制,一个sigChannel通过缓冲空间限制最大的协程数量,另一个jobChannel则用于传递请求的数据(包括请求函数以及参数),该jobChannel对于是否缓冲没有要求。
流程:
(1)首先当请求到来之后,往sigChannel中写入标志位数据,如果此时有空闲位置,则不会阻塞在此处;
(2)之后往jobChannel中写入要执行的函数以及参数;
(3)后台监听jobChannel的函数worker(该函数要源源不断读取管道数据)则会取出管道中的数据;
(4)worker创建goroutine执行请求函数;
(5)该请求函数执行完成后,goroutine再去取出sigChannel管道中的标志数据,腾出来位置;
注:如果开始时候sigChannel写数据写入不了,则说明该池子满了,则需要阻塞等待。这样就实现了使用sigChannel控制并发量的功能。
代码
package main import ( "fmt" "net/http" //"context" _ "net/http/pprof" ) type Info struct { ParamFunc func(c int) Param int } type Task struct { taskLet chan Info taskCmp chan int64 } type Pool struct { tasks *Task taskNum int64 } func NewPool(n int64) *Pool{ taskc := make(chan Info, n) workc := make(chan int64, n) return &Pool{ tasks: &Task{ taskLet: taskc, taskCmp: workc, }, taskNum: n, } } func (p *Pool) Put(a Info) { p.tasks.taskCmp <- 1 p.tasks.taskLet <- a } func (p *Pool) Run() { for { select { case let := <- p.tasks.taskLet: go p.work(let) } } } func (p *Pool) work(f Info) { f.ParamFunc(f.Param) <- p.tasks.taskCmp } func test(c int) { fmt.Println(c) } func main() { // 限制并发 po := NewPool(50) go func() {//使用pprof跟踪 http.ListenAndServe(":9876",nil) }() go po.Run() var i int = 1 for { in := Info{test, i} po.Put(in) i++ } }
https://mp.weixin.qq.com/s/Y80kalSIlGhaDx3uWxoMig