akka 消息发送接收

在上一篇文章中,我们研究了Akka Typed提供的一些基本功能。 在本文和下一篇文章中,我们将更进一步地了解一些其他功能,并通过查看Akka Typed提供的两种不同模式来做到这一点:Receiver和Receptionist模式。 如果您是Akka Typed的新手,那么最好先阅读上一篇文章,因为这将使您对Akka Typed有所了解。 因此,对于本系列中的Akka型文章,我们将研究Receiver模式。

  • 与往常一样,您可以在Github Gist中找到此示例的代码: https : //gist.github.com/josdirksen/77e59d236c637d46ab32

接收方模式

在Akka Typed发行版中,有一个名为akka.typed.patterns的包。 在此程序包中,有两种不同的模式,即接收方模式和接收方模式。 坦白说,为什么这两种模式足够重要以增加发行版,但我确实不知道,但是它们确实为在Akka Typed之后引入更多概念和想法提供了一个很好的方法。

因此,让我们看一下Receiver模式,在下一篇文章中我们将做Receptionist模式。 要了解Receiver模式的功能,只需看一下我们可以发送给它的消息:

/*** Retrieve one message from the Receiver, waiting at most for the given duration.*/final case class GetOne[T](timeout: FiniteDuration)(val replyTo: ActorRef[GetOneResult[T]]) extends Command[T]/*** Retrieve all messages from the Receiver that it has queued after the given* duration has elapsed.*/final case class GetAll[T](timeout: FiniteDuration)(val replyTo: ActorRef[GetAllResult[T]]) extends Command[T]/*** Retrieve the external address of this Receiver (i.e. the side at which it* takes in the messages of type T.*/final case class ExternalAddress[T](replyTo: ActorRef[ActorRef[T]]) extends Command[T]

从这些消息中可以看到,Receiver的工作是将T类型的消息排队,并提供其他命令以在等待特定时间的同时获取这些消息中的一个或多个。 要使用接收器,我们需要获取ExternalAddress,以便我们可以向其发送类型为T的消息。 并且可以从其他参与者发送get GetOne和GetAll消息,以查看接收器中是否有任何消息在等待。

对于我们的示例,我们将创建以下参与者:

  • 生产者,它向接收者发送类型为T的消息。
  • 可以从此接收器检索类型T消息的使用者。
  • 根角色,运行此方案。

我们将从生产者开始,如下所示:

/*** Producer object containing the protocol and the behavior. This is a very simple* actor that produces messages using a schedule. To start producing messages* we need to send an initial message*/object Producer {// a simple protocol defining the messages that can be sentsealed trait ProducerMsgfinal case class registerReceiverMsgIn(msgIn: ActorRef[HelloMsg]) extends ProducerMsgfinal case class addHelloWorldMsg(msg: HelloMsg) extends ProducerMsg// the producer, which first waits for a registerReceiver message, after which// it changes behavior, to send messages.val producer = Full[ProducerMsg] {// if we receive a register message, we know where to send messages tocase Msg(ctx, registerReceiverMsgIn(msgConsumer)) =>println("Producer: Switching behavior")// simple helper function which sends a message to self.def scheduleMessage() = ctx.schedule(500 millisecond, ctx.self, addHelloWorldMsg(Hello(s"hello @ ${System.currentTimeMillis()}")))// schedule the first one, the rest will be triggered through the behavior.scheduleMessage()Static {// add a message to the receiver and schedule a new onecase addHelloWorldMsg(msg) => {println(s"Producer: Adding new '$msg' to receiver: $msgConsumer") ;msgConsumer ! msg; scheduleMessage()}}// don't switch behavior on any of the other messagescase _ => Same}}

在此对象中,我们定义了可以发送给角色的消息以及行为。 registerReceiverMsgIn消息为actor提供了应该向其发送消息的目的地(稍后会对此进行详细介绍),并且addHelloWorldMsg告诉行为将什么消息发送到registerReceiverMsgIn消息提供的地址。 如果您查看此行为,则可以看到我们使用Full [T]行为。 对于这种行为,我们必须为所有消息和信号提供匹配器,此外,我们还可以访问actor ctx。 在其初始状态下,此行为仅响应registerReceiverMsgIn消息。 当它收到这样的消息时,它会做两件事:

  1. 它定义了一个函数,我们可以用来调度消息,我们也可以直接调用它,以调度消息在半秒钟内发送。
  2. 它定义了我们的新行为。 此新行为可以处理scheduleMessage函数发送的消息。 收到该消息后,它将内容发送到提供的messageConsumer(接收方),然后再次调用计划消息。 保持每500毫秒发送一次消息。

因此,当我们发送初​​始的registerReceiverMessage时,它将导致actor每500 ms向接收者发送一条新消息。 现在让我们看看另一面:消费者。

对于消费者,我们还将所有内容包装在一个对象中,如下所示:

object Consumer {val consumer = Total[HelloMsg] {// in the case of a registerReceiver message, we change the implementation// since we're ready to receive other message.case registerReceiverCmdIn(commandAddress) => {println("Consumer: Switching behavior")// return a static implementation which closes over actorRefs// all messages we receive we pass to the receiver, which will queue// them. We have a specific message that prints out the received messagesContextAware { ctx =>Static[HelloMsg] {// printmessages just prints out the list of messages we've receivedcase PrintMessages(msgs) => println(s"Consumer: Printing messages: $msgs") ;msgs.foreach { hw => println(s"  $hw")}// if we get the getAllMessages request, we get all the messages from// the receiver.case GetAllMessages() => {println("Consumer: requesting all messages")val wrap = ctx.spawnAdapter[GetAllResult[HelloMsg]] {case msgs:GetAllResult[HelloMsg] => println(s"Consumer: Received ${msgs.msgs.length} messages"); PrintMessages(msgs.msgs)}commandAddress ! GetAll(2 seconds)(wrap)}}}}// for all the other cases return the existing implementation, in essence// we're just ignoring other messages till we change statecase _ => Same}    }

在此对象中,我们定义了一个行为,该行为在接收到第一条消息后也会切换其实现。 在这种情况下,第一个消息称为registerReceiverCmdIn。 通过此消息,我们可以访问(接收方的)actorRef,将GetAll和getOne消息发送至该消息。 切换行为后,我们将处理自己的自定义GetAllMessages消息,这将触发将GetAll消息发送到接收器。 由于未针对从Receiver收到的响应类型键入我们自己的行为,因此我们使用适配器(ctx.spawnAdapter)。 该适配器将接收来自接收器的响应并打印出消息。

最后一个消息部分是一个启动此行为的参与者:

// Simple root actor, which we'll use to start the other actorsval scenario1 = {Full[Unit] {case Sig(ctx, PreStart) => {import Producer._import Consumer._println("Scenario1: Started, now lets start up a number of child actors to do our stuff")// first start the two actors, one implements the receiver pattern, and// the other is the one we control directly.val receiverActor = ctx.spawn(Props(Receiver.behavior[HelloMsg]), "receiver")val consumerActor = ctx.spawn(Props(consumer), "adder")val producerActor = ctx.spawn(Props(producer), "producer")// our producerActor first needs the actorRef it can use to add messages to the receiver// for this we use a wrapper, this wrapper creates a child, which we use to get the// address, to which we can send messages.val wrapper = ctx.spawnAdapter[ActorRef[HelloMsg]] {case p: ActorRef[HelloMsg] => producerActor ! registerReceiverMsgIn(p)}// now send the message to get the external address, the response will be sent// to our own actor as a registerReceiver message, through the adapterreceiverActor ! ExternalAddress(wrapper)// our printing actor needs to now the address of the receiver so send it to himconsumerActor ! registerReceiverCmdIn(receiverActor)// by calling getAllMessages we get the messages within a time period.println("Scenario1: Get all the messages")consumerActor ! GetAllMessages()Thread.sleep(3000)consumerActor ! GetAllMessages()Thread.sleep(5000)consumerActor ! GetAllMessages()Same}}}

这里没什么特别的。 在这种情况下,我们将创建各种角色,并使用ctx.spawnAdapter来获取接收者的外部地址,并将其传递给producerActor。 接下来,我们将接收者参与者的地址传递给消费者。 现在,我们在使用者地址上调用GetAllMessages,该地址将从接收方获取消息并打印出来。

因此,总结一下将在此示例中执行的步骤:

  1. 我们创建一个将运行此方案的根角色。
  2. 从这个根基参与者,我们创建了三个参与者:接收者,消费者和生产者。
  3. 接下来,我们从接收方获取externalAddress(我们将类型为T的消息发送到的地址),并使用适配器将其传递给生产方。
  4. 生产者在接收到此消息后,将切换行为并开始将消息发送到传递的地址。
  5. 同时,根actor将接收方的地址传递给使用者。
  6. 使用者在收到此消息时,将更改行为并现在等待GetAllMessages类型的消息。
  7. 现在,根actor将发送GetAllMessages到使用者。
  8. 当使用者接收到此消息时,它将使用适配器将GetAll消息发送给接收者。 当适配器接收到响应时,它会打印出接收到的消息数量,并通过为接收者从接收到的每条消息发送一个PrintMessage来对使用者进行进一步处理。

这种情况的结果如下所示:

Scenario1: Started, now lets start up a number of child actors to do our stuff
Scenario1: Get all the messages
Consumer: Switching behavior
Consumer: requesting all messages
Producer: Switching behavior
Producer: Adding new 'Hello(hello @ 1446277162929)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277163454)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277163969)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: Received 3 messages
Consumer: Printing messages: Vector(Hello(hello @ 1446277162929), Hello(hello @ 1446277163454), Hello(hello @ 1446277163969))Hello(hello @ 1446277162929)Hello(hello @ 1446277163454)Hello(hello @ 1446277163969)
Producer: Adding new 'Hello(hello @ 1446277164488)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277165008)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: requesting all messages
Producer: Adding new 'Hello(hello @ 1446277165529)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277166049)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277166569)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277167089)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: Received 6 messages
Consumer: Printing messages: Vector(Hello(hello @ 1446277164488), Hello(hello @ 1446277165008), Hello(hello @ 1446277165529), Hello(hello @ 1446277166049), Hello(hello @ 1446277166569), Hello(hello @ 1446277167089))Hello(hello @ 1446277164488)Hello(hello @ 1446277165008)Hello(hello @ 1446277165529)Hello(hello @ 1446277166049)Hello(hello @ 1446277166569)Hello(hello @ 1446277167089)
Producer: Adding new 'Hello(hello @ 1446277167607)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277168129)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277168650)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277169169)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277169690)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277170210)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: requesting all messages
Producer: Adding new 'Hello(hello @ 1446277170729)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277171249)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277171769)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277172289)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: Received 10 messages
Consumer: Printing messages: Vector(Hello(hello @ 1446277167607), Hello(hello @ 1446277168129), Hello(hello @ 1446277168650), Hello(hello @ 1446277169169), Hello(hello @ 1446277169690), Hello(hello @ 1446277170210), Hello(hello @ 1446277170729), Hello(hello @ 1446277171249), Hello(hello @ 1446277171769), Hello(hello @ 1446277172289))Hello(hello @ 1446277167607)Hello(hello @ 1446277168129)Hello(hello @ 1446277168650)Hello(hello @ 1446277169169)Hello(hello @ 1446277169690)Hello(hello @ 1446277170210)Hello(hello @ 1446277170729)Hello(hello @ 1446277171249)Hello(hello @ 1446277171769)Hello(hello @ 1446277172289)
Producer: Adding new 'Hello(hello @ 1446277172808)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277173328)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277173849)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277174369)' to receiver: Actor[akka://Root/user/receiver#1097367365]

酷吧! 从消息序列中可以看到,我们的生产者将消息发送到接收者,接收者将它们排队。 接下来,我们有一个使用者,它请求到目前为止已收到的所有消息并打印出来。

这是关于Akka-Typed的文章的内容,在下一篇文章中,我们将介绍同样在Akka-Typed中呈现的接待员模式。

翻译自: https://www.javacodegeeks.com/2015/11/akka-typed-actors-exploring-the-receiver-pattern.html

akka 消息发送接收

akka 消息发送接收_Akka型演员:探索接收器模式相关推荐

  1. Akka型演员:探索接收器模式

    在上一篇文章中,我们研究了Akka Typed提供的一些基本功能. 在本文和下一篇文章中,我们将更进一步地了解一些其他功能,并通过查看Akka Typed提供的两种不同模式来做到这一点:Receive ...

  2. RabbitMQ入门学习系列(三).消息发送接收

    快速阅读 用Rabitmq的队列管理,以及如何保证消息在队列中不丢失.通过ack的消息确认和持久化进行操作.以及Rabbit中如何用Web面板进行管理队列.消费者如何处理耗时的任务 生产者代码创建链接 ...

  3. html消息发送接收,在html页面中 如何应用mqtt协议发送/接收消息

    经过前面几篇文章的介绍,在很多场景下利用NodeMCU加持mqtt协议来控制几乎所有需要传感器监控的行业都能极大地简化物联的成本.在这样一个基础上,还能拓展出很多好玩的.实际运用的甚至能够作为商业化运 ...

  4. 深入理解ActiveMQ支持的2类消息发送接收模型queue和topic

    本文已经收录进专栏,谢谢支持.

  5. RabbitMQ消息队列(六):SpringBoot整合之通配符模式

    RabbitMQ消息队列(六):SpringBoot整合之通配符模式 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-AeZQrNHS-1660220618697)(E: ...

  6. 使用Akka持久化——消息发送与接收

    版权声明:本文为博主原创文章,未经博主允许不得转载. https://blog.csdn.net/beliefer/article/details/53929751 前言 在<使用Akka持久化 ...

  7. python 网络编程之Socket通信案例消息发送与接收

    背景 网络编程是python编程中的一项基本技术.本文将实现一个简单的Socket通信案例消息发送与接收 正文 在python中的socket编程的大致流程图如上所示 我们来首先编写客户端的代码: # ...

  8. go 实现 kafka 消息发送、接收

    引言 网络上关于 go 实现 kafka 消息发送和接收的文章很多,但是实际操作起来又不是很清楚,本文在网络资源的基础上,结合自己搭建过程中遇到的问题进行了总结. 本文的实验主机:Mac笔记本. 一. ...

  9. RabbitMQ消息发送和接收

    1.RabbitMQ的消息发送和接受机制 所有 MQ 产品从模型抽象上来说都是一样的过程: 消费者(consumer)订阅某个队列.生产者(producer)创建消息,然后发布到队列(queue)中, ...

最新文章

  1. apache 不执行PHP,显示代码
  2. lua require dofile loadfile区别
  3. 技术盘点:2022 年容器、Serverless、可观测、服务网格有哪些值得关注的趋势?
  4. windows 禁用ipv6服务_在 Windows 7 中禁用IPv6协议/IPv6隧道
  5. 项目中cxf和weblogic整合时报错的问题
  6. 移动云正式发布基于龙蜥 Anolis OS 的 BC-Linux V8.2 通用版操作系统
  7. Linux系统编程8:入门篇之简单明了说明如何在Linux中Git提交代码
  8. Mr.J--正则表达式
  9. Direct2D (37) : 使用不同画刷绘制文本
  10. 在日常维护管理中对MySQL 日志的需求
  11. Java和U3D比较,Unity热更方案 ILRuntime 和 toLua的比较
  12. 20172324 2018-2019-1《程序设计与数据结构》实验1报告
  13. 有幸和一位企业家聊天,他白手起家
  14. Linux 末路,Kubernetes 崛起!
  15. PythonDay7
  16. 41.Linux/Unix 系统编程手册(下) -- 共享库基础
  17. 空间回归分析笔记3——OLS、GWR输出结果的意义
  18. Apache Ranger KMS 部署文档
  19. (附源码)node.js物资管理系统 毕业设计 071130
  20. Java 类对象基础知识--科普

热门文章

  1. Loj#3077-「2019 集训队互测 Day 4」绝目编诗【结论,虚树,鸽笼原理】
  2. P5488-差分与前缀和【NTT,生成函数】
  3. 【2018.5.12】模拟赛之三-ssl2415 连通块【并查集】
  4. 【数学】奶牛编号(jzoj 2932)
  5. MongoDB SpringBoot ObjectId序列化json为String
  6. 【附答案】Java面试2019常考题目汇总(一)
  7. 在Eclipse中使用JUnit4进行单元测试(初级篇)
  8. publiccms中,怎么修改默认的端口8080以及默认上下文名称
  9. 《朝花夕拾》金句摘抄(一)
  10. sql server简单查询