Golang并发模式--管线
管线
管线的作用类似于linux的管道,我们利用channel把数据一级级的传递,最终获取数据的结果。管线适用于流处理的方式,而非批处理;流处理可以实时的获取数据。
给出最基础的管线示例,串联加法器和乘法器的管线。
我们使用channel
来表示图中的管线。给出代码示例:
package mainimport ("fmt""math/rand""time"
)func main() {rand.Seed(time.Now().UTC().UnixNano())generator := func(done <-chan interface{}) <-chan int {genChan := make(chan int) // 管线1go func() {defer close(genChan)for {select {case <-done:returncase genChan <- rand.Intn(100):time.Sleep(time.Second * time.Duration(rand.Intn(3) + 1))}}}()return genChan}adder := func(done <-chan interface{}, intChan <-chan int, factor int) <-chan int {addChan := make(chan int) // 管线2go func() {defer close(addChan)for n := range intChan {select {case <-done:returncase addChan <- (n + factor):}}}()return addChan}multiplier := func(done <-chan interface{}, addChan <-chan int, factor int) <-chan int {mulChan := make(chan int) // 管线3go func() {defer close(mulChan)for n := range addChan {select {case <-done:returncase mulChan <- (n * factor):}}}()return mulChan}done := make(chan interface{})res := multiplier(done, adder(done, generator(done), 3), 2)st := time.Now()since := time.Second * 10for n := range res {if time.Since(st) > since {close(done)fmt.Println("\nclose pipeline")break} else {fmt.Printf("%v ", n)}}fmt.Println("exit")
}
这种方式不太适用于并发的情况,因为管线内部是串行的,如果有某些处理单元运行慢,则整体速度慢。
常用的生成器
repeat生成器
该生成器用于连续产生一组相同的数据,给出代码示例:
package main
import "fmt"
func main() {repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {valueStream := make(chan interface{})go func() {defer close(valueStream)for { // 无限循环产生数据for _, v := range values { // 这里内部遍历select {case <-done:returncase valueStream <- v:}}}}()return valueStream}take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} {takeStream := make(chan interface{})go func() {defer close(takeStream)for i := 0; i < num; i ++ { // 获取批次数据select {case <-done:returncase n := <-valueStream:takeStream <- n}}}()return takeStream}done := make(chan interface{})defer close(done)for num := range take(done, repeat(done, 1, 3, 5, 7, 9), 10) {fmt.Printf("%d ", num)}fmt.Println("")
}
// 1 3 5 7 9 1 3 5 7 9
repeatFn生成器
该生成器可以调用外部函数,完成循环重复生成的功能。代码如下:
package main
import ("fmt""math/rand"
)
func main() {myFn := func() interface{} { return rand.Intn(10000000) } // 生成器函数repeatFn := func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {valueStream := make(chan interface{})go func() {defer close(valueStream)for {select {case <-done:returncase valueStream <- fn():}}}()return valueStream}take := func(done <-chan interface{}, valueStream <-chan interface{}, n int) <-chan interface{} {takeStream := make(chan interface{})go func() {for i := 0; i < n; i++ {select {case <-done:returncase num := <-valueStream:takeStream <- num}}}()return takeStream}done := make(chan interface{})defer close(done)stream := take(done, repeatFn(done, myFn), 10)for n := range stream {fmt.Println(n)}
}
Golang并发模式--管线相关推荐
- Golang并发模式基础
for-select循环 最基本的模式: for {select {case condition 1:// ...case condition 2:// ...case condition n:// ...
- Golang并发模式--channel高级使用
or-done channel 再pipeline的情况中,我们通过done来通知goroutine结束.但是,如果我们处理的channel来自系统其它分散的部分,则无法通过done来控制,因为我们不 ...
- Golang并发模型:轻松入门流水线FAN模式
前一篇文章<Golang并发模型:轻松入门流水线模型>,介绍了流水线模型的概念,这篇文章是流水线模型进阶,介绍FAN-IN和FAN-OUT,FAN模式可以让我们的流水线模型更好的利用Gol ...
- Golang并发模型:轻松入门协程池
goroutine是非常轻量的,不会暂用太多资源,基本上有多少任务,我们可以开多少goroutine去处理.但有时候,我们还是想控制一下. 比如,我们有A.B两类工作,不想把太多资源花费在B类务上,而 ...
- Golang并发模型:合理退出并发协程
goroutine作为Golang并发的核心,我们不仅要关注它们的创建和管理,当然还要关注如何合理的退出这些协程,不(合理)退出不然可能会造成阻塞.panic.程序行为异常.数据结果不正确等问题.这篇 ...
- Golang并发:再也不愁选channel还是选锁
周末又到了,为大家准备了一份实用干货:如何使用channel和Mutex解决并发问题,利用周末的好时光,配上音乐,思考一下吧?. 来,问自己个问题:面对并发问题,是用channel解决,还是用Mute ...
- Go语言中常见的并发模式
Go语言最吸引人的地方是它内建的并发支持.Go语言并发体系的理论是C.A.R Hoare在1978年提出的通信顺序进程(Communicating Sequential Process,CSP).CS ...
- golang 并发同步(sync 库)-- 单例模式的实现(六)
文章目录 golang 单例模式 非线程安全的单例模式 加锁的单例模式 sync.Once 更优雅的方式实现单例模式 更优雅的方式防止主 go 程提前退出 sync.Cond--golang 指挥家 ...
- 第09章 Go语言并发,Golang并发
并发指在同一时间内可以执行多个任务.并发编程含义比较广泛,包含多线程编程.多进程编程及分布式程序等.本章讲解的并发含义属于多线程编程. Go 语言通过编译器运行时(runtime),从语言上支持了并发 ...
最新文章
- 7-2 然后是几点 (Java)
- Elasticsearch 7.0中引入的新集群协调子系统如何使用?
- element ui 表格中的字太长,想要把多余的字变成...解决方法,一个属性即可
- NYOJ 990 蚂蚁感冒
- pypthon3精要(11)-try,except,else异常处理
- 今天刚查到的宏,学习
- 如何通过7个Logback调整立即改善Java日志记录
- Tornado 错误 Global name 'memoryview' is not defined
- [2-sat][topsort输出解] POJ 3648 Wedding
- 推荐一本学Python的好书《Python程序设计(第2版)》
- d盾web查杀 linux,D盾Web查杀
- git报错:fatal: unable to access ‘...‘: Failed to connect to github port 443: Timed out
- 原生开发、H5开发和混合开发的区别
- 软件著作权 php代码行数,申报软件著作权时,如何快捷计算源码行数
- 李彦宏:离破产永远只有30天
- 【rfc5506】RTCP mode
- 20220525商汤算法岗实习面试经历
- 感谢米老师,感谢提高班,做个骄傲的自己
- 全球主流云桌面传输协议
- git push 报错 Empty reply from server 或 Failed to connect to github.com port 443: Time out
热门文章
- 操作系统 第二章 进程管理
- 蓝桥杯2016年C/C++ 混搭
- STL常用函数总结-stack
- 对于electron-react-boilerplate(ERB)的学习笔记(legacy)
- hihocoder第218周:AC自动机
- 【Qt教程】1.8 - Qt5-Lambda表达式
- 敏捷开发生态系统系列之一:序言及需求管理生态(客户价值导向-可工作软件-响应变化)...
- 运行jar包提示找不到.properties文件的问题
- ASP.net用法系列:如何从基类调用LINQ/EF类的属性
- Office - Word 2013