概念及使用场景

通道(channal)是Golang实现CSP并发模型的关键,分为 有缓冲通道 和无缓冲通道。
  • 有缓冲管道: channal持有一个固定大小的队列,队列满时发送者将阻塞(反之亦然)。多用于数据共享。
  • 无缓冲管道: 发送和接收数据同时完成,如果没有goroutine读取channal,则发送者阻塞(反之亦然)。多用于协程同步。
  • 有缓冲+select: +for实现对多个chan的监听操作 +time实现超时控制

基本使用

简单的 无缓冲channal 使用

func main() {ch := make(chan int)  //创建无缓冲管道go func() {          //发起goroutine,向管道写入数据ch <- 1close(ch)                    //关闭管道}() t := <- ch                     //主goroutine等待读取到管道数据后退出fmt.Println(t)return

需要注意的是:
1. 无缓冲在写入时,必须有其他线程在对端接受,否则线程将阻塞。
2. 向关闭的chan写入数据将Panic
3. 可以从关闭的chan中读取数据
4. 重复关闭chann会导致panic

基本结构及内存布局

chan结构体定义在 ./src/runtime/chan.go文件,type hchan struct

type hchan struct {qcount   uint           // 队列数据长度lendataqsiz uint           // 缓冲区长度cap c:=make(chan int, 10)即此值为10buf      unsafe.Pointer // 指向dataqsiz数组的指针elemsize uint16         //元素类型大小 sizeof(struct)closed   uint32          //关闭标志elemtype *_type       // chan接收的元素类型sendx    uint         // 写入chan索引,写入1个数据时,sendx+1recvx    uint           // 读取chan索引,被读走1个数据时,recvx+1recvq    waitq         // 阻塞在chann上的读等待队列sendq    waitq        // 阻塞在chann上的写等待队列lock   mutex          //互斥锁 保证hchan的数据读写安全,包括被阻塞的waitq中的sudog
}

waitq类型链表,sudog 为封装的goroutine,包含等待 读/写的被阻塞的goroutine信息。

type waitq struct {first *sudoglast  *sudog
}type sudog struct {g *g                    // 阻塞的goroutineisSelect bool            // select中使用chan是特殊处理的,本章暂时不论next     *sudog     prev     *sudogelem     unsafe.Pointer // 数据指针(指向堆内存或栈空间)acquiretime int64releasetime int64ticket      uint32parent      *sudog // semaRoot binary treewaitlink    *sudog // g.waiting list or semaRootwaittail    *sudog // semaRootc           *hchan // channel
}
结构中大约了解下 channal 的实现
  1. channal 有缓存时通过读写索引读取数据,并保存了可容纳元素数量(dataqsize) 及当前元素量(qcount)。recvq和sendq为两个队列,遵循先进先出原则,队列元素sudog为封装后的goroutine,包含goroutine等待的chan信息和数据地址,一般为阻塞状态。互斥锁保证hchan中的数据读写安全。

  2. 对于无缓冲channal,recvq 和 sendq至少有一个为空,且 dataqsiz 和 qcount 都为0。

初始化 channal 过程如下
// use example:
ch := make(chan int, 10)
// code
func makechan(t *chantype, size int) *hchan {elem := t.elem// 检查是否存在size越界问题   if size < 0 || uintptr(size) > maxSliceCap(elem.size) || uintptr(size)*elem.size > maxAlloc-hchanSize {panic(plainError("makechan: size out of range"))}var c *hchanswitch {// make(chan int, 0) size=0 but elem.size=8// make(struct{}, 2) size=2 but elem.size=0case size == 0 || elem.size == 0:  //分配hchan大小,无缓冲chan只分配这些,以下是有缓冲chanc = (*hchan)(mallocgc(hchanSize, nil, true))// race检测,不懂c.buf = c.raceaddr()// channal类型不包含指针 case elem.kind&kindNoPointers != 0:// 分配空间大小为(hchan结构大小 单个元素大小*请求分配数量)的堆内存c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))// c.buf指针指向hchan结构后,即存储元素区c.buf = add(unsafe.Pointer(c), hchanSize)// channal类型包含指针default:// 分配一块hchan内存// c.buf 指向 分配固定大小的元素存储空间 的堆内存c = new(hchan)c.buf = mallocgc(uintptr(size)*elem.size, elem, true)}c.elemsize = uint16(elem.size)c.elemtype = elemc.dataqsiz = uint(size)return c
}

如上,make channal时内存分配如下图

如上,
无缓冲队列中仅有hchan,即没有空间可用于存储元素,所以如果发生写数据请求,只有 1)阻塞在sendq队列中 2) 对端有阻塞recvq,直接copy到对方goroutine空间
有缓冲chan之所以要分开指针类型缓冲区主要是为了区分gc操作,需要将它设置为flagNoScan。并且指针大小固定,可以跟hchan头部一起分配内存。

然后看下sendq和recvq操作具体如何发生

chan如何工作

写 channal 操作

1. 写入未初始化的chan,程序将永久阻塞。此时go run 报错:<font color=orange> fatal error: all goroutines are asleep - deadlock! </font>,goroutine被强制结束
2. 写入已关闭chan,程序将发生panic。go run报错: <font color=orange> panic: send on closed channel </font>
3. 如果是无缓冲chan,对端有等待goroutine时,复制数据给goroutine
4. 写入有缓冲chan,并且有空间可写时,将消息复制给等待的goroutine
5. 缓冲队列已满, 新建(sudog)G,并加入到sendq阻塞队列
// 编译代码中 c <- x 入口函数
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {chansend(c, elem, true, getcallerpc())
}
// c为chan, ep为需要写入的变量的地址, block=TRUE, callerpc
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {//CASE 1: chan未初始化时为Nil,send数据将永久阻塞if c == nil {if !block {return false}//gopark函数会使goroutine休眠, 通过unlockf唤醒,但此处unlockf为nil,所以将一直休眠gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)throw("unreachable")}// 未被加锁 未关闭 且 1.(队列长度cap为0(无缓冲channal) 且 外部写chan协程队列为空) 或 2.(cap>0 且 cap=len已满)返回false. 但是chansend1调用时block是true,判断跳过if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {return false}// 关于cpu调度var t0 int64if blockprofilerate > 0 {t0 = cputicks()}//获取chann的锁lock(&c.lock)//CASE 2: send已关闭chan会panicif c.closed != 0 {unlock(&c.lock)panic(plainError("send on closed channel"))}//CASE 3: c.recvq队列中有等待接收goroutine。调用send函数将ep数据发给sg(recvGoroutine),稍后看send函数if sg := c.recvq.dequeue(); sg != nil {send(c, sg, ep, func() { unlock(&c.lock) }, 3)return true}//CASE 4: 有缓冲管道,并且有空间可写。将消息复制到等待goroutineif c.qcount < c.dataqsiz {// 定位写chan索引位置。c.sendx为写chan的个数位置,qp定位在 c+sendx*数据类型长度qp := chanbuf(c, c.sendx)//将需要写入的数据ep拷贝到qp指向的地址typedmemmove(c.elemtype, qp, ep)//chan每次只能写入一个元素c.sendx++//sendx为写入chann的索引地址,==dataqsiz时缓冲区写满。带缓冲chan使用环形地址,写入索引sendx变为0if c.sendx == c.dataqsiz {c.sendx = 0}//数据长度++后解锁c.qcount++unlock(&c.lock)return true}if !block {unlock(&c.lock)return false}// CASE5: 缓冲队列已满,新建(sudog)G,并加入到sendq队列//获取当前goroutinegp := getg()// 创建一个等待队列抽象goroutine(sudog)mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}mysg.elem = epmysg.waitlink = nilmysg.g = gpmysg.isSelect = falsemysg.c = cgp.waiting = mysggp.param = nil// mySg加入到send队列c.sendq.enqueue(mysg)//然后此协程阻塞,直到获取到c.lockgoparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
}

关于send过程

// send在空chan上执行写操作。ep值被发送者copy到recv队列中的sg中。chan必须为空并且锁定,send通过unlockf函数。注意调用此函数时,对端有等待recvq阻塞,说明缓冲为空或无缓冲
// 为其解锁. sg必须被退出c的队列,ep值必须非空并指向堆或者调用栈。
// use eg: send(c, recvSg, ep, func() { unlock(&c.lock) }, 3)
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {//接收sg的elem在等待队列中时,一般会指向一片内存(栈或堆), 检测到不为空时,直接将数据写入那片内存并将指针置空if sg.elem != nil {// 将数据ep数据复制给sg的存储数据地址sendDirect(c.elemtype, sg, ep)sg.elem = nil}gp := sg.g// 解锁chanunlockf()gp.param = unsafe.Pointer(sg)if sg.releasetime != 0 {sg.releasetime = cputicks()}//gp 置位可运行态goready(gp, skip+1)
}

读channal操作

以下为 “从chan中读数据到ep” 情况:

  • CASE 1: 读未初始化的chan,程序将永久阻塞。此时go run 报错: fatal error: all goroutines are asleep - deadlock! ,goroutine被强制结束
  • CASE 2: 与写chan不同。读已关闭chan,不会发生panic,如果缓冲区无数据,返回true(表示读取操作成功) 和 false(表示未读到数据)
  • CASE 3: 对端有阻塞等待写goroutine 且 无缓冲区,直接将数据写入对端goroutine
  • CASE 4: 对端有阻塞等待写goroutine 且 是有缓冲chan时,此时缓冲队列已满,又因为readx和recvx索引为环形队列,所以此时两值相等。 从缓冲区recvx处取出数据复制给ep,再将写goroutine的数据写入缓冲区,过程中队列依旧满。
  • CASE 5:对端无阻塞等待写goroutine,且缓冲有数据时,从recvx处复制数据给ep。
    (recv时,ep可为nil,此时相当于清理chan缓冲中一个元素)
// 编译代码 <- c 入口函数
func chanrecv1(c *hchan, elem unsafe.Pointer) {chanrecv(c, elem, true)
}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {//CASE 1:读未初始化的chan,永久阻塞if c == nil {if !block {return}gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)throw("unreachable")}if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&atomic.Load(&c.closed) == 0 {return}lock(&c.lock)//CASE 2:已关闭且数据长度为0.解锁,清理ep的数据指向内存,返回true(chan正常), false(未取到数据)if c.closed != 0 && c.qcount == 0 {unlock(&c.lock)if ep != nil {// 清理内存typedmemclr(c.elemtype, ep)}return true, false}//CASE 3: sendq队列中有阻塞gorotine时,recv数据。分为有缓冲和无缓冲情况,在recv函数中if sg := c.sendq.dequeue(); sg != nil {recv(c, sg, ep, func() { unlock(&c.lock) }, 3)return true, true}//CASE 4: 无sendSg但chan缓冲中有数据,取数据返回true trueif c.qcount > 0 {// Receive directly from queue 找到读索引位置qp := chanbuf(c, c.recvx)//ep不为空时读走数据if ep != nil {typedmemmove(c.elemtype, ep, qp)}//ep清理读索引指向的那一个数据(即不管左端是否有ep接受数据,这个数据都没有了)typedmemclr(c.elemtype, qp)//读索引++c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}//队列中数据量-1c.qcount--unlock(&c.lock)return true, true}if !block {unlock(&c.lock)return false, false}//CASE 4: 缓冲无数据,创建sudog加入到recvqueue中阻塞等待chan有新数据gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}mysg.elem = epmysg.waitlink = nilgp.waiting = mysgmysg.g = gpmysg.isSelect = falsemysg.c = cgp.param = nilc.recvq.enqueue(mysg)goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)releaseSudog(mysg)return true, !closed
}

recv 过程

// 此函数用于从sg(写chan的goroutine)读取数据到ep上,注意调用此函数时,说明缓冲区已满,所以对端有goroutine阻塞(sendx=datasiz  recvx=0) 或者无缓冲区
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {// CASE 1: 无缓冲chanif c.dataqsiz == 0 {//直接从sg数据区 copy数据到epif ep != nil {recvDirect(c.elemtype, sg, ep)}//CASE 2: 有缓冲chan} else {//找到读数据的索引位置qp := chanbuf(c, c.recvx)// 从chan缓冲区读索引位置读c.elemtype类型大小的内存到ep中if ep != nil {typedmemmove(c.elemtype, ep, qp)}// 然后从sendSg中取数据追加给chan缓冲区,recv++typedmemmove(c.elemtype, qp, sg.elem)// chan写数据索引++.索引地址达到最大时归零c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}// recvx++后,sendx=recvx 将内存区当做环形来用,相当于sendx++ c.sendx = c.recvx  // c.sendx = (c.sendx+1) % c.dataqsiz}sg.elem = nilgp := sg.gunlockf()gp.param = unsafe.Pointer(sg)if sg.releasetime != 0 {sg.releasetime = cputicks()}goready(gp, skip+1)
}

close channal过程


func closechan(c *hchan) {//关闭未初始化chan,panicif c == nil {panic(plainError("close of nil channel"))}//close已经关闭chan, paniclock(&c.lock)if c.closed != 0 {unlock(&c.lock)panic(plainError("close of closed channel"))}c.closed = 1var glist *g// 释放所有的阻塞的读goroutinefor {//获取读数据的队列,清理接收队列中的数据空间等sg := c.recvq.dequeue()if sg == nil {break}if sg.elem != nil {//清除sg.elem指向的内存为type的类型,然后指向空typedmemclr(c.elemtype, sg.elem)sg.elem = nil}if sg.releasetime != 0 {sg.releasetime = cputicks()}gp := sg.ggp.param = nilgp.schedlink.set(glist)glist = gp}// 释放所有的阻塞的写goroutine(将panic) for {sg := c.sendq.dequeue()if sg == nil {break}// 未对阻塞写的空间进行清理sg.elem = nilif sg.releasetime != 0 {sg.releasetime = cputicks()}gp := sg.ggp.param = nilgp.schedlink.set(glist)glist = gp}unlock(&c.lock)// 依次唤醒所有阻塞的goroutinefor glist != nil {gp := glistglist = glist.schedlink.ptr()gp.schedlink = 0goready(gp, 3)}
}

遗留问题:
为什么指针类型的chan make时,hchan要和缓冲区分开分配。而非指针类型是连续的一块存储?? 好像跟gc有关系
为什么清理读goroutine 未清理写goroutine?? 一般读goroutine的数据区是等待写入数据或为nil的,一般无数据可释放。 写goroutine一般是已经初始化的数据

golang runtime源码阅读 channal实现相关推荐

  1. golangsha1解码_如何阅读Golang的源码?

    Go 的源码在安装包的 src/ 目录下.怎么看它的源码呢?直接看吧!没人教的情况下,只能自己撸了.当然,这种内容一般也不会有人教. 怎么撸? Go 源码中,应该可分为与语言息息相关的部分,和官方提供 ...

  2. Golang流媒体实战之五:lal推流服务源码阅读

    欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos <Golang流媒体实战>系列的链接 体验 ...

  3. Golang流媒体实战之六:lal拉流服务源码阅读

    欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos <Golang流媒体实战>系列的链接 体验 ...

  4. gh-ost大表DDL工具源码阅读

    gh-ost大表DDL工具源码阅读 最终目的 开发环境与测试数据库准备 一个简单的ddl案例 debug分析程序执行过程 vscode debug配置 变量介绍 核心处理逻辑 分析我的需求 最终目的 ...

  5. syzkaller 源码阅读笔记1(syz-extract syz-sysgen)

    文章目录 1. syz-extract 1-0 总结 1-1. `main()` 1-2 `archList()` - `1-1 (3)` 获取架构 name list 1-3 `createArch ...

  6. go 中 select 源码阅读

    Python微信订餐小程序课程视频 https://blog.csdn.net/m0_56069948/article/details/122285951 Python实战量化交易理财系统 https ...

  7. Go调度器系列(4)源码阅读与探索

    各位朋友,这次想跟大家分享一下Go调度器源码阅读相关的知识和经验,网络上已经有很多剖析源码的好文章,所以这篇文章不是又一篇源码剖析文章,注重的不是源码分析分享,而是带给大家一些学习经验,希望大家能更好 ...

  8. 源码阅读:AFNetworking(十六)——UIWebView+AFNetworking

    该文章阅读的AFNetworking的版本为3.2.0. 这个分类提供了对请求周期进行控制的方法,包括进度监控.成功和失败的回调. 1.接口文件 1.1.属性 /**网络会话管理者对象*/ @prop ...

  9. 转-OpenJDK源码阅读导航跟编译

    OpenJDK源码阅读导航 OpenJDK源码阅读导航 博客分类: Virtual Machine HotSpot VM Java OpenJDK openjdk 这是链接帖.主体内容都在各链接中.  ...

  10. android tcp socket框架_最流行的 Web 框架 Gin 源码阅读

    最近公司大部分项目开始往golang换, api的框架选定使用gin, 于是将 gin的源码看了一遍, 会用几篇文章将gin的流程及流程做一个梳理, 下面进入正题. gin框架预览 上图大概是 gin ...

最新文章

  1. Jupyter Notebook 添加目录
  2. hdu 4417 Super Mario 树状数组||主席树
  3. Go http client 连接池不复用的问题
  4. [ASP.NET MVC2 系列] ASP.NET MVC 之如何创建自定义路由约束
  5. 函数的命名空间以及作用域
  6. python安装不了jupyter_python学习笔记——Windowns下Python3之安装jupyter
  7. 学习使用 Go 的反射
  8. 深度学习 占用gpu内存 使用率为0_2020年深度学习最佳GPU一览,看看哪一款最适合你!...
  9. 百度地图和openlayers融合封装(想法)
  10. 谷歌浏览器如何长截屏
  11. MAC苹果电脑装单win10系统
  12. Ubuntu查找文件
  13. Linux手动安装JDK并配置多个版本JDK--JDK配置和Jenv的配置使用
  14. matlab将图片旋转的代码_论文写作经验分享word+mathtype+matlab
  15. 没有时间进行测试? —有关在Python中对AWS Lambda进行单元测试的12条建议
  16. mail = imaplib.IMAP4_SSL('k20gslf-0kF')
  17. Python3 pip安装-Star.hou
  18. 分支限界算法 之 A*算法(启发式搜索算法)---九宫重排游戏(也称八数码问题)
  19. 集线器、网桥、交换机、路由器
  20. PLC数据采集的方法小结及成本比较

热门文章

  1. web测试----死链检查(Xenu)
  2. 卓有成效的管理者(笔记)——要事优先
  3. Angr源码分析——DDG的生成
  4. 数据库在软件开发中的作用是什么?
  5. Acwing-873. 欧拉函数
  6. android 11.0禁用电源键(屏蔽关机短按长按事件)
  7. 计算机主板清理,电脑主板脏了如何清洗电脑主板才是正确
  8. Using ‘UTF-8‘ encoding to copy filtered resources. skip non existing resourceDirectory
  9. 可靠传输协议 rdt 1.0、rdt 2.0、rdt 2.1、rdt 2.2、rdt3.0
  10. WPF 精修篇 滑条