Go's channel implementation lives in src/runtime/chan.go. A channel is represented by the following structure:
    // Invariants:
    //  At least one of c.sendq and c.recvq is empty,
    //  except for the case of an unbuffered channel with a single goroutine
    //  blocked on it for both sending and receiving using a select statement,
    //  in which case the length of c.sendq and c.recvq is limited only by the
    //  size of the select statement.
    //
    // For buffered channels, also:
    //  c.qcount > 0 implies that c.recvq is empty.
    //  c.qcount < c.dataqsiz implies that c.sendq is empty.
    type hchan struct {
        qcount   uint           // total data in the queue
        dataqsiz uint           // size of the circular queue
        buf      unsafe.Pointer // points to an array of dataqsiz elements
        elemsize uint16
        closed   uint32
        elemtype *_type // element type
        sendx    uint   // send index
        recvx    uint   // receive index
        recvq    waitq  // list of recv waiters
        sendq    waitq  // list of send waiters

        // lock protects all fields in hchan, as well as several
        // fields in sudogs blocked on this channel.
        //
        // Do not change another G's status while holding this lock
        // (in particular, do not ready a G), as this can deadlock
        // with stack shrinking.
        lock mutex
    }
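Note: raceenabled in the source indicates whether the race detector is enabled, so if you only want to understand how channels work, you can ignore the code guarded by raceenabled. When raceenabled is false, the related functions are in runtime/race0.go; when it is true, they are in runtime/race.go. For what the race detector does, see: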
https://blog.golang.org/race-detector
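The fields of this structure are as follows. qcount is the number of elements currently in the buffer; these elements can be read immediately by a receive operation (x := <-c). dataqsiz is the maximum number of elements buf can hold; buf is used as a circular queue, and dataqsiz is the size passed to make when the channel is created. buf points to the circular queue; if dataqsiz is 0 or the element size is 0, buf is set to a placeholder address rather than a real buffer. elemsize is the size of a single element. sendx is the buffer index at which the next send (c <- x) writes, and recvx is the buffer index from which the next receive reads. recvq holds information about the goroutines waiting to receive, and sendq holds information about the goroutines waiting to send. lock is the mutex that protects the fields of the structure.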
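From user code, two of these fields can be observed indirectly: the built-in len reports the number of queued elements (qcount) and cap reports the buffer capacity (dataqsiz). The snippet below is a minimal sketch of that relationship; the helper name bufferState is made up for illustration and is not part of the runtime.

```go
package main

import "fmt"

// bufferState is a hypothetical helper. It returns the values that the
// runtime keeps in hchan.qcount (exposed through len) and hchan.dataqsiz
// (exposed through cap).
func bufferState(c chan int) (queued, capacity int) {
	return len(c), cap(c)
}

func main() {
	c := make(chan int, 3) // the size passed to make becomes dataqsiz
	c <- 10
	c <- 20

	q, n := bufferState(c)
	fmt.Println(q, n) // two elements queued, capacity three

	<-c // a receive removes one element from the buffer
	q, n = bufferState(c)
	fmt.Println(q, n)
}
```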
Below are the definition of the waitq structure and of its member type sudog. The structure g (type g struct) describes a goroutine and is used for goroutine management:
    type waitq struct {
        first *sudog
        last  *sudog
    }

    // sudog represents a g in a wait list, such as for sending/receiving
    // on a channel.
    //
    // sudogs are allocated from a special pool. Use acquireSudog and
    // releaseSudog to allocate and free them.
    type sudog struct {
        // The following fields are protected by the hchan.lock of the
        // channel this sudog is blocking on. shrinkstack depends on
        // this for sudogs involved in channel ops.

        g *g

        // isSelect indicates g is participating in a select, so
        // g.selectDone must be CAS'd to win the wake-up race.
        isSelect bool
        next     *sudog
        prev     *sudog
        elem     unsafe.Pointer // data element (may point to stack)
        ...
        c *hchan // channel
    }
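Before walking through the functions, here are a few constants:
- maxAlign is used for alignment. Access is faster when structures are aligned to a certain number of bytes in memory; a common requirement is 8-byte alignment.
- hchanSize is the size hchan occupies once it is rounded up to 8-byte alignment. In the source: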
const hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
which is equivalent to:
const hchanSize = unsafe.Sizeof(hchan{}) + (maxAlign-1) - ((unsafe.Sizeof(hchan{})+(maxAlign-1)) % maxAlign)
For example, if unsafe.Sizeof(hchan{}) is 5, hchanSize is 8; if it is 8, hchanSize is 8; if it is 10, hchanSize is 16. In short, the size is rounded up to the next multiple of 8.
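The equivalence of the two forms can be checked with a small program. This is only a sketch: maxAlign is fixed to 8 here, and alignMask and alignMod are illustrative names that do not exist in the runtime.

```go
package main

import "fmt"

const maxAlign = 8 // assumed alignment for this sketch

// alignMask rounds size up with the bit trick used for hchanSize:
// -size & (maxAlign-1) is the padding needed to reach the next multiple.
func alignMask(size uintptr) uintptr {
	return size + uintptr(-int(size))&(maxAlign-1)
}

// alignMod rounds size up with the modulo form of the same computation.
func alignMod(size uintptr) uintptr {
	return size + (maxAlign - 1) - (size+(maxAlign-1))%maxAlign
}

func main() {
	for _, s := range []uintptr{5, 8, 10} {
		fmt.Println(s, alignMask(s), alignMod(s))
	}
}
```

Both functions return 8 for 5, 8 for 8, and 16 for 10, matching the examples above.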
Below is the function that creates a channel:
    func makechan(t *chantype, size int) *hchan {
        elem := t.elem

        // check size and type information
        ...
        switch {
        // Special cases, e.g. zero element count or zero element size
        ...
        default:
            // Elements contain pointers.
            c = new(hchan)
            c.buf = mallocgc(uintptr(size)*elem.size, elem, true)
        }

        c.elemsize = uint16(elem.size)
        c.elemtype = elem
        c.dataqsiz = uint(size)
        return c
    }
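This function is fairly simple: it initializes the fields of hchan. The source treats a zero element count, a zero element size, and element types that contain no pointers as special cases. If you only want to understand how channels work, these details can be skipped for now.
Before looking at send and recv, here are the park and wake-up functions they rely on: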
    // Puts the current goroutine into a waiting state and calls unlockf.
    // If unlockf returns false, the goroutine is resumed.
    // unlockf must not access this G's stack, as it may be moved between
    // the call to gopark and the call to unlockf.
    func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason string, traceEv byte, traceskip int) {
        ...
    }

    // Puts the current goroutine into a waiting state and unlocks the lock.
    // The goroutine can be made runnable again by calling goready(gp).
    func goparkunlock(lock *mutex, reason string, traceEv byte, traceskip int) {
        gopark(parkunlock_c, unsafe.Pointer(lock), reason, traceEv, traceskip)
    }
Below is the implementation of chansend (c <- x):
    // block indicates whether the call may block; ep points to the element
    // being sent (element pointer).
    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
        if c == nil {
            if !block {
                return false
            }
            gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
            throw("unreachable")
        }
        ...
        // Fast path: check for failed non-blocking operation without acquiring the lock.
        if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
            (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
            return false
        }
        ...
        lock(&c.lock)

        if c.closed != 0 {
            unlock(&c.lock)
            panic(plainError("send on closed channel"))
        }

        if sg := c.recvq.dequeue(); sg != nil {
            // Found a waiting receiver. We pass the value we want to send
            // directly to the receiver, bypassing the channel buffer (if any).
            send(c, sg, ep, func() { unlock(&c.lock) }, 3)
            return true
        }

        if c.qcount < c.dataqsiz {
            // Space is available in the channel buffer. Enqueue the element to send.
            qp := chanbuf(c, c.sendx)
            ...
            typedmemmove(c.elemtype, qp, ep)
            c.sendx++
            if c.sendx == c.dataqsiz {
                c.sendx = 0
            }
            c.qcount++
            unlock(&c.lock)
            return true
        }

        if !block {
            unlock(&c.lock)
            return false
        }

        // Block on the channel. Some receiver will complete our operation for us.
        mysg := acquireSudog()
        ...
        c.sendq.enqueue(mysg)
        goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

        // someone woke us up
        ...
        releaseSudog(mysg)
        return true
    }
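As the code shows, a send proceeds as follows:
(1) If the channel is nil, return false when block is false; otherwise call gopark to park the goroutine.
(2) Fast path, taken without the lock: if block is false, the channel is not closed, and either the channel is unbuffered with no goroutine waiting to receive or the channel is buffered and the buffer is full, return false. Otherwise continue with the steps below.
(3) If a goroutine is waiting to receive (which implies that buf holds no values to receive), take the first goroutine from the recv wait list and pass ep directly to it. The wait list is a first-in, first-out queue.
(4) Otherwise, if buf still has free space (which implies that no goroutine is waiting to receive), copy ep into buf. The buffer is also first in, first out.
(5) Otherwise, return false when block is false; if block is true, enqueue the goroutine on sendq and park it until a receiving goroutine shows up.
The steps above leave out the closed channel case: once the lock is taken, sending on a closed channel panics with "send on closed channel".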
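The non-blocking and closed-channel branches can be observed from ordinary Go code. The sketch below uses a select with a default case, which this article later shows the compiler maps to a chansend call with block set to false. The helper names trySend and sendPanics are made up for this example.

```go
package main

import "fmt"

// trySend performs a non-blocking send and reports whether it succeeded.
func trySend(c chan int, v int) bool {
	select {
	case c <- v:
		return true
	default:
		return false
	}
}

// sendPanics reports whether a send on c panics.
// Only call it on a channel where the send cannot block.
func sendPanics(c chan int) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	c <- 1
	return false
}

func main() {
	c := make(chan int, 1)
	fmt.Println(trySend(c, 1)) // true: the buffer has free space
	fmt.Println(trySend(c, 2)) // false: buffer full and no receiver waiting

	close(c)
	fmt.Println(sendPanics(c)) // true: send on closed channel
}
```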
Below is the implementation of send, which gives a rough idea of how a goroutine is woken up:
    // In the call above, unlockf is: func() { unlock(&c.lock) }
    func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
        ...
        if sg.elem != nil {
            sendDirect(c.elemtype, sg, ep)
            sg.elem = nil
        }
        gp := sg.g
        unlockf()
        gp.param = unsafe.Pointer(sg)
        ...
        goready(gp, skip+1)
    }
Here goready wakes up the goroutine, and gp is a pointer to the g structure that describes the goroutine being woken.
Below is the definition of the recv functions:
    // entry points for <- c from compiled code
    func chanrecv1(c *hchan, elem unsafe.Pointer) {
        chanrecv(c, elem, true)
    }

    func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
        _, received = chanrecv(c, elem, true)
        return
    }

    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
        ...
        if c == nil {
            if !block {
                return
            }
            gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
            throw("unreachable")
        }

        // Fast path: check for failed non-blocking operation without acquiring the lock.
        if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
            c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
            atomic.Load(&c.closed) == 0 {
            return
        }
        ...
        lock(&c.lock)

        if c.closed != 0 && c.qcount == 0 {
            ...
            unlock(&c.lock)
            if ep != nil {
                // memory clear
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }

        if sg := c.sendq.dequeue(); sg != nil {
            // Found a waiting sender. If buffer is zero, receive value
            // directly from sender. Otherwise, receive from head of queue
            // and add sender's value to the tail of the queue (both map to
            // the same buffer slot because the queue is full).
            recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
            return true, true
        }

        if c.qcount > 0 {
            // Receive directly from queue
            qp := chanbuf(c, c.recvx)
            ...
            if ep != nil {
                typedmemmove(c.elemtype, ep, qp)
            }
            typedmemclr(c.elemtype, qp)
            c.recvx++
            if c.recvx == c.dataqsiz {
                c.recvx = 0
            }
            c.qcount--
            unlock(&c.lock)
            return true, true
        }

        if !block {
            unlock(&c.lock)
            return false, false
        }

        // no sender available: block on this channel.
        mysg := acquireSudog()
        ...
        c.recvq.enqueue(mysg)
        goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

        // someone woke us up
        ...
        releaseSudog(mysg)
        return true, !closed
    }
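As the code shows, a receive proceeds as follows:
(1) If the channel is nil, return (false, false) when block is false; otherwise call gopark to park the goroutine.
(2) Fast path, taken without the lock: if block is false, the channel is not closed, and either the channel is unbuffered with no goroutine waiting to send or the channel is buffered and buf is empty, return (false, false). Otherwise continue with the steps below.
(3) If the channel is closed and buf holds no values, clear ep and return (true, false).
(4) If a goroutine is waiting to send, there are two cases:
1) The channel is unbuffered: the value of the sending goroutine is copied directly to the goroutine calling recv.
2) Otherwise the buffer is full: the oldest value in buf is handed to the receiving goroutine, and the value of the goroutine at the head of the send queue is written into buf.
(5) If no goroutine is waiting to send and buf holds values, the oldest value is handed to the receiving goroutine.
(6) If buf holds no values, return (false, false) when block is false; if block is true, enqueue the goroutine on recvq and park it until a sending goroutine shows up.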
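A short program can exercise several of these branches: a non-blocking receive on an empty open channel, draining a closed channel that still has buffered values, and a receive on a closed, empty channel. This is a sketch only; tryRecv and drain are hypothetical helper names.

```go
package main

import "fmt"

// tryRecv performs a non-blocking receive. selected mirrors the first
// result of chanrecv and ok mirrors the second (received).
func tryRecv(c chan int) (v int, ok, selected bool) {
	select {
	case v, ok = <-c:
		return v, ok, true
	default:
		return 0, false, false
	}
}

// drain receives from c until it is closed and empty.
func drain(c chan int) []int {
	var out []int
	for {
		v, ok := <-c
		if !ok {
			return out
		}
		out = append(out, v)
	}
}

func main() {
	c := make(chan int, 2)

	_, _, selected := tryRecv(c)
	fmt.Println(selected) // false: buffer empty, no sender, not closed

	c <- 7
	c <- 8
	close(c)
	fmt.Println(drain(c)) // buffered values are still delivered after close

	v, ok, selected := tryRecv(c)
	fmt.Println(v, ok, selected) // zero value, ok == false, selected == true
}
```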
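From the implementations of chansend and chanrecv above, we can draw the following conclusions:
(1) Among goroutines blocked in chansend, the one that called it first is woken first.
(2) Among goroutines blocked in chanrecv, the one that called it first is woken first.
(3) The value written to the channel by the earliest chansend call is the first one taken out of buf.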
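The third conclusion follows from how sendx, recvx, and qcount move around the circular buffer. The toy model below mirrors only that index arithmetic. It is an illustration, not the runtime type: there is no locking, no wait queue, and the ring type and its methods are invented for this sketch.

```go
package main

import "fmt"

// ring is a toy model of the buffer-related fields of hchan.
type ring struct {
	buf          []int
	qcount       int // number of queued elements
	sendx, recvx int // next write and next read positions
}

func newRing(n int) *ring { return &ring{buf: make([]int, n)} }

// send stores v at sendx and advances it, wrapping at the end of buf.
func (r *ring) send(v int) bool {
	if r.qcount == len(r.buf) {
		return false // full: the real channel would block or fail
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == len(r.buf) {
		r.sendx = 0
	}
	r.qcount++
	return true
}

// recv reads the oldest value at recvx and advances it with the same wrap.
func (r *ring) recv() (int, bool) {
	if r.qcount == 0 {
		return 0, false // empty
	}
	v := r.buf[r.recvx]
	r.recvx++
	if r.recvx == len(r.buf) {
		r.recvx = 0
	}
	r.qcount--
	return v, true
}

func main() {
	r := newRing(2)
	r.send(1)
	r.send(2)
	fmt.Println(r.send(3)) // false: buffer is full
	fmt.Println(r.recv())  // 1 true: oldest value comes out first
	fmt.Println(r.send(3)) // true: sendx has wrapped to slot 0
	fmt.Println(r.recv())  // 2 true
	fmt.Println(r.recv())  // 3 true
}
```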
Below is the implementation of close:
    func closechan(c *hchan) {
        ...
        lock(&c.lock)
        if c.closed != 0 {
            unlock(&c.lock)
            panic(plainError("close of closed channel"))
        }
        c.closed = 1

        var glist *g

        // release all readers
        for {
            sg := c.recvq.dequeue()
            ...
            gp.schedlink.set(glist)
            glist = gp
        }

        // release all writers (they will panic)
        for {
            sg := c.sendq.dequeue()
            ...
            gp.schedlink.set(glist)
            glist = gp
        }
        unlock(&c.lock)

        // Ready all Gs now that we've dropped the channel lock
        for glist != nil {
            gp := glist
            glist = glist.schedlink.ptr()
            gp.schedlink = 0
            goready(gp, 3)
        }
    }
As the implementation shows, closechan wakes up every goroutine waiting to send to or receive from the channel, but it does not clear the contents of buf.
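Both effects are visible from user code. In the sketch below, closeWakesAll and bufferSurvivesClose are illustrative names. Note that a receiver that has not yet parked when close runs still sees the closed channel, so the count is the same either way.

```go
package main

import (
	"fmt"
	"sync"
)

// closeWakesAll starts n receivers on an unbuffered channel, closes it,
// and returns how many receivers returned with ok == false.
func closeWakesAll(n int) int {
	c := make(chan int)
	var wg sync.WaitGroup
	var mu sync.Mutex
	woken := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, ok := <-c; !ok {
				mu.Lock()
				woken++
				mu.Unlock()
			}
		}()
	}
	close(c)
	wg.Wait()
	return woken
}

// bufferSurvivesClose closes a channel holding two values and reports
// the queue length after close plus the result of the next receive.
func bufferSurvivesClose() (queued, first int, ok bool) {
	c := make(chan int, 2)
	c <- 1
	c <- 2
	close(c)
	queued = len(c)
	first, ok = <-c
	return
}

func main() {
	fmt.Println(closeWakesAll(3))
	fmt.Println(bufferSurvivesClose())
}
```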
Below is the mapping the compiler sets up between select statements and runtime functions:
    // compiler implements
    //
    //  select {
    //  case c <- v:
    //      ... foo
    //  default:
    //      ... bar
    //  }
    //
    // as
    //
    //  if selectnbsend(c, v) {
    //      ... foo
    //  } else {
    //      ... bar
    //  }
    //
    func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
        return chansend(c, elem, false, getcallerpc())
    }

    // compiler implements
    //
    //  select {
    //  case v = <-c:
    //      ... foo
    //  default:
    //      ... bar
    //  }
    //
    // as
    //
    //  if selectnbrecv(&v, c) {
    //      ... foo
    //  } else {
    //      ... bar
    //  }
    //
    func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
        selected, _ = chanrecv(c, elem, false)
        return
    }

    // compiler implements
    //
    //  select {
    //  case v, ok = <-c:
    //      ... foo
    //  default:
    //      ... bar
    //  }
    //
    // as
    //
    //  if c != nil && selectnbrecv2(&v, &ok, c) {
    //      ... foo
    //  } else {
    //      ... bar
    //  }
    //
    func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
        // TODO(khr): just return 2 values from this function, now that it is in Go.
        selected, *received = chanrecv(c, elem, false)
        return
    }
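Combined with the nil check at the top of chansend and chanrecv, this mapping explains why a select with a default case never blocks on a nil channel: block is false, so both functions return immediately and the default branch runs. A minimal sketch follows; nilChannelOps is a made-up helper name.

```go
package main

import "fmt"

// nilChannelOps tries a non-blocking send and a non-blocking receive
// on a nil channel and reports whether either case was selected.
func nilChannelOps() (sent, received bool) {
	var c chan int // nil channel

	select {
	case c <- 1:
		sent = true
	default:
		// taken: the non-blocking send on a nil channel fails
	}

	select {
	case <-c:
		received = true
	default:
		// taken: the non-blocking receive on a nil channel fails
	}
	return
}

func main() {
	fmt.Println(nilChannelOps()) // false false
}
```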
Below are the functions that operate on the recvq and sendq queues; they are simply a first-in, first-out queue implemented as a linked list:
    func (q *waitq) enqueue(sgp *sudog) {
        sgp.next = nil
        x := q.last
        if x == nil {
            sgp.prev = nil
            q.first = sgp
            q.last = sgp
            return
        }
        sgp.prev = x
        x.next = sgp
        q.last = sgp
    }

    func (q *waitq) dequeue() *sudog {
        for {
            sgp := q.first
            if sgp == nil {
                return nil
            }
            y := sgp.next
            if y == nil {
                q.first = nil
                q.last = nil
            } else {
                y.prev = nil
                q.first = y
                sgp.next = nil // mark as removed (see dequeueSudog)
            }
            ...
            return sgp
        }
    }
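The same queue logic can be tried outside the runtime. The sketch below copies the pointer manipulation of enqueue and dequeue onto a stand-in node type that carries an int instead of a goroutine. The node and queue types are invented for this example, and the part of dequeue elided above is not modeled.

```go
package main

import "fmt"

// node stands in for sudog; val replaces the goroutine-related fields.
type node struct {
	val        int
	next, prev *node
}

// queue stands in for waitq.
type queue struct {
	first, last *node
}

// enqueue appends n at the tail.
func (q *queue) enqueue(n *node) {
	n.next = nil
	x := q.last
	if x == nil {
		n.prev = nil
		q.first = n
		q.last = n
		return
	}
	n.prev = x
	x.next = n
	q.last = n
}

// dequeue removes and returns the head, or nil if the queue is empty.
func (q *queue) dequeue() *node {
	n := q.first
	if n == nil {
		return nil
	}
	y := n.next
	if y == nil {
		q.first = nil
		q.last = nil
	} else {
		y.prev = nil
		q.first = y
		n.next = nil
	}
	return n
}

func main() {
	var q queue
	for i := 1; i <= 3; i++ {
		q.enqueue(&node{val: i})
	}
	for n := q.dequeue(); n != nil; n = q.dequeue() {
		fmt.Println(n.val) // values come out in insertion order
	}
}
```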
That concludes this brief walkthrough of the channel implementation. Suggestions and comments are welcome.