zoukankan      html  css  js  c++  java
  • [Journey with golang] 5. Concurrent

    并发和并行是两种不同的概念。

    • 并行意味着程序在任意时刻都是同时运行的
    • 并发意味着程序在单位时间内是同时运行的

    并行就是在任一粒度时间内都具备同时执行的能力,最简单的并行就是多机,多台机器并行处理。SMP表面上看是并行的,但由于是共享内存,以及线程间的同步等,不可能完全做到并行。

    并发是在规定时间内多个请求都得到执行和处理,强调的是给外界的感觉,实际上内部可能是分时操作的。并发重在避免阻塞,使程序不会因为一个阻塞而停职处理。并发典型的应用场景:分时操作系统就是一种并发设计(忽略多核CPU)。

    并行是硬件和操作系统开发者重点考虑的问题,作为应用层的程序员,唯一可以选择的就是充分借助操作系统提供的API和程序语言特性,结合实际需求设计出具有良好并发结构的程序,提升程序的并发处理能力。现代操作系统能够提供的最基础的并发模型就是多线程和多进程;编程语言这一层级可以进一步封装来提升程序的并发处理能力。

    在当前的计算机体系下,并行具有瞬时性,并发具有过程性;并发在于结构,并行在于执行。应用程序具备好的并发结构,操作系统才能更好地利用硬件并行执行,同时避免阻塞等待,合理地进行调度,提升CPU利用率。应用层程序员提升程序并发处理能力的一个重要手段就是为程序设计良好的并发结构。

    操作系统可以进行线程和进程的调度,本身具备并发处理能力,但进程切换代价还是过于高昂,进程切换需要保存现场,耗费较多的时间。如果应用程序能在用户层再构筑一级调度,将并发的粒度进一步降低,或许可以更大限度地提升程序运行效率。golang的并发就是基于这个思想实现的。golang在语言层面支持这种并发模式。

    golang的并发执行称为goroutine,routine的翻译为例程,故goroutine叫go例程会更加合理。golang通过go关键字来启动一个goroutine,go关键字后面必须跟一个函数,不能是语句或其他东西,函数的返回值被忽略。

    • 通过go+匿名函数形式启动goroutine,代码如下:
       1 package main
       2 
       3 import (
       4     "fmt"
       5     "runtime"
       6     "time"
       7 )
       8 
       9 func main() {
      10     go func() {
      11         sum := 0
      12         for i := 0; i < 1000; i++ {
      13             sum += i
      14         }
      15         fmt.Println(sum)
      16         time.Sleep(1 * time.Second)
      17     }()
      18     // NumGoroutine可以返回当前程序的goroutine数目
      19     fmt.Println("NumGoroutine =", runtime.NumGoroutine())
      20     // main goroutine故意sleep几秒防止提前退出
      21     time.Sleep(3 * time.Second)
      22 }
    • 通过go+有名函数形式启动goroutine,代码如下:
       1 package main
       2 
       3 import (
       4     "fmt"
       5     "runtime"
       6     "time"
       7 )
       8 
       9 func sum() {
      10     sum := 0
      11     for i := 0; i < 1000; i++ {
      12         sum += i
      13     }
      14     fmt.Println(sum)
      15     time.Sleep(1 * time.Second)
      16 }
      17 
      18 func main() {
      19     go sum()
      20     fmt.Println("NumGoroutine =", runtime.NumGoroutine())
      21     time.Sleep(3 * time.Second)
      22 }

    goroutine有如下特性:

    • go的执行是非阻塞的,不会等待
    • go后面的函数的返回值会被忽略
    • 调度器不能保证多个goroutine的执行次序
    • 没有父子goroutine的概念,所有goroutine是平等地被调度和执行的
    • go程序执行时会单独为main函数创建一个goroutine,遇到其他go关键字时再去创建其他的goroutine
    • go没有暴露goroutine id给用户,所以不能在一个goroutine里面显示地操作另一个goroutine,不过runtime包提供了一些函数访问和设置goroutine的相关信息
    1. func GOMAXPROCS
      1.  func GOMAXPROCS(n int) int 用来设置或查询可以并发执行的goroutine数目,n大于1表示设置GOMAXPROCS值,否则表示查询当前GOMAXPROCS值。
         1 package main
         2 
         3 import (
         4     "fmt"
         5     "runtime"
         6 )
         7 
         8 func main() {
         9     // 获取当前的GOMAXPROCS值
        10     fmt.Println("GOMAXPROCS =", runtime.GOMAXPROCS(0))
        11     // 设置GOMAXPROCS的值为2
        12     runtime.GOMAXPROCS(2)
        13     // 获取当前的GOMAXPROCS值
        14     fmt.Println("GOMAXPROCS =", runtime.GOMAXPROCS(0))
        15 }
    2. func Goexit
      1.  func Goexit() 是结束当前goroutine的运行,Goexit在结束当前goroutine之前会调用当前goroutine已经注册的defer,Goexit并不会产生panic,所以该goroutine defer里面的recover调用都返回nil。
    3. func Gosched
      1.  func Gosched() 是放弃当前调度执行机会,将当前goroutine放到队列中等待下次被调度。

    只有goroutine还是不够的,多个goroutine之间还需要通信、同步、协同等功能。

     chan 是golang里面的一个关键字,是channel的缩写,翻译为中文就是通道的意思。goroutine是golang里面的并发执行体,通道是goroutine之间通信和同步的重要组件。golang的哲学是“不要通过共享内存来通信,而是通过通信来共享内存”。通道是golang通过通信来共享内存的载体。

    通道是有类型的,可以简单地把它理解为有类型的管道。声明一个简单的通道语句是 chan dataType 但是简单声明一个通道变量没有任何意义,该变量并没有被初始化,其值为nil。golang提供一个内置函数make来创建通道,例如:

    1 package main
    2 
    3 func main() {
    4     // 创建一个没有缓冲区的通道,通道存放元素类型为datatype
    5     make(chan datatype)
    6     // 创建一个缓冲区大小为10的通道,通道存放元素类型为datatype
    7     make(chan datatype2, 10)
    8 }

    通道分为有缓冲和无缓冲两种,golang提供内置函数len和cap,无缓冲的通道的len和cap都是0,有缓冲的通道的len代表没有被读取的元素数,cap代表整个通道的容量。无缓冲的通道既可以用于通信,也可以用于两个goroutine的同步,有缓冲的通道主要用于通信。

    上面的代码使用了time.Sleep()来防止goroutine提前退出。有了通道后,可以使用无缓冲的通道来实现goroutines之间的同步等待。例如:

     1 package main
     2 
     3 import (
     4     "fmt"
     5     "runtime"
     6 )
     7 
     8 func main() {
     9     c := make(chan struct{})
    10     go func(i chan struct{}) {
    11         sum := 0
    12         for i := 0; i < 10000; i++ {
    13             sum += i
    14         }
    15         fmt.Println(sum)
    16         // write chan
    17         c <- struct{}{}
    18     }(c)
    19     fmt.Println("NumGoroutine =", runtime.NumGoroutine())
    20     // read chan c
    21     <-c
    22 }

    goroutine运行结束后退出,写到缓冲通道中的数据不会消失,它可以缓冲和适配两个goroutine处理速率不一致的情况,缓冲通道和消息队列类似,有削峰和增大吞吐量的功能。示例如下:

     1 package main
     2 
     3 import (
     4     "fmt"
     5     "runtime"
     6 )
     7 
     8 func main() {
     9     c := make(chan struct{})
    10     ci := make(chan int, 100)
    11     go func(i chan struct{}, j chan int) {
    12         for i := 0; i < 10; i++ {
    13             ci <- i
    14         }
    15         close(ci)
    16         // write channel
    17         c <- struct{}{}
    18     }(c, ci)
    19     fmt.Println("NumGoroutine =", runtime.NumGoroutine())
    20     <-c
    21     fmt.Println("NumGoroutine =", runtime.NumGoroutine())
    22     for v := range ci {
    23         fmt.Println(v)
    24     }
    25 }

    操作不同状态的chan会引发三种行为。

    • panic
      1. 向已经关闭的通道写数据会导致panic。最佳实践是由写入者关闭通道,能最大程度地避免向已经关闭的通道写数据而导致的panic。
      2. 重复关闭的通道会导致panic。
    • 阻塞
      1. 向未初始化的通道写数据或读数据都会导致当前goroutine的永久阻塞。
      2. 向缓冲区已满的通道写入数据会导致goroutine阻塞。
      3. 通道中没有数据,读取钙通道会导致goroutine阻塞。
    • 非阻塞
      1. 读取已经关闭的通道不会引发阻塞,而是立即返回通道元素类型的零值,可以使用comma,ok语法判断通道是否已经关闭。
      2. 向有缓冲且没有满的通道读/写不会引发阻塞

    除了chan,sync包也提供了多个goroutine同步的机制,主要是通过 WaitGroup 实现的。WaitGroup用来等待多个goroutine完成,main goroutine调用Add设置需要等待goroutine的数目,每一个goroutine结束时调用Done(),Wait()被main用来等地啊所有的goroutine完成。下面的程序演示如何使用sync.WaitGroup完成多个goroutine之间的协同工作:

     1 package main
     2 
     3 import (
     4     "fmt"
     5     "net/http"
     6     "sync"
     7 )
     8 
     9 var wg sync.WaitGroup
    10 
    11 var urls = []string{
    12     "http://www.baidu.com/",
    13     "http://cn.bing.com/",
    14     "http://www.qq.com/",
    15 }
    16 
    17 func main() {
    18     for _, url := range urls {
    19         // 每一个url启动一个goroutine,同时给wg加1
    20         wg.Add(1)
    21         go func(url string) {
    22             // 当前goroutine结束后给wg计数减1,wg.Done()等价于wg.Add(-1)
    23             defer wg.Done()
    24             // 发送http get请求并打印http返回码
    25             resp, err := http.Get(url)
    26             if err == nil {
    27                 fmt.Println(url, resp.Status)
    28             }
    29         }(url)
    30     }
    31     // 等待所有请求结束
    32     wg.Wait()
    33 }

    select是类UNIX系统提供的一个多路复用系统API,golang借用多路复用的概念,提供了select关键字,用于多路监听多个通道。当监听的通道没有状态是可读或可写的,select是阻塞的;只要监听的通道中有一个状态是可读或可写,select就不会阻塞,而是进入处理就绪通道的分支流程。如果监听的通道有多个可读或可写的状态,select会随机选取一个处理。示例如下:

     1 package main
     2 
     3 import "fmt"
     4 
     5 func main() {
     6     ch := make(chan int, 1)
     7     go func(chan int) {
     8         for {
     9             select {
    10             case ch <- 0:
    11             case ch <- 1:
    12             }
    13         }
    14     }(ch)
    15     for i := 0; i < 10; i++ {
    16         fmt.Println(<-ch)
    17     }
    18 }

    编程中经常遇到“扇入”和“扇出”两个概念,所谓的扇入是指将多路通道聚合到一条通道中处理,golang最简单的扇入就是使用select聚合多条通道服务;所谓的扇出是指将一条通道发散到多条通道中处理,在golang中具体实现就是使用go关键字启动多个goroutine并发处理。当生产者的速度很慢时,需要使用扇入技术聚合多个生产者满足消费者,比如很耗时的加密/解密服务;当消费者的速度很慢时,需要使用扇出技术,比如web服务器并发请求处理。

    读取已经关闭的通道不会引起阻塞,也不会导致panic,而是立即返回该通道存储类型的零值。关闭select监听的某个通道能使select立即感知这种通知,然后进行相应的处理,这就是所谓的退出通知机制。退出通知机制是学习使用context库的基础。下面通过一个随机数生成器的实例演示退出通知机制,下有的消费者不需要随机数时,显式地通知生产者停止生产。

     1 package main
     2 
     3 import (
     4     "fmt"
     5     "math/rand"
     6     "runtime"
     7 )
     8 
     9 func generateIntA(done chan struct{}) chan int {
    10     ch := make(chan int)
    11     go func(chan int) {
    12     Lable:
    13         for {
    14             select {
    15             case ch <- rand.Int():
    16             case <-done:
    17                 break Lable
    18             }
    19         }
    20         close(ch)
    21     }(ch)
    22     return ch
    23 }
    24 
    25 func main() {
    26     done := make(chan struct{})
    27     ch := generateIntA(done)
    28     fmt.Println(<-ch)
    29     fmt.Println(<-ch)
    30     close(done)
    31     fmt.Println(<-ch)
    32     fmt.Println(<-ch)
    33     fmt.Println("NumGoroutine =", runtime.NumGoroutine())
    34 }

    在应用系统编程中,常见的应用场景就是调用一个统一的全局的生成器服务,用于生成全局事务号、订单号、序列号和随机数等。最简单的带缓冲的生成器如下所示:

     1 package main
     2 
     3 import (
     4     "fmt"
     5     "math/rand"
     6 )
     7 
     8 func generateInt() chan int {
     9     ch := make(chan int, 10)
    10     // 启动一个goroutine用于生成随机数,函数返回一个通道用于获取随机数
    11     go func() {
    12         for {
    13             ch <- rand.Int()
    14         }
    15     }()
    16     return ch
    17 }
    18 
    19 func main() {
    20     ch := generateInt()
    21     fmt.Println(<-ch)
    22     fmt.Println(<-ch)
    23 }

    可以把这个随机数生成器用多个goroutine来增强一下:

     1 package main
     2 
     3 import (
     4     "fmt"
     5     "math/rand"
     6 )
     7 
     8 func generateIntA() chan int {
     9     ch := make(chan int, 10)
    10     // 启动一个goroutine用于生成随机数,函数返回一个通道用于获取随机数
    11     go func() {
    12         for {
    13             ch <- rand.Int()
    14         }
    15     }()
    16     return ch
    17 }
    18 
    19 func generateIntB() chan int {
    20     ch := make(chan int, 10)
    21     go func() {
    22         for {
    23             ch <- rand.Int()
    24         }
    25     }()
    26     return ch
    27 }
    28 
    29 func generateInt() chan int {
    30     ch := make(chan int, 10)
    31     go func() {
    32         for {
    33             // 使用select的扇入技术来增加生成的随机源
    34             select {
    35             case ch <- <-generateIntA():
    36             case ch <- <-generateIntB():
    37             }
    38         }
    39     }()
    40     return ch
    41 }
    42 
    43 func main() {
    44     ch := generateInt()
    45     for i := 0; i < 10; i++ {
    46         fmt.Println(<-ch)
    47     }
    48 }

    有时希望生成器能够自动退出,可以借助go通道的退出通知机制实现,例如:

     1 package main
     2 
     3 import (
     4     "fmt"
     5     "math/rand"
     6 )
     7 
     8 func generateIntA() chan int {
     9     ch := make(chan int, 10)
    10     // 启动一个goroutine用于生成随机数,函数返回一个通道用于获取随机数
    11     go func() {
    12         for {
    13             ch <- rand.Int()
    14         }
    15     }()
    16     return ch
    17 }
    18 
    19 func generateIntB() chan int {
    20     ch := make(chan int, 10)
    21     go func() {
    22         for {
    23             ch <- rand.Int()
    24         }
    25     }()
    26     return ch
    27 }
    28 
    29 func generateInt() chan int {
    30     ch := make(chan int, 10)
    31     go func() {
    32         for {
    33             // 使用select的扇入技术来增加生成的随机源
    34             select {
    35             case ch <- <-generateIntA():
    36             case ch <- <-generateIntB():
    37             }
    38         }
    39     }()
    40     return ch
    41 }
    42 
    43 func main() {
    44     ch := generateInt()
    45     for i := 0; i < 10; i++ {
    46         fmt.Println(<-ch)
    47     }
    48 }

    一个融合了并发、缓冲、退出通知等多重特性的生成器如下:

     1 package main
     2 
     3 import "math/rand"
     4 
     5 import "fmt"
     6 
     7 func generateIntA(done chan struct{}) chan int {
     8     ch := make(chan int, 5)
     9     go func() {
    10     Lable:
    11         for {
    12             select {
    13             case ch <- rand.Int():
    14             case <-done:
    15                 break Lable
    16             }
    17         }
    18         close(ch)
    19     }()
    20     return ch
    21 }
    22 
    23 func generateIntB(done chan struct{}) chan int {
    24     ch := make(chan int, 10)
    25     go func() {
    26     Lable:
    27         for {
    28             select {
    29             case ch <- rand.Int():
    30             case <-done:
    31                 break Lable
    32             }
    33         }
    34         close(ch)
    35     }()
    36     return ch
    37 }
    38 
    39 func generateInt(done chan struct{}) chan int {
    40     ch := make(chan int)
    41     send := make(chan struct{})
    42     go func() {
    43     Lable:
    44         for {
    45             select {
    46             case ch <- <-generateIntA(send):
    47             case ch <- <-generateIntB(send):
    48             case <-done:
    49                 break Lable
    50             }
    51         }
    52         close(ch)
    53     }()
    54     return ch
    55 }
    56 
    57 func main() {
    58     done := make(chan struct{})
    59     ch := generateInt(done)
    60     for i := 0; i < 10; i++ {
    61         fmt.Println(<-ch)
    62     }
    63     done <- struct{}{}
    64     fmt.Println("stop generate")
    65 }

    通道可以分为两个方向,一个是读,一个是写。假如一个函数的输入参数和输出参数都是相同的chan类型,则该函数可以调用自己,最终形成一个调用链。当然多个具有相同参数类型的函数也能组成一个调用链,这很像是linux系统的管道,是一个有类型的管道。下面通过具体的实例演示golang这种链式处理能力:

     1 package main
     2 
     3 import "fmt"
     4 
     5 func chain(in chan int) chan int {
     6     out := make(chan int)
     7     go func() {
     8         for v := range in {
     9             out <- 1 + v
    10         }
    11         close(out)
    12     }()
    13     return out
    14 }
    15 
    16 func main() {
    17     in := make(chan int)
    18     go func() {
    19         for i := 0; i < 10; i++ {
    20             in <- i
    21         }
    22         close(in)
    23     }()
    24     // 连续调用三次chain,相当于把in中的每个元素都加3
    25     out := chain(chain(chain(in)))
    26     for v := range out {
    27         fmt.Print(v, " ")
    28     }
    29 }

    若每个请求对应一个goroutine,则这种并发模式相对比较简单,就是来一个请求或任务就启动一个goroutine去处理,典型的就是golang中的http server服务。下面以计算100个自然数的和来举例,将计算任务拆分为多个task,每个task启动一个goroutine进行处理:

     1 package main
     2 
     3 import (
     4     "fmt"
     5     "sync"
     6 )
     7 
     8 // 工作任务
     9 type task struct {
    10     begin, end int
    11     result     chan<- int
    12 }
    13 
    14 // 任务执行:计算begin到end的和
    15 // 执行结果写入chan result
    16 func (t *task) do() {
    17     sum := 0
    18     for i := t.begin; i <= t.end; i++ {
    19         sum += i
    20     }
    21     t.result <- sum
    22 }
    23 
    24 // 构建task并写入task通道
    25 func initTask(taskchan chan<- task, r chan int, p int) {
    26     qu := p / 10
    27     mod := p % 10
    28     high := qu * 10
    29     for j := 0; j < qu; j++ {
    30         b := 10*j + 1
    31         e := 10 * (j + 1)
    32         tsk := task{
    33             begin:  b,
    34             end:    e,
    35             result: r,
    36         }
    37         taskchan <- tsk
    38     }
    39     if mod != 0 {
    40         tsk := task{
    41             begin:  high + 1,
    42             end:    p,
    43             result: r,
    44         }
    45         taskchan <- tsk
    46     }
    47     close(taskchan)
    48 }
    49 
    50 // 读取task chan,每个task启动一个worker goroutine进行处理
    51 // 并等待每个task运行完,关闭结果通道
    52 func distributeTask(taskchan <-chan task, wait *sync.WaitGroup, result chan int) {
    53     for v := range taskchan {
    54         wait.Add(1)
    55         go processTask(v, wait)
    56     }
    57     wait.Wait()
    58     close(result)
    59 }
    60 
    61 // goroutine处理具体工作,并将处理结果发送到结果通道
    62 func processTask(t task, wait *sync.WaitGroup) {
    63     t.do()
    64     wait.Done()
    65 }
    66 
    67 // 读取结果通道,汇总结果
    68 func processResult(resultchan chan int) int {
    69     sum := 0
    70     for r := range resultchan {
    71         sum += r
    72     }
    73     return sum
    74 }
    75 
    76 func main() {
    77     // 创建用户通道
    78     taskchan := make(chan task, 10)
    79     // 创建结果通道
    80     resultchan := make(chan int, 10)
    81     // wait用于同步等待任务的执行
    82     wait := &sync.WaitGroup{}
    83     // 初始化task的goroutine,计算100个自然数的和
    84     go initTask(taskchan, resultchan, 100)
    85     // 每个task启动一个goroutine进行处理
    86     go distributeTask(taskchan, wait, resultchan)
    87     // 通过结果通道获取结果并汇总
    88     sum := processResult(resultchan)
    89     fmt.Println("sum =", sum)
    90 }

    程序逻辑分析如下:

    1. initTask函数构建task并发送到task通道中
    2. 分发任务函数distributeTask为每个task启动一个goroutine处理任务,等待其处理完成,然后关闭结果通道
    3. processResult函数读取并统计所有的结果

    这几个函数分别在不同的goroutine中运行,它们通过通道和sync.WaitGroup进行通信和同步。

    服务器编程中使用最多的就是通过线程池来提升服务的并发处理能力。在golang编程中,一样可以轻松地构建固定数目的goroutines作为工作线程池。下面还是以计算多个整数的和为例来说明这种并发范式。

    程序中除了主要的main goroutine,还开启了如下几类goroutine:

    1. 初始化任务的goroutine
    2. 分发任务的goroutine
    3. 等待所有worker结束通知,然后关闭结果通道的goroutine

    main函数负责拉起上述goroutine,并从结果通道获取最终的结果。程序采用三个通道,分别是:

    1. 传递task任务的通道
    2. 传递task结果的通道
    3. 接受worker处理完任务后所发送通知的通道

    相关代码如下:

      1 package main
      2 
      3 import "fmt"
      4 
      5 // NUMBER 是工作池的goroutine数目
      6 const NUMBER = 10
      7 
      8 // 工作任务
      9 type task struct {
     10     begin, end int
     11     result     chan<- int
     12 }
     13 
     14 // 任务处理:计算从begin到end的和
     15 // 执行结果写入结果chan result
     16 func (t *task) do() {
     17     sum := 0
     18     for i := t.begin; i <= t.end; i++ {
     19         sum += i
     20     }
     21     t.result <- sum
     22 }
     23 
     24 // 初始化待处理task chan
     25 func initTask(taskchan chan<- task, r chan int, p int) {
     26     qu := p / 10
     27     mod := p % 10
     28     high := qu * 10
     29     for j := 0; j < qu; j++ {
     30         b := 10*j + 1
     31         e := 10 * (j + 1)
     32         tsk := task{
     33             begin:  b,
     34             end:    e,
     35             result: r,
     36         }
     37         taskchan <- tsk
     38     }
     39     if mod != 0 {
     40         tsk := task{
     41             begin:  high + 1,
     42             end:    p,
     43             result: r,
     44         }
     45         taskchan <- tsk
     46     }
     47     close(taskchan)
     48 }
     49 
     50 // 读取task chan并分发到worker goroutine处理,总的数量是workers
     51 func distributeTask(taskchan <-chan task, workers int, done chan struct{}) {
     52     for i := 0; i < workers; i++ {
     53         go processTask(taskchan, done)
     54     }
     55 }
     56 
     57 // 工作goroutine处理具体工作,并将处理结果发送到结果chan
     58 func processTask(taskchan <-chan task, done chan struct{}) {
     59     for t := range taskchan {
     60         t.do()
     61     }
     62     done <- struct{}{}
     63 }
     64 
     65 // 通过done channer同步等待所有工作goroutine的结束,然后关闭结果chan
     66 func closeResult(done chan struct{}, resultchan chan int, workers int) {
     67     for i := 0; i < workers; i++ {
     68         <-done
     69     }
     70     close(done)
     71     close(resultchan)
     72 }
     73 
     74 // 读取结果通道,汇总结果
     75 func processResult(resultchan chan int) int {
     76     sum := 0
     77     for r := range resultchan {
     78         sum += r
     79     }
     80     return sum
     81 }
     82 
     83 func main() {
     84     workers := NUMBER
     85     // 工作通道
     86     taskchan := make(chan task, 10)
     87     // 结果通道
     88     resultchan := make(chan int, 10)
     89     // worker信号通道
     90     done := make(chan struct{}, 10)
     91     // 初始化task的goroutine,计算100个自然数之和
     92     go initTask(taskchan, resultchan, 100)
     93     // 分发任务到number个goroutine池
     94     distributeTask(taskchan, workers, done)
     95     // 获取各个goroutine处理完任务的通知,并关闭结果通道
     96     go closeResult(done, resultchan, workers)
     97     // 通过结果通道获取结果并汇总
     98     sum := processResult(resultchan)
     99     fmt.Println("sum =", sum)
    100 }

    程序逻辑分析如下:

    1. 构建task并发送到task通道中
    2. 分别启动n个工作线程,不停地从task通道中获取任务,然后将结果写入结果通道。如果任务通道被关闭,则负责向瘦脸结果的goroutine发送通知,告诉其当前worker已经完成工作
    3. 收敛结果的goroutine接收到所有task已经处理完毕的信号后,主动关闭结果通道
    4. main中的函数processResult读取并统计所有的结果

    编程中经常遇到在一个流程中需要调用多个子调用的情况,这些子调用相互之间没有依赖,如果串行地调用,则耗时会很长,此时可以使用golang并发编程中的future模式。future模式的基本工作原理如下:

    1. 使用chan作为函数参数
    2. 启动goroutine调用函数
    3. 通过chan传入函数
    4. 做其他可以并行处理的事情
    5. 通过chan异步获取结果

    以下是一个例子:

    package main
    
    import (
        "fmt"
        "time"
    )
    
    // 一个查询结构体
    // 这里的sql和result是一个简单的抽象,具体的应用可能是更复杂的数据类型
    type query struct {
        sql, result chan string
    }
    
    // 执行query
    func execQuery(q query) {
        // 启动协程
        go func() {
            // 获取输入
            sql := <-q.sql
            // 访问数据库
            // 输出结果通道
            q.result <- "result from " + sql
        }()
    }
    
    func main() {
        // 初始化query
        q := query{make(chan string, 1), make(chan string, 1)}
        // 执行query,注意执行的时候无需准备参数
        go execQuery(q)
        // 发送参数
        q.sql <- "select * from table"
        // 做其他事情,通过time.Sleep()来描述
        time.Sleep(1 * time.Second)
        fmt.Println(<-q.result)
    }

    future最大的好处是将函数的同步调用转换为异步调用,适用于一个交易需要多个子调用且这些子调用没有依赖的场景。

    golang中的goroutine之间没有父子关系,也没有所谓子进程退出后的通知机制,多个goroutine都是平行地被调度,多个goroutine如何协作工作设计通信、同步、通知和退出四个方面。

    • 通信:chan是goroutine之间通信的基础,这里的通信主要是指程序的数据通道
    • 同步:不带缓冲的chan提供了一个天然的同步等待机制;sync.WaitGroup也为多个goroutine协同工作提供一种同步等待机制
    • 通知:这个通知和上面通信的数据不一样,通知通常不是业务数据,而是管理、控制流数据。要处理这个,可以在输入端绑定两个chan,一个用于业务流数据,另一个用于异常通知数据,然后通过select收敛进行处理,但这不是个通用解决方案。
    • 退出:goroutine之间没有父子关系,如何通知goroutine退出?可以通过增加一个单独的通道,借助通道和select的广播机制(close channel to broadcast)实现退出。

    goalng在语法上处理某个goroutine退出通知机制很简单,但是遇到复杂的并发结构处理起来就显得力不从心。实际编程中goroutine会拉起新的goroutine,新的goroutine又会拉起另一个新的goroutine,最终形成一个树状的结构。由于goroutine里并没有父子的概念,这个树状的结构知识在程序员头脑中抽象出来的,程序的执行模型并没有维护这么一个树状结构。为了解决这个问题,go 1.7版本提供了一个标准库context来实现,它提供两种功能:退出通知和元数据传递。context库的设计目的就是跟踪goroutine调用,在其内部维护一个调用树,并在这些调用树中传递通知和元数据。注意:通知和数据都可以传递给goroutine调用树上的每一个goroutine。

    在介绍context库之前,先理解context包的整体工作机制:第一个创建context的goroutine被称为root节点。root节点负责创建一个实现context接口的具体对象,并将该对象作为参数传递到其新拉起的goroutine,下游的goroutine可以继续封装该对象,再传递到更下游的goroutine。context对象在传递的过程中最终形成一个树状的数据结构,这样通过位于root节点的context对象就能遍历整个context对象树,通知和消息就可以通过root节点传递出去,实现了上游goroutine对下游goroutine的消息传递。

    context是一个基本接口,所有的context对象都要实现该接口,context的使用者在调用接口中都使用context作为参数类型。具体分析如下:

     1 type Context interface {
     2     // 如果context实现了超时控制,则该方法返回ok true,deadline为超时时间,
     3     // 否则ok为false
     4     Deadline() (deadline time.Time, ok bool)
     5     // 后端被调的goroutine应该监听该方法返回的chan,以便及时释放资源
     6     Done() <-chan struct{}
     7     // Done返回的chan收到通知的时候,才可以访问Err()获知因为什么原因被取消
     8     Err() error
     9     // 可以访问上游goroutine传递给下游goroutine的值
    10     Value(key interface{}) interface{}
    11 }

    canceler接口是一个扩展接口,规定了取消通知的context具体类型需要实现的接口。context包中的具体类型*cancelCtx和*timerCtx都实现了该接口。示例如下:

    1 // 一个context对象如果实现了canceler接口,则可以被取消
    2 type canceler interface {
    3     // 创建cancel接口实例的goroutine调用cancel方法通知后续创建的goroutine退出
    4     cancel(removeFromParent bool, err error)
    5     // Done方法返回的chan需要后端goroutine来监听,并及时退出
    6     Done() <-chan struct{}
    7 }

    emptyCtx实现了context接口,但不具备任何功能,因为其所有的方法都是空实现。其存在的目的是作为context对象树的根节点。因为context包的使用思路就是不停地调用context包使用的包装函数来创建具有特殊功能的context实例,每一个context实例的创建都以上一个context对象为参数,最终形成一个树状的结构。package定义了两个全局变量和两个封装函数,返回两个emptyCtx实例对象,实际使用时通过调用这两个封装函数来构造context的root节点。

    cancelCtx是一个实现了context接口的具体类型,同时实现了conceler接口,conceler具有退出通知方法。注意退出通知机制不但能通知自己,也能逐层通知其children节点。

    timerCtx是一个实现了context接口的具体类型,内部封装了cancelCtx类型实例,同时有一个deadline变量,用来实现定时退出通知。

    valueCtx是一个实现了context接口的具体类型,内部封装了context接口类型,同时封装了一个k/v的存储变量。valueCtx可用来传递通知信息。

    下面这两个函数是构造context取消树的根节点对象,根节点对象用作后续With包装函数的实参。

    1 func Background() Context
    2 func TODO() Context

    With包装函数用来构建不同功能的Context具体对象,比如:

    1. 创建一个带有退出通知的context具体对象,内部创建一个cancelCtx的类型实例
    2. 创建一个带有超时通知的context具体对象,内部创建一个timerCtx的类型实例(WithDeadline)
    3. 创建一个带有超时通知的context具体对象,内部创建一个timerCtx的类型实例(WithTimeout)
    4. 创建一个能够传递数据的context具体对象,内部创建一个valueCtx的类型实例

    上述函数都有一个共同的特点,都具有parent参数,其实这就是实现context通知树的必备条件,在goroutine的调用链中,context的实例被逐层地包装并传递,每层又可以对传进来的context实例再封装自己所需的功能,整个调用树需要一个数据结构来维护,这个维护逻辑在这些包装函数内部实现。

    前面描述的With开头的构造函数是给外部程序使用的API接口函数,context具体对象的链条关系是在with函数的内部维护的。现在分析一下with函数内部使用的通用函数。

     func propagateCancel(parent Context, child canceler) 有如下几个功能:

    1. 判断parent的方法Done()返回值是否为nil,如果是,则说明parent不是一个可取消的context对象,也就无所谓取消构造树,说明child就是取消构造树的根
    2. 如果parent的方法Done()返回值不是nil,则向上回溯自己的祖先是否为cancelCtx类型实例,如果是,则将child的子节点注册维护到那棵关系树里面
    3. 如果向上回溯自己的祖先都不是cancelCtx类型实例,则说明整个链条的取消树是不连续的。此时只需监听parent和自己的取消信号即可

     func parentCancelCtx(parent Context)(*cancelCtx, bool) 判断parent中是否封装*cancelCtx的字段,或者接口里面存放的底层类型是否是*cancelCtx类型。

     func removeChild(parent Context, child canceler) 如果parent封装*cancelCtx类型字段,或者接口里面存放的底层类型是*cancelCtx类型,则将其构造树上的child节点删除。

    下面通过一段实验性质的代码阐述context的基本用法:

     1 package main
     2 
     3 import (
     4     "context"
     5     "fmt"
     6     "time"
     7 )
     8 
     9 // 定义一个包含Context字段的新类型
    10 type otherContext struct {
    11     context.Context
    12 }
    13 
    14 func work(ctx context.Context, name string) {
    15     for {
    16         select {
    17         case <-ctx.Done():
    18             fmt.Printf("%s get msg to cancel
    ", name)
    19             return
    20         default:
    21             fmt.Printf("%s is running 
    ", name)
    22             time.Sleep(1 * time.Second)
    23         }
    24     }
    25 }
    26 
    27 func workWithValue(ctx context.Context, name string) {
    28     for {
    29         select {
    30         case <-ctx.Done():
    31             fmt.Printf("%s get msg to cancel
    ", name)
    32             return
    33         default:
    34             value := ctx.Value("key").(string)
    35             fmt.Printf("%s is running value = %s
    ", name, value)
    36             time.Sleep(1 * time.Second)
    37         }
    38     }
    39 }
    40 
    41 func main() {
    42     // 使用context.Background()构建一个WithCancel类型的上下文
    43     ctxa, cancel := context.WithCancel(context.Background())
    44     // work模拟运行并检测前端的退出通知
    45     go work(ctxa, "work1")
    46     // 使用withdeadline包装前面的上下文对象ctxa
    47     tm := time.Now().Add(3 * time.Second)
    48     ctxb, _ := context.WithDeadline(ctxa, tm)
    49     go work(ctxb, "work2")
    50     // 使用withvalue包装前面的上下文对象ctxb
    51     oc := otherContext{ctxb}
    52     ctxc := context.WithValue(oc, "key", "andes,pass from main ")
    53     go workWithValue(ctxc, "work3")
    54     // 故意sleep10秒让work2、work3超时退出
    55     time.Sleep(10 * time.Second)
    56     // 显式调用work1的cancel方法通知其退出
    57     cancel()
    58     // 等待work1打印退出信息
    59     time.Sleep(5 * time.Second)
    60     fmt.Println("main stop")
    61 }

    在使用context的过程中,程序在底层实际上维护了两条关系链:

    1. children key构成从根到叶子节点context实例的引用关系,这个关系在调用with函数时进行维护(调用上文介绍的 propagateCancel(parent Context, child canceler) 函数维护)程序有一层这样的树状结构:
      ctxa.children--->ctxb
      ctxb.children--->ctxc

      这个树提供一种从根节点开始遍历树的方法,context包的取消广播通知的核心就是基于这一点实现的。取消通知沿着这条链从根节点向下层节点逐层广播。当然也可以在任意一个子树上调用取消通知,一样会扩散到整棵树。示例程序中ctxa收到退出通知,会通知其绑定work1,同时会广播给ctxb和ctxc绑定的work2和work3。同理,ctxc收到退出通知,会通知到其绑定的work2,同时会广播给ctxc绑定的work3。

    2. 在构造context的对象中不断地包裹context实例形成一个引用关系链,这个关系链的方向是相反的,是自底向上的。示例程序中多个context对象的关系如下:
      ctxc.Context --> oc
      ctxc.Context.Context --> ctxb
      ctxc.Context.Context.cancelCtx --> ctxa
      ctxc.Context.Context.cancelCtx.Context --> new(emptyCtx)

      这个关系链主要用来切断当前Context实例和上层的Context实例之间的关系,比如ctxb调用了退出通知或定时器到期了,ctxb后续就没有必要在通知广播树上继续存在,它需要找到自己的parent,然后执行 delete(parent.children, ctxb) ,把自己从广播树上清理掉。

    再来看一下对核心代码的解读:

     1     ctxa, cancel := context.WithCancel(context.Background())
     2     /*
     3         ctxa内部状态---->ctxa=&cancelCtx{
     4             Context: new(emptyCtx)
     5         }
     6     */
     7     // work模拟运行并检测前端的退出通知
     8     go work(ctxa, "work1")
     9     // 使用withdeadline包装前面的上下文对象ctxa
    10     tm := time.Now().Add(3 * time.Second)
    11     ctxb, _ := context.WithDeadline(ctxa, tm)
    12     /*
    13         ctxb内部状态---->ctxb=&timerCtx{
    14             cancelCtx: ctxa
    15             dataline: tm
    16         }
    17         同时触发ctxa,在children中维护ctxb作为子节点
    18     */
    19     go work(ctxb, "work2")
    20     // 使用withvalue包装前面的上下文对象ctxb
    21     oc := otherContext{ctxb}
    22     ctxc := context.WithValue(oc, "key", "andes,pass from main ")
    23     /*
    24         ctxc---->&cancelCtx{
    25             Context: oc
    26         }
    27         同时通过oc.Context找到ctxb,通过ctxb.cancelCtx找到ctxa,在ctxa的children字段中维护ctxc作为其子节点
    28     */
    29     go workWithValue(ctxc, "work3")

    通过上文实例梳理出使用Context包的一般流程如下:

    1. 创建一个Context根对象。例如:
      func Background() Context
      func TODO() Context
    2. 包装上一步创建的Context对象,使其具有特定的功能。这些包装函数是context package的核心,几乎所有的封装都是从包装函数开始的。原因很简单,使用context包的核心就是使用其退出通知广播功能。示例如下:
      func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
      func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
      func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
      func WithValue(parent Context, key, val interface{}) Context
    3. 将上一步创建的对象作为实参传给后续启动的并发函数(通常作为函数的第一个参数),每个并发函数内部可以继续使用包装函数对传进来的Context对象进行包装,添加自己所需的功能。
    4. 顶端的goroutine在超时后调用cancel退出通知函数,通知后端所有goroutine释放资源。
    5. 后端的goroutine通过select监听Context.Done()返回的chan,及时响应前端goroutine的退出通知,一般停止本次处理,释放所占用的资源。

    关于使用context传递数据的争议,首先要清楚使用context包主要是解决goroutine的通知退出,传递数据是其一个额外的功能,可以使用它传递一些元信息。总之使用context传递的信息不能影响正常的业务流程,程序不要期待在context中传递一些必要的参数等,没有这些参数,程序也应能正常工作。

    在context中传递数据的坏处如下:

    1. 传递的都是interface{}类型的值,编译器不能进行严格的类型校验
    2. 从interface{}到具体类型需要使用类型断言和接口查询,有一定的运行期开销和性能损失
    3. 值在传递过程中有可能被后续的服务覆盖,且不易被发现
    4. 传递信息不简明,较晦涩;不能通过代码或文档一眼看到传递的是什么,不利于后续维护。

    context应该传递什么数据:

    1. 日志信息
    2. 调试信息
    3. 不影响业务主逻辑的可选数据

    context包提供的核心功能是多个goroutine之间的退出通知机制,传递数据只是一个辅助功能,应谨慎使用context传递数据。

    下面讲一点跟并发相关的知识。

    《Communicating Sequential Processes》(CSP)是计算机科学领域的“大牛”托尼霍尔于1978年发表的一篇论文,后期不断优化,最终发展为一个代数理论,用来描述并发系统消息通信模型并验证其正确性。其最基本的思想是:将并发系统抽象为channel和process两部分,channel用来传递消息,process用于执行,channel和process相互独立,没有从属关系,消息的发送和接收有严格的时序限制。golang主要借鉴了channel和process的概念,在go中channel就是通道,process就是goroutine。

    应用程序的并发模型是多样的,有如下三种:

    • 多进程模型:进程都能被多核CPU并发调度,优点是每个进程都有自己独立的内存空间,隔离性好、健壮性高;缺点是进程比较重,进程的切换消耗较大,进程间的通信需要多次在内核区和用户区之间复制数据
    • 多线程模型:这里的多线程是指多个内核线程进行处理,线程的优点是通过共享内存进行通信更快捷,切换代价小;缺点是多个线程共享内存空间,极易导致数据访问混乱,某个线程误操作内存可能危及整个线程组,健壮性不高
    • 用户级多线程模型:又分为两种情况,一种是M:1的方式,M个用户线程对应一个内核进程,这种情况很容易因为一个系统阻塞,其他用户进程都会被阻塞,不能利用机器多核的优势。还有一种模式就是M:N的方式,M个用户线程对应N个内核线程,这种模式一般需要语言运行时或库的支持,效率最高

    程序并发处理的要求越来越高,但是不能无限制地增加系统线程数,线程数过多会导致操作系统的调度开销变大,单个线程的单位时间内被分配的运行时间片减少,单个线程的运行速度降低,单靠增加系统线程数不能满足要求。为了不让系统线程无限膨胀,于是就有了协程的概念。协程是一种用户态的轻量级线程,协程的调度完全由用户态程序控制,协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈,每个内核线程可以对应多个用户协程,当一个协程执行体阻塞了,调度器会调度另一个协程执行,最大效率地利用操作系统分给系统线程的时间片。前面提到的用户级多线程模型就是一种协程模型,尤其以M:N模型最为高效。这样的好处显而易见:

    1. 控制了系统线程数,保证每个线程的运行时间片充足
    2. 调度曾能进行用户态的切换,不会导致单个协程阻塞整个程序的情况,尽量减少上下文切换,提升运行效率

    由此可见,协程是一种非常高效、理想的执行模型。golang的并发执行模型就是一种变种的协程模型。golang在语言层面引入goroutine,有以下好处:

    • goroutine可以在用户空间调度,避免了内核态和用户态的切换导致的成本
    • goroutine是语言原生支持的,提供了非常简洁的语法,屏蔽了大部分复杂底层实现
    • goroutine更小的栈空间允许用户创建成千上万的实例

    goroutine的调度模型可以抽象出三个实体:M、P、G

    G(goroutine)是golang运行时对goroutine的抽象描述。G中存放并发执行的代码入口地址、上下文、运行环境(关联的P和M)、运行栈等执行相关的元信息。G的新建、休眠、恢复、停止都受到golang运行时的管理。golang运行时的监控线程会监控G的调度,G不会长久地阻塞系统线程,运行时的调度器会自动切换到其他G上继续运行。G新建或恢复时会添加到运行队列,等待M取出并运行。

    M(machine)代表os内核线程,是操作系统层面调度和执行的实体。M仅负责执行,M不停地被唤醒或创建,然后执行。M启动时进入的是运行时的管理代码,由这段代码获取G和P资源,然后执行调度。另外,golang运行时会单独创建一个监控线程,负责对程序的内存、调度等信息进行监控和控制

    P(processor)代表M运行G所需要的资源,是对资源的一种抽象和管理,P不是一段代码实体,而是一个管理的数据结构,P主要是降低M管理调度G的复杂性,增加一个间接的控制层数据结构。把P看作资源,而不是处理器,P控制golang代码的并行度,它不是运行实体。P持有G的队列,P可以隔离调度,解决P和M的绑定就解除了M对一串G的调用。P在运行模型中只是一个数据模型,而不是程序控制模型,理解这一点非常重要。

    M和P一起构成一个运行时环境,每个P有一个本地的可调度的G队列,队列里的G会被M依次调度执行,如果本地队列空了,则会去全局队列偷取一部分G,如果全局队列也是空的,则去其他的P中偷取一部分G,这也就是work stealing算法的基本原理。G并不是执行体,而是用于存放并发执行体的元信息,包括并发执行的入口函数、堆栈、上下文等信息。G由于保存的是元信息,为了减少队形的分配和回收,G对象是可以复用的,只需将相关元信息初始化为新值即可。M仅负责执行,M启动时进入运行时的管理代码,这段管理代码必须拿到可用的P后,才能执行调度。P的数目默认是CPU核心的数量,可以通过runtime.GOMAXPROCS函数设置或查询,M和P的数目差不多,但运行时会根据当前的状态动态地创建M,M有一个最大值上线,目前是10000;G和P是一种M:N的关系,M可以成千上万,远远大于N。

    golang中还有特殊的M和G,它们是m0和g0。m0是启动程序后的主线程,这个m对应的信息会存放在全局变量m0中,m0负责执行初始化操作和启动第一个g,之后m0就和其他的M一样了。每个M都会有一个自己的管理堆栈g0,g0不指向任何可执行的函数,g0仅在M执行管理和调度逻辑时使用。在调度或系统调用时会切换到g0的栈空间,全局变量的g0是m0的g0。

    go程序启动初始化过程如下:

    1. 分配和检查栈空间
    2. 初始化参数和环境变量
    3. 当前运行线程标记为m0,m0是程序启动的主线程
    4. 调用运行时初始化函数runtime.schedinit进行初始化。主要是初始化内存空间分配器、GC、生成空闲P列表
    5. 在m0上调度第一个G,这个G运行runtime.main函数

    runtime.main会拉起运行时的监控线程,然后调用main包的init()初始化函数,最后执行main函数。

    在程序启动过程中会初始化空闲P列表,P是在这个时候被创建的,同时第一个G也是在初始化过程中被创建的。后续在有go并发调用的地方都有可能创建G。由于G只是一个数据结构,并不是执行实体,所以G是可以被复用的。在需要G结构时,首先要去P的空闲G列表里面寻找已经运行结束的goroutine,其G会被缓存起来。

    每个并发调用都会初始化一个新的G任务,然后唤醒M执行任务。这个唤醒不是特定唤醒某个线程去工作,而是先尝试获取当前线程M。如果无法获取,则从全局调度的空闲M列表中获取可用的M;如果没有可用的,则新建M,然后绑定P和G再运行。所以M和P不是一一对应的,M是按需分配的,但是运行时会设置一个上限值(默认是10000),超出最大值将导致程序崩溃。注意:创建新的M有一个自己的栈m0,在没有执行并发程序的过程中,M一直是在g0栈上工作的。M一定要拿到P才能执行,G、M和P维护着绑定关系,M在自己的堆栈g0上运行恢复G上下文的逻辑。完成初始化后,M从g0栈切换到G的栈,并跳转到并发程序代码点开始执行。M线程里有管理调度和切换堆栈的逻辑,但是M必须拿到P后才能运行,可以看到M是自驱动的,但需要P的配合,这是一个好的设计。

    抢占调度的原因如下:

    1. 不让某个G长久地被系统调用阻塞,阻碍其他G运行
    2. 不让某个G一直占用某个M不释放
    3. 避免全局队列里面的G得不到执行

    抢占调度的策略如下:

    1. 在进入系统调用(syscall)前后,各封装一层代码检测G的状态,当检测到当前G已经被监控线程抢占调度,则M停止执行当前G,进行调度切换
    2. 监控线程经过一段时间检测感知到P运行超过一段时间,取消P和M的关联,这也是一种更高层次的调度
    3. 监控线程经过一段时间检测感知到G一直运行,超过了一定的时间,设置G标记,G执行栈扩展逻辑检测到抢占标记,根据相关条件决定是否抢占调度

    golang程序运行时是比较复杂的,涉及内存分配、垃圾回收、goroutine调度和通信管理等诸多方面。整个运行时的初始化过程也很繁琐、复杂。

  • 相关阅读:
    我这些年的项目管理心得...
    14条建议,使你的IT职业生涯更上一层楼
    手机通过WIFI连上ZXV10 H618B路由器但不能上网问题的解决
    优秀中层必备的十大能力
    IMX51启动模式
    VS2005工程由Pocket PC 2003 SDK转为WINCE6.0 SDK的问题
    VS2005工程增加SDK
    VS2005下开发PPC2003和WM50编译器一些设置
    CTO俱乐部下午茶:技术团队管理中的那些事儿
    Android通过JNI调用驱动程序(完全解析实例)
  • 原文地址:https://www.cnblogs.com/JHSeng/p/12214964.html
Copyright © 2011-2022 走看看