zoukankan      html  css  js  c++  java
  • Go语言之Go语言并发

    Go 语言并发

    Golang从语言层面就对并发提供了支持,而goruntine是Go语言并发设计的核心。

    Go语言的并发机制运用起来非常舒适,在启动并发的方式上直接添加了语言级的关键字就可以实现,和其他编程语言相比更加轻量。

    进程&线程

    A、进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。

    B、线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。

    C、一个进程可以创建和撤销多个线程;同一个进程中的多个线程之间可以并发执行。

    并发&并行

    A、多线程程序在一个核的cpu上运行,就是并发。

    B、多线程程序在多个核的cpu上运行,就是并行。

    并发不是并行:

    并发主要由切换时间片来实现"同时"运行,并行则是直接利用多核实现多线程的运行,Go程序可以设置使用核数,以发挥多核计算机的能力。

    协程&线程

    协程:独立的栈空间,共享堆空间,调度由用户自己控制,本质上有点类似于用户级线程,这些用户级线程的调度也是自己实现的。

    线程:一个线程上可以跑多个协程,协程是轻量级的线程。

    Goroutine 介绍

    goroutine 只是由官方实现的超级"线程池"。每个实力4~5KB的栈内存占用和由于实现机制而大幅减少的创建和销毁开销是Go语言高并发的根本原因。

    goroutine 奉行通过通信来共享内存,而不是共享内存来通信。只需在函数调用语句前添加 go 关键字,就可创建并发执行单元。开发人员无需了解任何执行细节,调度器会自动将其安排到合适的系统线程上执行。goroutine 是一种非常轻量级的实现,可在单个进程里执行成千上万的并发任务。

    事实上,入口函数 main 就以 goroutine 运行。另有与之配套的 channel 类型,用以实现 "以通讯来共享内存" 的 CSP 模式。

    goroutine 是通过 Go 的 runtime管理的一个线程管理器

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func main() {
    	go func() {
    		fmt.Println("hello word")
    	}()
    	time.Sleep(1 * time.Second)
    }
    
    
    进入 main 函数开启一个 goroutine 运行匿名函数函数体内容:fmt.Println("Hello, World!") 。主线程执行 time.Sleep(1 * time.Second) 等待 1 秒。goroutine 执行完毕回到主线程,主线程的sleep 完成结束程序。
    
    注意:若去掉 time.Sleep(1 * time.Second) 这段代码,进入 main 函数开启一个 goroutine,没等 goroutine 运行匿名函数函数体内容,主线程已经完成结束程序。
    

    Go语言Chan应用

    Channel 是 CSP 模式的具体实现,用于多个 goroutine 通讯。其内部实现了同步,确保并发安全。
    Channel 是先进先出,线程安全的,多个goroutine同时访问,不需要加锁。

    chan 阻塞

    我们定义的管道 intChan 容量是5,开启 goroutine 写入10条数据,在写满5条数据时会阻塞,而 read() 每秒会从 intChan 中读取一条,然后write() 再会写入一条数据。

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func write(ch chan int) {
    	for i := 0; i < 10; i++ {
    		ch <- i
    		fmt.Println("write data:", i)
    	}
    }
    func read(ch chan int) {
    	for {
    		i := <-ch
    		fmt.Println("read data:", i)
    		time.Sleep(time.Second)
    	}
    }
    func main() {
    	intChan := make(chan int, 5)
    	go write(intChan)
    	go read(intChan)
    
    	time.Sleep(10 * time.Second)
    }
    
    

    同步模式

    默认为同步模式,需要发送和接收配对。否则会被阻塞,直到另一方准备好后被唤醒。

    package main
    
    import "fmt"
    
    func main() {
    	data := make(chan string) // 数据交换队列
    	exit := make(chan bool)   // 退出通知
    
    	go func() {
    		for d := range data { // 从队列迭代接收数据,直到 close 。
    			fmt.Println(d)
    		}
    		fmt.Println("received over")
    		exit <- true // 发出退出通知。
    	}()
    	data <- "oldboy" // 发送数据。
    	data <- "Linux"
    	data <- "GOlang"
    	data <- "python"
    	close(data) // 关闭队列。
    	fmt.Println("send over")
    	<-exit // 等待退出通知。
    }
    
    

    异步模式

    异步方式通过判断缓冲区来决定是否阻塞。如果缓冲区已满,发送被阻塞;缓冲区为空,接收被阻塞。

    通常情况下,异步 channel 可减少排队阻塞,具备更高的效率。但应该考虑使用指针规避大对象拷贝,将多个元素打包,减小缓冲区大小。

    package main
    
    import "fmt"
    
    func main() {
    	data := make(chan string, 3) // 缓冲区可以存储 3 个元素
    	exit := make(chan bool)
    
    	data <- "old boy" // 在缓冲区未满前,不会阻塞。
    	data <- "python"
    	data <- "linux"
    
    	go func() {
    		for d := range data { // 在缓冲区未空前,不会阻塞。
    			fmt.Println(d)
    		}
    		//  表示读取出data通道中数据
    		exit <- true
    	}()
    	data <- "java" // 如果缓冲区已满,阻塞。
    	data <- "C"
    	close(data)
    	<-exit
    }
    
    

    chan 选择

    如果需要同时处理多个 channel,可使用 select 语句。它随机选择一个可用 channel 做收发操作,或执行 default case。

    用 select 实现超时控制:

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func main() {
    	exit := make(chan bool)
    	intChan := make(chan int, 2)
    	strChan := make(chan string, 2)
    
    	go func() {
    		select {
    		case vi := <-intChan:
    			fmt.Println(vi)
    		case vs := <-strChan:
    			fmt.Println(vs)
    		case <-time.After(time.Second * 3):
    			fmt.Println("timeout.")
    		}
    
    		exit <- true
    	}()
    
    	// intChan <- 100 // 注释掉,引发 timeout。
    	// strChan <- "oldboy"
    
    	<-exit
    }
    
    

    在循环中使用 select default case 需要小心,避免形成洪水。

    简单工厂模式

    用简单工厂模式打包并发任务和 channel。

    package main
    
    import (
    	"fmt"
    	"math/rand"
    	"time"
    )
    
    func NewTest() chan int {
    	c := make(chan int)
    	rand.Seed(time.Now().UnixNano())
    	go func() {
    		time.Sleep(time.Second)
    		c <- rand.Int()
    	}()
    	return c
    }
    func main() {
    	t := NewTest()
    	fmt.Println(<-t) // 等待 goroutine 结束返回。
    }
    
    

    Go 语言WaitGroup

    WaitGroup能够一直等到所有的goroutine执行完成,并且阻塞主线程的执行,直到所有的goroutine执行完成。

    WaitGroup总共有三个方法:Add(delta int),Done(),Wait()。简单的说一下这三个方法的作用。

    Add:添加或者减少等待goroutine的数量;

    Done:相当于Add(-1);

    Wait:执行阻塞,直到所有的WaitGroup数量变成 0;

    WaitGroup用于线程同步,WaitGroup等待一组线程集合完成,才会继续向下执行。 主线程(goroutine)调用Add来设置等待的线程(goroutine)数量。 然后每个线程(goroutine)运行,并在完成后调用Done。 同时,Wait用来阻塞,直到所有线程(goroutine)完成才会向下执行。

    WaitGroup实例如下:

    package main
    
    import (
        "fmt"
        "sync"
        "time"
    )
    
    func main() {
        var wg sync.WaitGroup
        for i := 0; i < 5; i++ {
            wg.Add(1)
            go func(n int) {
                // defer wg.Done()
                defer wg.Add(-1)
                EchoNum(n)
            }(i)
        }
        wg.Wait()
    }
    
    func EchoNum(i int) {
        time.Sleep(time.Second)
        fmt.Println(i)
    }
    

    程序中将每次循环的数量 sleep 1 秒钟后输出。如果程序不使用WaitGroup,将不会输出结果。因为goroutine还没执行完,主线程已经执行完毕。
    注掉的 defer wg.Done() 和 defer wg.Add(-1) 作用一样。

    WaitGroup应用

    一、用 channel 实现信号量 (semaphore)。

    package main
    
    import (
    	"sync"
    )
    
    func main() {
    	wg := sync.WaitGroup{}
    	wg.Add(3) //增加三个线程
    	sem := make(chan int, 1)
    	for i := 0; i < 3; i++ {
    		go func(id int) {
    			defer wg.Done() //减少一个线程
    			sem <- 1        // 向 sem 发送数据,阻塞或者成功。
    			for x := 0; x < 3; x++ {
    				println(id, x)
    			}
    			<-sem // 接收数据,使得其他阻塞 goroutine 可以发送数据
    		}(i)
    	}
    	wg.Wait()
    
    }
    
    
    D:goprogramgosrcday10       
    λ go run test.go                
    0 0                             
    0 1                             
    0 2                             
    1 0                             
    1 1                             
    1 2                             
    2 0                             
    2 1                             
    2 2                             
    

    二、用 closed channel 发出退出通知。

    package main
    
    import (
    	"sync"
    	"time"
    )
    
    func main() {
    	wg := sync.WaitGroup{}
    	exit := make(chan bool)
    	for i := 0; i < 2; i++ {
    		wg.Add(1)
    		go func(n int) {
    			defer wg.Done()
    			task := func() {
    				println(n, time.Now().String())
    				time.Sleep(1 * time.Second)
    			}
    			for {
    				select {
    				case <-exit: // closed channel 不会阻塞,因此可用作退出通知。
    					return
    				default: // 执行正常任务。
    					task()
    				}
    			}
    		}(i)
    	}
    	time.Sleep(time.Second * 3) // 让测试 goroutine 运行一会。
    	close(exit)                 // 发出退出通知。
    	wg.Wait()
    }
    
    

    WaitGroup陷阱

    一、add 数量小于done数量导致 WaitGroup数量为负数

    package main
    
    import (
        "fmt"
        "sync"
        "time"
    )
    
    func main() {
        var wg sync.WaitGroup
        wg.Add(1)
    
        oldboy := func() {
            time.Sleep(time.Second)
            fmt.Println("The old boy welcomes you.")
            wg.Done()
        }
    
        go oldboy()
        go oldboy()
        go oldboy()
    
        time.Sleep(time.Second * 3)
        wg.Wait()
    }
    

    运行错误:

    panic: sync: negative WaitGroup counter
    

    二、add 数量大于 done 数量造成 deadlock

    package main
    
    import (
        "fmt"
        "sync"
        "time"
    )
    
    func main() {
        var wg sync.WaitGroup
        wg.Add(4)
    
        oldboy := func() {
            time.Sleep(time.Second)
            fmt.Println("The old boy welcomes you.")
            wg.Done()
        }
    
        go oldboy()
        go oldboy()
        go oldboy()
    
        time.Sleep(time.Second * 3)
        wg.Wait()
    }
    

    运行错误:

    fatal error: all goroutines are asleep - deadlock!
    

    三、跳过 add 和 Done 操作,直接执行 Wait

    package main
    
    import (
        "fmt"
        "sync"
    )
    
    func main() {
        wg := sync.WaitGroup{}
    
        for i := 0; i < 5; i++ {
            go func(wg sync.WaitGroup, i int) {
                wg.Add(1)
                fmt.Printf("i=>%d
    ", i)
                wg.Done()
            }(wg, i)
        }
        wg.Wait()
        fmt.Println("exit")
    }
    

    WaitGroup 同步的是 goroutine, 而上面的代码却在 goroutine 中进行 Add(1) 操作。因此,可能在这些 goroutine 还没来得及 Add(1) 就已经执行 Wait 操作了。

    四、WaitGroup 拷贝传值问题

    package main
    
    import (
        "fmt"
    
        "sync"
    )
    
    func main() {
        wg := sync.WaitGroup{}
    
        for i := 0; i < 5; i++ {
            wg.Add(1)
            go func(wg sync.WaitGroup, i int) {
                fmt.Printf("i=>%d
    ", i)
                wg.Done()
            }(wg, i)
        }
        wg.Wait()
    }
    

    运行错误:

    fatal error: all goroutines are asleep - deadlock!
    

    wg 给拷贝传递到了 goroutine 中,导致只有 Add 操作,其实 Done操作是在 wg 的副本执行的,因此 Wait 就死锁了。

    正确代码实例如下:

    package main
    
    import (
        "fmt"
    
        "sync"
    )
    
    func main() {
        wg := new(sync.WaitGroup)
        // wg := &sync.WaitGroup{}
    
        for i := 0; i < 5; i++ {
            wg.Add(1)
            go func(wg *sync.WaitGroup, i int) {
                fmt.Printf("i=>%d
    ", i)
                wg.Done()
            }(wg, i)
        }
        wg.Wait()
    }
    

    Go 语言runtime

    runtime包提供Go语言运行时的系统交互的操作,例如控制goruntine的功能。

    调度器不能保证多个 goroutine 执行次序,且进程退出时不会等待它们结束。

    默认情况下,进程启动后仅允许一个系统线程服务于 goroutine。可使用环境变量或标准库函数 runtime.GOMAXPROCS 修改,让调度器用多个线程实现多核并行,而不仅仅是并发。

    runtime包常用方法

    const GOOS string = theGoos
    

    GOOS是可执行程序的目标操作系统(将要在该操作系统的机器上执行):darwin、freebsd、linux等。

    func Gosched()
    

    Gosched使当前go程放弃处理器,以让其它go程运行。它不会挂起当前go程,因此当前go程未来会恢复执行。

    func NumCPU() int
    

    NumCPU返回本地机器的逻辑CPU个数。

    func GOROOT() string
    

    GOROOT返回Go的根目录。如果存在GOROOT环境变量,返回该变量的值;否则,返回创建Go时的根目录。

    func GOMAXPROCS(n int) int
    

    GOMAXPROCS设置可同时执行的最大CPU数,并返回先前的设置。 若 n < 1,它就不会更改当前设置。本地机器的逻辑CPU数可通过 NumCPU 查询。本函数在调度程序优化后会去掉。

    func Goexit()
    

    Goexit终止调用它的go程。其它go程不会受影响。Goexit会在终止该go程前执行所有defer的函数。

    在程序的main go程调用本函数,会终结该go程,而不会让main返回。因为main函数没有返回,程序会继续执行其它的go程。如果所有其它go程都退出了,程序就会崩溃。

    func NumGoroutine() int
    

    NumGoroutine返回当前存在的Go程数。

    runtime包应用

    一、查看机器的逻辑CPU个数、Go的根目录、操作系统

    package main
    
    import "runtime"
    
    func main() {
    	println("cpu:", runtime.NumCPU())
    	println("go:", runtime.GOROOT())
    	println("操作系统:", runtime.GOOS)
    
    }
    
    
    D:goprogramgosrcday10
    λ go run test.go
    cpu: 4
    go: D:gogo
    操作系统: windows
    
    

    二、GOMAXPROCS 设置golang运行的cpu核数

    Golang 默认所有任务都运行在一个 cpu 核里,如果要在 goroutine 中使用多核,可以使用 runtime.GOMAXPROCS 函数修改,当参数小于 1 时使用默认值。

    package main
    
    import (
        "fmt"
        "runtime"
    )
    
    var (
        signal = false
    )
    
    func oldboy() {
        signal = true
    }
    
    func init() {
        runtime.GOMAXPROCS(1)
    }
    
    func main() {
        go oldboy()
        for {
            if signal {
                break
            }
        }
        fmt.Println("end")
    }
    

    上述代码单核执行如果for前面或者中间不延迟,主线程不会让出CPU,导致异步的线程无法执行,从而无法设置signal的值,从而出现死循环。

    运行的cpu核数设置成2核

    package main
    
    import (
        "fmt"
        "runtime"
    )
    
    var (
        signal = false
    )
    
    func oldboy() {
        signal = true
    }
    
    func init() {
        runtime.GOMAXPROCS(2)
    }
    
    func main() {
        go oldboy()
        for {
            if signal {
                break
            }
        }
        fmt.Println("end")
    }
    

    运行结果:

    end
    

    三、Gosched 让当前的 goroutine 让出 CPU

    这个函数的作用是让当前 goroutine 让出 CPU,当一个 goroutine 发生阻塞,Go 会自动地把与该 goroutine 处于同一系统线程的其他 goroutine 转移到另一个系统线程上去,以使这些 goroutine 不阻塞。当前的 goroutine 不会挂起,当前的 goroutine 程未来会恢复执行。

    runtime.Gosched()用于让出CPU时间片。这就像跑接力赛,A跑了一会碰到代码runtime.Gosched()就把接力棒交给B了,A歇着了,B继续跑。

    package main
    
    import (
    	"runtime"
    	"sync"
    )
    
    func main() {
    	wg := new(sync.WaitGroup)
    	wg.Add(1)
    	go func() {
    		for i := 0; i < 6; i++ {
    			println(i)
    			runtime.Gosched()
    		}
    		defer wg.Done()
    	}()
    	for i := 0; i < 6; i++ {
    		wg.Add(1)
    		go func() {
    			defer wg.Done()
    			println("Hello.Golang!")
    		}()
    	}
    	wg.Wait()
    }
    
    
    D:goprogramgosrcday10
    λ go run test.go
    0
    1
    2
    3
    4
    5
    Hello.Golang!
    Hello.Golang!
    Hello.Golang!
    Hello.Golang!
    Hello.Golang!
    Hello.Golang!
    

    四、Goexit 终止当前 goroutine 执行

    调用 runtime.Goexit 将立即终止当前 goroutine 执行,调度器确保所有已注册 defer 延迟调用被执行。

    package main
    
    import (
        "fmt"
        "runtime"
        "sync"
    )
    
    func main() {
        wg := new(sync.WaitGroup)
        wg.Add(1)
    
        go func() {
            defer wg.Done()
            defer fmt.Println("A.defer")
            func() {
                defer fmt.Println("B.defer")
                runtime.Goexit() // 终止当前 goroutine
                fmt.Println("B") // 不会执行
            }()
    
            fmt.Println("A") // 不会执行
        }()
    
        wg.Wait()
    }
    
    B.defer
    A.defer
    
  • 相关阅读:
    BZOJ 1003--[ZJOI2006]物流运输(最短路)
    BZOJ 1002--[FJOI2007]轮状病毒(高精度)
    BZOJ 1001--[BeiJing2006]狼抓兔子(最短路&对偶图)
    BZOJ 1719--[Usaco2006 Jan] Roping the Field 麦田巨画(几何&区间dp)
    BZOJ 2821--作诗(分块)
    BZOJ 2724--蒲公英(分块)
    BZOJ 2388--旅行规划(分块&单调栈&二分)
    用python操作Git
    JS常用到的日期函数
    Python3.7使用celery出现from . import async, base SyntaxError: invalid syntax错误
  • 原文地址:https://www.cnblogs.com/heych/p/12579607.html
Copyright © 2011-2022 走看看