«

Go协作与抢占怎么实现

时间:2024-7-14 15:41     作者:韩俊     分类: Go语言


这篇“Go协作与抢占怎么实现”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“Go协作与抢占怎么实现”文章吧。

    1. 用户主动让出CPU:runtime.Gosched函数

    在介绍两种抢占调度之前,我们首先介绍一下

    runtime.Gosched
    函数:

    // Gosched yields the processor, allowing other goroutines to run. It does not
    // suspend the current goroutine, so execution resumes automatically.
    func Gosched() {
       checkTimeouts()
       mcall(gosched_m)
    }

    根据说明,

    runtime.Gosched
    函数会主动放弃当前处理器,并且允许其他协程执行,但是起并不会暂停自己,而只是让渡调度权,之后依赖调度器获得重新调度。

    之后,会通过

    mcall
    函数切换到
    g0
    栈去执行
    gosched_m
    函数:

    // Gosched continuation on g0.
    func gosched_m(gp *g) {
       if trace.enabled {
          traceGoSched()
       }
       goschedImpl(gp)
    }

    gosched_m
    调用
    goschedImpl
    函数,其会为协程
    gp
    让渡出本M,并且将
    gp
    放到全局队列中,等待调度。

    func goschedImpl(gp *g) {
       status := readgstatus(gp)
       if status&^_Gscan != _Grunning {
          dumpgstatus(gp)
          throw("bad g status")
       }
       casgstatus(gp, _Grunning, _Grunnable)
       dropg()            // 使当前m放弃gp,就是其参数 curg
       lock(&sched.lock)
       globrunqput(gp)    // 并且把gp放到全局队列中,等待调度
       unlock(&sched.lock)
    
       schedule()
    }

    虽然

    runtime.Gosched
    具有主动放弃CPU的能力,但是对用户的要求比较高,并非用户友好的。

    2. 基于协作的抢占式调度

    2.1 场景

    package main
    
    import (
       "fmt"
       "runtime"
       "sync"
       "time"
    )
    
    var once = sync.Once{}
    
    func f() {
       once.Do(func() {
          fmt.Println("I am go routine 1!")
       })
    }
    
    func main() {
       defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(1))
    
       go func() {
          for {
             f()
          }
       }()
    
       time.Sleep(10 * time.Millisecond)
       fmt.Println("I am main goroutine!")
    }

    我们考虑如上代码,首先我们设置P的个数为1,然后起一个协程中进入死循环,循环调用一个函数,如果没有抢占调度,那么这个协程将一直占据P,也就是会一直占据CPU,代码就永远不可能执行到

    fmt.Println("I am main goroutine!")
    这行。下面我们看看,协作式抢占是怎么避免以上问题的。

    2.2 栈扩张与抢占标记

    $ go tool compile -N -l main.go
    $ go tool objdump main.o >> main.i

    我们通过以上指令,得到2.1中代码的汇编代码,截取

    f
    函数的汇编代码如下:

    TEXT "".f(SB) gofile../home/chenyiguo/smb_share/go_routine_test/main.go
      main.go:12      0x151a       493b6610      CMPQ 0x10(R14), SP 
      main.go:12      0x151e       762b         JBE 0x154b    
      main.go:12      0x1520       4883ec18      SUBQ $0x18, SP    
      main.go:12      0x1524       48896c2410    MOVQ BP, 0x10(SP)  
      main.go:12      0x1529       488d6c2410    LEAQ 0x10(SP), BP  
      main.go:13      0x152e       488d0500000000    LEAQ 0(IP), AX    [3:7]R_PCREL:"".once      
      main.go:13      0x1535       488d1d00000000    LEAQ 0(IP), BX    [3:7]R_PCREL:"".f.func1·f  
      main.go:13      0x153c       e800000000    CALL 0x1541       [1:5]R_CALL:sync.(*Once).Do    
      main.go:16      0x1541       488b6c2410    MOVQ 0x10(SP), BP  
      main.go:16      0x1546       4883c418      ADDQ $0x18, SP    
      main.go:16      0x154a       c3       RET          
      main.go:12      0x154b       e800000000    CALL 0x1550       [1:5]R_CALL:runtime.morestack_noctxt   
      main.go:12      0x1550       ebc8         JMP "".f(SB)  

    其中第一行,

    CMPQ 0x10(R14), SP
    就是比较
    SP
    0x10(R14)
    (其实就是
    stackguard0
    )的大小(注意
    AT&T
    格式下
    CMP
    系列指令的顺序),当
    SP
    小于等于
    0x10(R14)
    时,就会调转到
    0x154b
    地址调用
    runtime.morestack_noctxt
    ,触发栈扩张操作。其实如果你仔细观察就会发现,所有的函数的序言(函数调用的最前方)都被插入了检测指令,除非在函数上标记
    //go:nosplit

    接下来,我们将关注于两点来打通整个链路,即:

      栈扩张怎么重新调度,让出CPU的执行权?

      何时会设置栈扩张标记?

    2.3 栈扩张怎么触发重新调度

    // morestack but not preserving ctxt.
    TEXT runtime·morestack_noctxt(SB),NOSPLIT,$0
       MOVL   $0, DX
       JMP    runtime·morestack(SB)
    
    TEXT runtime·morestack(SB),NOSPLIT,$0-0
       ...
    
       // Set g->sched to context in f.
       MOVQ   0(SP), AX // f's PC
       MOVQ   AX, (g_sched+gobuf_pc)(SI)
       LEAQ   8(SP), AX // f's SP
       MOVQ   AX, (g_sched+gobuf_sp)(SI)
       MOVQ   BP, (g_sched+gobuf_bp)(SI)
       MOVQ   DX, (g_sched+gobuf_ctxt)(SI)
    
       ...
       CALL   runtime·newstack(SB)
       CALL   runtime·abort(SB)  // crash if newstack returns
       RET

    以上代码中,

    runtime·morestack_noctxt
    调用
    runtime·morestack
    ,在
    runtime·morestack
    中,会首先记录协程的PC和SP,然后调用
    runtime.newstack

    func newstack() {
       ...
    
       gp := thisg.m.curg
       
       ...
       stackguard0 := atomic.Loaduintptr(&gp.stackguard0)
    
       ...
       preempt := stackguard0 == stackPreempt
       ...
    
       if preempt {
          if gp == thisg.m.g0 {
             throw("runtime: preempt g0")
          }
          if thisg.m.p == 0 && thisg.m.locks == 0 {
             throw("runtime: g is running but p is not")
          }
    
          if gp.preemptShrink {
             // We're at a synchronous safe point now, so
             // do the pending stack shrink.
             gp.preemptShrink = false
             shrinkstack(gp)
          }
    
          if gp.preemptStop {
             preemptPark(gp) // never returns
          }
    
          // Act like goroutine called runtime.Gosched.
          gopreempt_m(gp) // never return
       }
    
       ...
    }

    我们简化

    runtime.newstack
    函数,总结起来就是通过现有工作协程的
    stackguard0
    字段,来判断是不是应该发生抢占,如果需要的话,则调用
    gopreempt_m(gp)
    函数:

    func gopreempt_m(gp *g) {
       if trace.enabled {
          traceGoPreempt()
       }
       goschedImpl(gp)
    }

    可以看到,

    gopreempt_m
    函数和前面讲到
    Gosched
    函数时说到的
    gosched_m
    函数一样,都将调用
    goschedImpl
    函数,为协程
    gp
    让渡出本M,并且将
    gp
    放到全局队列中,等待调度。

    这里我们就明白了,一旦发生栈扩张,就有可能会发生让渡出执行权,进行重新调度的可能性,那什么时候会发生栈扩张呢?

    2.4 何时设置栈扩张标记

    在代码中,将

    stackguard0
    字段置为
    stackPreempt
    的地方有不少,但是和我们以上场景相符的还是在后台监护线程
    sysmon
    循环中,对于陷入系统调用和长时间运行的
    goroutine
    的运行权进行夺取的
    retake
    函数:

    func sysmon() {
       ...
    
       for {
          ...
          // retake P's blocked in syscalls
          // and preempt long running G's
          if retake(now) != 0 {
             idle = 0
          } else {
             idle++
          }
          ...
       }
    }
    func retake(now int64) uint32 {
       ...
       for i := 0; i < len(allp); i++ {
          ...
          s := _p_.status
          sysretake := false
          if s == _Prunning || s == _Psyscall {
             // Preempt G if it's running for too long.
             t := int64(_p_.schedtick)
             if int64(pd.schedtick) != t {
                pd.schedtick = uint32(t)
                pd.schedwhen = now
             } else if pd.schedwhen+forcePreemptNS <= now { // forcePreemptNS=10ms
                preemptone(_p_) // 在这里设置栈扩张标记
                // In case of syscall, preemptone() doesn't
                // work, because there is no M wired to P.
                sysretake = true
             }
          }
          ...
       }
       unlock(&allpLock)
       return uint32(n)
    }

    其中,在

    preemptone
    函数中进行栈扩张标记的设置:

    func preemptone(_p_ *p) bool {
       mp := _p_.m.ptr()
       if mp == nil || mp == getg().m {
          return false
       }
       gp := mp.curg
       if gp == nil || gp == mp.g0 {
          return false
       }
    
       gp.preempt = true
    
       // Every call in a goroutine checks for stack overflow by
       // comparing the current stack pointer to gp->stackguard0.
       // Setting gp->stackguard0 to StackPreempt folds
       // preemption into the normal stack overflow check.
       gp.stackguard0 = stackPreempt // 设置栈扩张标记
    
       // Request an async preemption of this P.
       if preemptMSupported && debug.asyncpreemptoff == 0 {
          _p_.preempt = true
          preemptM(mp)
       }
    
       return true
    }

    通过以上,我们串通起了

    goroutine
    协作式抢占的逻辑:

      首先,后台监控线程会对运行时间过长(

      &ge;10ms
      )的协程设置栈扩张标记;

      协程运行到任何一个函数的序言的时候,都会首先检查栈扩张标记;

      如果需要进行栈扩张,在进行栈扩张的时候,会夺取这个协程的运行权,从而实现抢占式调度。

    3. 基于信号的抢占式调度

    分析以上结论我们可以知道,上述抢占触发逻辑有一个致命的缺点,那就是必须要运行到函数栈的序言部分,而这根本无法读取以下协程的运行权,在Go的1.14版本之前,一下代码不会打印最后一句

    "I am main goroutine!"

    package main
    
    import (
       "fmt"
       "runtime"
       "sync"
       "time"
    )
    
    var once = sync.Once{}
    
    func main() {
       defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(1))
    
       go func() {
          for {
             once.Do(func() {
                fmt.Println("I am go routine 1!")
             })
          }
       }()
    
       time.Sleep(10 * time.Millisecond)
       fmt.Println("I am main goroutine!")
    }

    因为以上协程中的

    for
    循环是个死循环,且并不会包含栈扩张逻辑,所以不会让渡出自身的执行权。

    3.1 发送抢占信号

    为此,

    Go SDK
    引入了基于信号的抢占式调度。我们注意分析上一节
    preemptone
    函数代码中有以下部分:

    if preemptMSupported && debug.asyncpreemptoff == 0 {
       _p_.preempt = true
       preemptM(mp)
    }

    其中

    preemptM
    函数会发送
    _SIGURG
    信号给需要抢占的线程:

    const sigPreempt = _SIGURG
    
    func preemptM(mp *m) {
       // On Darwin, don't try to preempt threads during exec.
       // Issue #41702.
       if GOOS == "darwin" || GOOS == "ios" {
          execLock.rlock()
       }
    
       if atomic.Cas(&mp.signalPending, 0, 1) {
          if GOOS == "darwin" || GOOS == "ios" {
             atomic.Xadd(&pendingPreemptSignals, 1)
          }
    
          // If multiple threads are preempting the same M, it may send many
          // signals to the same M such that it hardly make progress, causing
          // live-lock problem. Apparently this could happen on darwin. See
          // issue #37741.
          // Only send a signal if there isn't already one pending.
          signalM(mp, sigPreempt)
       }
    
       if GOOS == "darwin" || GOOS == "ios" {
          execLock.runlock()
       }
    }

    3.2 抢占调用的注入

    说到这里,我们就需要回到最开始,在第一个协程

    m0
    开启
    mstart
    的调用链路上,会调用
    mstartm0
    函数,在这里会调用
    initsig

    func initsig(preinit bool) {
      ...
    
       for i := uint32(0); i < _NSIG; i++ {
          ...
    
          handlingSig[i] = 1
          setsig(i, abi.FuncPCABIInternal(sighandler))
       }
    }

    在以上,注册了

    sighandler
    函数:

    func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
       ...
    
       if sig == sigPreempt && debug.asyncpreemptoff == 0 {
          // Might be a preemption signal.
          doSigPreempt(gp, c)
          // Even if this was definitely a preemption signal, it
          // may have been coalesced with another signal, so we
          // still let it through to the application.
       }
    
       ...
    }

    然后接收到

    sigPreempt
    信号时,会通过
    doSigPreempt
    函数处理如下:

    func doSigPreempt(gp *g, ctxt *sigctxt) {
       // Check if this G wants to be preempted and is safe to
       // preempt.
       if wantAsyncPreempt(gp) {
          if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
             // Adjust the PC and inject a call to asyncPreempt.
             ctxt.pushCall(abi.FuncPCABI0(asyncPreempt), newpc) // 插入抢占调用
          }
       }
    
       // Acknowledge the preemption.
       atomic.Xadd(&gp.m.preemptGen, 1)
       atomic.Store(&gp.m.signalPending, 0)
    
       if GOOS == "darwin" || GOOS == "ios" {
          atomic.Xadd(&pendingPreemptSignals, -1)
       }
    }

    最终,

    doSigPreempt&mdash;>asyncPreempt->asyncPreempt2

    func asyncPreempt2() {
       gp := getg()
       gp.asyncSafePoint = true
       if gp.preemptStop {
          mcall(preemptPark)
       } else {
          mcall(gopreempt_m)
       }
       gp.asyncSafePoint = false
    }

    然后,又回到了我们熟悉的

    gopreempt_m
    函数,这里就不赘述了。

    所以对于基于信号的抢占调度,总结如下:

      M1发送信号

      _SIGURG

      M2接收到信号,并通过信号处理函数进行处理;

      M2修改执行的上下文,并恢复到修改后的位置;

      重新进入调度循环,进而调度其他

      goroutine

    标签: golang

    热门推荐