zoukankan      html  css  js  c++  java
  • Golang Channel实际应用

    不要通过共享内存来通信,要通过通信来共享内存

    在Go语言中,要传递某个数据给另一个goroutine(协程),可以把这个数据封装成一个对象,然后把这个对象的指针传入某个channel中,另外一个goroutine从这个channel中读出这个指针,并处理其指向的内存对象。Golang从语言层面保证同一个时间只有一个goroutine能够访问channel里面的数据,所以Golang的做法就是使用channel来通信,通过通信来传递内存数据,使得内存数据在不同的goroutine中传递,而不是使用共享内存来通信。

    一、通道同步

    使用通道来同步 Go 协程间的执行状态。这里是一个使用阻塞的接受方式来等待一个 Go 协程的运行结束。

    package main
    
    import "fmt"
    import "time"
    
    // 这是一个我们将要在 Go 协程中运行的函数。`done` 通道
    // 将被用于通知其他 Go 协程这个函数已经工作完毕。
    func worker(done chan bool) {
        fmt.Print("working...")
        time.Sleep(time.Second)
        fmt.Println("done")
    
        // 发送一个值来通知我们已经完工啦。
        done <- true
    }
    
    func main() {
    
        // 运行一个 worker Go协程,并给予用于通知的通道。
        done := make(chan bool, 1)
        go worker(done)
    
        // 程序将在接收到通道中 worker 发出的通知前一直阻塞。
        <-done
    }
    

    二、通道选择器(select)

    Go 的通道选择器 让你可以同时等待多个通道操作。Go 协程和通道以及选择器的结合是 Go 的一个强大特性。

    package main
    
    import "time"
    import "fmt"
    
    func main() {
    
        // 在我们的例子中,我们将从两个通道中选择。
        c1 := make(chan string)
        c2 := make(chan string)
    
        // 各个通道将在若干时间后接收一个值,这个用来模拟例如
        // 并行的 Go 协程中阻塞的 RPC 操作
        go func() {
            time.Sleep(time.Second * 1)
            c1 <- "one"
        }()
        go func() {
            time.Sleep(time.Second * 2)
            c2 <- "two"
        }()
    
        // 我们使用 `select` 关键字来同时等待这两个值,并打
        // 印各自接收到的值。
        for i := 0; i < 2; i++ {
            select {
            case msg1 := <-c1:
                fmt.Println("received", msg1)
            case msg2 := <-c2:
                fmt.Println("received", msg2)
            }
        }
    }
    

    三、超时处理

    超时 对于一个连接外部资源,或者其它一些需要花费执行时间的操作的程序而言是很重要的。得益于通道和 select,在 Go中实现超时操作是简洁而优雅的。

    package main
    
    import "time"
    import "fmt"
    
    func main() {
    
        // 在我们的例子中,假如我们执行一个外部调用,并在 2 秒后
        // 通过通道 `c1` 返回它的执行结果。
        c1 := make(chan string, 1)
        go func() {
            time.Sleep(time.Second * 2)
            c1 <- "result 1"
        }()
    
        // 这里是使用 `select` 实现一个超时操作。
        // `res := <- c1` 等待结果,`<-Time.After` 等待超时
        // 时间 1 秒后发送的值。由于 `select` 默认处理第一个
        // 已准备好的接收操作,如果这个操作超过了允许的 1 秒
        // 的话,将会执行超时 case。
        select {
        case res := <-c1:
            fmt.Println(res)
        case <-time.After(time.Second * 1):
            fmt.Println("timeout 1")
        }
    
        // 如果我允许一个长一点的超时时间 3 秒,将会成功的从 `c2`
        // 接收到值,并且打印出结果。
        c2 := make(chan string, 1)
        go func() {
            time.Sleep(time.Second * 2)
            c2 <- "result 2"
        }()
        select {
        case res := <-c2:
            fmt.Println(res)
        case <-time.After(time.Second * 3):
            fmt.Println("timeout 2")
        }
    }
    
    // 超时机制函数封装
    func TimeOut(timeout chan bool, times int) {
        go func() {
            time.Sleep(time.Duration(times) * time.Second)
            timeout <- true
        }()
    }
    

    四、定时器

    我们常常需要在后面一个时刻运行 Go 代码,或者在某段时间间隔内重复运行。Go 的内置 定时器 和 打点器 特性让这些很容易实现。

    package main
    
    import "time"
    import "fmt"
    
    func main() {
    
        // 定时器表示在未来某一时刻的独立事件。你告诉定时器
        // 需要等待的时间,然后它将提供一个用于通知的通道。
        // 这里的定时器将等待 2 秒。
        timer1 := time.NewTimer(time.Second * 2)
    
        // `<-timer1.C` 直到这个定时器的通道 `C` 明确的发送了
        // 定时器失效的值之前,将一直阻塞。
        <-timer1.C
        fmt.Println("Timer 1 expired")
    
        // 如果你需要的仅仅是单纯的等待,你需要使用 `time.Sleep`。
        // 定时器是有用原因之一就是你可以在定时器失效之前,取消这个
        // 定时器。这是一个例子
        timer2 := time.NewTimer(time.Second)
        go func() {
            <-timer2.C
            fmt.Println("Timer 2 expired")
        }()
        stop2 := timer2.Stop()
        if stop2 {
            fmt.Println("Timer 2 stopped")
        }
    }
    
    // 定时器函数封装
    func StartTicker(f func(), d time.Duration) chan struct{} {
        done := make(chan struct{}, 1)
        go func() {
            timer := time.NewTicker(d)
            defer timer.Stop()
            for {
                select {
                case <-timer.C:
                    f()
                case <-done:
                    return
                }
            }
        }()
        return done
    }
    

    五、打点器

    打点器和定时器的机制有点相似:一个通道用来发送数据,然后在这个通道上使用内置的 range 来迭代值每隔500ms 发送一次的值。

    package main
    
    import "time"
    import "fmt"
    
    func main() {
    
        // 打点器和定时器的机制有点相似:一个通道用来发送数据。
        // 这里我们在这个通道上使用内置的 `range` 来迭代值每隔
        // 500ms 发送一次的值。
        ticker := time.NewTicker(time.Millisecond * 500)
        go func() {
            for t := range ticker.C {
                fmt.Println("Tick at", t)
            }
        }()
    
        // 打点器可以和定时器一样被停止。一旦一个打点停止了,
        // 将不能再从它的通道中接收到值。我们将在运行后 1600ms
        // 停止这个打点器。
        time.Sleep(time.Millisecond * 1600)
        ticker.Stop()
        fmt.Println("Ticker stopped")
    }
    

    六、工作池

    这个通过上一篇文章中示例的深入讲解,应该是比较容易理解这个工作池的,这类似于一个线程池。

    package main
    
    import "fmt"
    import "time"
    
    // 这是我们将要在多个并发实例中支持的任务了。这些执行者
    // 将从 `jobs` 通道接收任务,并且通过 `results` 发送对应
    // 的结果。我们将让每个任务间隔 1s 来模仿一个耗时的任务。
    func worker(id int, jobs <-chan int, results chan<- int) {
        for j := range jobs {
            fmt.Println("worker", id, "processing job", j)
            time.Sleep(time.Second)
            results <- j * 2
        }
    }
    
    func main() {
    
        // 为了使用 worker 工作池并且收集他们的结果,我们需要
        // 2 个通道。
        jobs := make(chan int, 100)
        results := make(chan int, 100)
    
        // 这里启动了 3 个 worker,初始是阻塞的,因为
        // 还没有传递任务。
        for w := 1; w <= 3; w++ {
            go worker(w, jobs, results)
        }
    
        // 这里我们发送 9 个 `jobs`,然后 `close` 这些通道
        // 来表示这些就是所有的任务了。
    
        for j := 1; j <= 9; j++ {
            jobs <- j
        }
        close(jobs)
    
        // 最后,我们收集所有这些任务的返回值。
        for a := 1; a <= 9; a++ {
            <-results
        }
    }
    

    扩展:为了最优地提高工作池的并发性,理论上同时并行的任务量应该与CPU核心的数量相同,所有,上面的代码中的worker数量应该为运行主机的实际CPU核心数,可以runtime.NumCPU()获取,使用runtime.GOMAXPROCS()来设置运行的核心数。

    七、速率限制

    速率限制是一个重要的控制服务资源利用和质量的途径。传统的做法是通过redis,设置自增key的expire来实现。Go通过Go协程、通道和打点器优雅地的实现了速率限制。

    package main
    
    import "time"
    import "fmt"
    
    func main() {
    
        // 首先我们将看一下基本的速率限制。假设我们想限制我们
        // 接收请求的处理,我们将这些请求发送给一个相同的通道。
        requests := make(chan int, 5)
        for i := 1; i <= 5; i++ {
            requests <- i
        }
        close(requests)
    
        // 这个 `limiter` 通道将每 200ms 接收一个值。这个是
        // 速率限制任务中的管理器。
        limiter := time.Tick(time.Millisecond * 200)
    
        // 通过在每次请求前阻塞 `limiter` 通道的一个接收,我们限制
        // 自己每 200ms 执行一次请求。
        for req := range requests {
            <-limiter
            fmt.Println("request", req, time.Now())
        }
    
        // 有时候我们想临时进行速率限制,并且不影响整体的速率控制
        // 我们可以通过[通道缓冲](channel-buffering.html)来实现。
        // 这个 `burstyLimiter` 通道用来进行 3 次临时的脉冲型速率限制。
        burstyLimiter := make(chan time.Time, 3)
    
        // 想将通道填充需要临时改变3次的值,做好准备。
        for i := 0; i < 3; i++ {
            burstyLimiter <- time.Now()
        }
    
        // 每 200 ms 我们将添加一个新的值到 `burstyLimiter`中,
        // 直到达到 3 个的限制。
        go func() {
            for t := range time.Tick(time.Millisecond * 200) {
                burstyLimiter <- t
            }
        }()
    
        // 现在模拟超过 5 个的接入请求。它们中刚开始的 3 个将
        // 由于受 `burstyLimiter` 的“脉冲”影响。
        burstyRequests := make(chan int, 5)
        for i := 1; i <= 5; i++ {
            burstyRequests <- i
        }
        close(burstyRequests)
        for req := range burstyRequests {
            <-burstyLimiter
            fmt.Println("request", req, time.Now())
        }
    }
    
  • 相关阅读:
    poj1573
    poj2632
    poj2993 poj2996
    poj3295 构造法
    poj2965枚举
    poj1753 枚举
    poj942Paths on a Grid
    poj1019
    poj1321棋盘问题
    博客园访问量有些小,我就没有必要在复制一份了,博客园就这样吧,继续CSDN了。
  • 原文地址:https://www.cnblogs.com/remixnameless/p/15357723.html
Copyright © 2011-2022 走看看