概要
golang 的提供的 channel 机制是基于 CSP(Communicating Sequencial Processes)模型的并发模式.
通过 channel, 可以很方便的写出多个 协程 (goroutine)之间协作的代码, 将顺序的代码改成并行的代码非常简单. 改造成并行的代码之后, 虽然可以更好的利用多核的硬件, 有效的提高代码的执行效率, 但是, 也带来了代码控制的问题.
并行的代码显然比顺序执行的代码更难于管理和控制, 这时, 就得靠 golang 提供的 Context 接口来帮助我们控制 goroutine 了.
goroutine 的控制
goroutine 之间最基本的控制, 就是通过 channel 来交互数据:
1 func routineSample() {
2 ch := make(chan int, 10)
3 go p1(ch)
4 go c1(ch)
5 go c2(ch)
6
7 time.Sleep(10 * time.Second)
8 }
9
10 func p1(ch chan int) {
11 fmt.Println("Parent go routine!")
12
13 for i := 0; i < 10; i++ {
14 ch <- i
15 }
16
17 close(ch)
18 }
19
20 func c1(ch chan int) {
21 fmt.Println("Child 1 go routine!")
22 for c := range ch {
23 fmt.Printf("child 1, recivie: %d
", c)
24 time.Sleep(100 * time.Millisecond)
25 }
26 }
27
28 func c2(ch chan int) {
29 fmt.Println("Child 2 go routine!")
30 for c := range ch {
31 fmt.Printf("child 2, recivie: %d
", c)
32 time.Sleep(100 * time.Millisecond)
33 }
34 }
上述是最基本的示例, p1 函数不断向 channel 中发送数据, c1 和 c2 负责处理数据. 虽然通过 channel 实现 c1, c2 的并发很简单, 但是可以看出, p1 要想控制 c1, c2 没有那么容易.
这时, 就需要通过 Context 接口来控制并发的协程.
取消控制
取消控制指的是任务的发起者, 在特定条件下, 发送信号指示已经接受到任务的协程停止任务的执行.
1 func routineSample() {
2 ch := make(chan int, 10)
3 ctx, cancel := context.WithCancel(context.Background())
4 go p1(ctx, ch)
5 go c1(ctx, ch)
6 go c2(ctx, ch)
7
8 // 300 ms之后取消任务
9 time.Sleep(300 * time.Millisecond)
10 cancel()
11
12 time.Sleep(10 * time.Second)
13 }
14
15 func p1(ctx context.Context, ch chan int) {
16 fmt.Println("Parent go routine!")
17
18 for i := 0; i < 10; i++ {
19 ch <- i
20 }
21
22 close(ch)
23 }
24
25 func c1(ctx context.Context, ch chan int) {
26 fmt.Println("Child 1 go routine!")
27 for c := range ch {
28 select {
29 case <-ctx.Done():
30 fmt.Println("child 1, return!")
31 return
32 default:
33 fmt.Printf("child 1, recivie: %d
", c)
34 }
35 time.Sleep(100 * time.Millisecond)
36 }
37 }
38
39 func c2(ctx context.Context, ch chan int) {
40 fmt.Println("Child 2 go routine!")
41 for c := range ch {
42 select {
43 case <-ctx.Done():
44 fmt.Println("child 2, return!")
45 return
46 default:
47 fmt.Printf("child 2, recivie: %d
", c)
48 }
49 time.Sleep(100 * time.Millisecond)
50 }
51 }
300 毫秒后, 发送取消任务的信号 cancel() , c1 和 c2 通过 select 判断是否有取消信号, 收到取消信号后, 退出处理.
通过执行结果可以看出, c1 和 c2 大约处理 5~6 个任务之后停止退出.
超时控制
除了取消控制, context 还有超时的控制.
1 func routineSample() {
2 ch := make(chan int, 10)
3 ctx, _ := context.WithTimeout(context.Background(), 300*time.Millisecond)
4 go p1(ctx, ch)
5 go c1(ctx, ch)
6 go c2(ctx, ch)
7
8 time.Sleep(10 * time.Second)
9 }
10
11 func p1(ctx context.Context, ch chan int) {
12 fmt.Println("Parent go routine!")
13
14 for i := 0; i < 10; i++ {
15 ch <- i
16 }
17
18 close(ch)
19 }
20
21 func c1(ctx context.Context, ch chan int) {
22 fmt.Println("Child 1 go routine!")
23 for c := range ch {
24 select {
25 case <-ctx.Done():
26 fmt.Println("child 1, return!")
27 return
28 default:
29 fmt.Printf("child 1, recivie: %d
", c)
30 }
31 time.Sleep(100 * time.Millisecond)
32 }
33 }
34
35 func c2(ctx context.Context, ch chan int) {
36 fmt.Println("Child 2 go routine!")
37 for c := range ch {
38 select {
39 case <-ctx.Done():
40 fmt.Println("child 2, return!")
41 return
42 default:
43 fmt.Printf("child 2, recivie: %d
", c)
44 }
45 time.Sleep(100 * time.Millisecond)
46 }
47 }
控制超时的 WithTimeout 也返回一个 cancel 函数, 可以在超时到达之前来取消任务的执行, 上面的例子等待超时时间达到后自动取消任务, 没有使用 cancel 函数.
goroutine 之间的传值
一般来说, goroutine 之间通过 channel 传递都是业务数据, 除此之外, 还可以通过 channel 来传递一些控制 goroutine 的元数据.
1 func routineSample() {
2 ch := make(chan int, 10)
3 // 哪个goroutine收到5号任务, 就退出, 不再做后续的任务
4 ctx := context.WithValue(context.Background(), "finish", 5)
5 go p1(ctx, ch)
6 go c1(ctx, ch)
7 go c2(ctx, ch)
8
9 time.Sleep(10 * time.Second)
10 }
11
12 func p1(ctx context.Context, ch chan int) {
13 fmt.Println("Parent go routine!")
14
15 for i := 0; i < 10; i++ {
16 ch <- i
17 }
18
19 close(ch)
20 }
21
22 func c1(ctx context.Context, ch chan int) {
23 fmt.Println("Child 1 go routine!")
24 for c := range ch {
25 if c == ctx.Value("finish").(int) {
26 fmt.Println("child 1, return!")
27 return
28 }
29 fmt.Printf("child 1, recivie: %d
", c)
30 time.Sleep(100 * time.Millisecond)
31 }
32 }
33
34 func c2(ctx context.Context, ch chan int) {
35 fmt.Println("Child 2 go routine!")
36 for c := range ch {
37 if c == ctx.Value("finish").(int) {
38 fmt.Println("child 2, return!")
39 return
40 }
41 fmt.Printf("child 2, recivie: %d
", c)
42 time.Sleep(100 * time.Millisecond)
43 }
44 }
上面的例子是在 context 中放置一个 key="finish" 的任务号, 如果 c1 或者 c2 收到的任务号和它相同, 则退出任务的执行. 通过运行上面的例子可以看出, c1 或者 c2 执行到 5 号任务的时候就会退出协程. 但是谁收到 5 号任务是不确定的, 多执行几次上面的代码, 可以发现有时是 c1 退出, 有时是 c2 退出.
总结
context 是控制并发协程的上下文, 利用 context, 可以大量简化控制协程的超时, 取消协程执行, 以及协程之间传值的代码. context 很方便, 但也不能乱用, 通过 channel 传递的业务数据, 不要放在 context 中来传递.
此外, context 是线程安全的, 可以放心的在多个协程中使用.