Akka(19): Stream:组合数据流,组合共用-Graph modular composition
akka-stream的Graph是一种运算方案,它可能代表某种简单的线性数据流图如:Source/Flow/Sink,也可能是由更基础的流图组合而成相对复杂点的某种复合流图,而这个复合流图本身又可以被当作组件来组合更大的Graph。因为Graph只是对数据流运算的描述,所以它是可以被重复利用的。所以我们应该尽量地按照业务流程需要来设计构建Graph。在更高的功能层面上实现Graph的模块化(modular)。按上回讨论,Graph又可以被描述成一种黑盒子,它的入口和出口就是Shape,而内部的作用即处理步骤Stage则是用GraphStage来形容的。下面是akka-stream预设的一些基础数据流图:
上面Source,Sink,Flow代表具备线性步骤linear-stage的流图,属于最基础的组件,可以用来构建数据处理链条。而Fan-In合并型,Fan-Out扩散型则具备多个输入或输出端口,可以用来构建更复杂的数据流图。我们可以用以上这些基础Graph来构建更复杂的复合流图,而这些复合流图又可以被重复利用去构建更复杂的复合流图。下面就是一些常见的复合流图:
注意上面的Composite Flow(from Sink and Source)可以用Flow.fromSinkAndSource函数构建:
def fromSinkAndSource[I, O](sink: Graph[SinkShape[I], _], source: Graph[SourceShape[O], _]): Flow[I, O, NotUsed] =fromSinkAndSourceMat(sink, source)(Keep.none)
这个Flow从流向来说先Sink再Source是反的,形成的Flow上下游间无法协调,即Source端终结信号无法到达Sink端,因为这两端是相互独立的。我们必须用CoupledTermination对象中的fromSinkAndSource函数构建的Flow来解决这个问题:
/*** Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow them them.* Similar to `Flow.fromSinkAndSource` however that API does not connect the completion signals of the wrapped stages.*/ object CoupledTerminationFlow {@deprecated("Use `Flow.fromSinkAndSourceCoupledMat(..., ...)(Keep.both)` instead", "2.5.2")def fromSinkAndSource[I, O, M1, M2](in: Sink[I, M1], out: Source[O, M2]): Flow[I, O, (M1, M2)] =Flow.fromSinkAndSourceCoupledMat(in, out)(Keep.both)
从上面图列里的Composite BidiFlow可以看出:一个复合Graph的内部可以是很复杂的,但从外面看到的只是简单的几个输入输出端口。不过Graph内部构件之间的端口必须按照功能逻辑进行正确的连接,剩下的就变成直接向外公开的界面端口了。这种机制支持了层级式的模块化组合方式,如下面的图示:
最后变成:
在DSL里我们可以用name("???")来分割模块:
val nestedFlow =Flow[Int].filter(_ != 0) // an atomic processing stage.map(_ - 2) // another atomic processing stage.named("nestedFlow") // wraps up the Flow, and gives it a name val nestedSink =nestedFlow.to(Sink.fold(0)(_ + _)) // wire an atomic sink to the nestedFlow.named("nestedSink") // wrap it up// Create a RunnableGraph val runnableGraph = nestedSource.to(nestedSink)
在下面这个示范里我们自定义一个某种功能的流图模块:它有2个输入和3个输出。然后我们再使用这个自定义流图模块组建一个完整的闭合流图:
import akka.actor._ import akka.stream._ import akka.stream.scaladsl._import scala.collection.immutableobject GraphModules {def someProcess[I, O]: I => O = i => i.asInstanceOf[O]case class TwoThreeShape[I, I2, O, O2, O3](in1: Inlet[I],in2: Inlet[I2],out1: Outlet[O],out2: Outlet[O2],out3: Outlet[O3]) extends Shape {override def inlets: immutable.Seq[Inlet[_]] = in1 :: in2 :: Niloverride def outlets: immutable.Seq[Outlet[_]] = out1 :: out2 :: out3 :: Niloverride def deepCopy(): Shape = TwoThreeShape(in1.carbonCopy(),in2.carbonCopy(),out1.carbonCopy(),out2.carbonCopy(),out3.carbonCopy())} //a functional module with 2 input 3 outputdef TwoThreeGraph[I, I2, O, O2, O3] = GraphDSL.create() { implicit builder =>val balancer = builder.add(Balance[I](2))val flow = builder.add(Flow[I2].map(someProcess[I2, O2]))TwoThreeShape(balancer.in, flow.in, balancer.out(0), balancer.out(1), flow.out)}val closedGraph = GraphDSL.create() {implicit builder =>import GraphDSL.Implicits._val inp1 = builder.add(Source(List(1,2,3))).outval inp2 = builder.add(Source(List(10,20,30))).outval merge = builder.add(Merge[Int](2))val mod23 = builder.add(TwoThreeGraph[Int,Int,Int,Int,Int])inp1 ~> mod23.in1inp2 ~> mod23.in2mod23.out1 ~> merge.in(0)mod23.out2 ~> merge.in(1)mod23.out3 ~> Sink.foreach(println)merge ~> Sink.foreach(println)ClosedShape} }object TailorGraph extends App {import GraphModules._implicit val sys = ActorSystem("streamSys")implicit val ec = sys.dispatcherimplicit val mat = ActorMaterializer()RunnableGraph.fromGraph(closedGraph).run()scala.io.StdIn.readLine()sys.terminate()}
这个自定义的TwoThreeGraph是一个复合的流图模块,是可以重复使用的。注意这个~>符合的使用:akka-stream只提供了对预设定Shape作为连接对象的支持如:
def ~>[Out](junction: UniformFanInShape[T, Out])(implicit b: Builder[_]): PortOps[Out] = {...}def ~>[Out](junction: UniformFanOutShape[T, Out])(implicit b: Builder[_]): PortOps[Out] = {...}def ~>[Out](flow: FlowShape[T, Out])(implicit b: Builder[_]): PortOps[Out] = {...}def ~>(to: Graph[SinkShape[T], _])(implicit b: Builder[_]): Unit =b.addEdge(importAndGetPort(b), b.add(to).in)def ~>(to: SinkShape[T])(implicit b: Builder[_]): Unit =b.addEdge(importAndGetPort(b), to.in) ...
所以对于我们自定义的TwoThreeShape就只能使用直接的端口连接了:
def ~>[U >: T](to: Inlet[U])(implicit b: Builder[_]): Unit =b.addEdge(importAndGetPort(b), to)
以上的过程显示:通过akka的GraphDSL,对复合型Graph的构建可以实现形象化,大部分工作都在如何对组件之间的端口进行连接。我们再来看个较复杂复合流图的构建过程,下面是这个流图的图示:
可以说这是一个相对复杂的数据处理方案,里面甚至包括了数据流回路(feedback)。无法想象如果用纯函数数据流如scalaz-stream应该怎样去实现这么复杂的流程,也可能根本是没有解决方案的。但用akka GraphDSL可以很形象的组合这个数据流图;
import GraphDSL.Implicits._RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>val A: Outlet[Int] = builder.add(Source.single(0)).outval B: UniformFanOutShape[Int, Int] = builder.add(Broadcast[Int](2))val C: UniformFanInShape[Int, Int] = builder.add(Merge[Int](2))val D: FlowShape[Int, Int] = builder.add(Flow[Int].map(_ + 1))val E: UniformFanOutShape[Int, Int] = builder.add(Balance[Int](2))val F: UniformFanInShape[Int, Int] = builder.add(Merge[Int](2))val G: Inlet[Any] = builder.add(Sink.foreach(println)).inC <~ FA ~> B ~> C ~> FB ~> D ~> E ~> FE ~> GClosedShape})
另一个端口连接方式的版本如下:
RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>val B = builder.add(Broadcast[Int](2))val C = builder.add(Merge[Int](2))val E = builder.add(Balance[Int](2))val F = builder.add(Merge[Int](2))Source.single(0) ~> B.in; B.out(0) ~> C.in(1); C.out ~> F.in(0)C.in(0) <~ F.outB.out(1).map(_ + 1) ~> E.in; E.out(0) ~> F.in(1)E.out(1) ~> Sink.foreach(println)ClosedShape })
如果把上面这个复杂的Graph切分成模块的话,其中一部分是这样的:
val partial = GraphDSL.create() { implicit builder =>val B = builder.add(Broadcast[Int](2))val C = builder.add(Merge[Int](2))val E = builder.add(Balance[Int](2))val F = builder.add(Merge[Int](2))C <~ FB ~> C ~> FB ~> Flow[Int].map(_ + 1) ~> E ~> FFlowShape(B.in, E.out(1))}.named("partial")
// Convert the partial graph of FlowShape to a Flow to get // access to the fluid DSL (for example to be able to call .filter()) val flow = Flow.fromGraph(partial)// Simple way to create a graph backed Source val source = Source.fromGraph( GraphDSL.create() { implicit builder =>val merge = builder.add(Merge[Int](2))Source.single(0) ~> mergeSource(List(2, 3, 4)) ~> merge// Exposing exactly one output portSourceShape(merge.out) })// Building a Sink with a nested Flow, using the fluid DSL val sink = {val nestedFlow = Flow[Int].map(_ * 2).drop(10).named("nestedFlow")nestedFlow.to(Sink.head) }// Putting all together val closed = source.via(flow.filter(_ > 1)).to(sink)
返回运算结果是通过viaMat, toMat来实现的。简写的via,to默认选择流图左边运算产生的结果。
转载于:https://www.cnblogs.com/tiger-xc/p/7421514.html
Akka(19): Stream:组合数据流,组合共用-Graph modular composition相关推荐
- R语言使用ggpubr包的ggarrange函数组合多张结论图(垂直组合+水平组合)并对图像进行顺序编码A、B、C,,,
R语言使用ggpubr包的ggarrange函数组合多张结论图(垂直组合+水平组合)并对图像进行顺序编码A.B.C,,, 目录
- R语言使用ggpubr包的ggarrange函数组合多张结论图:使用ggpubr包将多个可视化结论嵌套起来输出(ggarrange组合ggarrange组合后的图像)
R语言使用ggpubr包的ggarrange函数组合多张结论图:使用ggpubr包将多个可视化结论嵌套起来输出(ggarrange组合ggarrange组合后的图像) 目录
- 【组合数学】排列组合 ( 排列组合示例 )
文章目录 一.排列组合示例 1 ( 组合 | 乘法法则 | 加法法则 ) 二.排列组合示例 2 参考博客 : [组合数学]基本计数原则 ( 加法原则 | 乘法原则 ) [组合数学]集合的排列组合问题示 ...
- 隐藏数据的好方法----Alternative Data Stream(可选数据流)
前言 "Alternative Data Stream"用中文来说就是"可选数据流" 它可以干啥呢? 它可以隐藏数据. 演示 创建一个aa.txt和一个bb.t ...
- 组合有功电能,组合无功电能的概念
概念 组合有功电能 combination active energy 对正向.反向有功电能进行加.减组合运算得出的有功电能,单位是kWh. 组合无功电能 combination reactive e ...
- pyecharts-动态可视化(4)日历表 / k线图 /饼图组合 /散点图组合
pyecharts中日程表/ k线图 /饼图组合 /散点图组合,代码可以直接运行 在制作图表所需用到的代码均做了注释,用的V1版本.非常的小白,非常的友好!! 超级详细的注释,还有动图呢! 日历表 导 ...
- 组合导航原理-松组合+紧组合概念
文章目录 组合导航系统 组合导航的概念 广义 狭义 什么是滤波器 互补滤波 惯性导航中的卡尔曼滤波思想 GNSS/INS组合导航分类 松组合 概念 特点 紧组合 概念 特点 深组合 概念 特点 GNS ...
- 【算法leetcode每日一练】剑指 Offer II 080. 含有 k 个元素的组合 | 77. 组合
文章目录 剑指 Offer II 080. 含有 k 个元素的组合 | 77. 组合: 样例 1: 样例 2: 提示: 分析 题解 java c c++ python go rust javascri ...
- 【组合数学】排列组合 ( 集合组合、一一对应模型分析示例 )
文章目录 一.集合组合.一一对应模型分析示例 排列组合参考博客 : [组合数学]基本计数原则 ( 加法原则 | 乘法原则 ) [组合数学]集合的排列组合问题示例 ( 排列 | 组合 | 圆排列 | 二 ...
最新文章
- nChain再获数字货币安全专利,助力BCH更加安全可靠
- ASP.NET 2.0中改变passwordrecovery发邮件时的信息
- Android studio下将项目代码上传至github包括更新,同步,创建依赖
- mysql分组取日期最大的记录_mysql 分组 group by, 排序 取每条记录中,时间最大
- 对路径的访问被拒绝怎么办_学习了解ACL—扩展访问控制列表,就在网工知识角...
- 跳表(skipList)
- Android开发:操作UI线程4种方法
- spark mongo java_Spark Mongodb集成 - Python版
- game,match,competition,contest区别
- bfs广度优先搜索算法_图的广度优先搜索(BFS)
- ad09只在一定范围内查找相似对象_重磅!Excel更新了超级查找函数XLOOKUP,可以对VLOOKUP说拜拜了...
- UE4之cmd调用函数
- vnc连接服务器怎么修改配置,VNC配置最详细易懂教程-图文并茂
- 大众CEO提前“毕业”,马斯克:软件是通向未来的关键
- string容器模拟实现及使用——C++
- 手机rar压缩包解密,rar压缩包权限密码多少?
- Doevents用法
- 解决ubuntu下外接2k显示器却无法调2k分辨率问题
- Linux(Ubuntu16.04)自学笔记,资源整理
- 简单归纳一下32位、64位、x86、x64的区别和联系
热门文章
- 罗斯蒙特电磁流量计8723说明书_罗斯蒙特8732E电磁流量计故障原因及解决办法!...
- 20175204 张湲祯 2018-2019-2《Java程序设计》 第一周学习总结
- Spring对AspectJ的支持
- Windows10远程报错:由于CredSSP加密Oracle修正
- 用C语言实现:判断1000-2000年之间的闰年。
- Android - 基于Toolbar的Navigation Drawer(Material Design)
- 兼容门:先卸载腾讯QQ,再卸载360软件!
- ADO.NET 3.5 Cookbook:一、连接数据(1)保存连接字符串
- springboot中getOutputStream() has already been called for this response和java.io.FileNotFoundException
- 联想电脑g470 vs2010很卡 问题解决