使用反应流API将Akka流与rxJava结合在一起
这次只是一篇简短的文章,因为我仍在尝试这种东西。 关于反应式编程有很多话题。 在Java 8中,我们有Stream API,我们有rxJava我们有ratpack ,Akka有akka-streams 。
这些实现的主要问题是它们不兼容。 您不能将一个实现的订阅者连接到另一个实现的发布者。 幸运的是,一项倡议已经开始提供一种方法,使这些不同的实现可以协同工作:
“本规范旨在允许创建许多符合标准的实现,这些实现将通过遵守规则将能够平滑地互操作,并在流应用程序的整个处理图中保留上述好处和特征。”
来自– http://www.reactive-streams.org/
这是如何运作的
现在我们该怎么做? 让我们看一下基于akka-stream提供的示例的快速示例(从此处开始 )。 在下面的清单中:
package sample.streamimport akka.actor.ActorSystem
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.{SubscriberSink, PublisherSource, Source}
import com.google.common.collect.{DiscreteDomain, ContiguousSet}
import rx.RxReactiveStreams
import rx.Observable;
import scala.collection.JavaConverters._object BasicTransformation {def main(args: Array[String]): Unit = {// define an implicit actorsystem and import the implicit dispatcherimplicit val system = ActorSystem("Sys")import system.dispatcher// flow materializer determines how the stream is realized.// this time as a flow between actors.implicit val materializer = FlowMaterializer()// input text for the stream.val text ="""|Lorem Ipsum is simply dummy text of the printing and typesetting industry.|Lorem Ipsum has been the industry's standard dummy text ever since the 1500s, |when an unknown printer took a galley of type and scrambled it to make a type |specimen book.""".stripMargin// create an observable from a simple list (this is in rxjava style)val first = Observable.from(text.split("\\s").toList.asJava);// convert the rxJava observable to a publisherval publisher = RxReactiveStreams.toPublisher(first);// based on the publisher create an akka sourceval source = PublisherSource(publisher);// now use the akka style syntax to stream the data from the source// to the sink (in this case this is println)source.map(_.toUpperCase). // executed as actorsfilter(_.length > 3).foreach { el => // the sink/consumerprintln(el)}.onComplete(_ => system.shutdown()) // lifecycle event}
}
此示例中的代码注释几乎解释了正在发生的事情。 我们在这里所做的是创建一个基于rxJava的Observable。 将此Observable转换为“反应流”发布者,并使用此发布者创建akka-streams源。 对于其余的代码,我们可以使用akka-stream样式流API对流进行建模。 在这种情况下,我们只需要进行一些过滤并打印出结果即可。
翻译自: https://www.javacodegeeks.com/2014/11/use-reactive-streams-api-to-combine-akka-streams-with-rxjava.html
使用反应流API将Akka流与rxJava结合在一起相关推荐
- react api_使用React流API将Akka流与rxJava结合在一起
react api 这次只是快速的一篇文章,因为我仍在尝试这种东西. 关于React式编程有很多话题. 在Java 8中,我们有Stream API,有rxJava我们有ratpack ,Akka有a ...
- qmediaplayer获取流类型_Java 流API
流相关的接口和类在java.util.stream包中. AutoCloseable接口来自java.lang包. 所有流接口从继承自AutoCloseable接口的BaseStream接口继承. A ...
- meetup_使用RxNetty访问Meetup的流API
meetup 本文将涉及多个主题:响应式编程,HTTP,解析JSON以及与社交API集成. 完全在一个用例中:我们将通过非夸张的RxNetty库实时加载和处理新的metup.com事件,结合Netty ...
- lambda ::表达式_Lambda表达式和流API:基本示例
lambda ::表达式 这篇博客文章包含基本Lambda表达式和Stream API示例的列表,我在2014年6月在Java用户组Politechnica Gedanensis (格但斯克技术大学) ...
- jsr250-api_JSON处理的Java API(JSR-353)–流API
jsr250-api Java很快将具有一组标准的API,作为Java EE 7的一部分处理JSON.该标准定义为JSR 353 – JSON处理的Java API (JSON-P),目前正在最终批准 ...
- 使用RxNetty访问Meetup的流API
本文将涉及多个主题:响应式编程,HTTP,解析JSON以及与社交API集成. 完全在一个用例中:我们将通过非夸张的RxNetty库实时加载和处理新的metup.com事件,结合Netty框架的强大功能 ...
- Lambda表达式和流API:基本示例
该博客文章包含基本Lambda表达式和Stream API示例的列表,我在2014年6月在Java用户组– Politechnica Gedanensis (格但斯克技术大学)和Goyello的实时编 ...
- JSON处理的Java API(JSR-353)–流API
Java很快将具有一组标准的API,作为Java EE 7的一部分处理JSON.此标准定义为JSR 353 – JSON处理的Java API (JSON-P),目前正在最终批准投票中. JSON-P ...
- NodeJS Stream(可读流、可写流) API解读
原文链接:https://juejin.im/post/5ba0d504e51d450ea13223ba 流的介绍 在 NodeJS 中,我们对文件的操作需要依赖核心模块 fs,fs 中有很基本 AP ...
最新文章
- java常见异常说明汇总
- Spring Cloud Gateway(过滤器)
- background-image使用svg如何改变颜色
- Compare int a and int b without if
- 2021-06-27JSON介绍
- 笔记本cpu温度高怎么办_夏天面团温度高怎么办?几个技巧让面温立马降下来
- android与相机连接电脑,使用Android手机或平板电脑控制dSLR相机
- 蚁群算法原理及c++实现
- 一名优秀的数据分析师应该具备这10项关键技能
- localhost无法访问
- 2022CPA财务成本管理-企业管理专题Corporate Goverance【完结】
- IIS发生意外错误0x8ffe2740 IIS不…
- c# 数字转换为中文数字
- 一块宝藏存储芯片【SD NAND】
- vue中引用echarts设置渐变色new echarts不能用
- 【TCP的拥塞控制】基于窗口的拥塞控制
- 计算机视觉|投影与三维视觉
- 40 岁的 C++ 没有中年危机
- 区块链技术十周年—回眸与前瞻
- SEU-2020-卓越大赛面试
热门文章
- Spring的properties属性配置文件和Spring常用注解
- SpringBoot运行原理初探
- 计算字典的个数_[LeetCode] 440. 字典序的第K小数字
- 转-Redis AOF 持久化详解
- 如何评估模型的预测性能?
- tomcatSupplement(1)tomcat启动脚本分析(以Windows平台为例)
- micrometer_具有InlfuxDB的Spring Boot和Micrometer第2部分:添加InfluxDB
- jdk类加载器执行步骤_[JDK 11] jcmd中的类加载器层次结构详细信息
- jpa 循环引用_JPA中按身份引用
- 抽象工厂和工厂方法示例_工厂方法设计模式示例