今天小编给大家分享一下Go调度器学习之goroutine调度怎么创建的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。
1. 协程调度发生的时机
在以下情形中,
goroutine可能会发生调度:
情形 | 说明 |
---|---|
go func(){} | 使用go关键字创建一个新的goroutine,调度器会考虑调度 |
GC | 由于GC也需要在系统线程M上执行,且其中需要所有的goroutine都停止运行,所以也会发生调度 |
系统调用 | 发生系统的调用时,会阻塞M,所以它会被调度走,同时新的goroutine也会被调度上来 |
同步内存访问 | mutex、channel等操作会使得goroutine阻塞,因此会被调度走,等条件满足后,还会被调度上来继续运行 |
2. 创建协程时的调度
其中,使用
go关键字创建协程时的调度分析,上篇博客做了初步的分析,特别是有关调度循环的分析,但是我们没有具体分析,当创建协程时,系统是怎么发生调度的。
func newproc(fn *funcval) { gp := getg() pc := getcallerpc() systemstack(func() { newg := newproc1(fn, gp, pc) _p_ := getg().m.p.ptr() runqput(_p_, newg, true) if mainStarted { wakep() } }) }
我们还记得,
go关键字在创建协程时,
Go的编译器会将其转换为
runtime.newproc函数,上篇我们详细分析了
main goroutine的创建过程,在
runtime.main函数中,全局变量
mainStarted会被置为
true,之后普通协程的创建,则会调用
runtime.wakep函数尝试唤醒空闲的P。
func wakep() { if atomic.Load(&sched.npidle) == 0 { return } // be conservative about spinning threads if atomic.Load(&sched.nmspinning) != 0 || !atomic.Cas(&sched.nmspinning, 0, 1) { return } startm(nil, true) }
wakep函数首先确认是否有其他线程正在处于
spinning状态,即M是否在找工作,如果没有的话,则调用
startm函数创建一个新的、或者唤醒一个处于睡眠状态的工作线程出来工作。
func startm(_p_ *p, spinning bool) { // Disable preemption. // // Every owned P must have an owner that will eventually stop it in the // event of a GC stop request. startm takes transient ownership of a P // (either from argument or pidleget below) and transfers ownership to // a started M, which will be responsible for performing the stop. // // Preemption must be disabled during this transient ownership, // otherwise the P this is running on may enter GC stop while still // holding the transient P, leaving that P in limbo and deadlocking the // STW. // // Callers passing a non-nil P must already be in non-preemptible // context, otherwise such preemption could occur on function entry to // startm. Callers passing a nil P may be preemptible, so we must // disable preemption before acquiring a P from pidleget below. mp := acquirem() // 保证在此期间不会发生栈扩展 lock(&sched.lock) if _p_ == nil { // 没有指定p,那么需要从空闲队列中取一个p _p_ = pidleget() if _p_ == nil {// 如果没有空闲的p,直接返回 unlock(&sched.lock) if spinning { // The caller incremented nmspinning, but there are no idle Ps, // so it's okay to just undo the increment and give up. if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 { throw("startm: negative nmspinning") } } releasem(mp) return } } nmp := mget() // 如果有空闲的p,那么取出一个空闲的m if nmp == nil {// 如果没有空闲的m,那么调用newm创建一个,然后返回 // No M is available, we must drop sched.lock and call newm. // However, we already own a P to assign to the M. // // Once sched.lock is released, another G (e.g., in a syscall), // could find no idle P while checkdead finds a runnable G but // no running M's because this new M hasn't started yet, thus // throwing in an apparent deadlock. // // Avoid this situation by pre-allocating the ID for the new M, // thus marking it as 'running' before we drop sched.lock. This // new M will eventually run the scheduler to execute any // queued G's. id := mReserveID() unlock(&sched.lock) var fn func() if spinning { // The caller incremented nmspinning, so set m.spinning in the new M. fn = mspinning } newm(fn, _p_, id) // Ownership transfer of _p_ committed by start in newm. // Preemption is now safe. releasem(mp) return } unlock(&sched.lock) if nmp.spinning { throw("startm: m is spinning") } if nmp.nextp != 0 { throw("startm: m has p") } if spinning && !runqempty(_p_) { throw("startm: p has runnable gs") } // The caller incremented nmspinning, so set m.spinning in the new M. nmp.spinning = spinning nmp.nextp.set(_p_) notewakeup(&nmp.park) // 如果有空闲的m,则唤醒这个m // Ownership transfer of _p_ committed by wakeup. Preemption is now // safe. releasem(mp) }
startm函数首先判断是否有空闲的P,如果没有则直接返回;如果有,则判断是否有空闲的M,如果没有,则新建一个;如果有空闲的M,则唤醒这个M。说白了,
wakep函数就是为了更大程度的利用P,利用CPU资源。
说到这里,我们就需要重温一下上篇博客讲到的,调度中获取
goroutine的规则是:
每调度61次就需要从全局队列中获取
goroutine;
其次优先从本P所在队列中获取
goroutine;
如果还没有获取到,则从其他P的运行队列中窃取
goroutine;
其中,从其他P队列中窃取
goroutine,调用的是
findrunnable函数,这个函数很长,为了简化说明,我们删除一些不是很重要的代码:
func findrunnable() (gp *g, inheritTime bool) { _g_ := getg() top: _p_ := _g_.m.p.ptr() ... // local runq // 再从本地队列找找 if gp, inheritTime := runqget(_p_); gp != nil { return gp, inheritTime } // global runq // 再看看全局队列 if sched.runqsize != 0 { lock(&sched.lock) gp := globrunqget(_p_, 0) unlock(&sched.lock) if gp != nil { return gp, false } } ... // Spinning Ms: steal work from other Ps. // // Limit the number of spinning Ms to half the number of busy Ps. // This is necessary to prevent excessive CPU consumption when // GOMAXPROCS>>1 but the program parallelism is low. procs := uint32(gomaxprocs) if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) { if !_g_.m.spinning { _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) } gp, inheritTime, tnow, w, newWork := stealWork(now) // 调用stealWork盗取goroutine now = tnow if gp != nil { // Successfully stole. return gp, inheritTime } if newWork { // There may be new timer or GC work; restart to // discover. goto top } if w != 0 && (pollUntil == 0 || w < pollUntil) { // Earlier timer to wait for. pollUntil = w } } ... // return P and block // 上面的窃取没有成功,那么解除m和p的绑定,摒弃娥江p放到空闲队列,然后去休眠 lock(&sched.lock) if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 { unlock(&sched.lock) goto top } if sched.runqsize != 0 { gp := globrunqget(_p_, 0) unlock(&sched.lock) return gp, false } if releasep() != _p_ { throw("findrunnable: wrong p") } pidleput(_p_) unlock(&sched.lock) ... _g_.m.spinning = false // m即将睡眠,状态不再是spinning if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 { throw("findrunnable: negative nmspinning") } ... stopm() // 休眠 goto top }
从上面的代码可以看出,工作线程会反复尝试寻找运行的
goroutine,实在找不到的情况下才会进入到睡眠。需要注意的是,工作线程M从其他P的本地队列中盗取goroutine时的状态称之为自旋(spinning)状态,而前面讲到
wakep调用
startm函数,也是优先从自旋状态的M中选取,实在没有才去唤醒休眠的M,再没有就创建新的M。
窃取算法
stealWork我们就不分析了,有兴趣的同学可以看看。下面具体分析下
stopm是怎么实现线程睡眠的。
func stopm() { _g_ := getg() if _g_.m.locks != 0 { throw("stopm holding locks") } if _g_.m.p != 0 { throw("stopm holding p") } if _g_.m.spinning { throw("stopm spinning") } lock(&sched.lock) mput(_g_.m) // 把m放到sched.midle空闲队列 unlock(&sched.lock) mPark() acquirep(_g_.m.nextp.ptr()) // 绑定这个m和其下一个p,这里没有看懂为啥这么操作 _g_.m.nextp = 0 } func mPark() { gp := getg() notesleep(&gp.m.park) // 进入睡眠状态 noteclear(&gp.m.park) }
可以看出,
stopm主要是将m对象放到调度器的空闲线程队列,然后通过
notesleep进入睡眠状态。
note是
go runtime实现的一次性睡眠和唤醒机制,通过
notesleep进入睡眠状态,然后另一个线程可以通过
notewakeup唤醒这个线程。
小结
上面巴拉巴拉讲了那么多,看的人有点头晕,我们接下来讲一个很小的例子梳理一下以上的逻辑(主线程的创建和执行在上一篇博客中详细叙述过,这里不再赘述),主线程创建了一个
goroutine,这时候会触发
wakep,接下来可能会唤醒空闲的工作线程(如果是第一个
非main goroutine,就没有空闲的工作线程),或者创建一个新的工作线程,或者什么都不做。
如果是创建一个新的工作线程,那么其开启执行的点也是
mstart函数(注意区分
mstart和
startm),然后在
schedule函数中会尝试去获取
goroutine,如果全局和本地的
goroutine队列都没有,则会去其他的P上窃取
goroutine,如果窃取不成功,则会休眠。
如果是去唤醒工作协程,唤醒后会在休眠的地方开始,重新进行窃取。
窃取到工作协程后,就会去执行,然后就会因为各种原因重新开始调度循环。
3. 主动挂起
在
Go中,有很多种情形会导致
goroutine阻塞,即其主动挂起,然后被调度走,等满足其运行条件时,还会被调度上来继续运行。比如
channel的读写,我们以通道的阻塞读为例,来介绍
goroutine的主动挂起的调度方式。
3.1 协程挂起
和前面介绍的Map一样,
channel的读也有以下两种读取方式:
v := <- ch v, ok := <- ch
分别对应以下
chanrecv1和
chanrecv2函数:
//go:nosplit func chanrecv1(c *hchan, elem unsafe.Pointer) { chanrecv(c, elem, true) } //go:nosplit func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) { _, received = chanrecv(c, elem, true) return }
无论是哪个函数,最终调用的都是
chanrecv函数:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { ... c.recvq.enqueue(mysg) // 将这个goroutine放到channel的recv的queue中 atomic.Store8(&gp.parkingOnChan, 1) // 挂起这个goroutine gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) ... }
chanrecv会先判断channel是否有数据可读,如果有则直接读取并返回,如果没有则将这个
goroutine放到
channel的
recv的
queue中,然后调用
gopark函数将当前
goroutine挂起并阻塞。
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) { if reason != waitReasonSleep { checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy } mp := acquirem() gp := mp.curg status := readgstatus(gp) if status != _Grunning && status != _Gscanrunning { throw("gopark: bad g status") } mp.waitlock = lock mp.waitunlockf = unlockf gp.waitreason = reason mp.waittraceev = traceEv mp.waittraceskip = traceskip releasem(mp) // can't do anything that might move the G between Ms here. mcall(park_m) }
gopark函数则使用
mcall函数(前面分析过,主要作用是保存当前
goroutine现场,然后切换到
g0栈去调用作为参数传入的函数)取执行
park_m函数:
// park continuation on g0. func park_m(gp *g) { _g_ := getg() if trace.enabled { traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip) } casgstatus(gp, _Grunning, _Gwaiting) dropg() if fn := _g_.m.waitunlockf; fn != nil { ok := fn(gp, _g_.m.waitlock) _g_.m.waitunlockf = nil _g_.m.waitlock = nil if !ok { if trace.enabled { traceGoUnpark(gp, 2) } casgstatus(gp, _Gwaiting, _Grunnable) execute(gp, true) // Schedule it back, never returns. } } schedule() }
park_m首先把当前
goroutine的状态设置为
_Gwaiting(因为它正在等待其它
goroutine往
channel里面写数据),然后调用
dropg函数解除
g和
m之间的关系,最后通过调用
schedule函数进入调度循环。
至此,一个
goroutine就被主动挂起了。
3.2 协程唤醒
我们继续以上例子,当另一个
goroutine对这个
channel发送数据的时候
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... if sg := c.recvq.dequeue(); sg != nil { // Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any). send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } ... } func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { ... goready(gp, skip+1) }
channel的发送流程和读取类似,当检查到接收队列中有等待着时,会调用
send函数然后调用
goready唤醒协程:
func goready(gp *g, traceskip int) { systemstack(func() { ready(gp, traceskip, true) }) } func ready(gp *g, traceskip int, next bool) { if trace.enabled { traceGoUnpark(gp, traceskip) } status := readgstatus(gp) // Mark runnable. _g_ := getg() mp := acquirem() // disable preemption because it can be holding p in a local var if status&^_Gscan != _Gwaiting { dumpgstatus(gp) throw("bad g->status in ready") } // status is Gwaiting or Gscanwaiting, make Grunnable and put on runq casgstatus(gp, _Gwaiting, _Grunnable) runqput(_g_.m.p.ptr(), gp, next) wakep() releasem(mp) }
这里发现,
ready函数和创建协程时一样,会触发
wakep来检查是否需要唤醒空闲P来执行。而在此之前,这个被唤醒的
goroutine会放到P的本地队列的下一个执行
goroutine,以提升时效性。
到这里,一个被挂起的协程也就被唤醒了。