zoukankan      html  css  js  c++  java
  • channel补充

    无缓冲管道 :

    指在接收前没有能力保存任何值的通道,这种类型通道要求发送gorouutine和接收goroutine同时准备好,才能完成发送和接收操作。如果两个goroutine没有同时准备好,

    通道会导致先执行发送或者接收的goroutine阻塞等待,这种对通道进行发送和接收的交互行为本身就是同步的。其中任意一个操作都无法离开另一个操作单独存在

    模拟打羽毛球:发球 接球

    package main
    
    import (
        "fmt"
        "math/rand"
        "sync"
        "time"
    )
    
    var wg sync.WaitGroup
    
    func init(){
        rand.Seed(time.Now().Unix())
    }
    
    func main(){
        court := make(chan int)
        wg.Add(2)
    
        go player("发球",court)
        go player("接球",court)
        court <- 1
        wg.Wait()
    }
    
    func player(name string,court chan int)  {
        defer wg.Done()
    
        for{
            ball,ok:= <-court
            if !ok{
                fmt.Printf("player %s won
    ",name)
                return
            }
            n:=rand.Intn(100)
            if n%13 ==0{
                fmt.Printf("player %s Missed
    ",name)
                close(court)
                return
            }
            fmt.Printf("player %s Hit %d
    ",name,ball)
            ball++
            court<- ball
        }
    }
    GOROOT=D:go #gosetup
    GOPATH=D:gospaces #gosetup
    D:goingo.exe build -o C:UsersAdministratorAppDataLocalTemp\___go_build_listen20_go.exe D:/gocode/test/listen20.go #gosetup
    "D:softgoland2018.3GoLand 2018.3.5in
    unnerw64.exe" C:UsersAdministratorAppDataLocalTemp\___go_build_listen20_go.exe #gosetup
    player 接球 Hit 1
    player 发球 Hit 2
    player 接球 Hit 3
    player 发球 Hit 4
    player 接球 Hit 5
    player 发球 Hit 6
    player 接球 Hit 7
    player 发球 Hit 8
    player 接球 Hit 9
    player 发球 Hit 10
    player 接球 Hit 11
    player 发球 Hit 12
    player 接球 Hit 13
    player 发球 Hit 14
    player 接球 Hit 15
    player 发球 Hit 16
    player 接球 Hit 17
    player 发球 Hit 18
    player 接球 Hit 19
    player 发球 Hit 20
    player 接球 Hit 21
    player 发球 Hit 22
    player 接球 Hit 23
    player 发球 Hit 24
    player 接球 Missed
    player 发球 won

    4名跑步者围绕赛道轮流跑 接力棒

    package main
    
    import (
        "fmt"
        "sync"
        "time"
    )
    
    var wg sync.WaitGroup
    
    func main(){
        //创建一个无缓冲的通道
        baton := make(chan int)
        wg.Add(1)
        //第一位跑步者持有接力棒
        go Runner(baton)
    
        //开始比赛
        baton <- 1
    
        wg.Wait()
    }
    
    //Runner 模拟接力比赛中的一位跑步者
    func Runner(baton chan int){
        var newRunner int
        //等待接力棒
        runner := <- baton
        //开始绕着跑道跑步
        fmt.Printf("运动员 %d 到跑到这里来准备... 
    ",runner)
    
        //创建下一位跑步者
        if runner != 4{
            newRunner = runner+1
            fmt.Printf("运动员 %d 开始跑... 
    ",runner)
            go Runner(baton)
        }
    
    
        //围绕着跑到跑步跑100毫秒
        time.Sleep(1000000*time.Microsecond)
    
        //比赛结束了吗?
        if runner ==4 {
            fmt.Printf("运动员 %d finished ,race over 
    ",runner)
            wg.Done()
            return
        }
    
        fmt.Printf("运动员 %d 跑完一圈了开始和运动员 %d 交换接力棒... 
    ",runner,newRunner)
        baton <- newRunner
    }

    运动员 1 到跑到这里来准备...
    运动员 1 开始跑...
    运动员 1 跑完一圈了开始和运动员 2 交换接力棒...
    运动员 2 到跑到这里来准备...
    运动员 2 开始跑...
    运动员 2 跑完一圈了开始和运动员 3 交换接力棒...
    运动员 3 到跑到这里来准备...
    运动员 3 开始跑...
    运动员 3 跑完一圈了开始和运动员 4 交换接力棒...
    运动员 4 到跑到这里来准备...
    运动员 4 finished ,race over

    有缓冲的通道:

    有缓冲的通道是一种在被接收前能够存储一个或者多个值的通道,这种类型的通道并不强制要求协程之间必须同时完成发送和接收。通道会阻塞发送和接收的

    动作的条件也会不同,只有在通道中没有要接受的值时,接收动作才会阻塞。只有在通道没有可用的缓冲的去容纳被发送的值时,发送才会阻塞。这回导致有缓冲

    的通道和无缓冲的通道之间的一个很大的不同:无缓冲的通道保证进行发送和接收的协程会在同一时间进行数据交换;有缓冲的通道没有这种保证

    package main
    
    import (
        "fmt"
        "math/rand"
        "sync"
        "time"
    )
    
    const(
        numberGoroutines = 4 //要使用的gotoutine的数量
        taskLoad = 10 //要处理的工作的数量
    )
    
    //wg 用来等待程序完成
    var wg sync.WaitGroup
    
    //优先执行这个函数
    func init(){
        rand.Seed(time.Now().Unix())
    }
    
    
    //main 是所有go程序入口
    func main(){
        //创建一个有缓冲的通道来管理工作
        tasks := make(chan string,taskLoad)
    
        //启动协程来处理工作
        wg.Add(numberGoroutines)
        for gr := 1;gr<=numberGoroutines;gr++ {
            go worker(tasks,gr)
        }
    
        //增加一组要完成的工作
        for post := 1;post<=taskLoad;post++{
            tasks <- fmt.Sprintf("task :%d",post)
        }
    
        //当所有工作都处理完成是关闭管道
        //以便所有goroutine推出
        //很多人此处有疑问:为啥任务没处理完就关闭了
        //答:当任务关闭后 协程依然可以从通道中接收数据,但是不能像通道发送数据,能够从已经关闭的通道接收数据这点非常非常重要
        //因为这允许通道关闭后依旧能取出其中缓冲的全部值,而不丢失数据
        close(tasks)
    
        //等待所有工作完成
        wg.Wait()
    }
    
    
    //worker 作为goroutine的启动来处理
    //从有缓冲的通道传入的工作
    func worker(tasks chan string,worker int){
        //通知函数已经返回、
        defer wg.Done()
    
        for{
            //等待分配工作
            task,ok:= <- tasks
            if !ok{
                //这里意味着通道已经空了,并且已经被关闭
                fmt.Printf("worker;%d:shutting down
    ",worker)
                return
            }
    
            //显示我们开始工作
            fmt.Printf("worker :%d:start %s
    ",worker,task)
    
            //随机等待一段时间模拟工作
            sleep := rand.Int63n(100)
            time.Sleep(time.Duration(sleep)*time.Microsecond)
    
            //显示我们完成了工作
            fmt.Printf("worker:%d:completed %s
    ",worker,task)
        }
    
    }
    View Code
    GOROOT=D:go #gosetup
    GOPATH=D:gospaces #gosetup
    D:goingo.exe build -o C:UsersAdministratorAppDataLocalTemp\___go_build_listen20_go.exe D:/gocode/test/listen20.go #gosetup
    "D:softgoland2018.3GoLand 2018.3.5in
    unnerw64.exe" C:UsersAdministratorAppDataLocalTemp\___go_build_listen20_go.exe #gosetup
    worker :4:start task :4
    worker :3:start task :3
    worker :1:start task :1
    worker :2:start task :2
    worker:3:completed task :3
    worker :3:start task :5
    worker:4:completed task :4
    worker :4:start task :6
    worker:1:completed task :1
    worker :1:start task :7
    worker:2:completed task :2
    worker :2:start task :8
    worker:2:completed task :8
    worker:3:completed task :5
    worker :3:start task :10
    worker:4:completed task :6
    worker;4:shutting down
    worker :2:start task :9
    worker:1:completed task :7
    worker;1:shutting down
    worker:3:completed task :10
    worker;3:shutting down
    worker:2:completed task :9
    worker;2:shutting down
    
    Process finished with exit code 0

    网易:

    package main
    
    import (
        "fmt"
    )
    
    func main() {
        var c chan int
        fmt.Printf("c=%v
    ", c)
    
        c = make(chan int, 1)
        fmt.Printf("c=%v
    ", c)
        c <- 100
    
        /*
            data := <-c
            fmt.Printf("data:%v
    ", data)
        */
        <-c
    }

    nobufChan 不带缓冲(不带大小的chan 无法插入数据的,只有当有人在获取数据时候才可以放入数据)

    比如:收快递:只有快递员见到你本人后,只能寄快递

    package main
    
    import (
        "fmt"
        "time"
    )
    
    func produce(c chan int) {
        c <- 1000
        fmt.Println("produce finished")
    }
    
    func consume(c chan int) {
        data := <-c
        fmt.Println(data)
    }
    
    func main() {
        var c chan int
        fmt.Printf("c=%v
    ", c)
    
        c = make(chan int)
        go produce(c)
        go consume(c)
        time.Sleep(time.Second * 5)
    }

    goroutine_sync 模拟sleep阻塞的功能

    package main
    
    import (
        "fmt"
        "time"
    )
    
    func hello(c chan bool) {
        time.Sleep(5 * time.Second)
        fmt.Println("hello goroutine")
    
        c <- true
    }
    
    func main() {
        var exitChan chan bool
        exitChan = make(chan bool)
        go hello(exitChan)
        fmt.Println("main thread terminate")
        <-exitChan
    }

     只读 只写的chan

    package main
    
    import "fmt"
    
    func sendData(sendch chan<- int) {
        sendch <- 10
        //<-sendch
    }
    
    func readData(sendch <-chan int) {
        //sendch <- 10
        data := <-sendch
        fmt.Println(data)
    }
    
    func main() {
        chnl := make(chan int)
        go sendData(chnl)
        readData(chnl)
    }

    判断管道是否关闭

    package main
    
    import (
        "fmt"
    )
    
    func producer(chnl chan int) {
        for i := 0; i < 10; i++ {
            chnl <- i
        }
        close(chnl)
    }
    
    func main() {
        ch := make(chan int)
        go producer(ch)
        for {
            v, ok := <-ch
            if ok == false {
                fmt.Println("chan is closed")
                break
            }
            fmt.Println("Received ", v)
        }
    }

    for-range-chan  不需要关注管道是否关闭 管道关闭后 自动退出循环

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func producer(chnl chan int) {
    	for i := 0; i < 10; i++ {
    		chnl <- i
    		time.Sleep(time.Second)
    	}
    	close(chnl)
    }
    
    func main() {
    	ch := make(chan int)
    	go producer(ch)
    	for v := range ch {
    		fmt.Println("receive:", v)
    	}
    }
    

      

     待缓冲的chan(容量)

    特点:当没有往chan放入数据,直接去获取数据就会报错(死锁);当超过chan容量后,继续放入数据也会报错(死锁)

    package main
    
    import "fmt"
    
    func main() {
        ch := make(chan string, 2)
        var s string
        //s = <-ch
        ch <- "hello"
        ch <- "world"
        ch <- "!"
        //ch <- "test"
        s1 := <-ch
        s2 := <-ch
    
        fmt.Println(s, s1, s2)
    }
    View Code

    待缓冲的chan

    package main
    
    import (
        "fmt"
        "time"
    )
    
    func write(ch chan int) {
        for i := 0; i < 5; i++ {
            ch <- i
            fmt.Println("successfully wrote", i, "to ch")
        }
        close(ch)
    }
    func main() {
        ch := make(chan int, 2)
        go write(ch)
        time.Sleep(2 * time.Second)
        for v := range ch {
            fmt.Println("read value", v, "from ch")
            time.Sleep(2 * time.Second)
        }
    }
    View Code

    长度和容量

        
    package main
    
    import (
        "fmt"
    )
    
    func main() {
        ch := make(chan string, 3)
        ch <- "naveen"
        ch <- "paul"
        fmt.Println("capacity is", cap(ch))
        fmt.Println("length is", len(ch))
        fmt.Println("read value", <-ch)
        fmt.Println("new length is", len(ch))
    }
    View Code

    如何等待一组goroutine结束?

    方法1:low版本

    package main
    
    import (
        "fmt"
        "time"
    )
    
    func process(i int, ch chan bool) {
        fmt.Println("started Goroutine ", i)
        time.Sleep(2 * time.Second)
        fmt.Printf("Goroutine %d ended
    ", i)
        ch <- true
    }
    func main() {
        no := 3
        exitChan := make(chan bool, no)
        for i := 0; i < no; i++ {
            go process(i, exitChan)
        }
        for i := 0; i < no; i++ {
            <-exitChan
        }
        fmt.Println("All go routines finished executing")
    }

    方法2:sync.WaitGroup

    package main
    
    import (
        "fmt"
        "sync"
        "time"
    )
    
    func process(i int, wg *sync.WaitGroup) {
        fmt.Println("started Goroutine ", i)
        time.Sleep(2 * time.Second)
        fmt.Printf("Goroutine %d ended
    ", i)
        wg.Done()
    }
    func main() {
        no := 3
        var wg sync.WaitGroup
        wg.Wait()
        fmt.Println("wait return")
        for i := 0; i < no; i++ {
            wg.Add(1)
            go process(i, &wg)
        }
        wg.Wait()
        fmt.Println("All go routines finished executing")
    }

    workerpool的实现

    woker池的实现

    a,生产者,消费者模型,简单有效

    b,控制goroutine的数量,防止goroutine泄露和暴涨

    c,基于goroutine和chan,构建wokerpool非常简单

     1,任务抽象程一个个job

    2,使用job队列和result队列

    3,开一个组goroutine进行实际任务计算,并把结果放回result队列

     案例:

    package main
    
    import (
        "fmt"
        "math/rand"
    )
    
    type Job struct {
        Number int
        Id     int
    }
    
    type Result struct {
        job *Job
        sum int
    }
    
    func calc(job *Job, result chan *Result) {
        var sum int
        number := job.Number
        for number != 0 {
            tmp := number % 10
            sum += tmp
            number /= 10
        }
    
        r := &Result{
            job: job,
            sum: sum,
        }
    
        result <- r
    }
    
    func Worker(jobChan chan *Job, resultChan chan *Result) {
    
        for job := range jobChan {
            calc(job, resultChan)
        }
    }
    
    func startWorkerPool(num int, jobChan chan *Job, resultChan chan *Result) {
    
        for i := 0; i < num; i++ {
            go Worker(jobChan, resultChan)
        }
    }
    
    func printResult(resultChan chan *Result) {
        for result := range resultChan {
            fmt.Printf("job id:%v number:%v result:%d
    ", result.job.Id, result.job.Number, result.sum)
        }
    }
    
    func main() {
    
        jobChan := make(chan *Job, 1000)
        resultChan := make(chan *Result, 1000)
    
        startWorkerPool(128, jobChan, resultChan)
    
        go printResult(resultChan)
        var id int
        for {
            id++
            number := rand.Int()
            job := &Job{
                Id:     id,
                Number: number,
            }
    
            jobChan <- job
        }
    }
    View Code

     select 

     

    package main
    
    import (
        "fmt"
        "time"
    )
    
    func server1(ch chan string) {
        time.Sleep(time.Second * 6)
        ch <- "response from server1"
    }
    
    func server2(ch chan string) {
        time.Sleep(time.Second * 3)
        ch <- "response from server2"
    }
    
    func main() {
        output1 := make(chan string)
        output2 := make(chan string)
    
        go server1(output1)
        go server2(output2)
        /*
            s1 := <-output1
            fmt.Println("s1:", s1)
            s2 := <-output2
            fmt.Println("s2:", s2)
        */
    
        select {
        case s1 := <-output1:
            fmt.Println("s1:", s1)
        case s2 := <-output2:
            fmt.Println("s2:", s2)
        default:
            fmt.Println("run default")
        }
    }
    View Code
    package main
    
    import (
        "fmt"
        "time"
    )
    
    func write(ch chan string) {
        for {
            select {
            case ch <- "hello":
                fmt.Println("write succ")
            default:
                fmt.Println("channel is full")
            }
            time.Sleep(time.Millisecond * 500)
        }
    }
    
    func main() {
        //select {}
    
        output1 := make(chan string, 10)
    
        go write(output1)
        for s := range output1 {
            fmt.Println("recv:", s)
            time.Sleep(time.Second)
        }
    }
    View Code

    sync.Mutex

    package main
    
    import (
        "fmt"
        "sync"
    )
    
    var x int
    var wg sync.WaitGroup
    var mutex sync.Mutex
    
    func add() {
        for i := 0; i < 5000; i++ {
            mutex.Lock()
            x = x + 1
            mutex.Unlock()
        }
        wg.Done()
    }
    
    func main() {
    
        wg.Add(2)
        go add()
        go add()
    
        wg.Wait()
        fmt.Println("x:", x)
    }
    View Code

    package main
    
    import (
        "fmt"
        "sync"
        "time"
    )
    
    var rwlock sync.RWMutex
    var x int
    var wg sync.WaitGroup
    
    func write() {
        rwlock.Lock()
        fmt.Println("write lock")
        x = x + 1
        time.Sleep(10 * time.Second)
        fmt.Println("write unlock")
        rwlock.Unlock()
        wg.Done()
    }
    
    func read(i int) {
        fmt.Println("wait for rlock")
        rwlock.RLock()
        fmt.Printf("goroutine:%d x=%d
    ", i, x)
        time.Sleep(time.Second)
        rwlock.RUnlock()
        wg.Done()
    }
    
    func main() {
    
        wg.Add(1)
        go write()
        time.Sleep(time.Millisecond * 5)
        for i := 0; i < 10; i++ {
            wg.Add(1)
            go read(i)
        }
    
        wg.Wait()
    
    }
    读锁写锁

    互斥锁和读写锁比较

    package main
    
    import (
        "fmt"
        "sync"
        "time"
    )
    
    var rwlock sync.RWMutex
    var x int
    var wg sync.WaitGroup
    var mutex sync.Mutex
    
    func write() {
        for i := 0; i < 100; i++ {
            //rwlock.Lock()
            mutex.Lock()
            x = x + 1
            time.Sleep(10 * time.Millisecond)
            mutex.Unlock()
            //rwlock.Unlock()
        }
        wg.Done()
    }
    
    func read(i int) {
        for i := 0; i < 100; i++ {
            //rwlock.RLock()
            mutex.Lock()
            time.Sleep(time.Millisecond)
            mutex.Unlock()
            //rwlock.RUnlock()
        }
        wg.Done()
    }
    
    func main() {
    
        start := time.Now().UnixNano()
        wg.Add(1)
        go write()
    
        for i := 0; i < 100; i++ {
            wg.Add(1)
            go read(i)
        }
    
        wg.Wait()
        end := time.Now().UnixNano()
        cost := (end - start) / 1000 / 1000
        fmt.Println("cost:", cost, "ms")
    }
    View Code

    package main
    
    import (
        "fmt"
        "sync"
        "sync/atomic"
        "time"
    )
    
    var x int32
    var wg sync.WaitGroup
    
    var mutex sync.Mutex
    
    func addMutex() {
        for i := 0; i < 500; i++ {
            mutex.Lock()
            x = x + 1
            mutex.Unlock()
        }
        wg.Done()
    }
    
    func add() {
        for i := 0; i < 500; i++ {
            //mutex.Lock()
            //x = x +1
            atomic.AddInt32(&x, 1)
            //mutex.Unlock()
        }
        wg.Done()
    }
    
    func main() {
    
        start := time.Now().UnixNano()
        for i := 0; i < 10000; i++ {
            wg.Add(1)
            go add()
            //go addMutex()
        }
    
        wg.Wait()
        end := time.Now().UnixNano()
        cost := (end - start) / 1000 / 1000
        fmt.Println("x:", x, "cost:", cost, "ms")
    }
    atomic

    其它案例:

    先看代码

    package main
    import (
        "strings"
        "fmt"
        "time"
    )
    
    
    
    func main()  {
    
        users:=strings.Split("shenyi,zhangsan,lisi,wangwu",",")
        ages:=strings.Split("19,21,25,26",",")
    
        c1,c2:=make(chan bool),make(chan bool)
        ret:=make([]string,0)
        go func() {
            for _,v:=range users{
                 <-c1
                 ret=append(ret,v)
                 time.Sleep(time.Second)
                 c2<-true
            }
        }()
        go func() {
            for _,v:=range ages{
                <-c2
                ret=append(ret,v)
                c1<-true
            }
        }()
        c1<-true
        fmt.Println(ret)
    
    
    }

    打印:

    [shenyi]

    package main
    import (
        //_ "github.com/go-sql-driver/mysql"
        "io/ioutil"
        "net/http"
        "fmt"
    )
    
    
    
    func main()  {
    
         url:="https://news.cnblogs.com/n/page/%d/"
    
         c:=make(chan map[int][]byte)
         for i:=1;i<=3;i++{
             go func(index int) {
                url:=fmt.Sprintf(url,index)
                res,_:=http.Get(url)
                cnt,_:= ioutil.ReadAll(res.Body)
                c<-map[int][]byte{index:cnt}
    
                if index==3 {
                    close(c)
                }
            }(i)
         }
    
         for getcnt:=range c{
              for k,v:=range getcnt{
                 ioutil.WriteFile(fmt.Sprintf("./files/%d",k),v,666)
             }
    
         }
    
    
    
    
    
    
    
    }

    打印:

    。。。。会一直hang住

  • 相关阅读:
    一条insert语句批量插入多条记录
    分析器错误消息: 未能加载类型“WebApplication._Default”
    Avi视频生成缩略图时,提示“尝试读取或写入受保护的内存。这通常指示其他内存已损坏”
    DataGridView 的单元格的边框、 网格线样式的设定【转】
    2015届求职经历(转)
    现在有m组n个有序数组,例如{1,2,3,4},{2,3,4,6},{1,3,5,7},在这些数组中选择第k小的数据,然后返回这个值
    给定数组A,大小为n,现给定数X,判断A中是否存在两数之和等于X
    ASP.Net中使用Report Service
    为自己尝试写点东西吧,程序员们!(转)
    ubuntu菜单面板丢了怎么找回
  • 原文地址:https://www.cnblogs.com/sunlong88/p/11234859.html
Copyright © 2011-2022 走看看