zoukankan      html  css  js  c++  java
  • 【Go源码】channel实现

    Channel作为Go CSP的重要组成部分

    在传统的编程语言中,并发编程模型是基于线程和内存同步访问控制。

    而CSP是一种新的并发编程模型,CSP的并发哲学:

    Do not communicate by sharing memory; instead, share memory by communicating.

    不要通过共享内存来通信,而要通过通信来实现内存共享。

    Go 是第一个将 CSP 的这些思想引入,并且发扬光大的语言。
    Go 的并发编程(CSP)的模型则用 goroutine 和 channel 来替代。

    channel 提供了一种通信机制,通过它,一个 goroutine 可以向另一 goroutine 发送消息,channel内部有mutex 用于内存同步访问控制。

    chan数据结构

    src/runtime/chan.go:hchan定义了channel的数据结构:

    type hchan struct {
    	qcount   uint           // 当前队列中剩余元素个数
    	dataqsiz uint           // 环形队列长度,即可以存放的元素个数
    	buf      unsafe.Pointer // 环形队列指针
    	elemsize uint16         // 每个元素的大小
    	closed   uint32	        // 标识关闭状态
    	elemtype *_type         // 元素类型
    	sendx    uint           // 队列下标,指示元素写入时存放到队列中的位置
    	recvx    uint           // 队列下标,指示元素从队列的该位置读出
    	recvq    waitq          // 等待读消息的goroutine队列
    	sendq    waitq          // 等待写消息的goroutine队列
    	lock mutex              // 互斥锁,chan不允许并发读写
    }

    属性解析

    从数据结构可以看出channel由队列、类型信息、goroutine等待队列组成,下面分别说明其原理。

    buf 指向底层环形队列,只有缓冲型的 channel 才有。

    sendx,recvx 均指向底层环形队列,表示当前可以发送和接收的元素位置索引值(相对于底层数组)。

    sendq,recvq 分别表示被阻塞的 goroutine,这些 goroutine 由于尝试向channel发送数据 或从 channel 读取数据而被阻塞。

    waitq 是 sudog 的一个双向链表,而 sudog 实际上是对 goroutine 的一个封装:

    type waitq struct {
    first *sudog
    last *sudog
    }
    type sudog struct {
        // The following fields are protected by the hchan.lock of the
        // channel this sudog is blocking on. shrinkstack depends on
        // this for sudogs involved in channel ops.
    
        g          *g
        selectdone *uint32 // CAS to 1 to win select race (may point to stack)
        next       *sudog
        prev       *sudog
        elem       unsafe.Pointer // data element (may point to stack)
    
        // The following fields are never accessed concurrently.
        // For channels, waitlink is only accessed by g.
        // For semaphores, all fields (including the ones above)
        // are only accessed when holding a semaRoot lock.
    
        acquiretime int64
        releasetime int64
        ticket      uint32
        parent      *sudog // semaRoot binary tree
        waitlink    *sudog // g.waiting list or semaRoot
        waittail    *sudog // semaRoot
        c           *hchan // channel
    }

    一个channel同时仅允许被一个goroutine读写,lock 用来保证每个读 channel 或写 channel 的操作都是原子的。

    一个channel只能传递一种类型的值,类型信息存储在hchan数据结构中。

    elemtype代表类型,用于数据传递过程中的赋值;
    elemsize代表类型大小,用于在buf中定位元素位置。

    环形队列

    chan内部实现了一个环形队列作为其缓冲区,队列的长度是创建chan时指定的。

    下图展示了一个可缓存6个整型类型元素的channel示意图:

     

    dataqsiz指示了队列长度为6,即可缓存6个元素;
    buf指向队列的内存,队列中还剩余两个元素;
    qcount表示队列中还有两个元素;
    sendx指示后续写入的数据存储的位置,取值[0, 6);
    recvx指示从该位置读取数据, 取值[0, 6);

    创建channel

    使用make创建channel
    // 无缓冲通道
    ch1 := make(chan int)
    // 有缓冲通道
    ch2 := make(chan int, 10)
    创建channel的过程实际上是初始化hchan结构。其中类型信息和缓冲区长度由make语句传入,buf的大小则与元素大小和缓冲区长度共同决定。

    源码

    func makechan(t *chantype, size int64) *hchan {
        elem := t.elem
    
        // compiler checks this but be safe.
        if elem.size >= 1<<16 {
            throw("makechan: invalid channel element type")
        }
        if hchanSize%maxAlign != 0 || elem.align > maxAlign {
            throw("makechan: bad alignment")
        }
        if size < 0 || int64(uintptr(size)) != size || (elem.size > 0 && uintptr(size) > (_MaxMem-hchanSize)/elem.size) {
            panic(plainError("makechan: size out of range"))
        }
    
        var c *hchan
        
        if elem.kind&kindNoPointers != 0 || size == 0 {
            // case 1: channel 不含有指针
            // case 2: size == 0,即无缓冲 channel
            // Allocate memory in one call.
            // Hchan does not contain pointers interesting for GC in this case:
            // buf points into the same allocation, elemtype is persistent.
            // SudoG's are referenced from their owning thread so they can't be collected.
            // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
            
            // 在堆上分配连续的空间用作 channel
            c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
            if size > 0 && elem.size != 0 {
                c.buf = add(unsafe.Pointer(c), hchanSize)
            } else {
                // race detector uses this location for synchronization
                // Also prevents us from pointing beyond the allocation (see issue 9401).
                c.buf = unsafe.Pointer(c)
            }
        } else {
            // 有缓冲 channel 初始化
            c = new(hchan)
            // 堆上分配 buf 内存
            c.buf = newarray(elem, int(size))
        }
        c.elemsize = uint16(elem.size)
        c.elemtype = elem
        c.dataqsiz = uint(size)
    
        if debugChan {
            print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "
    ")
        }
        return c
    }
    

      

    channel特性

    • 关闭一个未初始化(nil) 的 channel 会产生 panic;
    • 重复关闭同一个 channel 会产生 panic;
    • 向一个已关闭的 channel 中发送消息会产生 panic;
    • 从已关闭的 channel 读取消息不会产生 panic,且能读出 channel 中还未被读取的消息,若消息均已读出,则会读到类型的零值。从一个已关闭的 channel 中读取消息永远不会阻塞,并且会返回一个为 false 的 ok-idiom,可以用它来判断 channel 是否关闭;
    • 关闭 channel 会产生一个广播机制,所有向 channel 读取消息的 goroutine 都会收到消息。
    • 从无缓存的 channel 中读取消息会阻塞,直到有 goroutine 向该 channel 中发送消息;
    • 向无缓存的 channel 中发送消息也会阻塞,直到有 goroutine 从 channel 中读取消息。
    • 有缓存的 channel 当缓存未满时,向 channel 中发送消息时不会阻塞,当缓存满时,发送操作将被阻塞,直到有其他 goroutine 从中读取消息;
    • 有缓存的 channel 当消息不为空时,读取channel中消息不会出现阻塞,当 channel 为空时,读取操作会造成阻塞,直到有 goroutine 向 channel 中写入消息。

    向channel写数据

    向一个channel中写数据简单过程如下:

    1. 如果等待接收队列recvq不为空,说明缓冲区中没有数据或者没有缓冲区,此时直接从recvq取出G,并把数据写入,最后把该G唤醒,结束发送过程;
    2. 如果缓冲区中有空余位置,将数据写入缓冲区,结束发送过程;
    3. 如果缓冲区中没有空余位置,将待发送数据写入G,将当前G加入sendq,进入睡眠,等待被读goroutine唤醒;

    源码

    // entry point for c <- x from compiled code
    //go:nosplit
    func chansend1(c *hchan, elem unsafe.Pointer) {
        chansend(c, elem, true, getcallerpc(unsafe.Pointer(&c)))
    }
    
    /*
     * generic single channel send/recv
     * If block is not nil,
     * then the protocol will not
     * sleep but return if it could
     * not complete.
     *
     * sleep can wake up with g.param == nil
     * when a channel involved in the sleep has
     * been closed.  it is easiest to loop and re-run
     * the operation; we'll see that it's now closed.
     */
    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    
        //当 channel 未初始化或为 nil 时,向其中发送数据将会永久阻塞
        if c == nil {
            if !block {
                return false
            }
            
            // gopark 会使当前 goroutine 休眠,并通过 unlockf 唤醒,但是此时传入的 unlockf 为 nil, 因此,goroutine 会一直休眠
            gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
            throw("unreachable")
        }
    
        if debugChan {
            print("chansend: chan=", c, "
    ")
        }
    
        if raceenabled {
            racereadpc(unsafe.Pointer(c), callerpc, funcPC(chansend))
        }
    
        // Fast path: check for failed non-blocking operation without acquiring the lock.
        //
        // After observing that the channel is not closed, we observe that the channel is
        // not ready for sending. Each of these observations is a single word-sized read
        // (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
        // Because a closed channel cannot transition from 'ready for sending' to
        // 'not ready for sending', even if the channel is closed between the two observations,
        // they imply a moment between the two when the channel was both not yet closed
        // and not ready for sending. We behave as if we observed the channel at that moment,
        // and report that the send cannot proceed.
        //
        // It is okay if the reads are reordered here: if we observe that the channel is not
        // ready for sending and then observe that it is not closed, that implies that the
        // channel wasn't closed during the first observation.
        if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
            (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
            return false
        }
    
        var t0 int64
        if blockprofilerate > 0 {
            t0 = cputicks()
        }
    
        // 获取同步锁
        lock(&c.lock)
    
        //向已经关闭的 channel 发送消息会产生 panic
        if c.closed != 0 {
            unlock(&c.lock)
            panic(plainError("send on closed channel"))
        }
    
        // CASE1: 当有 goroutine 在 recv 队列上等待时,跳过缓存队列,将消息直接发给 reciever goroutine
        if sg := c.recvq.dequeue(); sg != nil {
            // Found a waiting receiver. We pass the value we want to send
            // directly to the receiver, bypassing the channel buffer (if any).
            send(c, sg, ep, func() { unlock(&c.lock) }, 3)
            return true
        }
    
        // CASE2: 缓存队列未满,则将消息复制到缓存队列上
        if c.qcount < c.dataqsiz {
            // Space is available in the channel buffer. Enqueue the element to send.
            qp := chanbuf(c, c.sendx)
            if raceenabled {
                raceacquire(qp)
                racerelease(qp)
            }
            typedmemmove(c.elemtype, qp, ep)
            c.sendx++
            if c.sendx == c.dataqsiz {
                c.sendx = 0
            }
            c.qcount++
            unlock(&c.lock)
            return true
        }
    
        if !block {
            unlock(&c.lock)
            return false
        }
        
        // CASE3: 缓存队列已满,将goroutine 加入 send 队列
        // 初始化 sudog
        // Block on the channel. Some receiver will complete our operation for us.
        gp := getg()
        mysg := acquireSudog()
        mysg.releasetime = 0
        if t0 != 0 {
            mysg.releasetime = -1
        }
        // No stack splits between assigning elem and enqueuing mysg
        // on gp.waiting where copystack can find it.
        mysg.elem = ep
        mysg.waitlink = nil
        mysg.g = gp
        mysg.selectdone = nil
        mysg.c = c
        gp.waiting = mysg
        gp.param = nil
        // 加入sendq队列
        c.sendq.enqueue(mysg)
        // 休眠
        goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
    
        // 唤醒 goroutine
        // someone woke us up.
        if mysg != gp.waiting {
            throw("G waiting list is corrupted")
        }
        gp.waiting = nil
        if gp.param == nil {
            if c.closed == 0 {
                throw("chansend: spurious wakeup")
            }
            panic(plainError("send on closed channel"))
        }
        gp.param = nil
        if mysg.releasetime > 0 {
            blockevent(mysg.releasetime-t0, 2)
        }
        mysg.c = nil
        releaseSudog(mysg)
        return true
    }
        
    

      

    从channel读数据

    向一个channel中写数据简单过程如下:

    1. 如果等待接收队列recvq不为空,说明缓冲区中没有数据或者没有缓冲区,此时直接从recvq取出G,并把数据写入,最后把该G唤醒,结束发送过程;
    2. 如果缓冲区中有空余位置,将数据写入缓冲区,结束发送过程;
    3. 如果缓冲区中没有空余位置,将待发送数据写入G,将当前G加入sendq,进入睡眠,等待被读goroutine唤醒;

    简单流程图如下:

    源码

    接收操作有两种写法,一种带 "ok",反应 channel 是否关闭;一种不带 "ok",这种写法,当接收到相应类型的零值时无法知道是真实的发送者发送过来的值,还是 channel 被关闭后,返回给接收者的默认类型的零值。两种写法,都有各自的应用场景。

    // entry points for <- c from compiled code
    func chanrecv1(c *hchan, elem unsafe.Pointer) {
    	chanrecv(c, elem, true)
    }
    
    func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    	_, received = chanrecv(c, elem, true)
    	return
    } 

    chanrecv1 函数处理不带 "ok" 的情形,chanrecv2 则通过返回 "received" 这个字段来反应 channel 是否被关闭。接收值则比较特殊,会“放到”参数 elem 所指向的地址了,这很像 C/C++ 里的写法。如果代码里忽略了接收值,这里的 elem 为 nil。

    无论如何,最终转向了 chanrecv 函数:

    // 位于 src/runtime/chan.go
    
    // chanrecv 函数接收 channel c 的元素并将其写入 ep 所指向的内存地址。
    // 如果 ep 是 nil,说明忽略了接收值。
    // 如果 block == false,即非阻塞型接收,在没有数据可接收的情况下,返回 (false, false)
    // 否则,如果 c 处于关闭状态,将 ep 指向的地址清零,返回 (true, false)
    // 否则,用返回值填充 ep 指向的内存地址。返回 (true, true)
    // 如果 ep 非空,则应该指向堆或者函数调用者的栈
    
    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    	// 省略 debug 内容 …………
    
    	// 如果是一个 nil 的 channel
    	if c == nil {
    		// 如果不阻塞,直接返回 (false, false)
    		if !block {
    			return
    		}
    		// 否则,接收一个 nil 的 channel,goroutine 挂起
    		gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
    		// 不会执行到这里
    		throw("unreachable")
    	}
    
    	// 在非阻塞模式下,快速检测到失败,不用获取锁,快速返回
    	// 当我们观察到 channel 没准备好接收:
    	// 1. 非缓冲型,等待发送列队 sendq 里没有 goroutine 在等待
    	// 2. 缓冲型,但 buf 里没有元素
    	// 之后,又观察到 closed == 0,即 channel 未关闭。
    	// 因为 channel 不可能被重复打开,所以前一个观测的时候 channel 也是未关闭的,
    	// 因此在这种情况下可以直接宣布接收失败,返回 (false, false)
    	if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
    		c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
    		atomic.Load(&c.closed) == 0 {
    		return
    	}
    
    	var t0 int64
    	if blockprofilerate > 0 {
    		t0 = cputicks()
    	}
    
    	// 加锁
    	lock(&c.lock)
    
    	// channel 已关闭,并且循环数组 buf 里没有元素
    	// 这里可以处理非缓冲型关闭 和 缓冲型关闭但 buf 无元素的情况
    	// 也就是说即使是关闭状态,但在缓冲型的 channel,
    	// buf 里有元素的情况下还能接收到元素
    	if c.closed != 0 && c.qcount == 0 {
    		if raceenabled {
    			raceacquire(unsafe.Pointer(c))
    		}
    		// 解锁
    		unlock(&c.lock)
    		if ep != nil {
    			// 从一个已关闭的 channel 执行接收操作,且未忽略返回值
    			// 那么接收的值将是一个该类型的零值
    			// typedmemclr 根据类型清理相应地址的内存
    			typedmemclr(c.elemtype, ep)
    		}
    		// 从一个已关闭的 channel 接收,selected 会返回true
    		return true, false
    	}
    
    	// 等待发送队列里有 goroutine 存在,说明 buf 是满的
    	// 这有可能是:
    	// 1. 非缓冲型的 channel
    	// 2. 缓冲型的 channel,但 buf 满了
    	// 针对 1,直接进行内存拷贝(从 sender goroutine -> receiver goroutine)
    	// 针对 2,接收到循环数组头部的元素,并将发送者的元素放到循环数组尾部
    	if sg := c.sendq.dequeue(); sg != nil {
    		// Found a waiting sender. If buffer is size 0, receive value
    		// directly from sender. Otherwise, receive from head of queue
    		// and add sender's value to the tail of the queue (both map to
    		// the same buffer slot because the queue is full).
    		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
    		return true, true
    	}
    
    	// 缓冲型,buf 里有元素,可以正常接收
    	if c.qcount > 0 {
    		// 直接从循环数组里找到要接收的元素
    		qp := chanbuf(c, c.recvx)
    
    		// …………
    
    		// 代码里,没有忽略要接收的值,不是 "<- ch",而是 "val <- ch",ep 指向 val
    		if ep != nil {
    			typedmemmove(c.elemtype, ep, qp)
    		}
    		// 清理掉循环数组里相应位置的值
    		typedmemclr(c.elemtype, qp)
    		// 接收游标向前移动
    		c.recvx++
    		// 接收游标归零
    		if c.recvx == c.dataqsiz {
    			c.recvx = 0
    		}
    		// buf 数组里的元素个数减 1
    		c.qcount--
    		// 解锁
    		unlock(&c.lock)
    		return true, true
    	}
    
    	if !block {
    		// 非阻塞接收,解锁。selected 返回 false,因为没有接收到值
    		unlock(&c.lock)
    		return false, false
    	}
    
    	// 接下来就是要被阻塞的情况了
    	// 构造一个 sudog
    	gp := getg()
    	mysg := acquireSudog()
    	mysg.releasetime = 0
    	if t0 != 0 {
    		mysg.releasetime = -1
    	}
    
    	// 待接收数据的地址保存下来
    	mysg.elem = ep
    	mysg.waitlink = nil
    	gp.waiting = mysg
    	mysg.g = gp
    	mysg.selectdone = nil
    	mysg.c = c
    	gp.param = nil
    	// 进入channel 的等待接收队列
    	c.recvq.enqueue(mysg)
    	// 将当前 goroutine 挂起
    	goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)
    
    	// 被唤醒了,接着从这里继续执行一些扫尾工作
    	if mysg != gp.waiting {
    		throw("G waiting list is corrupted")
    	}
    	gp.waiting = nil
    	if mysg.releasetime > 0 {
    		blockevent(mysg.releasetime-t0, 2)
    	}
    	closed := gp.param == nil
    	gp.param = nil
    	mysg.c = nil
    	releaseSudog(mysg)
    	return true, !closed
    }
    

      

     

    关闭channel

    func closechan(c *hchan) {
    	// 关闭一个 nil channel,panic
    	if c == nil {
    		panic(plainError("close of nil channel"))
    	}
    
    	// 上锁
    	lock(&c.lock)
    	// 如果 channel 已经关闭
    	if c.closed != 0 {
    		unlock(&c.lock)
    		// panic
    		panic(plainError("close of closed channel"))
    	}
    
    	// …………
    
    	// 修改关闭状态
    	c.closed = 1
    
    	var glist *g
    
    	// 将 channel 所有等待接收队列的里 sudog 释放
    	for {
    		// 从接收队列里出队一个 sudog
    		sg := c.recvq.dequeue()
    		// 出队完毕,跳出循环
    		if sg == nil {
    			break
    		}
    
    		// 如果 elem 不为空,说明此 receiver 未忽略接收数据
    		// 给它赋一个相应类型的零值
    		if sg.elem != nil {
    			typedmemclr(c.elemtype, sg.elem)
    			sg.elem = nil
    		}
    		if sg.releasetime != 0 {
    			sg.releasetime = cputicks()
    		}
    		// 取出 goroutine
    		gp := sg.g
    		gp.param = nil
    		if raceenabled {
    			raceacquireg(gp, unsafe.Pointer(c))
    		}
    		// 相连,形成链表
    		gp.schedlink.set(glist)
    		glist = gp
    	}
    
    	// 将 channel 等待发送队列里的 sudog 释放
    	// 如果存在,这些 goroutine 将会 panic
    	for {
    		// 从发送队列里出队一个 sudog
    		sg := c.sendq.dequeue()
    		if sg == nil {
    			break
    		}
    
    		// 发送者会 panic
    		sg.elem = nil
    		if sg.releasetime != 0 {
    			sg.releasetime = cputicks()
    		}
    		gp := sg.g
    		gp.param = nil
    		if raceenabled {
    			raceacquireg(gp, unsafe.Pointer(c))
    		}
    		// 形成链表
    		gp.schedlink.set(glist)
    		glist = gp
    	}
    	// 解锁
    	unlock(&c.lock)
    
    	// Ready all Gs now that we've dropped the channel lock.
    	// 遍历链表
    	for glist != nil {
    		// 取最后一个
    		gp := glist
    		// 向前走一步,下一个唤醒的 g
    		glist = glist.schedlink.ptr()
    		gp.schedlink = 0
    		// 唤醒相应 goroutine
    		goready(gp, 3)
    	}
    }
    

      

    refer:

    Go channel实现原理剖析

  • 相关阅读:
    使用signalr不使用连接服务器和前台的js的方法
    signalR client属性中的大致方法
    创建一个简单的signalr项目
    设计模式之工厂模式
    VS2012下没有ADO.NET实体数据模型
    迷你极客主机Station M1 开箱体验
    制作Station M1主机的Armbian启动卡
    StationP1的Ubuntu上安装gedit
    Station OS播放网络共享文件夹的媒体资源
    体验StationOS的推荐应用功能
  • 原文地址:https://www.cnblogs.com/-wenli/p/12710361.html
Copyright © 2011-2022 走看看