管线

管线的作用类似于linux的管道,我们利用channel把数据一级级的传递,最终获取数据的结果。管线适用于流处理的方式,而非批处理;流处理可以实时的获取数据。

给出最基础的管线示例,串联加法器和乘法器的管线。

管线1
管线2
管线3
数据生成器
加法器B
乘法器
数据获取器

我们使用channel来表示图中的管线。给出代码示例:

package mainimport ("fmt""math/rand""time"
)func main() {rand.Seed(time.Now().UTC().UnixNano())generator := func(done <-chan interface{}) <-chan int {genChan := make(chan int)  // 管线1go func() {defer close(genChan)for {select {case <-done:returncase genChan <- rand.Intn(100):time.Sleep(time.Second * time.Duration(rand.Intn(3) + 1))}}}()return genChan}adder := func(done <-chan interface{}, intChan <-chan int, factor int) <-chan int {addChan := make(chan int)  // 管线2go func() {defer close(addChan)for n := range intChan {select {case <-done:returncase addChan <- (n + factor):}}}()return addChan}multiplier := func(done <-chan interface{}, addChan <-chan int, factor int) <-chan int {mulChan := make(chan int)  // 管线3go func() {defer close(mulChan)for n := range addChan {select {case <-done:returncase mulChan <- (n * factor):}}}()return mulChan}done := make(chan interface{})res := multiplier(done, adder(done, generator(done), 3), 2)st := time.Now()since := time.Second * 10for n := range res {if time.Since(st) > since {close(done)fmt.Println("\nclose pipeline")break} else {fmt.Printf("%v ", n)}}fmt.Println("exit")
}

这种方式不太适用于并发的情况,因为管线内部是串行的,如果有某些处理单元运行慢,则整体速度慢。

常用的生成器

repeat生成器

该生成器用于连续产生一组相同的数据,给出代码示例:


package main
import "fmt"
func main() {repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {valueStream := make(chan interface{})go func() {defer close(valueStream)for {  // 无限循环产生数据for _, v := range values {  // 这里内部遍历select {case <-done:returncase valueStream <- v:}}}}()return valueStream}take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} {takeStream := make(chan interface{})go func() {defer close(takeStream)for i := 0; i < num; i ++ {  // 获取批次数据select {case <-done:returncase n := <-valueStream:takeStream <- n}}}()return takeStream}done := make(chan interface{})defer close(done)for num := range take(done, repeat(done, 1, 3, 5, 7, 9), 10) {fmt.Printf("%d ", num)}fmt.Println("")
}
// 1 3 5 7 9 1 3 5 7 9

repeatFn生成器

该生成器可以调用外部函数,完成循环重复生成的功能。代码如下:


package main
import ("fmt""math/rand"
)
func main() {myFn := func() interface{} { return rand.Intn(10000000) }  // 生成器函数repeatFn := func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {valueStream := make(chan interface{})go func() {defer close(valueStream)for {select {case <-done:returncase valueStream <- fn():}}}()return valueStream}take := func(done <-chan interface{}, valueStream <-chan interface{}, n int) <-chan interface{} {takeStream := make(chan interface{})go func() {for i := 0; i < n; i++ {select {case <-done:returncase num := <-valueStream:takeStream <- num}}}()return takeStream}done := make(chan interface{})defer close(done)stream := take(done, repeatFn(done, myFn), 10)for n := range stream {fmt.Println(n)}
}

Golang并发模式--管线相关推荐

  1. Golang并发模式基础

    for-select循环 最基本的模式: for {select {case condition 1:// ...case condition 2:// ...case condition n:// ...

  2. Golang并发模式--channel高级使用

    or-done channel 再pipeline的情况中,我们通过done来通知goroutine结束.但是,如果我们处理的channel来自系统其它分散的部分,则无法通过done来控制,因为我们不 ...

  3. Golang并发模型:轻松入门流水线FAN模式

    前一篇文章<Golang并发模型:轻松入门流水线模型>,介绍了流水线模型的概念,这篇文章是流水线模型进阶,介绍FAN-IN和FAN-OUT,FAN模式可以让我们的流水线模型更好的利用Gol ...

  4. Golang并发模型:轻松入门协程池

    goroutine是非常轻量的,不会暂用太多资源,基本上有多少任务,我们可以开多少goroutine去处理.但有时候,我们还是想控制一下. 比如,我们有A.B两类工作,不想把太多资源花费在B类务上,而 ...

  5. Golang并发模型:合理退出并发协程

    goroutine作为Golang并发的核心,我们不仅要关注它们的创建和管理,当然还要关注如何合理的退出这些协程,不(合理)退出不然可能会造成阻塞.panic.程序行为异常.数据结果不正确等问题.这篇 ...

  6. Golang并发:再也不愁选channel还是选锁

    周末又到了,为大家准备了一份实用干货:如何使用channel和Mutex解决并发问题,利用周末的好时光,配上音乐,思考一下吧?. 来,问自己个问题:面对并发问题,是用channel解决,还是用Mute ...

  7. Go语言中常见的并发模式

    Go语言最吸引人的地方是它内建的并发支持.Go语言并发体系的理论是C.A.R Hoare在1978年提出的通信顺序进程(Communicating Sequential Process,CSP).CS ...

  8. golang 并发同步(sync 库)-- 单例模式的实现(六)

    文章目录 golang 单例模式 非线程安全的单例模式 加锁的单例模式 sync.Once 更优雅的方式实现单例模式 更优雅的方式防止主 go 程提前退出 sync.Cond--golang 指挥家 ...

  9. 第09章 Go语言并发,Golang并发

    并发指在同一时间内可以执行多个任务.并发编程含义比较广泛,包含多线程编程.多进程编程及分布式程序等.本章讲解的并发含义属于多线程编程. Go 语言通过编译器运行时(runtime),从语言上支持了并发 ...

最新文章

  1. 7-2 然后是几点 (Java)
  2. Elasticsearch 7.0中引入的新集群协调子系统如何使用?
  3. element ui 表格中的字太长,想要把多余的字变成...解决方法,一个属性即可
  4. NYOJ 990 蚂蚁感冒
  5. pypthon3精要(11)-try,except,else异常处理
  6. 今天刚查到的宏,学习
  7. 如何通过7个Logback调整立即改善Java日志记录
  8. Tornado 错误 Global name 'memoryview' is not defined
  9. [2-sat][topsort输出解] POJ 3648 Wedding
  10. 推荐一本学Python的好书《Python程序设计(第2版)》
  11. d盾web查杀 linux,D盾Web查杀
  12. git报错:fatal: unable to access ‘...‘: Failed to connect to github port 443: Timed out
  13. 原生开发、H5开发和混合开发的区别
  14. 软件著作权 php代码行数,申报软件著作权时,如何快捷计算源码行数
  15. 李彦宏:离破产永远只有30天
  16. 【rfc5506】RTCP mode
  17. 20220525商汤算法岗实习面试经历
  18. 感谢米老师,感谢提高班,做个骄傲的自己
  19. 全球主流云桌面传输协议
  20. git push 报错 Empty reply from server 或 Failed to connect to github.com port 443: Time out

热门文章

  1. 操作系统 第二章 进程管理
  2. 蓝桥杯2016年C/C++ 混搭
  3. STL常用函数总结-stack
  4. 对于electron-react-boilerplate(ERB)的学习笔记(legacy)
  5. hihocoder第218周:AC自动机
  6. 【Qt教程】1.8 - Qt5-Lambda表达式
  7. 敏捷开发生态系统系列之一:序言及需求管理生态(客户价值导向-可工作软件-响应变化)...
  8. 运行jar包提示找不到.properties文件的问题
  9. ASP.net用法系列:如何从基类调用LINQ/EF类的属性
  10. Office - Word 2013