zoukankan      html  css  js  c++  java
  • 2.7 Go channel 代码示例

    channel简单示例

    package main
    
    import (
        "fmt"
        "time"
    )
    
    //channel的创建,发送,接收
    func channe1(){
        //创建,channel是有类型的
        c1 := make(chan int)
        
        
        //接收,在这段程序中接收方必须是一个goroutine,因为只在主程序中发送而不接收,程序会报deadlock
        //通常使用匿名函数开一个与主程序同时执行的内部方法,即并行执行
        go func(){
            fmt.Println("接收数据准备")
            //这里接收channel使用了io输出功能,io是可以被抢占控制权的,即IO的特性
            fmt.Println(<- c1)
            fmt.Println("接收数据完成")
            
            //关闭,不显式关闭时,channel会随主程序(即main)的运行结束而结束
            //如果“接收”处理数据的时间较长,就会出现主程序已经结束,但接收方还没处理完的情况
            //此时可以让主程序sleep一段时间,等待接收方把数据处理完毕再关闭
            close(c1)
            
            fmt.Println("接收结束")
        }()
        
        //发送数据,“接收”程序要在发送之前准备,
        //意思就是发送数据之前,要先为channel准备好接收;
        //否则,执行<- 1将1发送到channel时,go发现没有人接收,会报deadlock
        c1 <- 1
        
        //接收方与主程序同时执行
        //主程序在此停止1毫秒,就相当于主程序等了接收方一毫秒
        time.Sleep(time.Millisecond)
    }
    
    func main(){
        channe1()
        fmt.Println("主程序结束")
    }

    输出

    # go run chan1.go 
    接收数据准备
    1
    接收数据完成
    接收结束
    主程序结束

    channel同步

    如果一个动作会触发另外一个动作,那么这个行为通常被称为事件(event);如果这个事件不附带信息,那么此类事件又通常被用于同步。

    channel有发送、接收、关闭三个操作;

    发送触发接收,如果一个channel不发送,那么接收将处于阻塞。这种同步,可用于消息通知。

    package main
    
    import (
        "fmt"
        "time"
    )
    
    func test(){
        c := make(chan struct{})
        
        go func(){
            fmt.Println("我要花两分钟去看看园子里的花还活着吗")
            time.Sleep(7*time.Second)
            c <- struct{}{}
        }()
        
        //程序会在这里等待7秒
        <- c
        //然后打印出下面这句话
        fmt.Println("这花从屋里移值出去后,活得比以前更好了")
    }
    
    func main(){
        test()
    }

    channel数组

    package main
    
    import (
        "fmt"
        "sync"
    )
    
    func channel2(){
        //WaitGroup的wait(在主程序中调用)与done(在与主程序并行执行的“接收”方中调用)的交互,
        //可以达到等待所有channel运行完毕,再让主程序运行的效果
        //而不是程序员猜想channel“接收”需要多少时间运行,
        //然后去主程序中设置time.Sleep让主程序等待
        var wtg sync.WaitGroup
        
        //channel数组
        var workers [8]worker
        for i := 0; i < 8; i++{
            //使用引用的方式传送参数,所有的channel公用一个WaitGroup
            workers[i] = createWorker(i,&wtg)
        }
        //要一次性添加完要等待执行的channel个数
        wtg.Add(16)
        for i,worker := range workers {
            worker.in <- 'a' + i
            //wtg.Add(1)   //这种方式会报错
        }
        
        for i,worker := range workers {
            worker.in <- 'A' + i
            //wtg.Add(1)
        }
        //等待所有channel执行完毕,否则一直阻塞
        wtg.Wait()
        fmt.Println("所有channel执行完毕")
    }
    
    func createWorker(id int, wtg *sync.WaitGroup) worker{
        //channel作为struct的一个属性
        wk := worker{
            //chan<-表示channel只用于发送数据,即输入
            in :  make(chan int),
            done: func(){
                wtg.Done()
            },
        }
        //channel创建之后,就开始以并行的方式建立“接收”方
        go wk.doWork(id)
        return wk
    }
    
    
    type worker struct {
        in   chan int
        done func()
    }
    
    //“接收”方程序
    func (wk *worker) doWork(id int){
        
        //接收的确是按数组顺序顺序打印出来的,但这只是程序第一次运行的情况
        //接收是在发送之前就以并行的方式运行起来了,之后数据中每个channel都一直处于阻塞等待状态
        //也就是说数组中的每个channel谁先打印出数据,就表示该谁先发送数据(忽略channel传送数据时长差异)
        for n := range wk.in {
            fmt.Printf("第 %d 次接收的信息为 %c
    ",id,n)
            
            //通知主程序工作处理完毕
            wk.done()
        }
    }
    
    func main(){
        channel2()
        fmt.Println("主程序结束")
    }

    输出

    # go run chann.go 
    第 0 次接收的信息为 a
    第 1 次接收的信息为 b
    第 2 次接收的信息为 c
    第 3 次接收的信息为 d
    第 4 次接收的信息为 e
    第 5 次接收的信息为 f
    第 6 次接收的信息为 g
    第 6 次接收的信息为 G
    第 7 次接收的信息为 h
    第 7 次接收的信息为 H
    第 0 次接收的信息为 A
    第 1 次接收的信息为 B
    第 2 次接收的信息为 C
    第 3 次接收的信息为 D
    第 4 次接收的信息为 E
    第 5 次接收的信息为 F
    所有channel执行完毕
    主程序结束

    这个例子除了介绍buffer channel,即channel数组的使用外,还涉及一个GO语言的重要思想,引用GO创始人的原话:

    Don't communicate by sharing memory;share memory by communicating.

    不要通过共享内存来通信;通过通信来共享内存。

    通过共享内存来通信:程序A运行完的结果返回给标识flag,如果flag为true运行程序B,返之运行程序C;

    通过通信来共享内存:当一个channel处理完毕时,不是修改一个变量告诉发送方我处理完了,而通过channel来传达这个信息; 可以再定义一个channel来实现这个功能,此处使用了WaitGroup。

    select调度

    最简单的select调度

    package main
    
    import (
        "fmt"
    )
    
    func main(){
        var c1,c2 chan int //nil
        
        //select可以在channel为nil的时候就去接收数据,
        //哪个channel有数据发送过来就进行接收
        select {
            case n:= <- c1:
                fmt.Println("来自c1:",n)
                
            case n:= <- c2:
                fmt.Println("来自c2:",n)
            //如果所有channel都无数据
            default:
                fmt.Println("无数据可接收")
        }
    
    }

    输出

    # go run chan3.go 
    无数据可接收

     

     

     

    select调度是多选一,是阻塞的

    select是阻塞的

    package main
    
    import (
        "fmt"
        "tools"
    )
    
    func main() {
    
        var c1,c2  chan int
    
        i:= 0
    
        for {
            i++
            fmt.Println(i)
            tools.SleepBySec(1)
    
            select {
            case <- c1:
                fmt.Println(1)
    
            case <- c2:
                fmt.Println(2)
            }
        }
        fmt.Println("over")
    }

    以上这段代码会输出一次1,然后就被阻塞住,select一直监听着两个case的channel是否有数据过来,阻塞着程序,阻塞着for循环。

    package main
    
    import (
        "fmt"
    )
    
    func channel3() chan int{
        out := make(chan int)
        go func(){
            i := 0
            for {
                out <- i
                i++
            }
        }()
        return out
    }
    
    
    func main(){
        var c1,c2 = channel3(),channel3()
        
        for{
            //select可以在channel为nil的时候就去接收数据,
            //哪个channel有数据发送过来就进行接收
            select {
                case n:= <- c1:
                    fmt.Println("来自c1:",n)
                    
                case n:= <- c2:
                    fmt.Println("来自c2:",n)
            /* 这里注释掉了,为什么呢?
            这是因为如果不注释掉,大部分程序就会走default
            因为在一个时间段内,发送数据只是占用了部分时间片,而不是所有时间 
            该行注释掉之后,select只有两个选择,输出c1与c2发送过来的数据
      
                //如果所有channel都无数据
                default:
                    fmt.Println("无数据可接收")
            */
            }
        }
    
    }

    输出,只取一部分输出数据

    来自c2: 17529
    来自c2: 17530
    来自c2: 17531
    来自c2: 17532
    来自c2: 17533
    来自c2: 17534
    来自c2: 17535
    来自c2: 17536
    来自c2: 17537
    来自c1: 9834
    来自c1: 9835
    来自c1: 9836
    来自c1: 9837
    来自c1: 9838
    来自c1: 9839
    来自c1: 9840
    来自c1: 9841
    来自c1: 9842
    来自c1: 9843

    如果select中的default不注释

    package main
    
    import (
        "fmt"
    )
    
    func channel3() chan int{
        out := make(chan int)
        go func(){
            i := 0
            for {
                out <- i
                i++
            }
        }()
        return out
    }
    
    
    func main(){
        var c1,c2 = channel3(),channel3()
        i :=0 
        j :=0
        for{
            i++
            fmt.Println("循环:",i)
            //select可以在channel为nil的时候就去接收数据,
            //哪个channel有数据发送过来就进行接收
            select {
                case n:= <- c1:
                    fmt.Println("来自c1:",n)
                    
                case n:= <- c2:
                    fmt.Println("来自c2:",n)
                    
                default:
                    j++
                    fmt.Println("无数据可接收",j)
            }
        }
    
    }

    输出,12755次循环中有12635次走了无数据

    循环: 12753
    无数据可接收 12633
    循环: 12754
    无数据可接收 12634
    循环: 12755
    无数据可接收 12635

    无default

    package main
    
    import (
        "fmt"
    )
    
    func channel3() chan int{
        out := make(chan int)
        go func(){
            i := 0
            for {
                out <- i
                i++
            }
        }()
        return out
    }
    
    
    func main(){
        var c1,c2 = channel3(),channel3()
        i :=0 
        j :=0
        t :=0
        for{
            i++
            fmt.Println("循环:",i)
            t = i - j
            fmt.Println("来自c1:",t)
            //select可以在channel为nil的时候就去接收数据,
            //哪个channel有数据发送过来就进行接收
            select {
                case n:= <- c1:
                    fmt.Println("来自c1:",n)
                    
                case n:= <- c2:
                    j++
                    fmt.Println("来自c2:",n)
            }
        }
    }

    输出

    循环: 10218
    来自c1: 4132
    来自c2: 6086
    循环: 10219
    来自c1: 4132
    来自c2: 6087
    循环: 10220
    来自c1: 4132
    来自c2: 6088
    循环: 10221
    来自c1: 4132
    来自c2: 6089
    package main
    import (
        "fmt"
        "time"
    )
    
    func ch1() chan int{
        fmt.Println("接收程序开始")
        
        c1 := make(chan int)
        
        go func(){
            i:=0
            for {
                c1 <- i
            }
        }()
    
        time.Sleep(time.Millisecond)
        return c1
    }
    
    func main(){
        a,b,i := 0,0,0
        c1,c2 := ch1(),ch1()
        
        for{
            i++
            
            select{
                case  <- c1:
                    a++
                case  <- c2:
                    b++
            }
            d := a+b
            fmt.Println("all:",i,d,a,b)    
        }
        
        fmt.Println("主程序结束")
    }
    all: 74333 74333 36912 37421
    all: 74334 74334 36913 37421
    all: 74335 74335 36913 37422
    all: 74336 74336 36914 37422
    all: 74337 74337 36914 37423
    all: 74338 74338 36914 37424
    all: 74339 74339 36915 37424
    all: 74340 74340 36915 37425
    all: 74341 74341 36916 37425
    all: 74342 74342 36917 37425
    all: 74343 74343 36917 37426
    all: 74344 74344 36918 37426
    all: 74345 74345 36918 37427
    all: 74346 74346 36918 37428

    有default时,select几乎99%都选择了default,这说明每一次循环(每一个时间片)中,有数据的情况只是占这个时间片的一小部分,是极小的一部分,无数据的情况占绝大部分(12635/12755);

    无default时,select在多个channel中的选择机会基本均等(36918/37428),并且,总的循环次数=所有case条件执行次数之和,

    这意味着没有任何一次for循环是轮空的,尽管case有数据的情况只是占用一次循环时间中极小的一部分;

    意味着每一次的for循环,即每一次的select执行,必会等待一个case满足条件,本次select才会结束;

    意味着select是阻塞的,即至少有一个case满足条件时,select才会结束,否则就等待

    再次验证一下

    package main
    import (
        "fmt"
        "time"
    )
    
    func ch1() chan int{
        fmt.Println("接收程序开始")
        
        c1 := make(chan int)
        
        go func(){
            i:=0
            for {
                c1 <- i
                time.Sleep(time.Millisecond*3000)
            }
        }()
    
        
        return c1
    }
    
    func main(){
        a,b,i := 0,0,0
        c1,c2 := ch1(),ch1()
        
        for{
            i++
            
            select{
                case  <- c1:
                    a++
                case  <- c2:
                    b++
            }
            d := a+b
            fmt.Println("all:",i,d,a,b)    
        }
        
        fmt.Println("主程序结束")
    }

    每发送一次数据行,等待3秒;select每次等待3秒输出

    接收程序开始
    接收程序开始
    all: 1 1 0 1
    all: 2 2 1 1
    all: 3 3 2 1
    all: 4 4 2 2
    all: 5 5 3 2
    all: 6 6 3 3
    all: 7 7 4 3
    all: 8 8 4 4

    如果两个channel同时到达select,那么select二选一,即随机选择一个优先输出

    如果不同时到达,则谁先达就选择谁,代码如下

    package main
    
    import (
        "fmt"
        "math/rand"
        "time"
    )
    
    func channel3() chan int{
        out := make(chan int)
        go func(){
            i := 0
            for {
                time.Sleep(time.Duration(rand.Intn(1000))*
                    time.Millisecond)
                out <- i
                i++
            }
        }()
        return out
    }
    
    
    func main(){
        var c1,c2 = channel3(),channel3()
        
        for{
            //select可以在channel为nil的时候就去接收数据,
            //哪个channel有数据发送过来就进行接收
            select {
                case n:= <- c1:
                    fmt.Println("来自c1:",n)
                    
                case n:= <- c2:
                    fmt.Println("来自c2:",n)
    
            }
        }
    
    }

     输出

    来自c2: 0
    来自c1: 0
    来自c2: 1
    来自c1: 1
    来自c2: 2
    来自c1: 2
    来自c2: 3
    来自c1: 3
    来自c2: 4
    来自c1: 4
    来自c2: 5
    来自c1: 5
    来自c1: 6
    来自c2: 6
    来自c2: 7
    来自c2: 8
    来自c1: 7
    来自c1: 8

    前面有说,一个channel必须先定义接收处理的程序,才能开始发送数据,这而代码却是先发送数据,之后才是select接收,这不是与之前所说的相矛盾吗?

    那来验证一下,使用最简单的方式先发送再让select接收试一试

    package main
    
    import (
        "fmt"
        //"math/rand"
        //"time"
    )
    /*
    func channel3() chan int{
        out := make(chan int)
        go func(){
            i := 0
            for {
                time.Sleep(time.Duration(rand.Intn(1000))*
                    time.Millisecond)
                out <- i
                i++
            }
        }()
        return out
    }
    */
    
    func main(){
        //var c1,c2 = channel3(),channel3()
        var c1,c2 = make(chan int),make(chan int)
        c1 <- 1
        c2 <- 2
        
        for{
            //select可以在channel为nil的时候就去接收数据,
            //哪个channel有数据发送过来就进行接收
            select {
                case n:= <- c1:
                    fmt.Println("来自c1:",n)
                    
                case n:= <- c2:
                    fmt.Println("来自c2:",n)
    
            }
        }
    }

    输出错误deadlock

    fatal error: all goroutines are asleep - deadlock!
    
    goroutine 1 [chan send]:
    main.main()
        /usr/local/automng/src/goapp/src/test/channel3/chan3.go:27 +0x92
    exit status 2

    那为什么开始的代码没有报错?

    channel是goroutine与goroutine之间的通信

     这是因为发送方位于一个goroutine中(go func(){...}())中,goroutine处理了这个问题,

    前面有程序中只有一个go,也就是只有一个goroutine,为什么也能运行?

    这是因为go程序中,main方法本身也是一个goroutine

    下面的代码就是正确的

    package main
    
    import (
        "fmt"
        //"math/rand"
        //"time"
    )
    /*
    func channel3() chan int{
        out := make(chan int)
        go func(){
            i := 0
            for {
                time.Sleep(time.Duration(rand.Intn(1000))*
                    time.Millisecond)
                out <- i
                i++
            }
        }()
        return out
    }
    */
    
    func main(){
        //var c1,c2 = channel3(),channel3()
        var c1,c2 = make(chan int),make(chan int)
        
        go func(){
            for {
                c1 <- 1
                c2 <- 2
                //time.Sleep(time.Millisecond)
            }
        }()
        
        for{
            //select可以在channel为nil的时候就去接收数据,
            //哪个channel有数据发送过来就进行接收
            select {
                case n:= <- c1:
                    fmt.Println("来自c1:",n)
                    
                case n:= <- c2:
                    fmt.Println("来自c2:",n)
    
            }
        }
    }

    输出,不断地发送1和2,然后又不断地输出1和2

    来自c2: 2
    来自c1: 1
    来自c2: 2
    来自c1: 1
    来自c2: 2
    来自c1: 1
    来自c2: 2
    来自c1: 1
    来自c2: 2
    来自c1: 1
    来自c2: 2
    来自c1: 1
    来自c2: 2
    来自c1: 1
    来自c2: 2
    来自c1: 1
    来自c2: 2
    来自c1: 1
    来自c2: 2
    来自c1: 1

    这里的输出与之前的输出又不一样,之前的输出一片c1或c2,而这里是c1与c2交替式输出,一个c1下来接着就是c2,这样的区别很明显;下面的这段话是个人推测,不保证百分之百是这样哈

    首先,在计算机中我们看到一个功能正在运行,比如复制一个电影,但实际上“一个”CPU还在同时做很多工作,比如你还打开了一个网页在看视频,只是这个CPU处理任务的速度非常快,至少在我们看来,这两个工作都在进行着;但对计算机来说,这些工作是在不同的时间片中完成的,同时IO流也在不同的设备上快速切换着。

    GO程序是非抢占式的,就是GO程序拿到CPU资源之后,自己不释放别人不可以抢走;GO中也有可以抢占式的程序,即别人可以抢走程序执行的控制权,比如IO操作(fmt打印就是输出IO到屏幕),这种操作会中断GO对资源的占用,我们可以认为这是系统需要这样,而GO是运行在系统上才不得不这样;比如一个GO主程序正在执行代码,到fmt的时候,在一些时间片上系统抢了GO程序的资源干了别的事,然后又把资源还给了GO程序。

    channel操作也是非抢占式的,先给哪个channel发送数据,那么这个channel如果不被中断,那么它就会先输出,先输出就会被select调度先输出;for中先c1,然后 fmt中断一次,再c2再fmt中断一次,然后下一轮for循环,所以保持了c1,c2,c1,c2……的输出顺序

    而成片输出c1或c2的代码中,channel的创建是在方法中,数据发送则是在两个不同的goroutine中,这两个goroutine是并行执行,而不像for里面c1与c2,

    for {
                c1 <- 1
                c2 <- 2
                //time.Sleep(time.Millisecond)
            }

    程序必定是先c1再c2这么有顺序,下面将变量i变成全局变量,再次执行之前的代码

    文章有些长了,再次重贴一下之前的代码,重运行看效果

    package main
    
    import (
        "fmt"
        //"math/rand"
        //"time"
    )
    
    var i int = 0
    func channel3() chan int{
        out := make(chan int)
        go func(){
            for {
                //time.Sleep(time.Duration(rand.Intn(1000))*
                //    time.Millisecond)
                //time.Sleep(100*time.Millisecond)
                out <- i
                i++
            }
        }()
        return out
    }
    
    
    func main(){
        var c1,c2 = channel3(),channel3()
        
        for{
            //select可以在channel为nil的时候就去接收数据,
            //哪个channel有数据发送过来就进行接收
            select {
                case n:= <- c1:
                    fmt.Println("来自c1:",n)
                    
                case n:= <- c2:
                    fmt.Println("来自c2:",n)
    
            }
        }
    }

    输出,可以看到成片的c2中有一个c1

    来自c2: 5445
    来自c2: 5446
    来自c2: 5447
    来自c1: 5448
    来自c2: 5448
    来自c2: 5449
    来自c2: 5450
    来自c2: 5451
    来自c2: 5452
    来自c2: 5453
    来自c2: 5454
    来自c2: 5455
    来自c2: 5456

    重点是,有两个5448,这里i是全局变量,每一次取i之后,程序都对代码进行了加1(i++),理论上不应该出现相同的数据; 但实际上在并行处理共享资源的时候都会有这个问题,这个问题暂时跳过,我们先讨论非共享资源的处理; 处理的思路不是加锁,而是前面提到的那个重要理念“不要通过共享内存来通信;通过通信来共享内存。”,我们建立一个channel来处理这个共享资源,这里先跳过这个问题。

    [root@itoracle channel3]# go run chan3.go > /tmp/a.txt
    ^Z
    [10]+  Stopped                 go run chan3.go > /tmp/a.txt
    
    [root@itoracle channel3]# cat /tmp/a.txt |grep c1 |wc -l
    298250
    [root@itoracle channel3]# cat /tmp/a.txt |grep c2 |wc -l
    274977

    之所以成片,是因为计算机处理速度太快了,将结果输出到文本中进行统计c1与c2的数量差不多,而对于计算机来说,一个很短的时间片却可以处理很多工作

    我们在发送数据的时候等待1毫秒,输出结果就不成片连接了

    package main
    
    import (
        "fmt"
        //"math/rand"
        //"time"
    )
    
    var i int = 0
    func channel3() chan int{
        out := make(chan int)
        go func(){
            for {
                //time.Sleep(time.Duration(rand.Intn(1000))*
                //    time.Millisecond)
                time.Sleep(100*time.Millisecond)
                out <- i
                i++
            }
        }()
        return out
    }
    
    
    func main(){
        var c1,c2 = channel3(),channel3()
        
        for{
            //select可以在channel为nil的时候就去接收数据,
            //哪个channel有数据发送过来就进行接收
            select {
                case n:= <- c1:
                    fmt.Println("来自c1:",n)
                    
                case n:= <- c2:
                    fmt.Println("来自c2:",n)
    
            }
        }
    }

    输出

    来自c1: 0
    来自c2: 1
    来自c2: 2
    来自c1: 3
    来自c1: 4
    来自c2: 5
    来自c2: 6
    来自c1: 7
    来自c1: 8
    来自c2: 9
    来自c2: 10
    来自c1: 11
    来自c1: 12
    来自c2: 13
    来自c2: 14
    来自c1: 15
    来自c1: 16
    来自c2: 17
    来自c2: 18

     未完,准备出去吃个午饭……

    select只收不发会怎么样

    package main
    
    import (
        "fmt"
    )
    
    func channel3() chan int{
        out := make(chan int)
        go func(){
            i := 1
            out <- i
            i++
            close(out)
        }()
        return out
    }
    
    
    func main(){
        var c1,c2 = channel3(),channel3()
        
        for{
            select {
                case n:= <- c1:
                    fmt.Println("来自c1:",n)
                    
                case n:= <- c2:
                    fmt.Println("来自c2:",n)
            }
        }
    }

    输出,除了第一次输出正常外,之后的全输出的是0

    来自c2: 1
    来自c2: 0
    来自c2: 0
    来自c2: 0
    来自c2: 0
    来自c2: 0
    来自c2: 0
    来自c2: 0
    来自c2: 0
    来自c2: 0
    来自c2: 0
    来自c2: 0
    来自c2: 0
    来自c2: 0
    来自c2: 0
    来自c2: 0

     这意味着,如果发送方不是一直在发送数据的话,接收方就会输出0;我们必须在接收方处理这个问题。当有数据发送的时候,接收方才接收。

    select不是一直发数据,接收会怎么处理

    package main
    
    import (
        "fmt"
        "time"
    )
    
    func createWorker(id int) chan<- int{
        c := make(chan int)
        //创建channel之后,就将接收处理放入了goroutine中
        go doWork(id,c)
        return c
    }
    
    func doWork(id int,c chan int){
        for n := range c {
            fmt.Printf("worker %d 接收的信息为 %d
    ",id,n)
        }
    }
    
    var i int = 0
    func channel3() chan int{
        out := make(chan int)
        go func(){
            for {
                //随机停止0-1秒的时间,即发送方不是一直都在发数据,注意不是不再发了(即不是close),只是没有一直发
                time.Sleep(time.Duration(rand.Intn(1000))*
                    time.Millisecond)
                out <- i
                i++
            }
        }()
        return out
    }
    
    
    func main(){
        //创建两个channel,并发送数据
        var c1,c2 = channel3(),channel3()
        i := 0
        
        //创建一个channel,并接收数据
        wk := createWorker(i)
        for{
            i++
            fmt.Printf("第%d次 
    ",i)
            select {
                case n:= <- c1:
                    wk <- n
                    
                case n:= <- c2:
                    wk <- n
            }
        }
    }

    输出,不是每次for循环中select都能收到数据,没有收到数据时,就跳过了for循环

    第13次 
    worker 0 接收的信息为 11
    第14次 
    worker 0 接收的信息为 12
    worker 0 接收的信息为 13
    第15次 
    第16次 
    worker 0 接收的信息为 14
    worker 0 接收的信息为 15
    第17次 
    第18次 
    worker 0 接收的信息为 16
    worker 0 接收的信息为 17
    第19次 
    第20次 
    worker 0 接收的信息为 18
    worker 0 接收的信息为 19
    第21次 
    第22次 
    worker 0 接收的信息为 20
    worker 0 接收的信息为 21

     这里解读一下这段代码的意思,为后面更复杂的程序作铺垫

    select {
                case n:= <- c1:
                    wk <- n
                    
                case n:= <- c2:
                    wk <- n
            }

    在select的代码块中,如果c1 这个channel有数据发送过来,那么就将数据赋值给变量n;如果c2这个channel有数据发送过来,那么就将数据赋值给变量n;n是一个channel,然后又将这个channel传递给了wk(wk也是一个channel),由wk负责将传递过来的数据输出; 如果有一轮for循环中(一个时间 片段中),c1和c2都没有数据传递过来,那么就轮空或者阻塞。这里是两个channel往一个channel中写数据。

    select中不仅可以channel输出,还可以有channel输入

    package main
    
    import (
        "fmt"
        "math/rand"
        "time"
    )
    
    func createWorker(id int) chan<- int{
        c := make(chan int)
        //创建channel之后,就将接收处理放入了goroutine中
        go doWork(id,c)
        return c
    }
    
    func doWork(id int,c chan int){
        for n := range c {
            fmt.Printf("worker %d 接收的信息为 %d
    ",id,n)
        }
    }
    
    var i int = 0
    func channel3() chan int{
        out := make(chan int)
        go func(){
            for {
                //随机停止0-1秒的时间,即发送方不是一直都在发数据,注意不是不再发了(即不是close),只是没有一直发
                time.Sleep(time.Duration(rand.Intn(1000))*
                    time.Millisecond)
                out <- i
                i++
            }
        }()
        return out
    }
    
    
    func main(){
        //创建两个channel,并发送数据
        var c1,c2 = channel3(),channel3()
        i := 0
        
        //创建一个channel,并接收数据
        wk := createWorker(i)
        hasDAta := false
        n := 0
        for{
            i++
            fmt.Printf("第%d次 
    ",i)
            
            var datawk chan<- int //nil
            if hasDAta {//当有数据时,再对datawk初始化,这样datawk就能够输出数据
                datawk = wk
            }
            
            select {
                case n = <- c1:
                    hasDAta = true //有数据来了
                case n = <- c2:
                    hasDAta = true //有数据来了
                case datawk <- n:
                    hasDAta = false//数据被转移走了(被输出了)
            }
        }
    }

    输出

    第1次 
    第2次 
    第3次 
    worker 0 接收的信息为 0
    第4次 
    第5次 
    worker 0 接收的信息为 1
    第6次 
    第7次 
    worker 0 接收的信息为 2
    第8次 
    第9次 
    worker 0 接收的信息为 3
    第10次 
    第11次 
    worker 0 接收的信息为 4
    第12次 
    第13次 
    worker 0 接收的信息为 5

    channel是一种缓冲,是一种通信缓存;张三向李四抛了一个鸡蛋,如果李四还没准备好接这个鸡蛋,那么鸡蛋就会摔碎在地上,当然张三没有这么笨,他会等李四准备好之后再抛;如果有一个缓冲,张三不用管李四是否准备好,张三有鸡蛋就往缓冲里放,李四发现缓冲有鸡蛋就去取,以通信的方式来达到资源共享,这就是channel。

    channel中如果发送数据的速度大于接收的速度,则数据丢失 

    package main
    
    import (
        "fmt"
        "math/rand"
        "time"
    )
    
    func createWorker(id int) chan<- int{
        c := make(chan int)
        //创建channel之后,就将接收处理放入了goroutine中
        go doWork(id,c)
        return c
    }
    
    func doWork(id int,c chan int){
        for n := range c {
            time.Sleep(time.Second)
            fmt.Printf("worker %d 接收的信息为 %d
    ",id,n)
        }
    }
    
    var i int = 0
    func channel3() chan int{
        out := make(chan int)
        go func(){
            for {
                //随机停止0-1秒的时间,即发送方不是一直都在发数据,注意不是不再发了(即不是close),只是没有一直发
                time.Sleep(time.Duration(rand.Intn(1000))*
                    time.Millisecond)
                out <- i
                i++
            }
        }()
        return out
    }
    
    
    func main(){
        //创建两个channel,并发送数据
        var c1,c2 = channel3(),channel3()
        i := 0
        
        //创建一个channel,并接收数据
        wk := createWorker(i)
        hasDAta := false
        n := 0
        for{
            i++
            fmt.Printf("第%d次 
    ",i)
            
            var datawk chan<- int //nil
            if hasDAta {//当有数据时,再对datawk初始化,这样datawk就能够输出数据
                datawk = wk
            }
            
            select {
                case n = <- c1:
                    hasDAta = true //有数据来了
                case n = <- c2:
                    hasDAta = true //有数据来了
                case datawk <- n:
                    hasDAta = false//数据被转移走了(被输出了)
            }
        }
    }

    接收时等待1秒再接收数据,发送是连接发送0,1,2,3,……,输出结果中显示很多数据丢失了

    第1次 
    第2次 
    第3次 
    第4次 
    第5次 
    第6次 
    第7次 
    worker 0 接收的信息为 0
    第8次 
    第9次 
    第10次 
    第11次 
    第12次 
    worker 0 接收的信息为 4
    第13次 
    第14次 
    第15次 
    第16次 
    第17次 
    第18次 
    第19次 
    worker 0 接收的信息为 8
    第20次 

     数据的丢失,并不是因为channel没有输出,而是我们在等待1秒后去打印n的时候,已经是n被多次赋值后的结果了,我们并不是n每次变化就打印n; 同时也表示轮空并不是n没有被赋值,channel没有输出。

    package main
    
    import (
        "fmt"
        "math/rand"
        "time"
    )
    
    func createWorker(id int) chan<- int{
        c := make(chan int)
        //创建channel之后,就将接收处理放入了goroutine中
        go doWork(id,c)
        return c
    }
    
    func doWork(id int,c chan int){
        for n := range c {
            time.Sleep(time.Second)
            fmt.Printf("worker %d 接收的信息为 %d
    ",id,n)
        }
    }
    
    var i int = 0
    func channel3() chan int{
        out := make(chan int)
        go func(){
            for {
                //随机停止0-1秒的时间,即发送方不是一直都在发数据,注意不是不再发了(即不是close),只是没有一直发
                time.Sleep(time.Duration(rand.Intn(1000))*
                    time.Millisecond)
                out <- i
                i++
            }
        }()
        return out
    }
    
    
    func main(){
        //创建两个channel,并发送数据
        var c1,c2 = channel3(),channel3()
        i := 0
        
        //创建一个channel,并接收数据
        wk := createWorker(i)
        hasDAta := false
        n := 0
        for{
            i++
            fmt.Printf("第%d次 
    ",i)
            
            var datawk chan<- int //nil
            if hasDAta {//当有数据时,再对datawk初始化,这样datawk就能够输出数据
                datawk = wk
            }
            
            select {
                case n = <- c1:
                    hasDAta = true //有数据来了
                    fmt.Printf("c1 n=%d 
    ",n)
                case n = <- c2:
                    hasDAta = true //有数据来了
                    fmt.Printf("c2 n=%d 
    ",n)
                case datawk <- n:
                    hasDAta = false//数据被转移走了(被输出了)
            }
        }
    }

    输出

    第1次 
    c1 n=0 
    第2次 
    第3次 
    c2 n=1 
    第4次 
    c1 n=2 
    第5次 
    c2 n=3 
    第6次 
    c1 n=4 
    第7次 
    worker 0 接收的信息为 0
    第8次 
    c2 n=5 
    第9次 
    c1 n=6 
    第10次 
    c2 n=7 
    第11次 
    c1 n=8 
    第12次 
    worker 0 接收的信息为 4
    第13次 
    c2 n=9 
    第14次 

    为channel接收方加上缓存处理

    package main
    
    import (
        "fmt"
        "math/rand"
        "time"
    )
    
    func createWorker(id int) chan<- int{
        c := make(chan int)
        //创建channel之后,就将接收处理放入了goroutine中
        go doWork(id,c)
        return c
    }
    
    func doWork(id int,c chan int){
        for n := range c {
            time.Sleep(time.Second)
            fmt.Printf("worker %d 接收的信息为 %d
    ",id,n)
        }
    }
    
    var i int = 0
    func channel3() chan int{
        out := make(chan int)
        go func(){
            for {
                //随机停止0-1秒的时间,即发送方不是一直都在发数据,注意不是不再发了(即不是close),只是没有一直发
                time.Sleep(time.Duration(rand.Intn(1000))*
                    time.Millisecond)
                out <- i
                i++
            }
        }()
        return out
    }
    
    
    func main(){
        //创建两个channel,并发送数据
        var c1,c2 = channel3(),channel3()
        i := 0
        
        //创建一个channel,并接收数据
        wk := createWorker(i)
        n := 0
        
        var values []int
        for{
            i++
            fmt.Printf("第%d次 
    ",i)
            
            var datawk chan<- int //nil
            var currentValue int
            if len(values) > 0 {//当有数据时,再对datawk初始化,这样datawk就能够输出数据
                datawk = wk
                currentValue = values[0]
            }
            
            select {
                case n = <- c1:
                    values = append(values,n)
                case n = <- c2:
                    values = append(values,n)
                case datawk <- currentValue: //消费掉列表中的第一个值
                    values = values[1:]      //列表中的数据前移一位
            }
        }
    }

    输出

    第31次 
    第32次 
    worker 0 接收的信息为 4
    第33次 
    第34次 
    第35次 
    第36次 
    worker 0 接收的信息为 5
    第37次 
    第38次 
    第39次 
    第40次 
    第41次 
    worker 0 接收的信息为 6
    第42次 
    第43次 
    第44次 
    第45次 
    worker 0 接收的信息为 7
    第46次 

    这样可以看到接收到的数据是连接的,不再断续;轮空代表着select循环的次数,也代表着计算机最快的处理速度,之后的程序不再输出这个。

     设置channel超时时间

    package main
    
    import (
        "fmt"
        "math/rand"
        "time"
    )
    
    func createWorker(id int) chan<- int{
        c := make(chan int)
        //创建channel之后,就将接收处理放入了goroutine中
        go doWork(id,c)
        return c
    }
    
    func doWork(id int,c chan int){
        for n := range c {
            time.Sleep(time.Second)
            fmt.Printf("worker %d 接收的信息为 %d
    ",id,n)
        }
    }
    
    var i int = 0
    func channel3() chan int{
        out := make(chan int)
        go func(){
            for {
                //随机停止0-1秒的时间,即发送方不是一直都在发数据,注意不是不再发了(即不是close),只是没有一直发
                time.Sleep(time.Duration(rand.Intn(1000))*
                    time.Millisecond)
                out <- i
                i++
            }
        }()
        return out
    }
    
    
    func main(){
        //创建两个channel,并发送数据
        var c1,c2 = channel3(),channel3()
        
        //创建一个channel,并接收数据
        wk := createWorker(0)
        n := 0
        
        //时间channel
        end := time.After(10*time.Second)
        i := 0 
        var values []int
        for{
    
            var datawk chan<- int //nil
            var currentValue int
            if len(values) > 0 {//当有数据时,再对datawk初始化,这样datawk就能够输出数据
                datawk = wk
                currentValue = values[0]
            }
            
            select {
                case n = <- c1:
                    values = append(values,n)
                case n = <- c2:
                    values = append(values,n)
                case datawk <- currentValue: //消费掉列表中的第一个值
                    values = values[1:]      //列表中的数据前移一位
                    
                //一定时间内程序未接收到数据就提示timeout
                case <- time.After(450 * time.Millisecond):
                    i++
                    fmt.Println("time out ",i)
                case <- end:
                    fmt.Println("game over")
                    return
            }
        }
    }

    输出

    time out  1
    worker 0 接收的信息为 0
    worker 0 接收的信息为 1
    time out  2
    worker 0 接收的信息为 2
    worker 0 接收的信息为 3
    worker 0 接收的信息为 4
    worker 0 接收的信息为 5
    worker 0 接收的信息为 6
    time out  3
    worker 0 接收的信息为 7
    worker 0 接收的信息为 8
    game over

     显出channel缓存队列数据积压程度

    package main
    
    import (
        "fmt"
        "math/rand"
        "time"
    )
    
    func createWorker(id int) chan<- int{
        c := make(chan int)
        //创建channel之后,就将接收处理放入了goroutine中
        go doWork(id,c)
        return c
    }
    
    func doWork(id int,c chan int){
        for n := range c {
            time.Sleep(time.Second)
            fmt.Printf("worker %d 接收的信息为 %d
    ",id,n)
        }
    }
    
    var i int = 0
    func channel3() chan int{
        out := make(chan int)
        go func(){
            for {
                //随机停止0-1秒的时间,即发送方不是一直都在发数据,注意不是不再发了(即不是close),只是没有一直发
                time.Sleep(time.Duration(rand.Intn(1000))*
                    time.Millisecond)
                out <- i
                i++
            }
        }()
        return out
    }
    
    
    func main(){
        //创建两个channel,并发送数据
        var c1,c2 = channel3(),channel3()
        
        //创建一个channel,并接收数据
        wk := createWorker(0)
        n := 0
        
        //时间channel
        end := time.After(10*time.Second)
        i := 0 
        var values []int
        
        //Tick会每过一个时间间隔,发送一个值到channel
        tick := time.Tick(2*time.Second)
        for{
    
            var datawk chan<- int //nil
            var currentValue int
            if len(values) > 0 {//当有数据时,再对datawk初始化,这样datawk就能够输出数据
                datawk = wk
                currentValue = values[0]
            }
            
            select {
                case n = <- c1:
                    values = append(values,n)
                case n = <- c2:
                    values = append(values,n)
                case datawk <- currentValue: //消费掉列表中的第一个值;如果该条件被轮空跳过,那么数据会写入队列,而不是因覆盖而消失,所以数据不会丢;并且当前值永远是队列的第一个值,也不会变
                    values = values[1:]      //列表中的数据前移一位;只有消费一次,当前值才会变化,不因每次for循环datawk都会被置为nil而出现误差。
                    
                case <- tick:
                    fmt.Println("队列当前长度为:",len(values))
                    
                //一定时间内程序未接收到数据就提示timeout
                case <- time.After(600 * time.Millisecond):
                    i++
                    fmt.Println("time out ",i)
                case <- end:
                    fmt.Println("game over")
                    return
            }
        }
    }

    输出

    time out  1
    worker 0 接收的信息为 0
    队列当前长度为: 7
    worker 0 接收的信息为 1
    worker 0 接收的信息为 2
    队列当前长度为: 16
    worker 0 接收的信息为 3
    worker 0 接收的信息为 4
    队列当前长度为: 23
    worker 0 接收的信息为 5
    worker 0 接收的信息为 6
    队列当前长度为: 28
    worker 0 接收的信息为 7
    worker 0 接收的信息为 8
    game over

     基础条件:

    case的条件谁先成立,就先执行谁;同时成立,则按一定算法处理,可简单认为随机处理。

    这次未被处理的条件,下次条件必定为真,但若同时还有为真的条件,它未必就一定会优先处理,依然有一定随机性。

    这是前面数据丢失的原因,正在基于这一点,后面设计了缓存队列。

    case在进行条件判断的同时,也是一次数据处理,将currentValue传入datawk channel,而datawk chanel中一旦有数据,就会进行处理,这里是打印输出。

    case datawk <- currentValue

    原理

    func createWorker(id int) chan<- int,这一步创建一个通道,并将这个通道的地址返回给主程序,同时,开启一个goroutine尝试从这个通道取数据;主程序就可以不断地向这个通道存放数据,不存放时候,获取数据的goroutine将处理阻塞状态。

     其他

    wk <- 1 为一个channel赋值,然后goroutine再对其处理,处理的过程是不会阻塞主程序的执行的,可能会出现goroutine还没处理完,主程序就已经执行完的情况。

    文件写:

    为每个文件创建一个列表

    多个channel往列表中放数据

    每个列表有一个channel负责从列表中取出数据写入文件

  • 相关阅读:
    UVa 10118 记忆化搜索 Free Candies
    CodeForces 568B DP Symmetric and Transitive
    UVa 11695 树的直径 Flight Planning
    UVa 10934 DP Dropping water balloons
    CodeForces 543D 树形DP Road Improvement
    CodeForces 570E DP Pig and Palindromes
    HDU 5396 区间DP 数学 Expression
    HDU 5402 模拟 构造 Travelling Salesman Problem
    HDU 5399 数学 Too Simple
    CodeForces 567F DP Mausoleum
  • 原文地址:https://www.cnblogs.com/perfei/p/10703964.html
Copyright © 2011-2022 走看看