Go并发模式:管道与取消
关键字:Go语言,管道,取消机制,并发,sync.WaitGroup,包引用,通道,defer,select
GO并发模式:管道与取消
简介
Go的并发能力可以使构建一个流数据管道变得非常容易,并且可以高校地使用机器I/O和多核处理器。这篇文章展示了一些例子,包括管道,对操作失败的处理技术。
管道的概念
在Go里,并没有正式的管道的定义,它只是众多并发程序其中的一个。通俗来讲,一个管道是一系列由通道连接的阶段,每个阶段都是一组运行着同样函数的goroutine。在每个阶段里,goroutine在干着:
- 通过接入通道(inbound channels)接收上游流下来的值
- 对这些数据执行某个函数,通常会产生新的值
- 通过导出通道(outbound channels)下游发送值
- 第一个阶段也叫source或者producer
- 最后一个阶段也叫sink或者consumer
以上这两个阶段都只能有一个通道,或者是接入通道或者是导出通道,不能同时拥有这两种。而其他每个阶段都可以共同拥有任意数量的接入通道和导出通道。
一个用来学习的例子
下面我们将展开一个简单的管道例子,来阐述其中的思想和技术,后面会有实际的例子。
平方函数
直接看代码中注释。
注意goroutine是函数体内并发,有一个壳sandbox扣着它。
// 要想run,必须package main,默认是文件夹目录名,要更改一下
package mainimport "fmt"// 设想一个拥有三个阶段的管道/** First Stage: gen* params: 一个以逗号分隔的整数列表,数量不限* return: 一个通道,包含参数中整数列表的通道*/
func gen(nums ... int) <-chan int {out := make(chan int)// 通过一个goroutine来将参数中的每个整数发送到通道中去。go func() {for _, n := range nums {out <- n}close(out) // close方法作为上面的for循环的终止条件,不能省略。}()return out
}/** Second Stage: sq* params: 一个包含参数中整数列表的通道* return: 一个包含将参数通道中每个整数平方后的列表的通道* note: 因为参数和返回值的类型都是相同的整型通道,所以可以反复嵌套该方法。*/
func sq(in <-chan int) <-chan int {out := make(chan int)go func() {for n := range in {out <- n * n // 平方}close(out)}()return out
}/** Final Stage: main* 是一个main函数,没有参数也没有返回值,它相当于客户端调用*/
func main() {c := gen(2, 3) // 建立通道out := sq(c) // 通道处理// 上面传入两个值2和3,那么这里就要对应的消费两次输出fmt.Println(<-out)fmt.Println(<-out)// 嵌套sqfor n := range sq(sq(gen(1, 2, 4, 5))) {fmt.Println(n)}
}// output:
// 4
// 9
// 1
// 16
// 256
// 625
Fan-out和Fan-in
- Fan-out,扇出。多个函数可以读取同一个通道直到该通道关闭。可让一群工人并用CPU和IO
- Fan-in,扇入。一个函数可以读取多个输入,每个输入被多路复用到一个独立的通道上,当所有输入被关闭时,这个通道也会被关闭,同时它也会关掉这个函数的使用权。
下面我们将运行连个sq函数的实例,都会读取同一个输入通道,我们将使用一个新函数,叫做merge,来扇入多个结果。
向一个已关闭的通道发送值,会引起通道panic错误,所以引入了sync.WaitGroup功能来控制当所有发送行为结束以后关闭通道。
sync.WaitGroup
sync.WaitGroup像java的倒计时锁,首先我们定义它的Wait方法设置一个锁到某个并发程序中,然后通过Add方法定义计数器大小CounterSize,该大小为最多发送数据到通道的执行次数,每次执行结束要通过Done方法来使CounterSize减一,直到CounterSize为0,上面我们定义的Wait才会释放锁。
注意,WaitGroup的计数器大小CounterSize在初始化时默认为1,也就是说没调用Add之前,需要一次Done方法执行以后,Wait锁才会释放。
merge函数
func merge(cs ...<-chan int) <-chan int {var wg sync.WaitGroup// 定义一个独立通道out,将接收所有值out := make(chan int)// 将通道中所有值转到outoutput := func(c <-chan int) {for n := range c {out <- n}wg.Done()}wg.Add(len(cs))// 将merge参数中所有通道的值都合到唯一通道out上去for _, c := range cs {go output(c)}// 启动一个额外的goroutine(不会按照代码顺序执行,而是一进到merge就会启动)来等待直到所有通道Done以后关闭那个唯一通道out。go func() {wg.Wait()// 直到wg全都Done了才会继续执行。close(out)}()return out
}
Go的包引用问题
当我们要使用其他Go文件内部的函数时,会有两种处理方法:
- 将函数绑定到某个type下,然后在调用时创建那个type的实例即可调用。
- 将函数名首字母大写,我们就可以通过包名调用了。
以上两种方法都会存在一个问题,就是包引用问题,如果你找不到源码位置,调用其函数就无从谈起,那么如何正确的引用包呢?
注意,最容易引发混乱的是main函数,因为main函数是可执行Go文件的必须元素,同时必须是指定package也为main,因此我们尽量不要在main函数所在的Go文件中添加与main无关的内容,否则我们很难通过包名或者文件名定位函数的意思。
注意,Go中最没用的就是Go文件名了,包引用都是通过package。
正确的引用包是:将被调用函数所在文件,声明package为其所在文件夹名字,
注意,所有的该文件夹下的Go文件的package声明必须为同一个,不能出现第二个值,对外部调用者来讲,这些文件好似在一起,都是从一个package读取,并无区分。
然后在调用函数的地方import进来被调用函数声明的package即可。
所以总结一下,文件夹名即包名,文件夹内给Go文件起名要能够解释清楚文件内容,main函数文件指定到有意义的文件夹下,导入所需函数包。
main函数
func main() {in := pipeline.Gen(2, 3)c1 := pipeline.Sq(in)c2 := pipeline.Sq(in)// 将c1和c2通道内的值合并到一起for n := range merge(c1, c2) {fmt.Println(n)}
}// Output:
// 4
// 9
等价于
out:= merge(c1,c2)
fmt.Println(<-out)
fmt.Println(<-out)
fmt.Println(<-out)// 第三次输出,通道已无值,输出零值,如果通道输出完毕时即关闭的话,这一行会报错
// Output:
// 4
// 9
// 0
发现问题?发送次数少于接收次数
上面的管道函数有一个模式:
- 所有的发送操作完成时,阶段会关闭他们的导出通道。
- 阶段会一直从导入通道中接收值,直到那些通道被关闭。
这个模式允许每个接收的阶段可以被作为一个range循环写入,并且保证一旦所有的值都已经成功发送下游,所有的goroutine退出。
但是在真实的管道里,阶段不会总是能接收到所有的导入值。有时候这是由于一个设计:
接收者可能只需要一个值的子集来取得进展。
更常见的是,一个阶段早早退出是因为一个导入值代表了一个更早阶段的error。在这两种情况下,接收方不应该等待其余值的到达,并且我们想要更早的阶段来停止那些后期阶段不需要的生产时期的值。
在我们的例子中,
out:= merge(c1,c2)
fmt.Println(<-out)
// Output:
// 4
实际上out通道中还有一个9没有被输出,通道的值此时没有被完全消费,这时goroutine就会不断尝试发送该值,并且会被无限期阻塞。
这是一个资源泄露,goroutine消耗内存和运行时资源,并且在goroutine栈中的堆引用会一直防止数据被垃圾收集器回收。goroutine不可被垃圾收集,只能必须靠自己exit。
所以,我们需要找到方式,能够在下游阶段接收所有导入值失败的时候,上游阶段的管道仍旧能够退出:
- 一种方式是改变导出通道让它又有一个buffer缓冲区,一个缓冲区能够持有一个固定数量的值,如果缓冲区内仍有空间,发送操作就立即完成。
缓冲区的内容我们在前面的文章中有仔细介绍。
总之就是可以释放goroutine的发送操作到缓冲区,不会被无限期阻塞。
在我们的管道中,返回到被阻塞的goroutine,我们可能考虑到添加一个缓冲区到merge函数返回的导出通道:
out := make(chan int, 1)// 增加一个缓冲区,可以存放通道中未发送的值
但是问题仍在发生,我们这里是因为知道我们上面只写了一遍发送,而通道已知有两次接收值,所以我们可以这么干,但是这个代码是不好的,易碎的,一旦条件发生改变,就要对代码进行调整。
因此,我们需要为下游阶段提供一种方式来象征发送者,来停止接收输入。
明确的取消机制
当main函数决定退出,而不再接收任何out通道的值的时候,它必须告诉上游的goroutine,放弃他们试图发送的值。
在一个通道中如此操作发送值,被称作done。
它发送两个值因为有两个潜在的阻塞发送者。
我们修改merge,给它加入一个参数是struct{}结构体通道。然后修改merge中的output函数,将原来的
out <- n:
替换为:
select {
case out <- n:
case <-done:
}
意思是:如果n还有未发送的值,就正常发送,如果done有未发送的值就发送done。然后我们再修改一下main函数:
done := make(chan struct{}, 2)
out := merge(done, c1, c2)
fmt.Println(<-out)
done <- struct{}{}
done <- struct{}{}
当out只被输出一次的时候,此时循环还剩两次(总共三次,因为merge函数的参数有三个通道,会循环三次),为了避免循环阻塞在out输出的位置,我们给done通道传入了结构体零值,merge函数中那个循环就会放弃发送out值,而去执行done的发送。
但是问题仍在继续,这里仍旧是因为我们预知通道接收次数,以及发送放空次数,所以可以写出这个顺序和次数,这仍旧是易碎的,本质上除了让我们学习了一下这种写法,与上面发生的无异。
我们需要一种方式,可以在未知goroutine数量,未知通道大小的情况下,随时按需阻止下游阶段发送未发送完毕的通道。
因为接收操作在一个封闭的通道可以总是立即执行,产生类元素的零值。
这就意味着main函数能够对所有被done通道关闭的发送者解除阻塞。这实际上是一个广播信号发送者。我们扩展管道功能的each来接收done作为一个参数来安排通过defer来延迟关闭,以便所有的main函数的返回路径能够发送信号到管道阶段去退出。
先看merge函数:
defer wg.Done()
for n := range c {select {case out <- n:case <-done:return}
}
我们在for循环前面加入了一行sync.WaitGroup的Done的延迟方法,然后修改了select内部当done可被输出时,直接结束merge函数(更别提跳出循环了),直接执行defer的wg.Done去掉一次计数器数值。然后看main函数:
func main() {done := make(chan struct{})defer close(done)in := pipeline.Gen(2, 3)c1 := pipeline.Sq(in)c2 := pipeline.Sq(in)out := merge(done, c1, c2)fmt.Println(<-out)
}
首先我们去掉了done通道的缓冲区,加了一行关闭done通道的延迟操作。当代码执行玩fmt的一次输出以后,main函数执行完毕,会调用defer关闭done通道,回到merge函数中,done通道被关闭以后,case ->done被执行merge函数执行完毕,执行wg.Done()。
总结
本文详细阐述了Go管道的概念,是有三组动作:生产通道,处理通道,使用通道,这三组动作实现了Go的管道。通过一个例子我们搞清楚了管道的含义,接着又介绍了Fan-out,是关于多个函数对同一个通道的操作,以及一个函数对多个通道的操作(例子中使用了merge,将多个通道合并为一个)。这期间,我们研究了sync.WaitGroup以及Go语言中的包引用特性。最后,我们在例子中发现了管道并发的问题,并循序渐进地找到了解决方法,在此期间,让我们加深了对defer,管道,通道,select的理解。
参考资料
- Go官方文档
源码位置
更多请转到醒者呆的博客园
转载于:https://www.cnblogs.com/Evsward/p/goPipeline.html
Go并发模式:管道与取消相关推荐
- 设计模式——管道模式(并发模式)
1.pipeline简介 pipeline又称为管道,是一种在计算机普遍使用的技术.举个最普遍的例子,如下图所示cpu流水线,一个流水线分为4部分,每个部分可以独立工作,于是可以处理多个数据流.lin ...
- Linux网络编程 | 并发模式:半同步/半异步模式、领导者/追随者模式
文章目录 同步与异步 半同步/半异步模式 变体:半同步/半反应堆模式 改进:更高效的半同步/半异步模式 领导者/追随者模式 组件 :句柄集.线程集.事件处理器 并发模式是指I/O处理单元和多个逻辑单元 ...
- Linux服务器 | 服务器模型与三个模块、两种并发模式:半同步/半异步、领导者/追随者
文章目录 两种服务器模型及三个模块 C/S模型 P2P模型 I/O处理单元.逻辑单元.存储单元 并发 同步与异步 半同步/半异步模式 变体:半同步/半反应堆模式 改进:高效的半同步/半异步模式 领导者 ...
- 并发编程 07—— 任务取消
Java并发编程实践 目录 并发编程 01-- ThreadLocal 并发编程 02-- ConcurrentHashMap 并发编程 03-- 阻塞队列和生产者-消费者模式 并发编程 04-- 闭 ...
- 通关GO语言11 并发模式:Go 语言中即学即用的高效并发模式
上节课我为你讲解了如何通过 Context 更好地控制多个协程,课程最后的思考题是:如何通过 Context 实现日志跟踪? 要想跟踪一个用户的请求,必须有一个唯一的 ID 来标识这次请求调用了哪些函 ...
- Go语言中常见的并发模式
Go语言最吸引人的地方是它内建的并发支持.Go语言并发体系的理论是C.A.R Hoare在1978年提出的通信顺序进程(Communicating Sequential Process,CSP).CS ...
- 高并发模式———领导者/追随者模式和半同步/半异步模式
高并发模式 高并发模式的意义:对于IO密集型的操作,因为IO操作速度远远小于CPU的计算速度,所以,如果程序阻塞于IO操作将大量浪费CPU.但是如果多线程的情况下,被IO阻塞的线程可以放弃CPU(或操 ...
- Linux网络编程——Day12 两种高效的并发模式
今天继续学习高性能服务器框架,上一篇关于高性能服务器的基础知识连接如下: Linux网络编程-Day11 高性能服务器程序框架_Jane_Librastar的博客-CSDN博客https://blog ...
- [WCF编程]13.并发:服务并发模式
一.概述 传入的客户端调用消息会分发给Windows I/O线程池(线程默认为1000)上的服务实例.多个客户端可以发起多个并发的调用,并且服务可以在多个线程上处理这些请求.如果传入的调用分发给同一个 ...
- windows下多进程加协程并发模式
好久没更新博客了.正好最近要整理一下最近这段时间做过的项目以及学习python的一些心得.如标题所示,今天就来说说windows下多进程加协程并发模式.其实网上还是蛮多在linux下的多进程加协程并发 ...
最新文章
- 【python图像处理】彩色映射(续篇)
- 2020CCPC(长春) - Strange Memory(树上启发式合并+位运算)
- http请求POST方式发送获得返回值
- kubernetes 数据_为什么数据科学家喜欢Kubernetes
- SVN-功能介绍之切换
- 飘了!英特尔 2 年内要发布高效芯片超过苹果 M1
- 谷歌升级云数据库:更多的储存及更快的读取
- c++编游戏-扫雷-c++游戏将彩色化-windows7自带扫雷游戏休闲娱乐
- 贝叶斯分析之利用线性回归模型理解并预测数据(三)
- 【黑帽SEO系列】网页劫持
- 华为交换机如何清除console口密码
- bellman ford java_Bellman-Ford算法
- 预防ddos攻击常用方法有哪些
- matlab 龙格-库塔 法求解常微分方程
- 2015年App Store审核被拒的23个理由
- MAC上安装Ubantu双系统
- OJ考试特别版,数组模拟链表(比正儿八经用链表简单,结果还对,何乐而不为)
- STM32F10xxx中文板参考手册PDF(内有英文版链接)
- [小技巧]chrome 标签切换快捷键
- ECharts x,y轴分别添加自定义的滚动条
热门文章
- 帆软层次坐标常用公式整理
- iphone开机白苹果_iphone白苹果原因是什么 iphone白苹果解决方法【介绍】
- 西门子step7安装注册表删除_不用重装系统就能完全卸载西门子PLC编程软件STEP 7...
- 安卓开发 实现文字渐变效果_AI教程!用网格工具做渐变字效
- 史上最全的面试宝典,让你轻松入职
- 10、Android--技巧
- MySQL8的新特性ROLE
- 过滤器 拦截器 controller 页面 的执行顺序
- django 学习个人总结 之many_to_one
- Redis Cluster 的安装和配置(1)