zoukankan      html  css  js  c++  java
  • sync—WaitGroup

    转自:https://mp.weixin.qq.com/s/HUqTC6i7aSUBVq0bsqfQXw

    sync.WaitGroup() 用来做Goroutine的等待,当使用go启动多个并发程序,通过waitgroup可以等待所有go协程结束后再执行后面的代码逻辑

    用途:阻塞协程的执行,直到所有的goroutine执行完成

    WaitGroup总共有三个方法:Add(delta int),Done(),Wait()。简单的说一下这三个方法的作用。

    Add:添加或者减少等待goroutine的数量

    Done:相当于Add(-1)

    Wait:执行阻塞,直到所有的WaitGroup数量变成0

    WaitGroup主要维护了2个计数器,一个是请求计数器 v,一个是等待计数器 w,二者组成一个64bit的值,请求计数器占高32bit,等待计数器占低32bit。

    那么等待计数器拿来干嘛?是因为同一个实例的Wait()方法支持多处调用,每一次Wait()方法执行,等待计数器 w 就会加1,而当请求计数器v为0触发Wait()时,要根据w的数量发送w份的信号量,正确的触发所有的Wait(),这虽然不是很常用的一个特性,但是在一些特殊场合是有用处的(比如多个并发都依赖于WaitGroup的实例的结束信号来进行下一个action),演示代码如下

    func main() {
      wg := sync.WaitGroup{}
      for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
          defer wg.Done()
        }()
      }
      time.Sleep(2 * time.Second)
      for j := 0; j < 3; j++ {
        go func(i int) {
          // 3个地方调用Wait(),通过等待j计时器,每个 Wait 都会被唤醒
          wg.Wait()
          fmt.Println("wait done now ", i)
        }(j)
      }
      time.Sleep(10 * time.Second)
      return
    }
    /*
    输出如下,数字出现的顺序随机
    wait done now  1
    wait done now  0
    wait done now  2
    */

    同时,WaitGroup里还对使用逻辑进行了严格的检查,比如Wait()一旦开始不能再执行 Add() 操作

    下面是带注释的源码,去掉了不影响代码逻辑的trace部分:

    func (wg *WaitGroup) Add(delta int) {
        statep := wg.state()
        // 更新statep,statep将在wait和add中通过原子操作一起使用
        state := atomic.AddUint64(statep, uint64(delta)<<32)
        v := int32(state >> 32)
        w := uint32(state)
            if v < 0 {
            panic("sync: negative WaitGroup counter")
        }
        if w != 0 && delta > 0 && v == int32(delta) {
            // wait不等于0说明已经执行了Wait,此时不容许Add
            panic("sync: WaitGroup misuse: Add called concurrently with Wait")
        }
        // 正常情况,Add会让v增加,Done会让v减少,如果没有全部Done掉,此处v总是会大于0的,直到v为0才往下走
        // 而w代表是有多少个goruntine在等待done的信号,wait中通过compareAndSwap对这个w进行加1
         if v > 0 || w == 0 {
            return
        }
        // This goroutine has set counter to 0 when waiters > 0.
        // Now there can't be concurrent mutations of state:
        // - Adds must not happen concurrently with Wait,
        // - Wait does not increment waiters if it sees counter == 0.
        // Still do a cheap sanity check to detect WaitGroup misuse.
        // 当v为0(Done掉了所有)或者w不为0(已经开始等待)才会到这里,但是在这个过程中又有一次Add,导致statep变化,panic
        if *statep != state {
            panic("sync: WaitGroup misuse: Add called concurrently with Wait")
        }
        // Reset waiters count to 0.
        // 将statep清0,在Wait中通过这个值来保护信号量发出后还对这个Waitgroup进行操作
        *statep = 0
        // 将信号量发出,触发wait结束
        for ; w != 0; w-- {
            runtime_Semrelease(&wg.sema, false)
        }
    }
    
    // Done decrements the WaitGroup counter by one.
    func (wg *WaitGroup) Done() {
        wg.Add(-1)
    }
    
    // Wait blocks until the WaitGroup counter is zero.
    func (wg *WaitGroup) Wait() {
        statep := wg.state()
            for {
            state := atomic.LoadUint64(statep)
            v := int32(state >> 32)
            w := uint32(state)
            if v == 0 {
                // Counter is 0, no need to wait.
                if race.Enabled {
                    race.Enable()
                    race.Acquire(unsafe.Pointer(wg))
                }
                return
            }
            // Increment waiters count.
            // 如果statep和state相等,则增加等待计数,同时进入if等待信号量
            // 此处做CAS,主要是防止多个goroutine里进行Wait()操作,每有一个goroutine进行了wait,等待计数就加1
            // 如果这里不相等,说明statep,在 从读出来 到 CAS比较 的这个时间区间内,被别的goroutine改写了,那么不进入if,回去再读一次,这样写避免用锁,更高效些
            if atomic.CompareAndSwapUint64(statep, state, state+1) {
                if race.Enabled && w == 0 {
                    // Wait must be synchronized with the first Add.
                    // Need to model this is as a write to race with the read in Add.
                    // As a consequence, can do the write only for the first waiter,
                    // otherwise concurrent Waits will race with each other.
                    race.Write(unsafe.Pointer(&wg.sema))
                }
                // 等待信号量
                runtime_Semacquire(&wg.sema)
                // 信号量来了,代表所有Add都已经Done
                if *statep != 0 {
                    // 走到这里,说明在所有Add都已经Done后,触发信号量后,又被执行了Add
                    panic("sync: WaitGroup is reused before previous Wait has returned")
                }
                return
            }
        }
    }
    

      

    示例

    package main
    import (
    	"fmt"
    	"sync"
    	"time"
    )
    func main() {
    	var wg sync.WaitGroup
    
    	for i := 0; i < 5; i = i + 1 {
    		wg.Add(1)
    		go func(n int) {
    			defer wg.Done()
    			//defer wg.Add(-1)
    			EchoNumber(n)
    		}(i)
    	}
    
    	wg.Wait()
    }
    
    func EchoNumber(i int) {
    	time.Sleep(5000)
    	fmt.Println(i)
    }
    

      执行结果:

    4
    3
    0
    1
    2
    

     因为哪个协程先被cpu调度是不确定的,所以每次的执行结果也可能是不同的

     这个程序如果不用WaitGroup,那么将看不见输出结果。因为goroutine还没执行完,主线程已经执行完毕。之前执行脚本,都是使用time.Sleep,用一个估计的时间等到子线程执行完,实在太low了

     注释的defer wg.Add(-1)和defer wg.Done()作用一样

       虽然channel也能实现,但是如果涉及不到协程之间的数据同步,这个感觉不错

  • 相关阅读:
    swf上传地址
    Nape 获取碰撞点加特效
    vbs打包exe工具
    Air打包exe
    JDK12的安装搭建
    Dubble 入门
    FastDFS 集群
    PAT Advanced 1077 Kuchiguse (20 分)
    高可用4层lvs——keepalived
    PAT Advanced 1035 Password (20 分)
  • 原文地址:https://www.cnblogs.com/yorkyang/p/8134697.html
Copyright © 2011-2022 走看看