TOC

首先看下源码文件 sync/mutex.go里边开头的注释

sync包提供基本同步原语,如互斥锁,除了Once和WaitGroup外,大多数类型都是供低层级库的协程使用,更高级别的同步最好通过通道和通信来完成,不应复制包含此包中定义的类型的值。

由注释可以看出,官方建议使用channel来做同步,而且此包中的值不允许复制,主要原因是,复制后加锁和解锁操作就不作用在同一个对象上,这样就会出问题。如下例子运行时会报unlock of unlocked mutex错误:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package main

import "sync"

func main() {

lock := sync.Mutex{}
Lock(lock)
lock.Unlock()// 此行panic
}
// 调用此函数时会发生复制,这里加锁针对的是副本加锁
// 源lock未加锁,因此上边调用加锁会发生panic
// sync包下其他类型复制会发生同样问题
func Lock(lock sync.Mutex) {
lock.Lock()
}

再看下定义常量下边的注释:

锁公平:

互斥可以有两种操作模式:正常和饥饿。

  1. 在正常模式下,等待加锁的G按FIFO顺序排队,但被唤醒的waiter并不持有锁,它们与新来的G争夺所有权,新来的G有一个优势,它们已经在CPU上运行,而且可能有很多,所以一个被唤醒的G很有可能抢不到锁,在这种情况下,它将在等待队列前面排队,如果有waiter超过1ms无法获取互斥量,则互斥量mutex进入饥饿模式;

  2. 在饥饿模式下,互斥体的所有权直接从解锁的G移交给队列前边的waiter,新来的G不会去争夺锁,即使它看起来是解锁的,也不会去spinning,它们会在队尾排队。如果waiter收到互斥的所有权,并看到①它是队列中最后一个waiter或②它等待不到1ms,它将从互斥模式切换回正常模式。

正常模式有相当好的性能,因为Goroutine可以连续多次获取互斥,即使有阻塞的等待程序。饥饿模式对预防尾部潜伏期病变具有重要意义。

基本结构

1
2
3
4
type Mutex struct {
state int32
sema uint32
}

其中 Mutex.sema表示信号量,Mutex.status字段每一位含义如下:

1
2
3
4
5
6
7
8
9
10
state:   |32|31|...| |3|2|1|
\__________/ | | |
| | | |
| | | +--- mutex的占用状态(1被占用,0可用)
| | |
| | +---mutex的当前goroutine是否被唤醒
| |
| +---饥饿位,0正常,1饥饿
|
+---等待唤醒以尝试锁定的goroutine的计数,0表示没有等待者

加锁解锁方法源码

Mutex一共有两个方法:Lock()Unlock()

Lock()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
// 锁定Mutex.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}

var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
// Don't spin in starvation mode, ownership is handed off to waiters
// so we won't be able to acquire the mutex anyway.
// 如果原状态为非饥饿模式的锁定状态,且canSpin(函数实现在下边,当iter<4、cpu数>1、至少由1个运行中的p、当前p的运行g队列为空,则可spin)
if old&(mutexLocked|mutexStarving) == mutexLocked // 确定是已锁定的非饥饿状态 mutexLocked|mutexStarving = 00...00101 当且仅当old==1时 old & 101 == 1成立
&& runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
// 尝试设置mutexWoken标记来通知Unlock,不去唤醒其他阻塞的G
//
if !awoke &&
old&mutexWoken == 0 && // 再次确认未被唤醒
old>>mutexWaiterShift != 0 && // 确认有G在排队
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { // 将对象锁置为唤醒状态

awoke = true // 标记当前G awoke
}
runtime_doSpin() // 空转,根据iter,重试一定次数将不再空转
iter++// 自旋次数
old = m.state // 更新old
continue
}
new := old // 不可空转、 或 !非饥饿模式的锁定状态
// 不要试图获取饥饿的mutex,新来的G必须排队.
if old&mutexStarving == 0 { // 若old是非饥饿模式,标记加锁
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 { // 如果old 已加锁/饥饿模式/加锁数!=0 则加锁数+1
new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.当前G切换到饥饿模式,但若mutex当前未加锁,则不切换
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case. Unlock期望饥饿模式下有等待者,这种情况下不会为true
if starving && old&mutexLocked != 0 { // 若处于饥饿模式,且(已加锁或有waiter),则new饥饿位做标记
new |= mutexStarving
}
if awoke { // 如果唤醒
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 { // 如果new的mutexWoken位为0,则抛异常
throw("sync: inconsistent mutex state")
}// 将唤醒位置0
new &^= mutexWoken
}// 获锁成功
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 { // 如果old值锁位与饥饿位标志都为0,则说明获锁成功直接break
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
queueLifo := waitStartTime != 0 // 如果队列不为0,则入队
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}

if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}

// runtime/proc.go linkname sync.runtime_canSpin()
func sync_runtime_canSpin(i int) bool {
// sync.Mutex is cooperative, so we are conservative with spinning.
// Spin only few times and only if running on a multicore machine and
// GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
// As opposed to runtime mutex we don't do passive spinning here,
// because there can be work on global runq or on other Ps.
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}

Unlock()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// 一个加锁状态的mutex不会与特定的G关联,允许在一个G加锁在另一个G解锁
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}

// Fast path: drop lock bit. 快速路径,删除锁标记
new := atomic.AddInt32(&m.state, -mutexLocked)
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 { // 如果new的饥饿标记位为0
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
// In starvation mode ownership is directly handed off from unlocking
// goroutine to the next waiter. We are not part of this chain,
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
// 如果没有waiter或有G已经被唤醒或获得锁,不必再去唤醒
// 在饥饿模式,解锁的G直接将锁所有权移交给下一个waiter
// 我们不是
if old>>mutexWaiterShift == 0 || // 没有waiter在排队
old&(mutexLocked|mutexWoken|mutexStarving) != 0 {// 处于锁定、唤醒、饥饿状态
return
}
// Grab the right to wake someone.
// 抓住唤醒一个waiter的机会
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false)
return
}
old = m.state
}
} else {
// Starving mode: handoff mutex ownership to the next waiter.
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won't acquire it.
// 饥饿模式:移交mutex所有权到下一个waiter
// 注意:MutexLocked没有被设置,waiter将会在唤醒后设置它,但如果mutexStarving被设置的情况下
// 需要考虑锁定mutex,因此新到达的G不会获得锁
runtime_Semrelease(&m.sema, true)
}
}