前言
因为还没有深入研究过go的协程实现机制,所以这里只是简单表述协程的使用方法。
多协程执行
func main() {
wg := &sync.WaitGroup{}
var task = printHello
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
task()
}()
}
wg.Wait()
fmt.Println("hhh")
}
func printHello() {
now := time.Now()
fmt.Printf("Hello,now is: %d-%d-%d
", now.Year(), now.Month(), now.Day())
}
上面是使用go的协程的常用例子,使用go关键字创建新的协程执行任务,使用WaitGroup保证当所有协程都执行完毕的时候才会执行到fmt.Println("hhh")。这种方式有个弊端,由于没有控制协程数,有可能导致协程暴增。
协程组
协程组是使用一组协程执行任务的,可以看作是协程池。协程组规定了最多可以同时执行任务的协程数,避免了直接使用go关键字导致的协程暴增问题。这里先介绍第一种实现,这里要最需要注意的是Start方法中使用的ch变量。
package main
import (
"fmt"
"sync"
"time"
)
type Task func()
type Pool struct {
ConNum int // 并发数
Task chan Task // 任务channel
Wg *sync.WaitGroup // 同步组
}
func (p *Pool) init(conNum int, wg *sync.WaitGroup) {
p.ConNum = conNum
p.Wg = wg
p.Task = make(chan Task)
}
func (p *Pool) Start() {
ch := make(chan struct{}, p.ConNum) // 控制并发数
for task := range p.Task {
ch <- struct{}{}
p.Wg.Add(1)
go func() {
defer p.Wg.Done()
task()
<-ch
}()
}
}
// 添加执行任务
func (p *Pool) Execute(task Task) {
p.Task <- task
}
func main() {
pool := &Pool{}
pool.init(10, &sync.WaitGroup{})
go pool.Start()
start := time.Now()
for i := 0; i < 1000; i++ {
fn := func() {
time.Sleep(time.Millisecond)
}
pool.Execute(fn)
}
pool.Wg.Wait()
fmt.Println("tast cost: ", time.Since(start))
start = time.Now()
for i := 0; i < 1000; i++ {
time.Sleep(time.Millisecond)
}
fmt.Println("tast cost: ", time.Since(start))
}
输出:
tast cost: 11.880283ms tast cost: 1.261029362s
从上面的耗时可以看到,使用协程组确实缩短了耗时。这里使用了最多10个协程,可以对比发现,与不使用协程相比,约为其1/10,证明协程组确实有效。
协程组的第二种实现可以理解为常规的“线程池”,其中的协程是复用的。两种实现方式的耗时相当。
package main
import (
"fmt"
"sync"
"time"
)
type Task func()
type Pool struct {
ConNum int // 并发数
Task chan Task // 任务channel
Wg *sync.WaitGroup // 同步组
}
func (p *Pool) init(conNum int, wg *sync.WaitGroup) {
p.ConNum = conNum
p.Wg = wg
p.Task = make(chan Task)
}
func (p *Pool) Start() {
ch := make(chan struct{}, p.ConNum) // 控制并发数
for task := range p.Task {
ch <- struct{}{}
p.Wg.Add(1)
go func() {
defer p.Wg.Done()
task()
<-ch
}()
}
}
// 添加执行任务
func (p *Pool) Execute(task Task) {
p.Task <- task
}
type Pool2 struct {
ConNum int // 并发数
Task chan Task // 任务channel
Wg *sync.WaitGroup // 同步组
Mx *sync.Mutex
}
func (p *Pool2) init(conNum int, wg *sync.WaitGroup) {
p.ConNum = conNum
p.Wg = wg
p.Task = make(chan Task)
p.Mx = &sync.Mutex{}
}
func (p *Pool2) Start() {
go func() {
for i := 0; i < p.ConNum; i++ {
p.Wg.Add(1)
go func() {
defer p.Wg.Done()
for task := range p.Task {
task()
}
}()
}
}()
}
// 添加执行任务
func (p *Pool2) Execute(task Task) {
p.Mx.Lock()
defer p.Mx.Unlock()
if p.Task != nil {
p.Task <- task
}
}
func (p *Pool2) Done() {
p.Mx.Lock()
defer p.Mx.Unlock()
close(p.Task)
p.Task = nil
p.Wg.Wait()
}
func main() {
taskNum := 10000000
conNum := 1000
pool := &Pool{}
pool.init(conNum, &sync.WaitGroup{})
go pool.Start()
start := time.Now()
for i := 0; i < taskNum; i++ {
fn := func() {
time.Sleep(time.Millisecond)
}
pool.Execute(fn)
}
pool.Wg.Wait()
fmt.Println("pool,task cost: ", time.Since(start))
pool2 := &Pool2{}
pool2.init(conNum, &sync.WaitGroup{})
go pool2.Start()
start = time.Now()
for i := 0; i < taskNum; i++ {
fn := func() {
time.Sleep(time.Millisecond)
}
pool2.Execute(fn)
}
pool2.Done()
fmt.Println("pool2,task cost: ", time.Since(start))
}
耗时:
pool,task cost: 10.555079677s pool2,task cost: 10.513585021s
结合限流器的协程组
协程组的好处不言而喻,但高效并发执行是以资源占用为代价的,为了避免资源占用太多,可以通过go自带的限流器对可以对协程执行进行限流。如下面的代码,在使用限流器限流前,执行一百万个任务的耗时大约是1.1s,可以粗略认为QPS是百万/s,这是一个相当高的量了,所以打算用限流器进行限流,不让QPS那么高。通过设置限流器的limit参数为十万限制了任务Qps最高为10万/s,结果不出所料。
package main
import (
"context"
"fmt"
"golang.org/x/time/rate"
"sync"
"time"
)
type Task func()
type Pool struct {
ConNum int // 并发数
Task chan Task // 任务channel
Wg *sync.WaitGroup // 同步组
Limiter *rate.Limiter
Ctx context.Context
}
func (p *Pool) init(conNum int, wg *sync.WaitGroup) {
p.ConNum = conNum
p.Wg = wg
p.Task = make(chan Task)
p.Limiter = rate.NewLimiter(rate.Limit(100000), 10000)
p.Ctx = context.Background()
}
func (p *Pool) Start() {
ch := make(chan struct{}, p.ConNum) // 控制并发数
for task := range p.Task {
ch <- struct{}{}
p.Wg.Add(1)
go func() {
defer p.Wg.Done()
p.Limiter.Wait(p.Ctx)
task()
<-ch
}()
}
}
// 添加执行任务
func (p *Pool) Execute(task Task) {
p.Task <- task
}
func main() {
pool := &Pool{}
pool.init(100, &sync.WaitGroup{})
go pool.Start()
start := time.Now()
for i := 0; i < 1000000; i++ {
fn := func() {
time.Sleep(time.Nanosecond * 10)
}
pool.Execute(fn)
}
pool.Wg.Wait()
fmt.Println("tast cost: ", time.Since(start))
}
耗时:
tast cost: 9.89831723s