具有Akka反应流的反应队列
反应性流是最近宣布的一项计划,旨在在JVM上为具有内置背压的异步流处理创建标准。 该工作组由Typesafe,Red Hat,Oracle,Netflix等公司组成。
早期的实验性实现之一是基于Akka的 。 预览版0.3包括演员生产者和消费者,这为集成提供了新的可能性。
为了测试新技术,我实现了一个非常简单的Reactive Message Queue 。 该代码处于PoC阶段,缺乏错误处理等功能,但如果使用正确,则可以正常工作!
队列是响应式的,这意味着消息将在需要时传递给感兴趣的各方,而无需轮询。 在发送消息时(以便发送者不会使代理不堪重负)以及在接收消息时(以便代理仅发送与接收者可以使用的消息一样多的消息)都会施加反压。
让我们看看它是如何工作的!
队列
首先,队列本身是一个参与者,对(反应式)流一无所知。 该代码位于com.reactmq.queue
包中。 actor接受以下actor消息(“ message”一词在这里超载,因此我将使用普通的“ message”来表示我们发送到队列和从队列中接收的消息,“ actor-messages”是Scala。类实例发送给演员):
SendMessage(content)
–发送具有指定String
内容的消息。 带有消息ID的回复(SentMessage(id)
)被发送回发送者ReceiveMessages(count)
–表示发件人(执行者)想接收的邮件count
最多的信号。 该计数与先前发出信号的需求累加。DeleteMessage(id)
–毫不奇怪,删除一条消息
队列实现是ElasticMQ中的简化版本。 收到消息后,如果在10秒钟内未将其删除(确认),则可以再次接收。
当一个actor发出对消息的需求信号时(通过将ReceiveMessages
发送到队列actor),它应该期望ReceivedMessages(msgs)
任意数量的ReceivedMessages(msgs)
actor-message答复,其中包含接收到的数据。
变得被动
要创建和测试我们的反应式队列,我们需要三个应用程序:
Sender
- 中央
Broker
Receiver
我们可以运行任意数量的Senders
和Receivers
,但是当然我们应该只运行一个Broker
。
我们需要做的第一件事是通过网络将Sender
与Broker
连接,将Receiver
与Broker
连接。 我们可以使用Akka IO扩展和反应式TCP扩展来做到这一点。 使用connect
和bind
对,我们在绑定端获得了连接流:
// sender:
val connectFuture = IO(StreamTcp) ? StreamTcp.Connect(settings, sendServerAddress)connectFuture.onSuccess {case binding: StreamTcp.OutgoingTcpConnection =>logger.info("Sender: connected to broker")// per-connection logic
}// broker:
val bindSendFuture = IO(StreamTcp) ? StreamTcp.Bind(settings, sendServerAddress)bindSendFuture.onSuccess {case serverBinding: StreamTcp.TcpServerBinding =>logger.info("Broker: send bound")Flow(serverBinding.connectionStream).foreach { conn =>// per-connection logic}.consume(materializer)
}
有一个用于发送和接收消息的地址。
发件人
首先让我们看一下Sender
的每个连接逻辑。
Flow(1.second, () => { idx += 1; s"Message $idx from $senderName" }).map { msg =>logger.debug(s"Sender: sending $msg")createFrame(msg)}.toProducer(materializer).produceTo(binding.outputStream)
我们正在创建一个滴答流,它每秒产生一个新消息(非常方便测试)。 使用map
流转换器,我们用消息创建了一个字节帧(稍后会详细介绍)。 但这仅仅是对我们(非常简单)流的外观的描述; 它需要使用物化 toProducer
方法,该方法将提供流变换节点的具体实现。 目前,只有一个FlowMaterializer
,这同样令人惊讶地使用引擎盖下的Akka actor来实际创建流和流。
最后,我们将刚刚创建的生产者连接到TCP绑定的outputStream
,它恰好是使用者。 现在,我们有了一个反应性的网络消息流,这意味着仅当Broker
可以接受消息时,才发送消息。 否则,反压将一直施加到滴答声产生器。
代理:发送消息
在网络的另一端是Broker
。 让我们看看消息到达时会发生什么。
Flow(serverBinding.connectionStream).foreach { conn =>logger.info(s"Broker: send client connected (${conn.remoteAddress})")val sendToQueueConsumer = ActorConsumer[String](system.actorOf(Props(new SendToQueueConsumer(queueActor))))// sending messages to the queue, receiving from the clientval reconcileFrames = new ReconcileFrames()Flow(conn.inputStream).mapConcat(reconcileFrames.apply).produceTo(materializer, sendToQueueConsumer)
}.consume(materializer)
首先,我们创建了一个Flow
,那将是字节输入流-从连接的输入流。 接下来,我们重新构造使用框架发送的String
实例,最后将流定向到发送到队列的使用者。
SendToQueueConsumer
是到主队列SendToQueueConsumer
的每个连接的桥。 它使用Akka的Reactive Streams实施中的ActorConsumer
特性来自动管理应该在上游发出信号的需求。 利用这一特征,我们可以创建一个由参与者支持的反应流Consumer[_]
,从而实现完全可定制的接收器。
class SendToQueueConsumer(queueActor: ActorRef) extends ActorConsumer {private var inFlight = 0override protected def requestStrategy = new MaxInFlightRequestStrategy(10) {override def inFlightInternally = inFlight}override def receive = {case OnNext(msg: String) =>queueActor ! SendMessage(msg)inFlight += 1case SentMessage(_) => inFlight -= 1}
}
需要提供给ActorConsumer
的方法是一种度量当前正在处理的流项目的数量的方法。 在这里,我们正在计算已发送到队列但尚未收到ID的消息数(因此,队列正在处理它们)。
消费者收到包装在OnNext
actor消息中的新消息; 因此, OnNext
由流发送到SentMessage
,而SentMessage
被队列SentMessage
发送以回复SendMessage
。
接收
接收部分以类似的方式完成,尽管它需要一些额外的步骤。 首先,如果您查看Receiver
,您会看到我们正在从输入流中读取字节,从帧中重构消息,并发回ID,从而确认消息。 实际上,我们将在接收消息和发回ID之间运行一些消息处理逻辑。
在Broker
方,我们为每个连接创建两个流。
一个是发送给接收者的消息流,另一个是来自接收者的已确认消息id的流,这些流被简单地转换为将DeleteMessage
消息发送给队列actor。
与使用者类似,我们需要从队列参与者到流的每个连接接收桥。 这是在ReceiveFromQueueProducer
实现的。 在这里,我们扩展了ActorProducer
特性,它使您可以完全控制实际创建流中消息的过程。
在此参与者中,流正在发送Request
参与者消息,以发出需求信号。 当有需求时,我们从队列中请求消息。 队列最终将以一个或多个ReceivedMessages
actor消息进行响应(当队列中有任何消息时); 由于消息的数量永远不会超过信号需求,因此我们可以安全地调用ActorProducer.onNext
方法,该方法将给定的项目发送到下游。
构图
一个小细节是,我们需要一个自定义的框架协议(感谢Roland Kuhn的澄清 ),因为TCP流只是字节流,因此我们可以获得数据的任意片段,需要稍后重新组合。 幸运的是,实现这样的框架非常简单–请参阅Framing
类。 每个帧由消息的大小和消息本身组成。
加起来
使用反应式流和Akka实施,可以轻松创建具有端到端背压的反应式应用程序。 上面的队列虽然缺少许多功能和证明,但不允许Senders
使Broker
过载,而另一方面, Broker
会使Receivers
过载。 所有这些,而无需实际编写任何背压处理代码!
翻译自: https://www.javacodegeeks.com/2014/06/reactive-queue-with-akka-reactive-streams.html
具有Akka反应流的反应队列相关推荐
- TCP流中各种队列:
TCP流中各种队列: RED队列的介绍 [https://blog.csdn.net/sinat_20184565/article/details/107521549]
- react 消息队列_具有AkkaReact流的React队列
react 消息队列 React性流是最近宣布的一项计划,旨在在JVM上为具有内置背压的异步流处理创建标准. 该工作组由Typesafe,Red Hat,Oracle,Netflix等公司组成. 早期 ...
- 分享笔记RabbitMQ高级之消息限流与延时队列
楔子 本篇是消息队列RabbitMQ的第五弹. 上篇本来打算讲述RabbitMQ的一些高级用法: 如何保证消息的可靠性? 消息队列如何进行限流? 如何设置延时队列进行延时消费? 最终因为篇幅缘故,上篇 ...
- 锁与并发工具包与线程池与LockSupport与Fork/Join框架与并行流串行流与阻塞队列与JPS,jstack命令查看死锁查看线程状态与AQS个人笔记九
朝闻道,夕死可矣 本文共计 86564字,估计阅读时长1小时 点击进入->Thread源码万字逐行解析 文章目录 本文共计 86564字,估计阅读时长1小时 一锁 二Java中13个原子操作类 ...
- 并发编程-25 高并发处理手段之消息队列思路 + 应用拆分思路 + 应用限流思路
文章目录 概述 消息队列 消息队列特性 为什么需要消息队列 消息队列的好处 消息队列举例 应用拆分 应用拆分的原则 应用拆分的思考 应用拆分常用的组件 Dubbo Spring Cloud 应用限流 ...
- 大数据实时流计算详解
开篇词-攻克实时流计算难点,掌握大数据未来 我曾任职于华为 2012 实验室高斯部门,负责实时分析型内存数据库 RTANA.华为公有云 RDS 服务的研发工作.目前,我专注于移动反欺诈解决方案的研发. ...
- Akka实战:HTTP大文件断点上传、下载,秒传
2019独角兽企业重金招聘Python工程师标准>>> 访问:https://github.com/yangbajing/scala-applications/tree/master ...
- 一周一论文(翻译)——[PVLDB 17] Dhalion: 基于Heron自适应调整的流处理系统
Abstract 近年来,大规模实时分析需求激增,并且已开发出大量流处理系统来支持此类应用. 即使遇到硬件和软件故障,这些系统也能够继续进行流处理. 然而,这些系统并未解决其Operator面临的一些 ...
- oracle aq_通过Java 8流使用Oracle AQ
oracle aq Oracle数据库最令人敬畏的功能之一是Oracle AQ:Oracle数据库高级队列 . AQ API直接在数据库中实现了完整的事务性消息传递系统. 在数据库处于系统中心的经典体 ...
最新文章
- html按钮线性炫光,6分钟实现CSS炫光倒影按钮 html+css
- 网络应用 axIos的基本使用
- 2021级山西高考成绩查询时间,2021山西高考成绩什么时候出
- Python | 用PrettyPrinter,让Python输出更漂亮,你值得拥有
- 数据蒋堂 | 大数据集群该不该透明化?
- java 常量字符串过长_90%的同学都没搞清楚的 Java 字符串常量池问题(图文并茂)
- linux基础知识复习
- 『设计模式』80年代的人们就已经领悟了设计模式-- 发布者/订阅者模式 (包括发布者/订阅者模式和观察者模式的区别)
- 大数据公司Palantir曾向法拉第未来投资2500万美元
- innobackupex和Xtrabackup备份和恢复MySQL数据
- Yii 2.0 权威指南(7) 关键概念
- 用python画动态图_Python使用matplotlib画动态图
- 安徽省湖泊河流ArcGIS地形图shp图层文件下载
- java系列视频教程下载
- 关于google拼音输入法的坑爹问题-IE浏览器浏览网页蓝屏等问题
- plc控制伺服电机 四轴攻丝机案例 该程序为plc控制伺服电机的工程案例程序,包含伺服电机接线图,包含程序流程的详细解释说明
- #芯片# R8025(RX-8025T)
- thinkpad重装系统步骤
- Linux 错误E45,readonly optionisset(add ...)
- win10系统无法连接xp工作组计算机,Win10系统访问XP共享打印机连接不了的解决方法...
热门文章
- 量子计算机对人类长寿,科学家称“极端长寿”在未来几十年可能会达到新的里程碑...
- Linux ss命令 报错,ECS Linux中ss命令显示连接状态的使用说明
- java虚拟机的内存模型_JVM(Java虚拟机)内存模型(转载/整理)
- 进程与服务的签名_苹果app签名需要注意哪几点
- python 线性回归函数_Python实现的简单线性回归算法实例分析
- java集合——集合框架
- elk入门_ELK堆栈入门
- arquillian_使用Arquillian(远程)测试OpenLiberty
- xml不利于调试_流利的接口不利于维护
- rx.observable_使用Java 8 CompletableFuture和Rx-Java Observable