zoukankan      html  css  js  c++  java
  • 4. Go并发编程--Mutex/RWMutex

    一.前言

    我们反复提到了goroutine的创建时简单的。 但是仍然要小心, 习惯总是会导致我们可能写出一些bug.对于语言规范没有定义的内容不要做任何的假设。

    需要通过同步语义来控制代码的执行顺序 这一点很重要。 这些包提供了一些基础的同步语义,但是在实际的并发编程当中,我们应该使用 channel 来进行同步控制。

    二. Mutex

    2.1 案例

    上一篇文章中有这样一个例子

    package main
    
    import (
    	"fmt"
    	"sync"
    )
    
    var wg sync.WaitGroup
    var counter int
    
    func main() {
    	// 多跑几次来看结果
    	for i := 0; i < 100000; i++ {
    		run()
    	}
    	fmt.Printf("Final Counter: %d
    ", counter)
    }
    
    
    func run() {
        // 开启两个 协程,操作
    	for i := 1; i <= 2; i++ {
    		wg.Add(1)
    		go routine(i)
    	}
    	wg.Wait()
    }
    
    func routine(id int) {
    	for i := 0; i < 2; i++ {
    		value := counter
    		value++
    		counter = value
    	}
    	wg.Done()
    }
    

    测试后我们会发现,每次执行的结果都不一样, 那么如何运用Mutex进行修改呢?

    package main
    
    import (
    	"fmt"
    	"sync"
    )
    
    var wg sync.WaitGroup
    var counter int
    var mu sync.Mutex
    
    func main() {
    	// 多跑几次来看结果
    	for i := 0; i < 100000; i++ {
    		run()
    	}
    	fmt.Printf("Final Counter: %d
    ", counter)
    }
    
    
    func run() {
    	for i := 1; i <= 2; i++ {
    		wg.Add(1)
    		go routine(i)
    	}
    	wg.Wait()
    	fmt.Printf("Final Counter: %d
    ", counter)
    }
    
    func routine(id int) {
    	for i := 0; i < 2; i++ {
    	    // 加锁
    		mu.Lock()
    		counter++
    		// 解锁
    		mu.Unlock()
    	}
    	wg.Done()
    }
    

    这里主要的目的就是为了保护我们临界区的数据,通过锁来进行保证。锁的使用非常的简单,但是还是有几个需要注意的点

    • 锁的范围要尽量的小,不要搞很多大锁
    • 用锁一定要解锁,小心产生死锁

    三. 实现原理

    3.1 锁的实现模式

    • Barging: 这种模式是为了提高吞吐量,当锁被释放时,它会唤醒第一个等待者,然后把锁给第一个等待者或者给第一个请求锁的人
    • Handoff: 当锁释放的时候, 锁会一直持有直到第一个等待者准备好获取锁。 它降低了吞吐量,因为锁被持有, 即使另一个 goroutine 准备获取它。这种模式可以解决公平性的问题,因为在 Barging 模式下可能会存在被唤醒的 goroutine 永远也获取不到锁的情况,毕竟一直在 cpu 上跑着的 goroutine 没有上下文切换会更快一些。缺点就是性能会相对差一些
    • Spining:自旋在等待队列为空或者应用程序重度使用锁时效果不错。Parking 和 Unparking goroutines 有不低的性能成本开销,相比自旋来说要慢得多。但是自旋是有成本的,所以在 go 的实现中进入自旋的条件十分的苛刻。

    3.2 Go Mutex 实现原理

    3.2.1 加锁

    1. 首先如果当前锁处于初始化状态就直接用CAS方法尝试获取锁,这是 Fast Path
    2. 如果失败就进入 Slow Path
      • 会首先判断当前能不能进入自旋状态,如果可以就进入自旋,最多自旋 4 次
      • 自旋完成之后,就会去计算当前的锁的状态
      • 然后尝试通过 CAS 获取锁
      • 如果没有获取到就调用 runtime_SemacquireMutex 方法休眠当前 goroutine 并且尝试获取信号量
      • goroutine 被唤醒之后会先判断当前是否处在饥饿状态,(如果当前 goroutine 超过 1ms 都没有获取到锁就会进饥饿模式)
        1. 如果处在饥饿状态就会获得互斥锁,如果等待队列中只存在当前 Goroutine,互斥锁还会从饥饿模式中退出
        2. 如果不在,就会设置唤醒和饥饿标记、重置迭代次数并重新执行获取锁的循环

    CAS 方法在这里指的是 atomic.CompareAndSwapInt32(addr, old, new) bool 方法,这个方法会先比较传入的地址的值是否是 old,如果是的话就尝试赋新值,如果不是的话就直接返回 false,返回 true 时表示赋值成功
    饥饿模式是 Go 1.9 版本之后引入的优化,用于解决公平性的问题[10]

    3.2.2 解锁

    解锁的流程相对于加锁简单很多,这里直接上图,过程不过多赘述

    四. 源码分析

    4.1 Mutex基本结构

    Mutex是个结构体,源码如下

    type Mutex struct {
    	state int32
    	sema  uint32
    }
    

    Mutex 结构体由 state sema 两个 4 字节成员组成,其中 state 表示了当前锁的状态, sema 是用于控制锁的信号量

    state 字段的最低三位表示三种状态,分别是 mutexLocked mutexWoken mutexStarving ,剩下的用于统计当前在等待锁的 goroutine 数量

    • mutexLocked 表示是否处于锁定状态
    • mutexWoken 表示是否处于唤醒状态
    • mutexStarving 表示是否处于饥饿状态

    4.2 加锁

    互斥锁加锁逻辑如下

    • 当调用 Lock 方法的时候,会先尝试走 Fast Path,也就是如果当前互斥锁如果处于未加锁的状态,尝试加锁,只要加锁成功就直接返回

      func (m *Mutex) Lock() {
      	// Fast path: grab unlocked mutex.
      	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
      		return
      	}
      	// Slow path (outlined so that the fast path can be inlined)
      	m.lockSlow()
      }
      
    • 否则的话就进入 slow path

      func (m *Mutex) lockSlow() {
      var waitStartTime int64 // 等待时间
      starving := false // 是否处于饥饿状态
      awoke := false // 是否处于唤醒状态
      iter := 0 // 自旋迭代次数
      old := m.state
      for {
      	// Don't spin in starvation mode, ownership is handed off to waiters
      	// so we won't be able to acquire the mutex anyway.
      	if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
      		// Active spinning makes sense.
      		// Try to set mutexWoken flag to inform Unlock
      		// to not wake other blocked goroutines.
      		if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
      			atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
      			awoke = true
      		}
      		runtime_doSpin()
      		iter++
      		old = m.state
      		continue
      	}
      
    • lockSlow 方法中可以看到,有一个大的for 循环,不断的尝试去获取互斥锁,在循环的内部,第一步就是判断能否自旋状态。
      进入自旋状态的判断比较苛刻,具体需要满足什么条件呢? runtime_canSpin 源码见下方

      • 当前互斥锁的状态是非饥饿状态,并且已经被锁定了
      • 自旋次数不超过 4 次
      • cpu 个数大于一,必须要是多核 cpu
      • 当前正在执行当中,并且队列空闲的 p 的个数大于等于一
      // Active spinning for sync.Mutex.
      //go:linkname sync_runtime_canSpin sync.runtime_canSpin
      //go:nosplit
      func sync_runtime_canSpin(i int) bool {
          // 自旋次数不超过4
          // cpu个数大于1--所以必须是多核CPU
          // 队列空闲的p的个数大于等于1
      	if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
      		return false
      	}
      	if p := getg().m.p.ptr(); !runqempty(p) {
      		return false
      	}
      	return true
      }
      
    • 如果可以进入自旋状态之后就会调用 runtime_doSpin 方法进入自旋, doSpin 方法会调用 procyield(30) 执行三十次 PAUSE 指令

      TEXT runtime·procyield(SB),NOSPLIT,$0-0
      MOVL	cycles+0(FP), AX
      again:
      	PAUSE
      	SUBL	$1, AX
      	JNZ	again
      	RET
      

      为什么使用 PAUSE 指令呢?
      PAUSE 指令会告诉 CPU 我当前处于处于自旋状态,这时候 CPU 会针对性的做一些优化,并且在执行这个指令的时候 CPU 会降低自己的功耗,减少能源消耗

      if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
          atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
          awoke = true
      }
      
    • 在自旋的过程中会尝试设置 mutexWoken 来通知解锁,从而避免唤醒其他已经休眠的 goroutine 在自旋模式下,当前的 goroutine 就能更快的获取到锁

      new := old
      // Don't try to acquire starving mutex, new arriving goroutines must queue.
      if old&mutexStarving == 0 {
      	new |= mutexLocked
      }
      if old&(mutexLocked|mutexStarving) != 0 {
      	new += 1 << mutexWaiterShift
      }
      // The current goroutine switches mutex to starvation mode.
      // But if the mutex is currently unlocked, don't do the switch.
      // Unlock expects that starving mutex has waiters, which will not
      // be true in this case.
      if starving && old&mutexLocked != 0 {
      	new |= mutexStarving
      }
      if awoke {
      	// The goroutine has been woken from sleep,
      	// so we need to reset the flag in either case.
      	if new&mutexWoken == 0 {
      		throw("sync: inconsistent mutex state")
      	}
      	new &^= mutexWoken
      }
      
    • 自旋结束之后就会去计算当前互斥锁的状态,如果当前处在饥饿模式下则不会去请求锁,而是会将当前 goroutine 放到队列的末端

      if atomic.CompareAndSwapInt32(&m.state, old, new) {
      if old&(mutexLocked|mutexStarving) == 0 {
          break // locked the mutex with CAS
      }
      // If we were already waiting before, queue at the front of the queue.
      queueLifo := waitStartTime != 0
      if waitStartTime == 0 {
          waitStartTime = runtime_nanotime()
      }
      runtime_SemacquireMutex(&m.sema, queueLifo, 1)
      starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
      old = m.state
      if old&mutexStarving != 0 {
          // If this goroutine was woken and mutex is in starvation mode,
          // ownership was handed off to us but mutex is in somewhat
          // inconsistent state: mutexLocked is not set and we are still
          // accounted as waiter. Fix that.
          if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
              throw("sync: inconsistent mutex state")
          }
          delta := int32(mutexLocked - 1<<mutexWaiterShift)
          if !starving || old>>mutexWaiterShift == 1 {
              // Exit starvation mode.
              // Critical to do it here and consider wait time.
              // Starvation mode is so inefficient, that two goroutines
              // can go lock-step infinitely once they switch mutex
              // to starvation mode.
              delta -= mutexStarving
          }
          atomic.AddInt32(&m.state, delta)
          break
      }
      awoke = true
      iter = 0
      }
      

    状态计算完成之后就会尝试使用 CAS 操作获取锁,如果获取成功就会直接退出循环
    如果获取失败,则会调用 runtime_SemacquireMutex(&m.sema, queueLifo, 1) 方法保证锁不会同时被两个 goroutine 获取。runtime_SemacquireMutex 方法的主要作用是:

    • 不断调用尝试获取锁
    • 休眠当前goroutine
    • 等待信号量, 唤醒 goroutine

    goroutine 被唤醒后就会去判断当前是否处于饥饿模式,如果当前等待超过1ms 就会进入饥饿模式

    • 饥饿模式下: 会获得互斥锁,如果等待队列中只存在当前Goroutine, 互斥锁还会从饥饿模式中退出
    • 正常模式下: 会设置唤醒和饥饿标识, 重置迭代次数并重新执行获取锁的循环

    4.3 解锁

    加锁比解锁简单多了,原理直接参考源码的注释

    // 解锁没有绑定关系,可以一个 goroutine 锁定,另外一个 goroutine 解锁
    func (m *Mutex) Unlock() {
    	// Fast path: 直接尝试设置 state 的值,进行解锁
    	new := atomic.AddInt32(&m.state, -mutexLocked)
        // 如果减去了 mutexLocked 的值之后不为零就会进入慢速通道,这说明有可能失败了,或者是还有其他的 goroutine 等着
    	if new != 0 {
    		// Outlined slow path to allow inlining the fast path.
    		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
    		m.unlockSlow(new)
    	}
    }
    
    func (m *Mutex) unlockSlow(new int32) {
        // 解锁一个没有锁定的互斥量会报运行时错误
    	if (new+mutexLocked)&mutexLocked == 0 {
    		throw("sync: unlock of unlocked mutex")
    	}
        // 判断是否处于饥饿模式
    	if new&mutexStarving == 0 {
            // 正常模式
    		old := new
    		for {
    			// 如果当前没有等待者.或者 goroutine 已经被唤醒或者是处于锁定状态了,就直接返回
    			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
    				return
    			}
    			// 唤醒等待者并且移交锁的控制权
    			new = (old - 1<<mutexWaiterShift) | mutexWoken
    			if atomic.CompareAndSwapInt32(&m.state, old, new) {
    				runtime_Semrelease(&m.sema, false, 1)
    				return
    			}
    			old = m.state
    		}
    	} else {
    		// 饥饿模式,走 handoff 流程,直接将锁交给下一个等待的 goroutine,注意这个时候不会从饥饿模式中退出
    		runtime_Semrelease(&m.sema, true, 1)
    	}
    }
    

    通过源码注解,总结解锁需要注意以下

    1. 解锁一个没有锁定的互斥量会报运行时错误

    五. 读写锁(RWMutex)

    读写锁相对于互斥锁来说粒度更细,使用读写锁可以并发读,但是不能并发读写,或者并发写写

    5.1 案例

    大部分的业务应用都是读多写少的场景,这个时候使用读写锁的性能就会比互斥锁要好一些,例如下面的这个例子,是一个配置读写的例子,我们分别使用读写锁和互斥锁实现

    1. 读写锁
    // RWMutexConfig 读写锁实现
    type RWMutexConfig struct {
    	rw   sync.RWMutex
    	data []int
    }
    
    // Get get config data
    func (c *RWMutexConfig) Get() []int {
    	c.rw.RLock()
    	defer c.rw.RUnlock()
    	return c.data
    }
    
    // Set set config data
    func (c *RWMutexConfig) Set(n []int) {
    	c.rw.Lock()
    	defer c.rw.Unlock()
    	c.data = n
    }
    
    1. 互斥锁
    
    // MutexConfig 互斥锁实现
    type MutexConfig struct {
    	data []int
    	mu   sync.Mutex
    }
    
    // Get get config data
    func (c *MutexConfig) Get() []int {
    	c.mu.Lock()
    	defer c.mu.Unlock()
    	return c.data
    }
    
    // Set set config data
    func (c *MutexConfig) Set(n []int) {
    	c.mu.Lock()
    	defer c.mu.Unlock()
    	c.data = n
    }
    

    并发基准测试,测试两种锁的性能

    type iConfig interface {
    	Get() []int
    	Set([]int)
    }
    
    func bench(b *testing.B, c iConfig) {
    	b.RunParallel(func(p *testing.PB) {
    		for p.Next() {
    			c.Set([]int{100})
    			c.Get()
    			c.Get()
    			c.Get()
    			c.Set([]int{100})
    			c.Get()
    			c.Get()
    		}
    	})
    }
    
    func BenchmarkMutexConfig(b *testing.B) {
    	conf := &MutexConfig{data: []int{1, 2, 3}}
    	bench(b, conf)
    }
    
    func BenchmarkRWMutexConfig(b *testing.B) {
    	conf := &RWMutexConfig{data: []int{1, 2, 3}}
    	bench(b, conf)
    }
    

    执行测试结果如下

    root@failymao:/mnt/d/gopath/src/Go_base/daily_test/mutex# go test -race -bench=.
    goos: linux
    goarch: amd64
    pkg: Go_base/daily_test/mutex
    BenchmarkMutexConfig-8            179932              5820 ns/op
    BenchmarkRWMutexConfig-8          279578              3939 ns/op
    PASS
    ok      Go_base/daily_test/mutex        3.158s
    

    可以看到首先是没有 data race 问题,其次读写锁的性能几乎是互斥锁的一倍

    5.2 源码解析

    5.2.1 基本结构

    type RWMutex struct {
        w           Mutex  // 复用互斥锁
    	writerSem   uint32 // 信号量,用于写等待读
    	readerSem   uint32 // 信号量,用于读等待写
    	readerCount int32  // 当前执行读的 goroutine 数量
    	readerWait  int32  // 写操作被阻塞的准备读的 goroutine 的数量
    }
    

    由于复用了互斥锁的代码,读写锁的源码很简单

    5.2.2 读锁

    加锁

    func (rw *RWMutex) RLock() {
        // 直接对读的goroutine记录加1,并判断当前执行的读的goroutines数量是否为空
    	if atomic.AddInt32(&rw.readerCount, 1) < 0 {
    		// A writer is pending, wait for it.
    		runtime_SemacquireMutex(&rw.readerSem, false, 0)
    	}
    }
    

    首先是读锁, atomic.AddInt32(&rw.readerCount, 1) 调用这个原子方法,对当前在读的数量加一,如果返回负数,那么说明当前有其他写锁,这时候就调用 runtime_SemacquireMutex 休眠 goroutine 等待被唤醒

    解锁

    func (rw *RWMutex) RUnlock() {
    	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
    		// Outlined slow-path to allow the fast-path to be inlined
    		rw.rUnlockSlow(r)
    	}
    }
    

    解锁的时候对正在读的操作减一,如果返回值小于 0 那么说明当前有在写的操作,这个时候调用 rUnlockSlow 进入慢速通道

    func (rw *RWMutex) rUnlockSlow(r int32) {
    	if r+1 == 0 || r+1 == -rwmutexMaxReaders {
    		race.Enable()
    		throw("sync: RUnlock of unlocked RWMutex")
    	}
    	// A writer is pending.
    	if atomic.AddInt32(&rw.readerWait, -1) == 0 {
    		// The last reader unblocks the writer.
    		runtime_Semrelease(&rw.writerSem, false, 1)
    	}
    }
    

    5.2.3 写锁

    写锁

    func (rw *RWMutex) Lock() {
    	// First, resolve competition with other writers.
    	rw.w.Lock()
    	// Announce to readers there is a pending writer.
    	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    	// Wait for active readers.
    	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
    		runtime_SemacquireMutex(&rw.writerSem, false, 0)
    	}
    }
    

    首先调用互斥锁的 lock,获取到互斥锁之后

    • atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) 调用这个函数阻塞后续的读操作
    • 如果计算之后当前仍然有其他 goroutine 持有读锁,那么就调用 runtime_SemacquireMutex 休眠当前的 goroutine 等待所有的读操作完成

    解锁

    func (rw *RWMutex) Unlock() {
    	// Announce to readers there is no active writer.
    	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
    	if r >= rwmutexMaxReaders {
    		race.Enable()
    		throw("sync: Unlock of unlocked RWMutex")
    	}
    	// Unblock blocked readers, if any.
    	for i := 0; i < int(r); i++ {
    		runtime_Semrelease(&rw.readerSem, false, 0)
    	}
    }
    

    解锁的操作,会先调用 atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) 将恢复之前写入的负数,然后根据当前有多少个读操作在等待,循环唤醒

    六.参考

    1. https://mojotv.cn/go/golang-muteex-starvation
    2. https://lailin.xyz/post/go-training-week3-sync.html
    3. https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-sync-primitives/
    ♥永远年轻,永远热泪盈眶♥
  • 相关阅读:
    遇到的相关问题总结
    AI测试相关文章
    常用模块文档地址
    09-微服务接口:怎么用Mock解决混乱的调用关系
    03-思维方式:用一个案例彻底理解接口测试的关键逻辑
    1-基础:跳出细节看全局,接口测试到底是在做什么?
    21-Python并发编程之Futures
    使用原生php读写excel文件
    在for、foreach循环体中添加数组元素
    eval函数的坑
  • 原文地址:https://www.cnblogs.com/failymao/p/15491416.html
Copyright © 2011-2022 走看看