zoukankan      html  css  js  c++  java
  • Go语言之并发编程(四)

    同步

    Go 程序可以使用通道进行多个 goroutine 间的数据交换,但这仅仅是数据同步中的一种方法。通道内部的实现依然使用了各种锁,因此优雅代码的代价是性能。在某些轻量级的场合,原子访问(atomic包)、互斥锁(sync.Mutex)以及等待组(sync.WaitGroup)能最大程度满足需求。

    当多线程并发运行的程序竞争访问和修改同一块资源时,会发生竞态问题。

    下面的代码中有一个 ID 生成器,每次调用生成器将会生成一个不会重复的顺序序号,使用 10 个并发生成序号,观察 10 个并发后的结果。

    竞态检测:

    package main
    
    import (
    	"fmt"
    	"sync/atomic"
    )
    
    var (
    	// 序列号
    	seq int64
    )
    
    // 序列号生成器
    func GenID() int64 {
    
    	// 尝试原子的增加序列号
    	atomic.AddInt64(&seq, 1)
    	return seq
    }
    
    func main() {
    
    	// 10个并发序列号生成
    	for i := 0; i < 10; i++ {
    		go GenID()
    	}
    
    	fmt.Println(GenID())
    }
    

      

    代码说明如下:

    • 第10行,序列号生成器中的保存上次序列号的变量。
    • 第17行,使用原子操作函数atomic.AddInt64()对seq()函数加1操作。不过这里故意没有使用atomic.AddInt64()的返回值作为GenID()函数的返回值,因此会造成一个竞态问题。
    • 第25行,循环10次生成10个goroutine调用GenID()函数,同时忽略GenID()的返回值。
    • 第28行,单独调用一次GenID()函数。

    在运行程序时,为运行参数加入-race参数,开启运行时(runtime)对竞态问题的分析,命令如下:

    # go run -race racedetect.go
    ==================
    WARNING: DATA RACE
    Write at 0x0000005d3f10 by goroutine 7:
      sync/atomic.AddInt64()
          E:/go/src/runtime/race_amd64.s:276 +0xb
      main.GenID()
          D:/go_work/src/chapter09/racedetect/racedetect.go:17 +0x4a
    
    Previous read at 0x0000005d3f10 by goroutine 6:
      main.GenID()
          D:/go_work/src/chapter09/racedetect/racedetect.go:18 +0x5a
    
    Goroutine 7 (running) created at:
      main.main()
          D:/go_work/src/chapter09/racedetect/racedetect.go:25 +0x56
    
    Goroutine 6 (finished) created at:
      main.main()
          D:/go_work/src/chapter09/racedetect/racedetect.go:25 +0x56
    ==================
    10
    Found 1 data race(s)
    exit status 66
    

      

    代码运行发生宕机,根据报错信息,第18行有竞态问题,根据atomic.AddInt64()的参数声明,这个函数会将修改后的值以返回值方式传出:

    func GenID() int64 {
        // 尝试原子的增加序列号
        return atomic.AddInt64(&seq, 1)
    }
    

      

    再次运行:

    # go run -race racedetect.go
    10
    

      

    没有发生竞态问题,程序运行正常。

    本例中只是对变量进行增减操作,虽然可以使用互斥锁(sync.Mutex)解决竞态问题,但是对性能消耗较大。在这种情况下,推荐使用原子操作(atomic)进行变量操作。

    互斥锁(sync.Mutex)和读写互斥锁(sync.RWMutex)

    互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个goroutine可以访问共享资源。在Go程序中的使用非常简单,参见下面的代码:

    package main
    
    import (
        "fmt"
        "sync"
    )
    
    var (
        // 逻辑中使用的某个变量
        count int
    
        // 与变量对应的使用互斥锁
        countGuard sync.Mutex
    )
    
    func GetCount() int {
    
        // 锁定
        countGuard.Lock()
    
        // 在函数退出时解除锁定
        defer countGuard.Unlock()
    
        return count
    }
    
    func SetCount(c int) {
        countGuard.Lock()
        count = c
        countGuard.Unlock()
    }
    
    func main() {
    
        // 可以进行并发安全的设置
        SetCount(1)
    
        // 可以进行并发安全的获取
        fmt.Println(GetCount())
    
    }
    

      

    代码说明如下:

    • 第10行是某个逻辑步骤中使用到的变量,无论是包级的变量还是结构体成员字段,都可以。
    • 第13行,一般情况下,建议将互斥锁的粒度设置得越小越好,降低因为共享访问时等待的时间。
    • 第16行是一个获取count值的函数封装,通过这个函数可以并发安全的访问变量count。
    • 第19行,尝试对countGuard互斥量进行加锁。一旦countGuard发生加锁,如果另外一个goroutine尝试继续加锁时将会发生阻塞,直到这个countGuard被解锁。
    • 第22行使用defer将countGuard的解锁进行延迟调用,解锁操作将会发生在GetCount()函数返回时。
    • 第27行在设置count值时,同样使用countGuard进行加锁、解锁操作,保证修改count值的过程是一个原子过程,不会发生并发访问冲突。

    在读多写少的环境中,可以优先使用读写互斥锁(sync.RWMutex),它比互斥锁更加高效。sync包中的RWMutex提供了读写互斥锁的封装。

    我们将互斥锁例子中的一部分代码修改为读写互斥锁,参见下面代码:

    var (
        // 逻辑中使用的某个变量
        count int
    
        // 与变量对应的使用互斥锁
        countGuard sync.RWMutex
    )
    
    func GetCount() int {
    
        // 锁定
        countGuard.RLock()
    
        // 在函数退出时解除锁定
        defer countGuard.RUnlock()
    
        return count
    }
    

      

    代码说明如下:

    • 第6行,在声明countGuard时,从sync.Mutex互斥锁改为sync.RWMutex读写互斥锁。
    • 第12行,获取count的过程是一个读取count数据的过程,适用于读写互斥锁。在这一行,把countGuard.Lock()换做countGuard.RLock(),将读写互斥锁标记为读状态。如果此时另外一个goroutine并发访问了countGuard,同时也调用了countGuard.RLock()时,并不会发生阻塞。
    • 第15行,与读模式加锁对应的,使用读模式解锁。

    等待组(sync.WaitGroup)

    除了可以使用通道(channel)和互斥锁进行两个并发程序间的同步外,还可以使用等待组进行多个任务的同步,等待组可以保证在并发环境中完成指定数量的任务

    等待组有下面几个方法可用,如表1-2所示。

    表1-2   等待组的方法
    方法名 功能
    (wg * WaitGroup) Add(delta int) 等待组的计数器+1
    (wg *WaitGroup) Done() 等待组的计数器-1
    (wg *WaitGroup) Wait() 当等待组计数器不等于0时阻塞直到变0

    等待组内部拥有一个计数器,计数器的值可以通过方法调用实现计数器的增加和减少。当我们添加了N个并发任务进行工作时,就将等待组的计数器值增加N。每个任务完成时,这个值减1。同时,在另外一个goroutine中等待这个等待组的计数器值为0时,表示所有任务已经完成。

    package main
    
    import (
        "fmt"
        "net/http"
        "sync"
    )
    
    func main() {
    
        // 声明一个等待组
        var wg sync.WaitGroup
    
        // 准备一系列的网站地址
        var urls = []string{
            "http://www.github.com/",
            "https://www.qiniu.com/",
            "https://www.golangtc.com/",
        }
    
        // 遍历这些地址
        for _, url := range urls {
    
            // 每一个任务开始时, 将等待组增加1
            wg.Add(1)
    
            // 开启一个并发
            go func(url string) {
    
                // 使用defer, 表示函数完成时将等待组值减1
                defer wg.Done()
    
                // 使用http访问提供的地址
                _, err := http.Get(url)
    
                // 访问完成后, 打印地址和可能发生的错误
                fmt.Println(url, err)
    
                // 通过参数传递url地址
            }(url)
        }
    
        // 等待所有的任务完成
        wg.Wait()
    
        fmt.Println("over")
    }
    

      

    代码说明如下:

    • 第12行,声明一个等待组,对一组等待任务只需要一个等待组,而不需要每一个任务都使用一个等待组。
    • 第15行,准备一系列可访问的网站地址的字符串切片。
    • 第22行,遍历这些字符串切片。
    • 第25行,将等待组的计数器加1,也就是每一个任务加1。
    • 第28行,将一个匿名函数开启并发。
    • 第31行,在匿名函数结束时会执行这一句以表示任务完成。wg.Done()方法等效于执行wg.Add(-1)。
    • 第34行,使用http包提供的Get()函数对url进行访问,Get()函数会一直阻塞直到网站响应或者超时。
    • 第37行,在网站响应和超时后,打印这个网站的地址和可能发生的错误。
    • 第40行,这里将url通过goroutine的参数进行传递,是为了避免url变量通过闭包放入匿名函数后又被修改的问题。
    • 第44行,等待所有的网站都响应或者超时后,任务完成,Wait就会停止阻塞。
  • 相关阅读:
    聊天ListView
    Android笔试题三
    java内存分配与垃圾回收
    Activity切换的时候生命周期的变化
    二分查找
    如何用报表工具实现树状层级结构的填报表
    报表数据填报中的自动计算
    报表数据填报中的合法性校验
    tab 页形式展现多张报表
    报表工具之数据校验竟可以如此简单
  • 原文地址:https://www.cnblogs.com/beiluowuzheng/p/9903638.html
Copyright © 2011-2022 走看看