zoukankan      html  css  js  c++  java
  • Golang---Channel

      摘要:今天我们来学习 Go 语言中 channel , 这是 Go 语言中非常重要的一个特性。

    基础知识

      创建

      在使用 channel 之前需要使用 make 来创建一个 channel, 如果在使用之前没有使用 make, 则会造成死锁(原因在后面死锁部分进行说明)

    ch := make(chan int)     //创建无缓冲的channel
    ch := make(chan int, N)  //创建有缓冲的channel

      读写

    N := <- ch   //读操作
    ch <- N      //写操作

      分类

      无缓冲:发送和接收动作是同时发生的,如果没有 goroutine 读取(<- ch),则发送者(ch <- )会一致阻塞

      有缓冲:channel 类似一个有容量的队列,当队列满的时候发送者会阻塞,当队列空的时候接收者会阻塞

      关闭

       channel 不像文件,通常不需要去关闭它们,只有在接收者必须要知道没有更多的数据的时候,才需要发送端去关闭(只有客户端才应该去关闭),比如在 range 循环遍历中。

    ch := make(chan int)
    close(ch)   //关闭 channel

      关闭需要注意的几点:

    1, 重复关闭 channel 会导致 panic
    2, 向关闭的 channel 发送数据会导致 panic
    3, 从关闭的 channel 读数据不会发生 panic, 读出 channel 中已有的数据之后, 读出的就是 channel 中值类型的默认值

      判断一个 channel 是否关闭可以使用 ok-idiom 的方式,这种方式在 map 中比较常用:

    //ok-idiom 方式
    val, ok := <- ch
    if ok == false {
        fmt.println("closed")
    }else {
        fmt.println("not closed")
    }

    典型用法

      goroutine 通信

    func TestChannel() {
        ch := make(chan int)
        go func() {
            ch <- 1
        }()
        fmt.Println(<- ch)
    }
    goroutine 通信

      select

    select {
        case v, ok := <- ch1:
            if(!ok) {
                fmt.println("ch1 channel closed!")
            }else {
                fmt.println("ch1 do something")
            }
        case v, ok := <- ch2:
            if(!ok) {
                fmt.println("ch2 channel closed!")
            }else {
                fmt.println("ch2 do something")
            }
        default:
                fmt.println("ch1 not ready and ch2 not ready")
    }
    select

      range

      range 可以直接取到 channel 中的值,当我们使用 range 来操作 channel 的时候, 一旦 channel 关闭,channel 内部数据读完之后循环自动结束

    //消费者
    func consumer(ch chan int) {
        for x := range ch {
            fmt.Println(x)
            // do something with x
        }
    }
    //生产者
    func producer(ch chan int) {
        values := make([]int, 5)
        for _, v := range values {
            ch <- v
        }
    }
    range

      超时控制

    func queryDb(ch chan int) {
        time.Sleep(time.Second)
        ch <- 100
    }
    
    func main() {
        ch := make(chan int)
        go queryDb(ch)
        t := time.NewTicker(time.Second)
        select {
        case v := <- ch:
            fmt.Println("res: ", v)
        case <- t.C:
            fmt.Println("timeout")
    }
    超时控制 

    死锁

      死锁情况1

    func deadlock1() {
        ch := make(chan int)
        ch <- 1
    }
    
    func deadlock2() {
        ch := make(chan int)
        <- ch
    }
    deadLockCase1

      死锁分析:无缓冲信道不存储值,无论是传值还是取值都会阻塞,无缓冲信道必须同时传值和取值。

      死锁情况2

    func Deadlock3() {
        ch1 := make(chan string)
        ch2 := make(chan string)
        go func() {
            ch2 <- "ch2 value"  // block point
            ch1 <- "ch1 value"
        }()
    
        <- ch1  //block point
        <- ch2
    }
    deadLockCase2

      死锁分析: 在 main goroutine 中,ch1 等待数据,而在 goroutine 中,ch2 在等待数据,所以造成死锁。

      死锁情况3

    func Deadlock4() {
        chs := make(chan string, 2)
        chs <- "first"
        chs <- "second"
        //close(chs) 需要发送端主动去关闭
    
        for ch := range chs {
            fmt.Println(ch)
        }
    }
    deadLockCase3

      死锁分析: 从空 channel 中读取会导致阻塞,同死锁情况1。

    源码分析

      数据结构

    //from go/src/runtime/chan.go
    type hchan struct {
        qcount   uint           // total data in the queue
        dataqsiz uint           // size of the circular queue
        buf      unsafe.Pointer // points to an array of dataqsiz elements
        elemsize uint16
        closed   uint32
        elemtype *_type // element type
        sendx    uint   // send index
        recvx    uint   // receive index
        recvq    waitq  // list of recv waiters
        sendq    waitq  // list of send waiters
    
        // lock protects all fields in hchan, as well as several
        // fields in sudogs blocked on this channel.
        //
        // Do not change another G's status while holding this lock
        // (in particular, do not ready a G), as this can deadlock
        // with stack shrinking.
        lock mutex
    }
    
    type waitq struct {
        first *sudog
        last  *sudog
    }
    hchan

     从上面的定义我们可以知道:一个核心的部分是存放 channel 数据的环形队列,由 qcount 和 elemsize 分别指定了队列的总容量和当前的使用量; 另一个部分就是 recvq 和 sendq 两个链表(双向),如果一个 goroutine 阻塞于 channel 了,那么它被挂在 recvq 或 sendq 中。

    //from go/src/runtime/chan.go
    func makechan(t *chantype, size int) *hchan {
        elem := t.elem
    
        // compiler checks this but be safe.
        if elem.size >= 1<<16 {
            throw("makechan: invalid channel element type")
        }
        if hchanSize%maxAlign != 0 || elem.align > maxAlign {
            throw("makechan: bad alignment")
        }
    
        mem, overflow := math.MulUintptr(elem.size, uintptr(size)) // 求出 size 所占空间的大小
        if overflow || mem > maxAlloc-hchanSize || size < 0 {
            panic(plainError("makechan: size out of range"))
        }
    
        // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
        // buf points into the same allocation, elemtype is persistent.
        // SudoG's are referenced from their owning thread so they can't be collected.
        // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
        var c *hchan
        switch {
        case mem == 0:
            // Queue or element size is zero.
            c = (*hchan)(mallocgc(hchanSize, nil, true)) //remark1: 只分配 hchan 结构体空间
            // Race detector uses this location for synchronization.
            c.buf = c.raceaddr()
        case elem.ptrdata == 0:
            // Elements do not contain pointers.
            // Allocate hchan and buf in one call.
            c = (*hchan)(mallocgc(hchanSize+mem, nil, true))  //remark2: hchan 结构体和 chan 中size 个元素的空间一并进行分配
            c.buf = add(unsafe.Pointer(c), hchanSize)
        default:
            // Elements contain pointers.
            c = new(hchan)
            c.buf = mallocgc(mem, elem, true)  //remark3:先分配 hchan 结构体,包含指针的元素以特殊的方式分配
        }
    
        c.elemsize = uint16(elem.size)
        c.elemtype = elem
        c.dataqsiz = uint(size)
        lockInit(&c.lock, lockRankHchan)
    
        if debugChan {
            print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "
    ")
        }
        return c
    }
    makechan

      注:上述代码中标记为 remark 的部分解释了 hchan 结构体中的缓冲区内存的分配方式

      recvq: 因读这个通道而阻塞的 goroutine

      sendq: 因写这个通道而阻塞的 goroutine

    //from go/src/runtime/chan.go
    type sudog struct {
        // The following fields are protected by the hchan.lock of the
        // channel this sudog is blocking on. shrinkstack depends on
        // this for sudogs involved in channel ops.
    
        g *g
    
        next *sudog
        prev *sudog
        elem unsafe.Pointer // data element (may point to stack)
    
        // The following fields are never accessed concurrently.
        // For channels, waitlink is only accessed by g.
        // For semaphores, all fields (including the ones above)
        // are only accessed when holding a semaRoot lock.
    
        acquiretime int64
        releasetime int64
        ticket      uint32
    
        // isSelect indicates g is participating in a select, so
        // g.selectDone must be CAS'd to win the wake-up race.
        isSelect bool
    
        parent   *sudog // semaRoot binary tree
        waitlink *sudog // g.waiting list or semaRoot
        waittail *sudog // semaRoot
        c        *hchan // channel
    }
    SudoG

      注:该结构中主要的就是一个 g 和一个 elem. elem 用于存储 goroutine 的数据。读通道时:数据从 Hchan 的队列中拷贝到 SudoG 的 elem 域;写通道时:数据则是从 SudoG 的 elem 域拷贝到 Hchan 的队列中。

      发送 channel

      如果执行 ch <- v 操作,在底层运行时库中对应的是一个 runtime.chansend 函数,源码为:

    //from go/src/runtime/chan.go
    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
        if c == nil {    //remark1: 如果向一个 nil 的 chan 发送消息
            if !block {  //remark1.1: 如果是非阻塞模式,则直接返回
                return false
            }
            //remark1.2: 如果是阻塞模式,为 unlockf 分配一个 m, 并运行 unlockf, 如果返回 flase, 则唤醒睡眠,此处传入 nil, 则会一致休眠,直到 timeout
            gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
            throw("unreachable")
        }
    
        if debugChan {
            print("chansend: chan=", c, "
    ")
        }
    
        if raceenabled {
            racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
        }
    
        // Fast path: check for failed non-blocking operation without acquiring the lock.
        //
        // After observing that the channel is not closed, we observe that the channel is
        // not ready for sending. Each of these observations is a single word-sized read
        // (first c.closed and second full()).
        // Because a closed channel cannot transition from 'ready for sending' to
        // 'not ready for sending', even if the channel is closed between the two observations,
        // they imply a moment between the two when the channel was both not yet closed
        // and not ready for sending. We behave as if we observed the channel at that moment,
        // and report that the send cannot proceed.
        //
        // It is okay if the reads are reordered here: if we observe that the channel is not
        // ready for sending and then observe that it is not closed, that implies that the
        // channel wasn't closed during the first observation. However, nothing here
        // guarantees forward progress. We rely on the side effects of lock release in
        // chanrecv() and closechan() to update this thread's view of c.closed and full().
        if !block && c.closed == 0 && full(c) {
            return false
        }
    
        var t0 int64
        if blockprofilerate > 0 {
            t0 = cputicks()
        }
    
        lock(&c.lock)
    
        if c.closed != 0 {  //remark2: 如果向一个已经关闭的 channel 发送数据,直接报错
            unlock(&c.lock)
            panic(plainError("send on closed channel"))
        }
    
        if sg := c.recvq.dequeue(); sg != nil {  //remark3: 如果此时有因为读操作阻塞的 goroutine,取出该 goroutine
            // Found a waiting receiver. We pass the value we want to send
            // directly to the receiver, bypassing the channel buffer (if any).
            /*
            *remark4: 将值(ep)直接复制到 接收者(sg) 的 sudog.elem 中,并将 sudog 放入到就绪队列中,状态置 ready, 然后返回
            */
            send(c, sg, ep, func() { unlock(&c.lock) }, 3)
            return true
        }
        //remark5: 如果循环队列空间可用,则直接把 send 元素入队
        if c.qcount < c.dataqsiz {
            // Space is available in the channel buffer. Enqueue the element to send.
            qp := chanbuf(c, c.sendx)
            if raceenabled {
                raceacquire(qp)
                racerelease(qp)
            }
            typedmemmove(c.elemtype, qp, ep)
            c.sendx++
            if c.sendx == c.dataqsiz {
                c.sendx = 0
            }
            c.qcount++
            unlock(&c.lock)
            return true
        }
    
        if !block {
            unlock(&c.lock)
            return false
        }
    
        // Block on the channel. Some receiver will complete our operation for us.
        //remark6: 如果没有空间可用,则阻塞
        gp := getg()
        mysg := acquireSudog()
        mysg.releasetime = 0
        if t0 != 0 {
            mysg.releasetime = -1
        }
        // No stack splits between assigning elem and enqueuing mysg
        // on gp.waiting where copy stack can find it.
        mysg.elem = ep
        mysg.waitlink = nil
        mysg.g = gp
        mysg.isSelect = false
        mysg.c = c
        gp.waiting = mysg
        gp.param = nil
        c.sendq.enqueue(mysg)  //remark7: 此处为什么还能入栈??,并进入阻塞状态
        gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
        // Ensure the value being sent is kept alive until the
        // receiver copies it out. The sudog has a pointer to the
        // stack object, but sudogs aren't considered as roots of the
        // stack tracer.
        KeepAlive(ep)
    
        // someone woke us up.
        if mysg != gp.waiting {
            throw("G waiting list is corrupted")
        }
        gp.waiting = nil
        gp.activeStackChans = false
        if gp.param == nil {
            if c.closed == 0 {
                throw("chansend: spurious wakeup")
            }
            panic(plainError("send on closed channel"))
        }
        gp.param = nil
        if mysg.releasetime > 0 {
            blockevent(mysg.releasetime-t0, 2)
        }
        mysg.c = nil
        releaseSudog(mysg)
        return true
    }
    chansend

      从源码分析中(remark 注释部分)可知:写操作一共分三种情况

      (1) 有读 goroutine 阻塞在 recvq 上,此时直接复制要发送的信息到 阻塞 goroutine 的 sudog 的 elem 域

      (2) hchan.buf 还有可用空间,此时直接入队,挂到 sendq 上

      (3) hchan.buf 没有可用空间,此时阻塞当前 goroutine

      接收 channel

      写 channel 分析过程基本和 读 channel 过程类似,这里不再具体展开分析

      总结

      Golang 的 channel 实现集中在文件 runtime/chan.go 中,本身的代码不是很复杂,但是涉及到很多其他的细节,比如 gopark 等,读起来还是有点费劲的。

    参考资料:

    https://tour.golang.org/concurrency/4

    http://legendtkl.com/2017/08/06/golang-channel-implement/

  • 相关阅读:
    WPF MarkupExtension
    WPF Binding小数,文本框不能输入小数点的问题
    WPF UnhandledException阻止程序奔溃
    .Net Core的总结
    C#单元测试
    Csla One or more properties are not registered for this type
    unomp 矿池运行问题随记
    矿池负载运行监测记录
    MySql 数据库移植记录
    后台服务运行后无故停止运行,原因不明
  • 原文地址:https://www.cnblogs.com/zpcoding/p/13169028.html
Copyright © 2011-2022 走看看