文章目录

  • fanIn

    • 协程版

    • 递归版

    • 反射版

  • fanOut

    • 同步版

    • 协程异步版

    • 反射版

今天回顾下常用的两种channel应用模式: fanInfanOut,

分别对应了,对一组相同类型chan的合并和广播。

fanIn

将全部输入chan都聚合到一个out chan中,在全部聚合完成后,关闭out chan.

协程版

func fanIn(chans ...<-chan interface{}) <-chan interface{} {out := make(chan interface{})go func() {var wg sync.WaitGroupwg.Add(len(chans))for _, ch := range chans {go func(ch <-chan interface{}) {for v := range ch {out <- v}wg.Done()}(ch)}// 等待协程全部结束wg.Wait()close(out)}()return out
}

这里用waitGroup是防止关闭out时还有写入(out <- v),避免panic

递归版

2 分递归并合并。

其中合并mergeTwo主要用了nil chan对读写均阻塞。

chan关闭时,设置为nil,阻塞读取。

func fanInRecur(chans ...<-chan interface{}) <-chan interface{} {switch len(chans) {case 0:c := make(chan interface{})close(c)// 无可聚合chan,返回一个已关闭chan,可读不可写return ccase 1:return chans[0]case 2:return mergeTwo(chans[0], chans[1])default:// 一分为二,递归m := len(chans) / 2return mergeTwo(fanInRecur(chans[:m]...),fanInRecur(chans[m:]...))}
}func mergeTwo(a, b <-chan interface{}) <-chan interface{} {c := make(chan interface{})go func() {defer close(c)for a != nil || b != nil { // 只要还有可读的chanselect {case v, ok := <-a:if !ok { // a 已关闭,设置为nila = nilcontinue}c <- vcase v, ok := <-b:if !ok { // b 已关闭,设置为nilb = nilcontinue}c <- v}}}()return c
}

反射版

利用reflect.SelectCase构造批量可Select的发送chan

func fanInReflect(chans ...<-chan interface{}) <-chan interface{} {out := make(chan interface{})go func() {defer close(out)// 构造SelectCase slicevar cases []reflect.SelectCasefor _, c := range chans {cases = append(cases, reflect.SelectCase{Dir:  reflect.SelectRecv,Chan: reflect.ValueOf(c),})}// 循环,从cases中选择一个可用的for len(cases) > 0 {i, v, ok := reflect.Select(cases)if !ok {// 此channel已经close, 从切片移除cases = append(cases[:i], cases[i+1:]...)continue}out <- v.Interface()}}()return out
}

附上压测数据

性能对比

fanOut

同步版

最直观的方式,直接向每一个chan都同步发送一遍 返回前关闭这组chan, 即不再写入

func fanOut(ch <-chan interface{}, out []chan interface{}) {go func() {defer func() { // 退出时关闭所有的输出chanfor i := range out {close(out[i])}}()for v := range ch { // 从输入chan中读取数据v := vfor i := range out {i := iout[i] <- v // 放入到输出chan中,同步方式}}}()
}

协程异步版

发送这里用起协程的方式,实现异步,发送操作耗时情况下无需阻塞等待

可是有个问题,不知道你看出来没。

func fanOut(ch <-chan interface{}, out []chan interface{}) {go func() {defer func() { // 退出时关闭所有的输出chanfor i := range out {close(out[i])}}()for v := range ch { // 从输入chan中读取数据v := vfor i := range out {i := i// 协程异步go func(){}out[i] <- v}()}}}()
}

乍一看好像没什么问题, 但退出时关闭时,很可能发送的协程写入还没完成,

毕竟这里out之前写入的要有人读才能继续写。

这里加waitGroup可以等待全部发送完毕在关闭

func fanOutAsync(ch <-chan interface{}, out []chan interface{}) {go func() {var wg sync.WaitGroupdefer func() { // 退出时关闭所有的输出chanwg.Wait()for i := range out {close(out[i])}}()for v := range ch { // 从输入chan中读取数据v := vfor i := range out {i := iwg.Add(1)go func() { // 异步,避免一个out阻塞的时候影响其他outout[i] <- vwg.Done()}()}}}()
}

反射版

构造一票chan send case, 遍历select,发送完成的将其置为nil阻塞,避免再次发送

不得不说,nil chan出镜率很高啊

func fanOutReflect(ch <-chan interface{}, out []chan interface{}) {go func() {defer func() { // 退出时关闭所有的输出chanfor i := range out {close(out[i])}}()cases := make([]reflect.SelectCase, len(out))// 构造SelectCase slicefor i := range cases {cases[i].Dir = reflect.SelectSend}for v := range ch {v := v// 先完成send case构造for i := range cases {cases[i].Chan = reflect.ValueOf(out[i])cases[i].Send = reflect.ValueOf(v)}// 遍历selectfor range cases {chosen, _, _ := reflect.Select(cases)// 已发送过,用nil阻塞,避免再次发送cases[chosen].Chan = reflect.ValueOf(nil)}}}()
}

附上压测数据

性能对比

具体测试代码详见:concurrency[1]

参考资料

[1]

concurrency: https://github.com/NewbMiao/Dig101-Go/tree/master/concurrency/channel/schedule


如果有用,点个 在看 ,让更多人看到

外链不能跳转,戳 阅读原文 查看参考资料

说说fanIn和fanOut相关推荐

  1. 数字电路基本概念 —— fan-in/fan-out

    0. 从模拟电路到数字电路 数字电路抗干扰能力强: 模拟电路会随着信号的传输而放大,这是因为模拟电路中信号几乎完全将真实信号按比例表现为电压或者电流的形式: 模拟电路是数字电路的基础 74LS283 ...

  2. 单张GPU搞定GPT-3超参数!先训练小模型,再“一键迁移” | 已开源

    点击上方"视学算法",选择加"星标"或"置顶" 重磅干货,第一时间送达 丰色 发自 凹非寺 量子位 | 公众号 QbitAI "一 ...

  3. 如何在golang中关闭bufio.reader_Golang 并发模型系列:1. 轻松入门流水线模型

    Go语言中文网,致力于每日分享编码.开源等知识,欢迎关注我,会有意想不到的收获! Golang作为一个实用主义的编程语言,非常注重性能,在语言特性上天然支持并发,它有多种并发模型,通过流水线模型系列文 ...

  4. 大数据flume日志采集系统详解

    一.flume介绍 flume 是一个cloudera提供的 高可用高可靠,分布式的海量日志收集聚合传输系统.Flume支持日志系统中定制各类数据发送方,用于收集数据.同时flume提供对数据进行简单 ...

  5. 2、Flume1.7.0入门:安装、部署、及flume的案例

    一.什么是Flume? flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用. flume的特点: flume是一个分布式.可靠.和高可用的海量日志采集.聚合和传输的 ...

  6. Verdi-ug --- nschema Tutorial

    文章目录 1.Change the Schematic View Among Instances 2.Find an Instance/Signal and Manipulate it 2.Tranc ...

  7. Flume日志收集系统架构详解--转

    2017-09-06朱洁大数据和云计算技术 任何一个生产系统在运行过程中都会产生大量的日志,日志往往隐藏了很多有价值的信息.在没有分析方法之前,这些日志存储一段时间后就会被清理.随着技术的发展和分析能 ...

  8. google Guava包的ListenableFuture解析

    原文地址  译者:罗立树  校对:方腾飞 并发编程是一个难题,但是一个强大而简单的抽象可以显著的简化并发的编写.出于这样的考虑,Guava 定义了 ListenableFuture接口并继承了JDK ...

  9. Flume NG 简介及配置实战

    2019独角兽企业重金招聘Python工程师标准>>> Flume NG 简介及配置实战 博客分类: 分布式计算 1.Flume 的一些核心概念: 1.1 数据流模型 1.2 高可靠 ...

最新文章

  1. udacity 项目3人脸检测
  2. iOS标准库中常用数据结构和算法之内存池
  3. 易评:软银收购ARM会扼住中国芯发展的咽喉吗?
  4. 计算机控制直流电机闭环调速实验,最小拍控制系统及直流电机闭环调速控制系统设计和实现实验报告...
  5. C#序列化反序列化对象为base64字符串
  6. 对URLEncode的解码
  7. HarmonyOS之数据管理·轻量级偏好数据库的应用
  8. 59 javabean的创建
  9. c语言 main函数
  10. python中valueerror是什_Python:ValueError和Exception的区别?
  11. python实现屏幕录制_GitHub - Sijiu/record-camera-and-screen: 录制摄像头和录制屏幕,两者之间可以轻易切换...
  12. [转]char数组赋值
  13. adobe audition cs6 能打开mpcm文件吗?_单根32G内存靠谱吗?阿斯加特 W2 32G DDR4 2666内存测试...
  14. 多图详解freeBSD8.2安装过程
  15. PyTorch 和 TensorFlow 引领未来
  16. 【CPI指数预测】基于matlab BP神经网络CPI指数预测【含Matlab源码 662期】
  17. 用c语言将学生系统插入音效,增加音效.cpp
  18. Codeforces - Garland
  19. Write-back,Write-through及write allocate
  20. 决策规划算法三:DP与分层状态机2种决策算法的对比

热门文章

  1. 卷积深层网络+图像定位
  2. bcache / 如何使用bcache构建LVM,软RAID / 如何优化bcache
  3. 程序员迎娶白富美的唯一出路是什么? 认真用心写代码
  4. 【ParaView教程】第四章 常见问题 —— 怎样计算正面投影面积?
  5. CVE-2010-2729(MS10-061)
  6. java public main_JAVA:public static void main(String args[]) 详解
  7. webpack中publicPath的深入解析
  8. 线性滤波和非线性滤波
  9. SpringBoot是如何动起来的
  10. [1-6] 把时间当做朋友(李笑来)Chapter 6 【更多思考】 摘录