为什么要控制goroutine的数量?
在我们开发过程中,如果不对
goroutine
加以控制而进行滥用的话,可能会导致服务整体崩溃。比如耗尽系统资源导致程序崩溃,或者CPU使用率过高导致系统忙不过来。
用什么方法控制goroutine的数量?
尝试 chan
func main() {
userCount := 10
ch := make(chan bool, 2)
for i := 0; i < userCount; i++ {
ch <- true
go Read(ch, i)
}
//time.Sleep(time.Second)
}
func Read(ch chan bool, i int) {
fmt.Printf("go func: %d
", i)
<- ch
}
输出结果:
go func: 1
go func: 2
go func: 3
go func: 4
go func: 5
go func: 6
go func: 7
go func: 8
go func: 0
但是新的问题出现了,因为并不是所有的goroutine
都执行完了,在main
函数退出之后,还有一些goroutine
没有执行完就被强制结束了。这个时候我们就需要用到sync.WaitGroup
。使用WaitGroup
等待所有的goroutine
退出。如下:
尝试 sync.WaitGroup
var wg *sync.WaitGroup
func work() {
defer wg.Done()
//do something
}
func main() {
wg = &sync.WaitGroup{}
for i:=0; i < 10000; i++ {
wg.Add(1)
go work()
}
wg.Wait()//等待所有goroutine退出
}
单纯的使用 sync.WaitGroup
也不行。没有控制到同时并发的 goroutine
数量
尝试chan + sync
package main
import (
"fmt"
"runtime"
"sync"
)
var wg = sync.WaitGroup{}
// 任务业务流程
func business(ch chan bool, i int) {
fmt.Println("go func", i, " goroutine count = ", runtime.NumGoroutine)
<-ch
wg.Done()
}
func main() {
// 模拟用户需求的业务数量
task_cnt := 10
ch := make(chan bool, 3)
for i := 0; i < taskk_cnt; i++ {
wg.Add(1)
// 如果channel满了,就会阻塞
ch <- true
// 开启一个新协程
go business(ch, i)
}
wg.Wait()
}
⽆缓冲channel和任务发送/执⾏分离来限制(协程池)
package main
import (
"fmt"
"runtime"
"sync"
)
var wg = sync.WaitGroup{}
// 每个go的worker都要执行的一个工作流程
func business(ch chan int){
// 消费一个任务
for t := range ch {
fmt.Println(" go task = ", t, ", goroutine count = ", runtime.NumGoroutine())
wg.Done()
}
}
// 发送一个任务(任务的输入,任务的生产)
func sendTask(task int, ch chan int) {
wg.Add(1)
ch <- task
}
func main() {
// 无buffer的channel
ch := make(chan int)
// 1 启动goroutine工作池(go的数量是固定的)充当任务task的消费
goCnt := 3
for i := 0; i < goCnt; i++ {
// 启动goroutine的worker
go business(ch)
}
// 2模拟用户需求业务的数量,不断的给工作池发送task
taskCnt := math.MaxInt64
for t := 0; t < taskCnt; t++ {
// 发送任务
sendTask(t, ch)
}
wg.Wait()
}
控制goroutine的数量,封装一下
package gpool
import (
"sync"
)
type pool struct {
queue chan int
wg *sync.WaitGroup
}
func New(size int) *pool {
if size <= 0 {
size = 1
}
return &pool{
queue: make(chan int, size),
wg: &sync.WaitGroup{},
}
}
func (p *pool) Add(delta int) {
for i := 0; i < delta; i++ {
p.queue <- 1
}
for i := 0; i > delta; i-- {
<-p.queue
}
p.wg.Add(delta)
}
func (p *pool) Done() {
<-p.queue
p.wg.Done()
}
func (p *pool) Wait() {
p.wg.Wait()
}
测试代码:
package gpool_test
import (
"runtime"
"testing"
"time"
"gpool"
)
func Test_Example(t *testing.T) {
pool := gpool.New(100)
println(runtime.NumGoroutine())
for i := 0; i < 1000; i++ {
pool.Add(1)
go func() {
time.Sleep(time.Second)
println(runtime.NumGoroutine())
pool.Done()
}()
}
pool.Wait()
println(runtime.NumGoroutine())
}