前言

select是Golang在语言层面提供的多路IO复用的机制。与switch语句稍微有点相似,也会有case和最后的default选择支。每一个case代表一个通信操作(在某个channel上进行发送或者接收)并且会包含一些语句组成一个语句块。

基本用法

select会等待case中能够执行的case时才去执行。当满足条件时。select才回去通信并执行case之后的语句,这时候其他通信是不执行的。如果有多个case同时就绪,select会随机选择一个执行。一个没有任何case的select语句,会永远等待下去。

非阻塞select

select {case <-ch:default:
}

阻塞select

select {case <-ch:
}

注意:对于读channel的case来说,如elem, ok := <-chan1:, 如果channel有可能被其他协程关闭的情况下,一定要检测读取是否成功,因为close的channel也有可能返回,此时ok == false,且不会阻塞

源码分析

注意:我会把源码中每个方法的作用都注释出来,可以参考注释进行理解。

数据结构

Golang实现select时,定义了一个数据结构表示每个case语句(含defaut,default实际上是一种特殊的case),select执行过程可以类比成一个函数,函数输入case数组,输出选中的case,然后程序流程转到选中的case块。
我们先看一下case数据结构

runtime\select.go

type scase struct {c           *hchan         // 当前case语句所操作的channel指针elem        unsafe.Pointer // data element数据元素kind        uint16 //表示该case的类型pc          uintptr // race pc (for race detector / msan)releasetime int64
}
//scase.kind values.
const (caseNil = iotacaseRecvcaseSendcaseDefault
)

scase.kind表示该case的类型,分别为:

  • caseRecv:case语句中尝试读取scase.c中的数据,case <-Chan;
  • caseSend:case语句中尝试向scase.c中写入数据,case Chan <- Send;
  • caseDefault:default

select实现

// cas0为scase数组的首地址,selectgo()就是从这些scase中找出一个返回
// order0指向一个[2 * ncases] uint16类型的数组
// ncases表示scase数组的长度//返回值:
//int :选中case的编号
//bool:是否成功从channle中读取了数据
func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {if debugSelect {print("select: cas0=", cas0, "\n")}cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))//将cas1从第一个元素开始切片,长度为ncases,容量为ncases//a[x:y:z] 切片长度: y-x 切片容量:z-xscases := cas1[:ncases:ncases]//所有case轮询顺序,占了前面ncasepollorder := order1[:ncases:ncases]//所有case语句中channel序列,占了后面ncaselockorder := order1[ncases:][:ncases:ncases]// 将涉及零个通道的发送/接收案例替换为caseNil,因此以下逻辑可以假定为非nil通道。for i := range scases {cas := &scases[i]if cas.c == nil && cas.kind != caseDefault {*cas = scase{}}}var t0 int64if blockprofilerate > 0 {t0 = cputicks()for i := 0; i < ncases; i++ {scases[i].releasetime = -1}}// 生成排列顺序,pollorder重新排序for i := 1; i < ncases; i++ {j := fastrandn(uint32(i + 1))pollorder[i] = pollorder[j]pollorder[j] = uint16(i)}//通过Hchan地址进行排序以获得锁定顺序//采用简单的堆排序,以确保n log n个时间和恒定的堆栈占用量for i := 0; i < ncases; i++ {j := i// 替换相同channel 的case,以达到去重防止对channel加锁时重复加锁的目的c := scases[pollorder[i]].cfor j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {k := (j - 1) / 2lockorder[j] = lockorder[k]j = k}lockorder[j] = pollorder[i]}for i := ncases - 1; i >= 0; i-- {o := lockorder[i]c := scases[o].clockorder[i] = lockorder[0]j := 0for {k := j*2 + 1if k >= i {break}if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {k++}if c.sortkey() < scases[lockorder[k]].c.sortkey() {lockorder[j] = lockorder[k]j = kcontinue}break}lockorder[j] = o}if debugSelect {for i := 0; i+1 < ncases; i++ {if scases[lockorder[i]].c.sortkey() > scases[lockorder[i+1]].c.sortkey() {print("i=", i, " x=", lockorder[i], " y=", lockorder[i+1], "\n")throw("select: broken sort")}}}//锁住所有的channelsellock(scases, lockorder)var (gp     *gsg     *sudogc      *hchank      *scasesglist *sudogsgnext *sudogqp     unsafe.Pointernextp  **sudog)loop:// pass 1 - look for something already waiting// 按照随机顺序检测scase中的channel是否readyvar dfli intvar dfl *scasevar casi intvar cas *scasevar recvOK bool//开始遍历case数组了for i := 0; i < ncases; i++ {casi = int(pollorder[i])cas = &scases[casi]c = cas.cswitch cas.kind {case caseNil:continue// 接收chancase caseRecv:sg = c.sendq.dequeue()// 当chan的等待写队列不为空,需要等待if sg != nil {goto recv}//当chan的缓存队列存在元素时,不需要等待if c.qcount > 0 {goto bufrecv}// 当chan关闭时if c.closed != 0 {goto rclose}//发送chancase caseSend:if raceenabled {racereadpc(c.raceaddr(), cas.pc, chansendpc)}// 当chan关闭时if c.closed != 0 {goto sclose}//当chan的等待读消息的队列不为空,需要等待sg = c.recvq.dequeue()if sg != nil {goto send}// chan的缓存队列的元素少于缓存容量时,还有位置,不需要等待if c.qcount < c.dataqsiz {goto bufsend}case caseDefault:dfli = casidfl = cas}}if dfl != nil {selunlock(scases, lockorder)casi = dflicas = dflgoto retc}// pass 2 - enqueue on all chans//所有case都未ready,且没有default语句//将当前协程加入到所有channel的等待队列gp = getg()if gp.waiting != nil {throw("gp.waiting != nil")}nextp = &gp.waitingfor _, casei := range lockorder {casi = int(casei)cas = &scases[casi]if cas.kind == caseNil {continue}c = cas.csg := acquireSudog()sg.g = gpsg.isSelect = true// No stack splits between assigning elem and enqueuing// sg on gp.waiting where copystack can find it.sg.elem = cas.elemsg.releasetime = 0if t0 != 0 {sg.releasetime = -1}sg.c = c// Construct waiting list in lock order.*nextp = sgnextp = &sg.waitlinkswitch cas.kind {case caseRecv:// 加入等待接收队列c.recvq.enqueue(sg)case caseSend:// 加入等待发送队列c.sendq.enqueue(sg)}}// wait for someone to wake us up//当将协程转入阻塞,等待被唤醒gp.param = nilgopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)sellock(scases, lockorder)gp.selectDone = 0sg = (*sudog)(gp.param)gp.param = nil// pass 3 - dequeue from unsuccessful chans// otherwise they stack up on quiet channels// record the successful case, if any.// We singly-linked up the SudoGs in lock order.//唤醒后返回channel对应的case indexcasi = -1cas = nilsglist = gp.waiting// Clear all elem before unlinking from gp.waiting.for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {sg1.isSelect = falsesg1.elem = nilsg1.c = nil}gp.waiting = nilfor _, casei := range lockorder {k = &scases[casei]if k.kind == caseNil {continue}if sglist.releasetime > 0 {k.releasetime = sglist.releasetime}if sg == sglist {// sg has already been dequeued by the G that woke us up.casi = int(casei)cas = k} else {c = k.cif k.kind == caseSend {c.sendq.dequeueSudoG(sglist)} else {c.recvq.dequeueSudoG(sglist)}}sgnext = sglist.waitlinksglist.waitlink = nil//释放所有的锁releaseSudog(sglist)sglist = sgnext}//没找到case,重新循环if cas == nil {goto loop}c = cas.cif debugSelect {print("wait-return: cas0=", cas0, " c=", c, " cas=", cas, " kind=", cas.kind, "\n")}if cas.kind == caseRecv {recvOK = true}if raceenabled {if cas.kind == caseRecv && cas.elem != nil {raceWriteObjectPC(c.elemtype, cas.elem, cas.pc, chanrecvpc)} else if cas.kind == caseSend {raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)}}if msanenabled {if cas.kind == caseRecv && cas.elem != nil {msanwrite(cas.elem, c.elemtype.size)} else if cas.kind == caseSend {msanread(cas.elem, c.elemtype.size)}}selunlock(scases, lockorder)goto retcbufrecv:// 可以从缓冲区接收if raceenabled {if cas.elem != nil {raceWriteObjectPC(c.elemtype, cas.elem, cas.pc, chanrecvpc)}raceacquire(chanbuf(c, c.recvx))racerelease(chanbuf(c, c.recvx))}if msanenabled && cas.elem != nil {msanwrite(cas.elem, c.elemtype.size)}recvOK = trueqp = chanbuf(c, c.recvx)if cas.elem != nil {// 将chan缓存中的数据拷贝到 case.elem。  eg: a := <-ch, a就是case.elemtypedmemmove(c.elemtype, cas.elem, qp)}typedmemclr(c.elemtype, qp)c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}c.qcount--selunlock(scases, lockorder)goto retcbufsend://可以发送到缓冲区if raceenabled {raceacquire(chanbuf(c, c.sendx))racerelease(chanbuf(c, c.sendx))raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)}if msanenabled {msanread(cas.elem, c.elemtype.size)}// 将cas.elem拷贝到chan的缓存中。eg: ch <- a, a 就是 cas.elemtypedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)c.sendx++if c.sendx == c.dataqsiz {c.sendx = 0}c.qcount++selunlock(scases, lockorder)goto retcrecv://可以从休眠的发件人(sg)接收recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)if debugSelect {print("syncrecv: cas0=", cas0, " c=", c, "\n")}recvOK = truegoto retcrclose://在封闭channel的末尾读取selunlock(scases, lockorder)recvOK = falseif cas.elem != nil {typedmemclr(c.elemtype, cas.elem)}if raceenabled {raceacquire(c.raceaddr())}goto retcsend://可以发送到休眠的接收器(sg)if raceenabled {raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)}if msanenabled {msanread(cas.elem, c.elemtype.size)}send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)if debugSelect {print("syncsend: cas0=", cas0, " c=", c, "\n")}goto retcretc:if cas.releasetime > 0 {blockevent(cas.releasetime-t0, 1)}return casi, recvOKsclose://在关闭的channel上发送selunlock(scases, lockorder)panic(plainError("send on closed channel"))
}

这个函数看起来很复杂,我们总结一下上面的过程:

  1. 初始化pollorder切片和lockorder切片,其中pollorder是存放所有case的,lockorder是存放所有channel
  2. 对pollorder进行重排序,打乱所有case的顺序
  3. 锁定scase语句中所有的channel
  4. 按照随机顺序检测pollorder中case对应的channel是否ready:
    4.1 如果case可读,则读取channel中数据,解锁所有的channel,然后返回(case index, true)
    4.2 如果case可写,则将数据写入channel,解锁所有的channel,然后返回(case index, false)
    4.3 所有case都未ready,则解锁所有的channel,然后返回(default index, false)
  5. 所有case都未ready,且没有default语句
    5.1 将当前协程加入到所有channel的等待队列
    5.2 当将协程转入阻塞,等待被唤醒
  6. 唤醒后返回channel对应的case index
    6.1 如果是读操作,解锁所有的channel,然后返回(case index, true)
    6.2 如果是写操作,解锁所有的channel,然后返回(case index, false)

其中,很多涉及到channel代码部分可参考之前的文章: Golang中channel的实现原理

总结

  • select语句中除default外,各case执行顺序是随机的
  • select语句中如果没有default语句,则会阻塞等待任意一个case
  • select语句中读操作要判断是否成功读取,关闭的channel也可以读取
  • select语句中除default外,每个case只能操作一个channel,要么读要么写

Golang中select的实现原理相关推荐

  1. Golang 中的 Goroutine 调度原理与 Chanel 通信

    简介   在 Go 中,每一个并发的活动称为一个 Goroutine 或者 协程.当一个程序启动时,只有一个 Goroutine 来调用 main 函数,称之为 主Goroutine.新的 Gorou ...

  2. linux中select和epoll原理,Linux下selectpollepoll的实现原理(一)

    最近简单看了一把 linux-3.10.25 kernel中select/poll/epoll这个几个IO事件检测API的实现.此处做一些记录. 其基本的原理是相同的,流程如下 先依次调用fd对应的s ...

  3. golang goroutine实现_golang中的Mutex设计原理详解(一)

    Mutex系列是根据我对晁岳攀老师的<Go 并发编程实战课>的吸收和理解整理而成,如有偏差,欢迎指正~ 目标 本系列除了希望彻底学习和了解 golang 中 sync.Mutex 的原理和 ...

  4. Golang中panic与recover的实现原理

    今天我们讲讲golang中panic异常,以及recover对异常的捕获,由于panic.recover.defer之间非常亲密,所以今天就放在一起讲解,这里会涉及到一些defer的知识,有兴趣可以看 ...

  5. golang中的信号量的实现原理

    概述 我们前面讲过 操作系统的信号量,以及 golang中的Mutex原理解析,就抛出了一个问题,操作系统的信号量的管理对象是线程,而 Mutex 中使用的信号量是针对协程的,那么这就意味着golan ...

  6. mysql pmt函数怎么用_在Golang中如何正确地使用database/sql包访问数据库

    本文记录了我在实际工作中关于数据库操作上一些小经验,也是新手入门golang时我认为一定会碰到问题,没有什么高大上的东西,所以希望能抛砖引玉,也算是对这个问题的一次总结.其实我也是一个新手,机缘巧合几 ...

  7. go中select语句

    在golang语言中,select语句 就是用来监听和channel有关的IO操作,当IO操作发生时,触发相应的case动作.有了 select语句,可以实现 main主线程 与 goroutine线 ...

  8. go每日新闻(2021-02-05)——Golang 中 nil==nil 是对是错?

    每日一谚: Go对OO提供了另类的支持:有方法(method)无类(class),有接口(interface)但无类型体系,代码可重用,但不通过继承的方式. go中文网每日资讯–2021-02-05 ...

  9. DockOne微信分享(一一二):Flannel中vxlan backend的原理和实现

    本文讲的是DockOne微信分享(一一二):Flannel中vxlan backend的原理和实现[编者的话]Overlay网络是kubernetes网络模型的重要解决方案之一,而Flannel作为焦 ...

最新文章

  1. 爬虫之xpath语法-常用节点选择语法
  2. 保存和加载pytorch模型
  3. Android基础(三) UI开发 Part 1
  4. 【BZOJ1185】【HNOI2007】最小矩形覆盖(凸包+旋转卡壳)
  5. OPENSSL_1.0.2' not found
  6. java垃圾回收机制算法分析
  7. 使用mintty(_如何使用Mintty改善Cygwin控制台
  8. 人工智能生态环境预测_2020年全球人工智能芯片发展趋势及市场规模预测
  9. python制作图_Python做图的方法
  10. 定义下一代存储,打造全新一代数据基础设施
  11. stm32f103r8t6的晶振频率_STM32F103R8T6[1]
  12. [原]CentOS 6.5 上安装 MySQL 5.6
  13. 一起谈.NET技术,ASP.NET MVC 通过 FileResult 向浏览器发送文件
  14. n1 linux wifi,【教程】N1在EMMC安装LINUX和HASSIO实现智能家居中枢
  15. 40家央企数字化转型路线图公布(2022最新版)
  16. android内存脚本教程,安卓内存
  17. Docker安装Tomcat镜像并部署web项目简述
  18. Android 超高仿微信图片选择器
  19. 硬件相关技术资料分享
  20. ant-design实现主题暗黑主题 和 亮色主题的 切换(实现网站黑白皮肤)

热门文章

  1. pytorch入门与实践学习笔记:chapter9 pytorch 实现CharRNN
  2. 家中搭远程ftp服务器,快速搭建 FTP 服务器
  3. 盖雅案例入选「首届人力资源服务国际贸易交流合作大会20项创新经验」
  4. numpy手写mlp
  5. 记录一次uefi代码修复的过程 0xc0000000e/f
  6. win10 启动出错0xc000000e
  7. Quora使用虚拟信用卡投放广告教程
  8. 中兴Axon40 Ultra配置怎么样 中兴Axon40 Ultra值得入手吗
  9. PostgreSQL手册之资源消耗
  10. 【pacing 1】PACER模块、PacedSender 及关键参数