【转载请标明出处】https://blog.csdn.net/qq_25870633/article/details/83388952

接上篇文章 【我的区块链之路】- golang源码分析之select的底层实现 我这里因为面试的时候也有被问到过 channel的底层实现,所以就一并的去把 channel 啊,goroutine啊,go协程的调度器啊,interface啊,mutex啊,rwmutex啊,timer啊等等底层全部都去录了一遍。没办法啊,曾经我以为自己是个逆风尿三丈的耀眼王者,然而这段时间发现自己只是一只顺风尿湿鞋的不屈黑铁。。。

参考:

https://ninokop.github.io/

https://blog.csdn.net/nobugtodebug/article/details/45396507

https://studygolang.com/articles/10575?fr=sidebar

那么,我们这篇文章主要是讲channel的底层实现的。

首先,我们先看channel的实现是在哪里?在runtime包下面咯,路径为:./src/runtime/chan.go 文件中,其中主要的结构体为:

/**
定义了 channel 的结构体
*/
type hchan struct {qcount   uint           // total data in the queue   队列中的当前数据的个数dataqsiz uint           // size of the circular queue channel的大小buf      unsafe.Pointer // points to an array of dataqsiz elements  数据缓冲区,存放数据的环形数组elemsize uint16         // channel 中数据类型的大小 (单个元素的大小)closed   uint32         // 表示 channel 是否关闭的标识位elemtype *_type // element type  队列中的元素类型// send 和 recieve 的索引,用于实现环形数组队列sendx    uint   // send index    当前发送元素的索引recvx    uint   // receive index    当前接收元素的索引recvq    waitq  // list of recv waiters    接收等待队列;由 recv 行为(也就是 <-ch)阻塞在 channel 上的 goroutine 队列sendq    waitq  // list of send waiters    发送等待队列;由 send 行为 (也就是 ch<-) 阻塞在 channel 上的 goroutine 队列// lock protects all fields in hchan, as well as several// fields in sudogs blocked on this channel.// lock保护hchan中的所有字段,以及此通道上阻塞的sudoG中的几个字段。  //// Do not change another G's status while holding this lock// (in particular, do not ready a G), as this can deadlock// with stack shrinking.// 保持此锁定时不要更改另一个G的状态(特别是,没有准备好G),因为这可能会因堆栈收缩而死锁。lock mutex
}/**
发送及接收队列的结构体
等待队列的链表实现
*/
type waitq struct {first *sudoglast  *sudog
}

然后还有在 runtime包的 ./src/runtime/runtime2.go 中定义的 sudoG对应的结构体:

/**
对 G 的封装
*/
type sudog struct {// The following fields are protected by the hchan.lock of the// channel this sudog is blocking on. shrinkstack depends on// this for sudogs involved in channel ops.g          *gselectdone *uint32 // CAS to 1 to win select race (may point to stack)next       *sudogprev       *sudogelem       unsafe.Pointer // data element (may point to stack)// The following fields are never accessed concurrently.// For channels, waitlink is only accessed by g.// For semaphores, all fields (including the ones above)// are only accessed when holding a semaRoot lock.acquiretime int64releasetime int64ticket      uint32parent      *sudog // semaRoot binary treewaitlink    *sudog // g.waiting list or semaRootwaittail    *sudog // semaRootc           *hchan // channel
}

有上述的结构体我们大致可以看出channel其实就是由一个环形数组实现的队列,用于存储消息元素;两个链表实现的 goroutine 等待队列,用于存储阻塞在 recv 和 send 操作上的 goroutine;一个互斥锁,用于各个属性变动的同步,只不过这个锁是一个轻量级锁。其中 recvq 是读操作阻塞在 channel 的 goroutine 列表,sendq 是写操作阻塞在 channel 的 goroutine 列表。列表的实现是 sudog,其实就是一个对 g 的结构的封装

和select类似,hchan其实只是channel的头部。头部后面的一段内存连续的数组将作为channel的缓冲区,即用于存放channel数据的环形队列。qcount 和 dataqsiz 分别描述了缓冲区当前使用量【len】和容量【cap】。若channel是无缓冲的,则size是0,就没有这个环形队列了。如图:

下面我们来看看实例化一个channel的实现:

make:

make 的过程还比较简单,需要注意一点的是当元素不含指针的时候,会将整个 hchan 分配成一个连续的空间。下面就是make创建channel的代码实现:

//go:linkname reflect_makechan reflect.makechan
func reflect_makechan(t *chantype, size int64) *hchan {return makechan(t, size)
}/**
创建 chan
*/
func makechan(t *chantype, size int64) *hchan {elem := t.elem// compiler checks this but be safe.if elem.size >= 1<<16 {throw("makechan: invalid channel element type")}if hchanSize%maxAlign != 0 || elem.align > maxAlign {throw("makechan: bad alignment")}if size < 0 || int64(uintptr(size)) != size || (elem.size > 0 && uintptr(size) > (_MaxMem-hchanSize)/elem.size) {panic(plainError("makechan: size out of range"))}var c *hchanif elem.kind&kindNoPointers != 0 || size == 0 {// Allocate memory in one call.// Hchan does not contain pointers interesting for GC in this case:// buf points into the same allocation, elemtype is persistent.// SudoG's are referenced from their owning thread so they can't be collected.// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))if size > 0 && elem.size != 0 {c.buf = add(unsafe.Pointer(c), hchanSize)} else {// race detector uses this location for synchronization// Also prevents us from pointing beyond the allocation (see issue 9401).c.buf = unsafe.Pointer(c)}} else {c = new(hchan)c.buf = newarray(elem, int(size))}c.elemsize = uint16(elem.size)c.elemtype = elemc.dataqsiz = uint(size)if debugChan {print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")}return c
}

可以看出来和之前说的select一样用了 //go:linkname 技巧,把函数关联到了 reflect包中的对应函数上了,这样纸就使用反射区出发整个make的入口。骚微过一下reflect包中真正make(chan type, int) 的函数吧

// MakeChan creates a new channel with the specified type and buffer size.
func MakeChan(typ Type, buffer int) Value {if typ.Kind() != Chan {panic("reflect.MakeChan of non-chan type")}if buffer < 0 {panic("reflect.MakeChan: negative buffer size")}if typ.ChanDir() != BothDir {panic("reflect.MakeChan: unidirectional channel type")}ch := makechan(typ.(*rtype), uint64(buffer))return Value{typ.common(), ch, flag(Chan)}
}// 这个才是被runtime中 用 //go:linkname 链接的函数
func makechan(typ *rtype, size uint64) (ch unsafe.Pointer)

劫争上面继续说,make中做了什么:首先两个 if 主要是一些异常情况的判断,第三个 if 也很明显,判断 size 大小是否小于 0 或者过大。int64(uintptr(size)) != size 这句也是判断 size 是否为负。

然后接着判断,如果channel中元素类型不为指针或者channel为无缓冲通道那么就将其分配在连续的内存区域。【使用 mallocgc 函数进行分配内存空间】顺便看下 mallocgc 函数(这个函数在select那章其实也用到的)的代码吧:

// Allocate an object of size bytes.
// Small objects are allocated from the per-P cache's free lists.
// Large objects (> 32 kB) are allocated straight from the heap./**
分配大小为字节的对象。
从每个P缓存的空闲列表中分配小对象。
大型对象(> 32 kB)直接从堆中分配。
*/
func mallocgc(size uintptr, typ *_type, needzero bool) unsafe.Pointer {if gcphase == _GCmarktermination {throw("mallocgc called with gcphase == _GCmarktermination")}if size == 0 {return unsafe.Pointer(&zerobase)}if debug.sbrk != 0 {align := uintptr(16)if typ != nil {align = uintptr(typ.align)}return persistentalloc(size, align, &memstats.other_sys)}// assistG is the G to charge for this allocation, or nil if// GC is not currently active.var assistG *gif gcBlackenEnabled != 0 {// Charge the current user G for this allocation.assistG = getg()if assistG.m.curg != nil {assistG = assistG.m.curg}// Charge the allocation against the G. We'll account// for internal fragmentation at the end of mallocgc.assistG.gcAssistBytes -= int64(size)if assistG.gcAssistBytes < 0 {// This G is in debt. Assist the GC to correct// this before allocating. This must happen// before disabling preemption.gcAssistAlloc(assistG)}}// Set mp.mallocing to keep from being preempted by GC.mp := acquirem()if mp.mallocing != 0 {throw("malloc deadlock")}if mp.gsignal == getg() {throw("malloc during signal")}mp.mallocing = 1shouldhelpgc := falsedataSize := sizec := gomcache()var x unsafe.Pointernoscan := typ == nil || typ.kind&kindNoPointers != 0if size <= maxSmallSize {if noscan && size < maxTinySize {// Tiny allocator.//// Tiny allocator combines several tiny allocation requests// into a single memory block. The resulting memory block// is freed when all subobjects are unreachable. The subobjects// must be noscan (don't have pointers), this ensures that// the amount of potentially wasted memory is bounded.//// Size of the memory block used for combining (maxTinySize) is tunable.// Current setting is 16 bytes, which relates to 2x worst case memory// wastage (when all but one subobjects are unreachable).// 8 bytes would result in no wastage at all, but provides less// opportunities for combining.// 32 bytes provides more opportunities for combining,// but can lead to 4x worst case wastage.// The best case winning is 8x regardless of block size.//// Objects obtained from tiny allocator must not be freed explicitly.// So when an object will be freed explicitly, we ensure that// its size >= maxTinySize.//// SetFinalizer has a special case for objects potentially coming// from tiny allocator, it such case it allows to set finalizers// for an inner byte of a memory block.//// The main targets of tiny allocator are small strings and// standalone escaping variables. On a json benchmark// the allocator reduces number of allocations by ~12% and// reduces heap size by ~20%.off := c.tinyoffset// Align tiny pointer for required (conservative) alignment.if size&7 == 0 {off = round(off, 8)} else if size&3 == 0 {off = round(off, 4)} else if size&1 == 0 {off = round(off, 2)}if off+size <= maxTinySize && c.tiny != 0 {// The object fits into existing tiny block.x = unsafe.Pointer(c.tiny + off)c.tinyoffset = off + sizec.local_tinyallocs++mp.mallocing = 0releasem(mp)return x}// Allocate a new maxTinySize block.span := c.alloc[tinySpanClass]v := nextFreeFast(span)if v == 0 {v, _, shouldhelpgc = c.nextFree(tinySpanClass)}x = unsafe.Pointer(v)(*[2]uint64)(x)[0] = 0(*[2]uint64)(x)[1] = 0// See if we need to replace the existing tiny block with the new one// based on amount of remaining free space.if size < c.tinyoffset || c.tiny == 0 {c.tiny = uintptr(x)c.tinyoffset = size}size = maxTinySize} else {var sizeclass uint8if size <= smallSizeMax-8 {sizeclass = size_to_class8[(size+smallSizeDiv-1)/smallSizeDiv]} else {sizeclass = size_to_class128[(size-smallSizeMax+largeSizeDiv-1)/largeSizeDiv]}size = uintptr(class_to_size[sizeclass])spc := makeSpanClass(sizeclass, noscan)span := c.alloc[spc]v := nextFreeFast(span)if v == 0 {v, span, shouldhelpgc = c.nextFree(spc)}x = unsafe.Pointer(v)if needzero && span.needzero != 0 {memclrNoHeapPointers(unsafe.Pointer(v), size)}}} else {var s *mspanshouldhelpgc = truesystemstack(func() {s = largeAlloc(size, needzero, noscan)})s.freeindex = 1s.allocCount = 1x = unsafe.Pointer(s.base())size = s.elemsize}var scanSize uintptrif !noscan {// If allocating a defer+arg block, now that we've picked a malloc size// large enough to hold everything, cut the "asked for" size down to// just the defer header, so that the GC bitmap will record the arg block// as containing nothing at all (as if it were unused space at the end of// a malloc block caused by size rounding).// The defer arg areas are scanned as part of scanstack.if typ == deferType {dataSize = unsafe.Sizeof(_defer{})}heapBitsSetType(uintptr(x), size, dataSize, typ)if dataSize > typ.size {// Array allocation. If there are any// pointers, GC has to scan to the last// element.if typ.ptrdata != 0 {scanSize = dataSize - typ.size + typ.ptrdata}} else {scanSize = typ.ptrdata}c.local_scan += scanSize}// Ensure that the stores above that initialize x to// type-safe memory and set the heap bits occur before// the caller can make x observable to the garbage// collector. Otherwise, on weakly ordered machines,// the garbage collector could follow a pointer to x,// but see uninitialized memory or stale heap bits.publicationBarrier()// Allocate black during GC.// All slots hold nil so no scanning is needed.// This may be racing with GC so do it atomically if there can be// a race marking the bit.if gcphase != _GCoff {gcmarknewobject(uintptr(x), size, scanSize)}if raceenabled {racemalloc(x, size)}if msanenabled {msanmalloc(x, size)}mp.mallocing = 0releasem(mp)if debug.allocfreetrace != 0 {tracealloc(x, size, typ)}if rate := MemProfileRate; rate > 0 {if size < uintptr(rate) && int32(size) < c.next_sample {c.next_sample -= int32(size)} else {mp := acquirem()profilealloc(mp, x, size)releasem(mp)}}if assistG != nil {// Account for internal fragmentation in the assist// debt now that we know it.assistG.gcAssistBytes -= int64(size - dataSize)}if shouldhelpgc {if t := (gcTrigger{kind: gcTriggerHeap}); t.test() {gcStart(gcBackgroundMode, t)}}return x
}

否则,在创建chan需要知道数据类型和缓冲区大小。channel 和 channel.buf 是分别进行分配的。对应上面的结构图 newarray 将生成这个环形队列。之所以要分开指针类型缓冲区主要是为了区分gc操作,需要将它设置为flagNoScan。并且指针大小固定,可以跟hchan头部一起分配内存,不需要先new(hchan)newarry

我们再看下 newarray函数:

func newarray(typ *_type, n int) unsafe.Pointer {if n < 0 || uintptr(n) > maxSliceCap(typ.size) {panic(plainError("runtime: allocation size out of range"))}return mallocgc(typ.size*uintptr(n), typ, true)
}

可以看出来,其实newarray函数底层也是调了 mallocgc 函数来分配内存空间的

总结:make chan 的过程是在堆上进行分配,返回是一个 hchan 的指针。

声明但不make初始化的chan是nil chan。读写nil chan会阻塞,关闭nil chan会panic。

chan的读写:

从实现中可见读写chan都要lock,这跟读写共享内存一样都有lock的开销

数据在chan中的传递方向从chansend开始从入参最终写入recvq中的goroutine的数据域,这中间如果发生阻塞可能先写入sendq中goroutine的数据域等待中转。

从gopark返回后sudog对象可重用。

首先,我们来看下对应chan的读写函数的定义:

sned:

// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {chansend(c, elem, true, getcallerpc(unsafe.Pointer(&c)))
}/** generic single channel send/recv* If block is not nil,* then the protocol will not* sleep but return if it could* not complete.** sleep can wake up with g.param == nil* when a channel involved in the sleep has* been closed.  it is easiest to loop and re-run* the operation; we'll see that it's now closed.* 通用单通道发送/接收* 如果阻塞不是nil,则将不会休眠,但如果无法完成则返回。* 当睡眠中涉及的通道关闭时,睡眠可以通过g.param == nil唤醒。 最简单的循环和重新运行操作; 我们会 * 看到它现在已经关闭了。   */
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {// 当 channel 未初始化或为 nil 时,向其中发送数据将会永久阻塞if c == nil {if !block {return false}// gopark 会使当前 goroutine 休眠,并通过 unlockf 唤醒,但是此时传入的 unlockf 为 nil, 因此,goroutine 会一直休眠gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)throw("unreachable")}if debugChan {print("chansend: chan=", c, "\n")}if raceenabled {racereadpc(unsafe.Pointer(c), callerpc, funcPC(chansend))}// Fast path: check for failed non-blocking operation without acquiring the lock.//// After observing that the channel is not closed, we observe that the channel is// not ready for sending. Each of these observations is a single word-sized read// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).// Because a closed channel cannot transition from 'ready for sending' to// 'not ready for sending', even if the channel is closed between the two observations,// they imply a moment between the two when the channel was both not yet closed// and not ready for sending. We behave as if we observed the channel at that moment,// and report that the send cannot proceed.//// It is okay if the reads are reordered here: if we observe that the channel is not// ready for sending and then observe that it is not closed, that implies that the// channel wasn't closed during the first observation.if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {return false}var t0 int64if blockprofilerate > 0 {t0 = cputicks()}// 获取同步锁lock(&c.lock)// 向已经关闭的 channel 发送消息会产生 panicif c.closed != 0 {unlock(&c.lock)panic(plainError("send on closed channel"))}// CASE1: 当有 goroutine 在 recv 队列上等待时,跳过缓存队列,将消息直接发给 reciever goroutineif sg := c.recvq.dequeue(); sg != nil {// Found a waiting receiver. We pass the value we want to send// directly to the receiver, bypassing the channel buffer (if any).// 找到了等待receiver。 我们将要发送的值直接传递给receiver,绕过通道缓冲区(如果有的话)。send(c, sg, ep, func() { unlock(&c.lock) }, 3)return true}// CASE2: 缓存队列未满,则将消息复制到缓存队列上if c.qcount < c.dataqsiz {// Space is available in the channel buffer. Enqueue the element to send.qp := chanbuf(c, c.sendx)if raceenabled {raceacquire(qp)racerelease(qp)}typedmemmove(c.elemtype, qp, ep)c.sendx++if c.sendx == c.dataqsiz {c.sendx = 0}c.qcount++unlock(&c.lock)return true}if !block {unlock(&c.lock)return false}// CASE3: 缓存队列已满,将goroutine 加入 send 队列// 初始化 sudog// Block on the channel. Some receiver will complete our operation for us.gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.mysg.elem = epmysg.waitlink = nilmysg.g = gpmysg.selectdone = nilmysg.c = cgp.waiting = mysggp.param = nil// 加入队列c.sendq.enqueue(mysg)// 休眠goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)// 唤醒 goroutine// someone woke us up.if mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilif gp.param == nil {if c.closed == 0 {throw("chansend: spurious wakeup")}panic(plainError("send on closed channel"))}gp.param = nilif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}mysg.c = nilreleaseSudog(mysg)return true
}// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked.  send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {if raceenabled {if c.dataqsiz == 0 {racesync(c, sg)} else {// Pretend we go through the buffer, even though// we copy directly. Note that we need to increment// the head/tail locations only when raceenabled.qp := chanbuf(c, c.recvx)raceacquire(qp)racerelease(qp)raceacquireg(sg.g, qp)racereleaseg(sg.g, qp)c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz}}if sg.elem != nil {sendDirect(c.elemtype, sg, ep)sg.elem = nil}gp := sg.gunlockf()gp.param = unsafe.Pointer(sg)if sg.releasetime != 0 {sg.releasetime = cputicks()}goready(gp, skip+1)
}

send 有以下四种情况:【都是对不为nil的chan的情况】

  • 向已经close的chan写数据,抛panic。
  • 有 goroutine 阻塞在 channel recv 队列上,此时缓存队列( hchan.buf)为空(即缓冲区内无元素),直接将消息发送给 reciever goroutine,只产生一次复制
  • 当 channel 缓存队列( hchan.buf )有剩余空间时,将数据放到队列里,等待接收,接收后总共产生两次复制
  • 当 channel 缓存队列( hchan.buf )已满时,将当前 goroutine 加入 send 队列并阻塞。

【第一种情况】:向已经close的chan写数据,会抛panic

if c.closed != 0 {unlock(&c.lock)panic(plainError("send on closed channel"))
}

【第二种情况】:从当前 channel 的等待队列中取出等待的 goroutine,然后调用 send。goready 负责唤醒 goroutine

if sg := c.recvq.dequeue(); sg != nil {// Found a waiting receiver. We pass the value we want to send// directly to the receiver, bypassing the channel buffer (if any).send(c, sg, ep, func() { unlock(&c.lock) }, 3)return true
}// 看send 部分逻辑func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {if raceenabled {if c.dataqsiz == 0 {racesync(c, sg)} else {// Pretend we go through the buffer, even though// we copy directly. Note that we need to increment// the head/tail locations only when raceenabled.qp := chanbuf(c, c.recvx)raceacquire(qp)racerelease(qp)raceacquireg(sg.g, qp)racereleaseg(sg.g, qp)c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz}}if sg.elem != nil {sendDirect(c.elemtype, sg, ep)sg.elem = nil}gp := sg.gunlockf()gp.param = unsafe.Pointer(sg)if sg.releasetime != 0 {sg.releasetime = cputicks()}goready(gp, skip+1)
}

【第三种情况】:通过比较 qcount 和 dataqsiz 来判断 hchan.buf 是否还有可用空间。除此之后还需要调整一下 sendx 和 qcount

if c.qcount < c.dataqsiz {// Space is available in the channel buffer. Enqueue the element to send.qp := chanbuf(c, c.sendx)if raceenabled {raceacquire(qp)racerelease(qp)}typedmemmove(c.elemtype, qp, ep)c.sendx++if c.sendx == c.dataqsiz {c.sendx = 0}c.qcount++unlock(&c.lock)return true
}

【第四种情况】:当 channel 缓存队列( hchan.buf )已满时,将当前 goroutine 加入 send 队列并阻塞。

// Block on the channel. Some receiver will complete our operation for us.gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.// 一些初始化工作mysg.elem = epmysg.waitlink = nilmysg.g = gpmysg.selectdone = nilmysg.c = cgp.waiting = mysggp.param = nilc.sendq.enqueue(mysg)   // 当前 goroutine 如等待队列goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)  //休眠

receive:

// entry points for <- c from compiled code
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {chanrecv(c, elem, true)
}//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {_, received = chanrecv(c, elem, true)return
}// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {// raceenabled: don't need to check ep, as it is always on the stack// or is new memory allocated by reflect.if debugChan {print("chanrecv: chan=", c, "\n")}// 从 nil 的 channel 中接收消息,永久阻塞if c == nil {if !block {return}gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)throw("unreachable")}// Fast path: check for failed non-blocking operation without acquiring the lock.//// After observing that the channel is not ready for receiving, we observe that the// channel is not closed. Each of these observations is a single word-sized read// (first c.sendq.first or c.qcount, and second c.closed).// Because a channel cannot be reopened, the later observation of the channel// being not closed implies that it was also not closed at the moment of the// first observation. We behave as if we observed the channel at that moment// and report that the receive cannot proceed.//// The order of operations is important here: reversing the operations can lead to// incorrect behavior when racing with a close.if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&atomic.Load(&c.closed) == 0 {return}var t0 int64if blockprofilerate > 0 {t0 = cputicks()}lock(&c.lock)// CASE1: 从已经 close 且为空的 channel recv 数据,返回空值if c.closed != 0 && c.qcount == 0 {if raceenabled {raceacquire(unsafe.Pointer(c))}unlock(&c.lock)if ep != nil {typedmemclr(c.elemtype, ep)}return true, false}// CASE2: send 队列不为空// CASE2.a: 缓存队列为空,直接从 sender recv 元素// CASE2.b: 缓存队列不为空,此时只有可能是缓存队列已满,从队列头取出元素,//并唤醒 sender 将元素写入缓存队列尾部。由于为环形队列,因此,队列满时只需要将队列头复制给 reciever,//同时将 sender 元素复制到该位置,并移动队列头尾索引,不需要移动队列元素if sg := c.sendq.dequeue(); sg != nil {// Found a waiting sender. If buffer is size 0, receive value// directly from sender. Otherwise, receive from head of queue// and add sender's value to the tail of the queue (both map to// the same buffer slot because the queue is full).recv(c, sg, ep, func() { unlock(&c.lock) }, 3)return true, true}// CASE3: 缓存队列不为空,直接从队列取元素,移动头索引if c.qcount > 0 {// Receive directly from queueqp := chanbuf(c, c.recvx)if raceenabled {raceacquire(qp)racerelease(qp)}if ep != nil {typedmemmove(c.elemtype, ep, qp)}typedmemclr(c.elemtype, qp)c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}c.qcount--unlock(&c.lock)return true, true}if !block {unlock(&c.lock)return false, false}// CASE4: 缓存队列为空,将 goroutine 加入 recv 队列,并阻塞// no sender available: block on this channel.gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.mysg.elem = epmysg.waitlink = nilgp.waiting = mysgmysg.g = gpmysg.selectdone = nilmysg.c = cgp.param = nilc.recvq.enqueue(mysg)goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)// someone woke us upif mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}closed := gp.param == nilgp.param = nilmysg.c = nilreleaseSudog(mysg)return true, !closed
}// recv processes a receive operation on a full channel c.
// There are 2 parts:
// 1) The value sent by the sender sg is put into the channel
//    and the sender is woken up to go on its merry way.
// 2) The value received by the receiver (the current G) is
//    written to ep.
// For synchronous channels, both values are the same.
// For asynchronous channels, the receiver gets its data from
// the channel buffer and the sender's data is put in the
// channel buffer.
// Channel c must be full and locked. recv unlocks c with unlockf.
// sg must already be dequeued from c.
// A non-nil ep must point to the heap or the caller's stack.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {if c.dataqsiz == 0 {if raceenabled {racesync(c, sg)}if ep != nil {// copy data from senderrecvDirect(c.elemtype, sg, ep)}} else {// Queue is full. Take the item at the// head of the queue. Make the sender enqueue// its item at the tail of the queue. Since the// queue is full, those are both the same slot.qp := chanbuf(c, c.recvx)if raceenabled {raceacquire(qp)racerelease(qp)raceacquireg(sg.g, qp)racereleaseg(sg.g, qp)}// copy data from queue to receiverif ep != nil {typedmemmove(c.elemtype, ep, qp)}// copy data from sender to queuetypedmemmove(c.elemtype, qp, sg.elem)c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz}sg.elem = nilgp := sg.gunlockf()gp.param = unsafe.Pointer(sg)if sg.releasetime != 0 {sg.releasetime = cputicks()}goready(gp, skip+1)
}

receive 有以下四种情况:【都是对不为nil的chan的情况】

  • 从已经 close 且为空的 channel recv 数据,返回空值
  • 当 send 队列不为空,分两种情况:【一】缓存队列为空,直接从 send 队列的sender中接收数据 元素;【二】缓存队列不为空,此时只有可能是缓存队列已满,从队列头取出元素,并唤醒 sender 将元素写入缓存队列尾部。由于为环形队列,因此,队列满时只需要将队列头复制给 reciever,同时将 sender 元素复制到该位置,并移动队列头尾索引,不需要移动队列元素。【这就是为什么使用环形队列的原因】
  • 缓存队列不为空,直接从队列取队头元素,移动头索引。
  • 缓存队列为空,将 goroutine 加入 recv 队列,并阻塞。

【第一种情况】:从 closed channel 接收数据,如果 channel 中还有数据,接着走下面的流程。如果已经没有数据了,则返回默认值。使用 ok-idiom 方式读取的时候,第二个参数返回 false。

if c.closed != 0 && c.qcount == 0 {if raceenabled {raceacquire(unsafe.Pointer(c))}unlock(&c.lock)if ep != nil {typedmemclr(c.elemtype, ep)}return true, false
}

【第二种情况】:当前有send goroutine 阻塞在 channel 上,直接调 recv函数【a】当缓存队列尾空时,直接从 send 队列的sender中接收数据 元素。【b】缓存队列不为空,此时只有可能是缓存队列已满,从队列头取出元素,并唤醒 sender 将元素写入缓存队列尾部。同时更改队列头索引。

if sg := c.sendq.dequeue(); sg != nil {// Found a waiting sender. If buffer is size 0, receive value// directly from sender. Otherwise, receive from head of queue// and add sender's value to the tail of the queue (both map to// the same buffer slot because the queue is full).recv(c, sg, ep, func() { unlock(&c.lock) }, 3)return true, true
}
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {if c.dataqsiz == 0 {if raceenabled {racesync(c, sg)}if ep != nil {// copy data from senderrecvDirect(c.elemtype, sg, ep)}} else {// Queue is full. Take the item at the// head of the queue. Make the sender enqueue// its item at the tail of the queue. Since the// queue is full, those are both the same slot.qp := chanbuf(c, c.recvx)if raceenabled {raceacquire(qp)racerelease(qp)raceacquireg(sg.g, qp)racereleaseg(sg.g, qp)}// copy data from queue to receiverif ep != nil {typedmemmove(c.elemtype, ep, qp)}// copy data from sender to queuetypedmemmove(c.elemtype, qp, sg.elem)c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz}sg.elem = nilgp := sg.gunlockf()gp.param = unsafe.Pointer(sg)if sg.releasetime != 0 {sg.releasetime = cputicks()}goready(gp, skip+1)
}

【第三种情况】:buf 中有可用数据。直接从队列取队头元素,移动头索引。

if c.qcount > 0 {// Receive directly from queueqp := chanbuf(c, c.recvx)if raceenabled {raceacquire(qp)racerelease(qp)}if ep != nil {typedmemmove(c.elemtype, ep, qp)}typedmemclr(c.elemtype, qp)c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}c.qcount--unlock(&c.lock)return true, true
}

【第四种情况】:buf 为空,将当前 goroutine 加入 recv 队列并阻塞。

// no sender available: block on this channel.gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.mysg.elem = epmysg.waitlink = nilgp.waiting = mysgmysg.g = gpmysg.selectdone = nilmysg.c = cgp.param = nilc.recvq.enqueue(mysg)goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

close:

下面,我们来看看关闭通道具体的实现:

//go:linkname reflect_chanclose reflect.chanclose
func reflect_chanclose(c *hchan) {closechan(c)
}func closechan(c *hchan) {if c == nil {panic(plainError("close of nil channel"))}lock(&c.lock)// 重复 close,产生 panicif c.closed != 0 {unlock(&c.lock)panic(plainError("close of closed channel"))}if raceenabled {callerpc := getcallerpc(unsafe.Pointer(&c))racewritepc(unsafe.Pointer(c), callerpc, funcPC(closechan))racerelease(unsafe.Pointer(c))}c.closed = 1var glist *g// 唤醒所有 reciever// release all readersfor {sg := c.recvq.dequeue()if sg == nil {break}if sg.elem != nil {typedmemclr(c.elemtype, sg.elem)sg.elem = nil}if sg.releasetime != 0 {sg.releasetime = cputicks()}gp := sg.ggp.param = nilif raceenabled {raceacquireg(gp, unsafe.Pointer(c))}gp.schedlink.set(glist)glist = gp}// 唤醒所有 sender,并产生 panic// release all writers (they will panic)for {sg := c.sendq.dequeue()if sg == nil {break}sg.elem = nilif sg.releasetime != 0 {sg.releasetime = cputicks()}gp := sg.ggp.param = nilif raceenabled {raceacquireg(gp, unsafe.Pointer(c))}gp.schedlink.set(glist)glist = gp}unlock(&c.lock)// 唤醒所哟叜glist中的goroutine// Ready all Gs now that we've dropped the channel lock.for glist != nil {gp := glistglist = glist.schedlink.ptr()gp.schedlink = 0goready(gp, 3)}
}

close channel 的工作

  • 将 c.closed 设置为 1。
  • 唤醒 recvq 队列里面的阻塞 goroutine
  • 唤醒 sendq 队列里面的阻塞 goroutine

处理方式是分别遍历 recvq 和 sendq 队列,将所有的 goroutine 放到 glist 队列中,最后唤醒 glist 队列中的 goroutine

OK上述就是channel的源码分析,我们下面通过几张图来看一下chan的工作原理:

send的流程:

send的流程:

close的流程:

以上就是对 chan的底层操作原理及讲解。

问chan是否线程安全的呢?是线程安全的,因为其hchan结构汇总内置了mutex,且send 及 recv 及close 的操作中均会去 加锁/解锁 等动作。

到这里我们对chan的底层讲解就结束了!大家手下留情了~

【我的架构师之路】- golang源码分析之channel的底层实现相关推荐

  1. golang 源码分析之channel

    channel channel分为有阻塞的和无阻塞的,其实关键就是在于是否有capacity.已关闭的channal再次关闭会触发panic注意下已关闭的channal发送消息,会触发panic 分析 ...

  2. 【我的架构师之路】- golang源码分析之协程调度器底层实现( G、M、P)

    本人的源码是基于go 1.9.7 版本的哦! 紧接着之前写的 [我的区块链之路]- golang源码分析之select的底层实现 和 [我的区块链之路]- golang源码分析之channel的底层实 ...

  3. 架构师之路:从码农到架构师你差了哪些

    转载自   架构师之路:从码农到架构师你差了哪些 Web应用,最常见的研发语言是Java和PHP. 后端服务,最常见的研发语言是Java和C/C++. 大数据,最常见的研发语言是Java和Python ...

  4. golang源码分析-调度概述

    golang源码分析-调度过程概述 本文主要概述一下golang的调度器的大概工作的流程,众所周知golang是基于用户态的协程的调度来完成多任务的执行.在Linux操作系统中,以往的多线程执行都是通 ...

  5. golang源码分析-启动过程概述

    golang源码分析-启动过程概述 golang语言作为根据CSP模型实现的一种强类型的语言,本文主要就是通过简单的实例来分析一下golang语言的启动流程,为深入了解与学习做铺垫. golang代码 ...

  6. NSQ源码分析之channel

    阅读本文之前,推荐先阅读上一篇对topic的分析. 什么是channel 结合上一篇对topic分析,channel的作用就是将topic的数据进行分流,topic会将发布的消息分配给所有连接的cha ...

  7. 呕心沥血苦战6个月,熬夜总结的这份Java0基础进阶架构师视频+笔记+课件+源码资料,快快收藏手慢无

    每天都有初学者询问该如何学习,如何快速学习,因精力有限不能一一回复请见谅,现系统整理一套java初学者最佳的学习方法.路线.大纲及视频资料,并对一些过期的知识点进行剔除!如Struts2,hibern ...

  8. 【Golang源码分析】Go Web常用程序包gorilla/mux的使用与源码简析

    目录[阅读时间:约10分钟] 一.概述 二.对比: gorilla/mux与net/http DefaultServeMux 三.简单使用 四.源码简析 1.NewRouter函数 2.HandleF ...

  9. paho架构_MQTT系列最终章-Paho源码分析(三)-心跳与重连机制

    写在前面 通过之前MQTT系列-Eclipse.Paho源码分析(二)-消息的发送与接收的介绍,相信仔细阅读过的小伙伴已经对Eclipse.Paho内部发送和订阅消息的流程有了一个较为清晰的认识,今天 ...

  10. 【golang源码分析】chan底层原理——附带读写用户队列的环形缓冲区

    1 环形缓冲区 1.1 环形缓冲区结构 环形缓冲区通常有一个读指针和一个写指针.读指针指向环形缓冲区中可读的数据,写指针指向环形缓冲区中可写的缓冲区.通过移动读指针和写指针就可以实现缓冲区的数据读取和 ...

最新文章

  1. 利用PostMan 模拟上传/下载文件(亲测)
  2. 利用STL离散化处理数据(unique)
  3. python--装饰器(附偏函数、断言)
  4. TOMCAT部署项目的方式
  5. python中set函数_python中set()函数简介及实例解析
  6. 使用Spring Task轻松完成定时任务
  7. 从零开始学Vue(一)—— Vue.js 入门
  8. mac 二进制安装mysql_教程方法;在mac下安装mysql二进制分发版的方法(不是dmg的)电脑技巧-琪琪词资源网...
  9. 照片视频拼接软件哪个好?一半图片一半视频的快速上手教程,朋友圈超吸赞效果
  10. Java Web之三大利器
  11. 短信验证码和邮箱验证码
  12. matlab 遗传算法 ga函数,matlab遗传算法ga函数
  13. 计算机网络技术 网络参数配置与常用网络命令使用和网络仿真工具的使用
  14. DeeCamp 2019 Exam A 纪实
  15. 数据科学分布——Beta分布
  16. 针对于win8、win10无法访问,您可能没有权限使用网络资源的解决方法
  17. ssm毕设项目基于的少儿编程学习系统2lsiy(java+VUE+Mybatis+Maven+Mysql+sprnig)
  18. 解压技巧: iPhone自带的解压缩功能好强
  19. 给本本换硬盘,直接克隆旧盘!
  20. babyos2(30) APIC, local APIC, I/O APIC

热门文章

  1. 1972年发射失败的苏联金星探测器可能今年坠落地球
  2. 怎样在PDF文件上添加印章
  3. 本博客基于Handsome主题的一些小修改教程
  4. AR和VR现在还火爆吗?
  5. js案例一 :方块拖动效果
  6. 日历 设置云端邮件服务器,ios设置MDaemon activesync同步邮件、联系人、日历
  7. Oracle (temporary tablespace)临时表空间说明
  8. 飞鱼服务器 微信,飞鱼微信客服系统软件
  9. 益聚星荣:如何有理有据地给元宇宙泼一盆冷水?
  10. 2020 消息中间件MQ与RabbitMQ面试题大全