// stack describes a Go execution stack; its lower and upper bounds are lo and hi.
// In terms of the traditional C memory layout, Go stacks are actually allocated
// in what C would call the heap, which is why a goroutine stack can grow far
// beyond the `ulimit -s` stack size (up to 1GB).
type stack struct {
    lo uintptr
    hi uintptr
}
// gobuf is the saved execution context of a g.
type gobuf struct {
    sp   uintptr        // sp register
    pc   uintptr        // pc register
    g    guintptr       // back pointer to the g
    ctxt unsafe.Pointer // this seems to be used to assist the GC
    ret  sys.Uintreg
    lr   uintptr        // link register, only used on arm; can be ignored here
    bp   uintptr        // only used when built with GOEXPERIMENT=framepointer
}
type g struct {
    // Stack parameters: the stack bounds are [stack.lo, stack.hi).
    stack       stack   // offset known to runtime/cgo
    stackguard0 uintptr // offset known to liblink
    stackguard1 uintptr // offset known to liblink

    _panic         *_panic // innermost panic
    _defer         *_defer // innermost defer
    m              *m      // the m currently bound to this g
    sched          gobuf   // the saved execution context of the goroutine
    syscallsp      uintptr // if status==Gsyscall, syscallsp = sched.sp to use during gc
    syscallpc      uintptr // if status==Gsyscall, syscallpc = sched.pc to use during gc
    stktopsp       uintptr // expected sp at top of stack, to check in traceback
    param          unsafe.Pointer // parameter passed in on wakeup
    atomicstatus   uint32
    stackLock      uint32 // sigprof/scang lock; TODO: fold in to atomicstatus
    goid           int64  // goroutine id
    waitsince      int64  // approximate time when the g became blocked
    waitreason     string // if status==Gwaiting
    schedlink      guintptr
    preempt        bool // preemption flag; when true, stackguard0 == stackpreempt
    throwsplit     bool // must not split stack
    raceignore     int8 // ignore race detection events
    sysblocktraced bool // StartTrace has emitted EvGoInSyscall about this goroutine
    sysexitticks   int64 // cputicks when the syscall returned, used for tracing
    traceseq       uint64   // trace event sequencer
    tracelastp     puintptr // last P emitted an event for this goroutine
    lockedm        muintptr // if LockOSThread was called, the g is bound to a specific m
    sig            uint32
    writebuf       []byte
    sigcode0       uintptr
    sigcode1       uintptr
    sigpc          uintptr
    gopc           uintptr // pc of the go statement that created this goroutine
    startpc        uintptr // pc of the goroutine function
    racectx        uintptr
    waiting        *sudog    // sudog structures this g is waiting on (that have a valid elem ptr); in lock order
    cgoCtxt        []uintptr // cgo traceback context
    labels         unsafe.Pointer // profiler labels
    timer          *timer // cached timer for time.Sleep
    selectDone     uint32 // whether this g is participating in a select and someone has already won the wake-up race
}
When a g blocks or otherwise has to wait, it is wrapped into a sudog. A single g may be wrapped into several sudogs hanging on different wait queues:
// sudog represents a g in a wait list, for example a g waiting to send to
// or receive from a channel.
//
// sudog is necessary because the relationship between a g and a
// synchronization object is many-to-many. A g can be on many wait lists,
// so a g may be wrapped into many sudogs; and many gs may be waiting on
// the same synchronization object, so there may be many sudogs for one object.
//
// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
type sudog struct {
    // The following fields are protected by the hchan.lock of the
    // channel this g is blocked on. shrinkstack depends on
    // this for sudogs involved in channel ops.
    g *g

    // isSelect indicates that the g is participating in a select, so
    // g.selectDone must be CAS'd to win the wake-up race.
    isSelect bool
    next     *sudog
    prev     *sudog
    elem     unsafe.Pointer // data element (may point to stack)

    // The following fields are never accessed concurrently.
    // For channels, waitlink is only accessed by g.
    // For semaphores, all fields (including the ones above) are only
    // accessed when holding a semaRoot lock.
    acquiretime int64
    releasetime int64
    ticket      uint32
    parent      *sudog // semaRoot binary tree
    waitlink    *sudog // g.waiting list or semaRoot
    waittail    *sudog // semaRoot
    c           *hchan // channel
}
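As a hedged, user-level illustration (ordinary Go code, not runtime internals): a goroutine blocked in a select over several channels is, conceptually, parked as one sudog on each channel's wait queue. The channel names here are invented for the example.

package main

import (
    "fmt"
    "time"
)

func main() {
    a := make(chan int) // hypothetical channels, only for illustration
    b := make(chan int)

    go func() {
        // While this select is blocked, the runtime has parked this g on
        // both a's and b's wait queues -- roughly one sudog per channel.
        select {
        case v := <-a:
            fmt.Println("got from a:", v)
        case v := <-b:
            fmt.Println("got from b:", v)
        }
    }()

    time.Sleep(10 * time.Millisecond) // let the goroutine block
    b <- 42                           // waking it removes the g from both wait queues
    time.Sleep(10 * time.Millisecond)
}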
type m struct {
    g0      *g     // the goroutine used to run scheduler code
    morebuf gobuf  // gobuf arg to morestack
    divmod  uint32 // div/mod denominator for arm - known to liblink

    // Fields not known to debuggers.
    procid        uint64       // for debuggers, but offset not hard-coded
    gsignal       *g           // signal-handling g
    goSigStack    gsignalStack // Go-allocated signal handling stack
    sigmask       sigset       // storage for saved signal mask
    tls           [6]uintptr   // thread-local storage (for x86 extern register)
    mstartfn      func()
    curg          *g       // the user goroutine currently running
    caughtsig     guintptr // goroutine running during fatal signal
    p             puintptr // attached p for executing go code (nil if not executing go code)
    nextp         puintptr
    id            int64
    mallocing     int32
    throwing      int32
    preemptoff    string // if not "", keep curg running on this m
    locks         int32
    softfloat     int32
    dying         int32
    profilehz     int32
    helpgc        int32
    spinning      bool // the m is out of work and is actively looking for some
    blocked       bool // the m is blocked on a note
    inwb          bool // the m is executing a write barrier
    newSigstack   bool // minit on C thread called sigaltstack
    printlock     int8
    incgo         bool   // the m is executing a cgo call
    freeWait      uint32 // if == 0, safe to free g0 and delete m (atomic)
    fastrand      [2]uint32
    needextram    bool
    traceback     uint8
    ncgocall      uint64      // total number of cgo calls
    ncgo          int32       // number of cgo calls currently in progress
    cgoCallersUse uint32      // if non-zero, cgoCallers in use temporarily
    cgoCallers    *cgoCallers // cgo traceback if crashing in cgo call
    park          note
    alllink       *m // on allm
    schedlink     muintptr
    mcache        *mcache
    lockedg       guintptr
    createstack   [32]uintptr // stack that created this thread.
    freglo        [16]uint32  // d[i] lsb and f[i]
    freghi        [16]uint32  // d[i] msb and f[i+16]
    fflag         uint32      // floating point compare flags
    lockedExt     uint32      // tracking for external LockOSThread
    lockedInt     uint32      // tracking for internal lockOSThread
    nextwaitm     muintptr       // next m waiting for lock
    waitunlockf   unsafe.Pointer // todo go func(*g, unsafe.pointer) bool
    waitlock      unsafe.Pointer
    waittraceev   byte
    waittraceskip int
    startingtrace bool
    syscalltick   uint32
    thread        uintptr // thread handle
    freelink      *m      // on sched.freem

    // these are here because they are too large to be on the stack
    // of low-level NOSPLIT functions.
    libcall   libcall
    libcallpc uintptr // for cpu profiler
    libcallsp uintptr
    libcallg  guintptr
    syscall   libcall // stores syscall parameters on windows

    mOS
}
The p struct is an abstract data structure that can be thought of as the abstraction of a processor; it represents the context needed to execute Go code, and an m must acquire a p before it can run g's:
type p struct {
    id          int32
    status      uint32 // one of pidle/prunning/...
    link        puintptr
    schedtick   uint32     // incremented on every call to schedule
    syscalltick uint32     // incremented on every system call
    sysmontick  sysmontick // last tick observed by sysmon
    m           muintptr   // back-link to associated m (nil if idle)
    mcache      *mcache
    racectx     uintptr

    deferpool    [5][]*_defer // pool of available defer structs of different sizes (see panic.go)
    deferpoolbuf [5][32]*_defer

    // Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen.
    goidcache    uint64
    goidcacheend uint64

    // Queue of runnable goroutines. Accessed without lock.
    runqhead uint32
    runqtail uint32
    runq     [256]guintptr
    // runnext, if non-nil, is a runnable G that was readied by the
    // current G and has higher priority than the G's in runq: if the
    // running G still has time left in its time slice, runnext should
    // run next and will inherit the remaining time of the slice.
    // If a set of goroutines is locked in a
    // communicate-and-wait pattern, this schedules that set as a
    // unit and eliminates the (potentially large) scheduling
    // latency that otherwise arises from adding the ready'd
    // goroutines to the end of the run queue.
    runnext guintptr

    // Available G's (status == Gdead)
    gfree    *g
    gfreecnt int32

    sudogcache []*sudog
    sudogbuf   [128]*sudog

    tracebuf traceBufPtr

    // traceSweep indicates the sweep events should be traced.
    // This is used to defer the sweep start event until a span
    // has actually been swept.
    traceSweep bool
    // traceSwept and traceReclaimed track the number of bytes
    // swept and reclaimed by sweeping in the current sweep loop.
    traceSwept, traceReclaimed uintptr

    palloc persistentAlloc // per-P to avoid mutex

    // Per-P GC state
    gcAssistTime         int64 // Nanoseconds in assistAlloc
    gcFractionalMarkTime int64 // Nanoseconds in fractional mark worker
    gcBgMarkWorker       guintptr
    gcMarkWorkerMode     gcMarkWorkerMode

    // gcw is this P's GC work buffer cache. The work buffer is
    // filled by write barriers, drained by mutator assists, and
    // disposed on certain GC state transitions.
    gcw gcWork

    // wbBuf is this P's GC write barrier buffer.
    //
    // TODO: Consider caching this in the running G.
    wbBuf wbBuf

    runSafePointFn uint32 // if 1, run sched.safePointFn at next safe point
}
The global scheduler state lives in schedt:

type schedt struct {
    midle        muintptr // idle m's waiting for work
    nmidle       int32    // number of idle m's waiting for work
    nmidlelocked int32    // number of locked m's waiting for work
    mnext        int64    // number of m's that have been created so far; also used as the ID of the next m
    maxmcount    int32    // maximum number of m's allowed to be created
    nmsys        int32    // number of system m's not counted for deadlock
    nmfreed      int64    // cumulative number of freed m's

    ngsys uint32 // number of system goroutines; updated atomically

    pidle      puintptr // idle p's
    npidle     uint32
    nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go.

    // Global runnable queue.
    runqhead guintptr
    runqtail guintptr
    runqsize int32

    // Global cache of dead G's.
    gflock       mutex
    gfreeStack   *g
    gfreeNoStack *g
    ngfree       int32

    gcwaiting  uint32 // gc is waiting to run
    stopwait   int32
    stopnote   note
    sysmonwait uint32
    sysmonnote note

    // safepointFn should be called on each P at the next GC
    // safepoint if p.runSafePointFn is set.
    safePointFn   func(*p)
    safePointWait int32
    safePointNote note

    profilehz int32 // cpu profiling rate

    procresizetime int64 // nanotime() of last change to gomaxprocs
    totaltime      int64 // ∫gomaxprocs dt up to procresizetime
}
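Since the number of p's is what bounds how many m's can be running user code at once, a quick user-level check is possible with nothing but the standard library (a minimal sketch, standard APIs only):

package main

import (
    "fmt"
    "runtime"
)

func main() {
    // GOMAXPROCS(0) only queries the current value, i.e. how many p's the
    // scheduler created; by default this equals runtime.NumCPU().
    fmt.Println("NumCPU     =", runtime.NumCPU())
    fmt.Println("GOMAXPROCS =", runtime.GOMAXPROCS(0))

    // Shrinking GOMAXPROCS reduces the number of p's that can be running
    // user code simultaneously; an m without a p cannot execute g's.
    prev := runtime.GOMAXPROCS(1)
    fmt.Println("previous GOMAXPROCS was", prev)
}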
The relationship between g, p and m
Go implements the so-called M:N model; all goroutines running user code can be treated as equals. Leaving g0 and gsignal aside, scheduling can be simply described as: bind an m to a p, then loop inside that m running the scheduling function (runtime.schedule), which looks for a runnable g to execute. The figure below shows the possible sources of g's an m bound to a p can draw from:
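Those sources can also be sketched in code. The skeleton below is a hedged, heavily simplified stand-in (invented toy types, not the runtime's): the schedule/findrunnable path consults, roughly in order, runnext, the local run queue, the global run queue, the netpoller, and finally other P's run queues via stealing.

package main

import "fmt"

type toyG struct{ id int }

type toySource struct {
    name string
    get  func() *toyG
}

// findRunnable walks the sources in priority order and returns the first g found.
func findRunnable(sources []toySource) *toyG {
    for _, s := range sources {
        if g := s.get(); g != nil {
            fmt.Println("got a g from", s.name)
            return g
        }
    }
    fmt.Println("nothing runnable: the m would block in findrunnable/stopm")
    return nil
}

func main() {
    empty := func() *toyG { return nil }
    sources := []toySource{
        {"p.runnext", empty},
        {"p.runq (local queue)", empty},
        {"sched.runq (global queue)", empty},
        {"netpoll", func() *toyG { return &toyG{id: 42} }}, // pretend the netpoller readied one g
        {"work stealing from another p", empty},
    }
    findRunnable(sources)
}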
// For example:
//
//    func f(arg1, arg2, arg3 int) {
//        pc := getcallerpc()
//        sp := getcallersp(unsafe.Pointer(&arg1))
//    }
//
// These two lines find the PC and SP immediately following
// the call to f (where f will return).
//
    if fn == nil {
        _g_.m.throwing = -1 // do not dump full stacks
        throw("go of nil func value")
    }
    _g_.m.locks++ // disable preemption because it can be holding p in a local var
    siz := narg
    siz = (siz + 7) &^ 7

    _p_ := _g_.m.p.ptr()
    newg := gfget(_p_)
    if newg == nil {
        newg = malg(_StackMin)
        casgstatus(newg, _Gidle, _Gdead)
        allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
    }

    totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame
    totalSize += -totalSize & (sys.SpAlign - 1)                  // align to spAlign
    sp := newg.stack.hi - totalSize
    spArg := sp

    // Initialize the g: its gobuf (the saved context and "registers"),
    // its gopc/startpc, and later the m's curg.
    memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
    newg.sched.sp = sp
    newg.stktopsp = sp
    newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
    newg.sched.g = guintptr(unsafe.Pointer(newg))
    gostartcallfn(&newg.sched, fn)
    newg.gopc = callerpc
    newg.startpc = fn.fn
    if _g_.m.curg != nil {
        newg.labels = _g_.m.curg.labels
    }
// adjust Gobuf as if it executed a call to fn
// and then did an immediate gosave.
func gostartcallfn(gobuf *gobuf, fv *funcval) {
    var fn unsafe.Pointer
    if fv != nil {
        fn = unsafe.Pointer(fv.fn)
    } else {
        fn = unsafe.Pointer(funcPC(nilfunc))
    }
    gostartcall(gobuf, fn, unsafe.Pointer(fv))
}
// adjust Gobuf as if it executed a call to fn with context ctxt
// and then did an immediate gosave.
func gostartcall(buf *gobuf, fn, ctxt unsafe.Pointer) {
    sp := buf.sp
    if sys.RegSize > sys.PtrSize {
        sp -= sys.PtrSize
        *(*uintptr)(unsafe.Pointer(sp)) = 0
    }
    sp -= sys.PtrSize
    *(*uintptr)(unsafe.Pointer(sp)) = buf.pc // note: at this point buf.pc is actually the pc of goexit
    buf.sp = sp
    buf.pc = uintptr(fn)
    buf.ctxt = ctxt
}
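To make the trick above concrete, here is a toy model (invented names, purely illustrative, nothing like the real memory layout): gostartcall pushes the old pc (goexit) onto the new goroutine's stack as if fn had been called from goexit, so that when fn returns, execution "returns" into goexit and the goroutine gets cleaned up.

package main

import "fmt"

type toyGobuf struct {
    sp    int // index into the toy stack
    pc    string
    stack []string
}

func toyGostartcall(buf *toyGobuf, fn string) {
    buf.sp--                   // make room for one word
    buf.stack[buf.sp] = buf.pc // push the "return address" (goexit)
    buf.pc = fn                // resume execution at fn
}

func main() {
    buf := toyGobuf{sp: 8, pc: "goexit+PCQuantum", stack: make([]string, 8)}
    toyGostartcall(&buf, "main.myGoroutineBody")
    fmt.Println("pc now:", buf.pc)                                  // fn runs first
    fmt.Println("fake return address on the stack:", buf.stack[buf.sp]) // goexit
}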
// runqput tries to put g on the local runnable queue.
// If next is false, runqput adds g to the tail of the runnable queue.
// If next is true, runqput puts g in the _p_.runnext slot.
// If the run queue is full, runnext puts g on the global queue.
// Executed only by the owner P.
func runqput(_p_ *p, gp *g, next bool) {
    if randomizeScheduler && next && fastrand()%2 == 0 {
        next = false
    }

    if next {
    retryNext:
        oldnext := _p_.runnext
        if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
            goto retryNext
        }
        if oldnext == 0 {
            return
        }
        // Kick the old runnext out to the regular run queue.
        gp = oldnext.ptr()
    }

retry:
    h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with consumers
    t := _p_.runqtail
    if t-h < uint32(len(_p_.runq)) {
        _p_.runq[t%uint32(len(_p_.runq))].set(gp)
        atomic.Store(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
        return
    }
    if runqputslow(_p_, gp, h, t) {
        return
    }
    // If the queue is not full, the put above must have succeeded.
    goto retry
}
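As a hedged, single-threaded toy (invented names; nothing like the runtime's lock-free version), the behaviour described above can be modelled like this: putting with next=true kicks the previous runnext entry into the tail of the ring buffer.

package main

import "fmt"

type toyP struct {
    runq    [4]string // tiny ring buffer for the example
    head    uint32
    tail    uint32
    runnext string
}

func (p *toyP) runqput(g string, next bool) {
    if next {
        old := p.runnext
        p.runnext = g
        if old == "" {
            return
        }
        g = old // the displaced runnext goes to the normal queue
    }
    if p.tail-p.head < uint32(len(p.runq)) {
        p.runq[p.tail%uint32(len(p.runq))] = g
        p.tail++
        return
    }
    fmt.Println("local queue full, would hand", g, "to the global queue")
}

func main() {
    var p toyP
    p.runqput("g1", false)
    p.runqput("g2", true) // g2 becomes runnext
    p.runqput("g3", true) // g3 replaces g2; g2 is pushed onto runq
    fmt.Println("runnext:", p.runnext)
    for i := p.head; i < p.tail; i++ {
        fmt.Println("runq:", p.runq[i%uint32(len(p.runq))])
    }
}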
// This is the slow path: it moves a batch of g's from the local queue
// (including the current one) to the global queue in one go.
// Executed only by the owner P.
func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
    var batch [len(_p_.runq)/2 + 1]*g

    // First, grab a batch from the local queue.
    n := t - h
    n = n / 2
    if n != uint32(len(_p_.runq)/2) {
        throw("runqputslow: queue is not full")
    }
    for i := uint32(0); i < n; i++ {
        batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
    }
    if !atomic.Cas(&_p_.runqhead, h, h+n) { // cas-release, commits consume
        return false
    }
    batch[n] = gp

    if randomizeScheduler {
        for i := uint32(1); i <= n; i++ {
            j := fastrandn(i + 1)
            batch[i], batch[j] = batch[j], batch[i]
        }
    }

    // Link the goroutines into a list.
    for i := uint32(0); i < n; i++ {
        batch[i].schedlink.set(batch[i+1])
    }
    if debug.scavenge > 0 {
        // Scavenge-a-lot for testing.
        forcegcperiod = 10 * 1e6
        scavengelimit = 20 * 1e6
    }

    lastscavenge := nanotime()
    nscavenge := 0

    lasttrace := int64(0)
    idle := 0 // how many cycles in succession we had not wokeup somebody
    delay := uint32(0)
    for {
        if idle == 0 { // start with a 20us sleep
            delay = 20
        } else if idle > 50 { // start doubling the sleep after 1ms of idleness
            delay *= 2
        }
        if delay > 10*1000 { // cap at 10ms
            delay = 10 * 1000
        }
        usleep(delay)
        if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) {
            lock(&sched.lock)
            if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) {
                atomic.Store(&sched.sysmonwait, 1)
                unlock(&sched.lock)
                // Make wake-up period small enough
                // for the sampling to be correct.
                maxsleep := forcegcperiod / 2
                if scavengelimit < forcegcperiod {
                    maxsleep = scavengelimit / 2
                }
                shouldRelax := true
                if osRelaxMinNS > 0 {
                    next := timeSleepUntil()
                    now := nanotime()
                    if next-now < osRelaxMinNS {
                        shouldRelax = false
                    }
                }
                if shouldRelax {
                    osRelax(true)
                }
                notetsleep(&sched.sysmonnote, maxsleep)
                if shouldRelax {
                    osRelax(false)
                }
                lock(&sched.lock)
                atomic.Store(&sched.sysmonwait, 0)
                noteclear(&sched.sysmonnote)
                idle = 0
                delay = 20
            }
            unlock(&sched.lock)
        }
        // trigger libc interceptors if needed
        if *cgo_yield != nil {
            asmcgocall(*cgo_yield, nil)
        }
        // If we haven't polled the network in the last 10ms, poll it now.
        lastpoll := int64(atomic.Load64(&sched.lastpoll))
        now := nanotime()
        if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
            atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
            gp := netpoll(false) // non-blocking -- returns a list of goroutines
            if gp != nil {
                // Need to decrement number of idle locked M's
                // (pretending that one more is running) before injectglist.
                // Otherwise it can lead to the following situation:
                // injectglist grabs all P's but before it starts M's to run the P's,
                // another M returns from syscall, finishes running its G,
                // observes that there is no work to do and no other running M's
                // and reports deadlock.
                incidlelocked(-1)
                injectglist(gp)
                incidlelocked(1)
            }
        }
        // Retake P's blocked in syscalls
        // and preempt long-running G's.
        if retake(now) != 0 {
            idle = 0
        } else {
            idle++
        }
        // Check whether we need to force a GC (the periodic two-minute one).
        if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 {
            lock(&forcegc.lock)
            forcegc.idle = 0
            forcegc.g.schedlink = 0
            injectglist(forcegc.g)
            unlock(&forcegc.lock)
        }
        // Scavenge the heap once in a while.
        if lastscavenge+scavengelimit/2 < now {
            mheap_.scavenge(int32(nscavenge), uint64(now), uint64(scavengelimit))
            lastscavenge = now
            nscavenge++
        }
        if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now {
            lasttrace = now
            schedtrace(debug.scheddetail > 0)
        }
    }
}
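The schedtrace call at the bottom of the loop is also how GODEBUG=schedtrace output is produced, which makes sysmon's view of the scheduler easy to watch from the outside. A small, hedged way to try it (exact output fields vary between Go versions):

package main

import "time"

// Run with:  GODEBUG=schedtrace=1000 go run main.go
// Roughly every second sysmon prints a line like
//   SCHED 1009ms: gomaxprocs=4 idleprocs=3 threads=7 spinningthreads=0 idlethreads=2 runqueue=0 [0 1 0 0]
// (add scheddetail=1 for per-g/m/p detail).
func main() {
    done := time.After(3 * time.Second)
    for {
        select {
        case <-done:
            return
        default:
            // spawn a little work so the run queues are not always empty
            go func() { time.Sleep(time.Millisecond) }()
            time.Sleep(10 * time.Millisecond)
        }
    }
}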
    // If we are dying because of a signal caught on an already idle thread,
    // freezetheworld will cause all running threads to block.
    // And runtime will essentially enter into deadlock state,
    // except that there is a thread that will call exit soon.
    if panicking > 0 {
        return
    }

    run := mcount() - sched.nmidle - sched.nmidlelocked - sched.nmsys
    if run > 0 {
        return
    }
    if run < 0 {
        print("runtime: checkdead: nmidle=", sched.nmidle, " nmidlelocked=", sched.nmidlelocked, " mcount=", mcount(), " nmsys=", sched.nmsys, "\n")
        throw("checkdead: inconsistent counts")
    }

    grunning := 0
    lock(&allglock)
    for i := 0; i < len(allgs); i++ {
        gp := allgs[i]
        if isSystemGoroutine(gp) {
            continue
        }
        s := readgstatus(gp)
        switch s &^ _Gscan {
        case _Gwaiting:
            grunning++
        case _Grunnable, _Grunning, _Gsyscall:
            unlock(&allglock)
            print("runtime: checkdead: find g ", gp.goid, " in status ", s, "\n")
            throw("checkdead: runnable g")
        }
    }
    unlock(&allglock)
    if grunning == 0 { // possible if main goroutine calls runtime·Goexit()
        throw("no goroutines (main called runtime.Goexit) - deadlock!")
    }

    // Maybe jump time forward for playground.
    gp := timejump()
    if gp != nil {
        casgstatus(gp, _Gwaiting, _Grunnable)
        globrunqput(gp)
        _p_ := pidleget()
        if _p_ == nil {
            throw("checkdead: no p for timer")
        }
        mp := mget()
        if mp == nil {
            // There should always be a free M since
            // nothing is running.
            throw("checkdead: no m for timer")
        }
        mp.nextp.set(_p_)
        notewakeup(&mp.park)
        return
    }

    getg().m.throwing = -1 // do not dump full stacks
    throw("all goroutines are asleep - deadlock!")
}
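This is the code path behind the familiar deadlock crash. The smallest program that trips it:

package main

// main blocks on a channel receive and no other goroutine could ever send,
// so checkdead fires and the runtime prints
// "fatal error: all goroutines are asleep - deadlock!" and exits.
func main() {
    ch := make(chan int)
    <-ch // no sender exists
}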
// forcePreemptNS is the time slice given to a G before it is
// preempted.
const forcePreemptNS = 10 * 1000 * 1000 // 10ms

func retake(now int64) uint32 {
    n := 0
    // Prevent allp slice changes. This lock will be completely
    // uncontended unless we're already stopping the world.
    lock(&allpLock)
    // We can't use a range loop over allp because we may
    // temporarily drop the allpLock. Hence, we need to re-fetch
    // allp each time around the loop.
    for i := 0; i < len(allp); i++ {
        _p_ := allp[i]
        if _p_ == nil {
            // This can happen if procresize has grown
            // allp but not yet created the new P's.
            continue
        }
        pd := &_p_.sysmontick
        s := _p_.status
        if s == _Psyscall {
            // Retake P from syscall if it has been in a syscall for more
            // than one sysmon tick (at least 20us).
            t := int64(_p_.syscalltick)
            if int64(pd.syscalltick) != t {
                pd.syscalltick = uint32(t)
                pd.syscallwhen = now
                continue
            }
            // On the one hand we don't want to retake P's if there is no
            // other work to do, but on the other hand we want to retake
            // them eventually, to keep the sysmon thread from going into
            // a deep sleep.
            if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
                continue
            }
            // Drop allpLock so we can take sched.lock.
            unlock(&allpLock)
            // Need to decrement number of idle locked M's
            // (pretending that one more is running) before the CAS.
            // Otherwise the M from which we retake can exit the syscall,
            // increment nmidle and report deadlock.
            incidlelocked(-1)
            if atomic.Cas(&_p_.status, s, _Pidle) {
                if trace.enabled {
                    traceGoSysBlock(_p_)
                    traceProcStop(_p_)
                }
                n++
                _p_.syscalltick++
                handoffp(_p_)
            }
            incidlelocked(1)
            lock(&allpLock)
        } else if s == _Prunning {
            // Preempt G if it has been running for too long.
            t := int64(_p_.schedtick)
            if int64(pd.schedtick) != t {
                pd.schedtick = uint32(t)
                pd.schedwhen = now
                continue
            }
            if pd.schedwhen+forcePreemptNS > now {
                continue
            }
            preemptone(_p_)
        }
    }
    unlock(&allpLock)
    return uint32(n)
}
func newosproc(mp *m, stk unsafe.Pointer) {
    // Disable signals during clone, so that the new thread starts
    // with signals disabled. It will enable them in minit.
    var oset sigset
    sigprocmask(_SIG_SETMASK, &sigset_all, &oset)
    ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
    sigprocmask(_SIG_SETMASK, &oset, nil)

    if ret < 0 {
        print("runtime: failed to create new OS thread (have ", mcount(), " already; errno=", -ret, ")\n")
        if ret == -_EAGAIN {
            println("runtime: may need to increase max user processes (ulimit -u)")
        }
        throw("newosproc")
    }
}
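A rough way to see newosproc in action from user code (behaviour, not exact numbers, varies by platform and Go version): goroutines wired to their thread with LockOSThread that then block keep those threads occupied, so the runtime has to create more OS threads for the remaining work. The thread-creation count is visible through the standard threadcreate profile.

package main

import (
    "fmt"
    "runtime"
    "runtime/pprof"
    "time"
)

func main() {
    threads := func() int { return pprof.Lookup("threadcreate").Count() }
    fmt.Println("threads created so far:", threads())

    block := make(chan struct{})
    for i := 0; i < 20; i++ {
        go func() {
            runtime.LockOSThread() // this goroutine now owns its thread
            <-block                // ...and parks forever, keeping that thread unavailable
        }()
    }
    time.Sleep(100 * time.Millisecond)
    fmt.Println("threads created after parking 20 locked goroutines:", threads())
}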
    if _g_.m.locks != 0 {
        throw("schedule: holding locks")
    }

    if _g_.m.lockedg != 0 {
        stoplockedm()
        execute(_g_.m.lockedg.ptr(), false) // Never returns.
    }

    // We should not schedule away from a g that is executing a cgo call,
    // since the cgo call is using the m's g0 stack.
    if _g_.m.incgo {
        throw("schedule: in cgo")
    }

top:
    if sched.gcwaiting != 0 {
        gcstopm()
        goto top
    }
    if _g_.m.p.ptr().runSafePointFn != 0 {
        runSafePointFn()
    }

    var gp *g
    var inheritTime bool
    if trace.enabled || trace.shutdown {
        gp = traceReader()
        if gp != nil {
            casgstatus(gp, _Gwaiting, _Grunnable)
            traceGoUnpark(gp, 0)
        }
    }
    if gp == nil && gcBlackenEnabled != 0 {
        gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
    }
    if gp == nil {
        // Check the global runnable queue once in a while to ensure fairness.
        // Otherwise two goroutines can completely occupy the local run queue
        // by constantly respawning each other.
        if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
            lock(&sched.lock)
            gp = globrunqget(_g_.m.p.ptr(), 1)
            unlock(&sched.lock)
        }
    }
    if gp == nil {
        gp, inheritTime = runqget(_g_.m.p.ptr())
        if gp != nil && _g_.m.spinning {
            throw("schedule: spinning with local work")
        }
    }
    if gp == nil {
        gp, inheritTime = findrunnable() // blocks until a goroutine is found
    }

    // This thread is going to run a goroutine and will not be spinning anymore,
    // so if it was marked as spinning we need to reset that state now,
    // possibly starting a new spinning M.
    if _g_.m.spinning {
        resetspinning()
    }

    if gp.lockedm != 0 {
        // Hands off own p to the locked m,
        // then blocks waiting for a new p.
        startlockedm(gp)
        goto top
    }
// Schedules gp to run on the current M.
// If inheritTime is true, gp inherits the remaining time in the
// current time slice. Otherwise, it starts a new time slice.
// Never returns.
//
// Write barriers are allowed because this is called immediately after
// acquiring a P in several places.
//
//go:yeswritebarrierrec
func execute(gp *g, inheritTime bool) {
    _g_ := getg() // this may be the m's g0

    casgstatus(gp, _Grunnable, _Grunning)
    gp.waitsince = 0
    gp.preempt = false
    gp.stackguard0 = gp.stack.lo + _StackGuard
    if !inheritTime {
        _g_.m.p.ptr().schedtick++
    }
    _g_.m.curg = gp // the m now runs gp...
    gp.m = _g_.m    // ...and gp points back at the m: a two-way binding

    gogo(&gp.sched)
}
This is fairly straightforward: bind the g and the m to each other, then have gogo run the function saved in that g's context.
gogo
runtime.gogo is written in assembly; its job is to run the func() of the go statement. As the listing shows, it mainly moves the contents of the g's gobuf into the CPU registers and then resumes execution at the instruction address stored in gobuf.pc.
// void gogo(Gobuf*)
// restore state from Gobuf; longjmp
TEXT runtime·gogo(SB), NOSPLIT, $16-8
    MOVQ    buf+0(FP), BX       // gobuf
    MOVQ    gobuf_g(BX), DX
    MOVQ    0(DX), CX           // make sure g != nil
    get_tls(CX)
    MOVQ    DX, g(CX)
    MOVQ    gobuf_sp(BX), SP    // restore SP
    MOVQ    gobuf_ret(BX), AX
    MOVQ    gobuf_ctxt(BX), DX
    MOVQ    gobuf_bp(BX), BP
    MOVQ    $0, gobuf_sp(BX)    // clear to help garbage collector
    MOVQ    $0, gobuf_ret(BX)
    MOVQ    $0, gobuf_ctxt(BX)
    MOVQ    $0, gobuf_bp(BX)
    MOVQ    gobuf_pc(BX), BX
    JMP     BX
// Goexit terminates the goroutine that calls it. No other goroutine is affected.
// Goexit runs all deferred calls before terminating the goroutine. Because Goexit
// is not a panic, any recover calls in those deferred functions will return nil.
//
// Calling Goexit from the main goroutine terminates that goroutine
// without func main returning. Since func main has not returned,
// the program continues execution of other goroutines.
// If all other goroutines exit, the program crashes.
func Goexit() {
    // Run all deferred functions for the current goroutine.
    // This code is similar to gopanic, see that implementation
    // for detailed comments.
    gp := getg()
    for {
        d := gp._defer
        if d == nil {
            break
        }
        if d.started {
            if d._panic != nil {
                d._panic.aborted = true
                d._panic = nil
            }
            d.fn = nil
            gp._defer = d.link
            freedefer(d)
            continue
        }
        d.started = true
        reflectcall(nil, unsafe.Pointer(d.fn), deferArgs(d), uint32(d.siz), uint32(d.siz))
        if gp._defer != d {
            throw("bad defer entry in Goexit")
        }
        d._panic = nil
        d.fn = nil
        gp._defer = d.link
        freedefer(d)
        // Note: we ignore recovers here because Goexit isn't a panic
    }
    goexit1()
}

// Finishes execution of the current goroutine.
func goexit1() {
    if raceenabled {
        racegoend()
    }
    if trace.enabled {
        traceGoEnd()
    }
    mcall(goexit0)
}
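A small user-level example of the behaviour documented above: Goexit runs the deferred calls, terminates only the calling goroutine, and recover inside those defers returns nil because this is not a panic.

package main

import (
    "fmt"
    "runtime"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        defer func() {
            fmt.Println("deferred call still runs; recover() =", recover()) // nil: Goexit is not a panic
        }()
        runtime.Goexit()
        fmt.Println("never reached")
    }()
    wg.Wait()
    fmt.Println("main is unaffected; only that goroutine exited")
}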
// The top-most function running on a goroutine
// returns to goexit+PCQuantum.
TEXT runtime·goexit(SB),NOSPLIT,$0-0
    BYTE    $0x90   // NOP
    CALL    runtime·goexit1(SB) // does not return
    // traceback from goexit1 must hit code range of goexit
    BYTE    $0x90   // NOP
// func mcall(fn func(*g))
// Switch to m->g0's stack, call fn(g).
// Fn must never return. It should gogo(&g->sched)
// to keep running g.
TEXT runtime·mcall(SB), NOSPLIT, $0-8
    MOVQ    fn+0(FP), DI

    get_tls(CX)
    MOVQ    g(CX), AX   // save state in g->sched
    MOVQ    0(SP), BX   // caller's PC
    MOVQ    BX, (g_sched+gobuf_pc)(AX)
    LEAQ    fn+0(FP), BX    // caller's SP
    MOVQ    BX, (g_sched+gobuf_sp)(AX)
    MOVQ    AX, (g_sched+gobuf_g)(AX)
    MOVQ    BP, (g_sched+gobuf_bp)(AX)

    // switch to m->g0 & its stack, call fn
    MOVQ    g(CX), BX
    MOVQ    g_m(BX), BX
    MOVQ    m_g0(BX), SI
    CMPQ    SI, AX  // if g == m->g0 call badmcall
    JNE     3(PC)
    MOVQ    $runtime·badmcall(SB), AX
    JMP     AX
    MOVQ    SI, g(CX)   // g = m->g0
    MOVQ    (g_sched+gobuf_sp)(SI), SP  // sp = m->g0->sched.sp
    PUSHQ   AX
    MOVQ    DI, DX
    MOVQ    0(DI), DI
    CALL    DI
    POPQ    AX
    MOVQ    $runtime·badmcall2(SB), AX
    JMP     AX
    RET
// Tries to add one more P to execute G's.
// Called when a G is made runnable (newproc, ready).
func wakep() {
    // be conservative about spinning threads
    if !atomic.Cas(&sched.nmspinning, 0, 1) {
        return
    }
    startm(nil, true)
}

// Schedules some M to run the p (creates an M if necessary).
// If p==nil, tries to get an idle P, if no idle P's does nothing.
// May run with m.p==nil, so write barriers are not allowed.
// If spinning is set, the caller has incremented nmspinning and startm will
// either decrement nmspinning or set m.spinning in the newly started M.
//go:nowritebarrierrec
func startm(_p_ *p, spinning bool) {
    lock(&sched.lock)
    if _p_ == nil {
        _p_ = pidleget()
        if _p_ == nil {
            unlock(&sched.lock)
            if spinning {
                // The caller incremented nmspinning, but there are no idle Ps,
                // so it's okay to just undo the increment and give up.
                if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
                    throw("startm: negative nmspinning")
                }
            }
            return
        }
    }
    mp := mget()
    unlock(&sched.lock)
    if mp == nil {
        var fn func()
        if spinning {
            // The caller incremented nmspinning, so set m.spinning in the new M.
            fn = mspinning
        }
        newm(fn, _p_)
        return
    }
    if mp.spinning {
        throw("startm: m is spinning")
    }
    if mp.nextp != 0 {
        throw("startm: m has p")
    }
    if spinning && !runqempty(_p_) {
        throw("startm: p has runnable gs")
    }
    // The caller incremented nmspinning, so set m.spinning in the new M.
    mp.spinning = spinning
    mp.nextp.set(_p_)
    notewakeup(&mp.park)
}
// Puts the current goroutine into a waiting state and calls unlockf.
// If unlockf returns false, the goroutine is resumed.
// unlockf must not access this G's stack, as it may be moved between
// the call to gopark and the call to unlockf.
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason string, traceEv byte, traceskip int) {
    mp := acquirem()
    gp := mp.curg
    status := readgstatus(gp)
    if status != _Grunning && status != _Gscanrunning {
        throw("gopark: bad g status")
    }
    mp.waitlock = lock
    mp.waitunlockf = *(*unsafe.Pointer)(unsafe.Pointer(&unlockf))
    gp.waitreason = reason
    mp.waittraceev = traceEv
    mp.waittraceskip = traceskip
    releasem(mp)
    // can't do anything that might move the G between Ms here.
    mcall(park_m)
}
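What gopark looks like from the outside: a goroutine blocked on a channel has been parked with a wait reason, and that reason shows up in goroutine dumps. A small, hedged illustration:

package main

import (
    "fmt"
    "runtime"
    "time"
)

func main() {
    ch := make(chan int)
    go func() { <-ch }() // parked by gopark with reason "chan receive"

    time.Sleep(10 * time.Millisecond) // give it time to block
    buf := make([]byte, 1<<16)
    n := runtime.Stack(buf, true) // dump all goroutines
    fmt.Printf("%s\n", buf[:n])   // the blocked one shows up as: goroutine N [chan receive]: ...

    ch <- 1 // ready() moves it back to _Grunnable
    time.Sleep(10 * time.Millisecond)
}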
// Mark gp ready to run.
func ready(gp *g, traceskip int, next bool) {
    if trace.enabled {
        traceGoUnpark(gp, traceskip)
    }

    status := readgstatus(gp)

    // Mark runnable.
    _g_ := getg()
    _g_.m.locks++ // disable preemption because it can be holding p in a local var
    if status&^_Gscan != _Gwaiting {
        dumpgstatus(gp)
        throw("bad g->status in ready")
    }

    // status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
    casgstatus(gp, _Gwaiting, _Grunnable)
    runqput(_g_.m.p.ptr(), gp, next)
    if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
        wakep()
    }
    _g_.m.locks--
    if _g_.m.locks == 0 && _g_.preempt {
        // restore the preemption request in case we've cleared it in newstack
        _g_.stackguard0 = stackPreempt
    }
}
    // Steal work from other P's.
    procs := uint32(gomaxprocs)
    if atomic.Load(&sched.npidle) == procs-1 {
        // Either GOMAXPROCS=1 or everybody, except for us, is idle already.
        // New work can only come from syscall/cgocall, network or timers.
        // None of those submit to local run queues, so there is nothing to steal.
        goto stop
    }
    // If the number of spinning M's >= number of busy P's, block.
    // This is necessary to prevent excessive CPU consumption
    // when GOMAXPROCS is much larger than 1 but the program's
    // parallelism is low.
    if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
        goto stop
    }
    if !_g_.m.spinning {
        _g_.m.spinning = true
        atomic.Xadd(&sched.nmspinning, 1)
    }
    for i := 0; i < 4; i++ {
        for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
            if sched.gcwaiting != 0 {
                goto top
            }
            stealRunNextG := i > 2 // first look for ready queues with more than 1 g
            if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
                return gp, false
            }
        }
    }
stop:
    // We have nothing to do. If we're in the GC mark phase we can safely
    // scan and blacken objects, so run idle-time marking rather than
    // just giving up the P.
    if gcBlackenEnabled != 0 && _p_.gcBgMarkWorker != 0 && gcMarkWorkAvailable(_p_) {
        _p_.gcMarkWorkerMode = gcMarkWorkerIdleMode
        gp := _p_.gcBgMarkWorker.ptr()
        casgstatus(gp, _Gwaiting, _Grunnable)
        if trace.enabled {
            traceGoUnpark(gp, 0)
        }
        return gp, false
    }
    // Before we drop our P, make a snapshot of the allp slice,
    // which can change underfoot once we no longer block
    // safe-points. We don't need to snapshot the contents because
    // everything up to cap(allp) is immutable.
    allpSnapshot := allp
    // Return the P and block.
    lock(&sched.lock)
    if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
        unlock(&sched.lock)
        goto top
    }
    if sched.runqsize != 0 {
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        return gp, false
    }
    if releasep() != _p_ {
        throw("findrunnable: wrong p")
    }
    pidleput(_p_)
    unlock(&sched.lock)
    // Delicate dance: thread transitions from spinning to non-spinning state,
    // potentially concurrently with submission of new goroutines. We must
    // drop nmspinning first and then check all per-P queues again (with
    // #StoreLoad memory barrier in between). If we do it the other way around,
    // another thread can submit a goroutine after we've checked all run queues
    // but before we drop nmspinning; as the result nobody will unpark a thread
    // to run the goroutine.
    // If we discover new work below, we need to restore m.spinning as a signal
    // for resetspinning to unpark a new worker thread (because there can be more
    // than one starving goroutine). However, if after discovering new work
    // we also observe no idle Ps, it is OK to just park the current thread:
    // the system is fully loaded so no spinning threads are required.
    // Also see "Worker thread parking/unparking" comment at the top of the file.
    wasSpinning := _g_.m.spinning
    if _g_.m.spinning {
        _g_.m.spinning = false
        if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
            throw("findrunnable: negative nmspinning")
        }
    }
    // Check all run queues once again.
    for _, _p_ := range allpSnapshot {
        if !runqempty(_p_) {
            lock(&sched.lock)
            _p_ = pidleget()
            unlock(&sched.lock)
            if _p_ != nil {
                acquirep(_p_)
                if wasSpinning {
                    _g_.m.spinning = true
                    atomic.Xadd(&sched.nmspinning, 1)
                }
                goto top
            }
            break
        }
    }
    // Check for idle GC work again.
    if gcBlackenEnabled != 0 && gcMarkWorkAvailable(nil) {
        lock(&sched.lock)
        _p_ = pidleget()
        if _p_ != nil && _p_.gcBgMarkWorker == 0 {
            pidleput(_p_)
            _p_ = nil
        }
        unlock(&sched.lock)
        if _p_ != nil {
            acquirep(_p_)
            if wasSpinning {
                _g_.m.spinning = true
                atomic.Xadd(&sched.nmspinning, 1)
            }
            // Go back to idle GC check.
            goto stop
        }
    }
    // poll network
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
        if _g_.m.p != 0 {
            throw("findrunnable: netpoll with p")
        }
        if _g_.m.spinning {
            throw("findrunnable: netpoll with spinning")
        }
        gp := netpoll(true) // block until new work is available
        atomic.Store64(&sched.lastpoll, uint64(nanotime()))
        if gp != nil {
            lock(&sched.lock)
            _p_ = pidleget()
            unlock(&sched.lock)
            if _p_ != nil {
                acquirep(_p_)
                injectglist(gp.schedlink.ptr())
                casgstatus(gp, _Gwaiting, _Grunnable)
                if trace.enabled {
                    traceGoUnpark(gp, 0)
                }
                return gp, false
            }
            injectglist(gp)
        }
    }
    stopm()
    goto top
}
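To see the stealing idea in isolation, here is a hedged, heavily simplified toy (plain mutexes and invented names, nothing like the runtime's lock-free implementation): each worker prefers its own queue and, when it runs dry, moves half of a victim's queue over, loosely mirroring runqsteal.

package main

import (
    "fmt"
    "sync"
)

type toyWorker struct {
    mu    sync.Mutex
    queue []int
}

func (w *toyWorker) pop() (int, bool) {
    w.mu.Lock()
    defer w.mu.Unlock()
    if len(w.queue) == 0 {
        return 0, false
    }
    t := w.queue[0]
    w.queue = w.queue[1:]
    return t, true
}

// stealHalf moves half of victim's queue into w.
func (w *toyWorker) stealHalf(victim *toyWorker) bool {
    victim.mu.Lock()
    n := len(victim.queue) / 2
    stolen := append([]int(nil), victim.queue[:n]...)
    victim.queue = victim.queue[n:]
    victim.mu.Unlock()
    if n == 0 {
        return false
    }
    w.mu.Lock()
    w.queue = append(w.queue, stolen...)
    w.mu.Unlock()
    return true
}

func main() {
    workers := []*toyWorker{{}, {}, {}, {}}
    for i := 0; i < 40; i++ {
        workers[0].queue = append(workers[0].queue, i) // all work starts on one worker
    }

    var wg sync.WaitGroup
    for id, w := range workers {
        wg.Add(1)
        go func(id int, w *toyWorker) {
            defer wg.Done()
            done := 0
            for {
                if _, ok := w.pop(); ok {
                    done++ // "run" the task
                    continue
                }
                stole := false
                for _, victim := range workers {
                    if victim != w && w.stealHalf(victim) {
                        stole = true
                        break
                    }
                }
                if !stole {
                    fmt.Printf("worker %d ran %d tasks\n", id, done)
                    return
                }
            }
        }(id, w)
    }
    wg.Wait()
}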
Unbinding m from p
handoffp
graph TD
mexit --> A[is m0?]
A --> |yes|B[handoffp]
A --> |no| C[iterate allm]
C --> |m found|handoffp
C --> |m not found| throw
forEachP --> |p status == syscall| handoffp
stoplockedm --> handoffp
entersyscallblock --> entersyscallblock_handoff
entersyscallblock_handoff --> handoffp
retake --> |p status == syscall| handoffp
// Hands off P from syscall or locked M.
// Always runs without a P, so write barriers are not allowed.
//go:nowritebarrierrec
func handoffp(_p_ *p) {
    // handoffp must start an M in any situation where
    // findrunnable would return a G to run on _p_.

    // if it has local work, start it straight away
    if !runqempty(_p_) || sched.runqsize != 0 {
        startm(_p_, false)
        return
    }
    // if it has GC work, start it straight away
    if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
        startm(_p_, false)
        return
    }
    // no local work, check that there are no spinning/idle M's,
    // otherwise our help is not required
    if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic
        startm(_p_, true)
        return
    }
    lock(&sched.lock)
    if sched.gcwaiting != 0 {
        _p_.status = _Pgcstop
        sched.stopwait--
        if sched.stopwait == 0 {
            notewakeup(&sched.stopnote)
        }
        unlock(&sched.lock)
        return
    }
    if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) {
        sched.safePointFn(_p_)
        sched.safePointWait--
        if sched.safePointWait == 0 {
            notewakeup(&sched.safePointNote)
        }
    }
    if sched.runqsize != 0 {
        unlock(&sched.lock)
        startm(_p_, false)
        return
    }
    // If this is the last running P and nobody is polling network,
    // need to wakeup another M to poll network.
    if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
        unlock(&sched.lock)
        startm(_p_, false)
        return
    }
    pidleput(_p_)
    unlock(&sched.lock)
}
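A rough demonstration of the retake + handoffp path (Linux-specific and simplified; the exact timing is an internal detail): even with GOMAXPROCS=1, a goroutine stuck in a raw blocking syscall does not freeze the program, because sysmon eventually retakes its P and hands it to another m.

package main

import (
    "fmt"
    "runtime"
    "syscall"
    "time"
)

func main() {
    runtime.GOMAXPROCS(1)

    var fds [2]int
    if err := syscall.Pipe(fds[:]); err != nil {
        panic(err)
    }
    go func() {
        buf := make([]byte, 1)
        syscall.Read(fds[0], buf) // blocks inside read(2); nobody ever writes
    }()

    for i := 0; i < 3; i++ {
        time.Sleep(100 * time.Millisecond)
        fmt.Println("main goroutine still making progress:", i)
    }
}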
// Called from runtime·morestack when more stack is needed.
// Allocate larger stack and relocate to new stack.
// Stack growth is multiplicative, for constant amortized cost.
//
// g->atomicstatus will be Grunning or Gscanrunning upon entry.
// If the GC is trying to stop this g then it will set preemptscan to true.
//
// This must be nowritebarrierrec because it can be called as part of
// stack growth from other nowritebarrierrec functions, but the
// compiler doesn't check this.
//
//go:nowritebarrierrec
func newstack() {
    thisg := getg()
    // TODO: double check all gp. shouldn't be getg().
    if thisg.m.morebuf.g.ptr().stackguard0 == stackFork {
        throw("stack growth after fork")
    }
    if thisg.m.morebuf.g.ptr() != thisg.m.curg {
        print("runtime: newstack called from g=", hex(thisg.m.morebuf.g), "\n"+"\tm=", thisg.m, " m->curg=", thisg.m.curg, " m->g0=", thisg.m.g0, " m->gsignal=", thisg.m.gsignal, "\n")
        morebuf := thisg.m.morebuf
        traceback(morebuf.pc, morebuf.sp, morebuf.lr, morebuf.g.ptr())
        throw("runtime: wrong goroutine in newstack")
    }

    // NOTE: stackguard0 may change underfoot, if another thread
    // is about to try to preempt gp. Read it just once and use that same
    // value now and below.
    preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt

    // Be conservative about where we preempt.
    // We are interested in preempting user Go code, not runtime code.
    // If we're holding locks, mallocing, or preemption is disabled, don't
    // preempt.
    // This check is very early in newstack so that even the status change
    // from Grunning to Gwaiting and back doesn't happen in this case.
    // That status change by itself can be viewed as a small preemption,
    // because the GC might change Gwaiting to Gscanwaiting, and then
    // this goroutine has to wait for the GC to finish before continuing.
    // If the GC is in some way dependent on this goroutine (for example,
    // it needs a lock held by the goroutine), that small preemption turns
    // into a real deadlock.
    if preempt {
        if thisg.m.locks != 0 || thisg.m.mallocing != 0 || thisg.m.preemptoff != "" || thisg.m.p.ptr().status != _Prunning {
            // Let the goroutine keep running for now.
            // gp->preempt is set, so it will be preempted next time.
            gp.stackguard0 = gp.stack.lo + _StackGuard
            gogo(&gp.sched) // never return
        }
    }

    if preempt {
        if gp == thisg.m.g0 {
            throw("runtime: preempt g0")
        }
        if thisg.m.p == 0 && thisg.m.locks == 0 {
            throw("runtime: g is running but p is not")
        }
        // Synchronize with scang.
        casgstatus(gp, _Grunning, _Gwaiting)
        if gp.preemptscan {
            for !castogscanstatus(gp, _Gwaiting, _Gscanwaiting) {
                // Likely to be racing with the GC as
                // it sees a _Gwaiting and does the
                // stack scan. If so, gcworkdone will
                // be set and gcphasework will simply
                // return.
            }
            if !gp.gcscandone {
                // gcw is safe because we're on the
                // system stack.
                gcw := &gp.m.p.ptr().gcw
                scanstack(gp, gcw)
                if gcBlackenPromptly {
                    gcw.dispose()
                }
                gp.gcscandone = true
            }
            gp.preemptscan = false
            gp.preempt = false
            casfrom_Gscanstatus(gp, _Gscanwaiting, _Gwaiting)
            // This clears gcscanvalid.
            casgstatus(gp, _Gwaiting, _Grunning)
            gp.stackguard0 = gp.stack.lo + _StackGuard
            gogo(&gp.sched) // never return
        }

        // Act like goroutine called runtime.Gosched.
        casgstatus(gp, _Gwaiting, _Grunning)
        gopreempt_m(gp) // never return
    }

    // Allocate a bigger segment and move the stack.
    oldsize := gp.stack.hi - gp.stack.lo
    newsize := oldsize * 2
    if newsize > maxstacksize {
        print("runtime: goroutine stack exceeds ", maxstacksize, "-byte limit\n")
        throw("stack overflow")
    }

    // The goroutine must be executing in order to call newstack,
    // so it must be Grunning (or Gscanrunning).
    casgstatus(gp, _Grunning, _Gcopystack)

    // The concurrent GC will not scan the stack while we are doing the copy since
    // the gp is in a Gcopystack status.
    copystack(gp, newsize, true)
    if stackDebug >= 1 {
        print("stack grow done\n")
    }
    casgstatus(gp, _Gcopystack, _Grunning)
    gogo(&gp.sched)
}
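Seen from user code (a rough illustration): the doubling stops at maxstacksize, and runtime/debug.SetMaxStack can lower that ceiling. Unbounded recursion therefore ends with the "stack exceeds ...-byte limit" error printed by the code above, followed by a fatal stack overflow.

package main

import "runtime/debug"

// Deep recursion forces repeated newstack/copystack growth until the limit
// is hit, at which point the runtime aborts with something like:
//   runtime: goroutine stack exceeds 262144000-byte limit
//   fatal error: stack overflow
func eat(depth int, pad [128]byte) int {
    return eat(depth+1, pad) + int(pad[0])
}

func main() {
    debug.SetMaxStack(250 << 20) // lower the default (about 1GB on 64-bit) so the demo fails faster
    eat(0, [128]byte{})
}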