golang runtime源码阅读 channal实现
概念及使用场景
通道(channal)是Golang实现CSP并发模型的关键,分为 有缓冲通道 和无缓冲通道。
- 有缓冲管道: channal持有一个固定大小的队列,队列满时发送者将阻塞(反之亦然)。多用于数据共享。
- 无缓冲管道: 发送和接收数据同时完成,如果没有goroutine读取channal,则发送者阻塞(反之亦然)。多用于协程同步。
- 有缓冲+select: +for实现对多个chan的监听操作 +time实现超时控制
基本使用
简单的 无缓冲channal 使用
func main() {ch := make(chan int) //创建无缓冲管道go func() { //发起goroutine,向管道写入数据ch <- 1close(ch) //关闭管道}() t := <- ch //主goroutine等待读取到管道数据后退出fmt.Println(t)return
需要注意的是:
1. 无缓冲在写入时,必须有其他线程在对端接受,否则线程将阻塞。
2. 向关闭的chan写入数据将Panic
3. 可以从关闭的chan中读取数据
4. 重复关闭chann会导致panic
基本结构及内存布局
chan结构体定义在 ./src/runtime/chan.go
文件,type hchan struct
type hchan struct {qcount uint // 队列数据长度lendataqsiz uint // 缓冲区长度cap c:=make(chan int, 10)即此值为10buf unsafe.Pointer // 指向dataqsiz数组的指针elemsize uint16 //元素类型大小 sizeof(struct)closed uint32 //关闭标志elemtype *_type // chan接收的元素类型sendx uint // 写入chan索引,写入1个数据时,sendx+1recvx uint // 读取chan索引,被读走1个数据时,recvx+1recvq waitq // 阻塞在chann上的读等待队列sendq waitq // 阻塞在chann上的写等待队列lock mutex //互斥锁 保证hchan的数据读写安全,包括被阻塞的waitq中的sudog
}
waitq类型链表,sudog 为封装的goroutine,包含等待 读/写的被阻塞的goroutine信息。
type waitq struct {first *sudoglast *sudog
}type sudog struct {g *g // 阻塞的goroutineisSelect bool // select中使用chan是特殊处理的,本章暂时不论next *sudog prev *sudogelem unsafe.Pointer // 数据指针(指向堆内存或栈空间)acquiretime int64releasetime int64ticket uint32parent *sudog // semaRoot binary treewaitlink *sudog // g.waiting list or semaRootwaittail *sudog // semaRootc *hchan // channel
}
结构中大约了解下 channal 的实现
channal 有缓存时通过读写索引读取数据,并保存了可容纳元素数量(dataqsize) 及当前元素量(qcount)。recvq和sendq为两个队列,遵循先进先出原则,队列元素sudog为封装后的goroutine,包含goroutine等待的chan信息和数据地址,一般为阻塞状态。互斥锁保证hchan中的数据读写安全。
对于无缓冲channal,recvq 和 sendq至少有一个为空,且 dataqsiz 和 qcount 都为0。
初始化 channal 过程如下
// use example:
ch := make(chan int, 10)
// code
func makechan(t *chantype, size int) *hchan {elem := t.elem// 检查是否存在size越界问题 if size < 0 || uintptr(size) > maxSliceCap(elem.size) || uintptr(size)*elem.size > maxAlloc-hchanSize {panic(plainError("makechan: size out of range"))}var c *hchanswitch {// make(chan int, 0) size=0 but elem.size=8// make(struct{}, 2) size=2 but elem.size=0case size == 0 || elem.size == 0: //分配hchan大小,无缓冲chan只分配这些,以下是有缓冲chanc = (*hchan)(mallocgc(hchanSize, nil, true))// race检测,不懂c.buf = c.raceaddr()// channal类型不包含指针 case elem.kind&kindNoPointers != 0:// 分配空间大小为(hchan结构大小 单个元素大小*请求分配数量)的堆内存c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))// c.buf指针指向hchan结构后,即存储元素区c.buf = add(unsafe.Pointer(c), hchanSize)// channal类型包含指针default:// 分配一块hchan内存// c.buf 指向 分配固定大小的元素存储空间 的堆内存c = new(hchan)c.buf = mallocgc(uintptr(size)*elem.size, elem, true)}c.elemsize = uint16(elem.size)c.elemtype = elemc.dataqsiz = uint(size)return c
}
如上,make channal时内存分配如下图
如上,
无缓冲队列中仅有hchan,即没有空间可用于存储元素,所以如果发生写数据请求,只有 1)阻塞在sendq队列中 2) 对端有阻塞recvq,直接copy到对方goroutine空间
有缓冲chan之所以要分开指针类型缓冲区主要是为了区分gc操作,需要将它设置为flagNoScan。并且指针大小固定,可以跟hchan头部一起分配内存。
然后看下sendq和recvq操作具体如何发生
chan如何工作
写 channal 操作
1. 写入未初始化的chan,程序将永久阻塞。此时go run 报错:<font color=orange> fatal error: all goroutines are asleep - deadlock! </font>,goroutine被强制结束
2. 写入已关闭chan,程序将发生panic。go run报错: <font color=orange> panic: send on closed channel </font>
3. 如果是无缓冲chan,对端有等待goroutine时,复制数据给goroutine
4. 写入有缓冲chan,并且有空间可写时,将消息复制给等待的goroutine
5. 缓冲队列已满, 新建(sudog)G,并加入到sendq阻塞队列
// 编译代码中 c <- x 入口函数
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {chansend(c, elem, true, getcallerpc())
}
// c为chan, ep为需要写入的变量的地址, block=TRUE, callerpc
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {//CASE 1: chan未初始化时为Nil,send数据将永久阻塞if c == nil {if !block {return false}//gopark函数会使goroutine休眠, 通过unlockf唤醒,但此处unlockf为nil,所以将一直休眠gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)throw("unreachable")}// 未被加锁 未关闭 且 1.(队列长度cap为0(无缓冲channal) 且 外部写chan协程队列为空) 或 2.(cap>0 且 cap=len已满)返回false. 但是chansend1调用时block是true,判断跳过if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {return false}// 关于cpu调度var t0 int64if blockprofilerate > 0 {t0 = cputicks()}//获取chann的锁lock(&c.lock)//CASE 2: send已关闭chan会panicif c.closed != 0 {unlock(&c.lock)panic(plainError("send on closed channel"))}//CASE 3: c.recvq队列中有等待接收goroutine。调用send函数将ep数据发给sg(recvGoroutine),稍后看send函数if sg := c.recvq.dequeue(); sg != nil {send(c, sg, ep, func() { unlock(&c.lock) }, 3)return true}//CASE 4: 有缓冲管道,并且有空间可写。将消息复制到等待goroutineif c.qcount < c.dataqsiz {// 定位写chan索引位置。c.sendx为写chan的个数位置,qp定位在 c+sendx*数据类型长度qp := chanbuf(c, c.sendx)//将需要写入的数据ep拷贝到qp指向的地址typedmemmove(c.elemtype, qp, ep)//chan每次只能写入一个元素c.sendx++//sendx为写入chann的索引地址,==dataqsiz时缓冲区写满。带缓冲chan使用环形地址,写入索引sendx变为0if c.sendx == c.dataqsiz {c.sendx = 0}//数据长度++后解锁c.qcount++unlock(&c.lock)return true}if !block {unlock(&c.lock)return false}// CASE5: 缓冲队列已满,新建(sudog)G,并加入到sendq队列//获取当前goroutinegp := getg()// 创建一个等待队列抽象goroutine(sudog)mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}mysg.elem = epmysg.waitlink = nilmysg.g = gpmysg.isSelect = falsemysg.c = cgp.waiting = mysggp.param = nil// mySg加入到send队列c.sendq.enqueue(mysg)//然后此协程阻塞,直到获取到c.lockgoparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
}
关于send过程
// send在空chan上执行写操作。ep值被发送者copy到recv队列中的sg中。chan必须为空并且锁定,send通过unlockf函数。注意调用此函数时,对端有等待recvq阻塞,说明缓冲为空或无缓冲
// 为其解锁. sg必须被退出c的队列,ep值必须非空并指向堆或者调用栈。
// use eg: send(c, recvSg, ep, func() { unlock(&c.lock) }, 3)
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {//接收sg的elem在等待队列中时,一般会指向一片内存(栈或堆), 检测到不为空时,直接将数据写入那片内存并将指针置空if sg.elem != nil {// 将数据ep数据复制给sg的存储数据地址sendDirect(c.elemtype, sg, ep)sg.elem = nil}gp := sg.g// 解锁chanunlockf()gp.param = unsafe.Pointer(sg)if sg.releasetime != 0 {sg.releasetime = cputicks()}//gp 置位可运行态goready(gp, skip+1)
}
读channal操作
以下为 “从chan中读数据到ep” 情况:
- CASE 1: 读未初始化的chan,程序将永久阻塞。此时go run 报错: fatal error: all goroutines are asleep - deadlock! ,goroutine被强制结束
- CASE 2: 与写chan不同。读已关闭chan,不会发生panic,如果缓冲区无数据,返回true(表示读取操作成功) 和 false(表示未读到数据)
- CASE 3: 对端有阻塞等待写goroutine 且 无缓冲区,直接将数据写入对端goroutine
- CASE 4: 对端有阻塞等待写goroutine 且 是有缓冲chan时,此时缓冲队列已满,又因为readx和recvx索引为环形队列,所以此时两值相等。 从缓冲区recvx处取出数据复制给ep,再将写goroutine的数据写入缓冲区,过程中队列依旧满。
- CASE 5:对端无阻塞等待写goroutine,且缓冲有数据时,从recvx处复制数据给ep。
(recv时,ep可为nil,此时相当于清理chan缓冲中一个元素)
// 编译代码 <- c 入口函数
func chanrecv1(c *hchan, elem unsafe.Pointer) {chanrecv(c, elem, true)
}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {//CASE 1:读未初始化的chan,永久阻塞if c == nil {if !block {return}gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)throw("unreachable")}if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&atomic.Load(&c.closed) == 0 {return}lock(&c.lock)//CASE 2:已关闭且数据长度为0.解锁,清理ep的数据指向内存,返回true(chan正常), false(未取到数据)if c.closed != 0 && c.qcount == 0 {unlock(&c.lock)if ep != nil {// 清理内存typedmemclr(c.elemtype, ep)}return true, false}//CASE 3: sendq队列中有阻塞gorotine时,recv数据。分为有缓冲和无缓冲情况,在recv函数中if sg := c.sendq.dequeue(); sg != nil {recv(c, sg, ep, func() { unlock(&c.lock) }, 3)return true, true}//CASE 4: 无sendSg但chan缓冲中有数据,取数据返回true trueif c.qcount > 0 {// Receive directly from queue 找到读索引位置qp := chanbuf(c, c.recvx)//ep不为空时读走数据if ep != nil {typedmemmove(c.elemtype, ep, qp)}//ep清理读索引指向的那一个数据(即不管左端是否有ep接受数据,这个数据都没有了)typedmemclr(c.elemtype, qp)//读索引++c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}//队列中数据量-1c.qcount--unlock(&c.lock)return true, true}if !block {unlock(&c.lock)return false, false}//CASE 4: 缓冲无数据,创建sudog加入到recvqueue中阻塞等待chan有新数据gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}mysg.elem = epmysg.waitlink = nilgp.waiting = mysgmysg.g = gpmysg.isSelect = falsemysg.c = cgp.param = nilc.recvq.enqueue(mysg)goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)releaseSudog(mysg)return true, !closed
}
recv 过程
// 此函数用于从sg(写chan的goroutine)读取数据到ep上,注意调用此函数时,说明缓冲区已满,所以对端有goroutine阻塞(sendx=datasiz recvx=0) 或者无缓冲区
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {// CASE 1: 无缓冲chanif c.dataqsiz == 0 {//直接从sg数据区 copy数据到epif ep != nil {recvDirect(c.elemtype, sg, ep)}//CASE 2: 有缓冲chan} else {//找到读数据的索引位置qp := chanbuf(c, c.recvx)// 从chan缓冲区读索引位置读c.elemtype类型大小的内存到ep中if ep != nil {typedmemmove(c.elemtype, ep, qp)}// 然后从sendSg中取数据追加给chan缓冲区,recv++typedmemmove(c.elemtype, qp, sg.elem)// chan写数据索引++.索引地址达到最大时归零c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}// recvx++后,sendx=recvx 将内存区当做环形来用,相当于sendx++ c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz}sg.elem = nilgp := sg.gunlockf()gp.param = unsafe.Pointer(sg)if sg.releasetime != 0 {sg.releasetime = cputicks()}goready(gp, skip+1)
}
close channal过程
func closechan(c *hchan) {//关闭未初始化chan,panicif c == nil {panic(plainError("close of nil channel"))}//close已经关闭chan, paniclock(&c.lock)if c.closed != 0 {unlock(&c.lock)panic(plainError("close of closed channel"))}c.closed = 1var glist *g// 释放所有的阻塞的读goroutinefor {//获取读数据的队列,清理接收队列中的数据空间等sg := c.recvq.dequeue()if sg == nil {break}if sg.elem != nil {//清除sg.elem指向的内存为type的类型,然后指向空typedmemclr(c.elemtype, sg.elem)sg.elem = nil}if sg.releasetime != 0 {sg.releasetime = cputicks()}gp := sg.ggp.param = nilgp.schedlink.set(glist)glist = gp}// 释放所有的阻塞的写goroutine(将panic) for {sg := c.sendq.dequeue()if sg == nil {break}// 未对阻塞写的空间进行清理sg.elem = nilif sg.releasetime != 0 {sg.releasetime = cputicks()}gp := sg.ggp.param = nilgp.schedlink.set(glist)glist = gp}unlock(&c.lock)// 依次唤醒所有阻塞的goroutinefor glist != nil {gp := glistglist = glist.schedlink.ptr()gp.schedlink = 0goready(gp, 3)}
}
遗留问题:
为什么指针类型的chan make时,hchan要和缓冲区分开分配。而非指针类型是连续的一块存储?? 好像跟gc有关系
为什么清理读goroutine 未清理写goroutine?? 一般读goroutine的数据区是等待写入数据或为nil的,一般无数据可释放。 写goroutine一般是已经初始化的数据
golang runtime源码阅读 channal实现相关推荐
- golangsha1解码_如何阅读Golang的源码?
Go 的源码在安装包的 src/ 目录下.怎么看它的源码呢?直接看吧!没人教的情况下,只能自己撸了.当然,这种内容一般也不会有人教. 怎么撸? Go 源码中,应该可分为与语言息息相关的部分,和官方提供 ...
- Golang流媒体实战之五:lal推流服务源码阅读
欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos <Golang流媒体实战>系列的链接 体验 ...
- Golang流媒体实战之六:lal拉流服务源码阅读
欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos <Golang流媒体实战>系列的链接 体验 ...
- gh-ost大表DDL工具源码阅读
gh-ost大表DDL工具源码阅读 最终目的 开发环境与测试数据库准备 一个简单的ddl案例 debug分析程序执行过程 vscode debug配置 变量介绍 核心处理逻辑 分析我的需求 最终目的 ...
- syzkaller 源码阅读笔记1(syz-extract syz-sysgen)
文章目录 1. syz-extract 1-0 总结 1-1. `main()` 1-2 `archList()` - `1-1 (3)` 获取架构 name list 1-3 `createArch ...
- go 中 select 源码阅读
Python微信订餐小程序课程视频 https://blog.csdn.net/m0_56069948/article/details/122285951 Python实战量化交易理财系统 https ...
- Go调度器系列(4)源码阅读与探索
各位朋友,这次想跟大家分享一下Go调度器源码阅读相关的知识和经验,网络上已经有很多剖析源码的好文章,所以这篇文章不是又一篇源码剖析文章,注重的不是源码分析分享,而是带给大家一些学习经验,希望大家能更好 ...
- 源码阅读:AFNetworking(十六)——UIWebView+AFNetworking
该文章阅读的AFNetworking的版本为3.2.0. 这个分类提供了对请求周期进行控制的方法,包括进度监控.成功和失败的回调. 1.接口文件 1.1.属性 /**网络会话管理者对象*/ @prop ...
- 转-OpenJDK源码阅读导航跟编译
OpenJDK源码阅读导航 OpenJDK源码阅读导航 博客分类: Virtual Machine HotSpot VM Java OpenJDK openjdk 这是链接帖.主体内容都在各链接中. ...
- android tcp socket框架_最流行的 Web 框架 Gin 源码阅读
最近公司大部分项目开始往golang换, api的框架选定使用gin, 于是将 gin的源码看了一遍, 会用几篇文章将gin的流程及流程做一个梳理, 下面进入正题. gin框架预览 上图大概是 gin ...
最新文章
- Jupyter Notebook 添加目录
- hdu 4417 Super Mario 树状数组||主席树
- Go http client 连接池不复用的问题
- [ASP.NET MVC2 系列] ASP.NET MVC 之如何创建自定义路由约束
- 函数的命名空间以及作用域
- python安装不了jupyter_python学习笔记——Windowns下Python3之安装jupyter
- 学习使用 Go 的反射
- 深度学习 占用gpu内存 使用率为0_2020年深度学习最佳GPU一览,看看哪一款最适合你!...
- 百度地图和openlayers融合封装(想法)
- 谷歌浏览器如何长截屏
- MAC苹果电脑装单win10系统
- Ubuntu查找文件
- Linux手动安装JDK并配置多个版本JDK--JDK配置和Jenv的配置使用
- matlab将图片旋转的代码_论文写作经验分享word+mathtype+matlab
- 没有时间进行测试? —有关在Python中对AWS Lambda进行单元测试的12条建议
- mail = imaplib.IMAP4_SSL('k20gslf-0kF')
- Python3 pip安装-Star.hou
- 分支限界算法 之 A*算法(启发式搜索算法)---九宫重排游戏(也称八数码问题)
- 集线器、网桥、交换机、路由器
- PLC数据采集的方法小结及成本比较
热门文章
- web测试----死链检查(Xenu)
- 卓有成效的管理者(笔记)——要事优先
- Angr源码分析——DDG的生成
- 数据库在软件开发中的作用是什么?
- Acwing-873. 欧拉函数
- android 11.0禁用电源键(屏蔽关机短按长按事件)
- 计算机主板清理,电脑主板脏了如何清洗电脑主板才是正确
- Using ‘UTF-8‘ encoding to copy filtered resources. skip non existing resourceDirectory
- 可靠传输协议 rdt 1.0、rdt 2.0、rdt 2.1、rdt 2.2、rdt3.0
- WPF 精修篇 滑条