文章目录

  • 1、channel 是什么
    • channel的特点
  • 2、channel 的数据结构
    • hchan
    • 等待队列和发送队列的类型包装 sudog
  • 3、channel 分类
    • 有缓冲channel
    • 无缓冲channel
  • 4、channel 的创建
    • 4.1 分配 hchan
    • 4.2 分配buf
    • 4.3 makeChan 源码
  • 5、发送数据
    • 5.1 对channel加锁
    • 5.2 将数据拷贝到 buf
    • 5.3 对channel 解锁
  • 6、接收数据
    • 6.1 先对channel上锁
    • 6.2 从 buf 中拷出接收的数据
    • 6.3 解锁
  • 7、如果发送时,buf 数据已满?
    • 7.1 将发送 goroutine 入队
    • 7.2 被唤醒时
      • 7.2.1 send 出队
      • 7.2.2 数据拷贝
      • 7.2.3 G1 重新加入调度队列
    • 7.3 发送数据源码分析
  • 8、如果接收时,buf 为空?
    • 8.1 将接收 goroutine 入队
    • 8.2 被唤醒时
      • 8.2.1 出队
      • 8.2.2 直接将数据 拷贝到 G2 中
      • 8.2.3 G2重新加入到调度队列
    • 8.3 接收数据源码分析
  • 9、关闭channel
  • 10、读取一个已经关闭的channel
  • 11、关闭channel 源码分析

1、channel 是什么

  • channel 我们也叫管道,顾名思义,他是用来传输某些东西的。
  • 没错,channel 主要用于 go中 不同gorouting 之间消息的传输

channel的特点

  1. channel 是 先进先出
  2. 并发安全

2、channel 的数据结构

hchan

type hchan struct {qcount   uint           // 队列中得所有数据数dataqsiz uint           // 环形队列的大小buf      unsafe.Pointer // 指向大小为 dataqsiz 的数组  。ring buf 数据结构,循环队列elemsize uint16            // 元素大小closed   uint32          // 是否关闭elemtype *_type      // 元素类型sendx    uint        // 发送索引recvx    uint        // 接受索引recvq    waitq       // recv 等待队列,即(<-chan)sendq    waitq        // send 等待队列,即(ch<-)lock mutex              // lock 保护了 hchan 的所有字段,以及在此 channel 上阻塞的 sudog 的一些字段。当持有此锁时不改变其他 goroutine 的状态(不ready 的 goroutine),因为他会在栈收缩时发送死锁

等待队列和发送队列的类型包装 sudog

type sudog struct {g *g              // 由 sudog 阻塞的通道的hchan.lock 进行保护isSelect bool       // 表示g正在参与 一个select,因此 g.selectDone 必须已 CAS 的方式避免唤醒时候的 data racenext *sudog          // 后指针      prev *sudog         // 前指针elem unsafe.Pointer // 数据元素,(可能指向栈)// 下面的字段永远不会被并发访问,只有 waitlink 会被 g 访问// 对于 semaphores,所有的字段(包括上面的)只会在持有 semaRoot 锁时被访问acquiretime int64releasetime int64ticket      uint32isSelect boolparent   *sudog // semaRoot 二叉树waitlink *sudog // g.waiting 列表 或 semaRootwaittail *sudog // semaRootc        *hchan // channel
}

3、channel 分类

有缓冲channel

  • 创建时指定的缓冲大小大于等1
ch := make(chan int,5)
  • 有缓存的,发的时候不是必须有协程来接收,不会阻塞(接收者和发送者不需要同时就绪)

无缓冲channel

  • 创建channel 时指定的缓冲为0或者不指定
ch := make(chan int)
  • 无缓存,发的时候必须要有协程接收,否则会阻塞(接收者和发送者必须同时就绪)

4、channel 的创建

make(chan interface{},4)

4.1 分配 hchan

  • 找go 的运行时内存分配器来分配内存
  • 在堆中进行分配,所有字段均为零值

4.2 分配buf

  • 根据指定的缓冲大小来分配缓冲,ring buf
  • buf 是一个数组,但是内部处理的时候是根据 ring buf 的逻辑来处理的
  • buf 实际是 一个数组指针,该数组实际上是拼在这个hchan的内存后面,hchan和数组实际上是一段连续的内存
  • 创建无缓冲的channel 时,此处不做处理,不分配

4.3 makeChan 源码

func makechan64(t *chantype, size int64) *hchan {if int64(int(size)) != size {panic(plainError("makechan: size out of range"))}return makechan(t, int(size))
}

申请内存时控制传入的缓冲大小

func makechan(t *chantype, size int) *hchan {elem := t.elem// compiler checks this but be safe.// 编译器相关的检查if elem.size >= 1<<16 {throw("makechan: invalid channel element type")}if hchanSize%maxAlign != 0 || elem.align > maxAlign {throw("makechan: bad alignment")}// 计算内存大小,元素大小加上缓冲区,看是否溢出mem, overflow := math.MulUintptr(elem.size, uintptr(size))if overflow || mem > maxAlloc-hchanSize || size < 0 {panic(plainError("makechan: size out of range"))}// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.// buf points into the same allocation, elemtype is persistent.// SudoG's are referenced from their owning thread so they can't be collected.// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.var c *hchanswitch {case mem == 0:// Queue or element size is zero.// 通过 mallocgc 申请内存,mallocgc 是 go的运行时内存分配器// 无缓冲的channelc = (*hchan)(mallocgc(hchanSize, nil, true))// Race detector uses this location for synchronization.c.buf = c.raceaddr()case elem.ptrdata == 0:// Elements do not contain pointers.// Allocate hchan and buf in one call.// hchanSize chan的大小加上 ring buf 的大小(mem)。连续内存// 元素不包含指针。指针这块是关于gc的处理c = (*hchan)(mallocgc(hchanSize+mem, nil, true))c.buf = add(unsafe.Pointer(c), hchanSize)default:// Elements contain pointers.// 元素包含指针c = new(hchan)c.buf = mallocgc(mem, elem, true)}// 初始化一些字段c.elemsize = uint16(elem.size)c.elemtype = elemc.dataqsiz = uint(size)    lockInit(&c.lock, lockRankHchan)if debugChan {print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")}return c
}

5、发送数据

ch <-1

5.1 对channel加锁

  • 锁住 ring buf ,保证数据安全

5.2 将数据拷贝到 buf

  • sendx 会加一,表示数据已发送

5.3 对channel 解锁

  • 有缓存的channel 是不阻塞的,解锁完成后 G1 就可以继续做其他的事情

6、接收数据

<-chan

6.1 先对channel上锁

6.2 从 buf 中拷出接收的数据

  • 接收数据的时候 recvx 会加一,表示数据已接收
  • sendxrecvx 数量相等时,表明 buf 中是无数据的

6.3 解锁

7、如果发送时,buf 数据已满?

7.1 将发送 goroutine 入队

  • gopark ,将G1从调度队列取出,进入阻塞状态,直到buf 有空位再唤醒

7.2 被唤醒时

7.2.1 send 出队

7.2.2 数据拷贝

  • 此处 6 直接被拷贝到了队首
  • 其他元素的位置是不用调整的,因为这是一个ring buf的处理逻辑

7.2.3 G1 重新加入调度队列

7.3 发送数据源码分析

func chansend1(c *hchan, elem unsafe.Pointer) {chansend(c, elem, true, getcallerpc())
}

本质还是调用了 chansend

// callerpc  运行时调试用的,不需太关注
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {// fast pathif c == nil {if !block {return false}// 向一个已经关闭的channel 发送数据,会死锁gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)throw("unreachable")}if debugChan {print("chansend: chan=", c, "\n")}if raceenabled {racereadpc(c.raceaddr(), callerpc, funcPC(chansend))}if !block && c.closed == 0 && full(c) {return false}var t0 int64if blockprofilerate > 0 {t0 = cputicks()}// 持有锁lock(&c.lock)// 获取锁之后需要再检查是否被 closeif c.closed != 0 {unlock(&c.lock)panic(plainError("send on closed channel"))}// 判断是否有正在 recvq 中 阻塞的接收协程,有的话直接进行数据的拷贝if sg := c.recvq.dequeue(); sg != nil {// Found a waiting receiver. We pass the value we want to send// directly to the receiver, bypassing the channel buffer (if any).send(c, sg, ep, func() { unlock(&c.lock) }, 3)return true}// 如果buf 中有剩余空间,直接将数据写入到 buf中if c.qcount < c.dataqsiz {// Space is available in the channel buffer. Enqueue the element to send.// 入队操作qp := chanbuf(c, c.sendx)if raceenabled {raceacquire(qp)racerelease(qp)}typedmemmove(c.elemtype, qp, ep)// 修改 sendx c.sendx++if c.sendx == c.dataqsiz {c.sendx = 0}c.qcount++// 释放锁unlock(&c.lock)return true}if !block {unlock(&c.lock)return false}// Block on the channel. Some receiver will complete our operation for us.// 阻塞在 channel,等待接收者 接收数据gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.mysg.elem = epmysg.waitlink = nilmysg.g = gpmysg.isSelect = falsemysg.c = cgp.waiting = mysggp.param = nil// 放入到 sendq 中c.sendq.enqueue(mysg)// Signal to anyone trying to shrink our stack that we're about// to park on a channel. The window between when this G's status// changes and when we set gp.activeStackChans is not safe for// stack shrinking.atomic.Store8(&gp.parkingOnChan, 1)// park 结束之后 解锁,增加了回调gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)// Ensure the value being sent is kept alive until the// receiver copies it out. The sudog has a pointer to the// stack object, but sudogs aren't considered as roots of the// stack tracer.// 防止gc 清除要使用的数据KeepAlive(ep)// someone woke us up.if mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilgp.activeStackChans = falseif gp.param == nil {if c.closed == 0 {throw("chansend: spurious wakeup")}panic(plainError("send on closed channel"))}gp.param = nilif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}mysg.c = nilreleaseSudog(mysg)return true
}

8、如果接收时,buf 为空?

8.1 将接收 goroutine 入队

  • 入队后,G2 从调度队列取出,不再被 P 调度到

8.2 被唤醒时

8.2.1 出队

8.2.2 直接将数据 拷贝到 G2 中

  • G2 不再调度队列中,不存在竞争,写操作是安全的
  • 此处是一个优化过程,直接将发送的数据放入到接收数据的G2中.
  • 操作 ring buf 需要进行加锁,性能消耗比较大

8.2.3 G2重新加入到调度队列

  • 当G2 接收到数据后,就会重新加入到 P 的调度队列中,等待系统的调用,具体调用时间我们不关心

这种情况下的接收数据没有 buf 的参与,和无缓存的channel 类似

8.3 接收数据源码分析

func chanrecv1(c *hchan, elem unsafe.Pointer) {chanrecv(c, elem, true)
}//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {_, received = chanrecv(c, elem, true)return
}

一个处理了返回值,本质还是调用了通用的 chanrecv

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {// raceenabled: don't need to check ep, as it is always on the stack// or is new memory allocated by reflect.if debugChan {print("chanrecv: chan=", c, "\n")}if c == nil {if !block {return}gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)throw("unreachable")}// Fast path: check for failed non-blocking operation without acquiring the lock.// 非阻塞情况下,如果失败,直接返回,不需要持有锁if !block && empty(c) {if atomic.Load(&c.closed) == 0 {return}if empty(c) {// The channel is irreversibly closed and empty.if raceenabled {raceacquire(c.raceaddr())}if ep != nil {typedmemclr(c.elemtype, ep)}return true, false}}var t0 int64if blockprofilerate > 0 {t0 = cputicks()}// 持有锁lock(&c.lock)if c.closed != 0 && c.qcount == 0 {if raceenabled {raceacquire(c.raceaddr())}unlock(&c.lock)if ep != nil {typedmemclr(c.elemtype, ep)}return true, false}// 看看有没有发送方阻塞在这上面。如果有的话,直接向这个goroutine 拿数据即可,不需要经过 ring bufif sg := c.sendq.dequeue(); sg != nil {recv(c, sg, ep, func() { unlock(&c.lock) }, 3)return true, true}// 如果buf 中海油数据,直接从buf 中拿if c.qcount > 0 {// Receive directly from queueqp := chanbuf(c, c.recvx)if raceenabled {raceacquire(qp)racerelease(qp)}if ep != nil {// 拿出数据typedmemmove(c.elemtype, ep, qp)}// 清理数据typedmemclr(c.elemtype, qp)// 增加索引c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}c.qcount--unlock(&c.lock)return true, true}if !block {unlock(&c.lock)return false, false}// no sender available: block on this channel.// 如果接收的时候buf 为空,且没人发数据,就入队 recvq,阻塞等待被唤醒gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.mysg.elem = epmysg.waitlink = nilgp.waiting = mysgmysg.g = gpmysg.isSelect = falsemysg.c = cgp.param = nilc.recvq.enqueue(mysg)atomic.Store8(&gp.parkingOnChan, 1)gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)// someone woke us upif mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilgp.activeStackChans = falseif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}closed := gp.param == nilgp.param = nilmysg.c = nilreleaseSudog(mysg)return true, !closed
}

9、关闭channel

  1. 加锁
  2. closed 标志位置为1
  3. 将阻塞的 sendq 和 recvq 中得 G 临时保存到 glist 中。等待操作系统调度
  4. 解锁
  • 需要尽快释放锁,因为 此时buf 可能部位空,其他协程需要拿数据,如果一直加锁的话会 影响其他协程

10、读取一个已经关闭的channel

  • 因为关闭的时候,已经将sendq 和 recvq 中得g全部存到 glist 中,所以此时的 sendq 和 recvq 一定是 nil 的
  • buf 中可能还有数据,有数据的话就正常读取
  • buf 没有数据的话,会返回 零值

11、关闭channel 源码分析

func closechan(c *hchan) {// 不可关闭已关闭的channelif c == nil {panic(plainError("close of nil channel"))}// 获取锁lock(&c.lock)// 再次判断channel 是否被关闭if c.closed != 0 {unlock(&c.lock)panic(plainError("close of closed channel"))}if raceenabled {callerpc := getcallerpc()racewritepc(c.raceaddr(), callerpc, funcPC(closechan))racerelease(c.raceaddr())}// 关闭 channelc.closed = 1var glist gList// 将 sendq 和 recvq 中所有的g 都临时存放在 gList中// release all readersfor {sg := c.recvq.dequeue()if sg == nil {break}if sg.elem != nil {typedmemclr(c.elemtype, sg.elem)sg.elem = nil}if sg.releasetime != 0 {sg.releasetime = cputicks()}gp := sg.ggp.param = nilif raceenabled {raceacquireg(gp, c.raceaddr())}glist.push(gp)}// release all writers (they will panic)for {sg := c.sendq.dequeue()if sg == nil {break}sg.elem = nilif sg.releasetime != 0 {sg.releasetime = cputicks()}gp := sg.ggp.param = nilif raceenabled {raceacquireg(gp, c.raceaddr())}glist.push(gp)}// 将所有的G都存放在gList 中时,就可释放锁// 拿着锁去read 比较费性能unlock(&c.lock)// Ready all Gs now that we've dropped the channel lock.// 依次将 所有的g 就绪for !glist.empty() {gp := glist.pop()gp.schedlink = 0goready(gp, 3)}
}

深入了解golang 的channel相关推荐

  1. golang 中 channel 的详细使用、使用注意事项及死锁分析

    什么是 channel 管道 它是一个数据管道,可以往里面写数据,从里面读数据. channel 是 goroutine 之间数据通信桥梁,而且是线程安全的. channel 遵循先进先出原则. 写入 ...

  2. GoLang之channel底层的数据结构是什么、channel的创建(2)

    文章目录 GoLang之channel底层的数据结构是什么.channel的创建(2) 1.数据结构 2.创建 GoLang之channel底层的数据结构是什么.channel的创建(2) 1.数据结 ...

  3. golang中channel使用

    1 golang中channel使用 文章目录 1 golang中channel使用 1.1 channel介绍 1.2 channel使用 1.2.1 channel声明和初始化 1.2.2 cha ...

  4. golang中Channel通道(二)

    golang中Channel通道(二) 一.带缓冲和不带缓冲的通道的区别 1.非缓冲通道 一次发送操作对应一次接收操作,对于一个goroutine来讲,它的一次发送,在另一个goroutine接收之前 ...

  5. golang的channel实现原理

    golang的channel实现原理 chan结构 src/runtime/chan.go type hchan struct {qcount uint // 当前队列中剩余元素个数dataqsiz ...

  6. GoLang之channel 在什么情况下会引起资源泄漏(10)

    文章目录 GoLang之channel 在什么情况下会引起资源泄漏(10) GoLang之channel 在什么情况下会引起资源泄漏(10) Channel 可能会引发 goroutine 泄漏. 泄 ...

  7. 初学Golang:channel的使用

    Code Speaks: package main import "fmt"/*** 发送消息* @param pings chan<- string, msg string ...

  8. Golang笔记——channel(管道)

    推荐首先阅读:Golang笔记--goroutine(协程) 为什么需要 channel 前面使用全局变量加锁同步来解决 goroutine 的通讯,但不完美 主线程在等待所有 goroutine 全 ...

  9. golang的Channel初始化的有缓存与无缓存解释

    首先编程的时候遇到疑问,输出跟我所想预想不一样,后来查到了golang社区的帖子,其中一篇帖子 :健哥大人  做出了一些解释. 我摘抄重点过来: 无缓冲的与有缓冲channel有着重大差别,那就是一个 ...

  10. Golang之Channel的理解与应用

    博客参考自:https://golangbot.com/buffered-channels-worker-pools/ 基础应用 使用channel的阻塞性质作为延时函数. package maini ...

最新文章

  1. ACMNO.42 C语言-第几天 定义一个结构体变量(包括年、月、日)。计算该日在本年中是第几天,注意闰年问题。利用结构体的在最下面
  2. java1.8之supplier
  3. 关于重构之Switch的处理【一】如果是有序的话,如何处理
  4. 【方法杂谈】你真的了解CVPR吗?
  5. python | 三种可变参数简述
  6. 欢乐纪中某B组赛【2019.1.28】
  7. 关于Kafka中的再均衡
  8. 【X264系列】之命令参数解析
  9. kali linux无法启动服务,不好了!出问题了!在安装Kali Linux之后启动系统时
  10. MinGW —— Minimalist GNU for Windows、Cygwin —— Windows 下的类 unix 系统
  11. 《面向模式的软件体系结构2-用于并发和网络化对象模式》读书笔记(10)--- 接受器 - 连接器...
  12. JavaScript中内存使用规则--堆和栈
  13. 信捷plc485通信上位机_常用通信接口汇总
  14. 机器学习笔记 - 学习使用TensorFlow和张量处理单元 (TPU) 构建图像分类模型
  15. 球员题(查询+添加+排序)
  16. C语言将十进制输出二进制、八进制、十六进制的方法总结
  17. 为什么要劝退分子科学与工程?
  18. LeetCode每日一题打卡组队监督!刷题群!
  19. 深入浅出Spring Aop
  20. 初识MySQL数据库【基操篇】

热门文章

  1. 三分屏课件制作_制作微课的几种方式,你知道吗?
  2. 博管家话费充值接⼝API接口
  3. 职场难题如何有效沟通、应对压力、提升能力,实现职场成功
  4. JS 作用域和作用域链
  5. 边缘计算如何为工业物联网提供便利和好处
  6. Maven 打jar包部署到生产环境的pom文件
  7. 高德导航语音播报服务区服务器,把地图导航语音播报设置成自己的声音!
  8. 优思学院|什么是六西格玛?一文解答你对六西格玛最常见的疑问
  9. 医院病人设备定位管理系统
  10. Latex 会议模板基础上编辑中文