Mutex是go标准库中的互斥锁,用于处理并发场景下共享资源的访问冲突问题。
1. Mutex定义:
// A Mutex is a mutual exclusion lock. // The zero value for a Mutex is an unlocked mutex. // // A Mutex must not be copied after first use. type Mutex struct { state int32 //表示当前锁的状态 sema uint32 //信号量专用,用于阻塞/唤醒goroutine } // A Locker represents an object that can be locked and unlocked. type Locker interface { Lock() Unlock() }
state字段在这里有更丰富的含义,他是一个32位的整型。他的多重含义通过不同的位来表示:
在这里,
locked:表示这个锁是否被持有。
wokend:表示是否有唤醒的goroutine线程。
starving:表示是否处于饥饿状态。
waiterCount:表示等待该锁的goroutine的数量。
2. 加锁操作
// Lock locks m. // If the lock is already in use, the calling goroutine // blocks until the mutex is available. func (m *Mutex) Lock() { // Fast path: grab unlocked mutex.
// 如果当前处于未加锁状态,则直接加锁,并返回
// CompareAndSwapINt32() 将指定的old值 0 与指定地址 &m.state中的值相比较,如果相等,将new值 mutexLocked赋值给指定地址 &m.state。 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return } // Slow path (outlined so that the fast path can be inlined)
// 否则,进入slow path m.lockSlow() }
func (m *Mutex) lockSlow() {
// 等待时间 var waitStartTime int64
// 是否处于饥饿状态 starving := false
// 是否有唤醒的goroutine awoke := false
// 自旋迭代次数 iter := 0 old := m.state for { // Don't spin in starvation mode, ownership is handed off to waiters // so we won't be able to acquire the mutex anyway.
// 如果当前锁被占有且处于非饥饿状态,就进入自旋;自旋次数达到上限,再进入下一步。需要注意的是,如果这是一个被唤醒的协程,还需要将唤醒标志位置成唤醒状态1. if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // Active spinning makes sense. // Try to set mutexWoken flag to inform Unlock // to not wake other blocked goroutines. if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state //再次获取锁的状态,之后会检查锁是否被释放了 continue } new := old // Don't try to acquire starving mutex, new arriving goroutines must queue.
// 自旋结束后,检查旧状态;如果是非饥饿状态,新的状态持有锁标志位置成 1 if old&mutexStarving == 0 { new |= mutexLocked }
// 检查旧状态,如果锁被占用或处于饥饿状态为1,waiter数量 +1;这个时候,协程只能进入等待队列。如果当前是饥饿状态,新来得协程什么也做不了,会直接沉睡。 if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } // The current goroutine switches mutex to starvation mode. // But if the mutex is currently unlocked, don't do the switch. // Unlock expects that starving mutex has waiters, which will not // be true in this case.
// 如果当前协程被唤醒后,认定是饥饿状态,并且目前锁是被占用状态;那么锁的饥饿标志位被设定为1. if starving && old&mutexLocked != 0 { new |= mutexStarving } if awoke { // The goroutine has been woken from sleep, // so we need to reset the flag in either case.
// 协程被唤醒,需要将唤醒标志位清0,因为他要么继续沉睡,要么获取锁 if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } new &^= mutexWoken } if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 如果当前锁的状态已经被释放,并且不是饥饿状态,那么当前协程直接占用锁,并返回 if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } // If we were already waiting before, queue at the front of the queue.
// 如果没能抢到锁,就进入等待状态;被唤醒的重新沉睡的协程会被放在队列的首位,并且这里会开始计时,用于计算是否达到饥饿状态。新加入的自旋之后的协程会放在队列的尾部。 queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() }
// 阻塞等待,休眠当前goroutine等待并尝试获取信号量唤醒当前goroutine runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 唤醒之后检查锁是否处于饥饿状态 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state
// 如果锁已经处于饥饿状态,直接抢到锁,返回 if old&mutexStarving != 0 { // If this goroutine was woken and mutex is in starvation mode, // ownership was handed off to us but mutex is in somewhat // inconsistent state: mutexLocked is not set and we are still // accounted as waiter. Fix that.
// 如果当前协程被唤醒,欢迎之后检查锁是否处于饥饿状态;如果处于饥饿状态,直接抢到锁,返回 if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") }
// 加锁,并将waiter数减1 delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { // Exit starvation mode. // Critical to do it here and consider wait time. // Starvation mode is so inefficient, that two goroutines // can go lock-step infinitely once they switch mutex // to starvation mode.
// 最后一个 waiter或者已经不饥饿了,清楚饥饿标记 delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } } if race.Enabled { race.Acquire(unsafe.Pointer(m)) } }
加锁操作如下:
1. 新来的goroutine直接抢占锁,抢占失败进入自旋(循环),自旋一定次数后还是没有抢到锁进入等待队列。这一步给占有CPU时间片的goroutine更多的机会,降低整体的性能消耗。
2. 当前协程被唤醒后,判断是否处于饥饿状态;
如果是非饥饿状态,那么和新来的协程走正常抢占锁的流程;因为被唤醒的协程不占用cpu时间片,经常抢不过新来的协程;如果协程超过1ms还没有获取到锁,就进入饥饿状态。
如果是饥饿状态,将锁的饥饿标志位设置为1;锁的持有者执行完任务后,会将饥饿状态的锁交给队列中的第一个协程;新来的协程不会去抢占锁,也不会spin,而会直接进入阻塞队列。
3. 解锁操作
// Unlock unlocks m. // It is a run-time error if m is not locked on entry to Unlock. // // A locked Mutex is not associated with a particular goroutine. // It is allowed for one goroutine to lock a Mutex and then // arrange for another goroutine to unlock it. func (m *Mutex) Unlock() { if race.Enabled { _ = m.state race.Release(unsafe.Pointer(m)) } // Fast path: drop lock bit.
// 给 m.state直接减 1, 如果等于0,表示没有其他goroutine在等待,可以直接返回。 new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { // Outlined slow path to allow inlining the fast path. // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
// 否则,需要通过信号量机制唤醒沉睡的goroutine m.unlockSlow(new) } } func (m *Mutex) unlockSlow(new int32) {
// 解锁一个没有锁定的mutex会报错 if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") }
// 判断是否处于饥饿状态,如果不处于饥饿状态 if new&mutexStarving == 0 { old := new for { // If there are no waiters or a goroutine has already // been woken or grabbed the lock, no need to wake anyone. // In starvation mode ownership is directly handed off from unlocking // goroutine to the next waiter. We are not part of this chain, // since we did not observe mutexStarving when we unlocked the mutex above. // So get off the way.
// 如果没有等待的goroutine,或者goroutine已经被唤醒或持有了锁,直接返回 if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } // Grab the right to wake someone.
// 唤醒等待者,并将唤醒goroutine的唤醒标志位设置为 1 new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false, 1) return } old = m.state } } else { // Starving mode: handoff mutex ownership to the next waiter, and yield // our time slice so that the next waiter can start to run immediately. // Note: mutexLocked is not set, the waiter will set it after wakeup. // But mutex is still considered locked if mutexStarving is set, // so new coming goroutines won't acquire it.
// 如果处于饥饿状态,则直接唤醒第一个goroutine runtime_Semrelease(&m.sema, true, 1) } }