zoukankan      html  css  js  c++  java
  • Go语言之并发

    Go语言直接支持内置支持并发。当一个函数创建为goroutine时,Go会将其视为一个独立的工作单元。这个单元会被调度到可用的逻辑处理器上执行。

    Go语言运行时的调度器是一个复杂的软件,这个调度器在操作系统之上。操作系统的线程与语言运行时的逻辑处理器绑定,并在逻辑处理器上运行goroutine。

    Go语言的并发同步逻辑来自一个叫做通信顺讯进程(CSP)的范型。CSP是一种消息传递模型,通过在goroutine之间传递数据来传递消息,而不是通过对数据进行加锁来实现同步访问。这种数据的类型叫做通道(channel) 。

    并发与并行

    在操作系统中,一个应用程序就可以看作一个进程,而每个进程至少包含一个线程。每个进程的初始线程被称为主线程。

    操作系统会在物理处理器(CPU)上调度线程来运行,而Go语言会在逻辑处理器来调度goroutine来运行。1.5版本之上,Go语言的运行时默认会为每个可用的物理处理器分配一个逻辑处理器。1.5之前,默认给整个应用程序只分配一个逻辑处理器。

    如下图,在运行时把goroutine调度到逻辑处理器上运行,逻辑处理器绑定到唯一的操作系统线程。

    表

    当goroutine执行了一个阻塞的系统调用(就是一个非纯CPU的任务)时,调度器会将这个线程与处理器分离,并创建一个新线程来运行这个处理器上提供的服务。

    表2

    语言运行默认限制每个程序最多创建10000个线程。

    注意并发≠并行!并行需要至少2个逻辑处理器。

    goroutine

    以并发的形式分别显示大写和小写的英文字母

      1: package main
    
      2: 
    
      3: import (
    
      4: 	"fmt"
    
      5: 	"runtime"
    
      6: 	"sync"
    
      7: )
    
      8: 
    
      9: func main() {
    
     10: 	// 分配一个逻辑处理器给调度器使用
    
     11: 	runtime.GOMAXPROCS(1)
    
     12: 	// wg用来等待程序完成
    
     13: 	var wg sync.WaitGroup
    
     14: 	// 计数器加2,表示要等待两个goroutine
    
     15: 	wg.Add(2)
    
     16: 	fmt.Println("Start!")
    
     17: 	// 声明一个匿名函数,并创建一个goroutime
    
     18: 	go func() {
    
     19: 		// 通知main函数工作已经完成
    
     20: 		defer wg.Done()
    
     21: 		// 显示字母表3次
    
     22: 		for count:=0; count<3;count++ {
    
     23: 			for char:='a';char<'a'+26;char++ {
    
     24: 				fmt.Printf("%c ", char)
    
     25: 			}
    
     26: 		}
    
     27: 	}()
    
     28: 	// 同上
    
     29: 	go func() {
    
     30: 		// 通知main函数工作已经完成
    
     31: 		defer wg.Done()
    
     32: 		// 显示字母表3次
    
     33: 		for count:=0; count<3;count++ {
    
     34: 			for char:='A';char<'A'+26;char++ {
    
     35: 				fmt.Printf("%c ", char)
    
     36: 			}
    
     37: 		}
    
     38: 	}()
    
     39: 	// 等待goroutine结束
    
     40: 	fmt.Println("Waiting!")
    
     41: 	wg.Wait()
    
     42: 	fmt.Println("
    Finish!")
    
     43: }

    运行结果后,可以看到先输出的是所有的大写字母,最后才是小写字母。是因为第一个goroutine完成所有显示需要花时间太短了,以至于在调度器切换到第二个goroutine之前,就完成了所有任务。

    调度器为了防止某个goroutine长时间占用逻辑处理器,会停止当前正运行的goroutine,运行其他可运行的goroutine运行的机会。

    创建两个相同的长时间才能完成其工作的goroutine就可以看到,比如说显示5000以内的素数值。

    代码结构如下

      1: go printPrime("A")
    
      2: go printPrime("B")
    
      3: 
    
      4: func printPrime(prefix string) {
    
      5: 	...
    
      6: }

    结果类似

      1: B:2
    
      2: B:3
    
      3: ...
    
      4: B:4591
    
      5: A:3
    
      6: A:5
    
      7: ...
    
      8: A:4561
    
      9: A:4567
    
     10: B:4603
    
     11: B:4621
    
     12: ...
    
     13: // Completed B
    
     14: A:4457
    
     15: ...
    
     16: // Completed A

    如何修改逻辑处理器的数量

      1: runtime.GOMAXPROCS(runtime.NUMCPU())

    稍微改动下上面的代码,结果就会大不同

      1: package main
    
      2: 
    
      3: import (
    
      4: "fmt"
    
      5: "runtime"
    
      6: "sync"
    
      7: )
    
      8: 
    
      9: func main() {
    
     10: 	// 分配两个逻辑处理器给调度器使用
    
     11: 	runtime.GOMAXPROCS(2)
    
     12: 	// wg用来等待程序完成
    
     13: 	var wg sync.WaitGroup
    
     14: 	// 计数器加2,表示要等待两个goroutine
    
     15: 	wg.Add(2)
    
     16: 	fmt.Println("Start!")
    
     17: 	// 声明一个匿名函数,并创建一个goroutime
    
     18: 	go func() {
    
     19: 		// 通知main函数工作已经完成
    
     20: 		defer wg.Done()
    
     21: 		// 显示字母表3次
    
     22: 		for count:=0; count<10;count++ {
    
     23: 			for char:='a';char<'a'+26;char++ {
    
     24: 				fmt.Printf("%c ", char)
    
     25: 			}
    
     26: 		}
    
     27: 	}()
    
     28: 	// 同上
    
     29: 	go func() {
    
     30: 		// 通知main函数工作已经完成
    
     31: 		defer wg.Done()
    
     32: 		// 显示字母表3次
    
     33: 		for count:=0; count<10;count++ {
    
     34: 			for char:='A';char<'A'+26;char++ {
    
     35: 				fmt.Printf("%c ", char)
    
     36: 			}
    
     37: 		}
    
     38: 	}()
    
     39: 	// 等待goroutine结束
    
     40: 	fmt.Println("Waiting!")
    
     41: 	wg.Wait()
    
     42: 	fmt.Println("
    Finish!")
    
     43: }

    结果类似下面的(根据CPU单核的性能结果可能结果稍微不一样)

      1: Start!
    
      2: Waiting!
    
      3: a b c d e f g h i j k l m n o A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g
    
      4: h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z
    
      5: a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s
    
      6: t u v w x y z M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X
    
      7: Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q
    
      8: R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z
    
      9: Finish!
    可以发现,goroutine是并行运行的。

    只有在有多个逻辑处理器且可以同时让每个goroutine运行在一个可用的物理处理器上的时候,goroutine才会并行运行。

    竞争状态

    如果两个或者多个goroutine在没有互相同步的情况下,访问某个共享的资源,并且试图同时读和写这个资源,就处于相互竞争的状态。

    在竞争状态,每个goroutine都会覆盖另一个goroutine的工作。这种覆盖发生在goroutine发生切换的时候。

    每个goroutien都会创造自己的共享变量副本。当切换到领另一个goroutine时,如果这个变量的值在上一个goroutine发生改变,这个goroutine再次运行时,虽然变量的值改变了,但是由于这个goroutine没有更新自己的那个副本的值,而是继续使用,并且将其存回变量的值,从而覆盖上一个goroutine 的工作。

    go build –race用来竞争检测器标志来编译程序

    锁住共享资源

    原子函数

    原子函数能够以底层的枷锁机制来同步访问整型变量和指针。省略部分代码如下:

      1: var counter int64
    
      2: go incCounter(1)
    
      3: go incCounter(2)
    
      4: func incCounter(id int) {
    
      5: 	for count:=0;count<2;count++{
    
      6: 		//安全地对counter加1
    
      7: 		atomic.AddInt64(&counter, 1)
    
      8: 		//当前goroutine从线程退出,并放回队列
    
      9: 		runtime.Gosched()
    
     10: 	}
    
     11: }

    使用atmoic包的AddInt64函数。这些goroutine都会自动根据所引用的变量做同步处理。

    另外两个原子函数是LoadInt64和StoreInt64。用法如下:

      1: // shutdown是通知正在执行的goroutine停止工作的标志
    
      2: var shutdown int64
    
      3: var wg sync.WaitGroup
    
      4: // 该停止工作了,安全地设置shutdown标志
    
      5: atomic.StoreInt64(&shutdown, 1)
    
      6: // 等待goroutine结束
    
      7: wg.Wait()
    
      8: // 检测是否停止工作,如果shutdown==1那么goroutine就会终止
    
      9: if atomic.LoadInt64(&shutdown) == 1 {
    
     10: 	break
    
     11: }
    
     12: 

    互斥锁

    另一种同步访问共享资源的方式是互斥锁。主要代码如下:

      1: var (
    
      2: 	// counter是所有goroutine都要增加其值的变量
    
      3: 	counter int
    
      4: 	wg sync.WaitGroup
    
      5: 	// mutex用来定义一段代码临界区
    
      6: 	mutex sync.Mutex
    
      7: )
    
      8: func main...
    
      9: // 业务代码
    
     10: func incCounter(id int) {
    
     11: 	defer wg.Done()
    
     12: 	for i:=0;i<2;i++ {
    
     13: 		//同一时期只允许一个goroutine进入
    
     14: 		mutex.Lock()
    
     15: 		//大括号并不是必须的
    
     16: 		{
    
     17: 			//捕获counter的值
    
     18: 			value := counter
    
     19: 			//当前goroutine从线程退出,并返回到队列
    
     20: 			runtime.Gosched()
    
     21: 			//增加本地value变量的值
    
     22: 			value++
    
     23: 			//将该值保存回counter
    
     24: 			counter = value
    
     25: 		}
    
     26: 		// 释放锁,允许其他正在等待的goroutine
    
     27: 		mutex.Unlock()
    
     28: 	}
    
     29: }

    通道

    通道在goroutine之间架起了一个管道,并提供了确保同步交换数据的机制。声明通道时,需要指定将要被共享的数据的类型。

    可以通过通道共享内置类型,命名类型,结构类型和引用类型的值或者指针。

    go语言需要使用make来创建一个通道,chan是关键字:

      1: // 无缓冲的整型通道
    
      2: unbuffered := make(chan int)
    
      3: // 有缓冲的字符串通道
    
      4: buffered := make(chan string, 10)
    向通道发送值
      1: buffered := make(chan string, 10)
    
      2: // 通过通道发送一个字符串
    
      3: buffered <- "Gopher"
    
      4: // 从通道接收一个字符串
    
      5: value := <-buffered

    无缓冲的通道是指在接收前没有能力保存任何值的通道。发送goroutine和接收goroutine同时准备好,才能完成发送和接收操作。如果没有准备好,通道会导致goroutine阻塞等待。所以无缓冲通道保证了goroutine之间同一时间进行数据交换。

      1: // 四个goroutine间的接力比赛
    
      2: package main
    
      3: 
    
      4: import (
    
      5: 	"fmt"
    
      6: 	"sync"
    
      7: 	"time"
    
      8: )
    
      9: 
    
     10: var wg sync.WaitGroup
    
     11: 
    
     12: func main()  {
    
     13: 	//创建一个无缓冲的通道
    
     14: 	baton := make(chan int)
    
     15: 	wg.Add(1)
    
     16: 	// 第一步跑步者持有接力棒
    
     17: 	go Runner(baton)
    
     18: 	// 开始比赛
    
     19: 	baton <- 1
    
     20: 	// 等待比赛结束
    
     21: 	wg.Wait()
    
     22: }
    
     23: 
    
     24: // Ruuner模拟接力比赛中的一位跑步者
    
     25: func Runner(baton chan int) {
    
     26: 	var newRunner int
    
     27: 	// 等待接力棒
    
     28: 	runner := <-baton
    
     29: 	// 开始跑步
    
     30: 	fmt.Printf("运动员%d带着Baton跑
    ", runner)
    
     31: 	// 创建下一步跑步者
    
     32: 	if runner != 4{
    
     33: 		newRunner = runner + 1
    
     34: 		fmt.Printf("运动员%d上线
    ", newRunner)
    
     35: 		go Runner(baton)
    
     36: 	}
    
     37: 	// 围绕跑到跑
    
     38: 	time.Sleep(100 * time.Millisecond)
    
     39: 	// 比赛结束了吗?
    
     40: 	if runner == 4{
    
     41: 		fmt.Printf("运动员%d完成,比赛结束
    ", runner)
    
     42: 		wg.Done()
    
     43: 		return
    
     44: 	}
    
     45: 	// 将接力棒交给下一位跑步者
    
     46: 	fmt.Printf("运动员%d与运动员%d交换
    ", runner, newRunner)
    
     47: 	baton <- newRunner
    
     48: }

    结果

      1: 运动员1带着Baton跑
    
      2: 运动员2上线
    
      3: 运动员1与运动员2交换
    
      4: 运动员2带着Baton跑
    
      5: 运动员3上线
    
      6: 运动员2与运动员3交换
    
      7: 运动员3带着Baton跑
    
      8: 运动员4上线
    
      9: 运动员3与运动员4交换
    
     10: 运动员4带着Baton跑
    
     11: 运动员4完成,比赛结束

    有缓冲的通道则能在接收前能存储一个或者多个值的通道。这种类型的通道并不强制要求goroutine之间必须同时完成发送和接收。只有在通道没有可用缓冲区或者没有要接收的值时,发送或者接收才会阻塞。

      1: package main
    
      2: 
    
      3: import (
    
      4: 	"fmt"
    
      5: 	"math/rand"
    
      6: 	"sync"
    
      7: 	"time"
    
      8: )
    
      9: 
    
     10: const (
    
     11: 	// goroutine的数量
    
     12: 	numberGoroutines = 4
    
     13: 	// 工作的数量
    
     14: 	taskLoad = 10
    
     15: )
    
     16: 
    
     17: var wg sync.WaitGroup
    
     18: 
    
     19: // 初始化随机数种子
    
     20: func init() {
    
     21: 	rand.Seed(time.Now().Unix())
    
     22: }
    
     23: func main() {
    
     24: 	// 创建一个有缓冲的通道来管理工作
    
     25: 	tasks := make(chan string, taskLoad)
    
     26: 	wg.Add(numberGoroutines)
    
     27: 	// 增加一组要完成的工作
    
     28: 	for post:=1;post<taskLoad;post++ {
    
     29: 		tasks <- fmt.Sprintf("Task:%d", post)
    
     30: 	}
    
     31: 	// 启动goroutine来处理工作
    
     32: 	for i:=1;i<numberGoroutines+1;i++ {
    
     33: 		go worker(tasks, i)
    
     34: 	}
    
     35: 	// 所有工作处理完时关闭通道
    
     36: 	close(tasks)
    
     37: 
    
     38: 	wg.Wait()
    
     39: 	fmt.Printf("all finished!")
    
     40: 
    
     41: }
    
     42: 
    
     43: func worker(tasks chan string, worker_id int) {
    
     44: 	defer wg.Done()
    
     45: 
    
     46: 	for {
    
     47: 		//等待分配工作
    
     48: 		task, ok := <-tasks
    
     49: 		if !ok {
    
     50: 			//通道变空
    
     51: 			fmt.Printf("Worker%d shut down
    ", worker_id)
    
     52: 			return
    
     53: 		}
    
     54: 		// 开始工作
    
     55: 		fmt.Printf("Worker%d start %s
    ", worker_id, task)
    
     56: 
    
     57: 		// 随机等待一段时间
    
     58: 		sleep := rand.Int63n(100)
    
     59: 		time.Sleep(time.Duration(sleep)*time.Millisecond)
    
     60: 		// 显示完成了工作
    
     61: 		fmt.Printf("Worker%d Completed %s
    ", worker_id, task)
    
     62: 	}
    
     63: }
    输出结果:
      1: Worker4 start Task:1
    
      2: Worker1 start Task:2
    
      3: Worker2 start Task:3
    
      4: Worker3 start Task:4
    
      5: Worker3 Completed Task:4
    
      6: Worker3 start Task:5
    
      7: Worker4 Completed Task:1
    
      8: Worker4 start Task:6
    
      9: Worker2 Completed Task:3
    
     10: Worker2 start Task:7
    
     11: Worker3 Completed Task:5
    
     12: Worker3 start Task:8
    
     13: Worker2 Completed Task:7
    
     14: Worker2 start Task:9
    
     15: Worker3 Completed Task:8
    
     16: Worker3 shut down
    
     17: Worker4 Completed Task:6
    
     18: Worker4 shut down
    
     19: Worker1 Completed Task:2
    
     20: Worker1 shut down
    
     21: Worker2 Completed Task:9
    
     22: Worker2 shut down
    
     23: all finished!

    由于程序和Go语言的调度器有随机的成分,结果每次都会不一样。不过总流程不会大变。

    当通道关闭后,goroutine依旧从通道里的缓冲区获取数据,但是不能再向通道里发送数据。从一个已经关闭且没有数据的通道里获取数据,总会立刻返回,兵返回一个通道类型的零值。

    关于实际工程里的并发模式,下一篇再讲。

  • 相关阅读:
    jQuery Validate 表单验证
    在同一个页面使用多个不同的jQuery版本,让它们并存而不冲突
    移动端手势库hammerJS 2.0.4官方文档翻译
    Linux下查看nginx安装目录
    Mac Mini中添加VNC访问
    CSS3属性选择通配符
    【LeetCode】9 Palindrome Number 回文数判定
    【LeetCode】8. String to Integer (atoi) 字符串转整数
    【LeetCode】7. Reverse Integer 整型数反转
    【LeetCode】6. ZigZag Conversion 锯齿形转换
  • 原文地址:https://www.cnblogs.com/haoqirui/p/10269476.html
Copyright © 2011-2022 走看看