1. 简介

Go 的 sync.Mutex 作为互斥锁,在程序并发运行时,用于对某一段代码或某一种资源访问进行控制,防止同时执行。

如 Go 原生的 map 存在多个协程并发地进行修改,会引发 panic,需要使用互斥锁进行保护。又如代码中存在对某个变量进行先读取后更新的操作,当不进行互斥访问时,可能存在两个协程先后读取再先后更新,造成数据的更新不稳定符合预期,需要通过互斥锁将读取更新的逻辑隔离开来。

Mutex 的使用方法如下,定一个 sync.Mutex 对象,然后调用 Lock 方法加锁,执行完一段逻辑代码后,调用 Unlock 方法解锁。当某个协程加锁后,别的协程再调用 Lock 方法将会被阻塞,直至改变量调用 Unlock 方法解锁后,才能获取锁。

var lock sync.Mutex

func f() {
	lock.Lock()
	// do sth
	lock.Unlock()
}

我们中间一段互斥访问的逻辑代码可能会存在函数返回,或引发 panic 等情况,以上写法有可能会执行不到 Unlock,或在某些返回分支条件中忘记加上 Unlock 方法调用,造成死锁。更好的写法是在调用 Lock 方法之后,立刻就 defer 调用 Unlock 方法,无论函数返回或是引发 panic,Unlock 方法都能得到调用。

var lock sync.Mutex

func f() {
	lock.Lock()
	defer lock.Unlock()
	// do sth
}

Mutex 还存在一个 TryLock 方法,当无法获取锁时,将会返回 false,而不是一直阻塞。

var lock sync.Mutex

func f() {
	if lock.TryLock() {
		defer lock.Unlock()
		// do sth
	}
}

2. 原理解析

2.1 Mutex

Mutex 结构体定义如下:state 表示当前互斥锁的状态,sema 是一个信号量变量,用来控制等待协程的阻塞休眠和唤醒。

type Mutex struct {
	state int32
	sema  uint32
}

state 字段有 32 位长度,可以分为四部分。最低的三位分别表示 mutexLocked、mutexWoken、mutexStarving 三个状态,mutexLocked 表示 Mutex 是否已上锁, mutexWoken 表示从普通模式被唤醒,mutexStarving 表示是否属于饥饿模式。后面 29 位用于记录等待互斥锁协程数量的计数器 waiter,最多可以表示 2^29 个协程正在等待该互斥锁。

  • mutexLocked:互斥锁是否已上锁,为 1 表示已被锁定
  • mutexWoken:是否有等待的协程已被唤醒,防止多个等待的协程被同时唤醒来竞争锁
  • mutexStarving:互斥锁是否处于饥饿模式
  • waiter:有多少协程在等待获取互斥锁

const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexStarving
    mutexWaiterShift = iota // 3,用于 state 移位获取等待互斥锁的协程数量
)

Mutex 有两种模式:普通模式和饥饿模式。

在普通模式下,所有争取锁的协程都加入到先进先出的等待队列中,被唤醒的协程将会和新来的协程进行竞争,但新的协程已经在 CPU 中于是有优势获取锁,而被唤醒的协程往往可能无法获取到锁,而重新插入队列前面。如果一个等待锁的协程超过 1ms 无法获取锁时,该锁转为饥饿模式。

在饥饿模式下,当一个协程解锁后,将会将锁的拥有权直接传递给等待队列开头的协程。新来的协程即使在未上锁时也不会尝试去获取锁,也不会进行自旋,而只是将他们排到等待队列的队尾。当某个协程获取锁后,如果它是等待队列的最后一个,或着它等待时间小于 1ms,将锁变回普通模式。

普通模式有着更好的获取锁的性能,而饥饿模式则更注重公平,避免一些协程一直无法获取锁导致的饿死情况。

const (
	starvationThresholdNs = 1e6 // 1ms,普通模式和饥饿模式转换的阈值
)

2.2 Lock

Lock 方法用于给 Mutex 加锁,如果已经锁上了,将会一直阻塞等待。

func (m *Mutex) Lock() {
	// 快速通过 CAS 原子操作获得锁
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		return
	}
	// 无法获取锁,通过自旋或等待来获取锁
	m.lockSlow()
}

func (m *Mutex) lockSlow() {
	var waitStartTime int64 // 等待时间
	starving := false // 饥饿标记
	awoke := false // 唤醒标记
	iter := 0 // 自旋次数
	old := m.state // 锁的当前状态

	// 循环直至成功上锁
	for {
		// 尝试自旋:已上锁且不是饥饿模式,判断可以自旋时
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			// 不是唤醒状态,没有被其他人唤醒,等待的协程数不为0
			// 尝试将 state 的 mutexWoken 位置 1 成功时
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				awoke = true // 设置唤醒标记
			}
            // 进行自旋并更新记录的状态
			runtime_doSpin()
			iter++
			old = m.state
			continue
		}

		new := old
		// 如果不是饥饿模式,将 state 的 mutexLocked 位置 1,表示加锁
		if old&mutexStarving == 0 {
			new |= mutexLocked
		}
		// 如果已上锁或是饥饿模式,将等待计数器加一
		if old&(mutexLocked|mutexStarving) != 0 {
			new += 1 << mutexWaiterShift
		}
		// 如果是饥饿模式且已上锁,将 state 的 mutexStarving 位置 1
		if starving && old&mutexLocked != 0 {
			new |= mutexStarving // 当前协程的锁变为饥饿模式,当解锁时不主动获取锁
		}
		// 如果自旋时成功设置唤醒标记
		if awoke {
			if new&mutexWoken == 0 {
				throw("sync: inconsistent mutex state")
			}
			new &^= mutexWoken // 清除唤醒标志位
		}

		// 尝试通过 CAS 原子操设置新的状态
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			// 原先没有上锁也不是饥饿模式,表示已经获取到锁,退出循环返回
			if old&(mutexLocked|mutexStarving) == 0 {
				break
			}
			// 如果等待时长大于 0,放到等待队列的开头
			queueLifo := waitStartTime != 0
			if waitStartTime == 0 { // 第一次执行到这里,设置等待时间
				waitStartTime = runtime_nanotime()
			}
			// 阻塞等待获取 mutex,当前协程进行休眠
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			// 检查等待时间,超过阈值时设置为饥饿状态
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			old = m.state
			// 如果已经是饥饿状态
			if old&mutexStarving != 0 {
				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
					throw("sync: inconsistent mutex state")
				}
				// 加锁并将等待计数器减一
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				if !starving || old>>mutexWaiterShift == 1 {
					// 如果当前协程不是饥饿模式,就将锁转为正常模式
					delta -= mutexStarving
				}
				atomic.AddInt32(&m.state, delta)
				break
			}
			awoke = true
			iter = 0
		} else {
			// 设置状态失败,重置 old,继续循环
			old = m.state
		}
	}
}

Lock 方法通过自旋锁机制,在某些情况下允许协程进行自旋等待,当期间成功加锁时,可以免除协程调度的时间开销,提高性能。

还通过普通模式和饥饿模式的切换,兼顾性能与公平。普通模式下所有协程争抢加锁,这时候往往是正在 CPU 中调度执行的协程获取到锁,而这会造成一些协程迟迟无法获取锁的情况。切换饥饿模式又能保证公平性,避免了部分协程获取锁的时间变得不合理地长。而在最后一个等待协程获取锁或等待获取锁时间小于 1ms 时,又切换回普通模式,保证了上锁的性能。

上锁的流程图如下:

2.3 TryLock

TryLock 方法也是给 Mutex 加锁,但与 Lock 方法的区别是,Lock 方法获取不到锁时会阻塞等待,而 TryLock 方法会返回 false。

func (m *Mutex) TryLock() bool {
	old := m.state
	// 判断已上锁或是饥饿模式,直接返回 false
	if old&(mutexLocked|mutexStarving) != 0 {
		return false
	}

	// 尝试将 mutexLocked 位置 1,成功返回 true,失败返回 false
	if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
		return false
	}
	return true
}

TryLock 方法相比之下非常简单,通过 CAS 尝试加锁,随后将成功或失败结果直接返回,不需要考虑多个等待协程间的竞争关系。

2.4 Unlock

Unlock 方法用来给 Mutex 释放锁。

func (m *Mutex) Unlock() {
	// 将 state 减去 mutexLocked,即减一
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		// 如果 state 不等于 0,表示其他位有值,有额外工作需要处理
		m.unlockSlow(new)
	}
}

func (m *Mutex) unlockSlow(new int32) {
	// 对未上锁的 mutex 调用 Unlock 将导致 fatal error
	if (new+mutexLocked)&mutexLocked == 0 {
		fatal("sync: unlock of unlocked mutex")
	}

	if new&mutexStarving == 0 {
		// 正常模式
		old := new
		for {
			// 如果等待计数器为 0,或者已经有在处理的情况,直接返回
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			// 等待计数器减一,设置 mutexWoken 为 1
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			// 尝试更新 state,成功时返回,否则继续循环
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				// 唤醒等待队列中的 waiter 协程,返回
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else {
		// 饥饿模式
		// 直接唤醒等待队列中的 waiter 协程
		runtime_Semrelease(&m.sema, true, 1)
	}
}

Unlock 方法释放锁,并通过信号量来唤醒阻塞获取锁的协程。根据 Mutex 属于普通模式还是饥饿模式,区别在于唤醒等待队列的任意一个或者是队首的等待协程。

由于 Mutex 本身并不会记录持有锁的协程信息,所以 Unlock 方法不会对解锁操作判断协程是否已加锁,对于未加锁的 Mutex 进行解锁将会触发 fatal error。我们需要在代码逻辑中保证先加锁后解锁的顺序。

解锁的流程图如下:

3. 参考