引言

go协程在1.14之前便有抢占调度,但是在1.14之前,抢占调度有一定的条件(非硬性抢占,sysmon标记抢占,在某些过程中判断到被抢占则主动让出资源)。如下代码,在1.14之前不会停止,但在1.14中运行,则会停止。

1
2
3
4
5
6
7
8
9
10
11
12
13
package main
import (
"runtime"
"time"
)
func main() {
runtime.GOMAXPROCS(1)
go func() {
for {
}
}()
time.Sleep(time.Second)
}

接下来,将分析一下 1.14之前的版本的“抢占调度”与1.14真正的抢占调度是如何实现的(本文基于linux版本进行分析)

<1.14的“抢占调度”

以1.12.5版本的go为例。

在go程序启动过程中,会启动一个M来运行sysmon()任务,sysmon的一个重要功能就是让长时间运行的g主动让出资源给其他g运行。核心方法是retake(now).preemptone(p):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func preemptone(_p_ *p) bool {
mp := _p_.m.ptr()
if mp == nil || mp == getg().m {
return false
}
gp := mp.curg
if gp == nil || gp == mp.g0 {
return false
}
gp.preempt = true
// Every call in a go routine checks for stack overflow by
// comparing the current stack pointer to gp->stackguard0.
// Setting gp->stackguard0 to StackPreempt folds
// preemption into the normal stack overflow check.
gp.stackguard0 = stackPreempt
return true
}

在这里,将待被抢占g的preempt字段标记为true,并将g.stackguard0 值改为 stackPreempt(uintptrMask & -1314),g.stackguard0字段含义:go栈增长前用来比较的栈指针。这里将stackguard0值改的很小,它的值比 g.stack.lo还要小(go中栈的分配是从高地址到低地址),这样在扩增栈的时候就能够知道是sysmon准备抢占当前g,就会主动让出资源给其他程序运行:

1
2
3
4
5
6
7
8
9
10
11
12
func newstack() {
// ... ...
// 此处判断 gp.stackguard0 是否与 stackPreempt相等来判断是否要被抢占
preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt
// ... ...
if preempt {
// ... ...
// 此处会发起调度,即让出资源给其他g运行。 gopreempt_m() -> goschedImpl() -> schedule()
gopreempt_m(gp) // never return
}
// ... ...
}

如上,抢占的关键点是,触发栈申请。所以只要是没有触发栈申请的代码,都不会主动让出资源给其他g使用,那么就会导致问题:例如,无法gc,gc过程中要stw。

因此,网上所述g调度的时机点是基于此(触发栈空间的申请,涉及于此便会判断栈空间大小同时也会判断g.stackguard0是否>g.stack.lo,由此得到是否让出资源重新调度)。

1.14中抢占调度

在1.14版本中,保留了原有的抢占方式,在此基础上增加了基于信号的异步抢占,看下1.14版本中的preemptone方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func preemptone(_p_ *p) bool {
mp := _p_.m.ptr()
if mp == nil || mp == getg().m {
return false
}
gp := mp.curg
if gp == nil || gp == mp.g0 {
return false
}
gp.preempt = true
// Every call in a go routine checks for stack overflow by
// comparing the current stack pointer to gp->stackguard0.
// Setting gp->stackguard0 to StackPreempt folds
// preemption into the normal stack overflow check.
gp.stackguard0 = stackPreempt
// Request an async preemption of this P.
if preemptMSupported && debug.asyncpreemptoff == 0 {
_p_.preempt = true
preemptM(mp)
}

return true
}

相较于之前版本的代码,最后多出了一段判断是否支持 preemptM与异步抢占开关处于off状态的逻辑,在条件通过后执行preemptM()方法

1
2
3
4
5
6
7
8
9
10
11
func preemptM(mp *m) {
if !pushCallSupported {
return
}
if GOOS == "darwin" && (GOARCH == "arm" || GOARCH == "arm64") && !iscgo {
return
}
if atomic.Cas(&mp.signalPending, 0, 1) {
signalM(mp, sigPreempt)
}
}

经过一些判断后,最终满足条件的情况下,会执行到 signalM(mp, sigPreempt),该方法会将sigPreempt信号发送给mp(需要被抢占的协程所对应的内核线程)。由定义知,sigPreempt=_SIGURG ,所以是向待被抢占的协程发送 _SIGURG 信号。

接下来,看下待被抢占的协程所在的内核线程是如何处理信号的:

程序创建新的内核线程时会执行以下函数(runtime/signal_unix.go/initsig()),下边看下此函数定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/** 创建新内核线程执行链:
newm() ---------> newm1() ----> 是cgo mstart() -> mstart1() -> mstart0 -> initsig()
templateThread() -----↗ ↘非cgo newosproc() --↗
**/
func initsig(preinit bool) {
if !preinit {
// It's now OK for signal handlers to run.
signalsOK = true
}
// For c-archive/c-shared this is called by libpreinit with
// preinit == true.
if (isarchive || islibrary) && !preinit {
return
}
for i := uint32(0); i < _NSIG; i++ {
t := &sigtable[i]
if t.flags == 0 || t.flags&_SigDefault != 0 {
continue
}
// We don't need to use atomic operations here because
// there shouldn't be any other goroutines running yet.
fwdSig[i] = getsig(i)
if !sigInstallGoHandler(i) {
// Even if we are not installing a signal handler,
// set SA_ONSTACK if necessary.
if fwdSig[i] != _SIG_DFL && fwdSig[i] != _SIG_IGN {
setsigstack(i)
} else if fwdSig[i] == _SIG_IGN {
sigInitIgnored(i)
}
continue
}
handlingSig[i] = 1
setsig(i, funcPC(sighandler))
}
}

如上,这段代码核心在 setsig(i,funcPC(sighandler)),i代表的是信号,主要作用是当系统收到信号i时,执行sighandler方法。其中,_SIGURG 信号是包含在里边的。然后看下 sighandler方法:

1
2
3
4
5
6
7
8
9
10
11
func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
// ... ...
if sig == sigPreempt {
// Might be a preemption signal.
doSigPreempt(gp, c)
// Even if this was definitely a preemption signal, it
// may have been coalesced with another signal, so we
// still let it through to the application.
}
// ... ...
}

核心逻辑在这一段,if sig == sigPreempt,执行 doSigPreempt,sigPreempt上边介绍过,它的值为 _SIGURG,因此这里便是处理抢占逻辑的核心方法,继续挖掘 doSigPreempt:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// doSigPreempt handles a preemption signal on gp.
func doSigPreempt(gp *g, ctxt *sigctxt) {
if wantAsyncPreempt(gp) && isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()) {
// Inject a call to asyncPreempt.
ctxt.pushCall(funcPC(asyncPreempt))
}
// Acknowledge the preemption.
atomic.Xadd(&gp.m.preemptGen, 1)
atomic.Store(&gp.m.signalPending, 0)
}
// asyncPreempt saves all user registers and calls asyncPreempt2.
//
// When stack scanning encounters an asyncPreempt frame, it scans that
// frame and its parent frame conservatively.
//
// asyncPreempt is implemented in assembly.
func asyncPreempt()
//go:nosplit
func asyncPreempt2() {
gp := getg()
gp.asyncSafePoint = true
if gp.preemptStop {
mcall(preemptPark)
} else {
mcall(gopreempt_m)
}
gp.asyncSafePoint = false
}

doSigPreempt判断g是否希望异步抢占和是否在safePoint,满足条件则执行pushCall。通过将asyncPreempt函数指针传递过去,在一定时机执行asyncPreempt。通过上边该函数的介绍可以知道,该函数保存用户寄存器信息,并且调用 asyncPreempt2函数;在asyncPreempt2函数中,则会执行到 mcall函数,它将m执行的g切换到g0,然后执行 preemptPark或gopreempt_m函数,无论哪种情况,最终都会调用到schedule(),致使重新调度。此时便完成了通过signal对更多场景中g的抢占。