Go编程模式--流水线模式
流水线工作模型在工业领域内十分常见,它将工作流程分为多个环节,每个环节根据工作强度安排合适的人员数量。良好的流水线设计尽量让各环节的流通率平衡,最大化提高产能效率。
Go 是一门实用性语言,流水线工作模型与 Go 融合地非常融洽,只不过我们一般使用另一个名词来表示流水线:pipeline。
pipeline
pipeline 由多个环节组成,具体在 Go 中,环节之间通过 channel 通信,同一个环节任务可以由多个 goroutine 来同时处理。
pipeline 的核心是数据,通过 channel 来保证数据流动,每个环节的数据处理由 goroutine 完成。
除了开始环节和结束环节,每个环节都有任意数量的输入 channel 和输出 channel。开始环节被称为发送者或生产者,结束环节被称为接收者或消费者。
下面我们来看一个简单的 pipeline 例子,分为三个环节。
第一个环节,generate
函数:它充当生产者角色,将数据写入 channel,并把该 channel 返回。当所有数据写入完毕,关闭 channel。
func generate(nums ...int) <-chan int {out := make(chan int)go func() {for _, n := range nums {out <- n}close(out)}()return out
}
第二个环节,square
函数:它是数据处理的角色,从开始环节中的 channel 取出数据,计算平方,将结果写入新的 channel ,并把该新的 channel 返回。当所有数据计算完毕,关闭该新 channel。
func square(in <-chan int) <-chan int {out := make(chan int)go func() {for n := range in {out <- n * n}close(out)}()return out
}
main
函数负责编排整个 pipeline ,并充当消费者角色:读取第二个环节的 channel 数据,打印出来。
func main() {// Set up the pipeline.c := generate(2, 3)out := square(c)// Consume the output.for n := range out {fmt.Println(n)}
}
Fan-out,fan-in
在上述例子中,环节之间通过非缓冲的 channel 传递数据,节点中的数据都是单个 goroutine 处理与消费。
这种工作模式并不高效,会让整个流水线的效率取决于最慢的环节。因为每个环节中的任务量是不同的,这意味着我们需要的机器资源是存在差异的。任务量小的环节,尽量占有少量的机器资源,任务量重的环节,需要更多线程并行处理。
以汽车组装为例,我们可以将组装轮胎的工作分发给 4 个人一起干,当轮胎组装完毕之后,再交由剩下的环节。
多个 goroutine 可以从同一个 channel 读取数据,直到该通道关闭,这称为 fan-out(扇出)。
这个称呼比较形象,它将数据进行分散,所以被称为扇出。扇出是一种分发任务的模式。
单个 goroutine 可以从多个输入 channel 中读取数据,直到所有输入都关闭。具体做法是将输入 channel 多路复用到同一个 channel 上,当所有输入 channel 都关闭时,该 channel 也关闭,这称为 fan-in(扇入)。
它将数据进行聚合,所以被称为扇入。扇入是一种整合任务结果的模式。
在汽车组装的例子中,分发轮胎任务给每个人是 Fan-out,合并轮胎组装结果就是 Fan-in。
channel 的多路复用
扇出的编码模型比较简单,本文不多研究,我们提供一个扇入编程示例。
创建一个生成器函数 generate
,通过 interval
参数控制消息生成频率。生成器返回消息 channel mc
与停止 channel sc
,停止 channel 用于停止生成器任务。
func generate(message string, interval time.Duration) (chan string, chan struct{}) {mc := make(chan string)sc := make(chan struct{})go func() {defer func() {close(sc)}()for {select {case <-sc:returndefault:time.Sleep(interval)mc <- message}}}()return mc, sc
}
stopGenerating
函数通过通过向 sc
中传入空结构体,通知 generate
退出,调用 close(mc)
关闭消息 channel
func stopGenerating(mc chan string, sc chan struct{}) {sc <- struct{}{}close(mc)
}
多路复用函数 multiplex
创建并返回整合消息 channel 和控制并发的 wg
。
func multiplex(mcs ...chan string) (chan string, *sync.WaitGroup) {mmc := make(chan string)wg := &sync.WaitGroup{}for _, mc := range mcs {wg.Add(1)go func(mc chan string, wg *sync.WaitGroup) {defer wg.Done()for m := range mc {mmc <- m}}(mc, wg)}return mmc, wg
}
在 main
函数中,创建两个消息 channel 并复用它们生成 mmc
,打印来自 mmc
的每条消息。另外,我们还实现了接收系统断信号(终端上执行 CTRL+C 即可发送中断信号)的优雅的关闭机制。
func main() {// create two sample message and stop channelsmc1, sc1 := generate("message from generator 1", 200*time.Millisecond)mc2, sc2 := generate("message from generator 2", 300*time.Millisecond)// multiplex message channelsmmc, wg1 := multiplex(mc1, mc2)// create errs channel for graceful shutdownerrs := make(chan error)// wait for interrupt or terminate signalgo func() {sc := make(chan os.Signal, 1)signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM)errs <- fmt.Errorf("%s signal received", <-sc)}()// wait for multiplexed messageswg2 := &sync.WaitGroup{}wg2.Add(1)go func() {defer wg2.Done()for m := range mmc {fmt.Println(m)}}()// wait for errorsif err := <-errs; err != nil {fmt.Println(err.Error())}// stop generatorsstopGenerating(mc1, sc1)stopGenerating(mc2, sc2)wg1.Wait()// close multiplexed messages channelclose(mmc)wg2.Wait()
}
总结
本文简单介绍了流水线编程模式,它和我们熟悉的生产者-消费者模式非常相似。
具体到 Go 编程实践中,pipeline 将数据流分为多个环节,channel 用于数据流动,goroutine 用于处理数据。fan-out 用于分发任务,fan-in 用于数据整合,通过 FAN 模式可以让流水线更好地并发。
当然,还有些细节需要注意,例如停止通知机制,可参照本文 channel 的多路复用章节示例中的 stopGenerating
函数;如何通过 sync.WaitGroup
做好并发控制,这些都是需要读者在实际编码中去体会掌握的。
参考
Go Concurrency Patterns: Pipelines and cancellation:https://go.dev/blog/pipelines
Multiplexing Channels In Go:https://medium.com/@ermanimer/multiplexing-channels-in-go-a7dccdcc4134
Go编程模式--流水线模式相关推荐
- Pipeline(流水线)模式
模式名称 Pipeline(流水线)模式 原文:http://www.uml.org.cn/j2ee/201909271.asp 模式解决的问题 有时一些线程的步奏比较冗长,而且由于每个阶段的结果与下 ...
- python的编程模式-实例解析Python设计模式编程之桥接模式的运用
这篇文章主要介绍了Python设计模式编程之桥接模式的运用,桥接模式主张把抽象部分与它的实现部分分离,需要的朋友可以参考下 我们先来看一个例子: #encoding=utf-8 # #by panda ...
- javascript 面向对象编程(工厂模式、构造函数模式、原型模式)
javascript 面向对象编程(工厂模式.构造函数模式.原型模式) CreateTime--2018年3月29日17:09:38 Author:Marydon 一.工厂模式 /*** 工厂模式*/ ...
- 一些实用的编程模式 | Options模式
今天开个新系列,讲一些实用的编程模式,每个编程模式学完后,都能马上在实战中应用起来,让我们写出更富表达力.易维护.好扩展.优雅亿点点的代码. 这些编程模式的示例我会用Go来演示,但其实这些模式大多与语 ...
- Java多线程编程中Future模式的详解
转载自 https://www.cnblogs.com/winkey4986/p/6203225.html Java多线程编程中,常用的多线程设计模式包括:Future模式.Master-Worker ...
- 对《生产流水线模式》讨论的总结性回复
我的上一篇文章<生产流水线模式>发布以后,引起了很多朋友的关注,大家发表了很多意见,现在我针对留言中大家提得比较多的问题,做一个总结性的回复. 问题一:我的敏捷开发架构是不是只实现了简单的 ...
- Java并发编程实战~Thread-Per-Message模式
我们曾经把并发编程领域的问题总结为三个核心问题:分工.同步和互斥.其中,同步和互斥相关问题更多地源自微观,而分工问题则是源自宏观.我们解决问题,往往都是从宏观入手,在编程领域,软件的设计过程也是先从概 ...
- Qt:Qt实现Winsock网络编程—非阻塞模式下的简单远程控制的开发(WSAAsyncSelect)
Qt实现Winsock网络编程-非阻塞模式下的简单远程控制的开发(WSAAsyncSelect) 前言 这边博客应该是 Qt实现Winsock网络编程-TCP服务端和客户端通信(多线程) 的姐妹篇,上 ...
- 设计模式 — 行为型模式 — 解释器模式
目录 文章目录 目录 解释器模式 应用场景 代码示例 解释器模式 解释器模式,开发者自定义一种 "有内涵" 的语言(或者叫字符串),并设定相关的解释规则,输入该字符串后可以输出公认 ...
最新文章
- cookie的作用域
- 一文全览机器学习建模流程(Python代码)
- SAP QM 模块的弊端?
- 小白信用卡提额攻略,2年轻松提额20万!
- Firefox 检测到该服务器正在将此地址的请求循环重定向。 此问题可能是因为禁用或拒绝 Cookie 导致。...
- python __xxxitem__
- 【Cocos2d-X开发学习笔记】第05期:渲染框架之布景层类(CCLayer)的使用
- js+运行+php+文件,php中运行JS
- Word2003和2007如何隐藏去掉回车符
- C#对IE使用Proxy(代理)
- android person类_骚操作:不重启 JVM,如何替换掉已经加载的类?
- 太极团队首发:iOS 8.3完美越狱工具发布
- div盒子边框圆角_div css圆角边框怎么设置
- Windows打开文件后提示,文件或目录损坏无法读取。
- Android图片加载框架最全解析(五),Glide强大的图片变换功能
- MongoDB windows 局域网连接
- 带你走进T-Pot多蜜罐平台革命:简述、安装、使用、优化、更新
- 内积、外积、元素积、克罗内克积的区分及用法【python】
- 使用Python读取raw格式图像并显示
- ios mj_refresh 上拉、下拉、自定义header、footer、afn子类化