这篇“Go协作与抢占怎么实现”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“Go协作与抢占怎么实现”文章吧。
1. 用户主动让出CPU:runtime.Gosched函数
在介绍两种抢占调度之前,我们首先介绍一下
runtime.Gosched函数:
// Gosched yields the processor, allowing other goroutines to run. It does not // suspend the current goroutine, so execution resumes automatically. func Gosched() { checkTimeouts() mcall(gosched_m) }
根据说明,
runtime.Gosched函数会主动放弃当前处理器,并且允许其他协程执行,但是起并不会暂停自己,而只是让渡调度权,之后依赖调度器获得重新调度。
之后,会通过
mcall函数切换到
g0栈去执行
gosched_m函数:
// Gosched continuation on g0. func gosched_m(gp *g) { if trace.enabled { traceGoSched() } goschedImpl(gp) }
gosched_m调用
goschedImpl函数,其会为协程
gp让渡出本M,并且将
gp放到全局队列中,等待调度。
func goschedImpl(gp *g) { status := readgstatus(gp) if status&^_Gscan != _Grunning { dumpgstatus(gp) throw("bad g status") } casgstatus(gp, _Grunning, _Grunnable) dropg() // 使当前m放弃gp,就是其参数 curg lock(&sched.lock) globrunqput(gp) // 并且把gp放到全局队列中,等待调度 unlock(&sched.lock) schedule() }
虽然
runtime.Gosched具有主动放弃CPU的能力,但是对用户的要求比较高,并非用户友好的。
2. 基于协作的抢占式调度
2.1 场景
package main import ( "fmt" "runtime" "sync" "time" ) var once = sync.Once{} func f() { once.Do(func() { fmt.Println("I am go routine 1!") }) } func main() { defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(1)) go func() { for { f() } }() time.Sleep(10 * time.Millisecond) fmt.Println("I am main goroutine!") }
我们考虑如上代码,首先我们设置P的个数为1,然后起一个协程中进入死循环,循环调用一个函数,如果没有抢占调度,那么这个协程将一直占据P,也就是会一直占据CPU,代码就永远不可能执行到
fmt.Println("I am main goroutine!")这行。下面我们看看,协作式抢占是怎么避免以上问题的。
2.2 栈扩张与抢占标记
$ go tool compile -N -l main.go
$ go tool objdump main.o >> main.i
我们通过以上指令,得到2.1中代码的汇编代码,截取
f函数的汇编代码如下:
TEXT "".f(SB) gofile../home/chenyiguo/smb_share/go_routine_test/main.go
main.go:12 0x151a 493b6610 CMPQ 0x10(R14), SP
main.go:12 0x151e 762b JBE 0x154b
main.go:12 0x1520 4883ec18 SUBQ $0x18, SP
main.go:12 0x1524 48896c2410 MOVQ BP, 0x10(SP)
main.go:12 0x1529 488d6c2410 LEAQ 0x10(SP), BP
main.go:13 0x152e 488d0500000000 LEAQ 0(IP), AX [3:7]R_PCREL:"".once
main.go:13 0x1535 488d1d00000000 LEAQ 0(IP), BX [3:7]R_PCREL:"".f.func1·f
main.go:13 0x153c e800000000 CALL 0x1541 [1:5]R_CALL:sync.(*Once).Do
main.go:16 0x1541 488b6c2410 MOVQ 0x10(SP), BP
main.go:16 0x1546 4883c418 ADDQ $0x18, SP
main.go:16 0x154a c3 RET
main.go:12 0x154b e800000000 CALL 0x1550 [1:5]R_CALL:runtime.morestack_noctxt
main.go:12 0x1550 ebc8 JMP "".f(SB)
其中第一行,
CMPQ 0x10(R14), SP就是比较
SP和
0x10(R14)(其实就是
stackguard0)的大小(注意
AT&T格式下
CMP系列指令的顺序),当
SP小于等于
0x10(R14)时,就会调转到
0x154b地址调用
runtime.morestack_noctxt,触发栈扩张操作。其实如果你仔细观察就会发现,所有的函数的序言(函数调用的最前方)都被插入了检测指令,除非在函数上标记
//go:nosplit。
接下来,我们将关注于两点来打通整个链路,即:
栈扩张怎么重新调度,让出CPU的执行权?
何时会设置栈扩张标记?
2.3 栈扩张怎么触发重新调度
// morestack but not preserving ctxt. TEXT runtime·morestack_noctxt(SB),NOSPLIT,$0 MOVL $0, DX JMP runtime·morestack(SB) TEXT runtime·morestack(SB),NOSPLIT,$0-0 ... // Set g->sched to context in f. MOVQ 0(SP), AX // f's PC MOVQ AX, (g_sched+gobuf_pc)(SI) LEAQ 8(SP), AX // f's SP MOVQ AX, (g_sched+gobuf_sp)(SI) MOVQ BP, (g_sched+gobuf_bp)(SI) MOVQ DX, (g_sched+gobuf_ctxt)(SI) ... CALL runtime·newstack(SB) CALL runtime·abort(SB) // crash if newstack returns RET
以上代码中,
runtime·morestack_noctxt调用
runtime·morestack,在
runtime·morestack中,会首先记录协程的PC和SP,然后调用
runtime.newstack:
func newstack() { ... gp := thisg.m.curg ... stackguard0 := atomic.Loaduintptr(&gp.stackguard0) ... preempt := stackguard0 == stackPreempt ... if preempt { if gp == thisg.m.g0 { throw("runtime: preempt g0") } if thisg.m.p == 0 && thisg.m.locks == 0 { throw("runtime: g is running but p is not") } if gp.preemptShrink { // We're at a synchronous safe point now, so // do the pending stack shrink. gp.preemptShrink = false shrinkstack(gp) } if gp.preemptStop { preemptPark(gp) // never returns } // Act like goroutine called runtime.Gosched. gopreempt_m(gp) // never return } ... }
我们简化
runtime.newstack函数,总结起来就是通过现有工作协程的
stackguard0字段,来判断是不是应该发生抢占,如果需要的话,则调用
gopreempt_m(gp)函数:
func gopreempt_m(gp *g) { if trace.enabled { traceGoPreempt() } goschedImpl(gp) }
可以看到,
gopreempt_m函数和前面讲到
Gosched函数时说到的
gosched_m函数一样,都将调用
goschedImpl函数,为协程
gp让渡出本M,并且将
gp放到全局队列中,等待调度。
这里我们就明白了,一旦发生栈扩张,就有可能会发生让渡出执行权,进行重新调度的可能性,那什么时候会发生栈扩张呢?
2.4 何时设置栈扩张标记
在代码中,将
stackguard0字段置为
stackPreempt的地方有不少,但是和我们以上场景相符的还是在后台监护线程
sysmon循环中,对于陷入系统调用和长时间运行的
goroutine的运行权进行夺取的
retake函数:
func sysmon() { ... for { ... // retake P's blocked in syscalls // and preempt long running G's if retake(now) != 0 { idle = 0 } else { idle++ } ... } }
func retake(now int64) uint32 { ... for i := 0; i < len(allp); i++ { ... s := _p_.status sysretake := false if s == _Prunning || s == _Psyscall { // Preempt G if it's running for too long. t := int64(_p_.schedtick) if int64(pd.schedtick) != t { pd.schedtick = uint32(t) pd.schedwhen = now } else if pd.schedwhen+forcePreemptNS <= now { // forcePreemptNS=10ms preemptone(_p_) // 在这里设置栈扩张标记 // In case of syscall, preemptone() doesn't // work, because there is no M wired to P. sysretake = true } } ... } unlock(&allpLock) return uint32(n) }
其中,在
preemptone函数中进行栈扩张标记的设置:
func preemptone(_p_ *p) bool { mp := _p_.m.ptr() if mp == nil || mp == getg().m { return false } gp := mp.curg if gp == nil || gp == mp.g0 { return false } gp.preempt = true // Every call in a goroutine checks for stack overflow by // comparing the current stack pointer to gp->stackguard0. // Setting gp->stackguard0 to StackPreempt folds // preemption into the normal stack overflow check. gp.stackguard0 = stackPreempt // 设置栈扩张标记 // Request an async preemption of this P. if preemptMSupported && debug.asyncpreemptoff == 0 { _p_.preempt = true preemptM(mp) } return true }
通过以上,我们串通起了
goroutine协作式抢占的逻辑:
首先,后台监控线程会对运行时间过长(
≥10ms)的协程设置栈扩张标记;
协程运行到任何一个函数的序言的时候,都会首先检查栈扩张标记;
如果需要进行栈扩张,在进行栈扩张的时候,会夺取这个协程的运行权,从而实现抢占式调度。
3. 基于信号的抢占式调度
分析以上结论我们可以知道,上述抢占触发逻辑有一个致命的缺点,那就是必须要运行到函数栈的序言部分,而这根本无法读取以下协程的运行权,在Go的1.14版本之前,一下代码不会打印最后一句
"I am main goroutine!":
package main import ( "fmt" "runtime" "sync" "time" ) var once = sync.Once{} func main() { defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(1)) go func() { for { once.Do(func() { fmt.Println("I am go routine 1!") }) } }() time.Sleep(10 * time.Millisecond) fmt.Println("I am main goroutine!") }
因为以上协程中的
for循环是个死循环,且并不会包含栈扩张逻辑,所以不会让渡出自身的执行权。
3.1 发送抢占信号
为此,
Go SDK引入了基于信号的抢占式调度。我们注意分析上一节
preemptone函数代码中有以下部分:
if preemptMSupported && debug.asyncpreemptoff == 0 { _p_.preempt = true preemptM(mp) }
其中
preemptM函数会发送
_SIGURG信号给需要抢占的线程:
const sigPreempt = _SIGURG func preemptM(mp *m) { // On Darwin, don't try to preempt threads during exec. // Issue #41702. if GOOS == "darwin" || GOOS == "ios" { execLock.rlock() } if atomic.Cas(&mp.signalPending, 0, 1) { if GOOS == "darwin" || GOOS == "ios" { atomic.Xadd(&pendingPreemptSignals, 1) } // If multiple threads are preempting the same M, it may send many // signals to the same M such that it hardly make progress, causing // live-lock problem. Apparently this could happen on darwin. See // issue #37741. // Only send a signal if there isn't already one pending. signalM(mp, sigPreempt) } if GOOS == "darwin" || GOOS == "ios" { execLock.runlock() } }
3.2 抢占调用的注入
说到这里,我们就需要回到最开始,在第一个协程
m0开启
mstart的调用链路上,会调用
mstartm0函数,在这里会调用
initsig:
func initsig(preinit bool) { ... for i := uint32(0); i < _NSIG; i++ { ... handlingSig[i] = 1 setsig(i, abi.FuncPCABIInternal(sighandler)) } }
在以上,注册了
sighandler函数:
func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) { ... if sig == sigPreempt && debug.asyncpreemptoff == 0 { // Might be a preemption signal. doSigPreempt(gp, c) // Even if this was definitely a preemption signal, it // may have been coalesced with another signal, so we // still let it through to the application. } ... }
然后接收到
sigPreempt信号时,会通过
doSigPreempt函数处理如下:
func doSigPreempt(gp *g, ctxt *sigctxt) { // Check if this G wants to be preempted and is safe to // preempt. if wantAsyncPreempt(gp) { if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok { // Adjust the PC and inject a call to asyncPreempt. ctxt.pushCall(abi.FuncPCABI0(asyncPreempt), newpc) // 插入抢占调用 } } // Acknowledge the preemption. atomic.Xadd(&gp.m.preemptGen, 1) atomic.Store(&gp.m.signalPending, 0) if GOOS == "darwin" || GOOS == "ios" { atomic.Xadd(&pendingPreemptSignals, -1) } }
最终,
doSigPreempt—>asyncPreempt->asyncPreempt2:
func asyncPreempt2() { gp := getg() gp.asyncSafePoint = true if gp.preemptStop { mcall(preemptPark) } else { mcall(gopreempt_m) } gp.asyncSafePoint = false }
然后,又回到了我们熟悉的
gopreempt_m函数,这里就不赘述了。
所以对于基于信号的抢占调度,总结如下:
M1发送信号
_SIGURG;
M2接收到信号,并通过信号处理函数进行处理;
M2修改执行的上下文,并恢复到修改后的位置;
重新进入调度循环,进而调度其他
goroutine。