«

Go调度器学习之goroutine调度怎么创建

时间:2024-4-30 09:01     作者:韩俊     分类: Go语言


今天小编给大家分享一下Go调度器学习之goroutine调度怎么创建的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。

    1. 协程调度发生的时机

    在以下情形中,

    goroutine
    可能会发生调度:

    情形说明
    go func(){}使用go关键字创建一个新的goroutine,调度器会考虑调度
    GC由于GC也需要在系统线程M上执行,且其中需要所有的goroutine都停止运行,所以也会发生调度
    系统调用发生系统的调用时,会阻塞M,所以它会被调度走,同时新的goroutine也会被调度上来
    同步内存访问mutex、channel等操作会使得goroutine阻塞,因此会被调度走,等条件满足后,还会被调度上来继续运行

    2. 创建协程时的调度

    其中,使用

    go
    关键字创建协程时的调度分析,上篇博客做了初步的分析,特别是有关调度循环的分析,但是我们没有具体分析,当创建协程时,系统是怎么发生调度的。

    func newproc(fn *funcval) {
       gp := getg()
       pc := getcallerpc()
       systemstack(func() {
          newg := newproc1(fn, gp, pc)
    
          _p_ := getg().m.p.ptr()
          runqput(_p_, newg, true)
    
          if mainStarted {
             wakep()
          }
       })
    }

    我们还记得,

    go
    关键字在创建协程时,
    Go
    的编译器会将其转换为
    runtime.newproc
    函数,上篇我们详细分析了
    main goroutine
    的创建过程,在
    runtime.main
    函数中,全局变量
    mainStarted
    会被置为
    true
    ,之后普通协程的创建,则会调用
    runtime.wakep
    函数尝试唤醒空闲的P。

    func wakep() {
       if atomic.Load(&sched.npidle) == 0 {
          return
       }
       // be conservative about spinning threads
       if atomic.Load(&sched.nmspinning) != 0 || !atomic.Cas(&sched.nmspinning, 0, 1) {
          return
       }
       startm(nil, true)
    }

    wakep
    函数首先确认是否有其他线程正在处于
    spinning
    状态,即M是否在找工作,如果没有的话,则调用
    startm
    函数创建一个新的、或者唤醒一个处于睡眠状态的工作线程出来工作。

    func startm(_p_ *p, spinning bool) {
       // Disable preemption.
       //
       // Every owned P must have an owner that will eventually stop it in the
       // event of a GC stop request. startm takes transient ownership of a P
       // (either from argument or pidleget below) and transfers ownership to
       // a started M, which will be responsible for performing the stop.
       //
       // Preemption must be disabled during this transient ownership,
       // otherwise the P this is running on may enter GC stop while still
       // holding the transient P, leaving that P in limbo and deadlocking the
       // STW.
       //
       // Callers passing a non-nil P must already be in non-preemptible
       // context, otherwise such preemption could occur on function entry to
       // startm. Callers passing a nil P may be preemptible, so we must
       // disable preemption before acquiring a P from pidleget below.
       mp := acquirem()  // 保证在此期间不会发生栈扩展
       lock(&sched.lock)
       if _p_ == nil {   // 没有指定p,那么需要从空闲队列中取一个p
          _p_ = pidleget()
          if _p_ == nil {// 如果没有空闲的p,直接返回
             unlock(&sched.lock)
             if spinning {
                // The caller incremented nmspinning, but there are no idle Ps,
                // so it's okay to just undo the increment and give up.
                if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
                   throw("startm: negative nmspinning")
                }
             }
             releasem(mp)
             return
          }
       }
       nmp := mget()  // 如果有空闲的p,那么取出一个空闲的m
       if nmp == nil {// 如果没有空闲的m,那么调用newm创建一个,然后返回
          // No M is available, we must drop sched.lock and call newm.
          // However, we already own a P to assign to the M.
          //
          // Once sched.lock is released, another G (e.g., in a syscall),
          // could find no idle P while checkdead finds a runnable G but
          // no running M's because this new M hasn't started yet, thus
          // throwing in an apparent deadlock.
          //
          // Avoid this situation by pre-allocating the ID for the new M,
          // thus marking it as 'running' before we drop sched.lock. This
          // new M will eventually run the scheduler to execute any
          // queued G's.
          id := mReserveID()
          unlock(&sched.lock)
    
          var fn func()
          if spinning {
             // The caller incremented nmspinning, so set m.spinning in the new M.
             fn = mspinning
          }
          newm(fn, _p_, id)
          // Ownership transfer of _p_ committed by start in newm.
          // Preemption is now safe.
          releasem(mp)
          return
       }
       unlock(&sched.lock)
       if nmp.spinning {
          throw("startm: m is spinning")
       }
       if nmp.nextp != 0 {
          throw("startm: m has p")
       }
       if spinning && !runqempty(_p_) {
          throw("startm: p has runnable gs")
       }
       // The caller incremented nmspinning, so set m.spinning in the new M.
       nmp.spinning = spinning
       nmp.nextp.set(_p_)
       notewakeup(&nmp.park) // 如果有空闲的m,则唤醒这个m
       // Ownership transfer of _p_ committed by wakeup. Preemption is now
       // safe.
       releasem(mp)
    }

    startm
    函数首先判断是否有空闲的P,如果没有则直接返回;如果有,则判断是否有空闲的M,如果没有,则新建一个;如果有空闲的M,则唤醒这个M。说白了,
    wakep
    函数就是为了更大程度的利用P,利用CPU资源。

    说到这里,我们就需要重温一下上篇博客讲到的,调度中获取

    goroutine
    的规则是:

      每调度61次就需要从全局队列中获取

      goroutine

      其次优先从本P所在队列中获取

      goroutine

      如果还没有获取到,则从其他P的运行队列中窃取

      goroutine

    其中,从其他P队列中窃取

    goroutine
    ,调用的是
    findrunnable
    函数,这个函数很长,为了简化说明,我们删除一些不是很重要的代码:

    func findrunnable() (gp *g, inheritTime bool) {
       _g_ := getg()
    
    top:
       _p_ := _g_.m.p.ptr()
       ...
    
       // local runq
       // 再从本地队列找找
       if gp, inheritTime := runqget(_p_); gp != nil {
          return gp, inheritTime
       }
    
       // global runq
       // 再看看全局队列
       if sched.runqsize != 0 {
          lock(&sched.lock)
          gp := globrunqget(_p_, 0)
          unlock(&sched.lock)
          if gp != nil {
             return gp, false
          }
       }
    
       ...
    
       // Spinning Ms: steal work from other Ps.
       //
       // Limit the number of spinning Ms to half the number of busy Ps.
       // This is necessary to prevent excessive CPU consumption when
       // GOMAXPROCS>>1 but the program parallelism is low.
       procs := uint32(gomaxprocs)
       if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
          if !_g_.m.spinning {
             _g_.m.spinning = true
             atomic.Xadd(&sched.nmspinning, 1)
          }
    
          gp, inheritTime, tnow, w, newWork := stealWork(now) // 调用stealWork盗取goroutine
          now = tnow
          if gp != nil {
             // Successfully stole.
             return gp, inheritTime
          }
          if newWork {
             // There may be new timer or GC work; restart to
             // discover.
             goto top
          }
          if w != 0 && (pollUntil == 0 || w < pollUntil) {
             // Earlier timer to wait for.
             pollUntil = w
          }
       }
    
       ...
    
       // return P and block
       // 上面的窃取没有成功,那么解除m和p的绑定,摒弃娥江p放到空闲队列,然后去休眠
       lock(&sched.lock)
       if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
          unlock(&sched.lock)
          goto top
       }
       if sched.runqsize != 0 {
          gp := globrunqget(_p_, 0)
          unlock(&sched.lock)
          return gp, false
       }
       if releasep() != _p_ {
          throw("findrunnable: wrong p")
       }
       pidleput(_p_)
       unlock(&sched.lock)
    
       ...
          _g_.m.spinning = false // m即将睡眠,状态不再是spinning
          if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
             throw("findrunnable: negative nmspinning")
          }
    
       ...
       stopm() // 休眠
       goto top
    }

    从上面的代码可以看出,工作线程会反复尝试寻找运行的

    goroutine
    ,实在找不到的情况下才会进入到睡眠。需要注意的是,工作线程M从其他P的本地队列中盗取goroutine时的状态称之为自旋(spinning)状态,而前面讲到
    wakep
    调用
    startm
    函数,也是优先从自旋状态的M中选取,实在没有才去唤醒休眠的M,再没有就创建新的M。

    窃取算法

    stealWork
    我们就不分析了,有兴趣的同学可以看看。下面具体分析下
    stopm
    是怎么实现线程睡眠的。

    func stopm() {
       _g_ := getg()
    
       if _g_.m.locks != 0 {
          throw("stopm holding locks")
       }
       if _g_.m.p != 0 {
          throw("stopm holding p")
       }
       if _g_.m.spinning {
          throw("stopm spinning")
       }
    
       lock(&sched.lock)
       mput(_g_.m)         // 把m放到sched.midle空闲队列
       unlock(&sched.lock)
       mPark()
       acquirep(_g_.m.nextp.ptr()) // 绑定这个m和其下一个p,这里没有看懂为啥这么操作
       _g_.m.nextp = 0
    }
    
    func mPark() {
       gp := getg()
       notesleep(&gp.m.park) // 进入睡眠状态
       noteclear(&gp.m.park)
    }

    可以看出,

    stopm
    主要是将m对象放到调度器的空闲线程队列,然后通过
    notesleep
    进入睡眠状态。
    note
    go runtime
    实现的一次性睡眠和唤醒机制,通过
    notesleep
    进入睡眠状态,然后另一个线程可以通过
    notewakeup
    唤醒这个线程。

    小结

    上面巴拉巴拉讲了那么多,看的人有点头晕,我们接下来讲一个很小的例子梳理一下以上的逻辑(主线程的创建和执行在上一篇博客中详细叙述过,这里不再赘述),主线程创建了一个

    goroutine
    ,这时候会触发
    wakep
    ,接下来可能会唤醒空闲的工作线程(如果是第一个
    非main goroutine
    ,就没有空闲的工作线程),或者创建一个新的工作线程,或者什么都不做。

    如果是创建一个新的工作线程,那么其开启执行的点也是

    mstart
    函数(注意区分
    mstart
    startm
    ),然后在
    schedule
    函数中会尝试去获取
    goroutine
    ,如果全局和本地的
    goroutine
    队列都没有,则会去其他的P上窃取
    goroutine
    ,如果窃取不成功,则会休眠。

    如果是去唤醒工作协程,唤醒后会在休眠的地方开始,重新进行窃取。

    窃取到工作协程后,就会去执行,然后就会因为各种原因重新开始调度循环。

    3. 主动挂起

    Go
    中,有很多种情形会导致
    goroutine
    阻塞,即其主动挂起,然后被调度走,等满足其运行条件时,还会被调度上来继续运行。比如
    channel
    的读写,我们以通道的阻塞读为例,来介绍
    goroutine
    的主动挂起的调度方式。

    3.1 协程挂起

    和前面介绍的Map一样,

    channel
    的读也有以下两种读取方式:

    v := <- ch
    v, ok := <- ch

    分别对应以下

    chanrecv1
    chanrecv2
    函数:

    //go:nosplit
    func chanrecv1(c *hchan, elem unsafe.Pointer) {
       chanrecv(c, elem, true)
    }
    
    //go:nosplit
    func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
       _, received = chanrecv(c, elem, true)
       return
    }

    无论是哪个函数,最终调用的都是

    chanrecv
    函数:

    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
       ...
       
       c.recvq.enqueue(mysg) // 将这个goroutine放到channel的recv的queue中
       
       atomic.Store8(&gp.parkingOnChan, 1)
       // 挂起这个goroutine
       gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
    
       ...
    }

    chanrecv
    会先判断channel是否有数据可读,如果有则直接读取并返回,如果没有则将这个
    goroutine
    放到
    channel
    recv
    queue
    中,然后调用
    gopark
    函数将当前
    goroutine
    挂起并阻塞。

    func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
       if reason != waitReasonSleep {
          checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
       }
       mp := acquirem()
       gp := mp.curg
       status := readgstatus(gp)
       if status != _Grunning && status != _Gscanrunning {
          throw("gopark: bad g status")
       }
       mp.waitlock = lock
       mp.waitunlockf = unlockf
       gp.waitreason = reason
       mp.waittraceev = traceEv
       mp.waittraceskip = traceskip
       releasem(mp)
       // can't do anything that might move the G between Ms here.
       mcall(park_m)
    }

    gopark
    函数则使用
    mcall
    函数(前面分析过,主要作用是保存当前
    goroutine
    现场,然后切换到
    g0
    栈去调用作为参数传入的函数)取执行
    park_m
    函数:

    // park continuation on g0.
    func park_m(gp *g) {
       _g_ := getg()
    
       if trace.enabled {
          traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
       }
    
       casgstatus(gp, _Grunning, _Gwaiting)
       dropg()
    
       if fn := _g_.m.waitunlockf; fn != nil {
          ok := fn(gp, _g_.m.waitlock)
          _g_.m.waitunlockf = nil
          _g_.m.waitlock = nil
          if !ok {
             if trace.enabled {
                traceGoUnpark(gp, 2)
             }
             casgstatus(gp, _Gwaiting, _Grunnable)
             execute(gp, true) // Schedule it back, never returns.
          }
       }
       schedule()
    }

    park_m
    首先把当前
    goroutine
    的状态设置为
    _Gwaiting
    (因为它正在等待其它
    goroutine
    channel
    里面写数据),然后调用
    dropg
    函数解除
    g
    m
    之间的关系,最后通过调用
    schedule
    函数进入调度循环。

    至此,一个

    goroutine
    就被主动挂起了。

    3.2 协程唤醒

    我们继续以上例子,当另一个

    goroutine
    对这个
    channel
    发送数据的时候

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
       ...
    
       if sg := c.recvq.dequeue(); sg != nil {
          // Found a waiting receiver. We pass the value we want to send
          // directly to the receiver, bypassing the channel buffer (if any).
          send(c, sg, ep, func() { unlock(&c.lock) }, 3)
          return true
       }
    
       ...
    }
    
    func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
       ...
       goready(gp, skip+1)
    }

    channel
    的发送流程和读取类似,当检查到接收队列中有等待着时,会调用
    send
    函数然后调用
    goready
    唤醒协程:

    func goready(gp *g, traceskip int) {
       systemstack(func() {
          ready(gp, traceskip, true)
       })
    }
    
    func ready(gp *g, traceskip int, next bool) {
       if trace.enabled {
          traceGoUnpark(gp, traceskip)
       }
    
       status := readgstatus(gp)
    
       // Mark runnable.
       _g_ := getg()
       mp := acquirem() // disable preemption because it can be holding p in a local var
       if status&^_Gscan != _Gwaiting {
          dumpgstatus(gp)
          throw("bad g->status in ready")
       }
    
       // status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
       casgstatus(gp, _Gwaiting, _Grunnable)
       runqput(_g_.m.p.ptr(), gp, next)
       wakep()
       releasem(mp)
    }

    这里发现,

    ready
    函数和创建协程时一样,会触发
    wakep
    来检查是否需要唤醒空闲P来执行。而在此之前,这个被唤醒的
    goroutine
    会放到P的本地队列的下一个执行
    goroutine
    ,以提升时效性。

    到这里,一个被挂起的协程也就被唤醒了。

    标签: golang

    热门推荐