zoukankan      html  css  js  c++  java
  • chan数据结构实现原理

    1. chan数据结构

    src/runtime/chan.go:hchan定义了channel的数据结构,如下

    type hchan struct {
        qcount   uint           // 当前队列中剩余元素个数
        dataqsiz uint           // 环形队列长度,即可以存放的元素个数
        buf      unsafe.Pointer // 环形队列指针
        elemsize uint16         // 每个元素的大小
        closed   uint32            // 标识关闭状态
        elemtype *_type         // 元素类型
        sendx    uint           // 队列下标,指示元素写入时存放到队列中的位置
        recvx    uint           // 队列下标,指示元素从队列的该位置读出
        recvq    waitq          // 等待读消息的goroutine队列
        sendq    waitq          // 等待写消息的goroutine队列
        lock mutex              // 互斥锁,chan不允许并发读写
    }
    

    从数据结构可以看出channel由队列、类型信息、goroutine等待队列组成,下面分别说明其原理。
    如上,我们通常可以用chan相关绑定的方法获取相应hchan结构体的某些字段

    1. qcount:当前队列中剩余元素的个数,我们在判断一个有缓存是否有可读数据时,使用len(chan)
    2. daraqsize: 环形队列长度,可存放的元素个数,cap(chan)
    3. closed: 标识关闭的状态,通过close(chan)来改变状态

    1.1 环形队列

    chan内部实现了一个环形队列作为其缓冲区,队列的长度是创建chan时指定的: make(chan int, 10)

    图展示了一个可缓存6个元素的channel示意图:

    • dataqsiz指示了队列长度为6,即可缓存6个元素;
    • buf指向队列的内存,队列中还剩余两个元素;
    • qcount表示队列中还有两个元素;
    • sendx指示后续写入的数据存储的位置,取值[0, 6);
    • recvx指示从该位置读取数据, 取值[0, 6);

    1.2 等待队列

    从channel读数据,如果channel缓冲区为空或者没有缓冲区,当前goroutine会被阻塞。
    向channel写数据,如果channel缓冲区已满或者没有缓冲区,当前goroutine会被阻塞。

    被阻塞的gouroutie将会挂在channel的等待队列中:

    • 因读阻塞的goroutine会被像channel写入数据的goroutine唤醒
      • 常见有缓冲已经消费完数据数据后,如使用range读取时无数据时会进行阻塞
    • 因写阻塞的goroutine会被从channel读数据的goroutine唤醒;
      • 无缓冲chan需要在写入前,读操作准备好
      • 有缓冲的chan在缓冲填满后,只有当读操作消费数据后,chan有空间,写入数据才能进行

    下图展示了一个没有缓冲区的channel,有几个goroutine阻塞等待读数据:


    注意,一般情况下recvq和sendq至少有一个为空。只有一个例外,那就是同一个goroutine使用select语句向channel一边写数据,一边读数据。

    1.3 类型消息

    • elemtype *_type

    一个channel只能传递一种类型的值,类型信息存储在hchan数据结构中。

    • elemtype代表类型,用于数据传递过程中的赋值;
    • elemsize代表类型大小,用于在buf中定位元素位置。

    1.4 锁

    一个channel同时仅允许被一个goroutine读写,为简单起见,本章后续部分说明读写过程时不再涉及加锁和解锁。

    2.channel读写

    2.1 创建channel

    创建channel的过程实际上是初始化hchan结构。其中类型信息和缓冲区长度由make语句传入,buf的大小则与元素大小和缓冲区长度共同决定。

    创建channel的伪代码如下所示:

    func makechan(t *chantype, size int) *hchan {
        var c *hchan
        c = new(hchan)
        c.buf = malloc(元素类型大小*size)
        c.elemsize = 元素类型大小
        c.elemtype = 元素类型
        c.dataqsiz = size
    
        return c
    }
    

    2.2 向channel写数据

    向一个channel中写数据简单过程如下:

    • 如果等待接收队列recvq不为空,说明缓冲区中没有数据或者没有缓冲区,此时直接从recvq取出G,并把数据写入,最后把该G唤醒,结束发送过程;
    • 如果缓冲区中有空余位置,将数据写入缓冲区,结束发送过程;
    • 如果缓冲区中没有空余位置,将待发送数据写入G,将当前G加入sendq,进入睡眠,等待被读goroutine唤醒;

    流程图如下:

    2.3 从channel读数据

    从一个channel读数据简单过程如下:

    1. 如果等待发送队列sendq不为空,且没有缓冲区,直接从sendq中取出G,把G中数据读出,最后把G唤醒,结束读取过程;
    2. 如果等待发送队列sendq不为空,此时说明缓冲区已满,从缓冲区中首部读出数据,把G中数据写入缓冲区尾部,把G唤醒,结束读取过程;
    3. 如果缓冲区中有数据,则从缓冲区取出数据,结束读取过程;
    4. 将当前goroutine加入recvq,进入睡眠,等待被写goroutine唤醒;

    简单流程图如下:

    2.4 关闭channel

    关闭channel时会把recvq中的G全部唤醒, 本该写入G的数据位置为nil。把sendq中的G全部唤醒,但这些G会panic

    • 简单说就是,向关闭的channel写数据会panic,继续读数据会读出nil

    除此之外, panic出现的常见场景还有:

    1. 关闭值为nil的通道channel
    2. 关闭已经被关闭的channel
    3. 像已经关闭的channel写数据

    3.常见的用法

    3.1 单向channel

    单向的channel指只能勇于发送或接受数据,实际上也没有单向的channel

    channel可以通过参数进行传递,所谓的单向channel只是对channel的一种使用限制

    • func readChan(chanName <-chan int): 通过形参限定函数内部只能从channel中读取数据
    • func writeChan(chanName chan<-int): 参数限定函数内部只能从channel中写入输入

    简单的示例代码如下:

    package main
    
    func readChan(chanName <-chan int){
        <-chanName
    }
    
    func writeChan(chanName chan<- int){
        chanName <-1
    }
    
    func main(){
        var mychan = make(chan int, 10)
    
        writeChan(mychan)
        readChan(mychan)
    }
    
    

    mychan是个正常的chan.而readChan() 参数限制传入的channel只能用来读,writeChan()参数限制了传入的channel只能用来写

    3.2 select

    使用select可以监控多个channel,比如监控多个channel,当其中某个channel有数据时,就从其读出数据。

    一个简单的示例如下:

    
    package main
    
    import (
        "fmt"
        "time"
    )
    
    func addNumberToChan(chanName chan int) {
        for {
            chanName <- 1
            time.Sleep(1 * time.Second)
        }
    }
    
    func main() {
        var chan1 = make(chan int, 10)
        var chan2 = make(chan int, 10)
    
        go addNumberToChan(chan1)
        go addNumberToChan(chan2)
    
        for {
            select {
            case e := <- chan1 :
                fmt.Printf("Get element from chan1: %d
    ", e)
            case e := <- chan2 :
                fmt.Printf("Get element from chan2: %d
    ", e)
            default:
                fmt.Printf("No element in chan1 and chan2.
    ")
                time.Sleep(1 * time.Second)
            }
        }
    }
    

    程序中创建两个channel:chan1和chan2. 函数addNumberToChan()函数会向两个channel中周期性写入数据。 通过select可以监控两个channel,任意一个刻度是就从其中读出数据。
    程序输出如下:

    root@failymao:/mnt/d/gopath/src/Go_base/# go run chan.go
    No element in chan1 and chan2.
    Get element from chan1: 1
    Get element from chan2: 1
    Get element from chan1: 1
    No element in chan1 and chan2.
    Get element from chan2: 1
    Get element from chan1: 1
    Get element from chan2: 1
    No element in chan1 and chan2.
    Get element from chan1: 1
    Get element from chan2: 1
    No element in chan1 and chan2.
    

    从输出可见, 从channel中读出数据的顺序是随机的,事实上select语句的多个case执行是随机的。
    select 的case语句读channel不会阻塞, 尽管channel中没有数据。 但是由于case语句在编译后调用读channel时会明确传入不阻塞的参数, 此时读不到数据时不会将当前的goroutine加入到等待队列,而是直接返回。

    3.3 range遍历channel元素

    通过range可以持续从channel中读取出数据,如同遍历map,slice,array 一样,当channel中没有数据是会阻塞当前goroutine, 与读channel阻塞时处理机制一样, 当有数据写入时,读channel开始被唤醒

    func chanRange(chanName chan int) {
        for e := range chanName {
            fmt.Printf("Get element from chan: %d
    ", e)
        }
    }
    

    注意:如果向此channel写数据的goroutine退出时,系统检测这种情况会panic,否则range将会永久阻塞。

    使用场景:比如在读取kafka数据时,因为会存在大量的数据写入,一般会通过方式不停进行取数据,但是在程序中会结合单独的协程使用

    
    func receiver(pc sarama.Consumer){
        for msg:=range pc.Message{
            fmt.Println(msg.Value,msg.offse,msg.Topic)
        }
    }
    
    func main(){
       ...
       go receiver(pc)
    }
    
    ♥永远年轻,永远热泪盈眶♥
  • 相关阅读:
    Android:短信发送
    Android 自制拍照软件
    Android 联系人的读取,查询,添加
    android activity生命周期
    android 让 EditText, TextView自动识别链接
    android Log图文详解(Log.v,Log.d,Log.i,Log.w,Log.e)
    FileOutputStream 读文件的模式
    Android 使用 SharedPreferences 保存和加载软件参数
    使用google的GSON处理JSON
    Android SqlLite数据库的创建、增、删、改、查、使用事务
  • 原文地址:https://www.cnblogs.com/failymao/p/14891813.html
Copyright © 2011-2022 走看看