golang源码分析:调度器chan调度
golang调度机制chan调度
golang的调度策略中,碰见阻塞chan就会将该chan放入到阻塞的g中,然后再等待该chan被唤醒,这是golang调度器策略的主动调度策略之一,其中还有其他的主动调度策略包括进入调用系统调用或者主动调用Gosched时,都会发生主动调度,对应的当然也有被动调度,被动调度主要是后台线程在检查到某一个g执行的时间过长,再该g进行栈扩展(即调用函数时)就会进行被动调度,这些内容后续再讨论。本文先了解一下golang在阻塞chan下的调度机制。
chan阻塞调度示例代码
package mainimport "fmt"func main(){chan_w := make(chan int)go func(){chan_w <- 1fmt.Println("over")}()<- chan_wfmt.Println("main over")
}
示例代码相对简单,即创建一个chan_w,调用一个协程去传入值,然后主协程就等待chan_w的结果返回,此时我们查看一下对应的汇编输出。
# go build main.go
# go tool objdump -s "main\.main" main
TEXT main.main(SB) /root/open_falcon/main.gomain.go:5 0x4871b0 64488b0c25f8ffffff MOVQ FS:0xfffffff8, CX main.go:5 0x4871b9 483b6110 CMPQ 0x10(CX), SP main.go:5 0x4871bd 0f86c3000000 JBE 0x487286 main.go:5 0x4871c3 4883ec60 SUBQ $0x60, SP main.go:5 0x4871c7 48896c2458 MOVQ BP, 0x58(SP) main.go:5 0x4871cc 488d6c2458 LEAQ 0x58(SP), BP main.go:6 0x4871d1 488d05c8050100 LEAQ 0x105c8(IP), AX main.go:6 0x4871d8 48890424 MOVQ AX, 0(SP) main.go:6 0x4871dc 48c744240800000000 MOVQ $0x0, 0x8(SP) main.go:6 0x4871e5 e886d2f7ff CALL runtime.makechan(SB) main.go:6 0x4871ea 488b442410 MOVQ 0x10(SP), AX main.go:6 0x4871ef 4889442440 MOVQ AX, 0x40(SP) main.go:7 0x4871f4 c7042408000000 MOVL $0x8, 0(SP) main.go:7 0x4871fb 488d0d268c0300 LEAQ 0x38c26(IP), CX main.go:7 0x487202 48894c2408 MOVQ CX, 0x8(SP) main.go:7 0x487207 e8e49efaff CALL runtime.newproc(SB) main.go:11 0x48720c 488b442440 MOVQ 0x40(SP), AX main.go:11 0x487211 48890424 MOVQ AX, 0(SP) main.go:11 0x487215 48c744240800000000 MOVQ $0x0, 0x8(SP) main.go:11 0x48721e e8bddff7ff CALL runtime.chanrecv1(SB) main.go:12 0x487223 0f57c0 XORPS X0, X0 main.go:12 0x487226 0f11442448 MOVUPS X0, 0x48(SP) main.go:12 0x48722b 488d05ae110100 LEAQ 0x111ae(IP), AX main.go:12 0x487232 4889442448 MOVQ AX, 0x48(SP) main.go:12 0x487237 488d05b2850400 LEAQ main.statictmp_0(SB), AX main.go:12 0x48723e 4889442450 MOVQ AX, 0x50(SP) main.go:12 0x487243 90 NOPL print.go:275 0x487244 488b05a5050d00 MOVQ os.Stdout(SB), AX print.go:275 0x48724b 488d0dce9a0400 LEAQ go.itab.*os.File,io.Writer(SB), CX print.go:275 0x487252 48890c24 MOVQ CX, 0(SP) print.go:275 0x487256 4889442408 MOVQ AX, 0x8(SP) print.go:275 0x48725b 488d442448 LEAQ 0x48(SP), AX print.go:275 0x487260 4889442410 MOVQ AX, 0x10(SP) print.go:275 0x487265 48c744241801000000 MOVQ $0x1, 0x18(SP) print.go:275 0x48726e 48c744242001000000 MOVQ $0x1, 0x20(SP) print.go:275 0x487277 e8e498ffff CALL fmt.Fprintln(SB) print.go:275 0x48727c 488b6c2458 MOVQ 0x58(SP), BP print.go:275 0x487281 4883c460 ADDQ $0x60, SP print.go:275 0x487285 c3 RET main.go:5 0x487286 e8b580fcff CALL runtime.morestack_noctxt(SB) main.go:5 0x48728b e920ffffff JMP main.main(SB) TEXT main.main.func1(SB) /root/open_falcon/main.gomain.go:7 0x487290 64488b0c25f8ffffff MOVQ FS:0xfffffff8, CX main.go:7 0x487299 483b6110 CMPQ 0x10(CX), SP main.go:7 0x48729d 0f868b000000 JBE 0x48732e main.go:7 0x4872a3 4883ec58 SUBQ $0x58, SP main.go:7 0x4872a7 48896c2450 MOVQ BP, 0x50(SP) main.go:7 0x4872ac 488d6c2450 LEAQ 0x50(SP), BP main.go:8 0x4872b1 488b442460 MOVQ 0x60(SP), AX main.go:8 0x4872b6 48890424 MOVQ AX, 0(SP) main.go:8 0x4872ba 488d05ff810400 LEAQ main.statictmp_1(SB), AX main.go:8 0x4872c1 4889442408 MOVQ AX, 0x8(SP) main.go:8 0x4872c6 e8d5d3f7ff CALL runtime.chansend1(SB) main.go:9 0x4872cb 0f57c0 XORPS X0, X0 main.go:9 0x4872ce 0f11442440 MOVUPS X0, 0x40(SP) main.go:9 0x4872d3 488d0506110100 LEAQ 0x11106(IP), AX main.go:9 0x4872da 4889442440 MOVQ AX, 0x40(SP) main.go:9 0x4872df 488d051a850400 LEAQ main.statictmp_2(SB), AX main.go:9 0x4872e6 4889442448 MOVQ AX, 0x48(SP) main.go:9 0x4872eb 90 NOPL print.go:275 0x4872ec 488b05fd040d00 MOVQ os.Stdout(SB), AX print.go:275 0x4872f3 488d0d269a0400 LEAQ go.itab.*os.File,io.Writer(SB), CX print.go:275 0x4872fa 48890c24 MOVQ CX, 0(SP) print.go:275 0x4872fe 4889442408 MOVQ AX, 0x8(SP) print.go:275 0x487303 488d442440 LEAQ 0x40(SP), AX print.go:275 0x487308 4889442410 MOVQ AX, 0x10(SP) print.go:275 0x48730d 48c744241801000000 MOVQ $0x1, 0x18(SP) print.go:275 0x487316 48c744242001000000 MOVQ $0x1, 0x20(SP) print.go:275 0x48731f e83c98ffff CALL fmt.Fprintln(SB) print.go:275 0x487324 488b6c2450 MOVQ 0x50(SP), BP print.go:275 0x487329 4883c458 ADDQ $0x58, SP print.go:275 0x48732d c3 RET main.go:7 0x48732e e80d80fcff CALL runtime.morestack_noctxt(SB) main.go:7 0x487333 e958ffffff JMP main.main.func1(SB)
从汇编信息中可以看出,首先调用了runtime的makechan方法,接着就调用了runtime.newproc函数将func包装成为一个协程,此时主协程就调用了runtime的chanrecv1方法等待协程的数据返回。在golang的初始化启动流程中,golang会将main包装成为一个主协程进行运行,接着就分析一下具体的执行流程。
chan的主动让出调度与唤醒
chanrecv1主动选择调度
func chanrecv1(c *hchan, elem unsafe.Pointer) {chanrecv(c, elem, true)
}func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {// raceenabled: don't need to check ep, as it is always on the stack// or is new memory allocated by reflect.if debugChan { // 是否是调试模式print("chanrecv: chan=", c, "\n")}if c == nil { // 如果传入的chan为空if !block { // 如果是非阻塞的则返回return}gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) // 设置等待为一个接受为空的chanthrow("unreachable")}// Fast path: check for failed non-blocking operation without acquiring the lock.//// After observing that the channel is not ready for receiving, we observe that the// channel is not closed. Each of these observations is a single word-sized read// (first c.sendq.first or c.qcount, and second c.closed).// Because a channel cannot be reopened, the later observation of the channel// being not closed implies that it was also not closed at the moment of the// first observation. We behave as if we observed the channel at that moment// and report that the receive cannot proceed.//// The order of operations is important here: reversing the operations can lead to// incorrect behavior when racing with a close.if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&atomic.Load(&c.closed) == 0 { return}var t0 int64if blockprofilerate > 0 {t0 = cputicks()}lock(&c.lock) // 该chan加锁if c.closed != 0 && c.qcount == 0 { // 如果该chan不是关闭状态并且初始缓存个数不为0if raceenabled {raceacquire(c.raceaddr())}unlock(&c.lock) // 解锁 返回if ep != nil {typedmemclr(c.elemtype, ep)}return true, false}if sg := c.sendq.dequeue(); sg != nil {// Found a waiting sender. If buffer is size 0, receive value// directly from sender. Otherwise, receive from head of queue// and add sender's value to the tail of the queue (both map to// the same buffer slot because the queue is full).recv(c, sg, ep, func() { unlock(&c.lock) }, 3) // 如果可以立即接受 则唤醒调度等待的协程return true, true}if c.qcount > 0 { // 如果是一个带缓冲区的chan// Receive directly from queueqp := chanbuf(c, c.recvx) // 则直接从队列中获取数据并拷贝到chan的接收队列中if raceenabled {raceacquire(qp)racerelease(qp)}if ep != nil {typedmemmove(c.elemtype, ep, qp)}typedmemclr(c.elemtype, qp)c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}c.qcount--unlock(&c.lock)return true, true}if !block { // 如果非阻塞则直接返回unlock(&c.lock)return false, false}// no sender available: block on this channel.gp := getg() // 如果没有可以接受的则阻塞mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.mysg.elem = epmysg.waitlink = nilgp.waiting = mysgmysg.g = gpmysg.isSelect = false mysg.c = cgp.param = nilc.recvq.enqueue(mysg)goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3) // 调用其他的协程// someone woke us upif mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}closed := gp.param == nilgp.param = nilmysg.c = nilreleaseSudog(mysg)return true, !closed
}
从chanrecv的执行流程中可以看出,首先先根据chan的类型来判断是否是可以立即有数据可用,如果有则直接返回执行,如果是带缓冲区的chan并且chan中也没有数据可以使用则调用goparkunlock主动调用其他的协程。
goparkunlock函数调度其他协程
func goparkunlock(lock *mutex, reason waitReason, traceEv byte, traceskip int) {gopark(parkunlock_c, unsafe.Pointer(lock), reason, traceEv, traceskip)
}func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {if reason != waitReasonSleep {checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy}mp := acquirem() // 获取mgp := mp.curgstatus := readgstatus(gp)if status != _Grunning && status != _Gscanrunning {throw("gopark: bad g status")}mp.waitlock = lockmp.waitunlockf = *(*unsafe.Pointer)(unsafe.Pointer(&unlockf))gp.waitreason = reason // 等待阻塞的原因mp.waittraceev = traceEvmp.waittraceskip = traceskipreleasem(mp)// can't do anything that might move the G between Ms here.mcall(park_m) // 切换栈调用park_m
}func park_m(gp *g) {_g_ := getg()if trace.enabled {traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)}casgstatus(gp, _Grunning, _Gwaiting) // 将该g设置为阻塞状态dropg() // 接触g m 的关系if _g_.m.waitunlockf != nil { // waitunlockf是否为空fn := *(*func(*g, unsafe.Pointer) bool)(unsafe.Pointer(&_g_.m.waitunlockf))ok := fn(gp, _g_.m.waitlock) // 如果不为空则执行该函数_g_.m.waitunlockf = nil_g_.m.waitlock = nilif !ok {if trace.enabled {traceGoUnpark(gp, 2)}casgstatus(gp, _Gwaiting, _Grunnable) // 获取锁之后就设置该g位可运行状态execute(gp, true) // Schedule it back, never returns. // 继续执行该g}}schedule() // 重新调度
}
紧接着就判断是否需要等待锁释放函数,如果没有需要等待的函数则直接进行schedule进行调度。此时就调度其它协程的执行。
chan的唤醒操作
在main.main.func1汇编代码中,在传入值1之后就调用了runtime.chansend1(SB)方法,该方法就是唤醒被该通道阻塞的G
func chansend1(c *hchan, elem unsafe.Pointer) {chansend(c, elem, true, getcallerpc())
}func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {if c == nil { // 如果通道为空if !block {return false}gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) // 向为空的通道发送数据throw("unreachable")}if debugChan {print("chansend: chan=", c, "\n")}if raceenabled {racereadpc(c.raceaddr(), callerpc, funcPC(chansend))}// Fast path: check for failed non-blocking operation without acquiring the lock.//// After observing that the channel is not closed, we observe that the channel is// not ready for sending. Each of these observations is a single word-sized read// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).// Because a closed channel cannot transition from 'ready for sending' to// 'not ready for sending', even if the channel is closed between the two observations,// they imply a moment between the two when the channel was both not yet closed// and not ready for sending. We behave as if we observed the channel at that moment,// and report that the send cannot proceed.//// It is okay if the reads are reordered here: if we observe that the channel is not// ready for sending and then observe that it is not closed, that implies that the// channel wasn't closed during the first observation.if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) { return false}var t0 int64if blockprofilerate > 0 {t0 = cputicks()}lock(&c.lock) // 加锁if c.closed != 0 { unlock(&c.lock)panic(plainError("send on closed channel"))}if sg := c.recvq.dequeue(); sg != nil { // 如果当前接受队列不为空// Found a waiting receiver. We pass the value we want to send// directly to the receiver, bypassing the channel buffer (if any).send(c, sg, ep, func() { unlock(&c.lock) }, 3) // 向该接受队列中发送数据return true}if c.qcount < c.dataqsiz { // 如果接受的数据为缓冲区// Space is available in the channel buffer. Enqueue the element to send.qp := chanbuf(c, c.sendx)if raceenabled {raceacquire(qp)racerelease(qp)}typedmemmove(c.elemtype, qp, ep)c.sendx++if c.sendx == c.dataqsiz {c.sendx = 0}c.qcount++unlock(&c.lock)return true // 如果缓冲区有值则可以继续执行}...
}
其中处理的逻辑也是检查接受的队列上面是否有内容,如果有值则直接调用send方法去发送数据,否则则等待chan的锁的释放去阻塞等待该值被执行。
// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked. send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {if raceenabled {if c.dataqsiz == 0 {racesync(c, sg)} else {// Pretend we go through the buffer, even though// we copy directly. Note that we need to increment// the head/tail locations only when raceenabled.qp := chanbuf(c, c.recvx)raceacquire(qp)racerelease(qp)raceacquireg(sg.g, qp)racereleaseg(sg.g, qp)c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz}}if sg.elem != nil {sendDirect(c.elemtype, sg, ep)sg.elem = nil}gp := sg.gunlockf()gp.param = unsafe.Pointer(sg)if sg.releasetime != 0 {sg.releasetime = cputicks()}goready(gp, skip+1) // 调用goready函数执行
}func goready(gp *g, traceskip int) {systemstack(func() {ready(gp, traceskip, true)})
}// Mark gp ready to run.
func ready(gp *g, traceskip int, next bool) {if trace.enabled {traceGoUnpark(gp, traceskip) // 是否trace跟踪}status := readgstatus(gp)// Mark runnable._g_ := getg()_g_.m.locks++ // disable preemption because it can be holding p in a local varif status&^_Gscan != _Gwaiting { dumpgstatus(gp)throw("bad g->status in ready")}// status is Gwaiting or Gscanwaiting, make Grunnable and put on runqcasgstatus(gp, _Gwaiting, _Grunnable) // 设置状态为可运行状态runqput(_g_.m.p.ptr(), gp, next) // 放入队列当中if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {wakep() // 如果有空闲的p 且 m没有处于自旋状态 则创建一个线程工作}_g_.m.locks--if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in Case we've cleared it in newstack_g_.stackguard0 = stackPreempt}
}
此时将唤醒的g设置为可运行状态并检查是否有空闲的p,如果有空闲的p并且没有自旋状态的线程则调用wakep函数要么唤醒(唤醒在Linux中主要利用了futex系统调用)一个m或者创建一个新的工作线程。
至此chan对应的阻塞和唤醒的流程基本完成。
总结
本文简单的分析了一下有关chan的一个阻塞与唤醒的流程,其中主要就是通过设置g的状态,然后再唤醒等待该chan的协程以便继续执行,从而完成chan状态转移下的调度。由于本人才疏学浅,如有错误请批评指正。
golang源码分析:调度器chan调度相关推荐
- 【我的架构师之路】- golang源码分析之协程调度器底层实现( G、M、P)
本人的源码是基于go 1.9.7 版本的哦! 紧接着之前写的 [我的区块链之路]- golang源码分析之select的底层实现 和 [我的区块链之路]- golang源码分析之channel的底层实 ...
- golang源码分析-调度概述
golang源码分析-调度过程概述 本文主要概述一下golang的调度器的大概工作的流程,众所周知golang是基于用户态的协程的调度来完成多任务的执行.在Linux操作系统中,以往的多线程执行都是通 ...
- 【golang源码分析】chan底层原理——附带读写用户队列的环形缓冲区
1 环形缓冲区 1.1 环形缓冲区结构 环形缓冲区通常有一个读指针和一个写指针.读指针指向环形缓冲区中可读的数据,写指针指向环形缓冲区中可写的缓冲区.通过移动读指针和写指针就可以实现缓冲区的数据读取和 ...
- golang源码分析-启动过程概述
golang源码分析-启动过程概述 golang语言作为根据CSP模型实现的一种强类型的语言,本文主要就是通过简单的实例来分析一下golang语言的启动流程,为深入了解与学习做铺垫. golang代码 ...
- Spring AOP 源码分析 - 拦截器链的执行过程
1.简介 本篇文章是 AOP 源码分析系列文章的最后一篇文章,在前面的两篇文章中,我分别介绍了 Spring AOP 是如何为目标 bean 筛选合适的通知器,以及如何创建代理对象的过程.现在我们的得 ...
- 【Golang源码分析】Go Web常用程序包gorilla/mux的使用与源码简析
目录[阅读时间:约10分钟] 一.概述 二.对比: gorilla/mux与net/http DefaultServeMux 三.简单使用 四.源码简析 1.NewRouter函数 2.HandleF ...
- Struts2 源码分析——拦截器的机制
本章简言 上一章讲到关于action代理类的工作.即是如何去找对应的action配置信息,并执行action类的实例.而这一章笔者将讲到在执行action需要用到的拦截器.为什么要讲拦截器呢?可以这样 ...
- sis地址获取器_TencentOS tiny深度源码分析(2)—— 调度器
温馨提示:本文不描述与浮点相关的寄存器的内容,如需了解自行查阅 调度器的基本概念 TencentOS tiny中提供的任务调度器是基于优先级的全抢占式调度,在系统运行过程中,当有比当前任务优先级更高的 ...
- 【TencentOS tiny学习】源码分析(2)——调度器
文章目录 调度器的基本概念 启动调度器 Cortex-M内核关中断指令 回归正题 看看任务栈的初始化 查找最高优先级任务 任务切换的实现 SysTick SysTick初始化 SysTick中断 温馨 ...
最新文章
- python 黑白tif提取边界像素坐标_OpenCV GrabCut算法:前景分割和提取
- 单域名多php,php多域名单站点路由
- idea persistence生成_真厉害!竟然可以这样用IDEA通过数据库生成lombok版的POJO...
- halcon旋转后坐标_基于FPGA的图像旋转设计
- CV之MTCNN:MTCNN算法过程及其相关思路配图集合
- 最常用的决策树算法(三):XGBoost 和 LightGBM
- 安装eclipse中html/jsp/xml editor插件以及改动html页面的字体
- zen cart如何给新产品、特价、推荐产品页面加标题、关键字、描述
- 8086指令系统 操作数地址,双操作数,单操作数,无操作数指令。一,传送类指令;二,二、算数运算类指令
- 比亚迪高端车正驶入“囧途“?
- 基于JAVA+SpringMVC+Mybatis+MYSQL的企业通用门户网站官网
- 2017-2018-2 20179215《密码与安全新技术》第七周作业
- altium 快速设置网络_通过加载CAD“快速选择”插件,可以很大的提高绘图效率...
- Cdence版图设计手册
- [中英字幕]吴恩达机器学习系列课程 笔记
- 黔江哪里可以学计算机,黔江有什么大学
- value函数介绍和实例
- 锚文本链接用html怎么做,锚文本链接是什么?
- boost之日期 时间(date_time)
- 为什么upupoo显示服务器维护中,首先,电脑能正常上网,有的网站可以上传图片,但就是不能显示上传图片的功能按钮,网页打开都正常。...
热门文章
- 人工智能语音技术支持“多情感程度”调节,细腻演绎“人声”
- 用机器学习还原《隐秘的角落》那些被修改的台词
- 一文告诉你,如何使用Python构建一个“谷歌搜索”系统 | 内附代码
- 入门必备 | 一文读懂神经架构搜索
- Dropbox如何使用机器学习从数十亿图片中自动提取文字
- 达沃斯群英纵论人工智能,核心观点汇总
- 理解Java对象:要从内存布局及底层机制说起,话说....
- 有了这款可视化工具,Java 应用性能分析、调优 so easy...
- 史上最全的数据库面试题,不看后悔篇!
- 尹伊:Datawhale做的一件事