zoukankan      html  css  js  c++  java
  • Go 通道 Chan 详解

    首先我们来看线程,在golang里面也叫goroutine

    Go 语言 select 语句

    select是Go中的一个控制结构,类似于用于通信的switch语句。每个case必须是一个通信操作,要么是发送要么是接收。

    select随机执行一个可运行的case。如果没有case可运行,它将阻塞,直到有case可运行。一个默认的子句应该总是可运行的。

    语法

    Go 编程语言中 select 语句的语法如下:

    select {
        case communication clause  :
           statement(s);      
        case communication clause  :
           statement(s); 
        /* 你可以定义任意数量的 case */
        default : /* 可选 */
           statement(s);
    }

    以下描述了 select 语句的语法:

    • 每个case都必须是一个通信
    • 所有channel表达式都会被求值
    • 所有被发送的表达式都会被求值
    • 如果任意某个通信可以进行,它就执行;其他被忽略。
    • 如果有多个case都可以运行,Select会随机公平地选出一个执行。其他不会执行。 
      否则:
      1. 如果有default子句,则执行该语句。
      2. 如果没有default字句,select将阻塞,直到某个通信可以运行;Go不会重新对channel或值进行求值。

    [plain] view plain copy
    1. package main  
    2.   
    3. import (  
    4.     "fmt"  
    5. )  
    6.   
    7. func main() {  
    8.     var chan_test chan interface{}    //双项通道  
    9.     var read_test <-chan interface{}  //单项通道 只支持 读  
    10.     var write_test chan<- interface{} //单项通道 只支持 写  
    11.     read_test <- "xiao"               //会报错。因为这个通道 只是单项通道只支持读 不支持写  
    12.     <-write_test                      //会报错。因为这个通道 只是单项通道只支持写 不支持读  
    13.     fmt.Println(chan_test)  
    14.     fmt.Println(read_test)  
    15.     fmt.Println(write_test)  

    我们需要了解一下并发与并行。golang的线程是一种并发机制,而不是并行。它们之间的区别大家可以上网搜一下,网上有很多的介绍。

    下面我们先来看一个例子吧

    import(
             "fmt"
    )
    
    funcmain(){
    
        go fmt.Println("1")
        fmt.Println("2")    
    }

    在golang里面,使用go这个关键字,后面再跟上一个函数就可以创建一个线程。后面的这个函数可以是已经写好的函数,也可以是一个匿名函数

    funcmain(){
    
        var i=3
    
        go func(a int) {
            fmt.Println(a)
            fmt.Println("1")
        }(i)
        fmt.Println("2")
    
    }

    上面的代码就创建了一个匿名函数,并且还传入了一个参数i,下面括号里的i是实参,a是形参。

    那么上面的代码能按照我们预想的打印1、2、3吗?告诉你们吧,不能,程序只能打印出2。下面我把正确的代码贴出来吧

    import(
    
        "fmt"
        "time"    
    )
    funcmain(){
    
        var i = 3
        go func(a int) {
            fmt.Println(a)
            fmt.Println("1")
        }(i)
        fmt.Println("2")
        time.Sleep(1 * time.Second)
    }

    我只是在最后加了一行让主线程休眠一秒的代码,程序就会依

    次打印出2、3、1。 
    那为什么会这样呢?因为程序会优先执行主线程,主线程执行完成后,程序会立即退出,没有多余的时间去执行子线程。如果在程序的最后让主线程休眠1秒钟,那程序就会有足够的时间去执行子线程。

    线程先讲到这里,下面我们来看看通道吧。

    通道又叫channel,顾名思义,channel的作用就是在多线程之间传递数据的。

    创建无缓冲channel

    chreadandwrite :=make(chan int)

    chonlyread := make(<-chan int) //创建只读channel 
    chonlywrite := make(chan<- int) //创建只写channel 
    下面我们来看一个例子:

        ch :=make(chan int)     
        ch <- 1
          go func() {
            <-ch
            fmt.Println("1")
          }()
          fmt.Println("2")  

    这段代码执行时会出现一个错误:fatal error: all goroutines are asleep - deadlock!

    这个错误的意思是说线程陷入了死锁,程序无法继续往下执行。那么造成这种错误的原因是什么呢?

    我们创建了一个无缓冲的channel,然后给这个channel赋值了,程序就是在赋值完成后陷入了死锁。因为我们的channel是无缓冲的,即同步的,赋值完成后来不及读取channel,程序就已经阻塞了。这里介绍一个非常重要的概念:channel的机制是先进先出,如果你给channel赋值了,那么必须要读取它的值,不然就会造成阻塞,当然这个只对无缓冲的channel有效。对于有缓冲的channel,发送方会一直阻塞直到数据被拷贝到缓冲区;如果缓冲区已满,则发送方只能在接收方取走数据后才能从阻塞状态恢复。

    对于上面的例子有两种解决方案:

    1、给channel增加缓冲区,然后在程序的最后让主线程休眠一秒,代码如下:

        ch :=make(chan int,1)
        ch <- 1
        go func() {
            v := <-ch
            fmt.Println(v)
        }()
        time.Sleep(1 * time.Second)
        fmt.Println("2")

    这样的话程序就会依次打印出1、2

    2、把ch<-1这一行代码放到子线程代码的后面,代码如下:

        ch :=make(chan int)
    
        go func() {
            v := <-ch
            fmt.Println(v)
        }()
        ch <- 1
        fmt.Println("2")

    这里就不用让主线程休眠了,因为channel在主线程中被赋值后,主线程就会阻塞,直到channel的值在子线程中被取出。

    最后我们看一个生产者和消费者的例子:

    import (
    
        "fmt"
        "time"
    )
    func produce(p chan<- int) {
        for i := 0; i < 10; i++ {
            p <- i
            fmt.Println("send:", i)
        }
    }
    func consumer(c <-chan int) {
        for i := 0; i < 10; i++ {
            v := <-c
            fmt.Println("receive:", v)
        }
    }
    func main() {
        ch := make(chan int)
        go produce(ch)
        go consumer(ch)
        time.Sleep(1 * time.Second)
    }

    在这段代码中,因为channel是没有缓冲的,所以当生产者给channel赋值后,生产者这个线程会阻塞,直到消费者线程将channel中的数据取出。消费者第一次将数据取出后,进行下一次循环时,消费者的线程也会阻塞,因为生产者还没有将数据存入,这时程序会去执行生产者的线程。程序就这样在消费者和生产者两个线程间不断切换,直到循环结束。

    下面我们再看一个带缓冲的例子:

    import (
    
        "fmt"
        "time"
    )
    func produce(p chan<- int) {
        for i := 0; i < 10; i++ {
            p <- i
            fmt.Println("send:", i)
        }
    }
    func consumer(c <-chan int) {
        for i := 0; i < 10; i++ {
            v := <-c
            fmt.Println("receive:", v)
        }
    }
    func main() {
        ch := make(chan int, 10)
        go produce(ch)
        go consumer(ch)
        time.Sleep(1 * time.Second)
    }

    在这个程序中,缓冲区可以存储10个int类型的整数,在执行生产者线程的时候,线程就不会阻塞,一次性将10个整数存入channel,在读取的时候,也是一次性读取。


    channels 是 goroutines之间通信的工具, 可以理解为管道, 虽然go也提供共享变量的方式, 但是更加推荐使用channel

    [plain] view plain copy
    1. func TestChan(t *testing.T) {  
    2.     c := make(chan int)  
    3.   
    4.     go func() {  
    5.         c <- 48  
    6.     }()  
    7.   
    8.     fmt.Println(<- c)  
    9.     // 保持持续运行  
    10.     holdRun()  
    11. }  
    12.   
    13. func holdRun() {  
    14.     time.Sleep(1 * time.Hour)  
    15. }  

    c := make(chan int)    声明一个  传输整形 的unbuffer  chan,(接收消息和发送消息者将会阻塞,直到channel ”可用“)

    <-  操作符用来接受和发送消息   chan <- 48    发送“48“ 进入管道,   <-chan   接收消息

    如果: c: = make(chan int, 10)   声明一个  传输整形 的buffer  chan, 容量为10, 接收消息将可以立即返回除非channel里面没有消息, 发送者返回除非容量满


    [plain] view plain copy
    1. func TestDeadLock(t *testing.T) {  
    2.     c := make(chan int)  
    3.     c <- 42  
    4.     val := <-c  
    5.     fmt.Println(val)  
    6. }  
    7.   
    8. func TestDeadLock1(t *testing.T) {  
    9.     c := make(chan int)  
    10.     //c := make(chan int, 0)  
    11.   
    12.     go func() {  
    13.         c <- 48  
    14.     }()  
    15.   
    16.     val := <-c  
    17.     fmt.Println(val)  
    18.   
    19. }  
    20.   
    21. func TestDeadLock2(t *testing.T) {  
    22.     c := make(chan int, 1)  
    23.     c <- 42  
    24.     val := <-c  
    25.     fmt.Println(val)  
    26. }  
    对于方法, TestDeadLock  将:fatal error: all goroutines are asleep - deadlock! 因为c <- 42 将会一直阻塞,直到出现消费者, 无容量的chan是同步, 正确的写法是 TestDeadLock1 这样不会死锁, 或者  TestDeadLock2 也不会死锁

    [plain] view plain copy
    1. func TestChan(t *testing.T) {  
    2.     c := make(chan int,  10)  
    3.   
    4.     go func() {  
    5.         c <- 48  
    6.         c <- 96  
    7.         time.Sleep(2 * time.Second)  
    8.         c <- 200  
    9.     }()  
    10.   
    11.     time.Sleep(1 * time.Second)  
    12.     for v := range c {  
    13.         fmt.Println(v)  
    14.     }  
    15.   
    16.     // 保持持续运行  
    17.     holdRun()  
    18. }  
    chan  可以配合 range 使用, 相当于每次foreach 每次去取一次


    [plain] view plain copy
    1. func TestDChan(t *testing.T) {  
    2.     c := make(chan int)  
    3.     go f1(c)  
    4.   
    5.     holdRun()  
    6. }  
    7.   
    8. func f1(c chan <- int) {  
    9.     c <- 0  
    10.     <- c   
    11. }  
    f1的参数类型是  chan <- int   表明 这个chan单向的, 只能用来接收。 f1函数编译错误:invalid operation: <-c (receive from send-only type chan<- int)

    相对应的发送的chan  : c <-chan string


    select 关键字可以和  chan使用, 类似与switch

    [plain] view plain copy
    1. func TestSelect(t *testing.T) {  
    2.     c1 := make(chan int)  
    3.     c2 := make(chan int, 10)  
    4.     c3 := make(chan int, 20)  
    5.   
    6.     go func(c1, c2, c3 chan<- int) {  
    7.         for {  
    8.   
    9.             time.Sleep(1 * time.Second)  
    10.             c1 <- 1  
    11.             time.Sleep(3 * time.Second)             
    12.             c2 <- 2  
    13.             time.Sleep(1 * time.Second)  
    14.             c3 <- 3  
    15.         }  
    16.   
    17.     }(c1, c2, c3)  
    18.   
    19.     for {  
    20.         select {  
    21.         case int1 := <-c1:  
    22.             fmt.Println("c1 value :", int1)  
    23.         case int2 := <-c2:  
    24.             fmt.Println("c2 value :", int2)  
    25.         case int3 := <-c3:  
    26.             fmt.Println("c3 vaule :", int3)  
    27.         case <-time.After(2 * time.Second):  
    28.             fmt.Println("timeount")  
    29.         }  
    30.   
    31.     }  
    32. }  
    select 将阻塞直到有一个chan ready或者 超时,即 time.After


    [plain] view plain copy
    1. select {  
    2.         case int1 := <-c1:  
    3.             fmt.Println("c1 value :", int1)  
    4.         case int2 := <-c2:  
    5.             fmt.Println("c2 value :", int2)  
    6.         case int3 := <-c3:  
    7.             fmt.Println("c3 vaule :", int3)  
    8.         default:  
    9.             fmt.Println("nothing ready")  
    10.         }  

    select 将不会阻塞, 直接执行 default


    1、通道关闭时间: 
    一般紧跟在往通道输入最后一个数据之后。

        jobs := make(chan int, 5)
        for i := 1; i < 4; i++ {
            jobs <- i
            fmt.Println("sent job", i)
            //      if i == 3 {
            //          close(jobs)
            //      }
        }
        close(jobs)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    2、读取关闭的无缓存通道: 
    读取关闭后的无缓存通道,不管通道中是否有数据,返回值都为0和false。

        done := make(chan int)
        go func() {
            done <- 1
        }()
        close(done)
        for i := 1; i <= 3; i++ {
            t, ok := <-done
            fmt.Println(i, ":", t, ok)
        }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    运行结果: 
    1:0 false 
    2:0 false 
    3:0 false

    3、读取关闭的有缓存通道: 
    读取关闭后的有缓存通道,将缓存数据读取完后,再读取返回值为0和false。

        done := make(chan int 1)
        go func() {
            done <- 1
        }()
        close(done)
        for i := 1; i <= 3; i++ {
            t, ok := <-done
            fmt.Println(i, ":", t, ok)
        }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    运行结果: 
    1:1 true 
    2:0 false 
    3:0 false

    4、range遍历通道: 
    通道写完后,必须关闭通道,否则range遍历会出现死锁。


    附 :

        time.Afer 实现:

    [plain] view plain copy
    1. func After(d Duration) <-chan Time {  
    2.     return NewTimer(d).C  
    3. }  


    [plain] view plain copy
    1. func NewTimer(d Duration) *Timer {  
    2.     c := make(chan Time, 1)  
    3.     t := &Timer{  
    4.         C: c,  
    5.         r: runtimeTimer{  
    6.             when: when(d),  
    7.             f:    sendTime,  
    8.             arg:  c,  
    9.         },  
    10.     }  
    11.     startTimer(&t.r)  
    12.     return t  
    13. }  

    NewTimer 返回一个 timer (timer是一个一次性时间,d 时间后 发送当前时间给  C) , 由于C在此之前会一直阻塞。从而达到超时的效果


    1.广播

    当一个通道关闭时, 所有此通道的读取都会退出阻塞. 利用此特性可以实现广播功能

    package main
    
    import (
        "fmt"
        "time"
    )
    
    func main() {
        c := make(chan bool)
    
        for i := 0; i < 5; i++ {
            go func(n int) {
                <-c //读取到数据或通道关闭时会退出阻塞
                fmt.Println("收到通知:", n)
            }(i)
    
        }
    
        fmt.Println("广播通知")
        close(c) //关闭通道, 广播通知
    
        time.Sleep(time.Second * 1) //等待其它协程处理
    
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    2.同时读取多个通道

    有时需要监视多个通道, 这个时候可以使用select

    package main
    
    import (
        "fmt"
        "time"
    )
    
    func main() {
        c1 := make(chan int)
        c2 := make(chan int)
    
        go func() {
            var n int
            select {
            case n = <-c1:
            case n = <-c2:
            }
            fmt.Println("数据:", n)
        }()
    
        fmt.Println("写入")
        c1 <- 1
        //c2 <- 2
    
        close(c1)
        close(c2)
    
        time.Sleep(time.Second * 1) //等待其它协程处理
    
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    3.超时

    channel本身无法设置超时, 可以使用select和定时器实现超时功能

    package main
    
    import (
        "fmt"
        "time"
    )
    
    func main() {
        c := make(chan int, 1)
    
        go func() {
            timeout := false
            var n int
    
            select {
            case n = <-c:
            case <-time.After(time.Second * 1):
                timeout = true
            }
    
            if timeout {
                fmt.Println("超时")
            } else {
                fmt.Println("读取到数据:", n)
            }
        }()
    
        time.Sleep(time.Second * 2)
        c <- 2
    
        time.Sleep(time.Second * 3) //等待其它协程处理
    
    }


    正因为当初对未来做了太多的憧憬,所以对现在的自己尤其失望。生命中曾经有过的所有灿烂,终究都需要用寂寞来偿还。
  • 相关阅读:
    连接池的配置与维护
    对面向对象的理解
    ActiveMQ重试机制
    activemq持久化的几种方式详解
    zookeeper写数据流程
    G1垃圾回收器
    REDIS哨兵模式和集群模式
    REDIS复制
    REDIS参数配置和运行状态
    slow-log 和bin-log相关参数介绍
  • 原文地址:https://www.cnblogs.com/candlia/p/11920164.html
Copyright © 2011-2022 走看看