在这篇文章中,我将介绍在 Go 中使用基本并发模式和原生原语来构建并发应用程序的一些最佳实践。模式本身适用于任何语言,但对于这些示例,我们将使用 Go。

可以下载本文的源码配合阅读。

git clone https://github.com/benjivesterby/may2021-triangle-meetup.git

01

流水线模式

流水线模式 1 2 通常由若干个阶段组成,每个阶段都通过一系列 channel 连接。第一阶段是数据来源,最后阶段是数据汇总。数据流水线就是很好的例子,第一步挖掘数据,接下来清洗数据,最后一步将清洗的数据存储到数据库中。

Example Pipeline

如上图所示,第一阶段是数据的来源,最后阶段为数据汇总。下面是流水线模式实战的示例代码。

// Example of a pipeline pattern
func main() {// Call each function passing the output of the previous function// as the input to the next functiond := []data{// ... data ...}// Create the pipelinesink(sanitize(source(d)))
}func source(in []data) <-chan data {out := make(chan data)go func(in []data) {defer close(out)for _, d := range data {// Load data into the front of the pipelineout <- d}}(in)return out
}func sanitize(in <-chan data) <-chan data {out := make(chan data)go func(in <-chan data, out chan<-data) {defer close(out)for {d, ok := <- inif !ok {return}// ... Do some work// push the data outout <- d}}(in, out)return out
}func sink(in <-chan data) {for {d, ok := <- inif !ok {return}// ... Process the sanitized data}
}

有关该模式可执行的示例代码,可以查看本文的 Github 项目代码 pipeline 目录。在项目根目录下运行下面的命令来执行流水线示例。

go run patterns/pipeline/main.go

流水线模式通过一系列 goroutine 组成,每个 goroutine 由一个 channel 传入数据,另一个 channel 输出数据。使用此模式,你可以创建任何大小和复杂性的流水线。

02

扇出模式

扇出模式是指允许多个协程来使用来自单个数据源的的模式。当你需要对多协程进行的大数据处理做负载均衡时,该模式很有用。

有关该模式可执行的示例代码,可以查看本文的 Github 项目代码 fan-out 目录。在项目根目录下运行下面的命令来执行扇出示例。

go run patterns/fanout/main.go

Example Fan-Out

下面是扇出模式的示例代码,其中数据被一系列的工作协程处理。

// Example of a fan-out pattern for processing data in parallel
func main() {dataPipeline := make(chan data)// Create three worker routinesgo process(dataPipeline)go process(dataPipeline)go process(dataPipeline)// Load the data into the pipelinefor _, d := range ReadData() {dataPipeline <- d}// Close the pipeline when all data has been read in// NOTE: This will immediately close the application regardless// of whether the workers have completed their processing.// See the section on Best Practices at the end of the post// for more information on how to handle this.close(dataPipeline)
}func ReadData() []data {// ...
}func process(in <-chan data) {for {d, ok := <- inif !ok {return}// ... Do some work}
}

上面例子中 process 函数被调用了三次,每个 goroutine 各行其事。channel in 作为入参传给每个 goroutine ,并通过一个 for 循环读取数据。当数据读取完毕时,channel in 被关闭。

03

复制模式

我们看到扇出模式不仅可以用来并行处理数据,还可以用作多线程复制数据。

// Example of a fan-out replication pattern
func main() {dataPipeline := make(chan data)// Pass the write-only channels from the three proc calls to the fan-outgo replicate(dataPipeline, proc1(), proc2(), proc3())// Load the data into the pipelinefor _, d := range ReadData() {dataPipeline <- d}
}func proc1() chan<- data {/* ... */}
func proc2() chan<- data {/* ... */}
func proc3() chan<- data {/* ... */}func replicate(in <-chan data, outgoing ...chan<- data) {for {d, ok := <- in // Read from the input channelif !ok {return}// Replicate the data to each of the outgoing channelsfor _, out := range outgoing {out <- d}}
}

当使用复制模式的时候请注意数据的类型。特别是使用指针时候,因为复制器不是在复制数据而是在传递指针。

上面是一个扇出模式的示例代码,replicate 函数通过可变参数传入了三个 channel 被调用。channel in 提供了原始数据,并将其复制到输出 channel。

04

类型扇出

最后一个扇出模型我们将介绍 类型扇出 。当处理 interface{} 类型的 channel 时非常有用。此模式允许根据数据类型将数据定向到适当的处理器。

// Example of a type fan-out pattern from the github project
// for this post
func TypeFan(in <-chan interface{}) (<-chan int,<-chan string,<-chan []byte,) {ints := make(chan int)strings := make(chan string)bytes := make(chan []byte)go func(in <-chan interface{},ints chan<- int,strings chan<- string,bytes chan<- []byte,) {defer close(ints)defer close(strings)defer close(bytes)for {data, ok := <-inif !ok {return}// Type Switch on the data coming in and// push the data to the appropriate channelswitch value := data.(type) {case nil: // Special case in type switch// Do nothingcase int:ints <- valuecase string:strings <- valuecase []byte:bytes <- valuedefault:fmt.Printf("%T is an unsupported type", data)}}}(in, ints, strings, bytes)return ints, strings, bytes
}

上面的例子展示了如何接收一个空的 interface(即 interface{})并使用类型开关来决定将数据发送到哪个 channel。

05

扇入/合并器模式

使用扇入模式时,数据会从多个 channel 读取后合并到一个 channel 输出。3 扇入模式与扇出模式刚好相反。

有关该模式可执行的示例代码,可以查看本文的 Github 项目代码 fan-in 目录。在项目根目录下运行下面的命令来执行扇入示例。

go run patterns/fanin/main.go

Example Fan-In

下面是一个扇入模式的代码示例,数据被多个工作协程挖掘并全部集中到一个单独的 channel 中。

// Example of a fan-in pattern mining data in
// parallel, but processing it all synchronously
func main() {miner1 := mine()miner2 := mine()miner3 := mine()// Take miner data and consolidate it into a single channelout := consolidate(miner1, miner2, miner3)// Process the datafor {data, ok := <- outif !ok {return}// ... Do some work}
}func consolidate(miners ...<-chan data) <-chan data {out := make(chan data)// Iterate over the miners and start a routine for// consuming each one and merging them into the outputfor _, in := range miners {go func(in <-chan data, out chan<- data) {for {d, ok := <-in // Pull data from the minerif !ok {return}out <- d // Send the data to the output}}(in, out)}return out
}func mine() <-chan data {out := make(chan data)go func() {// ... populate the channel with mined data}()return out
}

上面的例子利用扇入模式从多个模拟数据挖掘器中整合输入的数据。

06

组合和嵌套模式

这些模式中的每一个都可以组合起来以创造更复杂的模式。这会非常有用,因为大多数应用不会只使用一种并发模式。

下面是一个将所有模式组合到一个请求响应生命周期的例子。该例子中,数据来自单一来源,扇出到多个流水线,最后扇入成单个响应返回给用户。

当构建应用程序时,我建议先构建一个设计图来帮助概念化这些并发设计元素。我真的很喜欢用 diagrams.net 来画设计图。构建这些设计图的过程有助于巩固最终产品并使设计更易于理解。将设计作为流程的一部分也将有助于将设计出售给其他工程师和领导层。

尽可能使用 Go 的原生并发原语

07

最佳实践

虽然主要使用 Go 的并发原语来管理 Go 应用程序中的并发被认为是最佳实践,但是在某些情况下,需要使用 sync 包来辅助管理并发。

一个很好的例子就是当你实现了类似于io.Closer 时需要确保所有协程都退出了。例如,如果你的代码产生了 N 个协程并且希望调用 io.Closer 方法时所有的协程都正确退出,你可以使用 sync.WaitGroup 来等待所有的协程直到它们被关闭。

执行此操作方法如下所示。

// Example of using a wait group to
// ensure all routines are properly exited
type mytype struct {ctx context.Contextcancel context.CancelFuncwg sync.WaitGroupwgMu sync.Mutex
}// DoSomething spawns a go routine
// every time it's called
func (t *mytype) DoSomething() {// Increment the waitgroup to ensure// Close properly blockst.wgMu.Lock()t.wg.Add(1)t.wgMu.Unlock()go func() {// Decrement the waitgroup// when the routine exitsdefer t.wg.Done()// ... do something}()
}func (t *mytype) Close() error {// Cancel the internal contextt.cancel()// Wait for all routines to exitt.wgMu.Lock()t.wg.Wait()t.wgMu.Unlock()return nil
}

上面的代码有几个重点。首先,使用 sync.WaitGroup来增加和减少正在运行的协程数量。其次,使用 sync.Mutex 来确保只有同时只有一个协程在修改 sync.WaitGroup.Wait() 方法不需要 mutex )

单击此处阅读 Leo Lara 对此的详细解释 4

有关需要使用 sync 包的情况的功能示例,请参阅 Plex 库。

08

泛型 (即将到来!)

随着 Go 1.18 中泛型的引入,这些模式使用起来会变得更加容易,我将在下一篇文章中介绍。

  1. Pipelines - Concurrency in Go by Katherine Cox-Buday - Page 100  ↩︎

  2. Go Concurrency Patterns: Pipelines and cancellation  ↩︎

  3. Multiplexing - Concurrency in Go by Katherine Cox-Buday - Page 117  ↩︎

  4. Closing a Go channel written by several goroutines   ↩︎

原文信息

原文地址:https://benjiv.com/beginning-concurrency-patterns/

原文作者:Benjamin Vesterby

本文永久链接:https://github.com/gocn/translator/blob/master/2022/w09_Beginning_Concurrency_Patterns.md

译者:朱亚光

校对:4+6

往期回顾

BREAK AWAY

『每周译Go』Go原生并发基本原理与最佳做法

『每周译Go』以Go为例-探究并行与并发的区别

想要了解关于 Go 的更多资讯,还可以通过扫描的方式,进群一起探讨哦~

『每周译Go』开启并发模式相关推荐

  1. 『每周译Go』Go sync map 的内部实现

    目录 引言 a. 简单介绍并发性及其在此上下文中的应用 sync.RWMutex 和 map 一起使用的问题 介绍 sync.Map a. 在哪些场景使用 sync.Map? sync.Map 实现细 ...

  2. 『每周译Go』那些年我使用Go语言犯的错

    原文地址:https://henvic.dev/posts/my-go-mistakes/ 原文作者:Henrique Vicente 本文永久链接:https://github.com/gocn/t ...

  3. 『每周译Go』Rust 与 Go: 为何相得益彰

    虽然有一些人可能会将 Rust 和 Go 视为互为竞争的编程语言,但 Rust 和 Go 团队都不这么认为.恰恰相反,我们的团队非常尊重其他人正在做的事情,并将这些语言视为对整个软件开发行业现代化的共 ...

  4. 『每周译Go』写了 50 万行 Go 代码后,我明白这些道理

    原文地址:https://blog.khanacademy.org/half-a-million-lines-of-go/ 原文作者:Kevin Dangoor 本文永久链接:https://gith ...

  5. 『每周译Go』GitHub 为 Go 社区带来供应链安全功能

    Go 国际社区从一开始就拥抱 GitHub ( GitHub 即是 Go 代码协作的地方也是发布包的地方) 使得 Go 成为 如今 GitHub 上排名前 15 的编程语言.我们很高兴地宣布 GitH ...

  6. 『每周译Go』Uber 的 API 网关架构

    原文地址:https://eng.uber.com/architecture-api-gateway/ 原文作者:Madan Thangavelu, Abhishek Parwal, Rohit Pa ...

  7. 『每周译Go』Google:12 条 Golang 最佳实践

    这是直接总结好的 12 条,详细的再继续往下看: 先处理错误避免嵌套 尽量避免重复 先写最重要的代码 给代码写文档注释 命名尽可能简洁 使用多文件包 使用 go get 可获取你的包 了解自己的需求 ...

  8. 『每周译Go』Go 语言的 goroutine 性能分析

    本文档最后一次更新时所用的 Go 版本是 1.15.6,但是大多数情况下,新老版本都适用. 描述 Go 运行时在一个称为 allgs 简单切片追踪所有的 goroutines.这里面包含了活跃的和死亡 ...

  9. 『每周译Go』Go 语言中的插件

    很多年以前我就开始写一系列关于插件的文章:介绍这些插件在不同的系统和编程语言下是如何设计和实现的.今天这篇文章,我打算把这个系列扩展下,讲讲 Go 语言中一些插件的例子. 需要提醒的是,本系列头几篇的 ...

最新文章

  1. 单片机C语言知识用法之,单片机C语言知识用法之define
  2. 如何将Vision Transformer应用在移动端?
  3. 循环求100内质数 php_C8循环
  4. 3.2 选择最可能的句子-深度学习第五课《序列模型》-Stanford吴恩达教授
  5. 项目总是有红叉叉,打开又没错,Build path contains duplicate entry: '...' for project 'X
  6. 女生适合学python_Python适合女生学习吗?
  7. 《设计模式详解》软件设计原则
  8. 张一鸣、王欣和罗永浩的社交梦
  9. Chrome快捷键大全
  10. grep 多条件并行满足_grep多个条件
  11. 一个生肖一首诗,看看你是哪一首?
  12. Excel数据合并(追加写入)
  13. 全局负载均衡GSLB
  14. iOS9不受信任的企业开发人员,无法信任
  15. python(10): Signal
  16. 删除autorun.inf病毒的批处理 简单三招预防
  17. 反复调整,苹果的价格策略还奏效吗?
  18. oracle授予数据库所有权限,oracle创造新用户及授予常用权限
  19. DHU 25繁殖问题
  20. 今天收到的QQ礼品卡,差点上当……

热门文章

  1. 工贸企业重大事故隐患判定标准,自2023年5月15日起施行
  2. 百世集团今年第一季度总收入达68.7亿元 净亏损2.1亿元
  3. 广西南宁机器人比赛_南宁市两名小学生夺得世界教育机器人大赛一项冠军
  4. 物联网中的存储资源盘活
  5. 当今情势下,如何通过自媒体平台轻松赚钱
  6. 深圳电子行业的mes系统的需求分析方法~先达智控
  7. C++ 牛客网做题总结
  8. windows下如何配置AWS进行深度学习
  9. Stata常见问题解答
  10. 360安全卫士,还有这么多,小兄弟?