Go语言中的并发程序可以用两种手段来实现,第一种是传统的并发模型,多线程共享内存,第二种则是现代的并发模型,顺序通信进程(CSP),Go语言使用goroutine和channel来支持顺序通信进程。
一、Goroutine
1. 在Go语言中,每一个并发的执行单元叫作一个goroutine。
2. main goroutine:当一个程序启动时,其主函数即在一个单独的goroutine中运行,称为main goroutine。
3. goroutine创建:新的goroutine使用go语句来创建。在语法上,go语句是一个普通的函数或方法调用前加上关键字go。go语句会使其语句中的函数在一个新创建的goroutine中运行,而go语句本身会迅速地完成。
4. 主函数返回时,所有的goroutine都会被直接打断,程序退出。
二、Channel
如果说goroutine是Go语言程序的并发体的话,那么channels就是它们之间的通信机制。
通过channel,一个goroutine可以给另一个goroutine发送值信息。
每个channel能发送的值类型只有一种。
1. channel创建
channel是引用类型变量。
2. channel比较
两个相同类型的channel可以使用==运算符比较。如果两个channel引用的是相同的对象,那么比较的结果为真。一个channel也可以和nil进行比较。
3. channel数据发送与接收
一个不使用接收结果的接收操作也是合法的。
4. channel关闭
基于已经关闭的channel的任何发送操作都将导致panic异常。
基于已经关闭的channel执行接收操作依然可以接收到之前已经发送成功的数据,如果channel中已经没有数据的话,后续接收操作将不再阻塞,而是立即返回一个零值。
5. 不带缓存的channel
一个基于无缓存channel的发送操作将导致发送者goroutine阻塞,直到另一个goroutine在相同的channel上执行接收操作,当发送的值通过channel成功传输以后,两个goroutine才能继续执行后面的语句。反之,如果接收操作先发生,那么接收者goroutine也将阻塞,直至有另一个goroutine在相同的channel上执行发送操作。
基于无缓存channel的发送和接收操作将导致两个goroutine做一次同步操作,因为这个原因,无缓存channel有时也被称为同步channel。
注:
有些消息事件并不携带额外的信息,它们仅仅是用作两个goroutine之间的同步,这时候可以使用struct{}空结构体作为Channels元素的类型。
6. 串联的channels(pipeline)
channels也可以用于将多个goroutine链接在一起,一个channel的输出作为下一个channel的输入。这种串联的channels就是所谓的管道(pipeline)。
7. 单方向的channel
chan<- int:只发送int的channel,只能发送不能接收
<-chan int:只接收int的channel,只能接收不能发送
这种限制将在编译期检测。
注:
因为关闭操作只用于断言不再向channel发送新的数据,所以只有在发送者所在的goroutine才会调用close函数,因此对一个只接收的channel调用close将是一个编译错误。
任何双向channel向单向channel变量的赋值都将导致隐式转换,从双向channel转换为单向channel。不存在反向转换的语法。
8. 带缓存的channel
带缓存的channel内部持有一个元素队列。
channel内部缓存的容量:cap(ch)
channel内部缓存的长度:len(ch)
基于带缓存channel的发送操作就是向其内部缓存队列的尾部插入元素,接收操作则是从队列的头部删除元素。如果内部缓存队列是满的,那么发送操作将阻塞直到另一个goroutine执行接收操作而释放了新的队列空间。相反,如果channel是空的,接收操作将阻塞直到有另一个goroutine执行发送操作而向队列插入元素。
channel的缓存队列解耦了接收和发送的goroutine。
三、并发的循环
1. goroutine泄露
某个channel的内部缓存队列满了,但是没有goroutine向这个channel执行接收操作,从而导致向这个channel发送值的所有goroutine都永远阻塞下去,并且永远都不会退出。这种情况,称为goroutine泄露,这将是一个bug。和垃圾变量不同,泄露的goroutine并不会被自动回收,因此确保每个不再需要的goroutine都能正常退出是很重要的。
2. sync.WaitGroup
有时候,我们需要多个goroutine都运行结束以后做一些事情,比如关闭一个channel。Go语言提供了一种特殊的计数器,用于检测多个goroutine是否都已运行结束。它在每一个goroutine启动时自增1,在每一个goroutine退出时自减1,且会一直等待直至计数器自减为0,即表示所有goroutine都已运行结束。
使用示例:
func makeThumbnails(filenames <-chan string) int64 {
sizes := make(chan int64)
var wg sync.WaitGroup//number of working goroutines
for f := range filenames {
wg.Add(1)//必需在worker goroutines开始之前调用,才能保证Add()是在closer goroutine调用Wait()之前被调用
//worker goroutines
go func(f string) {
defer wg.Done()//使用defer来确保计数器即使是在程序出错的情况下依然能够正确地自减
thumb, err := thumbnail.ImageFile(f)
if err != nil {
log.Println(err)
return
}
info, _ := os.Stat(thumb)
sizes <- info.Size()
}(f)
}
//closer goroutine
go func() {
wg.Wait()
close(sizes)
}()
var total int64
for size := range sizes {
total += size
}
return total
}
四、基于select的多路复用
有时候我们需要等待多个channel的其中一个返回事件,但是我们又无法做到从每一个channel中接收信息。假如我们试图从其中一个channel中接收信息,而这个channel又没有返回信息,那么程序会立刻被阻塞,从而无法收到其他channel返回的信息。这时,基于select的多路复用就派上用场了。
select会一直等待直至有能够执行的case分支才去执行这个case分支。当条件满足时,select才会去通信并执行case之后的语句;这时候其它通信是不会执行的。一个没有任何case的select语句写作select{},会永远地等待下去。
如果多个case同时就绪时,select会随机地选择一个执行,这样来保证每一个channel都有平等的被select的机会。
channel的零值是nil,对一个nil的channel发送和接收操作会永远阻塞,在select语句中操作nil的channel永远都不会被select到。
五、消息广播
有时候,我们需要将某个消息广播通知所有运行中的goroutine,但是我们又无法知道goroutine的数量,这时候可以使用这种策略:创建一个空的channel,将从这个channel中接收信息的操作加入基于select的多路复用,由于这时channel是空的,所有的goroutine都无法从中接收到值;当我们需要广播消息的时候,关闭这个channel,这样所有的goroutine都能从中接收到零值,以此表示该消息已传达。(一个channel只能用于表达一种消息)
六、一个综合运用的实例程序——并发的字典遍历
/**
*广度遍历目录文件,计算文件数量与大小
*每隔0.5秒输出已计算的文件数量与大小
*当标准输入有值时,中断程序运行
**/
package main
import (
"flag"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sync"
"time"
)
//广度遍历目录文件,计算文件数量与大小
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
defer n.Done()
if cancelled() {
return
}
for _, entry := range dirents(dir) {
if entry.IsDir() {
n.Add(1)
subdir := filepath.Join(dir, entry.Name())
go walkDir(subdir, n, fileSizes)
} else {
fileSizes <- entry.Size()
}
}
}
/*
*目的:控制同一时刻打开的文件数量,防止程序占用太多资源
*实现:创建一个带缓存的channel,通过channel的缓存队列大小控制同一时刻打开的文件数量。
*读取文件之前向该channel发送一个信号量,读取完以后向该channel接收一个信号量,若是channel的缓存
*队列满了,则会阻塞,无法继续打开文件。
*/
var sema = make(chan struct{}, 2)
func dirents(dir string) []os.FileInfo {
select {
case sema <- struct{}{}:
case <-done:
return nil
}
defer func() { <-sema }()
entries, err := ioutil.ReadDir(dir)
if err != nil {
fmt.Fprint(os.Stderr, "du1: %
", err)
return nil
}
return entries
}
/*
*目的:读取标准输入,只要有值则停止运行程序
*实现:创建一个空channel,每个goroutine读取文件之前都先从channel中接收值,如果阻塞,表示程序可以继续运行,
*当标准输入有值时,关闭该channel,所有未读取文件的goroutine从channel中接收到零值,中断程序运行。
*/
var done = make(chan struct{})
func cancelled() bool {
select {
case <-done:
return true
default:
return false
}
}
func main() {
//读取标准输入,只要有输入则停止程序
go func() {
os.Stdin.Read(make([]byte, 1))
close(done)
}()
flag.Parse()
roots := flag.Args()
if len(roots) == 0 {
roots = []string{"."}
}
//用于发送文件大小的channel
fileSizes := make(chan int64)
/*
*目的:当读取文件大小的所有goroutine都结束运行时,关闭用于发送文件大小的channel
*实现:使用特殊计数器sync.WaitGroup,每次新增一个goroutine则自增1,每次结束一个goroutine
*则自减1,当其自减为0时,表示所有goroutine都已结束运行,可以关闭channel了
*/
var n sync.WaitGroup
for _, root := range roots {
n.Add(1)
go walkDir(root, &n, fileSizes)
}
go func() {
n.Wait()
close(fileSizes)
}()
/*
*目的:每隔0.1秒输出文件数量与大小计算的结果
*实现:使用time.Tick函数。time.Tick函数返回一个channel,程序会周期性地像一个节拍器一样向这个channel发送事件,
*每一个事件的值是一个时间戳。当从该channel中接收到值的时候,就可以打印了。
*/
var tick <-chan time.Time
tick = time.Tick(100 * time.Millisecond)
var nfiles, nbytes int64
loop:
for {
select {
case <-done:
/*
*程序结束,需要先吧fileSizes这个channel中的内容排空,以保证对walkDir的调用不会被向fileSizes发送信息阻塞住,
*可以正确地完成,防止goroutine泄露。
*/
for range fileSizes {
}
case size, ok := <-fileSizes:
if !ok { //fileSizes这个channel已关闭,退出循环loop
break loop
}
nfiles++
nbytes += size
case <-tick:
printDiskUsage(nfiles, nbytes)
}
}
}
//打印已经计算的文件数量与大小
func printDiskUsage(nfiles, nbytes int64) {
fmt.Printf("%d files %.1f GB
", nfiles, float64(nbytes)/1e9)
}