zoukankan      html  css  js  c++  java
  • channel

    http://studygolang.com/topics/2144

    boscoyoung · 2016-11-20 16:41:51 · 792 次点击 · 2分钟之前 开始浏览   
    这是一个创建于 2016-11-20 16:41:51 的主题,其中的信息可能已经有所发展或是发生改变。

    传统意义上的并发编程,需要直接共享内存,这给并发模型带来了额外的复杂度.

    Goroutine本质上也是多线程模型,让Go变得特别的地方就是channel, 采用消息模型避免直接的内存共享,降低了信息处理的复杂度.

    注意:本文所有的源码分析基于Go1.6.2,版本不同实现可能会有出入

    Channel是Go中一种特别的数据结构,它不像普通的类型一样,强调传值还是指针; 对于channel而言,只涉及传指针,因为make(chan)返回的是指针类型:

    package main
    
    func abstractListener(fxChan chan func() string) {
        fxChan <- func() string {
            return "Hello, functions in channel"
        }
    }
    
    func main() {
        fxChan := make(chan func() string)
        defer close(fxChan)
    
        go abstractListener(fxChan)
        select {
        case rfx := <-fxChan:
            msg := rfx()
            println("recieved message:", msg)
        }
    }
    

    本文从Go Runtime的实现角度,粗略介绍了channel的实现, 希望能为其他的语言带来一些启发.

    make channel

    这是初始化channel的过程,编译器会把make(chan xxx)最终指向:

    // src/runtime/chan.go#L52
    func reflect_makechan(t *chantype, size int64) *hchan {
        return makechan(t, size)
    }
    

    最终返回一个*hchan类型,可见make出来的channel对象确实是一个指针. hchan保存了一个channel实例必须的信息. 里面的参数后面会详细说到.

    // src/runtime/chan.go#L25
    type hchan struct {
        qcount   uint           // buffer中数据总数
        dataqsiz uint           // buffer的容量
        buf      unsafe.Pointer // buffer的开始指针
        elemsize uint16         // channel中数据的大小
        closed   uint32         // channel是否关闭,0 => false,其他都是true
        elemtype *_type         // channel数据类型
        sendx    uint           // buffer中正在send的element的index
        recvx    uint           // buffer中正在recieve的element的index
        recvq    waitq          // 接收者等待队列
        sendq    waitq          // 发送者等待队列
    
        lock     mutex          // 互斥锁
    }
    

    代码中调用make(chan $type, $size)的时候,最终会执行runtime.makechan

    // src/runtime/chan.go#L56
    func makechan(t *chantype, size int64) *hchan {
      // 初始化,做一些基本校验
    
        var c *hchan
      // 如果是无指针类型或者指定size=0;可见默认情况下都会走这个逻辑
        if elem.kind&kindNoPointers != 0 || size == 0 {
            // 从内存分配器请求一段内存
        c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
        // 如果是无指针类型,并且指定了buffer size
            if size > 0 && elem.size != 0 {
                c.buf = add(unsafe.Pointer(c), hchanSize)
            } else {
                // race detector uses this location for synchronization
                // Also prevents us from pointing beyond the allocation (see issue 9401).
                c.buf = unsafe.Pointer(c)
            }
        } else {
        // 有指针类型,并且size > 0时,直接初始化一段内存
            c = new(hchan)
            c.buf = newarray(elem, int(size))
        }
      // 初始化channel结构里面的参数
        c.elemsize = uint16(elem.size)
        c.elemtype = elem
        c.dataqsiz = uint(size)
    
      // ...
        return c
    }
    

    上面的代码提到了“无指针类型”,一个比较简单的例子就是反射中的值类型, 这种值是没办法直接获取地址的, 也有些地方叫做“不可寻址”:

    i := 1
    // 从道理上上,这个value实际上就是一个抽象概念,
    // 好比从一个活人身上把肉体抽出来 => value
    // 脱离了具体的对象,所以就无法谈地址,但这样的值却实实在在存在
    v := reflect.ValueOf(i)
    // 然后灵魂也可以抽出来 => type
    t := reflect.TypeOf(i)
    

    send to channel

    初始化channel之后就可以向channel中写数据,准确的说,应该是发送数据.

    入口函数:

    // src/runtime/chan.go#L106
    func chansend1(t *chantype, c *hchan, elem unsafe.Pointer) {
        chansend(t, c, elem, true, getcallerpc(unsafe.Pointer(&t)))
    }
    

    核心逻辑都在chansend中实现,注意这里的block:bool参数始终为true:

    // src/runtime/chan.go#L122
    func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
      // 初始化,异常校验和处理
    
      // 处理channel的临界状态,既没有被close,又不能正常接收数据
        if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
            (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
            return false
        }
    
        var t0 int64
        if blockprofilerate > 0 {
            t0 = cputicks()
        }
    
      // 对同一个channel的操作是串行的
        lock(&c.lock)
    
      // 向一个已经被close的channel写数据,会panic
        if c.closed != 0 {
            unlock(&c.lock)
            panic(plainError("send on closed channel"))
        }
    
      // 这是一个不阻塞的处理,如果已经有接收者,
      // 就向第一个接收者发送当前enqueue的消息.
      // return true就是说发送成功了,写入buffer也算是成功
        if sg := c.recvq.dequeue(); sg != nil {
        // 发送的实现本质上就是在不同线程的栈上copy数据
            send(c, sg, ep, func() { unlock(&c.lock) })
            return true
        }
    
        if c.qcount < c.dataqsiz {
        // 把发送的数据写入buffer,更新buffer状态,返回成功
        }
    
      // 如果buffer满了,或者non-buffer(等同于buffer满了),
      // 阻塞sender并更新一些栈的状态,
      // 唤醒线程的时候还会重新检查channel是否打开状态,否则panic
    }
    

    再看send的实现:

    // src/runtime/chan.go#L122
    func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func()) {
        if sg.elem != nil {
        // 直接把要发送的数据copy到reciever的栈空间
            sendDirect(c.elemtype, sg, ep)
            sg.elem = nil
        }
    
      // 等待reciever就绪,如果reciever还没准备好就阻塞sender一段时间
    }
    

    sendDirect的实现:

    // src/runtime/chan.go#L284
    func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
      // 直接拷贝数据
        dst := sg.elem
        memmove(dst, src, t.size)
        typeBitsBulkBarrier(t, uintptr(dst), t.size)
    }
    

    channel send主要就是做了2件事情:

    • 如果没有可用的reciever,数据入队(如果有buffer),否则线程阻塞
    • 如果有可用的reciever,就把数据从sender的栈空间拷贝到reciever的栈空间

    recieve from channel

    入口函数:

    // src/runtime/chan.go#L377
    func chanrecv1(t *chantype, c *hchan, elem unsafe.Pointer) {
        chanrecv(t, c, elem, true)
    }
    

    chanrecv的实现:

    // src/runtime/chan.go#L393
    func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
      // 初始化,处理基本校验
    
      // 处理channel临界状态,reciever在接收之前要保证channel是就绪的
        if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
            c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
            atomic.Load(&c.closed) == 0 {
            return
        }
    
      // ...
    
      // 如果recieve数据的时候,发现channel关闭了,
      // 直接返回channel type的zero value
        if c.closed != 0 && c.qcount == 0 {
        // ...
            if ep != nil {
                memclr(ep, uintptr(c.elemsize))
            }
            return true, false
        }
    
      // 从sender阻塞队首获取sender,接收数据
      // 如果buffer为空,直接获取sender的数据,否则把sender的数据加到buffer
      // 队尾,然后从buffer队首获取数据
        if sg := c.sendq.dequeue(); sg != nil {
            recv(c, sg, ep, func() { unlock(&c.lock) })
            return true, true
        }
    
      // 如果当前没有pending的sender,且buffer里有数据,直接从buffer里面拿数据
        if c.qcount > 0 {
        // ...
        }
    
      // buffer里面没有数据,也没有阻塞的sender,就阻塞reciver
    }
    

    close channel

    入口函数:

    // src/runtime/chan.go#L303
    func closechan(c *hchan) {
      // 关闭channel之前必须已经初始化
    
      // 检查channel状态,不能重复关闭channel
    
      // ...
    
      // 将closed置为1,标志channel已经关闭
        c.closed = 1
    
      // ...
    
      // 先释放所有的reciever/reader
    
      // 然后释放所有的sender/writer,在sender端会panic
    
      // ...
    }
  • 相关阅读:
    聊聊WS-Federation
    用双十一的故事串起碎片的网络协议(上)
    责任链模式的使用-Netty ChannelPipeline和Mina IoFilterChain分析
    最小化局部边际的合并聚类算法(中篇)
    最小化局部边际的合并聚类算法(上篇)
    UVaLive 7371 Triangle (水题,判矩形)
    UVaLive 7372 Excellence (水题,贪心)
    POJ 3312 Mahershalalhashbaz, Nebuchadnezzar, and Billy Bob Benjamin Go to the Regionals (水题,贪心)
    UVa 1252 Twenty Questions (状压DP+记忆化搜索)
    UVa 10817 Headmaster's Headache (状压DP+记忆化搜索)
  • 原文地址:https://www.cnblogs.com/thrillerz/p/7390138.html
Copyright © 2011-2022 走看看