在Go语言中处理任何stream数据时,我已经深陷io.Reader和io.Writer的灵活性中不能自拔。同时我在有一点上又或多或少的受了些折磨,挑战我的reader interface在你看来可能会觉得很简单:那就是怎么样拆分读操作。

我甚至不知道使用“拆分(split)”这个词是否正确,我就是想通过io.Reader多次读取接收到的东西,有时候可能还需要并行操作。但是由于readers不一定会暴露Seek方法重置读取位置,我需要一个方法来复制它。或者可以算是clone或fork么?

现状

假设你有一个web服务允许用户上传一个文件。这个服务将会把文件存储在云端。但是在存储前需要对这个文件进行一些简单的处理。对于接下来的所有请求,你都不得不使用io.Reader去处理。

解决方案

当然,有不止一种方法可以处理这种情况。根据文件的类型,服务的吞吐量,以及文件需要的处理方式的不同有些方式可能比其他的更合适。下面,我给出了5中不同复杂度和灵活性的方法。可以想象还会有更多的方法,但是这几个会是一个不错的起点。

Solution #1:简单的bytes.Reader

如果源reader没有Seek方法,为什么不自己实现一个呢?你可以把所有的内容都读取到一个bytes.Reader中,然后你想分多少次读取都可以,只要你开心:

func handleUpload(u io.Reader)(err error) {

//capture all bytes from upload

b, err := ioutil.ReadAll(u)

if err != nil {

return err

}

//wrap the bytes in a ReaderSeeker

r := bytes.NewReader(b)

//process the metadata

err = processMetaData(r)

if err != nil {

return err

}

r.Seek(0, 0)

//upload the data

err = uploadFile(r)

if err != nil {

return err

}

return nil

}

如果数据足够小,这可能是最方便的选择;你可以完全忘掉bytes.Reader并使用*byte slice的方式代替工作。但是假如是大文件,如视频文件或RAW格式的照片等。这些庞然大物将吞噬你的内存,特别是如果服务还具有高流量特征时。更何况(not to mention)你不能并行执行这些操作。

优点:最简单的方案

缺点:同步,无法适应你期望的很多、很大的文件。

Solution #2:可靠的文件系统

OK,那么将数据放到磁盘中的文件如何(借助ioutil.TempFile),并且可以避免将数据存储在内存中带来的隐患。

func handleUpload(u io.Reader)(err error) {

//create a temporary file for the upload

f, err := ioutil.TempFile("", "upload")

if err != nil {

return err

}

//destroy the file once done

defer func() {

n := f.name()

f.Close()

os.Remove(n)

}()

//transfer the bytes to the file

_, err := io.Copy(f, u)

if err != nil {

return err

}

//rewind the file

f.Seek(0.0)

//upload the file

err = uploadFile(f)

if err != nil{

return err

}

return nil

}

如果最终是要将文件存储在service运行的文件系统中,这种方法可能是最好的选择(尽管会产生一个真实的临时文件),但是我们假设它最终将落在云上。继续,如果这个文件同样很大,则将产生显著的,但是不必要的IO。同时,你还将面临机器上单个文件错误或宕机的风险,所以如果你的数据比较敏感,我也不推荐这种方式。

优点:避免大量内存占用保存整个文件

缺点:同步,潜在的占用大量IO、磁盘空间以及数据单点故障

Solution #3:The Duct-Tape io.MultiReader

有些情况下,你需要的metadata存在于文件最开始的几个字节。例如,识别一个JPEG格式的文件只需要检查文件的前两个字节是否是0xFF 0xD8。这个可以通过使用io.MultiReader同步处理。io.MultiReader将一组readers组织起来使他们看起来像一个一样。如下是我们的JPEG示例:

func handleUpload(u io.Reader)(err error) {

//read in the first 2 bytes

b := make([]byte, 2)

_, err := u.Read(b)

if err != nil {

return err

}

//check that they match the JPEG header

jpg := []byte{0xFF, 0xD8}

if !bytes.Equal(b, jpg) {

return errors.New("not a JPEG")

}

//glue those bytes back onto the reader

r := io.MultiReader(bytes.NewReader(b), u)

//upload the file

err = uploadFile(r)

if err != nil {

return err

}

return nil

}

如果你只打算上传JPEG文件,这是一个很好的技术。只需要两个字节,你就可以停止传输(注:此处的传输不是文件上传的传输,而是将文件拷贝到内存或磁盘进行处理的传输过程),而不必将整个文件拷贝到内存或存放到磁盘上。你应该也会发现,有些场景这个方法也并不适用。比如你需要读取更多的文件内容去收集数据,如通过计算统计单词个数等。这个过程会阻塞文件上传,对任务密集型可能也不是理想的处理方式。最后,大多数第三方包(和大部分标准库)将完整的消耗掉一个reader,以防止你以这种方式使用io.MultiReader.

另一种方案是使用bufio.Reader.Peek。本质上它执行相同的操作,但是你可以避开MultiReader。也就是说,它还可以让你访问Reader上的其他的有用的方法。

优点:快速且是对文件头的脏读,可以作为文件上传的门槛。

缺点:不适用于不定长读取,处理整个文件,密集任务,或和很多第三方包一同使用。

Solution #4:The Single-Split io.TeeReader and io.Pipe

回到前面讨论的大视频文件的情况,我们稍微修改一下故事情节。你的用户只会上传单一格式的视频文件,但是你希望这些视频文件能够被你的服务以不同格式播放。比如说,你有一个第三方转码器可以将io.Reader读取的MP4格式数据转换成WebM格式的数据输出。你的服务将会把原始的MP4和转码的WebM文件都上传到云端。前面的方案必须同步的执行这些操作,现在你想要并行的完成这件事情。

看看io.TeeReader,它的函数签名是这样的:func TeeReader(r Reader, w Writer) Reader。文档中是这样描述的:TeeReader将从Reader r读取的数据返回一个写到Writer w的Reader。这个正是你所需要的!现在你怎么确保写到w的数据可读?这个是通过io.Pipe实现的,它在io.PipeWriter和io.PipeReader之间建立了一个连接(即栈,后入先出)。看看代码是怎么实现的:

func HandleUpload(u io.Reader) (err error) {

//create the pipe and tee reader

pr, pw := io.Pipe()

tr := io.TeeReader(u, pw)

//Create channels to synchronize

done := make(chan bool)

errs := make(chan error)

defer close(done)

defer close(errs)

go func() {

//close the PipeWriter after the

//TeeReader completes to trigger EOF

defer pw.Close()

//upload the original MP4 data

err := uploadFile(tr)

if err != nil {

errs

return

}

done

}()

go func() {

//transcode to WebM

webmr, err := transcode(pr)

if err != nil {

errs

return

}

//upload to storage

err = uploadFile(webmr)

if err != nil {

errs

return

}

done

}()

//wait until both are done

//or an error occurs

for c := 0; c < 2; {

select {

case err :=

return err

case

c++

}

}

return nil

}

因为uploader将要消费tr,transcoder在将数据存储前接收并处理相同的数据。所有的操作不需要额外的buffer,并且并行的执行。注意这里使用goroutine来执行这两天路径。io.Pipe处于阻塞状态直到有程序向它写或从它读取数据。如果尝试在同一个线程中执行相同的io.Pipe,将会得到一个致命错误:fatal error;all goroutines are asleep - deadlock。panic。另一个需要注意的点是:使用Pipe时,你需要在一个合适的时间显示的触发一个EOF来关闭io.PipeWriter。在这个实力中,需要在TeeReader结束后关闭它。

这个示例同样采用了channel来进行goroutines之间的“doneness”和error的同步。如果你期望在执行过程中有一些更具体的值返回,你可以使用更合适的类型替换chan bool。

优点:完全独立的,并行的处理相同的数据流

缺点,使用goroutines和channel增加了复杂度

Solution #5:The Multi-Split io.MultiWriter and io.Copy

io.TeeReader在只有一个其他的流消费者时,能够非常好的解决问题。由于service可能还需要并行的处理更多的任务(如,转换成更多的格式),使用tee的叠加将使代码变得臃肿。看看io.MultiWriter的解释:“一个将writes复制并提供给多个writers的writer”。它也像前面的方法一样使用pipes来传播数据,不同的是,不是使用io.TeeReader,而是使用io.Copy将数据分发到所有的Pipes。示例代码如下:

func handleUpload(u io.Reader)(err error) {

//create the pipes

mp4R, mp4W := io.Pipe()

webmR, webmW := io.Pipe()

oggR, oggW := io.Pipe()

wavR, wavW := io.Pipe()

//create channels to syschronize

done := make(chan bool)

errs := make(chan error)

defer close(done)

defer close(err)

//spawn all the task goroutines. these looks identical to

//the TeeReader example, but pulled out into separate

//methods for clarity

go uploadMP4(mp4R, done, errs)

go transcodeAndUploadWebM(webmR, done, errs)

go transcodeAndUploadOgg(webmR, done, errs)

go transcodeAndUploadWav(webmR, done, errs)

go func() {

// after completing the copy, we need to close

// the PipeWriters to propagate the EOF to all

// PipeReaders to avoid deadlock

defer mp4W.Close()

defer webmW.Close()

defer oggW.Close()

defer wavW.Close()

//build the multiwriter for all the pipes

mw := io.MultiWriter(mp4W, webmW, oggW, wavW)

//copy the data into the multiwriter

_, err := io.Copy(mw, u)

if err != nil {

errs

}

}()

// wait until all are done

// or an error occurs

for c := 0; c < 4; c++ {

select {

case err :=

return err

case

}

}

return nil

}

这个方法和前面的方法有点类似,但是当数据需要被克隆多次时,这种方法明显的更加简洁。因为使用了PIPEs,同样需要使用goroutines和同步channel,以防止死锁。我们在copy完成了关闭了所有的pipes。

优点:可以根据需要fork多份原始数据

缺点:过多的依赖goroutines和channel进行协调。

关于channels?

Channels是Go提供的独特的,强大的并发工具之一。它是goroutines之间的桥梁,同时兼顾了通信和同步。你可以创建带buffer和不带buffer的channel,来实现数据共享。那么,为什么我不提供一个充分利用Channels的解决方案,而不仅仅是用作同步呢?

查阅了一些标准库的top-level包,发现channels很少出现在函数签名中:

time: 用于select timeout

reflect: …cause reflection

fmt: for formatting it as a pointer

builtin: expose the close function

io.Pipe的实现中放弃了channel,而使用sync.Mutex来安全的在reader和writer之间移动数据。我怀疑这是因为Channel的性能并不好,所以在这里才被Mutex替代。

当开发一个可重复利用的包的时候,我会像标准库一样在我公开的API中避免使用Channels,但是会在内部使用它们用作同步。如果复杂度足够的低,使用mutex替代channel也许更加理想。这也就是说,在程序开发中,channel是更完美的抽象,比lock更好使用,更加灵活。

抛砖迎玉

我在这里只是抛出了屈指可数的几种方法处理从io.Reader获取的数据,毫无疑问,肯定还有更多的方法。Go的隐式接口模型(implicit interface model)+ 标准库的大量使用允许创造性的将不同组件组合而不用担心数据。我希望我在这里的一些探讨对你有所帮助,正如它们对我有用一样.

go语言io reader_Go语言中异步拆分io.Reader相关推荐

  1. go语言io reader_go语言之IO操作(待补充)

    前言 在Go中,输入和输出操作都是使用原语实现的,原语将数据模拟成可读的或者可写的字节流. 而Go的io包提供了io.Reader(将数据从某个资源读取到传输缓冲区被流式传输和使用)和io.Write ...

  2. Pipelines - .NET中的新IO API指引(二)

    原文:Pipelines - a guided tour of the new IO API in .NET, part 2 作者:marcgravell 在上一章,我们讨论了以往的StreamAPI ...

  3. 细节分析Linux中五种IO模型和三种实现方式

    I/O介绍 操作系统分为两种I/O 网络IO:本质是socket读取 磁盘IO:DMA操作读取 C/C++Linux服务器开发知识点 内容包括C/C++,Linux,Nginx,ZeroMQ,MySQ ...

  4. Linux中5种IO模型

    在了解IO模型时需要清楚什么是同步和异步,什么是阻塞和非阻塞 同步/异步 阻塞/非阻塞 当IO操作发生时,一定是两方参与的,分别是调用方和被调用方.阻塞和非阻塞相对于的事调用方,同步和异步相对于的好似 ...

  5. c语言中的标准IO以及文件IO

    1.标准IO: (1)标准IO是c语言标准提供的一系列进行输入输出的函数 (2)标准IO具有缓冲区 (3)标准IO是在系统调用之上构建的 2.缓冲区 缓冲区是系统在内存中为正在使用的文件自动开辟的一片 ...

  6. go语言io reader_【已解决】go语言中如何使用io的MultiWriter

    [背景] 折腾: 期间,需要去搞懂: 如果新建和设置MultiWriter. [折腾过程] 1.参考: 去看看: 2.然后去试试代码:var filenameOnly string filenameO ...

  7. Node的异步与java的异步_node中异步IO的理解

    解释性语言和编译型语言的区别: 计算器不能直接的理解高级语言,只能理解机器语言,所以必须把高级语言翻译为机器语言,翻译的方式有两种,一个是编译,一个是解释. 解释性语言的程序不需要编译,它是在运行程序 ...

  8. c 语言如何处理表格文件中的数据库,C#程序从Excel表格中读取数据并进行处理

    今天做了一个Excel表格数据处理的事情,因为数据量表较大(接近7000条)所以处理起来有点麻烦,于是写了一个程序, 先将程序记下以便将来查找. using System; using System. ...

  9. node中异步IO的理解

    解释性语言和编译型语言的区别: 计算器不能直接的理解高级语言,只能理解机器语言,所以必须把高级语言翻译为机器语言,翻译的方式有两种,一个是编译,一个是解释. 解释性语言的程序不需要编译,它是在运行程序 ...

最新文章

  1. python相关性分析特征过滤_特征选择-Filter过滤法后续(相关,互信息法)
  2. boost::process::args相关的测试程序
  3. java.lang.math.trunc,java – JPA/Hibernate返回BigDecimal不长
  4. SpringCloud与SpringConfig分布式配置中心
  5. python效率numpy_Python数据处理性能对比,原生,Pandas,Numpy哪个更优秀
  6. Julia: Array 很不同!
  7. A[1080]Graduate Admission 两个cmp比较函数两个struct结构体
  8. java的log计算_Java普通对数(log)计算方法
  9. Packet Tracer安装包及安装教程(8.0版本)
  10. mysql sql 语法错误_执行SQL查询时出现MySQL语法错误
  11. 打印纸张尺寸换算_纸张的尺寸规格对照
  12. tinyserver小型服务器
  13. 亚马逊这样做竞争大的产品更有优势
  14. wf显示远端服务器无反应,无线路由器连接不上网络,一直提示网络服务器远端无响应!...
  15. 使用JS进行版本号比较方法
  16. 摩拜单车用户行为数据分析报告
  17. win10浏览器闪退_win10系统打开360浏览器就闪退是什么情况?
  18. PT100(RTD)三线制测量方案
  19. 减少用户投诉,就选中国移动二次号查询
  20. 微服务框架之微软Service Fabric

热门文章

  1. RISC-V V拓展的实验测试
  2. 如何用计算机符号常量计算圆的周长,编写程序,输入圆的半径,计算并输出其周长和面积。常量pi的值取3.1.._简答题试题答案...
  3. imadjust使用opencv实现
  4. zynq sdk 开发之通过 BRAM 进行 PL 与 PS 的数据交互
  5. 自动化篇 | 你想要的闲鱼日常操作,Python 给你实现了
  6. 名帖75 柳公权 楷书《教弟子言》
  7. android文件传输到电视,快牙及时分享 安卓手机传输文件最佳方案
  8. linux 录像,Linux的录像与回放
  9. 玩客云做为NAS接入Ubuntu16.04.3
  10. XTU OJ 1217 A+B VII