zoukankan      html  css  js  c++  java
  • golang学习笔记 ---并发

    预备知识
    1.1 进程、线程、协程

    进程(Process):在内存中的程序。有自己独立的独占的虚拟 CPU 、虚拟的 Memory、虚拟的 IO devices。  

    OS 直接支持并调度。进程之间只能通过系统提供的 IO 机制通讯。共享内存(变量)是不可能的!

    (1) 每一进程占用独立的地址空间。

    此处的地址空间包括代码、数据及其他资源。
    (2) 进程间的通信开销较大且受到许多限制。

    对象(或函数)接口、通信协议、…
    (3) 进程间的切换开销也较大。

    又称Context Switch。
    上下文包括代码、数据、堆栈、处理器状态、资源、…

    线程(Thread):轻量级进程。在现代操作系统中,是进程中程序执行流的最小单元。一个标准的线程由线程ID,当前指令指针(PC),寄存器集合和堆栈组成。
    
    一个进程由若干线程组成,它们共享进程的计算、存储、IO资源。因此,程序员必须使用系统提供的同步、消息机制,处理资源的竞争和消息的通讯。
    

      

    (1) 多个线程共享进程的地址空间(代码、数据、其他资源等)。

    线程也需要自己的资源,如程序计数器、寄存器组、调用栈等。
    (2) 线程间的通信开销较少且比较简单。

    因为共享而减少了需要通信的内容。
    但也因为充分共享而无法对共享资源进行保护。
    (3) 线程间的切换开销也较小。

    只需保存每一线程的程序计数器、寄存器组、堆栈等空间。
    不必切换或复制整个地址空间,从而成本大为降低(约1/10)
    线程有分为两大类:

    操作系统管理的线程(Core Thread),通常根据 CPU 资源决定线程的数量,一般为 CPU 数量的两倍。
    语言提供的线程库管理的线程(User Thread),它执行时映射到系统线程,按任务类型(计算密集型,IO密集型)决定线程池的管理方式与数量。

    协程(coroutine/fiber):轻量级线程。 是可以并发执行的函数,由编译或用户指定位置将控制权交给协程调度程序执行的方式。它是非抢占式的,可以避免反复系统调用,还有进程切换造成的开销,给你上几千个逻辑流,也称用户级别线程。
    

     

    并行和并发

    并行(parallel):指在同一时刻,有多条指令在多个处理器上同时执行。
    
    并发(concurrency):指在同一时刻只能有一条指令执行,但多个进程指令被快速的轮换执行,使得在宏观上具有多个进程同时执行的效果,但在微观上并不是同时执行的,只是把时间分成若干段,使多个进程快速交替的执行。

    并行是两个队列同时使用两台咖啡机,并发是两个队列交替使用一台咖啡机

    Go语言并发优势

    有人把Go比作21世纪的C语言,第一是因为Go语言设计简单,第二,21世纪最重要的就是并行程序设计,而Go从语言层面就支持了并行。同时,并发程序的内存管理有时候是非常复杂的,而Go语言提供了自动垃圾回收机制。Go语言中有个概念叫做goroutine, 这类似我们熟知的线程,但是更轻。一般情况下,一个普通的桌面计算机跑十几二十个线程就有点负载过大了,但是同样这台机器却可以轻松地让成百上千甚至过万个goroutine进行资源竞争。

    goroutine是什么

    goroutine是Go并行设计的核心。goroutine说到底其实就是协程,但是它比线程更小,十几个goroutine可能体现在底层就是五六个线程,Go语言内部帮你实现了这些goroutine之间的内存共享。执行goroutine只需极少的栈内存(大概是4~5KB),当然会根据相应的数据伸缩。也正因为如此,可同时运行成千上万个并发任务。goroutine比thread更易用、更高效、更轻便。 

    创建goroutine
    只需在函数调⽤语句前添加 go 关键字,就可创建并发执⾏单元。开发⼈员无需了解任何执⾏细节,调度器会自动将其安排到合适的系统线程上执行。在并发编程里,我们通常想讲一个过程切分成几块,然后让每个goroutine各自负责一块工作。当一个程序启动时,其主函数即在一个单独的goroutine中运行,我们叫它main goroutine。新的goroutine会用go语句来创建。

    package main
     
    import (
        "fmt"
        "time"
    )
     
    func newTask() {
        i := 0
        for {
            i++
            fmt.Printf("new goroutine: i = %d
    ", i)
            time.Sleep(1 * time.Second) //延时1s
        }
    }
     
    func main() { 
        go newTask()   //创建一个 goroutine,启动另外一个任务
        i := 0
        for {          //main goroutine 循环打印
            i++
            fmt.Printf("main goroutine: i = %d
    ", i)
            time.Sleep(1 * time.Second) //延时1s
        }
    }
    

    程序运行结果:

    main goroutine: i = 15
    
    new goroutine: i = 15
    
    new goroutine: i = 16
    
    main goroutine: i = 16
    
    new goroutine: i = 17
    
    main goroutine: i = 17
    
    new goroutine: i = 18
    
    main goroutine: i = 18
    
    main goroutine: i = 19
    
    new goroutine: i = 19
    
    new goroutine: i = 20
    
    main goroutine: i = 20
    
    main goroutine: i = 21
    
    new goroutine: i = 21
    
    new goroutine: i = 22
    
    main goroutine: i = 22
    
    。。。
    

      

    串行地去执行两次loop函数:

    package main
    
    import (
    	"fmt"
    )
    
    func loop() {
    	for i := 0; i < 10; i++ {
    		fmt.Printf("%d ", i)
    	}
    }
    
    func main() {
    	loop()
    	loop()
    }
    

      

    输出会是这样的:

    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9
    

     

    把一个loop放在一个goroutine里跑,我们可以使用关键字go来定义并启动一个goroutine:

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func loop() {
    	for i := 0; i < 10; i++ {
    		fmt.Printf("%d ", i)
    	}
    }
    
    func main() {
    	go loop()
    	loop()
    	time.Sleep(time.Second)
    }
    

    注意,如果不用 time.Sleep(time.Second), 则可能在goroutine还没来得及跑loop的时候,主函数已经退出了

    main函数退出地太快了,我们要想办法阻止它过早地退出,一个办法是让main等待一下:,目的达到了。可是采用等待的办法并不好,如果goroutine在结束的时候,告诉下主线说“Hey, 我要跑完了!”就好了,这就是接下来要讲到的信道。

    信道

    信道是什么?简单说,是goroutine之间互相通讯的东西。类似我们Unix上的管道(可以在进程间传递消息), 用来goroutine之间发消息和接收消息。其实,就是在做goroutine之间的内存共享。
    

      示例:

    package main
    
    import (
    	"fmt"
    )
    
    func main() {
    	var messages chan string = make(chan string)
    	go func(message string) {
    		messages <- message // 存消息
    	}("Ping!")
    	fmt.Println(<-messages) // 取消息
    }
    

      

    默认的,信道的存消息和取消息都是阻塞的 (叫做无缓冲的信道,不过缓冲这个概念稍后了解,先说阻塞的问题)。

    也就是说, 无缓冲的信道在取消息和存消息的时候都会挂起当前的goroutine,除非另一端已经准备好。

    package main
    
    import (
    	"fmt"
    )
    
    var ch chan int = make(chan int)
    
    func foo() {
    	ch <- 0 // 向ch中加数据,如果没有其他goroutine来取走这个数据,那么挂起foo, 直到main函数把0这个数据拿走
    }
    
    func main() {
    	go foo()
    	data := <-ch // 从ch取数据,如果ch中还没放数据,那就挂起main线,直到foo函数中放数据为止
    	fmt.Println(data)
    
    }
    

      

    那既然信道可以阻塞当前的goroutine, 那么回到上一部分「goroutine」所遇到的问题「如何让goroutine告诉主线我执行完毕了」 的问题来, 使用一个信道来告诉主线即可:

    package main
    
    import (
    	"fmt"
    )
    
    var complete chan int = make(chan int)
    
    func loop() {
    	for i := 0; i < 10; i++ {
    		fmt.Printf("%d ", i)
    	}
    	complete <- 0 // 执行完毕了,发个消息
    }
    
    func main() {
    	go loop()
    	<-complete // 直到线程跑完, 取到消息. main在此阻塞住
    
    }
    

      

    如果不用信道来阻塞主线的话,主线就会过早跑完,loop线都没有机会执行、、、

    其实,无缓冲的信道永远不会存储数据,只负责数据的流通,为什么这么讲呢?

    从无缓冲信道取数据,必须要有数据流进来才可以,否则当前线阻塞

    数据流入无缓冲信道, 如果没有其他goroutine来拿走这个数据,那么当前线阻塞
    无缓冲信道的大小都是0 (len(channel))

    如果信道正有数据在流动,我们还要加入数据,或者信道干涩,我们一直向无数据流入的空信道取数据呢? 就会引起死锁

    死锁

    一个死锁的例子:

    package main
    
    func main() {
    	ch := make(chan int)
    	<-ch // 阻塞main goroutine, 信道c被锁
    }
    

      执行结果:

    fatal error: all goroutines are asleep - deadlock!
    
    goroutine 1 [chan receive]:
    main.main()
    	D:/GOWORK/src/jsonstudy/main.go:5 +0x2c
    

      

    何谓死锁? 操作系统有讲过的,所有的线程或进程都在等待资源的释放。如上的程序中, 只有一个goroutine, 所以当你向里面加数据或者存数据的话,都会锁死信道, 并且阻塞当前 goroutine, 也就是所有的goroutine(其实就main线一个)都在等待信道的开放(没人拿走数据信道是不会开放的),也就是死锁咯。

    package main
    
    import (
    	"fmt"
    )
    
    func f(ch chan int) {
    	ch <- 10 // 将一个数据value写入至channel,这会导致阻塞,直到有其他goroutine从这个channel中读取数据
    }
    
    func main() {
    	var ch chan int     // 声明一个传递int类型的channel
    	ch = make(chan int) // 使用内置函数make()定义一个channel
    	go f(ch)
    	//=========
    
    	value := <-ch // 从channel中读取数据,如果channel之前没有写入数据,也会导致阻塞,直到channel中被写入数据为止
    	fmt.Println(value)
    
    	//=========
    	close(ch)
    
    }
    

      总结来看,为什么会死锁?非缓冲信道上如果发生了流入无流出,或者流出无流入,也就导致了死锁。或者这样理解 Go启动的所有goroutine里的非缓冲信道一定要一个线里存数据,一个线里取数据,要成对才行 。所以下面的示例一定死锁:

    package main
    
    func main() {
    	c, quit := make(chan int), make(chan int)
    
    	go func() {
    		c <- 1    // c通道的数据没有被其他goroutine读取走,堵塞当前goroutine
    		quit <- 0 // quit始终没有办法写入数据
    	}()
    
    	<-quit // quit 等待数据的写
    
    }
    

      

    仔细分析的话,是由于:主线等待quit信道的数据流出,quit等待数据写入,而func被c通道堵塞,所有goroutine都在等,所以死锁。

    简单来看的话,一共两个线,func线中流入c通道的数据并没有在main线中流出,肯定死锁。

    package main
    
    func main() {
    	
        c := make(chan int)
     
        go func() {
           c <- 1
        }()
    
    
    }
    

      

    程序正常退出了,很简单,并不是我们那个总结不起作用了,还是因为一个让人很囧的原因,main又没等待其它goroutine,自己先跑完了, 所以没有数据流入c信道,一共执行了一个goroutine, 并且没有发生阻塞,所以没有死锁错误

    那么死锁的解决办法呢?

    最简单的,把没取走的数据取走,没放入的数据放入, 因为无缓冲信道不能承载数据,那么就赶紧拿走!

    package main
    
    func main() {
    
    	c, quit := make(chan int), make(chan int)
    
    	go func() {
    		c <- 1
    		quit <- 0
    	}()
    
    	<-c // 取走c的数据!
    	<-quit
    
    }
    

    一个解决办法是缓冲信道, 即设置c有一个数据的缓冲大小:

    c := make(chan int, 1)
    

      

    这样的话,c可以缓存一个数据。也就是说,放入一个数据,c并不会挂起当前线, 再放一个才会挂起当前线直到第一个数据被其他goroutine取走, 也就是只阻塞在容量一定的时候,不达容量不阻塞。

    无缓冲信道的数据进出顺序
    我们已经知道,无缓冲信道从不存储数据,流入的数据必须要流出才可以。

    package main
    
    import (
    	"fmt"
    )
    
    var ch chan int = make(chan int)
    
    func foo(id int) { //id: 这个routine的标号
    	ch <- id
    }
    
    func main() {
    	for i := 0; i < 5; i++ { // 开启5个routine
    		go foo(i)
    	}
    	for i := 0; i < 5; i++ { // 取出信道中的数据
    		fmt.Print(<-ch)
    	}
    }
    

      

    输出:01234

    我们开了5个goroutine,然后又依次取数据。其实整个的执行过程细分的话,5个线的数据 依次流过信道ch, main打印之, 而宏观上我们看到的即 无缓冲信道的数据是先到先出,但是 无缓冲信道并不存储数据,只负责数据的流通

    缓冲信道
    终于到了这个话题了, 其实缓存信道用英文来讲更为达意: buffered channel.

    缓冲这个词意思是,缓冲信道不仅可以流通数据,还可以缓存数据。它是有容量的,存入一个数据的话 , 可以先放在信道里,不必阻塞当前线而等待该数据取走。

    当缓冲信道达到满的状态的时候,就会表现出阻塞了,因为这时再也不能承载更多的数据了,「你们必须把 数据拿走,才可以流入数据」。

    在声明一个信道的时候,我们给make以第二个参数来指明它的容量(默认为0,即无缓冲):

    var ch chan int = make(chan int, 2) // 写入2个元素都不会阻塞当前goroutine, 存储个数达到2的时候会阻塞
    

      如下的例子,缓冲信道ch可以无缓冲的流入3个元素:

    func main() {
        ch := make(chan int, 3)
        ch <- 1
        ch <- 2
        ch <- 3
    }
    

      

    如果你再试图流入一个数据的话,信道ch会阻塞main线, 报死锁。

    也就是说,缓冲信道会在满容量的时候加锁。

    其实,缓冲信道是先进先出的,我们可以把缓冲信道看作为一个线程安全的队列:

    func main() {
        ch := make(chan int, 3)
        ch <- 1
        ch <- 2
        ch <- 3
     
        fmt.Println(<-ch) // 1
        fmt.Println(<-ch) // 2
        fmt.Println(<-ch) // 3
    }
    

      

    信道数据读取和信道关闭

    你也许发现,上面的代码一个一个地去读取信道简直太费事了,Go语言允许我们使用range来读取信道

    func main() {
        ch := make(chan int, 3)
        ch <- 1
        ch <- 2
        ch <- 3
     
        for v := range ch {
            fmt.Println(v)
        }
    }
    

     

    如果你执行了上面的代码,会报死锁错误的,原因是range不等到信道关闭是不会结束读取的。也就是如果 缓冲信道干涸了,那么range就会阻塞当前goroutine, 所以死锁咯。

    那么,我们试着避免这种情况,比较容易想到的是读到信道为空的时候就结束读取:

    ch := make(chan int, 3)
    ch <- 1
    ch <- 2
    ch <- 3
    for v := range ch {
        fmt.Println(v)
        if len(ch) <= 0 { // 如果现有数据量为0,跳出循环
            break
        }
    }
    

    以上的方法是可以正常输出的,但是注意检查信道大小的方法不能在信道存取都在发生的时候,用于取出所有数据,这个例子 是因为我们只在ch中存了数据,现在一个一个往外取,信道大小是递减的。 

    或者显式地关闭信道:

    func main() {
    	ch := make(chan int, 3)
    	ch <- 1
    	ch <- 2
    	ch <- 3
    
    	for v := range ch {
    		fmt.Println(v)
    		if len(ch) <= 0 {
    			close((ch))
    		}
    	}
    }
    

      

    被关闭的信道会禁止数据流入, 是只读的。我们仍然可以从关闭的信道中取出数据,但是不能再写入数据了。

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func produce(p chan<- int) {
    	for i := 0; i < 10; i++ {
    		p <- i
    		fmt.Println("send:", i)
    	}
    }
    func consumer(c <-chan int) {
    	for i := 0; i < 10; i++ {
    		v := <-c
    		fmt.Println("receive:", v)
    	}
    }
    func main() {
    	ch := make(chan int)
    	go produce(ch)
    	go consumer(ch)
    	time.Sleep(1 * time.Second)
    }
    

     输出:

    receive: 0

    send: 0

    send: 1

    receive: 1

    receive: 2

    send: 2

    send: 3

    receive: 3

    receive: 4

    send: 4

    send: 5

    receive: 5

    receive: 6

    send: 6

    send: 7

    receive: 7

    receive: 8

    send: 8

    send: 9

    receive: 9

     

    在这段代码中,因为channel是没有缓冲的,所以当生产者给channel赋值后,生产者这个线程会阻塞,直到消费者线程将channel中的数据取出。消费者第一次将数据取出后,进行下一次循环时,消费者的线程也会阻塞,因为生产者还没有将数据存入,这时程序会去执行生产者的线程。程序就这样在消费者和生产者两个线程间不断切换,直到循环结束。

    下面我们再看一个带缓冲的例子:

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func produce(p chan<- int) {
    	for i := 0; i < 10; i++ {
    		p <- i
    		fmt.Println("send:", i)
    	}
    }
    func consumer(c <-chan int) {
    	for i := 0; i < 10; i++ {
    		v := <-c
    		fmt.Println("receive:", v)
    	}
    }
    func main() {
    	ch := make(chan int, 10)
    	go produce(ch)
    	go consumer(ch)
    	time.Sleep(1 * time.Second)
    }
    

      输出:

    send: 0

    send: 1

    send: 2

    send: 3

    send: 4

    send: 5

    send: 6

    send: 7

    send: 8

    send: 9

    receive: 0

    receive: 1

    receive: 2

    receive: 3

    receive: 4

    receive: 5

    receive: 6

    receive: 7

    receive: 8

    receive: 9

    在这个程序中,缓冲区可以存储10个int类型的整数,在执行生产者线程的时候,线程就不会阻塞,一次性将10个整数存入channel,在读取的时候,也是一次性读取。

    等待多gorountine的方案
    那好,我们回到最初的一个问题,使用信道堵塞主线,等待开出去的所有goroutine跑完。

    这是一个模型,开出很多小goroutine, 它们各自跑各自的,最后跑完了向主线报告。

    我们讨论如下2个版本的方案:

    只使用单个无缓冲信道阻塞主线

    使用容量为goroutines数量的缓冲信道

    对于方案1, 示例的代码大概会是这个样子:

    package main
    
    import (
    	"fmt"
    )
    
    var quit chan int // 只开一个信道
    
    func foo(id int) {
    	fmt.Println(id)
    	quit <- 0 // ok, finished
    }
    
    func main() {
    	count := 1000
    	quit = make(chan int) // 无缓冲
    
    	for i := 0; i < count; i++ {
    		go foo(i)
    	}
    
    	for i := 0; i < count; i++ {
    		<-quit
    	}
    }
    

      

    对于方案2, 把信道换成缓冲1000的:

    package main
    
    import (
    	"fmt"
    )
    
    var count = 1000
    
    func foo(id int, quit chan int) {
    	fmt.Println(id)
    	quit <- 0 // ok, finished
    }
    
    func main() {
    
    	quit := make(chan int, count) // 容量1000
    	for i := 0; i < count; i++ {
    		go foo(i, quit)
    	}
    
    	for i := 0; i < count; i++ {
    		<-quit
    	}
    }
    

      或者

    package main
    
    import (
    	"fmt"
    )
    
    var count = 1000
    var quit = make(chan int, count) // 容量1000
    
    func foo(id int) {
    
    	fmt.Println(id)
    	quit <- 0 // ok, finished
    }
    
    func main() {
    
    	for i := 0; i < count; i++ {
    		go foo(i)
    	}
    
    	for i := 0; i < count; i++ {
    		<-quit
    	}
    }
    

      

    其实区别仅仅在于一个是缓冲的,一个是非缓冲的。

    对于这个场景而言,两者都能完成任务, 都是可以的。

    • 无缓冲的信道是一批数据一个一个的「流进流出」

    • 缓冲信道则是一个一个存储,然后一起流出去

    Go语言的并发和并行

    package main
    
    import (
    	"fmt"
    )
    
    var quit chan int = make(chan int)
    
    func loop() {
    	for i := 0; i < 10; i++ {
    		fmt.Printf("%d ", i)
    	}
    	quit <- 0
    }
    
    func main() { // 开两个goroutine跑函数loop, loop函数负责打印10个数
    	go loop()
    	go loop()
    	for i := 0; i < 2; i++ {
    		<-quit
    	}
    
    }
    

      

     输出:

    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9

    并行和并发

    从概念上讲,并发和并行是不同的

    • 两个队列,一个Coffee机器,那是并发
    • 两个队列,两个Coffee机器,那是并行

    从上面的两个例子执行后的表现来看,多个goroutine跑loop函数会挨个goroutine去进行,而sleep则是一起执行的。这是为什么?

    默认地, Go所有的goroutines只能在一个线程里跑 。也就是说, 以上两个代码都不是并行的,但是都是是并发的。

     

    如果当前goroutine不发生阻塞,它是不会让出CPU给其他goroutine的, 所以例子一中的输出会是一个一个goroutine进行的,而sleep函数则阻塞掉了 当前goroutine, 当前goroutine主动让其他goroutine执行, 所以形成了逻辑上的并行, 也就是并发

    真正的并行

    为了达到真正的并行,我们需要告诉Go我们允许同时最多使用多个核。

    回到起初的例子,我们设置最大开2个原生线程, 我们需要用到runtime包(runtime包是goroutine的调度器):

    package main
    
    import (
    	"fmt"
    	"runtime"
    )
    
    var quit chan int = make(chan int)
    
    func loop() {
    	for i := 0; i < 100; i++ { //为了观察,跑多些
    		fmt.Printf("%d ", i)
    	}
    	quit <- 0
    }
    
    func main() {
    	runtime.GOMAXPROCS(2) // 最多使用2个核
    	go loop()
    	go loop()
    	for i := 0; i < 2; i++ {
    		<-quit
    	}
    }
    

      这下会看到两个goroutine会抢占式地输出数据了。

    0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 0 1 2 3 4 5 6 7 8 9 10 11 12 26 27 28 29 30 31 32 33 34 35 36 37 38 39 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 37 38 39 40 41 42 43 44 45 88 89 90 91 92 93 94 95 96 97 98 99 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 

    我们还可以这样显式地让出CPU时间:

    package main
    
    import (
    	"fmt"
    	"runtime"
    )
    
    var quit chan int = make(chan int)
    
    func loop() {
    	for i := 0; i < 10; i++ {
    		runtime.Gosched() // 显式地让出CPU时间给其他goroutine
    		fmt.Printf("%d ", i)
    	}
    	quit <- 0
    }
    
    func main() {
    	go loop()
    	go loop()
    	for i := 0; i < 2; i++ {
    		<-quit
    	}
    }
    

      

    观察下结果会看到这样有规律的输出:

    0 0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9

    其实,这种主动让出CPU时间的方式仍然是在单核里跑。但手工地切换goroutine导致了看上去的“并行”。

    如下的程序,按照理解应该打印下5次 "world"呀,可是为什么什么也没有打印

    package main
    
    import "fmt"
    
    func say(s string) {
    	for i := 0; i < 5; i++ {
    		fmt.Println(s)
    	}
    }
    
    func main() {
    	go say("world") //开一个新的Goroutines执行
    	for {
    	}
    }
    

      


    楼下的答案已经很棒了,这里Go仍然在使用单核,for死循环占据了单核CPU所有的资源,而main线和say两个goroutine都在一个线程里面, 所以say没有机会执行。解决方案还是两个:

    允许Go使用多核(runtime.GOMAXPROCS)

    手动显式调动(runtime.Gosched)

    关于runtime包几个函数:

    • Gosched 让出cpu
    • NumCPU 返回当前系统的CPU核数量
    • GOMAXPROCS 设置最大的可同时使用的CPU核数
    • Goexit 退出当前goroutine(但是defer语句会照常执行)

    总结
    我们从例子中可以看到,默认的, 所有goroutine会在一个原生线程里跑,也就是只使用了一个CPU核。

    在同一个原生线程里,如果当前goroutine不发生阻塞,它是不会让出CPU时间给其他同线程的goroutines的,这是Go运行时对goroutine的调度,我们也可以使用runtime包来手工调度。

    我们可以在Golang官方网站的这里 找到一句话:

    When a coroutine blocks, such as by calling a blocking system call, the run-time automatically moves other coroutines on the same operating system thread to a different, runnable thread so they won't be blocked.

    也就是说:

    当一个goroutine发生阻塞,Go会自动地把与该goroutine处于同一系统线程的其他goroutines转移到另一个系统线程上去,以使这些goroutines不阻塞

    package main
    
    import (
    	"fmt"
    	"runtime"
    )
    
    var quit chan int = make(chan int)
    
    func loop(id int) { // id: 该goroutine的标号
    	for i := 0; i < 10; i++ { //打印10次该goroutine的标号
    		fmt.Printf("%d ", id)
    	}
    	quit <- 0
    }
    
    func main() {
    	runtime.GOMAXPROCS(2) // 最多同时使用2个核
    
    	for i := 0; i < 3; i++ { //开三个goroutine
    		go loop(i)
    	}
    
    	for i := 0; i < 3; i++ {
    		<-quit
    	}
    }
    

    跑几次会看到类似这些输出(不同机器环境不一样):

    0 0 0 2 2 2 2 2 2 1 1 1 1 1 1 1 1 1 1 0 2 2 2 2 0 0 0 0 0 0

    2 2 2 2 2 2 2 0 0 0 0 0 0 0 0 0 0 1 2 2 2 1 1 1 1 1 1 1 1 1 

    2 2 2 2 2 2 2 2 2 2 0 0 0 0 0 0 0 0 0 0 1 1 1 1 1 1 1 1 1 1

    goroutine不阻塞不放开CPU

    执行它我们会发现以下现象:

    • 有时会发生抢占式输出(说明Go开了不止一个原生线程,达到了真正的并行)
    • 有时会顺序输出, 打印完0再打印1, 再打印2(说明Go开一个原生线程,单线程上的goroutine不阻塞不松开CPU)

    那么,我们还会观察到一个现象,无论是抢占地输出还是顺序的输出,都会有那么两个数字表现出这样的现象:

    • 一个数字的所有输出都会在另一个数字的所有输出之前

    原因是, 3个goroutine分配到至多2个线程上,就会至少两个goroutine分配到同一个线程里,单线程里的goroutine 不阻塞不放开CPU, 也就发生了顺序输出。

    4 runtime包
    4.1 Gosched
    runtime.Gosched() 用于让出CPU时间片,让出当前goroutine的执行权限,调度器安排其他等待的任务运行,并在下次某个时候从该位置恢复执行。

    这就像跑接力赛,A跑了一会碰到代码runtime.Gosched() 就把接力棒交给B了,A歇着了,B继续跑。

    package main
    
    import (
    	"fmt"
    	"runtime"
    )
    
    func main() {
    	//创建一个goroutine
    	go func(s string) {
    		for i := 0; i < 2; i++ {
    			fmt.Println(s)
    		}
    	}("world")
    
    	for i := 0; i < 2; i++ {
    		runtime.Gosched() //import "runtime"
    		/*
    		   屏蔽runtime.Gosched()运行结果如下:
    		       hello
    		       hello
    		   没有runtime.Gosched()运行结果如下:
    		       world
    		       world
    		       hello
    		       hello
    		*/
    		fmt.Println("hello")
    	}
    }
    

      

    4.2 Goexit
    调用 runtime.Goexit() 将立即终止当前 goroutine 执⾏,调度器确保所有已注册 defer 延迟调用被执行。

    示例代码:

    package main
    
    import (
    	"fmt"
    	"runtime"
    )
    
    func main() {
    	go func() {
    		defer fmt.Println("A.defer")
    
    		func() {
    			defer fmt.Println("B.defer")
    			runtime.Goexit() // 终止当前 goroutine, import "runtime"
    			fmt.Println("B") // 不会执行
    		}()
    
    		fmt.Println("A") // 不会执行
    	}() //别忘了()
    
    	//死循环,目的不让主goroutine结束
    	for {
    	}
    }
    

      

    程序运行结果: 

    1.  B.defer
    2.  A.defer

    4.3 GOMAXPROCS

    调用 runtime.GOMAXPROCS() 用来设置可以并行计算的CPU核数的最大值,并返回之前的值。

    示例代码:

    func main() {
        //n := runtime.GOMAXPROCS(1) //打印结果:111111111111111111110000000000000000000011111...
        n := runtime.GOMAXPROCS(2)     //打印结果:010101010101010101011001100101011010010100110...
        fmt.Printf("n = %d
    ", n)
     
        for {
            go fmt.Print(0)
            fmt.Print(1)
        }
    }
    

      

    在第一次执行(runtime.GOMAXPROCS(1))时,最多同时只能有一个goroutine被执行。所以
    会打印很多1。过了一段时间后,GO调度器会将其置为休眠,并唤醒另一个goroutine,这时候就开始打印很多0了,在打印的时候,goroutine是被调度到操作系统线程上的。

    在第二次执行(runtime.GOMAXPROCS(2))时,我们使用了两个CPU,所以两个goroutine可以一起被执行,以同样的频率交替打印0和1。

    1 select作用

    Go里面提供了一个关键字select,通过select可以监听channel上的数据流动。

    select的用法与switch语言非常类似,由select开始一个新的选择块,每个选择条件由case语句来描述。

    与switch语句可以选择任何可使用相等比较的条件相比, select有比较多的限制,其中最大的一条限制就是每个case语句里必须是一个IO操作,大致的结构如下:

        select {
        case <-chan1:
            // 如果chan1成功读到数据,则进行该case处理语句
        case chan2 <- 1:
            // 如果成功向chan2写入数据,则进行该case处理语句
        default:
            // 如果上面都没有成功,则进入default处理流程
        }
    

      

    在一个select语句中,Go语言会按顺序从头至尾评估每一个发送和接收的语句。

    如果其中的任意一语句可以继续执行(即没有被阻塞),那么就从那些可以执行的语句中任意选择一条来使用。

    如果没有任意一条语句可以执行(即所有的通道都被阻塞),那么有两种可能的情况:

    • 如果给出了default语句,那么就会执行default语句,同时程序的执行会从select语句后的语句中恢复。
    • 如果没有default语句,那么select语句将被阻塞,直到至少有一个通信可以进行下去。
    func fibonacci(c, quit chan int) {
        x, y := 1, 1
        for {
            select {
            case c <- x:
                x, y = y, x+y
            case <-quit:
                fmt.Println("quit")
                return
            }
        }
    }
     
    func main() {
        c := make(chan int)
        quit := make(chan int)
     
        go func() {
            for i := 0; i < 6; i++ {
                fmt.Println(<-c)
            }
            quit <- 0
        }()
     
        fibonacci(c, quit)
    }
    

     程序运行结果: 

    1
    2
    3
    4
    5
    6
    7
    8
    quit
    

      超时

    有时候会出现goroutine阻塞的情况,那么我们如何避免整个程序进入阻塞的情况呢?我们可以利用select来设置超时,通过如下的方式实现:

    func main() {
        c := make(chan int)
        o := make(chan bool)
        go func() {
            for {
                select {
                case v := <-c:
                    fmt.Println(v)
                case <-time.After(5 * time.Second):
                    fmt.Println("timeout")
                    o <- true
                    break
                }
            }
        }()
        //c <- 666 // 注释掉,引发 timeout
        <-o
    }
    

      select有几个重要的点要强调:

    1.如果有多个case都可以运行,select会随机公平地选出一个执行,其他不会执行 

    select有几个重要的点要强调:
    
    1.如果有多个case都可以运行,select会随机公平地选出一个执行,其他不会执行 
    

      

    输出:

    (随机)二者其一

    2.case后面必须是channel操作,否则报错。

     

     



     

    1 select作用Go里面提供了一个关键字select,通过select可以监听channel上的数据流动。
    select的用法与switch语言非常类似,由select开始一个新的选择块,每个选择条件由case语句来描述。
    与switch语句可以选择任何可使用相等比较的条件相比, select有比较多的限制,其中最大的一条限制就是每个case语句里必须是一个IO操作,大致的结构如下:
        select {    case <-chan1:        // 如果chan1成功读到数据,则进行该case处理语句    case chan2 <- 1:        // 如果成功向chan2写入数据,则进行该case处理语句    default:        // 如果上面都没有成功,则进入default处理流程    }在一个select语句中,Go语言会按顺序从头至尾评估每一个发送和接收的语句。
    如果其中的任意一语句可以继续执行(即没有被阻塞),那么就从那些可以执行的语句中任意选择一条来使用。
    如果没有任意一条语句可以执行(即所有的通道都被阻塞),那么有两种可能的情况:
    如果给出了default语句,那么就会执行default语句,同时程序的执行会从select语句后的语句中恢复。如果没有default语句,那么select语句将被阻塞,直到至少有一个通信可以进行下去。示例代码:
    func fibonacci(c, quit chan int) {    x, y := 1, 1    for {        select {        case c <- x:            x, y = y, x+y        case <-quit:            fmt.Println("quit")            return        }    }} func main() {    c := make(chan int)    quit := make(chan int)     go func() {        for i := 0; i < 6; i++ {            fmt.Println(<-c)        }        quit <- 0    }()     fibonacci(c, quit)}程序运行结果: 
    12345678quit2 超时
    有时候会出现goroutine阻塞的情况,那么我们如何避免整个程序进入阻塞的情况呢?我们可以利用select来设置超时,通过如下的方式实现:
    func main() {    c := make(chan int)    o := make(chan bool)    go func() {        for {            select {            case v := <-c:                fmt.Println(v)            case <-time.After(5 * time.Second):                fmt.Println("timeout")                o <- true                break            }        }    }()    //c <- 666 // 注释掉,引发 timeout    <-o}select有几个重要的点要强调:
    1.如果有多个case都可以运行,select会随机公平地选出一个执行,其他不会执行 
    func main() {    ch := make (chan int, 1)     ch<-1    select {    case <-ch:        fmt.Println("Good luck!")    case <-ch:        fmt.Println("Good day!")    }}输出:
    (随机)二者其一
    2.case后面必须是channel操作,否则报错。
    func main() {    ch := make (chan int, 1)    ch<-1    select {    case <-ch:        fmt.Println("Good luck!")    case 2:        fmt.Println("Good day!")    }}输出报错:
    2 evaluated but not usedselect case must be receive, send or assign recv
    3.select中的default子句总是可运行的。所以没有default的select才会阻塞等待事件 上代码:
    func main() {    ch := make (chan int, 1)    // ch<-1   <= 注意这里备注了。    select {    case <-ch:        fmt.Println("Good luck!")    default:        fmt.Println("Good day!")    }}输出:
    Good day!
    4.没有运行的case,那么江湖阻塞事件发生报错(死锁)
    func main() {    ch := make (chan int, 1)    // ch<-1   <= 注意这里备注了。    select {    case <-ch:        fmt.Println("Good luck!")    }}输出报错:
    fatal error: all goroutines are asleep - deadlock!
    select的应用场景
    1.timeout 机制(超时判断)
    func main() {    timeout := make (chan bool, 1)    go func() {        time.Sleep(1*time.Second) // 休眠1s,如果超过1s还没I操作则认为超时,通知select已经超时啦~        timeout <- true    }()    ch := make (chan int)    select {    case <- ch:    case <- timeout:        fmt.Println("超时啦!")    }}以上是入门版,通常代码中是这么写的:
    func main() {    ch := make (chan int)    select {    case <-ch:    case <-time.After(time.Second * 1): // 利用time来实现,After代表多少时间后执行输出东西        fmt.Println("超时啦!")    }}2.判断channel是否阻塞(或者说channel是否已经满了)
    func main() {    ch := make (chan int, 1)  // 注意这里给的容量是1    ch <- 1    select {    case ch <- 2:    default:        fmt.Println("通道channel已经满啦,塞不下东西了!")    }}3.退出机制
    func main() {    i := 0    ch := make(chan string, 0)    defer func() {        close(ch)    }()     go func() {        DONE:         for {            time.Sleep(1*time.Second)            fmt.Println(time.Now().Unix())            i++             select {            case m := <-ch:                println(m)                break DONE // 跳出 select 和 for 循环            default:            }        }    }()     time.Sleep(time.Second * 4)    ch<-"stop"}输出:
    153239047115323904721532390473stop1532390474
    这边要强调一点:退出循环一定要用break + 具体的标记,或者goto也可以。否则其实不是真的退出。
    func main() {    i := 0    ch := make(chan string, 0)    defer func() {        close(ch)    }()     go func() {         for {            time.Sleep(1*time.Second)            fmt.Println(time.Now().Unix())            i++             select {            case m := <-ch:                println(m)                goto DONE // 跳出 select 和 for 循环            default:            }        }        DONE:    }()     time.Sleep(time.Second * 4)    ch<-"stop"}输出:
    1532390525153239052615323905271532390528stop
    9.select死锁select不注意也会发生死锁,前文有提到一个,这里分几种情况,重点再次强调:
    1.如果没有数据需要发送,select中又存在接收通道数据的语句,那么将发送死锁
    func main() {      ch := make(chan string)    select {    case <-ch:    }}预防的话加default。
    空select,也会引起死锁
    package main func main() {      select {}} 
    goroutine 编程相关的库
    1 尽可能避免使用 runtime 库的函数和方法
    尽管你可以使用以下函数,但不建议你使用。
    GOMAXPROCS 设置 M 数量,除非你打算写 coroutineGoexit 退出当前 goroutine, 用 close(c) 或 select 通知 goroutine 退出GoroutineProfile 除非 debugGosched 放弃 yield 当前的 M , 用 time.sleep(1) 更稳健NumGoroutine 当前 goroutine 数量LockOSThread/UnLockOSThread 强制 goroutine 独占 M 直到 Unlock, 除非你打算做客户化函数的调度2 最常用的类型与库
    channel 类型与通讯:
    读、写 channel,阻塞 goroutineselect case,或 select case default 读多个 channel 阻塞 goroutine (见 gotour concurrency)close channel消息(数据)驱动编程!time 库
    sleep() : Sleep pauses the current goroutine。这是最重要的语句,要及时让 runtime 调度 goroutineTick(d Duration): 定周期产生时间消息After(d Duration): 定时产生消息Timer: 类型
    func main() {    tick := time.Tick(100 * time.Millisecond)    boom := time.After(500 * time.Millisecond)    for {        select {        case t := <-tick:            fmt.Println("tick.", t)        case t := <-boom:            fmt.Println("BOOM!")            return//      default://          fmt.Println("    .")//          time.Sleep(50 * time.Millisecond)        }    }}加 default 和 不加 default 的区别?…
    时间事件是 goroutine 的最重要事件源之一!
    sync 库 Mutex :互斥量。WaitGroup:信号量Cond:带锁的通知量Once 确保函数线程安全地仅只执行一次。(做单实例的神器)3 集合类型都不是线程安全的
    slicemap
    利用sync.WaitGroup实现协程同步      使用sync.WaitGroup。WaitGroup顾名思义,就是用来等待一组操作完成的。WaitGroup内部实现了一个计数器,用来记录未完成的操作个数,它提供了三个方法,Add()用来添加计数。Done()用来在操作结束时调用,使计数减一。Wait()用来等待所有的操作结束,即计数变为0,该函数会在计数不为0时等待,在计数为0时立即返回。 
    package main import (    "fmt"    "sync") func main() {     var wg sync.WaitGroup     wg.Add(2) // 因为有两个动作,所以增加2个计数    go func() {        fmt.Println("Goroutine 1")        wg.Done() // 操作完成,减少一个计数    }()     go func() {        fmt.Println("Goroutine 2")        wg.Done() // 操作完成,减少一个计数    }()     wg.Wait() // 等待,直到计数为0}GMP调度()
    操作系统中的程序可以往下分解为进程、线程和协程。Go中,协程被称为goroutine,被runtime进行调度。GMP调度中,G代表goroutine,M代表线程,P是Processor,它包含了运行goroutine的资源,只有当 M 关联一个 P 后才能执行 G 。
    调度器的两大思想:
    复用线程:2个体现:1.work stealing,当本线程无可运行的G时,尝试从其它县城绑定的P偷取G,而不是销毁线程;2.hand off,当本线程因为G进行系统调用阻塞是,线程释放绑定的P,把P转移给其它空闲的线程执行;利用并行:GOMAXPROCS设置P的数量,当GOMAXPROCS大于1时,就最多有GOMAXPROCS个线程处于运行状态,这些线程可能分布在多个CPU核上同时运行,使得并发利用并行。
    在Go中,线程是运行goroutine的实体,调度器的功能就是把可运行的goroutine分配到工作线程上。
     
    前天去面试,被问到golang是如何实现高并发的,之前在 GO并发编程实战 这本书看到过介绍,但是没有引起重视。传统的并发形式:多线程共享内存,这也是Java、C#或者C++等语言中的多线程开发的常规方法,其实golang语言也支持这种传统模式,另外一种是Go语言特有的,也是Go语言推荐的:CSP(communicating sequential processes)并发模型。不同于传统的多线程通过共享内存来通信,CSP讲究的是“以通信的方式来共享内存”。“不要以共享内存的方式来通信,相反,要通过通信来共享内存。”
    go语言使用MPG模式来实现CSP在传统的并发中起很多线程只会加大CPU和内存的开销,太多的线程会大量的消耗计算机硬件资源,造成并发量的瓶颈。
    M指的是Machine,一个M直接关联了一个内核线程。P指的是”processor”,代表了M所需的上下文环境,也是处理用户级代码逻辑的处理器。G指的是Goroutine,其实本质上也是一种轻量级的线程。


    我个人的理解:M关联了一个内核线程,通过调度器P(上下文)的调度,可以连接1个或者多个G,相当于把一个内核线程切分成了了N个用户线程,M和P是一对一关系(但是实际调度中关系多变),通过P调度N个G(P和G是一对多关系),实现内核线程和G的多对多关系(M:N),通过这个方式,一个内核线程就可以起N个Goroutine,同样硬件配置的机器可用的用户线程就成几何级增长,并发性大幅提高。 ————————————————版权声明:本文为CSDN博主「wade3015」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。原文链接:https://blog.csdn.net/wade3015/article/details/88133145

  • 相关阅读:
    MySQL百万级数据量分页查询方法及其优化
    Windows10内置Linux子系统初体验
    谈谈区块链(18):以太坊的UTXO
    永久告别mac屏幕涂层脱落
    Cloud Foundry中DEA启动应用实例时环境变量的使用
    jQuery 事件方法大全-超全的总结
    UVA12304-2D Geometry 110 in 1!
    Hbase总结(五)-hbase常识及habse适合什么场景
    Android笔记之 网络http通信
    Mac下安装Redis
  • 原文地址:https://www.cnblogs.com/saryli/p/15397560.html
Copyright © 2011-2022 走看看