除了《Kubernetes GO》系列之外,对于golang相关知识,同时准备了《Golang 漫谈》以增雅趣,不足之处,万望海涵,在此特别感谢雨痕的Golang 源码剖析。
Golang 1.13.1已在9月26日正式发布,主要修复CVE-2019-16276
,当然docker等相关组件也同时做了update
channel是Golang提供的goroutine间的通信方式,其为Golang并发模型CSP的关键,Golang鼓励用通讯实现数据共享,如果需要跨进程通信,建议使用分布式方案或者消息队列来解决。该文章主要介绍,以下内容:
- channel介绍及范例
- channel用法
- channel使用场景
- channel原理赏析
下面在进入正题之前,简要介绍一下CSP模型:
传统并发模型分为Actor模型与CSP模型,其中CSP全称为Communicating Sequential Processess,CSP模型有并发执行体(进程、线程、协程),和消息通道组成,执行体之间通过消息通道进行通讯,CSP模型关注消息发送的载体,即消息管道,而Actor关注的是内部的状态,那么Golang中执行体对应的是goroutine,消息通道对应的是channel。
一、channel介绍及范例
如上所言,channel 提供了一种通信机制,其为gouroutine之间的通信提供了一种可能,执行体拷贝数据,channel负责传递,有以下应用场景:
-
广播,如消费者/生产者模型
-
交换数据
-
并发控制
-
显示通知等
Golang鼓励使用通讯来实现数据共享,而不是经由内存。
1.1 channel特性
1)线程安全:hchan mutex
2)先进先出:copying into and out of hchan buffer
3)channel的高性能所在:
- 调用runtime scheduler实现,OS thread不需要阻塞;
- 跨goroutine栈可以直接进行读写;
1.2 channel类型
channel分为非缓存channel与缓存channel。
-
无缓存channel
从无缓存的channel中读取消息会堵塞,直到有goroutine往channel中发送消息;同理,向无缓存的channel中发送消息也会堵塞,直到有goroutine从channel中读取消息。
-
有缓存channel
有缓存channel的声明方式为指定make函数的第二个参数,该参数为channel缓存的容量。
通过内置len函数可获取chan元素个数,通过cap函数可获取chan的缓存长度
单项channel :单向channel为只读/只写channel,单向channel,在编译时,可进行检测。
func testSingal(ch chan<- int) <- chan int {
// 定义工作逻辑
}
其中 chan<- int表示只写channel, <-chan int表示只读channel,此类函数/方法声明可防止channel滥用,在编译时可以检测出。
1.3 channel创建
channel使用内置的make函数创建,如下,声明类型为int的channel:
// 非缓存channel
ch := make(chan int)
// 缓存channel
bch := make(chan int, 2)
channel和map类似,make创建了底层数据结构的引用,当赋值或参数传递时,只是拷贝了一个channel的引用,其指向同一channel对象,与其引用类型一样,channel的空值也为nil。使用==
可以对类型相同的channel进行比较,只有指向相同对象或同为nil时,结果为true。
1.4 channel的读写操作
channel在使用前,需要初始化,否则永远阻塞。
ch := make(chan int)
// 写入channel
ch <- x
// 从channel中读取
y <- ch
// 从channel中读取
z := <- ch
1.5 channel的关闭
golang提供了内置的close函数,对channel进行关闭操作。
// 初始化channel
ch := make(chan int)
// 关闭channel ch
close(ch)
关于channel的关闭,需要注意以下事项:
- 关闭未初始化的channle(nil)会panic
- 重复关闭同一channel会panic
- 向以关闭channel发送消息会panic
- 从已关闭channel读取数据,不会panic,若存在数据,则可以读出未被读取的消息,若已被读出,则获取的数据为零值,可以通过ok-idiom的方式,判断channel是否关闭
- channel的关闭操作,会产生广播消息,所有向channel读取消息的goroutine都会接受到消息
package main
import fmt
func main() {
// 初始化channel
ch := make(chan int, 3)
// 发送消息
ch <- 1
ch <- 2
# 关闭channel
close(ch)
// 循环读取
for c := range ch {
fmt.Println(c)
}
}
1.6 两类channel
ch := make(chan int, 1)
有缓存的channel使用环形数组实现,当缓存未满时,向channel发送消息不会阻塞,当缓存满时,发送操作会阻塞,直到其他goroutine从channel中读取消息;同理,当channel中消息不为空时,读取消息不会阻塞,当channel为空时,读取操作会阻塞,直至其他goroutine向channel发送消息。
ch := make(chan int)
// 阻塞,因channel ch为空
<- ch
ch := make(chan int, 3)
ch <- 1
ch <- 2
ch <- 3
# 阻塞,因缓存已满
ch <- 4
<- ch
二、channel的用法
2.1 goroutine通信
看下面《effctive go》中的例子:
主goroutine会阻塞,直至执行sort的goroutine完成
// 初始化chan
c := make(chan int)
# 使用goroutine执行list.Sort(),完毕后,发送信号
go func() {
list.Sort()
c <- 1
}()
// 处理其他事务
doSomething()
// 读取chan消息
<-c
2.2 range遍历
channel也可以使用range取值,并且会一直从chanel中读取数据,直至goroutine关闭该channel,循环才会结束,如下所示。
// 初始化channel
ch := make(chan int, 5)
go func(){
for i := 0; i < 5; i ++ {
ch <- i
}
}()
for i := range ch {
fmt.Println(i)
}
等同于
// 初始化channel
ch := make(chan int, 5)
go func(){
for i := 0; i < 5; i ++ {
ch <- i
}
}()
for {
i, ok := <- ch
if !ok {
break
}
fmt.Println(i)
}
2.3 配合select使用
select用法类似IO多路复用,可同时监听多个channel的消息,如下所示:
select {
case <- a;
fmt.Println("testa")
case <- b;
fmt.Println("testb")
case c <- 3;
fmt.Println("testc")
default:
fmt.Println("testdefault")
}
select有以下特性:
- select可同时监听多个channel的读/写
- 执行select时,若只有一个case通过,则执行该case
- 若有多个,则随机执行一个case
- 若所有都不满足,则执行default,若无default,则等待
- 可使用break跳出select
三、channel使用场景
3.1 设置超时时间
// 初始化channel,数据类型为struct{}
ch := make(chan struct{})
// 以goroutine方式处理func
go func(){
// 处理逻辑
// 传递ch,控制goroutine
}(ch)
timeout := time.After(1 * time.Sencond)
select {
case <- ch:
fmt.Printfln("任务完成.")
case <- timeout:
fmt.Printfln("时间已到.")
}
3.2 控制channel
在某些应用场景,工作goroutine一直处理事务,直到收到退出信号
mch := make(chan struct{})
quit := make(chan struct{})
for {
select {
case <- mch:
// 正常工作
work()
case <- quit:
// 退出前,处理收尾工作
doFinish()
return
}
}
四、channel原理赏析
4.1 channel结构体
以下源码基于go 1.13.1
,其主要实现在src/runtime/chan.go
中,在介绍源码前,需要介绍channel最主要的结构体hchan
,其定义如下所示:
type hchan struct {
qcount uint // 当前队列中剩余元素个数,即len
dataqsiz uint // 环形队列长度,即可以存放的元素个数,cap
buf unsafe.Pointer // 环形队列指针:队列缓存,头指针,环形数组实现
elemsize uint16 // 每个元素的大小
closed uint32 // 关闭标志位
elemtype *_type // 元素类型
sendx uint // 队列下标,指示元素写入时存放到队列中的位置
recvx uint // 队列下标,指示元素从队列的该位置读出
recvq waitq // 等待读消息的goroutine队列
sendq waitq // 等待写消息的goroutine队列
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex // 该锁保护hchan所有字段
}
// sending/receiving等待队列的链表实现
type waitq struct {
first *sudog
last *sudog
}
hchan类型
一个channel只能传递一种类型的值,类型信息存储在hchan
数据结构体中,_type
结构体中包含elemtype
及elemsize
等。
- elemetype代表类型,用于数据传递过程中的赋值
- elemesize代码类型大小,用于在buf中定位元素位置
hchan环形队列
hchan内部实现了一个环形队列作为缓冲区,队列的长度是创建channel时指定的。下图展示了一个可缓存6个元素的channel的示意图:
- dataqsiz指示队列长度为6,即可缓存6个元素
- buf指向队列的内存,队列中还剩余两个元素
- qcount表示队列中还有两个元素
- sendx指示后续写入的数据存储的位置,取值[0,6)
- recvx指示从该位置读取数据,取值[0,6)
hchan等待队列
从channel读消息,如果channel缓冲区为空或者没有缓存区,当前goroutine会被阻塞。
向channel读消息,如果channel缓冲区已满或者没有缓冲区,当前goroutine会被阻塞。
被阻塞的goroutine将会封装成sudog,加入到channel的等待队列中:
- 因读消息阻塞的goroutine会被channel向channel写入数据的goroutine唤醒
- 因写消息阻塞的goroutine会从channel读消息的goroutine唤醒
一般情况下,recvq和sendq至少一个为空,只有一个例外,即同一个goroutine使用select语句向channel一边写数据,一个读数据。
// sudog将*g封装到等待链表中
//(M)sudogs <-> (N) g
//
// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
type sudog struct {
// The following fields are protected by the hchan.lock of the
// channel this sudog is blocking on. shrinkstack depends on
// this for sudogs involved in channel ops.
g *g
// isSelect indicates g is participating in a select, so
// g.selectDone must be CAS'd to win the wake-up race.
isSelect bool
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.
acquiretime int64
releasetime int64
ticket uint32
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
和其他一样,sudog也实现二级缓存复用结构。
runtime2.go
type p struct {
// proceresice new(p)时指向sudogbuf
sudogcache []*sudog
sudogbuf [128]*sudog
}
type schedt struct {
// Central cache of sudog structs.
sudoglock mutex
sudogcache *sudog
}
func acquireSudog() *sudog { // 获取当前m mp := acquirem() pp := mp.p.ptr() // 如果当前p为空 if len(pp.sudogcache) == 0 { lock(&sched.sudoglock) // First, try to grab a batch from central cache. // 从全局转移一批到当前p for len(pp.sudogcache) < cap(pp.sudogcache)/2 && sched.sudogcache != nil { s := sched.sudogcache sched.sudogcache = s.next s.next = nil pp.sudogcache = append(pp.sudogcache, s) } unlock(&sched.sudoglock) // 如果还为空,则创建 if len(pp.sudogcache) == 0 { pp.sudogcache = append(pp.sudogcache, new(sudog)) } } // 从尾部获取,同时调整p的缓存 n := len(pp.sudogcache) s := pp.sudogcache[n-1] pp.sudogcache[n-1] = nil pp.sudogcache = pp.sudogcache[:n-1] if s.elem != nil { throw("acquireSudog: found s.elem != nil in cache") } releasem(mp) return s } //go:nosplit func releaseSudog(s *sudog) { // 判断结构体是否为空 if s.elem != nil { throw("runtime: sudog with non-nil elem") } if s.isSelect { throw("runtime: sudog with non-false isSelect") } if s.next != nil { throw("runtime: sudog with non-nil next") } if s.prev != nil { throw("runtime: sudog with non-nil prev") } if s.waitlink != nil { throw("runtime: sudog with non-nil waitlink") } if s.c != nil { throw("runtime: sudog with non-nil c") } gp := getg() if gp.param != nil { throw("runtime: releaseSudog with non-nil gp.param") } mp := acquirem() // avoid rescheduling to another P pp := mp.p.ptr() // 如果p已满,则转移到全局 if len(pp.sudogcache) == cap(pp.sudogcache) { // Transfer half of local cache to the central cache. var first, last *sudog for len(pp.sudogcache) > cap(pp.sudogcache)/2 { n := len(pp.sudogcache) p := pp.sudogcache[n-1] pp.sudogcache[n-1] = nil pp.sudogcache = pp.sudogcache[:n-1] if first == nil { first = p } else { last.next = p } last = p } lock(&sched.sudoglock) last.next = sched.sudogcache sched.sudogcache = first unlock(&sched.sudoglock) } pp.sudogcache = append(pp.sudogcache, s) releasem(mp) }
转载: https://juejin.im/entry/5da165c9f265da5b8f107dbf