zoukankan      html  css  js  c++  java
  • golang(8):channel读写 & goroutine 通信

    goroutine 

    1、进程和线程

    A. 进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位
    B. 线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位
    C. 一个进程可以创建和撤消多个线程;同一个进程中的多个线程之间可以并发执行

    2. 协程和线程

    协程:独立的栈空间,共享堆空间,调度由用户自己控制,本质上有点类似于用户级线程,这些用户线线程的调度也是自己实现的 (go func() 起的就是协程)
    线程:一个线程上可以跑多个协程,协程是轻量级的线程 (一个物理线程可以跑多个协程)

    3. 不同 goroutine 之间进行通信

    // a. 全局变量 和 锁同步
    // b. Channel

    打印1到10所有数的阶乘(全局变量 和 锁同步)

    // 示例代码:
    package main 
    
    import (
        "fmt"
        "time"
        "sync"        // 修改公共数据要加锁
    )
    
    var (
        m = make(map[int]uint64)        // 定义一个全局变量的切片,用于保存生成的阶乘结果
        lock sync.Mutex                        // 定义一个互斥锁
    )
    
    type task struct {
        n int
    }
    
    func calc(p *task){
        var sum uint64
        sum = 1
    
        for i := 1; i <= p.n; i++ {
            sum *= uint64(i)        // i 是 int 类型, sum 是 uint 类型,要强转一下
        }
    
        lock.Lock()            // 加锁;修改公共数据    要加锁
        m[p.n] = sum
        lock.Unlock()        // 释放锁
    
    }
    
    func main(){
        for i:= 0; i <= 10; i++ {
            t := &task{i}
            fmt.Printf("%d addr:%p
    ",i,&t)
            go calc(t)        // 开启 11 个新线程
        }    
    
        time.Sleep(time.Second * 5)
    
        lock.Lock()        // 由于读取的公共数据,而且不知道 sleep 5 秒后所有的 goroutine 是否已经全部执行完,所以此时读取 m 也要加锁
        for k,v := range m {
            fmt.Printf("%d! = %v
    ",k,v)
        }
        lock.Unlock()
    
    
        // 如果想知道你的程序在执行过程中有没有资源竞争的情况,可以在 build 的时候加上 --race 参数
    }
    
    
    // 运行结果:
    [root@NEO example01_goroutine01_factorial]# go run main/main.go
    0 addr:0xc00000e030            // 每次生成的 t 都是不同的地址
    1 addr:0xc00000e040
    2 addr:0xc00000e048
    3 addr:0xc00000e050
    4 addr:0xc00000e058
    5 addr:0xc00000e060
    6 addr:0xc00000e068
    7 addr:0xc00000e070
    8 addr:0xc00000e078
    9 addr:0xc00000e080
    10 addr:0xc00000e088
    10! = 3628800
    3! = 6
    4! = 24
    5! = 120
    6! = 720
    7! = 5040
    0! = 1
    1! = 1
    2! = 2
    8! = 40320
    9! = 362880
    [root@NEO example01_goroutine01_factorial]# 
    
    // 第一次写上面的代码的时候,犯了如下错误:
    func main(){
        var t task        // 错误原因: 全程只生成了这一个 t 
        for i:= 0; i <= 10; i++ {
            t.n = i        // 每次修改 t.n 是都是在对同一个 t 的 n 作修改
            fmt.Printf("%d addr:%p
    ",i,&t)
            go calc(&t)        //  calc(&t) 在调用 t.n 时,好多线程用的都是同一个 t 中的 n
        }    
        
        ...
    }

    channel

    channel概念:

    a. 类似 unix 中的管道(pipe)
    b. 先进先出
    c. 线程安全,多个 goroutine 同时访问,不需要加锁
    d. channel 是有类型的,一个整数的 channel 只能存放整数

    管道分类:

    1) 无缓存区管道,如下:
    ch := make(chan int)        // By default, sends and receives block until the other side is ready. (无缓存区管道中,只有当发送者和接收者都准备好的时候,管道才不会阻塞)
    
    2) 有缓存区管道,如下:
    ch := make(chan int, 100)
    // Sends to a buffered channel block only when the buffer is full. Receives block when the buffer is empty.  (有缓存区的管道)

    无缓存区管道示例:

    1) 错误示例:
    package main
    
    func main(){
        var ch chan int
        ch = make(chan int)        // 无缓存区管道
        
        ch <- 10        // 此时会阻塞,因为无缓存区管道中,只有当接收者和发送者都准备好时,才不会阻塞,但程序永远都到不了接收读数据(下面一行的代码),所以程序会阻塞在这一步,由于又是阻塞在了主线程,main函数无法继续执行以后的代码,所以会 panic
        <- ch        // 从管道中取出一个数据
    }
    
    // 运行结果:
    [root@NEO ~]# go run main.go
    fatal error: all goroutines are asleep - deadlock!        // 直接死锁
    
    goroutine 1 [chan send]:
    main.main()
        /root/main.go:16 +0x55
    exit status 2
    [root@NEO ~]#
    
    
    2) 正确示例:
    package main
    
    import (
        "fmt"
    )
    
    func main(){
        var ch chan int
        ch = make(chan int)        // 无缓存区管道
            
        go func(){            // 新开一个 goroutine;
            ch <- 10        // 新开的子线程也会阻塞在这一步;但由于不是阻塞了主线程,程序就不会 panic
        }()
        a :=<- ch                // 从管道中读取数据;当从管道中读取数据时,上面新开的 goroutine 就不会阻塞了
        fmt.Println(a)
    }
    
    // 运行结果:
    [root@NEO ~]# go run main.go
    10
    [root@NEO ~]# 

    无缓存区管道死锁参考:https://blog.csdn.net/u011328417/article/details/89473323

    channel 声明

    // var 变量名 chan 类型
    var test chan int
    var test chan string
    var test chan map[string]string
    var test chan stu
    var test chan *stu

    示例代码:

    package main
    
    import "fmt"
    
    type student struct{
        name string
    }
    
    func main(){
        var mapChan chan map[string]string                // 声明一个 channel;chan + 变量类型 才是管道的类型
        mapChan = make(chan map[string]string,10)        // 管道初始化;管道满了后就不能再往里面写了(也可以阻塞)
        m := make(map[string]string,16)                    // 声明并初始化;超过16个元素后 m 会自动扩容
        m["stu01"] = "neo01"
        m["stu02"] = "neo02"
        mapChan <- m            // 把 m 写入 mapChan 管道
    
        var stuChan chan *student
        stuChan = make(chan *student,10)    // 初始化的时候也要 chan *student
        stu := student{name:"neo"}
        stuChan <- &stu        // 写入是传入地址
    
        var interfChan chan interface{}            // interface{} --> 这样 channel 中可存任何类型
        interfChan = make(chan interface{},10)
        interfChan <- stu
    
        /*
        stu01 :=<- interfChan            // 读取 interfChan;声明并初始化 stu01
        fmt.Printf("stu01:%v
    ",stu01)
        */
        var stu01 interface{}        // stu01 是一个接口类型
        stu01 =<- interfChan
    
        stu02,ok := stu01.(student)            // 断言
        if !ok {
            fmt.Println("can not convert")
            return
        }
        fmt.Println("stu02:",stu02)
        
    }
    
    
    // 运行结果:
    [root@NEO example02_channel01]# go run main/main.go
    stu02: {neo}
    [root@NEO example02_channel01]#

    channel 和 goroutine 相结合的事例(一个goroutine写,一个goroutine读)

    // 示例代码:
    package main
    
    import (
        "fmt"
        "time"
    )
    
    func write(ch chan int){    // 管道 chan 后面一定要加上变量类型
        for i := 0; i < 15; i++{
            ch <- i
            fmt.Printf("put data i:%d
    ",i)
        }    
    }
    
    func read(ch chan int){
        for {
            var b int
            b =<- ch
            fmt.Println(b)
            time.Sleep(time.Second)
        }
    }
    
    
    func main() {
        var intChan chan int
        intChan = make(chan int,10)        // 管道超过10个元素后,就不能再管道中添加元素(再加就也会阻塞)
    
        go write(intChan)
        go read(intChan)
    
        time.Sleep(time.Second * 20)
    }
    
    
    // 运行结果:
    [root@NEO example03_chan_goroute]# go run main/main.go
    put data i:0
    put data i:1
    put data i:2
    put data i:3
    put data i:4
    put data i:5
    put data i:6
    put data i:7
    put data i:8
    put data i:9
    0
    put data i:10
    1
    put data i:11
    2
    put data i:12
    3
    put data i:13
    4
    put data i:14
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    [root@NEO example03_chan_goroute]# 
    // 上面代码的运行现象:同时开了 read() 和 write() 两个 goroutine,write() 中的前10个元素会立马写入到 管道 中,然后 write() 会阻塞,此时 read() 每秒读取一个,write() 再写入一个

    channels 的关闭  

    // 示例代码:
    package main
    
    import (
        "fmt"    
    )
    
    func main(){
        var ch chan int
        ch = make(chan int,10)
    
        for i := 0; i < 10;  i++ {
            ch <- i
        }
    
        close(ch)        // 关闭管道;管道关闭后就不能再往管道里面写了,但是可以从管道里面读
    
        for {
            var b int
            b,ok :=<- ch        // ok 等于 false 表示这个管道已经关了 (将管道中的数据读取完后,才会返回false)
            if ok == false{
                fmt.Println("channel is closed")
                break
            }
            fmt.Println("ok:",ok)
            fmt.Println(b)
        }
    }
    
    // 运行结果:
    [root@NEO example03_chan_close]# go run main/main.go
    ok: true
    0
    ok: true
    1
    ok: true
    2
    ok: true
    3
    ok: true
    4
    ok: true
    5
    ok: true
    6
    ok: true
    7
    ok: true
    8
    ok: true
    9
    channel is closed
    [root@NEO example03_chan_close]# 
    
    // 关闭管道的相关官方解释:
    // The loop for i := range c receives values from the channel repeatedly until it is closed.
    // Channels aren't like files; you don't usually need to close them. Closing is only necessary when the receiver must be told there are no more values coming, such as to terminate a range loop.

    关闭管道的参考链接:https://blog.csdn.net/Tovids/article/details/77867284

    利用 for range 取出管道中所有的元素

    // 示例代码:
    package main
    
    import "fmt"
    
    
    func main(){
        var ch chan int
        ch = make(chan int,10)
        for i := 0; i < 10; i++ {
            ch <- i
        }
    
        close(ch)        // 把管道关闭,当 for range 取出管道中所有元素后就会自动退出 for range 循环
        for v := range ch{        // 利用 for range 取出管道中所有元素
            fmt.Println(v)
        }
    }
    
    // 运行结果:
    [root@NEO example03_goroute_range]# go run main/main.go
    0
    1
    2
    3
    4
    5
    6
    7
    8
    9
    [root@NEO example03_goroute_range]# 

    chan 之间的同步

    示例1:打印出1000以内的所有素数

    // 示例代码:
    package main
    
    import (
        "fmt"
    )
    
    
    func calc(src chan int, res chan int, exitch chan bool){        // 检查是否为素数的函数
        for {
            v,ok :=<- src        // 从集合管道中取出一个元素
            if ok == false {    // 只有 src(intChan)close() 掉后,当 src(intChan)中所有元素取完后,ok 才会变成 false,所以 intChan 一定要关掉
                break
            }
            
            flag := true        // 是否为素数的标识符
            for i := 2; i < v; i++{
                if v % i == 0{
                    flag = false
                    break
                }
            }
            
            // 如果 flag == true,则为素数,添加到 res 管道中
            if flag {
                res <- v
            }
        }
        exitch <- true        // 线程执行完后,把 true 加到 exitch 中
    }
    
    
    func closeResChan(exitch chan bool,res chan int){
        for i := 0; i <8; i++{        // 如果 exitChan 管道中能取出8个元素,表示 calc() 那8个线程都已经执行完
            <- exitch        // 从 exitch 管道中取出一个元素(扔掉)
        }
        close(res)        // 关掉 resChan
    }
    
    func main(){
        var intChan chan int
        intChan = make(chan int,1000)
        var resChan chan int
        resChan = make(chan int,1000)        // 用于存放所有的素数
        var exitChan chan bool      
        exitChan = make(chan bool,8)    // 定义一个 exitChan 管道,用于判断上面的8个线程是否已经全部执行完
    
        for i := 2; i <= 1000; i++{        // 1000以内的数放到 intChan 管道中
            intChan <- i
        }
        close(intChan)        // 关闭管道
    
        // 把素数添加到 resChan 管道中
        for i := 0; i < 8; i++ {    // 假设电脑为8核,开启8个线程
            go calc(intChan, resChan, exitChan)
        }
    
        // 当上面新开的8个线程都执行完后,才应该把 resChan close() 掉
        go closeResChan(exitChan, resChan)
    
        // 从 resChan 管道中读取结果
        for v := range resChan{        // 为了不让 main 线程出现死锁,则应该先把 resChan close() 掉
            fmt.Println(v)
        }
        
    }
    
    // for v := range channel {}    // channel 管道也能 for range 来获取管道中的元素 ,v 表示取出来的 channel 中的元素

    示例2:一个线程发送数据,一个线程接收数据

    // 示例代码:
    package main
    
    import "fmt"
    
    func send(intChan chan int, exitChan chan bool){
        for i := 0; i < 10; i++{
            intChan <- i
        }
        close(intChan)            // 写入完成之后,关闭 intChan,这样 for range intChan 时取出数据后 才会自动退出
        exitChan <- true         // 表示 send() 线程已经执行完毕
    }
    
    func recv(intChan chan int, exitChan chan bool){
        for v := range intChan{        // 由于 send() 中已经关闭了 intChan,所以当 intChan 中的数据取完后,该 for range 会自动退出
            fmt.Println(v)
        }
        
        exitChan <- true
    }
    
    func main(){
        var intChan chan int
        intChan = make(chan int,10)
        var exitChan chan bool
        exitChan = make(chan bool,2)    // 用于标识程序退出;长度 2 表示最多标识2个线程
    
        go send(intChan, exitChan)
        go recv(intChan, exitChan)
    
        for i := 0; i < 2; i++ {
            <- exitChan
        }
    }
    
    // 运行结果:
    [root@NEO example03_goroute_sync02]# go run main/main.go
    0
    1
    2
    3
    4
    5
    6
    7
    8
    9
    [root@NEO example03_goroute_sync02]# 

    chan 的只读和只写

    a. 只读 chan 的声明
        var 变量名字 <-chan int
        var readChan <-chan int
    
    b. 只写 chan 的声明
        var 变量名字 chan<- int
        var writeChan chan<- int

    channels 的 select 操作

    select 用于多个channel监听并收发消息,当任何一个case满足条件则会执行,若没有可执行的case,就会执行default,如果没有default,程序就会阻塞。
    // 语法示例:
    select {
    case i := <-c:
        // use i
    default:
        // receiving from c would block
    }

    示例代码:

    package main
    
    import (
        "fmt"
        "time"
    )
    
    func main(){
        var ch01 chan int
        ch01 = make(chan int,10)
        ch02 := make(chan int,10)
    
        go func(){
            var i int
            for {
                ch01 <- i
                time.Sleep(time.Second)
                ch02 <- i * i
                time.Sleep(time.Second)
                i++ 
            }
        }()
    
        for {
            select {            // 哪个管道里面有数据,就走哪个 case;所有管道都没数据,就走 default
            case v :=<- ch01:
                fmt.Println("ch01 ele:",v)
            case v :=<- ch02:
                fmt.Println("ch02 ele:",v)
            default:
                fmt.Println("get data timeout")
                time.Sleep(time.Second)
            }
        }
    }
    
    
    // 运行结果:
    [root@NEO example03_goroute_select]# go run main/main.go
    get data timeout
    ch01 ele: 0
    ch02 ele: 0
    get data timeout
    get data timeout
    ch01 ele: 1
    ch02 ele: 1
    get data timeout
    get data timeout
    ch02 ele: 4
    ch01 ele: 2
    get data timeout
    get data timeout
    ch01 ele: 3
    ch02 ele: 9
    get data timeout
    get data timeout
    ch01 ele: 4
    ch02 ele: 16
    get data timeout
    ^Csignal: interrupt
    [root@NEO example03_goroute_select]# 

    定时器

    // 语法示例1: NewTicker()
    t := time.NewTicker(time.Second)    // NewTicker() 中的参数是执行的时间间隔(多久触发一次) --> 可作超时控制(推荐)
    for v := range t.C {                // t.C 是一个 channel,背后也是个 goroutine
        fmt.Println("hello,",v)
    }
    // time.NewTicker() 用完之后要关掉  --> t.Stop() 
    
    // 示例2:time.After()
    select {
        case  <- time.After(time.Second):        // 一秒之后会执行一下,然后以后就不会再触发 --> 可用于设置超时时间控制
            fmt.Println("after")
    }
    // 系统会回收 time.After() 的资源(性能不如 time.NewTicker() 的 t.Stop() 好)
    
    // 推荐使用 time.NewTicker()

    time.NewTicker() 示例:

    // 示例代码1:
    package main
    
    import (
        "fmt"
        "time"
    )
    
    func main(){
        t := time.NewTicker(time.Second)
        for v := range t.C {
            fmt.Println("hello,",v)
        }
    }
    
    // 运行结果:
    [root@NEO example04_time_newticker01]# go run main/main.go
    hello, 2019-08-11 02:46:37.940256992 +0800 CST m=+1.000517671
    hello, 2019-08-11 02:46:38.940289529 +0800 CST m=+2.000550209
    hello, 2019-08-11 02:46:39.940339135 +0800 CST m=+3.000599804
    hello, 2019-08-11 02:46:40.940363886 +0800 CST m=+4.000624567
    hello, 2019-08-11 02:46:41.940399863 +0800 CST m=+5.000660552
    ^Csignal: interrupt
    [root@NEO example04_time_newticker01]# 
    
    // 示例代码2:
    package main
    
    import (
        "fmt"
        "time"
    )
    
    func main(){
        var ch01 chan int
        ch01 = make(chan int,10)
        ch02 := make(chan int,10)
    
        t := time.NewTicker(time.Second)
    
        select {
        case v :=<- ch01:
            fmt.Println("ch01 ele:",v)
        case v :=<- ch02:
            fmt.Println("ch02 ele:",v)
        case <- t.C:        // 超时控制
            fmt.Println("get data timeout")
        }
        t.Stop()    // 关闭 定时器
    }
    
    // 运行结果:
    [root@NEO example04_time_newticker02]# go run main/main.go
    get data timeout                // 1秒后输出了这条结果
    [root@NEO example04_time_newticker02]# 

    time.After() 示例:

    // 示例代码:
    package main
    
    import (
        "fmt"
        "time"
    )
    
    func main(){
        var ch01 chan int
        ch01 = make(chan int,10)
        ch02 := make(chan int,10)
    
        select {
        case v :=<- ch01:
            fmt.Println("ch01 ele:",v)
        case v :=<- ch02:
            fmt.Println("ch02 ele:",v)
        case <- time.After(time.Second):        // 设置超时时间控制
            fmt.Println("get data timeout")
        }
    }
    
    // 运行结果:
    [root@NEO main]# go run main.go
    get data timeout                        // 1秒后输出了这条结果
    [root@NEO main]# 

    goroutine 的 recover

    // 如果 goroutine 中有 panic ,那整个程序也会崩溃,如下:
    // 示例代码:
    package main
    
    import (
        //"fmt"
        "time"
    )
    
    func test(){
        var m map[string]string        // 只声明未初始化,直接使用会 panic
        m["name"] = "stu01"            // goroutine 会 panic
    }
    
    func main(){
        for i := 0; i < 10; i++{
            go test()
        }    
        time.Sleep(time.Second * 10)
    }
    
    // 运行结果:
    [root@NEO example05_goroutine_panic]# go run main/main.go
    panic: assignment to entry in nil map
    
    goroutine 4 [running]:
    main.test()
        /root/go/project/src/go_dev/day08/example05_goroutine_panic/main/main.go:10 +0x4b
    created by main.main
        /root/go/project/src/go_dev/day08/example05_goroutine_panic/main/main.go:15 +0x3e
    exit status 2
    [root@NEO example05_goroutine_panic]# 

    异常捕获: recover()

    // 示例代码:
    package main
    
    import (
        "fmt"
        "time"
    )
    
    func test(){
        
        defer func(){        // 进行异常捕获的匿名函数
            err := recover()        // recover()  --> 捕获异常
            if err != nil {
                fmt.Println("panic:",err)
            }
        }()
    
        var m map[string]string        // 只声明未初始化,直接使用会 panic
        m["name"] = "stu01"
    }
    
    func main(){
        for i := 0; i < 10; i++{
            go test()
        }    
        time.Sleep(time.Second * 10)
    }
    
    // 运行结果:
    [root@NEO example05_goroutine_panic]# go run main/main.go
    panic: assignment to entry in nil map        // 虽然 goroutine 中有异常,但由于做了异常捕获,主线程不会挂掉 (主线程和其他 goroutine 不受影响)
    panic: assignment to entry in nil map
    panic: assignment to entry in nil map
    panic: assignment to entry in nil map
    panic: assignment to entry in nil map
    panic: assignment to entry in nil map
    panic: assignment to entry in nil map
    panic: assignment to entry in nil map
    panic: assignment to entry in nil map
    panic: assignment to entry in nil map
    [root@NEO example05_goroutine_panic]# 

    单元测试

    1. 文件名必须以 _test.go 结尾
    2. 使用 go test 命令进行测试;
    3. _test.go 文件中的测试函数必须以 Test    开头
    4. 传入的参数 *testing.T
  • 相关阅读:
    Kafka如何保证读写的跨分区与会话
    Kafka topic中的partition的leader选举
    Kafka为什么这么快
    sqoop导入导出
    为什么要用redis去重
    bypass SortShuffleManager的bypass运行机制
    大数据常用端口号
    vector基础
    【拓扑排序】
    【POJ】Crazy Search(hash)
  • 原文地址:https://www.cnblogs.com/neozheng/p/11318803.html
Copyright © 2011-2022 走看看