文章目录

  • channel实现方式
  • 说明
  • 思考包含的主要场景
  • 源码实现
    • hchan 介绍
    • 创建 hchan
    • 往chan里面发送数据
      • 函数 chansend
      • 函数 send
    • 从chan里面获取数据
      • 函数 chanrecv
      • 函数 recv
    • 关闭 chan
  • 总结

channel实现方式

说明

  1. chan怎么使用不是本文的主题
  2. 本文的chan的是基于golang 1.13,系统是mac os
  3. 本文的思路:自己思考chan有哪些点,他是哪些特点,带着这些特点,去代码中寻找答案

思考包含的主要场景

从过程来看,chan主要涉及到三个大点,
第一怎么创建一个chan
第二怎么发送和接受数据
第三怎么关闭chan
具体上,应该包含这些

  1. 创建channel

    • 有缓存
    • 无缓存
  2. 往channel里面发送数据
    • 有缓存
    • 无缓存
    • 有接收者
    • 无接受者
  3. 从channel里面读取数据
    • 有缓存
    • 无缓存
    • 有数据
    • 无数据
  4. 关闭chan

源码实现

当前代码从/runtime/hchan.go

hchan 介绍

hchan结构体,主要是缓冲区的存贮buf,是一个数组,以及两个队列(双向链表实现)sendq和recvq,主要是利用队列的先进先出的特性,完成chan的发送和接受的顺序,通过lock保证并发正确性

type hchan struct {qcount   uint           // total data in the queue,当前data数dataqsiz uint           // size of the circular queue //缓存的大小buf      unsafe.Pointer // points to an array of dataqsiz elements 存放缓存的一个数组,通过index实现的环形数组elemsize uint16  //chan里面的元素的大小,如chan int,则表示int的大小elemtype *_type // element type 元素的类型信息closed   uint32  //是否已经关闭,1表示已经关闭,0表示开着sendx    uint   // send index 发送者的index, 有缓冲区使用recvx    uint   // receive index 接收者的index,有缓冲区使用recvq    waitq  // list of recv waiters  接受者的队列(使用双向链表实现),当缓冲区无数据,会放到里面sendq    waitq  // list of send waiters 发送者的队列(使用双向链表实现),缓冲区已经满了或者无缓冲区的时候,放到里面// lock protects all fields in hchan, as well as several// fields in sudogs blocked on this channel.//// Do not change another G's status while holding this lock// (in particular, do not ready a G), as this can deadlock// with stack shrinking.lock mutex //锁,保护hchan的所有元素已经sudogs的元素
}

创建 hchan

代码入口:
函数:makechan64或者makechan,我们从makechan入手

func makechan(t *chantype, size int) *hchan {...
}

size即缓存的大小
chantype是通过编译过来的chan的类型,包含chan类型信息

func makechan(t *chantype, size int) *hchan {elem := t.elem// compiler checks this but be safe.if elem.size >= 1<<16 {throw("makechan: invalid channel element type")}if hchanSize%maxAlign != 0 || elem.align > maxAlign {throw("makechan: bad alignment")}// elem.size*size算出缓冲区的大小,以及是否溢出mem, overflow := math.MulUintptr(elem.size, uintptr(size))if overflow || mem > maxAlloc-hchanSize || size < 0 {panic(plainError("makechan: size out of range"))}// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.// buf points into the same allocation, elemtype is persistent.// SudoG's are referenced from their owning thread so they can't be collected.// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.var c *hchanswitch {case mem == 0: //无缓冲区// Queue or element size is zero.//直接分配hchan size的内存,c指向它c = (*hchan)(mallocgc(hchanSize, nil, true))// Race detector uses this location for synchronization.c.buf = c.raceaddr()case elem.ptrdata == 0:// Elements do not contain pointers. 元素不包含指针// Allocate hchan and buf in one call.  直接一次分配hchan和bufc = (*hchan)(mallocgc(hchanSize+mem, nil, true))c.buf = add(unsafe.Pointer(c), hchanSize)default:// Elements contain pointers. 包含指针,使用new先把hchan分配好,在分配buf的内存c = new(hchan)c.buf = mallocgc(mem, elem, true)}c.elemsize = uint16(elem.size)  //元素大小c.elemtype = elem //元素信息c.dataqsiz = uint(size) //缓冲区的大小if debugChan { // 可以自己设置debugChan为true,自己想打印源代码,可以使用print函数print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")}return c
}

往chan里面发送数据

函数 chansend

  • 无缓冲区

    • 如果有recvq里面有receiver,直接把值给它
    • 如果没有recvq,将数据封装成sudog,放到sendq里,将当前g挂起来
  • 有缓冲区
    • 如果有recvq里面有receiver,直接把值给它
    • 如果没有recvq
      • 如果缓冲区没有满,直接放到缓冲区buf里面
      • 如果缓冲区满了,将数据封装成sudog,放到 sendq里面,将当前g挂起来
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
}
// c hchan的指针,
// block是否阻塞,select  有个default是不阻塞的即selectnbsend函数的实现
//ep 发送的数据的指针
// callerpc 调用者的程序计数器func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {if c == nil {if !block {return false}gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)throw("unreachable")}if debugChan {print("chansend: chan=", c, "\n")}if raceenabled {racereadpc(c.raceaddr(), callerpc, funcPC(chansend))}// Fast path: check for failed non-blocking operation without acquiring the lock.//// After observing that the channel is not closed, we observe that the channel is// not ready for sending. Each of these observations is a single word-sized read// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).// Because a closed channel cannot transition from 'ready for sending' to// 'not ready for sending', even if the channel is closed between the two observations,// they imply a moment between the two when the channel was both not yet closed// and not ready for sending. We behave as if we observed the channel at that moment,// and report that the send cannot proceed.//// It is okay if the reads are reordered here: if we observe that the channel is not// ready for sending and then observe that it is not closed, that implies that the// channel wasn't closed during the first observation.//针对不阻塞的场景,未关闭,无缓冲区,则直接返回false,走select的default场景//或者有缓冲区,但是缓冲区已经满了场景直接返回falseif !block && c.closed == 0 && (c.dataqsiz == 0 && c.recvq.first == nil) ||(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {return false}var t0 int64if blockprofilerate > 0 {t0 = cputicks()}// 开始上锁,保证并发安全性lock(&c.lock)//如果已经关闭,再发送数据,直接panicif c.closed != 0 {unlock(&c.lock)panic(plainError("send on closed channel"))}// 在recvq的双向链表里面获取第一个sudog,如果获取到,表明当前已经有接受者,则进行发送数据,send将讲述怎么发送,在send函数的解释里面说if sg := c.recvq.dequeue(); sg != nil {// Found a waiting receiver. We pass the value we want to send// directly to the receiver, bypassing the channel buffer (if any).send(c, sg, ep, func() { unlock(&c.lock) }, 3)return true}// 有缓冲区的场景if c.qcount < c.dataqsiz {//下面的操作是ep位置的数据copy到buf sendx位置,即实现数据的缓冲,然后unlock,返回true// Space is available in the channel buffer. Enqueue the element to send. buf对应sendx的指针qp := chanbuf(c, c.sendx)if raceenabled {raceacquire(qp)racerelease(qp)}//把ep地址指向的数据move到qp地址上面typedmemmove(c.elemtype, qp, ep)c.sendx++//循环列表if c.sendx == c.dataqsiz {c.sendx = 0}c.qcount++unlock(&c.lock)return true}// 如果是不阻塞的,直接返回,因为下面的逻辑都是缓冲区是空的,所以直接返回if !block {unlock(&c.lock)return false}//  缓冲区为空,则将ep放到sudog里面,然后将sudog放到sendq队列里面去,然后将当前g挂起// Block on the channel. Some receiver will complete our operation for us.gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.mysg.elem = epmysg.waitlink = nilmysg.g = gpmysg.isSelect = falsemysg.c = cgp.waiting = mysggp.param = nilc.sendq.enqueue(mysg)goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)//这之前当前的g是挂住的,后续代码不会进行,一直到接受者,接受这个数据,才会将会将g恢复到运行状态// Ensure the value being sent is kept alive until the// receiver copies it out. The sudog has a pointer to the// stack object, but sudogs aren't considered as roots of the// stack tracer.KeepAlive(ep)// someone woke us up.if mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilif gp.param == nil {if c.closed == 0 {throw("chansend: spurious wakeup")}panic(plainError("send on closed channel"))}gp.param = nilif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}mysg.c = nilreleaseSudog(mysg)return true
}

函数 send

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {//表示是否启用数据竞争检测if raceenabled {if c.dataqsiz == 0 {racesync(c, sg)} else {// Pretend we go through the buffer, even though// we copy directly. Note that we need to increment// the head/tail locations only when raceenabled.qp := chanbuf(c, c.recvx)raceacquire(qp)racerelease(qp)raceacquireg(sg.g, qp)racereleaseg(sg.g, qp)c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz}}if sg.elem != nil {//如果要返回值的话,将ep的数据move到receiver的elem去sendDirect(c.elemtype, sg, ep)sg.elem = nil}gp := sg.gunlockf()gp.param = unsafe.Pointer(sg)if sg.releasetime != 0 {sg.releasetime = cputicks()}// 释放receiver对应的g,让对应的g继续运行goready(gp, skip+1)
}

从chan里面获取数据

函数 chanrecv

  • 无缓冲区

    • 如果sendq里面有数据,直接sendq获取第一个sudog,获取值
    • 如果没有sendq,将数据封装成sudog,放到recvq里面
  • 有缓冲区
    • 如果sendq里面有数据,取出一个sudog,从缓冲区位置recvx取出值, 放置到返回值里面,然后将sudog里面的值放到刚刚从缓冲区取值的位置recvx里面
    • 如果缓冲区有数据,从缓冲区取值
    • 如果缓冲区无数据,封装成sudog,放到recvq里面
// c 当前hchan
// ep返回的值的地址,如果为nil,则忽略返回值
// block是否阻塞,跟send的block一个含义
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {// raceenabled: don't need to check ep, as it is always on the stack// or is new memory allocated by reflect.if debugChan {print("chanrecv: chan=", c, "\n")}if c == nil {if !block {return}gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)throw("unreachable")}//如果不阻塞,无缓冲去为0 或者当前缓冲区已经满了 且未关闭,直接返回,因为这些都是阻塞式的if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&atomic.Load(&c.closed) == 0 {return}var t0 int64if blockprofilerate > 0 {t0 = cputicks()}// 开始上锁lock(&c.lock)// 当前chan已经关闭,并且已经没有数据了,直接返回if c.closed != 0 && c.qcount == 0 {if raceenabled {raceacquire(c.raceaddr())}unlock(&c.lock)if ep != nil {typedmemclr(c.elemtype, ep)}return true, false}// 从发送队列里面获取一个sudog,如果获取不为空,则进行接收数据,具体recv的说明在后面,这个分为两个场景,一个为缓冲区已经满了,放到了sendq队列,一个是缓冲区为0,直接放到sendq,需要先把sendq里面的数据取出来,将buf里第recv个数据赋值给ep(如果有返回值的话),然后将取出来的sudog里的elem放到buf里的第recv,保证是队列的先进先出 if sg := c.sendq.dequeue(); sg != nil {// Found a waiting sender. If buffer is size 0, receive value// directly from sender. Otherwise, receive from head of queue// and add sender's value to the tail of the queue (both map to// the same buffer slot because the queue is full).recv(c, sg, ep, func() { unlock(&c.lock) }, 3)return true, true}// 缓冲区里面有数据,分为两个场景,一个为chan已经关闭,一个chan正常if c.qcount > 0 {// Receive directly from queue// 从buf里面获取当前recvx的数据qp := chanbuf(c, c.recvx)if raceenabled {raceacquire(qp)racerelease(qp)}// 有返回值,则将qp指针指向的数据拷贝到ep指针位置if ep != nil {typedmemmove(c.elemtype, ep, qp)}//然后将buf revcx位置的数据清空typedmemclr(c.elemtype, qp)// recvx加1,如果buf数组都读取完了,直接将recvx设置为0,实现一个循环数组c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}// 当前数据个数减1,unlock,返回c.qcount--unlock(&c.lock)return true, true}// 无缓冲区,不阻塞的读数据,直接返回if !block {unlock(&c.lock)return false, false}//如果缓冲区为0或者缓冲区里面没有数据,则将当前ep封装为sudog,然后放到队列recvq里面,然后将当前g挂起// no sender available: block on this channel.gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.mysg.elem = epmysg.waitlink = nilgp.waiting = mysgmysg.g = gpmysg.isSelect = falsemysg.c = cgp.param = nilc.recvq.enqueue(mysg)goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)// someone woke us upif mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}closed := gp.param == nilgp.param = nilmysg.c = nilreleaseSudog(mysg)return true, !closed
}

函数 recv

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {if c.dataqsiz == 0 {if raceenabled {racesync(c, sg)}// 无缓冲区,也要返回elem,则把sg里面存的值赋值给ep,返回if ep != nil {// copy data from senderrecvDirect(c.elemtype, sg, ep)}} else {// Queue is full. Take the item at the// head of the queue. Make the sender enqueue// its item at the tail of the queue. Since the// queue is full, those are both the same slot.qp := chanbuf(c, c.recvx)if raceenabled {raceacquire(qp)racerelease(qp)raceacquireg(sg.g, qp)racereleaseg(sg.g, qp)}// copy data from queue to receiverif ep != nil {// 要返回值,则将buf里面的第recvx个里面对应值,赋值给eptypedmemmove(c.elemtype, ep, qp)}// copy data from sender to queue// 然后把sg里面的elem值赋值到buf里面的第recvx个里面(即qp),环形的buf,保证先把buf里面的值读取完,然后读取sendq里面的sudog里面的值typedmemmove(c.elemtype, qp, sg.elem)c.recvx++if c.recvx == c.dataqsiz {//环形数组c.recvx = 0}// 下个要往缓冲区写入的indexc.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz}sg.elem = nilgp := sg.gunlockf()gp.param = unsafe.Pointer(sg)if sg.releasetime != 0 {sg.releasetime = cputicks()}// 将sender的g不在阻塞goready(gp, skip+1)
}

关闭 chan

close chan,主要是两个释放和将closed设置为1,sendq里面都是为发送成功的数据,直接丢弃,缓冲里面的数据,还可以继续读,如果recvq有等待者,表面chan里面是无数据的,直接释放。

func closechan(c *hchan) {if c == nil {panic(plainError("close of nil channel"))}lock(&c.lock)if c.closed != 0 {// chan已经关闭,再关闭,panicunlock(&c.lock)panic(plainError("close of closed channel"))}if raceenabled {callerpc := getcallerpc()racewritepc(c.raceaddr(), callerpc, funcPC(closechan))racerelease(c.raceaddr())}//将closed设置为1c.closed = 1var glist gList// release all readers,释放所有的readersfor {sg := c.recvq.dequeue()if sg == nil {break}if sg.elem != nil {//要返回值,将值设置为niltypedmemclr(c.elemtype, sg.elem)sg.elem = nil}if sg.releasetime != 0 {sg.releasetime = cputicks()}gp := sg.ggp.param = nilif raceenabled {raceacquireg(gp, c.raceaddr())}放置到glistglist.push(gp)}// release all writers (they will panic),释放所有挂住的的senderfor {sg := c.sendq.dequeue()if sg == nil {break}sg.elem = nilif sg.releasetime != 0 {sg.releasetime = cputicks()}gp := sg.ggp.param = nilif raceenabled {raceacquireg(gp, c.raceaddr())}glist.push(gp)}unlock(&c.lock)// Ready all Gs now that we've dropped the channel lock.for !glist.empty() {gp := glist.pop()gp.schedlink = 0goready(gp, 3)}
}

总结

主旨:使用队列的先进先出能力,进行channel发送和接受任务管理
有缓冲则使用 环形数组+双向链表实现带缓存功能的channel
不带缓存功能的:双向链表

channel源代码实现相关推荐

  1. Netty之Channel源代码分析

    由于Netty只是一个接口,没有实现,所以这些接口的作用也只能从注释上一探究竟,具体的用法需要在实现类中研究 1.api 2.各个api的作用 2.API功能说明 - 1)Channel read() ...

  2. 在.NET Core中使用Channel(一)

    我最近一直在熟悉.net Core中引入的新Channel<T>类型.我想在它第一次发布的时候我了解过它,但是有关文章非常非常少,我不能理解它们与其他队列有什么不同. 在使用了一段时间后, ...

  3. 理解 .NET Core中的Channel篇之一——通道入门

    这篇文章是.NET中有关Channel的系列文章的一部分.当然,最好从第1部分开始,但是您可以使用下面的链接跳过任何想要的地方.这系列文章均是本人翻译,翻译也是随性而至,并非直译,英文好的可以去看原文 ...

  4. 区块链教程Fabric1.0源代码分析Peer peer channel命令及子命令实现

    区块链教程Fabric1.0源代码分析Peer peer channel命令及子命令实现,2018年下半年,区块链行业正逐渐褪去发展之初的浮躁.回归理性,表面上看相关人才需求与身价似乎正在回落.但事实 ...

  5. Fabric 1.0源代码分析(33) Peer #peer channel命令及子命令实现

    # Fabric 1.0源代码笔记 之 Peer #peer channel命令及子命令实现 ## 1.peer channel create子命令实现(创建通道) ### 1.1.初始化Ordere ...

  6. 【原创】Kakfa utils源代码分析(三)

    Kafka utils包最后一篇~~~ 十五.ShutdownableThread.scala 可关闭的线程抽象类! 继承自Thread同时还接收一个boolean变量isInterruptible表 ...

  7. 区块链教程Fabric1.0源代码分析scc(系统链码)

    区块链教程Fabric1.0源代码分析scc(系统链码),2018年下半年,区块链行业正逐渐褪去发展之初的浮躁.回归理性,表面上看相关人才需求与身价似乎正在回落.但事实上,正是初期泡沫的渐退,让人们更 ...

  8. 修改openssh源代码,添加操作记录审记功能

    为什么80%的码农都做不了架构师?>>>    这个是之前一年前研究搞过,当时记保存了源代码,本想直接用当时的代码写编文章,中间电脑换过几次,不知是丢了,还是没有找到.写这编博文是凭 ...

  9. 56. Netty源代码分析-服务器初始化 NioEventLoopGroup实例化

    一. 代码下载 Netty代码下载和编译参考前一篇Netty文章 https://blog.51cto.com/483181/2112163 二. 服务器代码分析 2.1 服务器代码编写 一般Nett ...

最新文章

  1. c语言随机抽奖小程序,小程序抽奖实现
  2. s5720找mac 华为交换机_【基础】交换机堆叠模式
  3. windows10 环境下的amqp安装步骤(图文)
  4. 最全整理浏览器兼容性问题与解决方案(转)
  5. 汉能:让人类像叶绿素一样利用太阳能
  6. iOS当中的设计模式
  7. flutter 拖拽布局_Flutter 史上最牛拖动控件 Draggable
  8. 从 Poisson 分布到服务器的访问
  9. python中pprint是干什么的_Python中的pprint折腾记
  10. 线上支付之----网关支付、银联代扣通道、快捷支付、银行卡支付等网上常见支付方式接口说明!!
  11. Postman下载教程
  12. 计算二叉树叶子结点个数 C/C++
  13. python 主函数传参_Python函数传参方法超级大汇总
  14. 测试必经之路(探索性测试)
  15. Java设计模式之观察者模式应用与实战
  16. 设计原则之依赖倒置原则详解
  17. 工控行业学什么编程语言比较好_中国工控|想学PLC编程?先弄清5种PLC专用语言 !...
  18. windows的IPAM无法独立安装,需要域环境才能使用
  19. 单片机位寻址举例_基于80C51单片机位寻址编程
  20. Linux(6)RedHat7 基本命令五-hwclock(clock)命令详解

热门文章

  1. 抖音开放平台-视频切片-视频分片上传-不合法的参数ID-不合法的对象ID
  2. iOS 10 之 网络权限带来的坑
  3. 中秋被各大厂的月饼秀到了!快和小编看看这些大厂到底发的什么月饼吧!!!不知道有没有被惊讶到的?
  4. Data Mesh的原则和逻辑架构
  5. 疫情之下,Nutanix推创新性解决方案助力企业发展
  6. html 页面记事本打开方法,网页查看源代码,如何设置默认为记事本打开
  7. css实现一闪而过的光影效果
  8. [解决方案] java.lang.OutOfMemoryError异常解决方法 [复制链接]
  9. 搞IT要具备的特质,像沙僧一样~
  10. orangepi-lite2 在mac上烧录系统到tf卡