Scalaz(47)- scalaz-stream: 深入了解-Source
scalaz-stream库的主要设计目标是实现函数式的I/O编程(functional I/O)。这样用户就能使用功能单一的基础I/O函数组合成为功能完整的I/O程序。还有一个目标就是保证资源的安全使用(resource safety):使用scalaz-stream编写的I/O程序能确保资源的安全使用,特别是在完成一项I/O任务后自动释放所有占用的资源包括file handle、memory等等。我们在上一篇的讨论里笼统地解释了一下scalaz-stream核心类型Process的基本情况,不过大部分时间都用在了介绍Process1这个通道类型。在这篇讨论里我们会从实际应用的角度来介绍整个scalaz-stream链条的设计原理及应用目的。我们提到过Process具有Emit/Await/Halt三个状态,而Append是一个链接stream节点的重要类型。先看看这几个类型在scalaz-stream里的定义:
case class Emit[+O](seq: Seq[O]) extends HaltEmitOrAwait[Nothing, O] with EmitOrAwait[Nothing, O]case class Await[+F[_], A, +O](req: F[A], rcv: (EarlyCause \/ A) => Trampoline[Process[F, O]] @uncheckedVariance, preempt : A => Trampoline[Process[F,Nothing]] @uncheckedVariance = (_:A) => Trampoline.delay(halt:Process[F,Nothing])) extends HaltEmitOrAwait[F, O] with EmitOrAwait[F, O] case class Halt(cause: Cause) extends HaltEmitOrAwait[Nothing, Nothing] with HaltOrStep[Nothing, Nothing]case class Append[+F[_], +O](head: HaltEmitOrAwait[F, O], stack: Vector[Cause => Trampoline[Process[F, O]]] @uncheckedVariance) extends Process[F, O]
我们看到Process[F,O]被包嵌在Trampoline类型里,所以Process是通过Trampoline来实现函数结构化的,可以有效解决大量stream运算堆栈溢出问题(StackOverflowError)。撇开Trampoline等复杂的语法,以上类型可以简化成以下理论结构:
1 rait Process[+F[_],+O] 2 case object Cause 3 4 case class Emit[O](out: O) extends Process[Nothing, O] 5 6 case class Halt(cause: Cause) extends Process[Nothing,Nothing] 7 8 case class Await[+F[_],E,+O]( 9 req: F[E], 10 rcv: E => Process[F,O], 11 preempt: E => Process[F,Nothing] = Halt) extends Process[F,O] 12 13 case class Append[+F[_],+O]( 14 head: Process[F,O], 15 stack: Vector[Cause => Process[F,O]]) extends Process[F,O]
我们来说明一下:
Process[F[_],O]:从它的类型可以推断出scalaz-stream可以在输出O类型元素的过程中进行可能含副作用的F类型运算。
Emit[O](out: O):发送一个O类型元素;不可能进行任何附加运算
Halt(cause: Cause):停止发送;cause是停止的原因:End-完成发送,Err-出错终止,Kill-强行终止
Await[+F[_],E,+O]:这个是运算流的核心Process状态。先进行F运算req,得出结果E后输入函数rcv转换到下一个Process状态,完成后执行preempt这个事后清理函数。这不就是个flatMap函数结构版嘛。值得注意的是E类型是个内部类型,由F运算产生后输入rcv后就不再引用了。我们可以在preepmt函数里进行资源释放。如果我们需要构建一个运算流,看来就只有使用这个Await类型了
Append[+F[_],+O]:Append是一个Process[F,O]链接类型。首先它不但担负了元素O的传送,更重要的是它还可以把上一节点的F运算传到下一个节点。这样才能在下面节点时运行对上一个节点的事后处置函数(finalizer)。Append可以把多个节点结成一个大节点:head是第一个节点,stack是一串函数,每个函数接受上一个节点完成状态后运算出下一个节点状态
一个完整的scalaz-stream由三个类型的节点组成Source(源点)/Transducer(传换点)/Sink(终点)。节点间通过Await或者Append来链接。我们再来看看Source/Transducer/Sink的类型款式:
上游:Source >>> Process0[O] >>> Process[F[_],O]
中游:Transduce >>> Process1[I,O]
下游:Sink/Channel >>> Process[F[_],O => F[Unit]], Channel >>> Process[F[_],I => F[O]]
我们可以用一个文件处理流程来描述完整scalaz-stream链条的作用:
Process[F[_],O],用F[O]方式读取文件中的O值,这时F是有副作用的
>>> Process[I,O],I代表从文件中读取的原始数据,O代表经过筛选、处理产生的输出数据
>>> O => F[Unit]是一个不返回结果的函数,代表对输入的O类型数据进行F运算,如把O类型数据存写入一个文件
/>> I => F[O]是个返回结果的函数,对输入I进行F运算后返回O,如把一条记录写入数据库后返回写入状态
以上流程简单描述:从文件中读出数据->加工处理读出数据->写入另一个文件。虽然从描述上看起来很简单,但我们的目的是资源安全使用:无论在任何终止情况下:正常读写、中途强行停止、出错终止,scalaz-stream都会主动关闭开启的文件、停止使用的线程、释放占用的内存等其它资源。这样看来到不是那么简单了。我们先试着分析Source/Transducer/Sink这几种类型的作用:
1 import Process._ 2 emit(0) //> res0: scalaz.stream.Process0[Int] = Emit(Vector(0)) 3 emitAll(Seq(1,2,3)) //> res1: scalaz.stream.Process0[Int] = Emit(List(1, 2, 3)) 4 Process(1,2,3) //> res2: scalaz.stream.Process0[Int] = Emit(WrappedArray(1, 2, 3)) 5 Process() //> res3: scalaz.stream.Process0[Nothing] = Emit(List())
以上都是Process0的构建方式,也算是数据源。但它们只是代表了内存中的一串值,对我们来说没什么意义,因为我们希望从外设获取这些值,比如从文件或者数据库里读取数据,也就是说需要F运算效果。Process0[O] >>> Process[Nothing,O],而我们需要的是Process[F,O]。那么我们这样写如何呢?
1 val p: Process[Task,Int] = emitAll(Seq(1,2,3)) 2 //> p : scalaz.stream.Process[scalaz.concurrent.Task,Int] = Emit(List(1, 2, 3)) 3 4 emitAll(Seq(1,2,3)).toSource 5 //> res4: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Emit(List(1, 2, 3)) 6
类型倒是匹配了,但表达式Emit(...)里没有任何Task的影子,这个无法满足我们对Source的需要。看来只有以下这种方式了:
1 await(Task.delay{3})(emit) 2 //> res5: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await(scalaz.concurrent.Task@57855c9a,<function1>,<function1>) 3 eval(Task.delay{3}) 4 //> res6: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await(scalaz.concurrent.Task@63e2203c,<function1>,<function1>)
现在不但类型匹配,而且表达式里还包含了Task运算。我们通过Task.delay可以进行文件读取等带有副作用的运算,这是因为Await将会运行req:F[E] >>> Task[Int]。这正是我们需要的Source。那我们能不能用这个Source来发出一串数据呢?
1 def emitSeq[A](xa: Seq[A]):Process[Task,A] = 2 xa match { 3 case h :: t => await(Task.delay {h})(emit) ++ emitSeq(t) 4 case Nil => halt 5 } //> emitSeq: [A](xa: Seq[A])scalaz.stream.Process[scalaz.concurrent.Task,A] 6 val es1 = emitSeq(Seq(1,2,3)) //> es1 : scalaz.stream.Process[scalaz.concurrent.Task,Int] = Append(Await(scalaz.concurrent.Task@2d6eabae,<function1>,<function1>),Vector(<function1>)) 7 val es2 = emitSeq(Seq("a","b","c")) //> es2 : scalaz.stream.Process[scalaz.concurrent.Task,String] = Append(Await( 8 scalaz.concurrent.Task@4e7dc304,<function1>,<function1>),Vector(<function1>)) 9 es1.runLog.run //> res7: Vector[Int] = Vector(1, 2, 3) 10 es2.runLog.run //> res8: Vector[String] = Vector(a, b, c)
以上示范中我们用await运算了Task,然后返回了Process[Task,?],一个可能带副作用运算的Source。实际上我们在很多情况下都需要从外部的源头用Task来获取一些数据,通常这些数据源都对数据获取进行了异步(asynchronous)运算处理,然后通过callback方式来提供这些数据。我们可以用Task.async函数来把这些callback函数转变成Task,下一步我们只需要用Process.eval或者await就可以把这个Task升格成Process[Task,?]。我们先看个简单的例子:假如我们用scala.concurrent.Future来进行异步数据读取,可以这样把Future转换成Process:
1 def getData(dbName: String): Task[String] = Task.async { cb => 2 import scala.concurrent._ 3 import scala.concurrent.ExecutionContext.Implicits.global 4 import scala.util.{Success,Failure} 5 Future { s"got data from $dbName" }.onComplete { 6 case Success(a) => cb(a.right) 7 case Failure(e) => cb(e.left) 8 } 9 } //> getData: (dbName: String)scalaz.concurrent.Task[String] 10 val procGetData = eval(getData("MySQL")) //> procGetData : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@dd3b207,<function1>,<function1>) 11 procGetData.runLog.run //> res9: Vector[String] = Vector(got data from MySQL)
我们也可以把java的callback转变成Task:
1 import com.ning.http.client._ 2 val asyncHttpClient = new AsyncHttpClient() //> asyncHttpClient : com.ning.http.client.AsyncHttpClient = com.ning.http.client.AsyncHttpClient@245b4bdc 3 def get(s: String): Task[Response] = Task.async[Response] { callback => 4 asyncHttpClient.prepareGet(s).execute( 5 new AsyncCompletionHandler[Unit] { 6 def onCompleted(r: Response): Unit = callback(r.right) 7 8 def onError(e: Throwable): Unit = callback(e.left) 9 } 10 ) 11 } //> get: (s: String)scalaz.concurrent.Task[com.ning.http.client.Response] 12 val prcGet = Process.eval(get("http://sina.com")) 13 //> prcGet : scalaz.stream.Process[scalaz.concurrent.Task,com.ning.http.client.Response] = Await(scalaz.concurrent.Task@222545dc,<function1>,<function1>) 14 prcGet.run.run //> 12:25:27.852 [New I/O worker #1] DEBUG c.n.h.c.p.n.r.NettyConnectListener -Request using non cached Channel '[id: 0x23fa1307, /192.168.200.3:50569 =>sina.com/66.102.251.33:80]':
如果直接按照scalaz Task callback的类型款式 def async(callback:(Throwable \/ Unit) => Unit):
1 def read(callback: (Throwable \/ Array[Byte]) => Unit): Unit = ??? 2 //> read: (callback: scalaz.\/[Throwable,Array[Byte]] => Unit)Unit 3 val t: Task[Array[Byte]] = Task.async(read) //> t : scalaz.concurrent.Task[Array[Byte]] = scalaz.concurrent.Task@1a677343 4 val t2: Task[Array[Byte]] = for { 5 bytes <- t 6 moarBytes <- t 7 } yield (bytes ++ moarBytes) //> t2 : scalaz.concurrent.Task[Array[Byte]] = scalaz.concurrent.Task@15de0b3c 8 val prct2 = Process.eval(t2) //> prct2 : scalaz.stream.Process[scalaz.concurrent.Task,Array[Byte]] = Await(scalaz.concurrent.Task@15de0b3c,<function1>,<function1>) 9 10 def asyncRead(succ: Array[Byte] => Unit, fail: Throwable => Unit): Unit = ??? 11 //> asyncRead: (succ: Array[Byte] => Unit, fail: Throwable => Unit)Unit 12 val t3: Task[Array[Byte]] = Task.async { callback => 13 asyncRead(b => callback(b.right), err => callback(err.left)) 14 } //> t3 : scalaz.concurrent.Task[Array[Byte]] = scalaz.concurrent.Task@489115ef 15 val t4: Task[Array[Byte]] = t3.flatMap(b => Task(b)) 16 //> t4 : scalaz.concurrent.Task[Array[Byte]] = scalaz.concurrent.Task@3857f613 17 val prct4 = Process.eval(t4) //> prct4 : scalaz.stream.Process[scalaz.concurrent.Task,Array[Byte]] = Await(scalaz.concurrent.Task@3857f613,<function1>,<function1>)
我们也可以用timer来产生Process[Task,A]:
1 import scala.concurrent.duration._ 2 implicit val scheduler = java.util.concurrent.Executors.newScheduledThreadPool(3) 3 //> scheduler : java.util.concurrent.ScheduledExecutorService = java.util.concurrent.ScheduledThreadPoolExecutor@516be40f[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] 4 val fizz = time.awakeEvery(3.seconds).map(_ => "fizz") 5 //> fizz : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@5762806e,<function1>,<function1>) 6 val fizz3 = fizz.take(3) //> fizz3 : scalaz.stream.Process[scalaz.concurrent.Task,String] = Append(Halt(End),Vector(<function1>)) 7 fizz3.runLog.run //> res9: Vector[String] = Vector(fizz, fizz, fizz)
Queue、Top和Signal都可以作为带副作用数据源的构建器。我们先看看Queue是如何产生数据源的:
1 type BigStringResult = String 2 val qJobResult = async.unboundedQueue[BigStringResult] 3 //> qJobResult : scalaz.stream.async.mutable.Queue[demo.ws.blogStream.BigStringResult] = scalaz.stream.async.mutable.Queue$$anon$1@25d250c6 4 def longGet(jobnum: Int): BigStringResult = { 5 Thread.sleep(2000) 6 s"Some large data sets from job#${jobnum}" 7 } //> longGet: (jobnum: Int)demo.ws.blogStream.BigStringResult 8 9 // multi-tasking 10 val start = System.currentTimeMillis() //> start : Long = 1468556250797 11 Task.fork(qJobResult.enqueueOne(longGet(1))).unsafePerformAsync{case _ => ()} 12 Task.fork(qJobResult.enqueueOne(longGet(2))).unsafePerformAsync{case _ => ()} 13 Task.fork(qJobResult.enqueueOne(longGet(3))).unsafePerformAsync{case _ => ()} 14 val timemill = System.currentTimeMillis() - start 15 //> timemill : Long = 17 16 Thread.sleep(3000) 17 qJobResult.close.run 18 val bigData = { 19 //multi-tasking 20 val j1 = qJobResult.dequeue 21 val j2 = qJobResult.dequeue 22 val j3 = qJobResult.dequeue 23 for { 24 r1 <- j1 25 r2 <- j2 26 r3 <- j3 27 } yield r1 + ","+ r2 + "," + r3 28 } //> bigData : scalaz.stream.Process[[x]scalaz.concurrent.Task[x],String] = Await(scalaz.concurrent.Task@778d1062,<function1>,<function1>) 29 30 bigData.runLog.run //> res9: Vector[String] = Vector(Some large data sets from job#2,Some large data sets from job#3,Some large data sets from job#1)
再看看Topic示范:
1 import scala.concurrent._ 2 import scala.concurrent.duration._ 3 import scalaz.stream.async.mutable._ 4 import scala.concurrent.ExecutionContext.Implicits.global 5 val sharedData: Topic[BigStringResult] = async.topic() 6 //> sharedData : scalaz.stream.async.mutable.Topic[demo.ws.blogStream.BigStringResult] = scalaz.stream.async.package$$anon$1@797badd3 7 val subscriber = sharedData.subscribe.runLog //> subscriber : scalaz.concurrent.Task[Vector[demo.ws.blogStream.BigStringResult]] = scalaz.concurrent.Task@226a82c4 8 val otherThread = future { 9 subscriber.run // Added this here - now subscriber is really attached to the topic 10 } //> otherThread : scala.concurrent.Future[Vector[demo.ws.blogStream.BigStringResult]] = List() 11 // Need to give subscriber some time to start up. 12 // I doubt you'd do this in actual code. 13 14 // topics seem more useful for hooking up things like 15 // sensors that produce a continual stream of data, 16 17 // and where individual values can be dropped on 18 // floor. 19 Thread.sleep(100) 20 21 sharedData.publishOne(longGet(1)).run // don't just call publishOne; need to run the resulting task 22 sharedData.close.run // Don't just call close; need to run the resulting task 23 24 // Need to wait for the output 25 val result = Await.result(otherThread, Duration.Inf) 26 //> result : Vector[demo.ws.blogStream.BigStringResult] = Vector(Some large data sets from job#1)
以上对可能带有副作用的Source的各种产生方法提供了解释和示范。scalaz-stream的其他类型节点将在下面的讨论中深入介绍。
转载于:https://www.cnblogs.com/tiger-xc/p/5674366.html
Scalaz(47)- scalaz-stream: 深入了解-Source相关推荐
- Scalaz(23)- 泛函数据结构: Zipper-游标定位
外面沙尘滚滚一直向北去了,意识到年关到了,码农们都回乡过年去了,而我却留在这里玩弄"拉链".不要想歪了,我说的不是裤裆拉链而是scalaz Zipper,一种泛函数据结构游标(cu ...
- java8 Lambda表达式的应用(函数式接口、lambda表达式,方法引用及Stream API)
之前写了一篇博客简单介绍了一下java 8发布新增的一些特性功能,java 8在2014年发布,距今也不少年了,但是lambda表达式使用并不熟练,现在一边学习,一边记录一下. 目录 一.Lambda ...
- 【java学习之路】(java SE篇)014.Stream API
Stream API Stream是一组用来处理数组.集合的API ▪ Java 8之所以费这么大功夫引入函数式编程,原因有二: – 代码简洁函数式编程写出的代码简洁且意图明确,使用 stream 接 ...
- java中Stream流
一 概述 数据渠道.管道,用于操作数据源(集合.数组等)所生成的元素序列. 集合讲的是数据,流讲的是计算 即一组用来处理数组,集合的API. 二 Stream特点 1 Stream 不是数据结构,没有 ...
- 【进阶技术】一篇文章搞掂:Spring Cloud Stream
本文总结自官方文档http://cloud.spring.io/spring-cloud-static/spring-cloud-stream/2.1.0.RC3/single/spring-clou ...
- Spring Cloud Stream中文指导手册
Spring Cloud Stream中文指导手册 source 文章目录 Spring Cloud Stream中文指导手册 @[toc] Spring Cloud Stream 核心 1.简介 2 ...
- 第7章:Lambda和Stream
第42条 Lambda优先于匿名类 42.1 匿名内部类 //1. 匿名内部类适合于需要函数对象的经典面向对象设计模式,特别是策略模式 //2. 为什么下面例子是策略模式的应用呢,因为sort方法,可 ...
- 《微软:DirectShow开发指南》第12章 Writing DirectShow Source Filters
Of the three classes of Microsoft DirectShow filters-source, transform, and renderer-the source filt ...
- 大数据工具篇之flume1.4-安装部署指南
一.引言 flume-ng是一个分布式.高可靠和高效的日志收集系统,flume-ng是flume的新版本的意思,其中"ng"意为new generate(新一代),目前来说,flu ...
- 基于虎书实现LALR(1)分析并生成GLSL编译器前端代码(C#)
基于虎书实现LALR(1)分析并生成GLSL编译器前端代码(C#) 为了完美解析GLSL源码,获取其中的信息(都有哪些in/out/uniform等),我决定做个GLSL编译器的前端(以后简称编译器或 ...
最新文章
- EasyRTMP手机直播推送rtmp流flash无法正常播放问题
- pthread_cleanup_push与pthread_cleanup_pop的目的、作用
- mysql2012更改表名_sql alter table修改数据库的表名字
- STM32----摸石头过河系列(四)
- 买书(信息学奥数一本通-T1293)
- 分布式对象存储 读书笔记(一) 开始
- Route66,GPS道路导航系统,我所用过的最大块头的手机软件
- 冰点密码破解 — 强悍的调试器 SOFTICE
- 很全的C51库函数(IIC类)(IIC、EEPROM、ADXL、PCF、HMC、L3G、BMP)
- 有趣的23000----整理(09)C,D词根
- 20200528 前端开发日报
- 小猿日记(9) - 今天,我又拒绝了阿里的一次机会
- 微信公众号工作中如何产生新媒体思维
- C#把月日年帶有 AM、PM的时间格式转换为正常时间格式
- 前端框架 ng 环境配置
- 加速编码的17款最棒的CSS工具
- stl文件html预览,基于SpringMVC对stl文件的3D可视化
- 第二篇第一章概述及第二章生产和储存物品的火灾危险性分类 重点在于表格...
- 【R语言】白葡萄酒的EDA分析
- Manjaro + Windows 双系统安装指南
热门文章
- vba commondialog控件添加不上_MyVBA加载宏——添加自定义菜单03——功能分析
- java 字符串string、int和arraylist互转
- 区块链 以太坊 智能合约 运行原理和开发实例
- mysql遵循acid_关系型数据库遵循ACID规则
- 基于springboot+vue的房屋租赁系统(前后端分离)
- cactiez mysql_cactiez v11添加对mysql数据库、apache系统进行监控
- 小贝拉机器人是朋友_被Angelababy、周震南等摸头杀?机器人贝拉凭什么受宠
- 通过ajax实现简单的数据交互(模板引擎)
- SQL中truncate 、delete与drop区别 (Rollback Segment)
- 十六、Oracle学习笔记:索引和约束(表字段快速查询和约束)