zoukankan      html  css  js  c++  java
  • Goroutine、Channel、WaitGroup、select

    Go语言中的并发程序可以用两种手段来实现,第一种是传统的并发模型,多线程共享内存,第二种则是现代的并发模型,顺序通信进程(CSP),Go语言使用goroutine和channel来支持顺序通信进程。


    一、Goroutine
    1. 在Go语言中,每一个并发的执行单元叫作一个goroutine。
    2. main goroutine:当一个程序启动时,其主函数即在一个单独的goroutine中运行,称为main goroutine。
    3. goroutine创建:新的goroutine使用go语句来创建。在语法上,go语句是一个普通的函数或方法调用前加上关键字go。go语句会使其语句中的函数在一个新创建的goroutine中运行,而go语句本身会迅速地完成。

    4. 主函数返回时,所有的goroutine都会被直接打断,程序退出。


    二、Channel
    如果说goroutine是Go语言程序的并发体的话,那么channels就是它们之间的通信机制。
    通过channel,一个goroutine可以给另一个goroutine发送值信息。
    每个channel能发送的值类型只有一种。
    1. channel创建

    channel是引用类型变量。
    2. channel比较
    两个相同类型的channel可以使用==运算符比较。如果两个channel引用的是相同的对象,那么比较的结果为真。一个channel也可以和nil进行比较。
    3. channel数据发送与接收

    一个不使用接收结果的接收操作也是合法的。
    4. channel关闭

    基于已经关闭的channel的任何发送操作都将导致panic异常。
    基于已经关闭的channel执行接收操作依然可以接收到之前已经发送成功的数据,如果channel中已经没有数据的话,后续接收操作将不再阻塞,而是立即返回一个零值。
    5. 不带缓存的channel
    一个基于无缓存channel的发送操作将导致发送者goroutine阻塞,直到另一个goroutine在相同的channel上执行接收操作,当发送的值通过channel成功传输以后,两个goroutine才能继续执行后面的语句。反之,如果接收操作先发生,那么接收者goroutine也将阻塞,直至有另一个goroutine在相同的channel上执行发送操作。
    基于无缓存channel的发送和接收操作将导致两个goroutine做一次同步操作,因为这个原因,无缓存channel有时也被称为同步channel。
    注:
    有些消息事件并不携带额外的信息,它们仅仅是用作两个goroutine之间的同步,这时候可以使用struct{}空结构体作为Channels元素的类型。
    6. 串联的channels(pipeline)
    channels也可以用于将多个goroutine链接在一起,一个channel的输出作为下一个channel的输入。这种串联的channels就是所谓的管道(pipeline)。
    7. 单方向的channel
    chan<- int:只发送int的channel,只能发送不能接收
    <-chan int:只接收int的channel,只能接收不能发送
    这种限制将在编译期检测。
    注:
    因为关闭操作只用于断言不再向channel发送新的数据,所以只有在发送者所在的goroutine才会调用close函数,因此对一个只接收的channel调用close将是一个编译错误。
    任何双向channel向单向channel变量的赋值都将导致隐式转换,从双向channel转换为单向channel。不存在反向转换的语法。
    8. 带缓存的channel
    带缓存的channel内部持有一个元素队列。

    channel内部缓存的容量:cap(ch)
    channel内部缓存的长度:len(ch)
    基于带缓存channel的发送操作就是向其内部缓存队列的尾部插入元素,接收操作则是从队列的头部删除元素。如果内部缓存队列是满的,那么发送操作将阻塞直到另一个goroutine执行接收操作而释放了新的队列空间。相反,如果channel是空的,接收操作将阻塞直到有另一个goroutine执行发送操作而向队列插入元素。
    channel的缓存队列解耦了接收和发送的goroutine。


    三、并发的循环
    1. goroutine泄露
    某个channel的内部缓存队列满了,但是没有goroutine向这个channel执行接收操作,从而导致向这个channel发送值的所有goroutine都永远阻塞下去,并且永远都不会退出。这种情况,称为goroutine泄露,这将是一个bug。和垃圾变量不同,泄露的goroutine并不会被自动回收,因此确保每个不再需要的goroutine都能正常退出是很重要的。
    2. sync.WaitGroup
    有时候,我们需要多个goroutine都运行结束以后做一些事情,比如关闭一个channel。Go语言提供了一种特殊的计数器,用于检测多个goroutine是否都已运行结束。它在每一个goroutine启动时自增1,在每一个goroutine退出时自减1,且会一直等待直至计数器自减为0,即表示所有goroutine都已运行结束。
    使用示例:

    func makeThumbnails(filenames <-chan string) int64 {
    	sizes := make(chan int64)
    	var	wg sync.WaitGroup//number of working goroutines
    	for	f := range filenames {
    		wg.Add(1)//必需在worker goroutines开始之前调用,才能保证Add()是在closer goroutine调用Wait()之前被调用
    		//worker goroutines
    		go func(f string) {
    			defer wg.Done()//使用defer来确保计数器即使是在程序出错的情况下依然能够正确地自减
    			thumb, err := thumbnail.ImageFile(f)
    			if err != nil {
    				log.Println(err)
    				return
    			}
    			info, _ := os.Stat(thumb)
    			sizes <- info.Size()
    		}(f)
    	}
    	//closer goroutine
    	go func() {
    		wg.Wait()
    		close(sizes)
    	}()
    	var	total int64
    	for	size := range sizes {
    		total += size
    	}
    	return total
    }
    

    四、基于select的多路复用
    有时候我们需要等待多个channel的其中一个返回事件,但是我们又无法做到从每一个channel中接收信息。假如我们试图从其中一个channel中接收信息,而这个channel又没有返回信息,那么程序会立刻被阻塞,从而无法收到其他channel返回的信息。这时,基于select的多路复用就派上用场了。

    select会一直等待直至有能够执行的case分支才去执行这个case分支。当条件满足时,select才会去通信并执行case之后的语句;这时候其它通信是不会执行的。一个没有任何case的select语句写作select{},会永远地等待下去。
    如果多个case同时就绪时,select会随机地选择一个执行,这样来保证每一个channel都有平等的被select的机会。
    channel的零值是nil,对一个nil的channel发送和接收操作会永远阻塞,在select语句中操作nil的channel永远都不会被select到。

    五、消息广播
    有时候,我们需要将某个消息广播通知所有运行中的goroutine,但是我们又无法知道goroutine的数量,这时候可以使用这种策略:创建一个空的channel,将从这个channel中接收信息的操作加入基于select的多路复用,由于这时channel是空的,所有的goroutine都无法从中接收到值;当我们需要广播消息的时候,关闭这个channel,这样所有的goroutine都能从中接收到零值,以此表示该消息已传达。(一个channel只能用于表达一种消息)

    六、一个综合运用的实例程序——并发的字典遍历

    /**
    *广度遍历目录文件,计算文件数量与大小
    *每隔0.5秒输出已计算的文件数量与大小
    *当标准输入有值时,中断程序运行
    **/
    package main
    
    import (
    	"flag"
    	"fmt"
    	"io/ioutil"
    	"os"
    	"path/filepath"
    	"sync"
    	"time"
    )
    
    //广度遍历目录文件,计算文件数量与大小
    func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
    	defer n.Done()
    	if cancelled() {
    		return
    	}
    	for _, entry := range dirents(dir) {
    		if entry.IsDir() {
    			n.Add(1)
    			subdir := filepath.Join(dir, entry.Name())
    			go walkDir(subdir, n, fileSizes)
    		} else {
    			fileSizes <- entry.Size()
    		}
    	}
    }
    
    /*
    *目的:控制同一时刻打开的文件数量,防止程序占用太多资源
    *实现:创建一个带缓存的channel,通过channel的缓存队列大小控制同一时刻打开的文件数量。
    *读取文件之前向该channel发送一个信号量,读取完以后向该channel接收一个信号量,若是channel的缓存
    *队列满了,则会阻塞,无法继续打开文件。
     */
    var sema = make(chan struct{}, 2)
    
    func dirents(dir string) []os.FileInfo {
    	select {
    	case sema <- struct{}{}:
    	case <-done:
    		return nil
    	}
    	defer func() { <-sema }()
    	entries, err := ioutil.ReadDir(dir)
    	if err != nil {
    		fmt.Fprint(os.Stderr, "du1: %
    ", err)
    		return nil
    	}
    	return entries
    }
    
    /*
    *目的:读取标准输入,只要有值则停止运行程序
    *实现:创建一个空channel,每个goroutine读取文件之前都先从channel中接收值,如果阻塞,表示程序可以继续运行,
    *当标准输入有值时,关闭该channel,所有未读取文件的goroutine从channel中接收到零值,中断程序运行。
     */
    var done = make(chan struct{})
    
    func cancelled() bool {
    	select {
    	case <-done:
    		return true
    	default:
    		return false
    	}
    }
    
    func main() {
    	//读取标准输入,只要有输入则停止程序
    	go func() {
    		os.Stdin.Read(make([]byte, 1))
    		close(done)
    	}()
    
    	flag.Parse()
    	roots := flag.Args()
    	if len(roots) == 0 {
    		roots = []string{"."}
    	}
    	//用于发送文件大小的channel
    	fileSizes := make(chan int64)
    
    	/*
    	*目的:当读取文件大小的所有goroutine都结束运行时,关闭用于发送文件大小的channel
    	*实现:使用特殊计数器sync.WaitGroup,每次新增一个goroutine则自增1,每次结束一个goroutine
    	*则自减1,当其自减为0时,表示所有goroutine都已结束运行,可以关闭channel了
    	 */
    	var n sync.WaitGroup
    	for _, root := range roots {
    		n.Add(1)
    		go walkDir(root, &n, fileSizes)
    	}
    	go func() {
    		n.Wait()
    		close(fileSizes)
    	}()
    
    	/*
    	*目的:每隔0.1秒输出文件数量与大小计算的结果
    	*实现:使用time.Tick函数。time.Tick函数返回一个channel,程序会周期性地像一个节拍器一样向这个channel发送事件,
    	*每一个事件的值是一个时间戳。当从该channel中接收到值的时候,就可以打印了。
    	 */
    	var tick <-chan time.Time
    	tick = time.Tick(100 * time.Millisecond)
    
    	var nfiles, nbytes int64
    loop:
    	for {
    		select {
    		case <-done:
    			/*
    			*程序结束,需要先吧fileSizes这个channel中的内容排空,以保证对walkDir的调用不会被向fileSizes发送信息阻塞住,
    			*可以正确地完成,防止goroutine泄露。
    			 */
    			for range fileSizes {
    			}
    		case size, ok := <-fileSizes:
    			if !ok { //fileSizes这个channel已关闭,退出循环loop
    				break loop
    			}
    			nfiles++
    			nbytes += size
    		case <-tick:
    			printDiskUsage(nfiles, nbytes)
    		}
    	}
    }
    
    //打印已经计算的文件数量与大小
    func printDiskUsage(nfiles, nbytes int64) {
    	fmt.Printf("%d files  %.1f GB
    ", nfiles, float64(nbytes)/1e9)
    }
    
  • 相关阅读:
    npm registry
    JS函数addEventListener的浏览器差异性封装
    C# WinForm 异步执行耗时操作并将过程显示在界面中
    在server 2008/2003中 取消对网站的安全检查/去除添加信任网站
    SQL语句中将Datetime类型转换为字符串类型
    未在本地计算机上注册 Microsoft.Jet.OLEDB.4.0 提供程序
    当应用程序不是以 UserInteractive 模式运行时显示模式对话框或窗体是无效操作
    TFS2012常见问题及解答
    笔记《Hbase 权威指南》
    读Java 804
  • 原文地址:https://www.cnblogs.com/wujuntian/p/11241171.html
Copyright © 2011-2022 走看看