zoukankan      html  css  js  c++  java
  • golang的channel实现

    golangchannel实现位于src/runtime/chan.go文件。golang中的channel对应的结构是:

    // Invariants:
    //  At least one of c.sendq and c.recvq is empty,
    //  except for the case of an unbuffered channel with a single goroutine
    //  blocked on it for both sending and receiving using a select statement,
    //  in which case the length of c.sendq and c.recvq is limited only by the
    //  size of the select statement.
    //
    // For buffered channels, also:
    //  c.qcount > 0 implies that c.recvq is empty.
    //  c.qcount < c.dataqsiz implies that c.sendq is empty.
    
    type hchan struct {
        qcount   uint           // total data in the queue
        dataqsiz uint           // size of the circular queue
        buf      unsafe.Pointer // points to an array of dataqsiz elements
        elemsize uint16
        closed   uint32
        elemtype *_type // element type
        sendx    uint   // send index
        recvx    uint   // receive index
        recvq    waitq  // list of recv waiters
        sendq    waitq  // list of send waiters
    
        // lock protects all fields in hchan, as well as several
        // fields in sudogs blocked on this channel.
        //  
        // Do not change another G's status while holding this lock
        // (in particular, do not ready a G), as this can deadlock
        // with stack shrinking.
        lock mutex
    }
    

    注:源码中的raceenabled表示是否启用race detector, 所以,如果你只需要了解channel的机制,可以忽略raceenabled的相关代码。当raceenabledfalse时,相关函数位于runtime/race0.go文件,如果raceenabledtrue,相关函数代码位于runtime/race.go文件。具体race detector的作用可以参考:

    https://blog.golang.org/race-detector

    对于这个结构中的成员,其中qcount表示当前buffer中的元素个数,这些元素可以被recv函数(x <- c) 立刻读取; dataqsiz表示buf可以存储的最大个数,这里的buf使用环形队列,dataqsiz就是make函数创建chan时传入的大小;buf指向环形队列,如果dataqsiz为0,或者元素大小为0,buf会象征性地指向一段地址;elemsize表示单个元素的大小;sendx表示send函数(c <- x)处理的buf位置;recvx标识recv函数处理到的buf位置;recvq表示等待recv的go程相关信息;sendq表示等待send的go程相关信息。 lock为保护结构体成员的锁。

    下面给出waitq结构体的定义,以及其中成员sudog的定义,其中结构体g (type g struct)表示一个go程信息,用于go程的管理:

    type waitq struct { 
        first *sudog 
        last  *sudog 
    }
    
    // sudog represents a g in a wait list, such as for sending/receiving
    // on a channel.
    //  
    // sudogs are allocated from a special pool. Use acquireSudog and
    // releaseSudog to allocate and free them.
    type sudog struct {
        // The following fields are protected by the hchan.lock of the
        // channel this sudog is blocking on. shrinkstack depends on
        // this for sudogs involved in channel ops.
        g *g
    
        // isSelect indicates g is participating in a select, so
        // g.selectDone must be CAS'd to win the wake-up race.
        isSelect bool
        next     *sudog
        prev     *sudog
        elem     unsafe.Pointer // data element (may point to stack)
    
        ...
    
        c           *hchan // channel
    }
    

    在讲解函数之前,给出一些常量的说明:

    1. maxAlign 用于对齐,在内存中,结构体按照一定字节对齐,访问速度会更快,常见的结构体对齐要求是8字节对齐。
    2. hchanSize 表示,如果hchan8字节对齐的话,那么chan所占的大小,源码中的:
    const hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
    

    等价于:

    const hchanSize = unsafe.Sizeof(hchan{})+(maxAlign-1) - ((unsafe.Sizeof(hchan{})+(maxAlgn-1))%maxAlign
    

    简单举例如下,如果unsafe.Sizeof(hchan{})为5,则hchanSize为8, 如果unsafe.Sizeof(hchan{})为8,则hchanSize为8,如果unsafe.Sizeof(hchan{})为10, 则unsafe.Sizeof(hchan{})为16,简单说,就是补足到8的倍数。

    下面给出构建chan的函数:

    func makechan(t *chantype, size int) *hchan {
      elem := t.elem
    
      // check size and type information 
      ...
    
      switch {
      // 特殊情况,例如元素个数为0,元素大小为0等
      ...
      default:
        // Elements contain pointers.
        c = new(hchan)
        c.buf = mallocgc(uintptr(size)*elem.size, elem, true)
      }
    
      c.elemsize = uint16(elem.size)
      c.elemtype = elem
      c.dataqsiz = uint(size)
    
      return c
    }
    

     这个函数比较简单,就是构建hchan中的数据成员,在源码中,对于元素个数为0,元素大小为0,以及元素不为指针进行了特殊处理,如果想要理解chan的实现,这些细节可以暂时不关注。

    下面先给出在send和recv中使用的等待和唤醒函数:

    // Puts the current goroutine into a waiting state and calls unlockf.
    // If unlockf returns false, the goroutine is resumed.
    // unlockf must not access this G’s stack, as it may be moved between
    // the call to gopark and the call to unlockf.
    func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason string, traceEv byte, traceskip int) {
        ...
    }
    
    // Puts the current goroutine into a waiting state and unlocks the lock.
    // The goroutine can be made runnable again by calling goready(gp).
    func goparkunlock(lock *mutex, reason string, traceEv byte, traceskip int) {
        gopark(parkunlock_c, unsafe.Pointer(lock), reason, traceEv, traceSkip)
    }
    

     下面给出chansend函数的实现 (c <- x):

    // block 用来表示是否阻塞, ep是要处理的元素(elemenet pointer)
    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
      if c == nil {
        if !block {
          return false
        }
        gopark(nil, nil, “chan send (nil chan)”, traceEvGoStop, 2)
        throw(“unreachable”)
      }
      ...
      // Fast path: check for failed non-blocking operation without acquiring the lock.
      if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
    		(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
      }
      ...
      lock(&c.lock)
    
      if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError(“send on closed channel”)
      }
    
      if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
      }
    
      if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        ...
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
          c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
      }
    
      if !block {
        unlock(&c.lock)
        return false
      }
    
      // Block on the channel. Some receiver will complete our operation for us.
      mysg := acquireSudog()
      ...
      c.sendq.enqueue(mysg)
      goparkunlock(&c.lock, “chan send”, traceEvGoBlockSend, 3)
    
      // someone woke us up
      ...
      releaseSudog(mysg)
      return true
    }
    

     由上面的代码可以看出,对于send函数,调用顺序如下:

    (1)如果channel为nil,在block为false的情况下,则返回false,否则调用gopark函数。

    (2)如果当前队列没有空间,而且没有等待recv的go程,在block为false的情况下,返回false,否则进入下面的流程。

    (3)如果有等待recv的go程(说明buf中没有可以recv的值),那么从等待recv的列表中获取第一个go程信息,然后将ep直接发送给那个go程。这里的队列为先入先出队列。

    (4)如果buf中有recv的值(说明没有recv的go程在等待),那么将ep写入到buf中,这里也是先写入,先读取。

    (5)在block为false的情况下,返回false,否则进入等待,等待recv go程的出现。

    上述说明,没有说明如果channel为关闭的情况,如果channel关闭,正常触发panic的异常。

    下面给出send函数的实现,根据send函数,可以简单了解go程是如何被唤醒的:

    // 在上面的函数调用中 unlockf: func() { unlock(&c.lock) }
    func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
      ...
      if sg.elem != nil {
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
      }
      gp := sg.g
      unlockf()
      gp.param = unsafe.Pointer(sg)
      ...
      goready(gp, skip+1)
    }
    

     其中上面的goready用来唤醒go程,其中gp的含义大致为go pointer,表示一个go程信息的指针。

    下面给出recv函数的定义:

    // entry points for <- c from compiled code
    func chanrecv1(c *hchan, elem unsafe.Pointer) {
        chanrecv(c, elem, true)
    }
    
    func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
        _, received = chanrecv(c, elem, true)
        return
    }
    
    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
        ...
        if c == nil {
            if !block {
                return
            }
            gopark(nil, nil, “chan receive (nil chan)”, traceEvGoStop, 2)
            throw(“unreachable”)
        }
    
        // Fast path: check for failed non-blocking operation without acquiring the lock.
        if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
            c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
            atomic.Load(&c.closed) == 0 {
            return
        }
        ...
        lock(&c.lock)
    
        if c.closed != 0 && c.qcount == 0 {
            ...
            unlock(&c.lock)
            if ep != nil {
              // memory clear
          typedmemclr(c.elemtype, ep)
        }
        return true, false
      }
    
      if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is zero, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender’s value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
      }
    
      if c.qcount > 0 {
        // Receive directly from queue
        qp := chanbuf(c, c.recvx)
          ...
        if ep != nil {
          typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recv++
        if c.recvx == c.dataqsiz {
          c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
      }
    
      if !block {
        unlock(&c.lock)
        return false, false
      }
    
      // no sender available: block on this channel.
      mysg := acquireSudog()
      ...
      c.recvq.enqueue(mysg)
      goparkunlock(&c.lock, “chan receive”, traceEvGoBlockRecv, 3)
    
      // someone woke us up
      ...
      releaseSudog(mysg)
      return true, !closed
    }                                        
    

     由上面的代码可以看出,对于recv函数,调用顺序如下:

    (1)如果channel为nil,在block为false的情况下,返回(false, false),否则调用gopark函数。

    (2)如果buf为空,并且没有等待send的go程,channel没有关闭,在block为false的情况下,返回(false, false), 否则进入下面的流程。

    (3)如果当前channel已经关闭,并且buf中没有值,则返回(true, false)。

    (4)如果当前有等待send的go程,分为如下两种情况:

      1)buf为空,则直接将send go程发送的信息,发送给调用recv函数的go程

      2)将buf中最先写入的值发送给recv函数的go程,然后将send队列表头的go程中的值写入到buf中

    (5)如果当前没有等待send的go程,而且buf中有值,则将第一个写入的值发送给recv函数的go程。

    (6)如果当前buf没有值,在block为false的情况下,返回(false, false),否则等待send go程的出现。

    从上面的chansend和chanrecv函数的实现来看,可以得出下面的结论:

    (1)先调用chansend的go程会第一个被唤醒。

    (2)先调用chanrecv的go程会被第一个被唤醒。

    (3)先调动chansend的go程写入管道(channel)中的值,会最先从buf中取出。

    下面给出close函数的实现:

    func closechan(c *hchan) {
      ...
      lock(&c.lock)
      if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError(“close of closed channel”))
      }
    
      c.closed = 1
      var glist *g
    
      // release all readers
      for {
        sg := c.recvq.dequeue()
        ...
        gp.schedlink.set(glist)
        glist = gp
      }
    
      // release all writers (they will panic)
      for {
        sg := c.sendq.dequeue()
        ...
        gp.schedlink.set(glist)
        glist = gp
      }
      unlock(&c.lock)
    
      // Ready all Gs now that we’ve dropped the channel lock
      for glist != nil {
        gp := glist
        glist = glist.schedlink.ptr()
        gp.schedlink = 0
        goready(gp, 3)
      }
    }

     从closechan的实现来看,closechan会唤醒所有等待向这个channel send和向这个channel recv的go程,但是不会清空buf中的内容。

    下面给出一个compiler建立的代码与函数的映射:

    // compiler implements
    //
    //  select {
    //  case c <- v:
    //      ... foo
    //  default:
    //      ... bar
    //  }
    //
    // as
    //
    //  if selectnbsend(c, v) {
    //      ... foo
    //  } else {
    //      ... bar
    //  }
    //
    func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
        return chansend(c, elem, false, getcallerpc())
    }
    
    // compiler implements
    //
    //  select {
    //  case v = <-c:
    //      ... foo
    //  default:
    //      ... bar
    //  }
    //
    // as
    //
    //  if selectnbrecv(&v, c) {
    //      ... foo
    //  } else {
    //      ... bar
    //  }
    //
    func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
        selected, _ = chanrecv(c, elem, false)
        return
    }
    
    // compiler implements
    //
    //  select {
    //  case v, ok = <-c:
    //      ... foo
    //  default:
    //      ... bar
    //  }
    //
    // as
    //
    //  if c != nil && selectnbrecv2(&v, &ok, c) {
    //      ... foo
    //  } else {
    //      ... bar
    //  }
    //
    func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
        // TODO(khr): just return 2 values from this function, now that it is in Go.
        selected, *received = chanrecv(c, elem, false)
        return
    }
    

     下面给出操作recvq和sendq队列的函数实现,实际上就是使用链表实现的先入先出队列的实现:

    func (q *waitq) enqueue(sgp *sudog) {
        sgp.next = nil
        x := q.last
        if x == nil {
            sgp.prev = nil
            q.first = sgp
            q.last = sgp
            return
        }
        sgp.prev = x
        x.next = sgp
        q.last = sgp
    }
    
    func (q *waitq) dequeue() *sudog {
        for {
            sgp := q.first
            if sgp == nil {
                return nil
            }
            y := sgp.next
            if y == nil {
                q.first = nil
                q.last = nil
            } else {
                y.prev = nil
                q.first = y
                sgp.next = nil // mark as removed (see dequeueSudog)
            }
    
            ...
    
            return sgp
        }
    }
    

     关于channel实现的简单讲解就到这里了。如果有什么建议或者提议,欢迎提出。

  • 相关阅读:
    Qt Model/View 学习笔记 (三)
    Qt Model/View 学习笔记 (二)
    Qt Model/View 学习笔记 (四)
    Qt Model/View 学习笔记 (一)
    C++ GUI Programming with Qt 4 10.3 实现自定义模型
    flash的TLF字体框架,与部分XPSP3 IE8不兼容。
    使用CKEditor需要注意的事情
    用jquery选中所有勾选的checkBox
    html / js / flash的视频、音频播放器
    C# / .net服务端程序,无法通过防火墙的真正坑爹原因
  • 原文地址:https://www.cnblogs.com/albizzia/p/10867724.html
Copyright © 2011-2022 走看看