golang源码分析-调度过程概述

本文主要概述一下golang的调度器的大概工作的流程,众所周知golang是基于用户态的协程的调度来完成多任务的执行。在Linux操作系统中,以往的多线程执行都是通过操作系统陷入内核来创建线程并提供给操作系统进行调度,在操作系统中的线程调度可以充分利用操作系统提供的各种资源,当线程执行到阻塞或者等待操作时,操作系统会休眠对应线程直到阻塞的事情来唤醒该线程继续执行,但是在通过操作系统创建的线程无论是阻塞还是调度都需要陷入内核,从而导致线程在这些过程中的开销较大。golang中的协程更多的是在用户态进行调度不需要陷入内核,但是同时这也限制了golang的调度策略并不能使用操作系统提供的阻塞唤醒或者抢占式调度的机制,本文主要就是探讨一下golang在用户态是如何进行调度执行。

golang的运行模型

golang主要根据CSP模型,通过通信进行数据交互,并且由于是实现的用户态的协程调度,但是本质上还是对应与操作系统的线程去详细执行对应的具体内容,故在golang中就设置了三种不同的模型分别为M,P和G。

Machine(M)操作系统线程

Machine即对应于真正的操作系统创建的线程,这个线程的创建调度与运行都是受操作系统所控制,如果golang执行的是一个阻塞操作,那么该线程还是会阻塞,知道阻塞完成之后被操作系统唤醒并继续执行。

Processor§

Processor就是虚拟的提供给g执行的上下文环境,该环境包括一个本地的g的队列,本地内存的对象等操作资源,只有M在绑定了P之后才能执行对应的G。

Groutine(G)

Groutine就是golang中对应的用户态的协程的具体内容,默认的用户态栈的大小是2KB,包括这执行任务的上下文的环境,在切换过程中保存执行的环境,调度器就是调度G到可执行的P中从而完成高效的并发调度操作。

三者整体的运行状态如图所示;

golang可能的一个运行状态图如上所示,从运行过程也可看出,G的调度过程都是在用户态进行的,接下来就分析一下调度的场景

golang的调度场景

在golang的初始化过程中,首先第一个M0就是初始化完成的M0,该M0就会在初始化完成之后调度执行对应的G,在golang的启动过程中可知,golang中的main函数其实也是对应的一个G来调度执行,如果在golang程序中启动协程来执行,并根据协程的执行情况或者现有的内核线程的工作情况来决定是否重新开启一个内核线程。

内核线程的启动过程

在拥有大量的G未执行的时候,或者是有的内核线程在执行系统调用阻塞的情况下,或者有些G长时间运行的情况,会根据情况来开启一个新的内核线程来执行可执行的G,从而确保G能够快速被执行。

在golang的启动过程中,会启动一个sysmon内核线程,该线程不知道具体的G内容,而是用来监控一些非阻塞的事件是否完成,监控各个正在被执行的G的运行时间,并从事抢占性调度的标志位的设置。

func newm(fn func(), _p_ *p) {    // 生成内核工作线程mp := allocm(_p_, fn)           // 申请对应的内存设置新的栈信息mp.nextp.set(_p_) mp.sigmask = initSigmaskif gp := getg(); gp != nil && gp.m != nil && (gp.m.lockedExt != 0 || gp.m.incgo) && GOOS != "plan9" {// We're on a locked M or a thread that may have been// started by C. The kernel state of this thread may// be strange (the user may have locked it for that// purpose). We don't want to clone that into another// thread. Instead, ask a known-good thread to create// the thread for us.//// This is disabled on Plan 9. See golang.org/issue/22227.//// TODO: This may be unnecessary on Windows, which// doesn't model thread creation off fork.lock(&newmHandoff.lock)if newmHandoff.haveTemplateThread == 0 {throw("on a locked thread with no template thread")}mp.schedlink = newmHandoff.newmnewmHandoff.newm.set(mp)if newmHandoff.waiting {newmHandoff.waiting = falsenotewakeup(&newmHandoff.wake)}unlock(&newmHandoff.lock)return}newm1(mp)         // 生成该工作线程
}func newm1(mp *m) {if iscgo {var ts cgothreadstartif _cgo_thread_start == nil {throw("_cgo_thread_start missing")}ts.g.set(mp.g0)ts.tls = (*uint64)(unsafe.Pointer(&mp.tls[0]))ts.fn = unsafe.Pointer(funcPC(mstart))if msanenabled {msanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts))}execLock.rlock() // Prevent process clone.asmcgocall(_cgo_thread_start, unsafe.Pointer(&ts))execLock.runlock()return}execLock.rlock() // Prevent process clone.newosproc(mp)         // 系统调用线程  Linux主要是clone系统调用execLock.runlock()
}
func newosproc(mp *m) {stk := unsafe.Pointer(mp.g0.stack.hi)         // 设置栈/** note: strace gets confused if we use CLONE_PTRACE here.*/if false {print("newosproc stk=", stk, " m=", mp, " g=", mp.g0, " clone=", funcPC(clone), " id=", mp.id, " ostk=", &mp, "\n")}// Disable signals during clone, so that the new thread starts// with signals disabled. It will enable them in minit.var oset sigsetsigprocmask(_SIG_SETMASK, &sigset_all, &oset)ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))                // 系统调用生成线程并设置g0堆栈开始执行mstart函数,从而重新开启一个线程执行sigprocmask(_SIG_SETMASK, &oset, nil)if ret < 0 {print("runtime: failed to create new OS thread (have ", mcount(), " already; errno=", -ret, ")\n")if ret == -_EAGAIN {println("runtime: may need to increase max user processes (ulimit -u)")}throw("newosproc")}
}

从流程可知,生成一个工作线程主要通过系统调用生成一个,生成完成之后再重新从mstart函数开始执行任务,重新开始去调度执行G。新增工作内核线程可能会在系统调用的过程中触发检查也可能在监控线程中通过retake函数触发。

schedule调度过程
// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {_g_ := getg()                            if _g_.m.locks != 0 {throw("schedule: holding locks")}if _g_.m.lockedg != 0 {stoplockedm()execute(_g_.m.lockedg.ptr(), false) // Never returns.}// We should not schedule away from a g that is executing a cgo call,// since the cgo call is using the m's g0 stack.if _g_.m.incgo {throw("schedule: in cgo")}top:if sched.gcwaiting != 0 {gcstopm()goto top}if _g_.m.p.ptr().runSafePointFn != 0 {runSafePointFn()}var gp *gvar inheritTime boolif trace.enabled || trace.shutdown {gp = traceReader()if gp != nil {casgstatus(gp, _Gwaiting, _Grunnable)traceGoUnpark(gp, 0)}}if gp == nil && gcBlackenEnabled != 0 {gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())   // 进行GC模式}if gp == nil {                                  // Check the global runnable queue once in a while to ensure fairness.// Otherwise two goroutines can completely occupy the local runqueue// by constantly respawning each other.if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {    // 为了公平每隔61个检查一下全局列表中是否有可执行的G如果有则执行lock(&sched.lock)gp = globrunqget(_g_.m.p.ptr(), 1)             // 从全局列表中获取一个Gunlock(&sched.lock)}}if gp == nil {                                                                   // 如果全局没有获取到或者没从全局获取gp, inheritTime = runqget(_g_.m.p.ptr())        // 从本地的p的队列中获取Gif gp != nil && _g_.m.spinning {throw("schedule: spinning with local work")    // 检查是否是自选}}if gp == nil {gp, inheritTime = findrunnable() // blocks until work is available   从其他地方获取G如果获取不到则阻塞在这里直到找到}// This thread is going to run a goroutine and is not spinning anymore,// so if it was marked as spinning we need to reset it now and potentially// start a new spinning M.if _g_.m.spinning {resetspinning()}if sched.disable.user && !schedEnabled(gp) {// Scheduling of this goroutine is disabled. Put it on// the list of pending runnable goroutines for when we// re-enable user scheduling and look again.lock(&sched.lock)if schedEnabled(gp) {// Something re-enabled scheduling while we// were acquiring the lock.unlock(&sched.lock)} else {sched.disable.runnable.pushBack(gp)sched.disable.n++unlock(&sched.lock)goto top}}if gp.lockedm != 0 {// Hands off own p to the locked m,// then blocks waiting for a new p.startlockedm(gp)goto top}execute(gp, inheritTime)      // 找到之后就执行该G
}

调度函数主要执行的流程就是;

  1. 如果隔了61次调度,则本次去全局G列表中去查找一个可执行的G;
  2. 如果不是61次或者61次去查找全局G列表的时候未能找到,则获取本地P中的G列表中的G;
  3. 如果本地都还没有找到则通过findrunnable函数去查找,该函数会分别从全局、poll列表中或者其他的P中去尝试获取可运行的G,如果还没有找到则进入休眠。
G的执行过程

如果在上一步找到了可执行的G,则此时就会执行execute(gp, inheritTime)函数,执行该任务。

G的任务正常执行流程
func execute(gp *g, inheritTime bool) {_g_ := getg()casgstatus(gp, _Grunnable, _Grunning)         // 设置该G位运行可调用可运行状态gp.waitsince = 0gp.preempt = false                            // 是否抢占式调度标志位gp.stackguard0 = gp.stack.lo + _StackGuard     // 设置堆栈if !inheritTime {_g_.m.p.ptr().schedtick++}_g_.m.curg = gpgp.m = _g_.m// Check whether the profiler needs to be turned on or off.hz := sched.profilehzif _g_.m.profilehz != hz {setThreadCPUProfiler(hz)}if trace.enabled {// GoSysExit has to happen when we have a P, but before GoStart.// So we emit it here.if gp.syscallsp != 0 && gp.sysblocktraced {traceGoSysExit(gp.sysexitticks)}traceGoStart()}gogo(&gp.sched)             // 执行G对应的内容
}

主要就是进行了检查和设置标志位之后,再就调用gogo执行;

TEXT runtime·gogo(SB), NOSPLIT, $16-8MOVQ    buf+0(FP), BX      // gobufMOVQ    gobuf_g(BX), DXMOVQ 0(DX), CX       // make sure g != nilget_tls(CX)MOVQ   DX, g(CX)MOVQ   gobuf_sp(BX), SP    // restore SP    将gobuf中保存的现场内容回复MOVQ   gobuf_ret(BX), AXMOVQ   gobuf_ctxt(BX), DXMOVQ  gobuf_bp(BX), BPMOVQ    $0, gobuf_sp(BX)    // clear to help garbage collectorMOVQ  $0, gobuf_ret(BX)MOVQ   $0, gobuf_ctxt(BX)MOVQ  $0, gobuf_bp(BX)MOVQ    gobuf_pc(BX), BX       // 将要执行的地址放入BXJMP    BX                       // 跳转执行该处代码

此时我们回到newproc1函数中创建G的过程中的时候,在G执行完成之后的执行地址设置成了goexit函数处。

 newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function

此时查看goexit函数的执行过程;

// The top-most function running on a goroutine
// returns to goexit+PCQuantum.
TEXT runtime·goexit(SB),NOSPLIT,$0-0BYTE    $0x90   // NOPCALL  runtime·goexit1(SB) // does not return    调用goexit1// traceback from goexit1 must hit code range of goexitBYTE  $0x90   // NOP
func goexit1() {if raceenabled {racegoend()}if trace.enabled {traceGoEnd()}mcall(goexit0)  // 切换到g0释放该执行完成的g
}TEXT runtime·mcall(SB), NOSPLIT, $0-8MOVQ  fn+0(FP), DIget_tls(CX)MOVQ    g(CX), AX   // save state in g->schedMOVQ    0(SP), BX   // caller's PCMOVQ BX, (g_sched+gobuf_pc)(AX)LEAQ fn+0(FP), BX   // caller's SPMOVQ BX, (g_sched+gobuf_sp)(AX)MOVQ AX, (g_sched+gobuf_g)(AX)MOVQ  BP, (g_sched+gobuf_bp)(AX)// switch to m->g0 & its stack, call fn    切换栈MOVQ    g(CX), BXMOVQ   g_m(BX), BXMOVQ m_g0(BX), SICMPQ    SI, AX  // if g == m->g0 call badmcallJNE  3(PC)MOVQ   $runtime·badmcall(SB), AXJMP    AXMOVQ  SI, g(CX)   // g = m->g0MOVQ    (g_sched+gobuf_sp)(SI), SP // sp = m->g0->sched.sp   调用g0的sched.sp的内容PUSHQ  AXMOVQ  DI, DXMOVQ  0(DI), DI CALL  DI                                    // 执行该函数POPQ  AXMOVQ  $runtime·badmcall2(SB), AXJMP   AXRET// goexit continuation on g0.
func goexit0(gp *g) {_g_ := getg()casgstatus(gp, _Grunning, _Gdead)        // 设置状态为执行完成if isSystemGoroutine(gp, false) {atomic.Xadd(&sched.ngsys, -1)}gp.m = nil                              // 设置m为空locked := gp.lockedm != 0               // 值重新置空gp.lockedm = 0_g_.m.lockedg = 0gp.paniconfault = falsegp._defer = nil // should be true already but just in case.gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.gp.writebuf = nilgp.waitreason = 0gp.param = nilgp.labels = nilgp.timer = nilif gcBlackenEnabled != 0 && gp.gcAssistBytes > 0 {// Flush assist credit to the global pool. This gives// better information to pacing if the application is// rapidly creating an exiting goroutines.scanCredit := int64(gcController.assistWorkPerByte * float64(gp.gcAssistBytes))atomic.Xaddint64(&gcController.bgScanCredit, scanCredit)gp.gcAssistBytes = 0}// Note that gp's stack scan is now "valid" because it has no// stack.gp.gcscanvalid = truedropg()                       // 将该G与M的关系if GOARCH == "wasm" { // no threads yet on wasmgfput(_g_.m.p.ptr(), gp)schedule() // never returns}if _g_.m.lockedInt != 0 {print("invalid m->lockedInt = ", _g_.m.lockedInt, "\n")throw("internal lockOSThread error")}gfput(_g_.m.p.ptr(), gp)      // 放入到空余列表中if locked {// The goroutine may have locked this thread because// it put it in an unusual kernel state. Kill it// rather than returning it to the thread pool.// Return to mstart, which will release the P and exit// the thread.if GOOS != "plan9" { // See golang.org/issue/22227.gogo(&_g_.m.g0.sched)} else {// Clear lockedExt on plan9 since we may end up re-using// this thread._g_.m.lockedExt = 0}}schedule()                                   // 重新调度
}

至此一个正常的G的一个执行过程就完成了。函数的调用链路如下;

执行完成
schedule函数
execute函数
gogo函数
G协程的内容
goexit函数
mcall函数
goexit0函数

总结

本文只是简单的概述了一下golang中的一些基本场景,然后分析了一下G的调度执行过程,其中有大量的细节还未涉及,只是简单的把正常的G的创建过程和执行流程梳理了一下,具体的调度策略和实现还需要进一步学习与了解。由于本人才疏学浅,如有错误请批评指正。

golang源码分析-调度概述相关推荐

  1. golang源码分析-启动过程概述

    golang源码分析-启动过程概述 golang语言作为根据CSP模型实现的一种强类型的语言,本文主要就是通过简单的实例来分析一下golang语言的启动流程,为深入了解与学习做铺垫. golang代码 ...

  2. 【我的架构师之路】- golang源码分析之协程调度器底层实现( G、M、P)

    本人的源码是基于go 1.9.7 版本的哦! 紧接着之前写的 [我的区块链之路]- golang源码分析之select的底层实现 和 [我的区块链之路]- golang源码分析之channel的底层实 ...

  3. v09.04 鸿蒙内核源码分析(调度故事) | 用故事说内核调度 | 百篇博客分析HarmonyOS源码

    子曰:"吾与回言终日,不违如愚.退而省其私,亦足以发.回也,不愚."<论语>:为政篇 百篇博客系列篇.本篇为: v09.xx 鸿蒙内核源码分析(调度故事篇) | 用故事 ...

  4. 【Golang源码分析】Go Web常用程序包gorilla/mux的使用与源码简析

    目录[阅读时间:约10分钟] 一.概述 二.对比: gorilla/mux与net/http DefaultServeMux 三.简单使用 四.源码简析 1.NewRouter函数 2.HandleF ...

  5. Python3.5源码分析-Dict概述

    Python3源码分析 本文环境python3.5.2. 参考书籍<<Python源码剖析>> python官网 Python3的Dict对象 在生成d = {}和d['1'] ...

  6. Python3.5源码分析-List概述

    Python3源码分析 本文环境python3.5.2. 参考书籍<<Python源码剖析>> python官网 Python3的List对象 list对象是一个变长对象,在运 ...

  7. 【golang源码分析】chan底层原理——附带读写用户队列的环形缓冲区

    1 环形缓冲区 1.1 环形缓冲区结构 环形缓冲区通常有一个读指针和一个写指针.读指针指向环形缓冲区中可读的数据,写指针指向环形缓冲区中可写的缓冲区.通过移动读指针和写指针就可以实现缓冲区的数据读取和 ...

  8. spring-transaction源码分析(1)概述和事务传播级别

    spring-tx概述 spring-tx包使用注解驱动和AOP通知将事务开启.提交/回滚.以及复杂的传播机制封装了起来,开发者不再需要编写事务管理的代码,而是可以只关注自己的业务逻辑. 本文将简单介 ...

  9. golang源码分析:调度器chan调度

    golang调度机制chan调度 golang的调度策略中,碰见阻塞chan就会将该chan放入到阻塞的g中,然后再等待该chan被唤醒,这是golang调度器策略的主动调度策略之一,其中还有其他的主 ...

最新文章

  1. 如何在 1 秒内将 50 个 OpenCV 帧上传到云存储
  2. php删除指定符号,利用PHP删除特殊符号
  3. python分支语句_Python中分支语句与循环语句实例详解
  4. oracle 12创建一个表,oracle 12 c 创建表空间,用户名,及表
  5. Backward_chaining
  6. 基于 CODING 的 Spring Boot 持续集成项目 1
  7. CC2530之定时器T3
  8. Could not load NIB in bundle: 'NSBundle /Users/wyd/Library/Application Support/iPhone Simulator/5.0
  9. google glog 简单使用小结
  10. 马化腾绝地逢生:山重水复疑无路,柳暗花明又一村
  11. SAP ABAP MOVE-CORRESPONDING ... TO ...的使用
  12. PIPIOJ1103: PIPI的数学题I/同余定理
  13. 士兵 POJ1723
  14. python爬取去哪网数据_用户观点:企查查数据爬取技术与Python 爬取企查查数据...
  15. 金刚菩提子开裂自动修复此计算机,金刚菩提子开裂怎么办 金刚菩提子为什么会开裂...
  16. 【UE4】给制作的小地图加上方向指针
  17. 2022山东视力防控大会,中国护眼产品展,济南近视矫正设备展
  18. 资源 | 完备的 AI 学习路线资源整理!
  19. 海康威视SDK实例QtDemo显示NVR视频窗口(Linux+Qt)
  20. java雍俊海_JAVA程序设计 雍俊海(学习笔记2)

热门文章

  1. 程序员大厂不一定要进,算法必须要学!收藏89篇精选算法文章
  2. 掌握深度学习,为什么要用PyTorch、TensorFlow框架?
  3. XLNet:公平PK,BERT你已经被超过!
  4. Decoders对于语义分割的重要性 | CVPR 2019
  5. 科大讯飞AI开发者大赛报名开启,百万奖金等你来!
  6. 一个普通AI程序员的内心独白...
  7. 都2021年了,你还在用Jenkins?赶快看看这些替代方案吧!
  8. 使用Intellij IDEA 解决Java8的数据流问题
  9. 设计一个成功的微服务,堪称必备的9个基础知识
  10. 讲真,下次打死我也不敢随便改serialVersionUID了