本文会尝试解释 go runtime 中 channel 和 select 的具体实现,部分内容来自 gophercon2017。Go版本为1.8.3

channel

第一部分讲述一下 channel 的用法。channel 可以看做一个队列,用于多个goroutine之间的通信,例如下面的例子,一个goroutine发送msg,另一个msg接受消息。channel 分为带缓冲和不带缓冲,差别不是很大,具体请自行google。看一个简单的例子,了解一下channel的使用。

package mainimport "fmt"func main() {// Create a new channel with `make(chan val-type)`.// Channels are typed by the values they convey.messages := make(chan string)// Send a value into a channel using the `channel <-`// syntax. Here we send `"ping"`  to the `messages`// channel we made above, from a new goroutine.go func() { messages <- "ping" }()// The `<-channel` syntax receives a value from the// channel. Here we'll receive the `"ping"` message// we sent above and print it out.msg := <-messagesfmt.Println(msg)
}

channel的功能点:

  1. 队列
  2. 阻塞
  3. 当一端阻塞,可以被另一个端唤醒

我们围绕这3点功能展开,讲讲具体的实现。

channel结构

注释标注了几个重要的变量,从功能上大致可以分为两个功能单元,一个是 ring buffer,用于存数据; 一个是存放 goroutine 的队列。

type hchan struct {qcount   uint           // 当前队列中的元素个数dataqsiz uint           // 缓冲队列的固定大小buf      unsafe.Pointer // 缓冲数组elemsize uint16closed   uint32elemtype *_type // element typesendx    uint   // 下一次发送的 indexrecvx    uint   // 下一次接收的 indexrecvq    waitq  // 接受者队列sendq    waitq  // 发送者队列// lock protects all fields in hchan, as well as several// fields in sudogs blocked on this channel.//// Do not change another G's status while holding this lock// (in particular, do not ready a G), as this can deadlock// with stack shrinking.lock mutex
}

Ring Buffer

主要是以下变量组成的功能, 一个 buf 存储实际数据,两个指针分别代表发送,接收的索引位置,配合 size, count 在数组大小范围内来回滑动。

qcount   uint           // 当前队列中的元素个数
dataqsiz uint           // 缓冲队列的固定大小
buf      unsafe.Pointer // 缓冲数组
sendx    uint   // 下一次发送的 index
recvx    uint   // 下一次接收的 index

举个例子,假设我们初始化了一个带缓冲的channel, ch := make(chan int, 3), 那么它初始状态的值为:

qcount   = 0
dataqsiz = 3
buf      = [3]int{0, 0, 0} // 表示长度为3的数组
sendx    = 0
recvx    = 0

第一步,向 channel 里 send 一个值, ch <- 1, 因为现在缓冲还没满,所以操作后状态如下:

qcount   = 1
dataqsiz = 3
buf      = [3]int{1, 0, 0} // 表示长度为3的数组
sendx    = 1
recvx    = 0

快进两部,连续向 channel 里 send 两个值 (2, 3),状态如下:

qcount   = 3
dataqsiz = 3
buf      = [3]int{1, 2, 3} // 表示长度为3的数组
sendx    = 0 // 下一个发送的 index 回到了0
recvx    = 0

从 channel 中 receive 一个值, <- ch, 状态如下:

qcount   = 2
dataqsiz = 3
buf      = [3]int{1, 2, 3} // 表示长度为3的数组
sendx    = 0 // 下一个发送的 index 回到了0
recvx    = 1 // 下一个接收的 index

阻塞

我们看下,如果 receive channel 时,channel 的 buffer中没有数据是怎么处理的。逻辑在 chanrecv 这个方法中,它的大致流程如下,仅保留了阻塞操作的代码。

func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {// 检查 channdel 是否为 nil// 当不阻塞时,检查buffer大小,当前大小,检查chennel是否关闭,看看是否能直接返回// 检查发送端是否有等待的goroutine,下部分会提到// 当前buffer中有数据,则尝试取出。// 如果非阻塞,直接返回// 没有sender等待,buffer中没有数据,则阻塞等待。gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.mysg.elem = epmysg.waitlink = nilgp.waiting = mysgmysg.g = gpmysg.selectdone = nilmysg.c = cgp.param = nilc.recvq.enqueue(mysg)//关键操作:设置 goroutine 状态为 waiting, 把 G 和 M 分离goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)// someone woke us up// 被唤醒,清理 sudogif mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}closed := gp.param == nilgp.param = nilmysg.c = nilreleaseSudog(mysg)return true, !closed
}

这里的操作就是 创建一个 当前 goroutine 的 sudog, 然后把这个 sudog 放入 channel 的接受者等待队列;设置当前 G 的状态,和 M分离,到这里当前G就阻塞了,代码不会执行下去。
当被唤醒后,执行sudog的清理操作。这里接受buffer中的值的指针是 ep 这个变量,被唤醒后好像没有向 ep 中赋值的操作。这个我们下部分会讲。

sudog

还剩最后一个疑问,当一个goroutine因为channel阻塞,另一个goroutine是如何唤醒它的。

channel 中有两个 waitq 类型的变量, 看下结构发现,就是sudog的链表,关键是 sudog。sudog中包含了goroutine的引用,注意一下 elem这个变量,注释说可能会指向stack。

type waitq struct {first *sudoglast  *sudog
}type sudog struct {// The following fields are protected by the hchan.lock of the// channel this sudog is blocking on. shrinkstack depends on// this.g          *gselectdone *uint32 // CAS to 1 to win select race (may point to stack)next       *sudogprev       *sudogelem       unsafe.Pointer // data element (may point to stack)// The following fields are never accessed concurrently.// waitlink is only accessed by g.acquiretime int64releasetime int64ticket      uint32waitlink    *sudog // g.waiting listc           *hchan // channel
}

讲阻塞部分的时候,我们看到goroutine被调度之前,有一个 enqueue操作,这时,当前G的sudog已经被存入recvq中,我们看下发送者这时的操作。

这里的操作是,sender发送的值 直接被拷贝到 sudog.elem 了。然后唤醒 sudog.g ,这样对面的receiver goroutine 就被唤醒了。具体请下面的注释。

func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {// 检查工作// 如果能从 chennel 的 recvq 弹出 sudog, 那么直接sendif sg := c.recvq.dequeue(); sg != nil {// Found a waiting receiver. We pass the value we want to send// directly to the receiver, bypassing the channel buffer (if any).send(c, sg, ep, func() { unlock(&c.lock) })return true}// buffer有空余空间,返回; 阻塞操作
}func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func()) {// 处理 index// 关键if sg.elem != nil {// 这里是根据 elemtype.size 复制内存sendDirect(c.elemtype, sg, ep)sg.elem = nil}// 一些处理// 重新设置 goroutine 的状态,唤醒它goready(gp, 4)
}func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {// src is on our stack, dst is a slot on another stack.// Once we read sg.elem out of sg, it will no longer// be updated if the destination's stack gets copied (shrunk).// So make sure that no preemption points can happen between read & use.dst := sg.elemtypeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)memmove(dst, src, t.size)
}// memmove copies n bytes from "from" to "to".
// in memmove_*.s
//go:noescape
func memmove(to, from unsafe.Pointer, n uintptr)

select

在看 chanrecv()方法 时,发现了一个 block 参数,代表操作是否阻塞。一般情况下,channel 都是阻塞的(不考虑buffer),那什么时候非阻塞呢?

第一个想到的就是 select, 在写了default case的时候,其他的channel是非阻塞的。

还有一个可能不常用,就是 channel 的反射 value, 可以是非阻塞的,这个方法是public的,我们先看下简单的。

func (v Value) TryRecv() (x Value, ok bool)
func (v Value) TrySend(x Value) bool

select 就复杂一点点,首先在源码中发现一段注释:

// compiler implements
//
//    select {
//    case c <- v:
//        ... foo
//    default:
//        ... bar
//    }
//
// as
//
//    if selectnbsend(c, v) {
//        ... foo
//    } else {
//        ... bar
//    }
//
func selectnbsend(t *chantype, c *hchan, elem unsafe.Pointer) (selected bool) {return chansend(t, c, elem, false, getcallerpc(unsafe.Pointer(&t)))
}// compiler implements
//
//    select {
//    case v = <-c:
//        ... foo
//    default:
//        ... bar
//    }
//
// as
//
//    if selectnbrecv(&v, c) {
//        ... foo
//    } else {
//        ... bar
//    }
//
func selectnbrecv(t *chantype, elem unsafe.Pointer, c *hchan) (selected bool) {selected, _ = chanrecv(t, c, elem, false)return
}

如果是一个 case + default 的模式,那么编译器就调用以上方法来实现。

如果是多个 case + default 的模式呢?select 在runtime到底是如何执行的?写个简单的select编译一下。

package mainfunc main() {var ch chan intselect {case <-ch:case ch <- 1:default:}
}

go tool compile -S -l -N test.go > test.s 结果中找一下关键字,例如:

0x008c 00140 (test.go:5)    CALL    runtime.newselect(SB)
0x00ad 00173 (test.go:6)    CALL    runtime.selectrecv(SB)
0x00ec 00236 (test.go:7)    CALL    runtime.selectsend(SB)
0x0107 00263 (test.go:8)    CALL    runtime.selectdefault(SB)
0x0122 00290 (test.go:5)    CALL    runtime.selectgo(SB)

这里 selectgo 是实际运行的方法,找一下,注意注释。先检查channel是否能操作,如果不能操作,就走 default 逻辑。

loop:// pass 1 - look for something already waitingvar dfl *scasevar cas *scasefor i := 0; i < int(sel.ncase); i++ {cas = &scases[pollorder[i]]c = cas.cswitch cas.kind {// 接受数据case caseRecv:sg = c.sendq.dequeue()// 如果有 sender 在等待if sg != nil {goto recv}// 当前buffer中有数据if c.qcount > 0 {goto bufrecv}// 关闭的channelif c.closed != 0 {goto rclose}case caseSend:if raceenabled {racereadpc(unsafe.Pointer(c), cas.pc, chansendpc)}// 关闭if c.closed != 0 {goto sclose}// 有 receiver 正在等待sg = c.recvq.dequeue()if sg != nil {goto send}// 有空间接受if c.qcount < c.dataqsiz {goto bufsend}// 走defaultcase caseDefault:dfl = cas}}if dfl != nil {selunlock(scases, lockorder)cas = dflgoto retc}

Go语言channel与select原理相关推荐

  1. Go语言 channel

    介绍 Channel是Go中的一个核心类型,你可以把它看成一个管道,通过它并发核心单元就可以发送或者接收数据进行通讯(communication). 要想理解 channel 要先知道 CSP 模型. ...

  2. python epoll 并发_Python语言之python并发原理(阻塞、非阻塞、epoll)

    本文主要向大家介绍了Python语言之python并发原理(阻塞.非阻塞.epoll),通过具体的内容向大家展示,希望对大家学习Python语言有所帮助. 在Linux系统中 01 阻塞服务端 特征: ...

  3. channel 的底层原理

    三.channel 的底层原理 前面提及过 channel 创建后返回了 hchan 结构体,现在我们来研究下这个结构体,它的主要字段如下: type hchan struct {qcount uin ...

  4. 深入php内核,从底层c语言剖析php实现原理

    深入php内核,从底层c语言剖析php实现原理 非常好的电子书:http://www.cunmou.com/phpbook/preface.md 这是它的目录: PHP的生命周期 让我们从SAPI开始 ...

  5. 9、Go语言channel的操作

    "学习或者了解人工智能小伙伴福利来了,前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家." 想了解或者学习人工智能可以看一下,我觉得学习起来挺有趣的 ...

  6. 二级C语言上机考试评分标准,浅谈二级C语言上机考试评分原理

    浅谈二级C语言上机考试评分原理 C语言的表现能力和处理能力极强.它不仅具有丰富的运算符和数据类型,便于实现各类复杂的数据结构.下面是小编整理的关于二级C语言上机考试评分原理,希望大家认真阅读! 上机考 ...

  7. linux用c语言模拟抢票系统,C语言-抢火车票软件原理及笔记

    原标题:C语言-抢火车票软件原理及笔记 //今晚内容:C语言-抢火车票软件原理 //讲课老师:范志军 QQ:208824435 #include #include int a=50;//初始化50张火 ...

  8. 语言的定义——编译原理

    语言的定义--编译原理 给定文法G=(VT,VN,P,S),如果α→β∈P,那么可以将符号串中的γαδ中的α替换为β,记作 γαδ⇒γβδ,此时称γαδ直接推导出γβδ. 推导(derivation) ...

  9. Go语言核心知识点和原理详解

    go核心原理 本人在一家go技术栈工作2年有余,因此梳理一下我认为比较重要的go语言技术知识,一些基础的概念,比如function, interface这些就忽略了. https://dravenes ...

最新文章

  1. hdu4287 水题
  2. ContentType的集中数据编码格式
  3. PyCharm-缩进 格式化代码
  4. 块级,行内(内联)对比笔记
  5. 【嵌入式Linux】嵌入式Linux应用开发基础知识之I2C应用编程和SMBus协议及AP3216C应用编程
  6. C++读写文件总结 .
  7. strchr,wcschr 及strrchr, wcsrchr,_tcschr,_tcsrchr函数
  8. Android模拟器中添加SD卡(转)
  9. 打算开源一个低代码平台,第三天,包含【工作流,业务流,财务,APQC】。技术站 React,typescript,java,mysql
  10. matlab fft时域采样,信号时域采样 谱分析(matlab).doc
  11. 从0开始,如何设计一个社交电商产品
  12. python 源代码 macd双底 高 低_通达信双底选股公式-MACD底背离通达信选股公式
  13. 中国微单相机市场深度研究分析报告
  14. noip2016普及组初赛中山市成绩表及分数线
  15. 【STM32学习 自制STM32游戏机】
  16. parrot linux iso下载,Parrot Security OS 4.0发布下载,面向安全的操作系统
  17. FFmpeg 安装与使用
  18. QGIS 1. qgis的下载和安装(Windows和macOS)
  19. Eboot代码流程 [转]
  20. 阿里云轻量服务器 利用宝塔面板 为域名部署SSL证书

热门文章

  1. mysql创建独立表空间_InnoDB独立表空间
  2. Eclipse如何卸载插件
  3. 使用 CXF 做 webservice 简单例子
  4. sh.k7p.work/index.php,Laowang's Blogs
  5. 《Java8实战》笔记(09):默认方法
  6. 判断非负整数是否是3的倍数_五年级数学因数与倍数知识点汇总与解题方法技巧...
  7. 如何用texstudio下载ctex_公众号素材库视频如何下载,用这种方法就可以哦
  8. clion在使用sqlite3的时候,显示Undefined symbols for architecture x86_64错误的解决办法
  9. codeforces 266B-C语言解题报告
  10. 云计算技术背后的天才程序员:Open VSwitch鼻祖Martin Casado