社区微信群开通啦,扫一扫抢先加入社区官方微信群
社区微信群
官方包的注释:
// Package sync provides basic synchronization primitives such as mutual
// exclusion locks. Other than the Once and WaitGroup types, most are intended
// for use by low-level library routines. Higher-level synchronization is
// better done via channels and communication.
sync
包提供基础的同步原语,sync.Mutext
、sync.RWMutex
、sync.WaitGroup
、sync.Once
和sync.Cond
。
Go 语言的sync.Mutex
由两个字段state
和sema
组成。其中,state
表示当前互斥锁的状态,sema
是用来控制锁状态的信号量。
type Mutex struct {
state int32
sema uint32
}
上述两个加起来只占 8 字节空间的结构体表过了 Go 语言中的互斥锁。
互斥锁的状态:
const (
mutexLocked = 1 << iota // 锁定
mutexWoken // 唤醒
mutexStarving // 饥饿
...
)
sync.Mutex
有两种模式——正常模式和饥饿模式。
在正常模式下,锁的等待者会按照先进先出的顺序获取锁。但是刚被唤起的 Goroutine 与新创建的 Goroutine 竞争时,大概率会获取不到锁,为了减少这种情况的出现,一旦 Goroutine 超过 1ms 没有获取到锁,它就会将当前互斥锁切换为饥饿模式,防止部分 Goroutine 被“饿死”。
引入饥饿模式的目的是为了保证互斥锁的公平性。在饥饿模式中,互斥锁会直接交给等待队列最前面的 Goroutine。新的 Goroutine 在该状态下不能获取锁,也不会进入自旋状态,只会在队列的末尾等待。如果一个 Goroutine 获取到了互斥锁并且它在队列末尾的时间或者它等待的时间少于 1ms ,那么当前的互斥锁就会切换回正常模式。
与饥饿模式相比,正常模式下的互斥锁能够提供更好的性能,饥饿模式能避免 Goroutine 由于陷入等待无法获取锁而造成的高尾延迟。
上锁sync.Mutex.Lock
,解锁sync.Mutex.Unlock
。
互斥锁的上锁方法经过精简,方法的主干只保留最常见、简单的情况 ——当锁的状态是 0 时,将mutextLocked
位置换成 1:
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}
如果互斥锁的状态不是 0 时就会调用sync.Mutex.lockSlow
尝试通过自旋等方式等待锁的释放,该方法的主体是一个非常大的for循环。这里将它分成几个部分进行介绍:
for {
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
}
自旋是一种多线程同步机制,当前的进程在进入自旋的过程中会一直保持 CPU 的占用,持续检查某个条件是否为真。在多核处理器上,自旋可以避免 $G$ 的切换,使用恰当能更好地利用资源,发挥更好的性能,但是使用不当,会拖慢整个程序,所以 $G$ 进入自旋的条件非常苛刻:
runtime.sync_runtime_canSpin
需要返回true
:
一旦当前 $G$ 能够进入自旋就会调用runtime.sync_runtime_doSpin
和runtime.procyield
并执行 30 次的PAUSE
指令,该指令只会占用 CPU 并消耗 CPU 时间:
func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}
TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX
again:
PAUSE
SUBL $1, AX
JNZ again
RET
处理了自旋相关的特殊逻辑之后,互斥锁会根据上下文计算当前互斥锁最新的状态。几个不同的条件分别会更新state
字段中存储的不同信息:
const (
mutexLocked
mutexWoken
mutexStarving
mutexWaiterShift
)
new := old
if old&mutexStarving == 0 {
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
计算了新的互斥锁状态之后,会使用 CAS 函数sync/atomic.CompareAndSwapInt32
更新状态:
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // 用 CAS 上锁
}
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
如果没有通过 CAS 获得锁,会调用runtime.sync_runtime_SemacquireMutex
通过信号量保证资源不会被两个 $G$ 获取。runtime.sync_runtime_SemacquireMutex
会在方法中不断尝试获取锁并陷入休眠等待信号量的释放,一旦当前 $G$ 可以获取信号量,它就会立刻返回,sync.Mutex.Lock
的剩余代码也会继续执行。
互斥锁的解锁过程sync.Mutex.Unlock
与加锁过程相比就很简单,该过程会先使用sync/atomic.AddInt32
函数快速解锁,这时会发生下面的两种情况:
sync.Mutex.unlockSlow
开始慢速解锁func (m *Mutex) Unlock() {
...
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}
sync.Mutex.unlockSlow
会先校验锁状态的合法性——如果当前互斥锁已经被解锁过了会直接抛出导常”sync: unlock of unlocked mutex“终止程序。
在正常情况下, 会根据当前互斥锁的状态,分别处理正常模式和饥饿模式下的互斥锁:
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 { // 正常模式
old := new
for {
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
runtime_Semrelease(&m.sema, true, 1)
}
}
在正常模式下,上述代码会使用如下所示的处理过程:
mutexLocked
、mutexStarving
、mutexWoken
状态都不为 0,那么当前方法可以直接返回,不需要唤醒其他等待者runtime.sync_runtime_Semrelease
唤醒等待者并移交锁的所有权runtime.sync_runtime_Semrelease
将当前锁交给下一个正在尝试获取锁的等待者,等待者被唤醒后会得到锁,在这时互斥锁还不会退出饥饿状态对上锁和解锁进行简单总结。
互斥锁的上锁过程比较复杂,涉及自旋、信号量以及调度等概念:
mutexLocked
上锁mutexLocked
状态并且在普通模式下工作,会进入自旋,执行 30 次PAUSE
指令占用 CPU 时间等待锁的释放runtime.sync_runtime_SemacquireMutex
将尝试获取锁的 $G$ 切换到休眠状态,等待锁的持有者唤醒互斥锁的解锁过程与之相比就比较简单,其代码行数不多、逻辑清晰,也比较容易理解:
sync.Mutex.Unlock
会直接抛出异常mutexLocked
标志位runtime.sync_runtime_Semrelease
唤醒对应的 $G$读写互斥锁sync.RWMutex
是细粒度的互斥锁,它不限制资源的并发读,但是读写、写写操作无法并行执行。
常见服务的资源读写比例会非常高,因为大多数的读请求之间不会相互影响,所以我们可以分离读写操作,以此来提高服务的性能。
type RWMutex struct {
w Mutex // 如果有未完成(pending)的写操作(writers)就一直维持互斥锁
writerSem uint32 // 写等待读的信号
readerSem uint32 // 读等待写的信号
readerCount int32 // 未完成(pending)的读操作(readers)的数量
readerWait int32 // 即将结束/正在离开(departing)的读操作的数量
}
写操作的锁使用sync.RWMutex.Lock
和sync.RWMutex.Unlock
方法。
当资源的使用者想要获取锁时,需要调用sync.RWMutex.Lock
方法:
func (rw *RWMutex) Lock() {
...
rw.w.Lock()
// 通过把 rw.readerCount 设置为负数,来告知读操作所有者有写操作未完成
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
...
}
调用结构体持有的sync.Mutex
结构体的sync.Mutex.Lock
阻塞后续的写操作
sync/atomic.AddInt32
函数阻塞后续的读操作runtime.sync_runtime_SemacquireMutex
进入休眠状态等待所有读锁的所有者执行结束后释放writeSem
信号量将当前协程唤醒写锁的释放会调用sync.RWMutex.Unlock
:
func (rw *RWMutex) Unlock() {
...
// 将 readerCount 的值增加 rwmutexMaxReaders,使 readerCount 变为非负数,宣告有读操作即将结束
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
...
throw("sync: Unlock of unlocked RWMutex")
}
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
rw.w.Unlock()
...
}
与加锁的过程正好相反,写锁的释放分为以下几步:
sync/atomic.AddInt32
函数将readerCount
变回正数,释放读锁sync.Mutex.Unlock
释放写锁获取写锁时会先阻塞写锁的获取,后阻塞读锁的获取,这种策略能够保证读操作不会被连续的写操作“饿死”。
读锁的加锁方法sync.RMWutex.RLock
很简单,该方法会通过sync/actomic.AddInt32
将readerCount
加一:
func (rw *RWMutex) RLock() {
...
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// 有一个写操作未完成,等待它执行完毕
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
...
}
runtime.sync_runtime_SemacquireMutex
陷入休眠等待锁的释放当 $G$ 想要释放读锁时,会调用如下所示的sync.RMWutex.RUnlock
方法:
func (rw *RWMutex) RUnlock() {
...
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
...
}
该方法会先减少正在读资源的readerCount
整数,根据sync/atomic.AddInt32
的返回值不同会分别进行处理:
sync.RWMutex.rUnlockSlow
方法func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
...
throw("sync: RUnlock of unlocked RWMutex")
}
// 有一个写操作未完成
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
sync.RWMutex.rUnlockSlow
会减少获取锁的写操作等待的读操作数readerWait
并在所有读操作都被释放之后触发写操作的信号量writerSem
,该信号量被触发时,调度器就会唤醒尝试获取写锁的 $G$。
虽然读写互斥锁sync.RMWutex
提供的功能比较复杂,但它是建立在sync.Mutex
的基出上,所以代码实现很简单。
读锁和写锁的关系:
调用sync.RMWutex.Lock
尝试获取锁时
sync.RMWutex.RUlock
都会将readerCount
减一,当它归零时该 $G$ 会获得写锁readerCount
减少rwmutexMaxReaders
以阻塞后续的读操作sync.RWMutex.Unlock
释放写锁时,会先通知所有的读操作,然后才会释放持有的互斥锁读写互斥锁在互斥锁之外提供了额外的更细粒度的控制,能够在读操作远远多于写操作时提升性能。
sync.WaitGroup
可以等待一组 $G$ 的返回,一个比较常见的使用场景是批量发出 RPC 或者 HTTP 请求:
requests := []*Request{...}
wg := sync.WaitGroup{}
wg.add(len(requests))
for _, request := range requests {
go func(r *request) {
defer wg.Done()
...
}(request)
}
wg.Wait()
可以通过sync.WaitGroup
将原本顺序执行的代码在多个 $G$ 中并发执行,加快程序处理速度。
type WaitGroup struct {
noCopy noCopy // 保证 wg 不会被开发者通过再赋值的方式拷贝
state1 [3]uint32 // 存储状态和信号量
}
sync.noCopy
是一个私有结构体,在编绎时会检查被拷贝的变量中是否包含sync.noCopy
或者实现了Lock
和Unlock
方法。如果包含该结构体或者实现了对应的方法就会报出以下错误:
func main() {
wg := sync.WaitGroup{}
wg2 := wg
fmt.Println(wg, wg2)
}
$ go vet main.go
# command-line-arguments
./main.go:10:9: assignment copies lock value to wg2: sync.WaitGroup contains sync.noCopy
./main.go:11:14: call of fmt.Println copies lock value: sync.WaitGroup contains sync.noCopy
./main.go:11:18: call of fmt.Println copies lock value: sync.WaitGroup contains sync.noCopy
这段代码会因为变量赋值或调用函数时发生值拷贝导致分析器报错。
sync.state1
的代码注释:
// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
// 64-bit atomic operations require 64-bit alignment, but 32-bit
// compilers do not ensure it. So we allocate 12 bytes and then use
// the aligned 8 bytes in them as state, and the other 4 as storage
// for the sema.
sync.WaitGroup
提供的私有方法sync.WaitGroup.state
能够帮我们从state1
字段中取出它的状态和信号量。
sync.WaitGroup
对外暴露了三个方法:Add
,Wait
,Done
。
其中Done
方法只是向Add
中传入了 -1,所以重点分析另外两个方法Add
和Wait
。
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
...
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32)
w := uint32(state)
...
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
return
}
if *statep != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
Add
方法向可能是负数的WaitGroup
的counter
上增加增量。
如果counter
归零,所有Wait
的被阻塞的 $G$ 都被释放。
如果counter
是负数,会引发 panic。
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
...
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
w := uint32(state)
if v == 0 {
...
return
}
if atomic.CompareAndSwapUint64(statep, state, state+1) {
...
runtime_Semacquire(semap)
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
...
return
}
}
}
Wait
的作用就是在WaitGroup
的counter
归零前一直阻塞。
sync.WaitGroup
必须在sync.WaitGroup.Wait
方法返回之后才能被重新使用sync.WaitGroup.Done
只是向sync.WaitGroup.Add
方法传入 -1 以唤醒等待的 $G$。所以也可以通过向Add
内传递一个负数来代替Done
sync.WaitGroup
计数器归零,这些 $G$ 会被同时唤醒sync.Once
可以保证程序运行期间某段代码只执行一次。
简单示例:
func main() {
o := sync.Once{}
for i := 0; i < 10; i++ {
o.Do(func() {
fmt.Println("once")
})
}
}
$ go run main.go
once
type Once struct {
done uint32 // 代码是否执行过的标识
m Mutex // 互斥锁
}
sync.Once.Do
是sync.Once
结构体对外暴露的唯一的方法,该方法会接收一个入参为空的函数:
sync.Once.doSlow
执行传入的函数func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 0 {
o.doSlow(f)
}
}
func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}
执行过程:
done
更新成 1sync.Once
会通过成员变量done
确保函数不会执行第二次。
作为用于保证函数执行次数的sync.Once
结构体,使用了互斥锁sync/atomic
包提供的方法实现了某个函数在程序运行期间只能执行一次的语义。
在使用该结构体时,也需要注意以下问题:
sync.Once.Do
方法中传入的函数只会被执行一次,哪怕函数中发生了panic
sync.Once.Do
方法传入不同的函数只会执行第一次传入的函数sync.Cond
是条件变量,可以让一组 $G$ 都在满足特定条件时被唤醒。每一个sync.Cond
结构体在初始化时都需要传入一个互斥锁。
简单示例:
var status uint32
func listen(c *sync.Cond) {
c.L.Lock()
for atomic.LoadUint32(&status) != 1 {
c.Wait()
}
fmt.Println("listenning")
c.L.Unlock()
}
func broadcast(c *sync.Cond) {
c.L.Lock()
atomic.StoreUint32(&status, 1)
c.Broadcast()
c.L.Unlock()
}
func main() {
c := sync.NewCond(&sync.Mutex{})
for i := 0; i < 10; i++ {
go listen(c)
}
time.Sleep(time.Second)
go broadcast(c)
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt)
<-ch
}
上述代码同时运行了 11 个 $G$,这 11 个 $G$ 分别做了不同的事:
sync.Cond.Wait
等待特定条件的满足sync.Cond.Broadcast
唤醒所有陷入等待的 $G$调用sync.Cond.Broadcast
方法后,上述代码会打印出 10 次 "listenning" 并结束调用。
type Cond struct {
noCopy noCopy // 保证结构体不会在编绎时拷贝
L Locker // 保护内部的`notify`字段
notify notifyList // 一个 Goroutine 链表,实现同步机制的核心结构
checker copyChecker // 禁止运行期间发生拷贝
}
sync.Cond
对外暴露的sync.Cond.Wait
方法会将当前 $G$ 陷入休眠状态,它的执行过程分成以下两个步骤:
runtime.notifyListAdd
将等待计数器加一并解锁runtime.notifyListWait
等待其他 $G$ 的唤醒并加锁func (c *Cond) Wait() {
c.checker.check()
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
sync.Cond.Signal
和sync.Cond.Broadcast
就是用来唤醒陷入休眠的 $G$ 的方法,它们的实现有一些细微的差别:
Signal
方法会唤醒队列最前面的 $G$Broadcast
方法会唤醒队列中全部的 $G$func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
$G$ 的唤醒顺序也是按照加入队列的先后顺序,先加入的会先被唤醒,而后加入的可能需要等待调度器的调度。
一般情况下,我们都会先调用sync.Cond.Wait
陷入休眠等待满足期望条件,当满足唤醒条件时,就可以选择使用sync.Cond.Signal
或者sync.Cond.Broadcast
唤醒一个或者全部的 $G$。
sync.Cond
不是一个常用的同步机制,但是在条件长时间无法满足时,与使用for {}
进行忙碌等待相比,sync.Cond
能够让出处理器的使用权,提供 CPU 的利用率。使用时需要注意以下问题:
Wait
在调用之前一定要上锁,否则会触发panic
,程序崩溃Signal
唤醒的 $G$ 都是队列最前面、等待最久的 $G$Broadcast
会按照一定顺序广播通知等待的全部 $G$如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!