Channel作为Go CSP的重要组成部分
在传统的编程语言中,并发编程模型是基于线程和内存同步访问控制。
而CSP是一种新的并发编程模型,CSP的并发哲学:
Do not communicate by sharing memory; instead, share memory by communicating.
不要通过共享内存来通信,而要通过通信来实现内存共享。
Go 是第一个将 CSP 的这些思想引入,并且发扬光大的语言。
Go 的并发编程(CSP)的模型则用 goroutine 和 channel 来替代。
channel 提供了一种通信机制,通过它,一个 goroutine 可以向另一 goroutine 发送消息,channel内部有mutex 用于内存同步访问控制。
chan数据结构
src/runtime/chan.go:hchan定义了channel的数据结构:
type hchan struct { qcount uint // 当前队列中剩余元素个数 dataqsiz uint // 环形队列长度,即可以存放的元素个数 buf unsafe.Pointer // 环形队列指针 elemsize uint16 // 每个元素的大小 closed uint32 // 标识关闭状态 elemtype *_type // 元素类型 sendx uint // 队列下标,指示元素写入时存放到队列中的位置 recvx uint // 队列下标,指示元素从队列的该位置读出 recvq waitq // 等待读消息的goroutine队列 sendq waitq // 等待写消息的goroutine队列 lock mutex // 互斥锁,chan不允许并发读写 }
属性解析
从数据结构可以看出channel由队列、类型信息、goroutine等待队列组成,下面分别说明其原理。
buf 指向底层环形队列,只有缓冲型的 channel 才有。
sendx,recvx 均指向底层环形队列,表示当前可以发送和接收的元素位置索引值(相对于底层数组)。
sendq,recvq 分别表示被阻塞的 goroutine,这些 goroutine 由于尝试向channel发送数据 或从 channel 读取数据而被阻塞。
waitq 是 sudog 的一个双向链表,而 sudog 实际上是对 goroutine 的一个封装:
type waitq struct { first *sudog last *sudog }
type sudog struct { // The following fields are protected by the hchan.lock of the // channel this sudog is blocking on. shrinkstack depends on // this for sudogs involved in channel ops. g *g selectdone *uint32 // CAS to 1 to win select race (may point to stack) next *sudog prev *sudog elem unsafe.Pointer // data element (may point to stack) // The following fields are never accessed concurrently. // For channels, waitlink is only accessed by g. // For semaphores, all fields (including the ones above) // are only accessed when holding a semaRoot lock. acquiretime int64 releasetime int64 ticket uint32 parent *sudog // semaRoot binary tree waitlink *sudog // g.waiting list or semaRoot waittail *sudog // semaRoot c *hchan // channel }
一个channel同时仅允许被一个goroutine读写,lock 用来保证每个读 channel 或写 channel 的操作都是原子的。
一个channel只能传递一种类型的值,类型信息存储在hchan数据结构中。
elemtype代表类型,用于数据传递过程中的赋值;
elemsize代表类型大小,用于在buf中定位元素位置。
环形队列
chan内部实现了一个环形队列作为其缓冲区,队列的长度是创建chan时指定的。
下图展示了一个可缓存6个整型类型元素的channel示意图:
dataqsiz指示了队列长度为6,即可缓存6个元素;
buf指向队列的内存,队列中还剩余两个元素;
qcount表示队列中还有两个元素;
sendx指示后续写入的数据存储的位置,取值[0, 6);
recvx指示从该位置读取数据, 取值[0, 6);
创建channel
使用make创建channel
// 无缓冲通道
ch1 := make(chan int)
// 有缓冲通道
ch2 := make(chan int, 10)
创建channel的过程实际上是初始化hchan结构。其中类型信息和缓冲区长度由make语句传入,buf的大小则与元素大小和缓冲区长度共同决定。
源码
func makechan(t *chantype, size int64) *hchan { elem := t.elem // compiler checks this but be safe. if elem.size >= 1<<16 { throw("makechan: invalid channel element type") } if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") } if size < 0 || int64(uintptr(size)) != size || (elem.size > 0 && uintptr(size) > (_MaxMem-hchanSize)/elem.size) { panic(plainError("makechan: size out of range")) } var c *hchan if elem.kind&kindNoPointers != 0 || size == 0 { // case 1: channel 不含有指针 // case 2: size == 0,即无缓冲 channel // Allocate memory in one call. // Hchan does not contain pointers interesting for GC in this case: // buf points into the same allocation, elemtype is persistent. // SudoG's are referenced from their owning thread so they can't be collected. // TODO(dvyukov,rlh): Rethink when collector can move allocated objects. // 在堆上分配连续的空间用作 channel c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true)) if size > 0 && elem.size != 0 { c.buf = add(unsafe.Pointer(c), hchanSize) } else { // race detector uses this location for synchronization // Also prevents us from pointing beyond the allocation (see issue 9401). c.buf = unsafe.Pointer(c) } } else { // 有缓冲 channel 初始化 c = new(hchan) // 堆上分配 buf 内存 c.buf = newarray(elem, int(size)) } c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) if debugChan { print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, " ") } return c }
channel特性
- 关闭一个未初始化(nil) 的 channel 会产生 panic;
- 重复关闭同一个 channel 会产生 panic;
- 向一个已关闭的 channel 中发送消息会产生 panic;
- 从已关闭的 channel 读取消息不会产生 panic,且能读出 channel 中还未被读取的消息,若消息均已读出,则会读到类型的零值。从一个已关闭的 channel 中读取消息永远不会阻塞,并且会返回一个为 false 的 ok-idiom,可以用它来判断 channel 是否关闭;
- 关闭 channel 会产生一个广播机制,所有向 channel 读取消息的 goroutine 都会收到消息。
- 从无缓存的 channel 中读取消息会阻塞,直到有 goroutine 向该 channel 中发送消息;
- 向无缓存的 channel 中发送消息也会阻塞,直到有 goroutine 从 channel 中读取消息。
- 有缓存的 channel 当缓存未满时,向 channel 中发送消息时不会阻塞,当缓存满时,发送操作将被阻塞,直到有其他 goroutine 从中读取消息;
- 有缓存的 channel 当消息不为空时,读取channel中消息不会出现阻塞,当 channel 为空时,读取操作会造成阻塞,直到有 goroutine 向 channel 中写入消息。
向channel写数据
向一个channel中写数据简单过程如下:
- 如果等待接收队列recvq不为空,说明缓冲区中没有数据或者没有缓冲区,此时直接从recvq取出G,并把数据写入,最后把该G唤醒,结束发送过程;
- 如果缓冲区中有空余位置,将数据写入缓冲区,结束发送过程;
- 如果缓冲区中没有空余位置,将待发送数据写入G,将当前G加入sendq,进入睡眠,等待被读goroutine唤醒;
源码
// entry point for c <- x from compiled code //go:nosplit func chansend1(c *hchan, elem unsafe.Pointer) { chansend(c, elem, true, getcallerpc(unsafe.Pointer(&c))) } /* * generic single channel send/recv * If block is not nil, * then the protocol will not * sleep but return if it could * not complete. * * sleep can wake up with g.param == nil * when a channel involved in the sleep has * been closed. it is easiest to loop and re-run * the operation; we'll see that it's now closed. */ func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { //当 channel 未初始化或为 nil 时,向其中发送数据将会永久阻塞 if c == nil { if !block { return false } // gopark 会使当前 goroutine 休眠,并通过 unlockf 唤醒,但是此时传入的 unlockf 为 nil, 因此,goroutine 会一直休眠 gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2) throw("unreachable") } if debugChan { print("chansend: chan=", c, " ") } if raceenabled { racereadpc(unsafe.Pointer(c), callerpc, funcPC(chansend)) } // Fast path: check for failed non-blocking operation without acquiring the lock. // // After observing that the channel is not closed, we observe that the channel is // not ready for sending. Each of these observations is a single word-sized read // (first c.closed and second c.recvq.first or c.qcount depending on kind of channel). // Because a closed channel cannot transition from 'ready for sending' to // 'not ready for sending', even if the channel is closed between the two observations, // they imply a moment between the two when the channel was both not yet closed // and not ready for sending. We behave as if we observed the channel at that moment, // and report that the send cannot proceed. // // It is okay if the reads are reordered here: if we observe that the channel is not // ready for sending and then observe that it is not closed, that implies that the // channel wasn't closed during the first observation. if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) { return false } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } // 获取同步锁 lock(&c.lock) //向已经关闭的 channel 发送消息会产生 panic if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) } // CASE1: 当有 goroutine 在 recv 队列上等待时,跳过缓存队列,将消息直接发给 reciever goroutine if sg := c.recvq.dequeue(); sg != nil { // Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any). send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } // CASE2: 缓存队列未满,则将消息复制到缓存队列上 if c.qcount < c.dataqsiz { // Space is available in the channel buffer. Enqueue the element to send. qp := chanbuf(c, c.sendx) if raceenabled { raceacquire(qp) racerelease(qp) } typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } if !block { unlock(&c.lock) return false } // CASE3: 缓存队列已满,将goroutine 加入 send 队列 // 初始化 sudog // Block on the channel. Some receiver will complete our operation for us. gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.selectdone = nil mysg.c = c gp.waiting = mysg gp.param = nil // 加入sendq队列 c.sendq.enqueue(mysg) // 休眠 goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3) // 唤醒 goroutine // someone woke us up. if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil if gp.param == nil { if c.closed == 0 { throw("chansend: spurious wakeup") } panic(plainError("send on closed channel")) } gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } mysg.c = nil releaseSudog(mysg) return true }
从channel读数据
向一个channel中写数据简单过程如下:
- 如果等待接收队列recvq不为空,说明缓冲区中没有数据或者没有缓冲区,此时直接从recvq取出G,并把数据写入,最后把该G唤醒,结束发送过程;
- 如果缓冲区中有空余位置,将数据写入缓冲区,结束发送过程;
- 如果缓冲区中没有空余位置,将待发送数据写入G,将当前G加入sendq,进入睡眠,等待被读goroutine唤醒;
简单流程图如下:
源码
接收操作有两种写法,一种带 "ok",反应 channel 是否关闭;一种不带 "ok",这种写法,当接收到相应类型的零值时无法知道是真实的发送者发送过来的值,还是 channel 被关闭后,返回给接收者的默认类型的零值。两种写法,都有各自的应用场景。
// entry points for <- c from compiled code func chanrecv1(c *hchan, elem unsafe.Pointer) { chanrecv(c, elem, true) } func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) { _, received = chanrecv(c, elem, true) return }
chanrecv1
函数处理不带 "ok" 的情形,chanrecv2
则通过返回 "received" 这个字段来反应 channel 是否被关闭。接收值则比较特殊,会“放到”参数 elem
所指向的地址了,这很像 C/C++ 里的写法。如果代码里忽略了接收值,这里的 elem 为 nil。
无论如何,最终转向了 chanrecv
函数:
// 位于 src/runtime/chan.go // chanrecv 函数接收 channel c 的元素并将其写入 ep 所指向的内存地址。 // 如果 ep 是 nil,说明忽略了接收值。 // 如果 block == false,即非阻塞型接收,在没有数据可接收的情况下,返回 (false, false) // 否则,如果 c 处于关闭状态,将 ep 指向的地址清零,返回 (true, false) // 否则,用返回值填充 ep 指向的内存地址。返回 (true, true) // 如果 ep 非空,则应该指向堆或者函数调用者的栈 func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { // 省略 debug 内容 ………… // 如果是一个 nil 的 channel if c == nil { // 如果不阻塞,直接返回 (false, false) if !block { return } // 否则,接收一个 nil 的 channel,goroutine 挂起 gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2) // 不会执行到这里 throw("unreachable") } // 在非阻塞模式下,快速检测到失败,不用获取锁,快速返回 // 当我们观察到 channel 没准备好接收: // 1. 非缓冲型,等待发送列队 sendq 里没有 goroutine 在等待 // 2. 缓冲型,但 buf 里没有元素 // 之后,又观察到 closed == 0,即 channel 未关闭。 // 因为 channel 不可能被重复打开,所以前一个观测的时候 channel 也是未关闭的, // 因此在这种情况下可以直接宣布接收失败,返回 (false, false) if !block && (c.dataqsiz == 0 && c.sendq.first == nil || c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) && atomic.Load(&c.closed) == 0 { return } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } // 加锁 lock(&c.lock) // channel 已关闭,并且循环数组 buf 里没有元素 // 这里可以处理非缓冲型关闭 和 缓冲型关闭但 buf 无元素的情况 // 也就是说即使是关闭状态,但在缓冲型的 channel, // buf 里有元素的情况下还能接收到元素 if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(unsafe.Pointer(c)) } // 解锁 unlock(&c.lock) if ep != nil { // 从一个已关闭的 channel 执行接收操作,且未忽略返回值 // 那么接收的值将是一个该类型的零值 // typedmemclr 根据类型清理相应地址的内存 typedmemclr(c.elemtype, ep) } // 从一个已关闭的 channel 接收,selected 会返回true return true, false } // 等待发送队列里有 goroutine 存在,说明 buf 是满的 // 这有可能是: // 1. 非缓冲型的 channel // 2. 缓冲型的 channel,但 buf 满了 // 针对 1,直接进行内存拷贝(从 sender goroutine -> receiver goroutine) // 针对 2,接收到循环数组头部的元素,并将发送者的元素放到循环数组尾部 if sg := c.sendq.dequeue(); sg != nil { // Found a waiting sender. If buffer is size 0, receive value // directly from sender. Otherwise, receive from head of queue // and add sender's value to the tail of the queue (both map to // the same buffer slot because the queue is full). recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } // 缓冲型,buf 里有元素,可以正常接收 if c.qcount > 0 { // 直接从循环数组里找到要接收的元素 qp := chanbuf(c, c.recvx) // ………… // 代码里,没有忽略要接收的值,不是 "<- ch",而是 "val <- ch",ep 指向 val if ep != nil { typedmemmove(c.elemtype, ep, qp) } // 清理掉循环数组里相应位置的值 typedmemclr(c.elemtype, qp) // 接收游标向前移动 c.recvx++ // 接收游标归零 if c.recvx == c.dataqsiz { c.recvx = 0 } // buf 数组里的元素个数减 1 c.qcount-- // 解锁 unlock(&c.lock) return true, true } if !block { // 非阻塞接收,解锁。selected 返回 false,因为没有接收到值 unlock(&c.lock) return false, false } // 接下来就是要被阻塞的情况了 // 构造一个 sudog gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // 待接收数据的地址保存下来 mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.selectdone = nil mysg.c = c gp.param = nil // 进入channel 的等待接收队列 c.recvq.enqueue(mysg) // 将当前 goroutine 挂起 goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3) // 被唤醒了,接着从这里继续执行一些扫尾工作 if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } closed := gp.param == nil gp.param = nil mysg.c = nil releaseSudog(mysg) return true, !closed }
关闭channel
func closechan(c *hchan) { // 关闭一个 nil channel,panic if c == nil { panic(plainError("close of nil channel")) } // 上锁 lock(&c.lock) // 如果 channel 已经关闭 if c.closed != 0 { unlock(&c.lock) // panic panic(plainError("close of closed channel")) } // ………… // 修改关闭状态 c.closed = 1 var glist *g // 将 channel 所有等待接收队列的里 sudog 释放 for { // 从接收队列里出队一个 sudog sg := c.recvq.dequeue() // 出队完毕,跳出循环 if sg == nil { break } // 如果 elem 不为空,说明此 receiver 未忽略接收数据 // 给它赋一个相应类型的零值 if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } // 取出 goroutine gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, unsafe.Pointer(c)) } // 相连,形成链表 gp.schedlink.set(glist) glist = gp } // 将 channel 等待发送队列里的 sudog 释放 // 如果存在,这些 goroutine 将会 panic for { // 从发送队列里出队一个 sudog sg := c.sendq.dequeue() if sg == nil { break } // 发送者会 panic sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, unsafe.Pointer(c)) } // 形成链表 gp.schedlink.set(glist) glist = gp } // 解锁 unlock(&c.lock) // Ready all Gs now that we've dropped the channel lock. // 遍历链表 for glist != nil { // 取最后一个 gp := glist // 向前走一步,下一个唤醒的 g glist = glist.schedlink.ptr() gp.schedlink = 0 // 唤醒相应 goroutine goready(gp, 3) } }
refer: