不同于传统的多线程并发模型使用共享内存来实现线程间通信的方式,golang 的哲学是通过 channel 进行协程(goroutine)之间的通信来实现数据共享: > Do not communicate by sharing memory; instead, share memory by communicating.

这种方式的优点是通过提供原子的通信原语,避免了竞态情形(race condition)下复杂的锁机制。
channel 可以看成一个 FIFO 队列,对 FIFO 队列的读写都是原子的操作,不需要加锁。对 channel 的操作行为结果总结如下:

操作 nil channel closed channel not-closed non-nil channel
close panic panic 成功 close
写 ch <- 一直阻塞 panic 阻塞或成功写入数据
读 <- ch 一直阻塞 读取对应类型零值 阻塞或成功读取数据

读取一个已关闭的 channel 时,总是能读取到对应类型的零值,为了和读取非空未关闭 channel 的行为区别,可以使用两个接收值:

// ok is false when ch is closed
v, ok := <-ch

golang 中大部分类型都是值类型(只有 slice / channel / map 是引用类型),读/写类型是值类型的 channel 时,如果元素 size 比较大时,应该使用指针代替,避免频繁的内存拷贝开销。

内部实现

如图所示,在 channel 的内部实现中(具体定义在 $GOROOT/src/runtime/chan.go 里),维护了 3 个队列:

  • 读等待协程队列 recvq,维护了阻塞在读此 channel 的协程列表
  • 写等待协程队列 sendq,维护了阻塞在写此 channel 的协程列表
  • 缓冲数据队列 buf,用环形队列实现,不带缓冲的 channel 此队列 size 则为 0

当协程尝试从未关闭的 channel 中读取数据时,内部的操作如下:

  1. 当 buf 非空时,此时 recvq 必为空,buf 弹出一个元素给读协程,读协程获得数据后继续执行,此时若 sendq 非空,则从 sendq 中弹出一个写协程转入 running 状态,待写数据入队列 buf ,此时读取操作 <- ch 未阻塞;
  2. 当 buf 为空但 sendq 非空时(不带缓冲的 channel),则从 sendq 中弹出一个写协程转入 running 状态,待写数据直接传递给读协程,读协程继续执行,此时读取操作 <- ch 未阻塞;
  3. 当 buf 为空并且 sendq 也为空时,读协程入队列 recvq 并转入 blocking 状态,当后续有其他协程往 channel 写数据时,读协程才会重新转入 running 状态,此时读取操作 <- ch 阻塞。

类似的,当协程尝试往未关闭的 channel 中写入数据时,内部的操作如下:

  1. 当队列 recvq 非空时,此时队列 buf 必为空,从 recvq 弹出一个读协程接收待写数据,此读协程此时结束阻塞并转入 running 状态,写协程继续执行,此时写入操作 ch <- 未阻塞;
  2. 当队列 recvq 为空但 buf 未满时,此时 sendq 必为空,写协程的待写数据入 buf 然后继续执行,此时写入操作 ch <- 未阻塞;
  3. 当队列 recvq 为空并且 buf 为满时,此时写协程入队列 sendq 并转入 blokcing 状态,当后续有其他协程从 channel 中读数据时,写协程才会重新转入 running 状态,此时写入操作 ch <- 阻塞。

当关闭 non-nil channel 时,内部的操作如下:

  1. 当队列 recvq 非空时,此时 buf 必为空,recvq 中的所有协程都将收到对应类型的零值然后结束阻塞状态;
  2. 当队列 sendq 非空时,此时 buf 必为满,sendq 中的所有协程都会产生 panic ,在 buf 中数据仍然会保留直到被其他协程读取。

使用场景

除了常规的用来在协程之间传递数据外,本节列出了一些特殊的使用 channel 的场景。

futures / promises

golang 虽然没有直接提供 futrue / promise 模型的操作原语,但通过 goroutine 和 channel 可以实现类似的功能:

package mainimport ("io/ioutil""log""net/http"
)// RequestFuture, http request promise.
func RequestFuture(url string) <-chan []byte {c := make(chan []byte, 1)go func() {var body []bytedefer func() {c <- body}()res, err := http.Get(url)if err != nil {return}defer res.Body.Close()body, _ = ioutil.ReadAll(res.Body)}()return c
}func main() {future := RequestFuture("https://api.github.com/users/octocat/orgs")body := <-futurelog.Printf("reponse length: %d", len(body))
}

条件变量(condition variable)

类型于 POSIX 接口中线程通知其他线程某个事件发生的条件变量,channel 的特性也可以用来当成协程之间同步的条件变量。因为 channel 只是用来通知,所以 channel 中具体的数据类型和值并不重要,这种场景一般用 strct {} 作为 channel 的类型。

一对一通知

类似 pthread_cond_signal() 的功能,用来在一个协程中通知另个某一个协程事件发生:

package mainimport ("fmt""time"
)func main() {ch := make(chan struct{})nums := make([]int, 100)go func() {time.Sleep(time.Second)for i := 0; i < len(nums); i++ {nums[i] = i}// send a finish signalch <- struct{}{}}()// wait for finish signal<-chfmt.Println(nums)
}

广播通知

类似 pthread_cond_broadcast() 的功能。利用从已关闭的 channel 读取数据时总是非阻塞的特性,可以实现在一个协程中向其他多个协程广播某个事件发生的通知:

package mainimport ("fmt""time"
)func main() {N := 10exit := make(chan struct{})done := make(chan struct{}, N)// start N worker goroutinesfor i := 0; i < N; i++ {go func(n int) {for {select {// wait for exit signalcase <-exit:fmt.Printf("worker goroutine #%d exit\n", n)done <- struct{}{}returncase <-time.After(time.Second):fmt.Printf("worker goroutine #%d is working...\n", n)}}}(i)}time.Sleep(3 * time.Second)// broadcast exit signalclose(exit)// wait for all worker goroutines exitfor i := 0; i < N; i++ {<-done}fmt.Println("main goroutine exit")
}

信号量

channel 的读/写相当于信号量的 P / V 操作,下面的示例程序中 channel 相当于信号量:

package mainimport ("log""math/rand""time"
)type Seat int
type Bar chan Seatfunc (bar Bar) ServeConsumer(customerId int) {log.Print("-> consumer#", customerId, " enters the bar")seat := <-bar // need a seat to drinklog.Print("consumer#", customerId, " drinks at seat#", seat)time.Sleep(time.Second * time.Duration(2+rand.Intn(6)))log.Print("<- consumer#", customerId, " frees seat#", seat)bar <- seat // free the seat and leave the bar
}
func main() {rand.Seed(time.Now().UnixNano())bar24x7 := make(Bar, 10) // the bar has 10 seats// Place seats in an bar.for seatId := 0; seatId < cap(bar24x7); seatId++ {bar24x7 <- Seat(seatId) // none of the sends will block}// a new consumer try to enter the bar for each secondfor customerId := 0; ; customerId++ {time.Sleep(time.Second)go bar24x7.ServeConsumer(customerId)}
}

互斥量

互斥量相当于二元信号里,所以 cap 为 1 的 channel 可以当成互斥量使用:

package mainimport "fmt"func main() {mutex := make(chan struct{}, 1) // the capacity must be onecounter := 0increase := func() {mutex <- struct{}{} // lockcounter++<-mutex // unlock}increase1000 := func(done chan<- struct{}) {for i := 0; i < 1000; i++ {increase()}done <- struct{}{}}done := make(chan struct{})go increase1000(done)go increase1000(done)<-done<-donefmt.Println(counter) // 2000
}

关闭 channel

关闭不再需要使用的 channel 并不是必须的。跟其他资源比如打开的文件、socket 连接不一样,这类资源使用完后不关闭后会造成句柄泄露,channel 使用完后不关闭也没有关系,channel 没有被任何协程用到后最终会被 GC 回收。关闭 channel 一般是用来通知其他协程某个任务已经完成了。golang 也没有直接提供判断 channel 是否已经关闭的接口,虽然可以用其他不太优雅的方式自己实现一个:

package mainfunc isClosed(ch chan int) bool {select {case <-ch:return truedefault:}return false
}

不过实现一个这样的接口也没什么必要。因为就算通过 isClosed() 得到当前 channel 当前还未关闭,如果试图往 channel 里写数据,仍然可能会发生 panic ,因为在调用 isClosed() 后,其他协程可能已经把 channel 关闭了。
关闭 channel 时应该注意以下准则:

  • 不要在读取端关闭 channel ,因为写入端无法知道 channel 是否已经关闭,往已关闭的 channel 写数据会 panic ;
  • 有多个写入端时,不要再写入端关闭 channle ,因为其他写入端无法知道 channel 是否已经关闭,关闭已经关闭的 channel 会发生 panic ;
  • 如果只有一个写入端,可以在这个写入端放心关闭 channel 。

关闭 channel 粗暴一点的做法是随意关闭,如果产生了 panic 就用 recover 避免进程挂掉。稍好一点的方案是使用标准库的 sync 包来做关闭 channel 时的协程同步,不过使用起来也稍微复杂些。下面介绍一种优雅些的做法。

一写多读

这种场景下这个唯一的写入端可以关闭 channel 用来通知读取端所有数据都已经写入完成了。读取端只需要用 for range 把 channel 中数据遍历完就可以了,当 channel 关闭时,for range 仍然会将 channel 缓冲中的数据全部遍历完然后再退出循环:

package mainimport ("fmt""sync"
)func main() {wg := &sync.WaitGroup{}ch := make(chan int, 100)send := func() {for i := 0; i < 100; i++ {ch <- i}// signal sending finishclose(ch)}recv := func(id int) {defer wg.Done()for i := range ch {fmt.Printf("receiver #%d get %d\n", id, i)}fmt.Printf("receiver #%d exit\n", id)}wg.Add(3)go recv(0)go recv(1)go recv(2)send()wg.Wait()
}

多写一读

这种场景下虽然可以用 sync.Once 来解决多个写入端重复关闭 channel 的问题,但更优雅的办法设置一个额外的 channel ,由读取端通过关闭来通知写入端任务完成不要再继续再写入数据了:

package mainimport ("fmt""sync"
)func main() {wg := &sync.WaitGroup{}ch := make(chan int, 100)done := make(chan struct{})send := func(id int) {defer wg.Done()for i := 0; ; i++ {select {case <-done:// get exit signalfmt.Printf("sender #%d exit\n", id)returncase ch <- id*1000 + i:}}}recv := func() {count := 0for i := range ch {fmt.Printf("receiver get %d\n", i)count++if count >= 1000 {// signal recving finishclose(done)return}}}wg.Add(3)go send(0)go send(1)go send(2)recv()wg.Wait()
}

多写多读

这种场景稍微复杂,和上面的例子一样,也需要设置一个额外 channel 用来通知多个写入端和读取端。另外需要起一个额外的协程来通过关闭这个 channel 来广播通知:

package mainimport ("fmt""sync""time"
)func main() {wg := &sync.WaitGroup{}ch := make(chan int, 100)done := make(chan struct{})send := func(id int) {defer wg.Done()for i := 0; ; i++ {select {case <-done:// get exit signalfmt.Printf("sender #%d exit\n", id)returncase ch <- id*1000 + i:}}}recv := func(id int) {defer wg.Done()for {select {case <-done:// get exit signalfmt.Printf("receiver #%d exit\n", id)returncase i := <-ch:fmt.Printf("receiver #%d get %d\n", id, i)time.Sleep(time.Millisecond)}}}wg.Add(6)go send(0)go send(1)go send(2)go recv(0)go recv(1)go recv(2)time.Sleep(time.Second)// signal finishclose(done)// wait all sender and receiver exitwg.Wait()
}

总结

channle 作为 golang 最重要的特性,用起来还是比较爽的。传统的 C 里要实现类型的功能的话,一般需要用到 socket 或者 FIFO 来实现,另外还要考虑数据包的完整性与并发冲突的问题,channel 则屏蔽了这些底层细节,使用者只需要考虑读写就可以了。 channel 是引用类型,了解一下 channel 底层的机制对更好的使用 channel 还是很用必要的。虽然操作原语简单,但涉及到阻塞的问题,使用不当可能会造成死锁或者无限制的协程创建最终导致进程挂掉。
channel 除在可以用来在协程之间通信外,其阻塞和唤醒协程的特性也可以用作协程之间的同步机制,文中也用示例简单介绍了这种场景下的用法。
关闭 channel 并不是必须的,只要没有协程没用引用 channel ,最终会被 GC 清理。所以使用的时候要特别注意,不要让协程阻塞在 channel 上,这种情况很难检测到,而且会造成 channel 和阻塞在 channel 的协程占有的资源无法被 GC 清理最终导致内存泄露。
channle 方便 golang 程序使用 CSP 的编程范形,但是 golang 是一种多范形的编程语言,golang 也支持传统的通过共享内存来通信的编程方式。终极的原则是根据场景选择合适的编程范型,不要因为 channel 好用而滥用 CSP 。

golang channel 管道 通道 信道 使用总结相关推荐

  1. golang channel 管道 有无缓存的区别

    无缓冲的与有缓冲channel有着重大差别,那就是一个是同步的 一个是非同步的. 比如 c1:=make(chan int)         无缓冲 c2:=make(chan int,1)      ...

  2. golang channel 管道

    channel是Go中的一个核心类型,你可以把它看成一个管道,通过它并发核心单元就可以发送或者接收数据进行通讯(communication). 它的操作符是箭头 <- . ch <- v ...

  3. 专科 java转go 翱翔之路(二)基础语法:匿名组合,方法,接口,map,json,异常处理,channel管道,select用法

    2.4 面向对象编程 2.4.1匿名组合 type Person struct {id intname stringage int }type Student struct {Person //只有类 ...

  4. Golang channel 快速入门

    文章目录 1.简介 2.缓冲 channel 3.range 和 close 操作 4.select 操作 5.注意要点 6.常见用法 参考文献 1.简介 channel 提供了一种通信机制,通过发送 ...

  5. Java语言进阶:Channel(通道)

    Java语言进阶:Channel(通道) Channel概述 Channel(通道):Channel是一个接口,可以通过它读取和写入数据, 可以把它看做是IO中的流,不同的是:Channel是双向的, ...

  6. golang 协程 通道channel阻塞

    说到channel,就一定要说一说线程了.任何实际项目,无论大小,并发是必然存在的.并发的存在,就涉及到线程通信.在当下的开发语言中,线程通讯主要有两种,共享内存与消息传递.共享内存一定都很熟悉,通过 ...

  7. golang协程——通道channel阻塞

    说到channel,就一定要说一说线程了.任何实际项目,无论大小,并发是必然存在的.并发的存在,就涉及到线程通信.在当下的开发语言中,线程通讯主要有两种,共享内存与消息传递.共享内存一定都很熟悉,通过 ...

  8. 卷毛0基础学习Golang-并发编程-03 channel管道

    channel channel是Go语言中的一个核心类型,可以把它看成管道.并发核心单元通过它就可以发送或者接收数据进行通讯,这在一定程度上又进一步降低了编程的难度. channel是一个数据类型,主 ...

  9. Golang channel 源码分析

    以下源码都摘自 golang 1.16.15 版本. 1. channel 底层结构 Golang 中的 channel 对应的底层结构为 hchan 结构体(channel的源码位置在Golang包 ...

最新文章

  1. icinga2+postgresql
  2. 现代密码学5.4--对哈希函数的攻击
  3. Netty(一)——Netty入门程序
  4. boost::safe_numerics模块实现混合类型产生令人惊讶的结果的测试程序
  5. 前端学习(1720):前端系列javascript之生命周期下
  6. OJ1068: 二进制数(C语言)
  7. [Python] L1-040. 最佳情侣身高差 团体程序设计天梯赛GPLT
  8. MindManager中读图工具的使用
  9. 怎么看服务器是实体机还是虚拟机,如何判断一台机器是物理机还是虚拟机
  10. html在线取色,JS实现的RGB网页颜色在线取色器完整实例
  11. 利用eclipse自定义模板创建日志打印模板
  12. 通过双网卡电脑将网络共享到路由器
  13. 伏临扰雨(北京的雨季)
  14. 剑侠世界职业优缺点简介
  15. 如何找回Nessus密码?
  16. Java开发指南!java生成word文档修改样式
  17. 苹果xr怎么截屏_手机资讯:iPhone XR更新系统后无信号怎么办iPhone XR无信号解决办法...
  18. 关于av_freep
  19. el-select动态清除value、el-input回车上传数据
  20. 基于图像的三维重建研究

热门文章

  1. Nova Conductor 与 Versioned Object Model 机制
  2. 解决apt-get /var/lib/dpkg/lock-frontend 问题
  3. vmlinux、 Image, zImage、 uImage 的区别
  4. stm32 Boot0,Boot1引脚设置
  5. 当我们在讨论奢侈品行业时,人工智能可以做什么?
  6. DPM 2007SRT及DPM 2010 BMR祼金属还原总结
  7. 50个photoshop网页设计教程-整体布局篇
  8. nginx连接php-fpm sock文件失败502
  9. wxruby框架例子1
  10. 在SQL Server 2005中实现表的行列转换()