golang调度机制chan调度

golang的调度策略中,碰见阻塞chan就会将该chan放入到阻塞的g中,然后再等待该chan被唤醒,这是golang调度器策略的主动调度策略之一,其中还有其他的主动调度策略包括进入调用系统调用或者主动调用Gosched时,都会发生主动调度,对应的当然也有被动调度,被动调度主要是后台线程在检查到某一个g执行的时间过长,再该g进行栈扩展(即调用函数时)就会进行被动调度,这些内容后续再讨论。本文先了解一下golang在阻塞chan下的调度机制。

chan阻塞调度示例代码

package mainimport "fmt"func main(){chan_w := make(chan int)go func(){chan_w <- 1fmt.Println("over")}()<- chan_wfmt.Println("main over")
}

示例代码相对简单,即创建一个chan_w,调用一个协程去传入值,然后主协程就等待chan_w的结果返回,此时我们查看一下对应的汇编输出。

# go build main.go
# go tool objdump -s "main\.main" main
TEXT main.main(SB) /root/open_falcon/main.gomain.go:5       0x4871b0        64488b0c25f8ffffff  MOVQ FS:0xfffffff8, CX          main.go:5       0x4871b9        483b6110        CMPQ 0x10(CX), SP           main.go:5       0x4871bd        0f86c3000000        JBE 0x487286                main.go:5       0x4871c3        4883ec60        SUBQ $0x60, SP              main.go:5       0x4871c7        48896c2458      MOVQ BP, 0x58(SP)           main.go:5       0x4871cc        488d6c2458      LEAQ 0x58(SP), BP           main.go:6       0x4871d1        488d05c8050100      LEAQ 0x105c8(IP), AX            main.go:6       0x4871d8        48890424        MOVQ AX, 0(SP)              main.go:6       0x4871dc        48c744240800000000  MOVQ $0x0, 0x8(SP)          main.go:6       0x4871e5        e886d2f7ff      CALL runtime.makechan(SB)       main.go:6       0x4871ea        488b442410      MOVQ 0x10(SP), AX           main.go:6       0x4871ef        4889442440      MOVQ AX, 0x40(SP)           main.go:7       0x4871f4        c7042408000000      MOVL $0x8, 0(SP)            main.go:7       0x4871fb        488d0d268c0300      LEAQ 0x38c26(IP), CX            main.go:7       0x487202        48894c2408      MOVQ CX, 0x8(SP)            main.go:7       0x487207        e8e49efaff      CALL runtime.newproc(SB)        main.go:11      0x48720c        488b442440      MOVQ 0x40(SP), AX           main.go:11      0x487211        48890424        MOVQ AX, 0(SP)              main.go:11      0x487215        48c744240800000000  MOVQ $0x0, 0x8(SP)          main.go:11      0x48721e        e8bddff7ff      CALL runtime.chanrecv1(SB)      main.go:12      0x487223        0f57c0          XORPS X0, X0                main.go:12      0x487226        0f11442448      MOVUPS X0, 0x48(SP)         main.go:12      0x48722b        488d05ae110100      LEAQ 0x111ae(IP), AX            main.go:12      0x487232        4889442448      MOVQ AX, 0x48(SP)           main.go:12      0x487237        488d05b2850400      LEAQ main.statictmp_0(SB), AX       main.go:12      0x48723e        4889442450      MOVQ AX, 0x50(SP)           main.go:12      0x487243        90          NOPL                    print.go:275        0x487244        488b05a5050d00      MOVQ os.Stdout(SB), AX          print.go:275        0x48724b        488d0dce9a0400      LEAQ go.itab.*os.File,io.Writer(SB), CX print.go:275        0x487252        48890c24        MOVQ CX, 0(SP)              print.go:275        0x487256        4889442408      MOVQ AX, 0x8(SP)            print.go:275        0x48725b        488d442448      LEAQ 0x48(SP), AX           print.go:275        0x487260        4889442410      MOVQ AX, 0x10(SP)           print.go:275        0x487265        48c744241801000000  MOVQ $0x1, 0x18(SP)         print.go:275        0x48726e        48c744242001000000  MOVQ $0x1, 0x20(SP)         print.go:275        0x487277        e8e498ffff      CALL fmt.Fprintln(SB)           print.go:275        0x48727c        488b6c2458      MOVQ 0x58(SP), BP           print.go:275        0x487281        4883c460        ADDQ $0x60, SP              print.go:275        0x487285        c3          RET                 main.go:5       0x487286        e8b580fcff      CALL runtime.morestack_noctxt(SB)   main.go:5       0x48728b        e920ffffff      JMP main.main(SB)           TEXT main.main.func1(SB) /root/open_falcon/main.gomain.go:7     0x487290        64488b0c25f8ffffff  MOVQ FS:0xfffffff8, CX          main.go:7       0x487299        483b6110        CMPQ 0x10(CX), SP           main.go:7       0x48729d        0f868b000000        JBE 0x48732e                main.go:7       0x4872a3        4883ec58        SUBQ $0x58, SP              main.go:7       0x4872a7        48896c2450      MOVQ BP, 0x50(SP)           main.go:7       0x4872ac        488d6c2450      LEAQ 0x50(SP), BP           main.go:8       0x4872b1        488b442460      MOVQ 0x60(SP), AX           main.go:8       0x4872b6        48890424        MOVQ AX, 0(SP)              main.go:8       0x4872ba        488d05ff810400      LEAQ main.statictmp_1(SB), AX       main.go:8       0x4872c1        4889442408      MOVQ AX, 0x8(SP)            main.go:8       0x4872c6        e8d5d3f7ff      CALL runtime.chansend1(SB)      main.go:9       0x4872cb        0f57c0          XORPS X0, X0                main.go:9       0x4872ce        0f11442440      MOVUPS X0, 0x40(SP)         main.go:9       0x4872d3        488d0506110100      LEAQ 0x11106(IP), AX            main.go:9       0x4872da        4889442440      MOVQ AX, 0x40(SP)           main.go:9       0x4872df        488d051a850400      LEAQ main.statictmp_2(SB), AX       main.go:9       0x4872e6        4889442448      MOVQ AX, 0x48(SP)           main.go:9       0x4872eb        90          NOPL                    print.go:275        0x4872ec        488b05fd040d00      MOVQ os.Stdout(SB), AX          print.go:275        0x4872f3        488d0d269a0400      LEAQ go.itab.*os.File,io.Writer(SB), CX print.go:275        0x4872fa        48890c24        MOVQ CX, 0(SP)              print.go:275        0x4872fe        4889442408      MOVQ AX, 0x8(SP)            print.go:275        0x487303        488d442440      LEAQ 0x40(SP), AX           print.go:275        0x487308        4889442410      MOVQ AX, 0x10(SP)           print.go:275        0x48730d        48c744241801000000  MOVQ $0x1, 0x18(SP)         print.go:275        0x487316        48c744242001000000  MOVQ $0x1, 0x20(SP)         print.go:275        0x48731f        e83c98ffff      CALL fmt.Fprintln(SB)           print.go:275        0x487324        488b6c2450      MOVQ 0x50(SP), BP           print.go:275        0x487329        4883c458        ADDQ $0x58, SP              print.go:275        0x48732d        c3          RET                 main.go:7       0x48732e        e80d80fcff      CALL runtime.morestack_noctxt(SB)   main.go:7       0x487333        e958ffffff      JMP main.main.func1(SB)

从汇编信息中可以看出,首先调用了runtime的makechan方法,接着就调用了runtime.newproc函数将func包装成为一个协程,此时主协程就调用了runtime的chanrecv1方法等待协程的数据返回。在golang的初始化启动流程中,golang会将main包装成为一个主协程进行运行,接着就分析一下具体的执行流程。

chan的主动让出调度与唤醒

chanrecv1主动选择调度
func chanrecv1(c *hchan, elem unsafe.Pointer) {chanrecv(c, elem, true)
}func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {// raceenabled: don't need to check ep, as it is always on the stack// or is new memory allocated by reflect.if debugChan {                                                         // 是否是调试模式print("chanrecv: chan=", c, "\n")}if c == nil {                               // 如果传入的chan为空if !block {                               // 如果是非阻塞的则返回return}gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)   // 设置等待为一个接受为空的chanthrow("unreachable")}// Fast path: check for failed non-blocking operation without acquiring the lock.//// After observing that the channel is not ready for receiving, we observe that the// channel is not closed. Each of these observations is a single word-sized read// (first c.sendq.first or c.qcount, and second c.closed).// Because a channel cannot be reopened, the later observation of the channel// being not closed implies that it was also not closed at the moment of the// first observation. We behave as if we observed the channel at that moment// and report that the receive cannot proceed.//// The order of operations is important here: reversing the operations can lead to// incorrect behavior when racing with a close.if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&atomic.Load(&c.closed) == 0 {    return}var t0 int64if blockprofilerate > 0 {t0 = cputicks()}lock(&c.lock)                            // 该chan加锁if c.closed != 0 && c.qcount == 0 {   // 如果该chan不是关闭状态并且初始缓存个数不为0if raceenabled {raceacquire(c.raceaddr())}unlock(&c.lock)                     // 解锁  返回if ep != nil {typedmemclr(c.elemtype, ep)}return true, false}if sg := c.sendq.dequeue(); sg != nil {// Found a waiting sender. If buffer is size 0, receive value// directly from sender. Otherwise, receive from head of queue// and add sender's value to the tail of the queue (both map to// the same buffer slot because the queue is full).recv(c, sg, ep, func() { unlock(&c.lock) }, 3)    // 如果可以立即接受 则唤醒调度等待的协程return true, true}if c.qcount > 0 {                                  // 如果是一个带缓冲区的chan// Receive directly from queueqp := chanbuf(c, c.recvx)                        // 则直接从队列中获取数据并拷贝到chan的接收队列中if raceenabled {raceacquire(qp)racerelease(qp)}if ep != nil {typedmemmove(c.elemtype, ep, qp)}typedmemclr(c.elemtype, qp)c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}c.qcount--unlock(&c.lock)return true, true}if !block {                             // 如果非阻塞则直接返回unlock(&c.lock)return false, false}// no sender available: block on this channel.gp := getg()                             // 如果没有可以接受的则阻塞mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.mysg.elem = epmysg.waitlink = nilgp.waiting = mysgmysg.g = gpmysg.isSelect = false                         mysg.c = cgp.param = nilc.recvq.enqueue(mysg)goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)   // 调用其他的协程// someone woke us upif mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}closed := gp.param == nilgp.param = nilmysg.c = nilreleaseSudog(mysg)return true, !closed
}

从chanrecv的执行流程中可以看出,首先先根据chan的类型来判断是否是可以立即有数据可用,如果有则直接返回执行,如果是带缓冲区的chan并且chan中也没有数据可以使用则调用goparkunlock主动调用其他的协程。

goparkunlock函数调度其他协程
func goparkunlock(lock *mutex, reason waitReason, traceEv byte, traceskip int) {gopark(parkunlock_c, unsafe.Pointer(lock), reason, traceEv, traceskip)
}func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {if reason != waitReasonSleep {checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy}mp := acquirem()                          // 获取mgp := mp.curgstatus := readgstatus(gp)if status != _Grunning && status != _Gscanrunning {throw("gopark: bad g status")}mp.waitlock = lockmp.waitunlockf = *(*unsafe.Pointer)(unsafe.Pointer(&unlockf))gp.waitreason = reason                     // 等待阻塞的原因mp.waittraceev = traceEvmp.waittraceskip = traceskipreleasem(mp)// can't do anything that might move the G between Ms here.mcall(park_m)                    // 切换栈调用park_m
}func park_m(gp *g) {_g_ := getg()if trace.enabled {traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)}casgstatus(gp, _Grunning, _Gwaiting)               // 将该g设置为阻塞状态dropg()                                                                        // 接触g m 的关系if _g_.m.waitunlockf != nil {               // waitunlockf是否为空fn := *(*func(*g, unsafe.Pointer) bool)(unsafe.Pointer(&_g_.m.waitunlockf))ok := fn(gp, _g_.m.waitlock)             // 如果不为空则执行该函数_g_.m.waitunlockf = nil_g_.m.waitlock = nilif !ok {if trace.enabled {traceGoUnpark(gp, 2)}casgstatus(gp, _Gwaiting, _Grunnable)        // 获取锁之后就设置该g位可运行状态execute(gp, true) // Schedule it back, never returns.   // 继续执行该g}}schedule()                                                                        // 重新调度
}

紧接着就判断是否需要等待锁释放函数,如果没有需要等待的函数则直接进行schedule进行调度。此时就调度其它协程的执行。

chan的唤醒操作

在main.main.func1汇编代码中,在传入值1之后就调用了runtime.chansend1(SB)方法,该方法就是唤醒被该通道阻塞的G

func chansend1(c *hchan, elem unsafe.Pointer) {chansend(c, elem, true, getcallerpc())
}func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {if c == nil {                                  // 如果通道为空if !block {return false}gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)  // 向为空的通道发送数据throw("unreachable")}if debugChan {print("chansend: chan=", c, "\n")}if raceenabled {racereadpc(c.raceaddr(), callerpc, funcPC(chansend))}// Fast path: check for failed non-blocking operation without acquiring the lock.//// After observing that the channel is not closed, we observe that the channel is// not ready for sending. Each of these observations is a single word-sized read// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).// Because a closed channel cannot transition from 'ready for sending' to// 'not ready for sending', even if the channel is closed between the two observations,// they imply a moment between the two when the channel was both not yet closed// and not ready for sending. We behave as if we observed the channel at that moment,// and report that the send cannot proceed.//// It is okay if the reads are reordered here: if we observe that the channel is not// ready for sending and then observe that it is not closed, that implies that the// channel wasn't closed during the first observation.if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {            return false}var t0 int64if blockprofilerate > 0 {t0 = cputicks()}lock(&c.lock)                      // 加锁if c.closed != 0 {                unlock(&c.lock)panic(plainError("send on closed channel"))}if sg := c.recvq.dequeue(); sg != nil {      // 如果当前接受队列不为空// Found a waiting receiver. We pass the value we want to send// directly to the receiver, bypassing the channel buffer (if any).send(c, sg, ep, func() { unlock(&c.lock) }, 3)   // 向该接受队列中发送数据return true}if c.qcount < c.dataqsiz {                         // 如果接受的数据为缓冲区// Space is available in the channel buffer. Enqueue the element to send.qp := chanbuf(c, c.sendx)if raceenabled {raceacquire(qp)racerelease(qp)}typedmemmove(c.elemtype, qp, ep)c.sendx++if c.sendx == c.dataqsiz {c.sendx = 0}c.qcount++unlock(&c.lock)return true                                                                           // 如果缓冲区有值则可以继续执行}...
}

其中处理的逻辑也是检查接受的队列上面是否有内容,如果有值则直接调用send方法去发送数据,否则则等待chan的锁的释放去阻塞等待该值被执行。

// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked.  send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {if raceenabled {if c.dataqsiz == 0 {racesync(c, sg)} else {// Pretend we go through the buffer, even though// we copy directly. Note that we need to increment// the head/tail locations only when raceenabled.qp := chanbuf(c, c.recvx)raceacquire(qp)racerelease(qp)raceacquireg(sg.g, qp)racereleaseg(sg.g, qp)c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz}}if sg.elem != nil {sendDirect(c.elemtype, sg, ep)sg.elem = nil}gp := sg.gunlockf()gp.param = unsafe.Pointer(sg)if sg.releasetime != 0 {sg.releasetime = cputicks()}goready(gp, skip+1)        // 调用goready函数执行
}func goready(gp *g, traceskip int) {systemstack(func() {ready(gp, traceskip, true)})
}// Mark gp ready to run.
func ready(gp *g, traceskip int, next bool) {if trace.enabled {traceGoUnpark(gp, traceskip)         // 是否trace跟踪}status := readgstatus(gp)// Mark runnable._g_ := getg()_g_.m.locks++ // disable preemption because it can be holding p in a local varif status&^_Gscan != _Gwaiting {        dumpgstatus(gp)throw("bad g->status in ready")}// status is Gwaiting or Gscanwaiting, make Grunnable and put on runqcasgstatus(gp, _Gwaiting, _Grunnable)         // 设置状态为可运行状态runqput(_g_.m.p.ptr(), gp, next)              // 放入队列当中if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {wakep()                        // 如果有空闲的p 且 m没有处于自旋状态 则创建一个线程工作}_g_.m.locks--if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in Case we've cleared it in newstack_g_.stackguard0 = stackPreempt}
}

此时将唤醒的g设置为可运行状态并检查是否有空闲的p,如果有空闲的p并且没有自旋状态的线程则调用wakep函数要么唤醒(唤醒在Linux中主要利用了futex系统调用)一个m或者创建一个新的工作线程。

至此chan对应的阻塞和唤醒的流程基本完成。

总结

本文简单的分析了一下有关chan的一个阻塞与唤醒的流程,其中主要就是通过设置g的状态,然后再唤醒等待该chan的协程以便继续执行,从而完成chan状态转移下的调度。由于本人才疏学浅,如有错误请批评指正。

golang源码分析:调度器chan调度相关推荐

  1. 【我的架构师之路】- golang源码分析之协程调度器底层实现( G、M、P)

    本人的源码是基于go 1.9.7 版本的哦! 紧接着之前写的 [我的区块链之路]- golang源码分析之select的底层实现 和 [我的区块链之路]- golang源码分析之channel的底层实 ...

  2. golang源码分析-调度概述

    golang源码分析-调度过程概述 本文主要概述一下golang的调度器的大概工作的流程,众所周知golang是基于用户态的协程的调度来完成多任务的执行.在Linux操作系统中,以往的多线程执行都是通 ...

  3. 【golang源码分析】chan底层原理——附带读写用户队列的环形缓冲区

    1 环形缓冲区 1.1 环形缓冲区结构 环形缓冲区通常有一个读指针和一个写指针.读指针指向环形缓冲区中可读的数据,写指针指向环形缓冲区中可写的缓冲区.通过移动读指针和写指针就可以实现缓冲区的数据读取和 ...

  4. golang源码分析-启动过程概述

    golang源码分析-启动过程概述 golang语言作为根据CSP模型实现的一种强类型的语言,本文主要就是通过简单的实例来分析一下golang语言的启动流程,为深入了解与学习做铺垫. golang代码 ...

  5. Spring AOP 源码分析 - 拦截器链的执行过程

    1.简介 本篇文章是 AOP 源码分析系列文章的最后一篇文章,在前面的两篇文章中,我分别介绍了 Spring AOP 是如何为目标 bean 筛选合适的通知器,以及如何创建代理对象的过程.现在我们的得 ...

  6. 【Golang源码分析】Go Web常用程序包gorilla/mux的使用与源码简析

    目录[阅读时间:约10分钟] 一.概述 二.对比: gorilla/mux与net/http DefaultServeMux 三.简单使用 四.源码简析 1.NewRouter函数 2.HandleF ...

  7. Struts2 源码分析——拦截器的机制

    本章简言 上一章讲到关于action代理类的工作.即是如何去找对应的action配置信息,并执行action类的实例.而这一章笔者将讲到在执行action需要用到的拦截器.为什么要讲拦截器呢?可以这样 ...

  8. sis地址获取器_TencentOS tiny深度源码分析(2)—— 调度器

    温馨提示:本文不描述与浮点相关的寄存器的内容,如需了解自行查阅 调度器的基本概念 TencentOS tiny中提供的任务调度器是基于优先级的全抢占式调度,在系统运行过程中,当有比当前任务优先级更高的 ...

  9. 【TencentOS tiny学习】源码分析(2)——调度器

    文章目录 调度器的基本概念 启动调度器 Cortex-M内核关中断指令 回归正题 看看任务栈的初始化 查找最高优先级任务 任务切换的实现 SysTick SysTick初始化 SysTick中断 温馨 ...

最新文章

  1. python 黑白tif提取边界像素坐标_OpenCV GrabCut算法:前景分割和提取
  2. 单域名多php,php多域名单站点路由
  3. idea persistence生成_真厉害!竟然可以这样用IDEA通过数据库生成lombok版的POJO...
  4. halcon旋转后坐标_基于FPGA的图像旋转设计
  5. CV之MTCNN:MTCNN算法过程及其相关思路配图集合
  6. 最常用的决策树算法(三):XGBoost 和 LightGBM
  7. 安装eclipse中html/jsp/xml editor插件以及改动html页面的字体
  8. zen cart如何给新产品、特价、推荐产品页面加标题、关键字、描述
  9. 8086指令系统 操作数地址,双操作数,单操作数,无操作数指令。一,传送类指令;二,二、算数运算类指令
  10. 比亚迪高端车正驶入“囧途“?
  11. 基于JAVA+SpringMVC+Mybatis+MYSQL的企业通用门户网站官网
  12. 2017-2018-2 20179215《密码与安全新技术》第七周作业
  13. altium 快速设置网络_通过加载CAD“快速选择”插件,可以很大的提高绘图效率...
  14. Cdence版图设计手册
  15. [中英字幕]吴恩达机器学习系列课程 笔记
  16. 黔江哪里可以学计算机,黔江有什么大学
  17. value函数介绍和实例
  18. 锚文本链接用html怎么做,锚文本链接是什么?
  19. boost之日期 时间(date_time)
  20. 为什么upupoo显示服务器维护中,首先,电脑能正常上网,有的网站可以上传图片,但就是不能显示上传图片的功能按钮,网页打开都正常。...

热门文章

  1. 人工智能语音技术支持“多情感程度”调节,细腻演绎“人声”
  2. 用机器学习还原《隐秘的角落》那些被修改的台词
  3. 一文告诉你,如何使用Python构建一个“谷歌搜索”系统 | 内附代码
  4. 入门必备 | 一文读懂神经架构搜索
  5. Dropbox如何使用机器学习从数十亿图片中自动提取文字
  6. 达沃斯群英纵论人工智能,核心观点汇总
  7. 理解Java对象:要从内存布局及底层机制说起,话说....
  8. 有了这款可视化工具,Java 应用性能分析、调优 so easy...
  9. 史上最全的数据库面试题,不看后悔篇!
  10. 尹伊:Datawhale做的一件事