zoukankan      html  css  js  c++  java
  • sync.WaitGroup:golang 并发同步(等待一组 goroutine 结束)

    源码如下: 

    // WaitGroup holds a counter and a waiter count that the Add, Done and
    // Wait methods manipulate: Add and Done change the counter, and Wait
    // blocks until the counter is zero.
    type WaitGroup struct {
    // noCopy's type is declared outside this excerpt.
    // NOTE(review): presumably a marker that lets tooling flag copies of a
    // WaitGroup — confirm against the declaration.
    noCopy noCopy

    // 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
    // 64-bit atomic operations require 64-bit alignment, but 32-bit
    // compilers do not ensure it. So we allocate 12 bytes and then use
    // the aligned 8 bytes in them as state.
    state1 [12]byte
    // sema is the semaphore word: Wait blocks on it via runtime_Semacquire
    // and Add signals it via runtime_Semrelease when the counter hits zero.
    sema uint32
    }
    常用的三个方法分别为:
    // Add adds delta, which may be negative, to the WaitGroup counter.
    //
    // The counter lives in the high 32 bits of the 64-bit state word and the
    // number of registered waiters in the low 32 bits. If the counter becomes
    // negative, Add panics. If the counter becomes zero while waiters are
    // registered, Add resets the state to zero and releases the semaphore
    // once per waiter. Add also panics when it detects that it is being
    // called concurrently with Wait.
    func (wg *WaitGroup) Add(delta int) {
    statep := wg.state()
    if race.Enabled {
    _ = *statep // trigger nil deref early
    if delta < 0 {
    // Synchronize decrements with Wait.
    race.ReleaseMerge(unsafe.Pointer(wg))
    }
    race.Disable()
    defer race.Enable()
    }
    // Shift delta into the counter half (high 32 bits) and add it atomically;
    // state is the full word after the addition.
    state := atomic.AddUint64(statep, uint64(delta)<<32)
    v := int32(state >> 32) // counter after this Add
    w := uint32(state)      // number of registered waiters
    if race.Enabled {
    if delta > 0 && v == int32(delta) {
    // The first increment must be synchronized with Wait.
    // Need to model this as a read, because there can be
    // several concurrent wg.counter transitions from 0.
    race.Read(unsafe.Pointer(&wg.sema))
    }
    }
    if v < 0 {
    panic("sync: negative WaitGroup counter")
    }
    // The counter just moved from 0 to delta while waiters are already
    // registered: Add is racing with Wait.
    if w != 0 && delta > 0 && v == int32(delta) {
    panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    // Nothing to wake: either work is still outstanding or nobody is waiting.
    if v > 0 || w == 0 {
    return
    }
    // This goroutine has set counter to 0 when waiters > 0.
    // Now there can't be concurrent mutations of state:
    // - Adds must not happen concurrently with Wait,
    // - Wait does not increment waiters if it sees counter == 0.
    // Still do a cheap sanity check to detect WaitGroup misuse.
    if *statep != state {
    panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    // Reset waiters count to 0.
    *statep = 0
    // Release the semaphore once for every waiter that registered itself
    // in Wait before blocking on wg.sema.
    for ; w != 0; w-- {
    runtime_Semrelease(&wg.sema, false)
    }
    }

    // Done lowers the WaitGroup counter by one; it is shorthand for Add(-1).
    func (wg *WaitGroup) Done() {
        const oneFinished = -1
        wg.Add(oneFinished)
    }

    // Wait blocks until the WaitGroup counter is zero.
    //
    // It repeatedly loads the state word. If the counter (high 32 bits) is
    // zero it returns immediately. Otherwise it tries to register itself as a
    // waiter by incrementing the low 32 bits with a compare-and-swap; on
    // success it blocks on the semaphore until Add releases it, and on
    // failure it retries with a freshly loaded state. Wait panics if the
    // state is non-zero after it has been woken.
    func (wg *WaitGroup) Wait() {
    statep := wg.state()
    if race.Enabled {
    _ = *statep // trigger nil deref early
    race.Disable()
    }
    for {
    state := atomic.LoadUint64(statep)
    v := int32(state >> 32) // current counter
    w := uint32(state)      // waiters registered so far
    if v == 0 {
    // Counter is 0, no need to wait.
    if race.Enabled {
    race.Enable()
    race.Acquire(unsafe.Pointer(wg))
    }
    return
    }
    // Increment waiters count.
    // The CAS fails if the state changed since the load above; the
    // enclosing loop then retries.
    if atomic.CompareAndSwapUint64(statep, state, state+1) {
    if race.Enabled && w == 0 {
    // Wait must be synchronized with the first Add.
    // Need to model this as a write to race with the read in Add.
    // As a consequence, can do the write only for the first waiter,
    // otherwise concurrent Waits will race with each other.
    race.Write(unsafe.Pointer(&wg.sema))
    }
    runtime_Semacquire(&wg.sema)
    // Add resets the state to 0 before releasing waiters, so a non-zero
    // state here means the WaitGroup was touched again too early.
    if *statep != 0 {
    panic("sync: WaitGroup is reused before previous Wait has returned")
    }
    if race.Enabled {
    race.Enable()
    race.Acquire(unsafe.Pointer(wg))
    }
    return
    }
    }
    }
    因此可以简单地解释为:
    1. wg.Add(delta):将计数器增加 delta,即登记 delta 个待完成的任务
    2. wg.Done():等价于 wg.Add(-1),即计数器减一,表示有一个任务已完成(不指定是哪一个)
    3. wg.Wait():阻塞当前 goroutine,直到计数器归零(所有任务都已结束)后才返回
    因此可以利用这三个方法建立我们简单的并发模型:
    // Test_channel1 starts five goroutines that each run task, and blocks
    // until every one of them has reported completion through the WaitGroup.
    func Test_channel1(t *testing.T) {
        const workers = 5

        var wg sync.WaitGroup

        // worker runs one task and always signals completion, even if task panics.
        worker := func() {
            defer wg.Done()
            task()
        }

        for n := 0; n != workers; n++ {
            // Register the goroutine before it starts so Wait cannot miss it.
            wg.Add(1)
            go worker()
        }

        wg.Wait()
    }


    // task simulates a unit of work: it prints a start message, sleeps for
    // two seconds, then prints a completion message.
    func task() {
        const workDuration = 2 * time.Second

        fmt.Println("开始干活")
        time.Sleep(workDuration)
        fmt.Println("干活结束")
    }
  • 相关阅读:
    HDU 1556 差分,前缀和
    Full permutation
    PAT B1029
    字串简介
    阵列(3)
    完形填空
    关于c的比较
    19 阵列的复制
    switch述句
    阵列变数(2)
  • 原文地址:https://www.cnblogs.com/EvildoerOne/p/8337249.html
Copyright © 2011-2022 走看看