9、Akka中邮箱(MailBox)
简介
Akka 的邮箱中保存着发给 Actor 的信息。通常,每个 Actor 都有自己的邮箱,但也有例外,如使用BalancingPool,则所有路由器(routees
)将共享一个邮箱实例。
邮箱选择
默认邮箱
未指定邮箱时,使用默认邮箱。默认情况下,它是一个无边界的邮箱,由java.util.concurrent.ConcurrentLinkedQueue
支持。
SingleConsumerOnlyUnboundedMailbox
是一个效率更高的邮箱,它可以用作默认邮箱,但不能与BalancingDispatcher
一起使用。
将SingleConsumerOnlyUnboundedMailbox
配置为默认邮箱:
akka.actor.default-mailbox {mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
}
Actor特定的邮箱
特定类型的actor可以用特定类型的邮箱,只要这个actor实现了参数化的接口RequiresMessageQueue
。这里是一个例子:
import akka.dispatch.BoundedMessageQueueSemantics;
import akka.dispatch.RequiresMessageQueue;public class MyBoundedActor extends MyActorimplements RequiresMessageQueue<BoundedMessageQueueSemantics> {}
RequiresMessageQueue
接口的类型参数需要映射到配置中的邮箱,如下所示:
bounded-mailbox {mailbox-type = "akka.dispatch.NonBlockingBoundedMailbox"mailbox-capacity = 1000
}akka.actor.mailbox.requirements {"akka.dispatch.BoundedMessageQueueSemantics" = bounded-mailbox
}
现在,每次创建MyBoundedActor
类型的 Actor 时,它都会尝试获取一个有界邮箱。如果 Actor 在部署中配置了不同的邮箱,可以直接配置,也可以通过具有指定邮箱类型的调度器(dispatcher
)配置,那么这将覆盖此映射。
- 注释:为Actor创建的邮箱中的队列类型用接口中要求的类型进行检查,如果队列没有实现要求的类型,那么actor创建就会失败。
Dispatcher特定的邮箱
Dispatcher也需要一个邮箱类型,用于运行中的actor。一个例子就是BalancingDispatcher
,它需要一个并发的、线程安全的消息队列。这样的需求可以在分发器配置中进行规划,就像这样:
my-dispatcher {mailbox-requirement = org.example.MyInterface
}
给定的需求命名了一个类或者接口,必须保证这个类或者接口是消息队列实现的超类型。万一冲突了,例如:如果actor需要一个邮箱类型,但是它不满足这个需求,那么actor创建就会失败。
如何选择邮箱类型
创建 Actor 时,ActorRefProvider
首先确定将执行它的dispatcher
。然后按照如下顺序确定邮箱类型:
- 如果
actor
的部署配置部分包含一个mailbox
关键字,那么这个mailbox
关键字就指定了要使用的邮箱类型; - 如果
actor
的Props
包含mailbox选择—即调用了withMailbox
方法—那么这个方法指定要使用的邮箱类型; - 如果分发器的配置部分包含一个
mailbox-type
关键字,那么这部分也将被用于配置邮箱类型; - 如果actor需要上面描述的邮箱类型,那么这个需求的映射将被用于确定邮箱类型;如果失败了,那么分发器的需求-如果存在-将被会尝试;
- 如果分发器需要后面描述的邮箱类型,那么这个需求的映射将被用于确定邮箱类型;
- 将使用默认的邮箱akka.actor.default-mailbox。
哪些配置会传给Mailbox类型
每个邮箱类型都由一个扩展MailboxType
并接受两个构造函数参数的类实现:ActorSystem.Settings
对象和Config
部分。后面这个是通过actor系统的配置获取,用邮箱类型的配置路径覆盖它的id关键字,并添加一个默认邮箱配置的回调。
内置邮箱实现
Akka 附带了许多邮箱实现:
UnboundedMailbox
(默认)- 默认邮箱
- 由
java.util.concurrent.ConcurrentLinkedQueue
支持 - 是否阻塞:
No
- 是否有界:
No
- 配置名称:
unbounded
或akka.dispatch.UnboundedMailbox
SingleConsumerOnlyUnboundedMailbox
,此队列可能比默认队列快,也可能不比默认队列快,具体取决于你的用例,请确保正确地进行基准测试!- 由多个生产商单个使用者队列支持,不能与BalancingDispatcher一起使用
- 是否阻塞:
No
- 是否有界:
No
- 配置名称:
akka.dispatch.SingleConsumerOnlyUnboundedMailbox
NonBlockingBoundedMailbox
- 由一个非常高效的”多生产者,单消费者“队列支持
- 是否阻塞:
No
(将溢出的消息丢弃为deadLetters
) - 是否有界:
Yes
- 配置名称:
akka.dispatch.NonBlockingBoundedMailbox
UnboundedControlAwareMailbox
- 传递以更高优先级扩展
akka.dispatch.ControlMessage
的消息 - 由两个
java.util.concurrent.ConcurrentLinkedQueue
支持 - 是否阻塞:
No
- 是否有界:
No
- 配置名称:
akka.dispatch.UnboundedControlAwareMailbox
- 传递以更高优先级扩展
UnboundedPriorityMailbox
- 由
java.util.concurrent.PriorityBlockingQueue
支持 - 等优先级邮件的传递顺序未定义,与
UnboundedStablePriorityMailbox
相反 - 是否阻塞:
No
- 是否有界:
No
- 配置名称:
akka.dispatch.UnboundedPriorityMailbox
- 由
UnboundedStablePriorityMailbox
- 由包装在
akka.util.PriorityQueueStabilizer
中的java.util.concurrent.PriorityBlockingQueue
提供支持 - 对于优先级相同的消息保留
FIFO
顺序,与UnboundedPriorityMailbox
相反 - 是否阻塞:
No
- 是否有界:
No
- 配置名称:
akka.dispatch.UnboundedStablePriorityMailbox
其他有界邮箱实现,如果达到容量并配置了非零mailbox-push-timeout-time
超时时间,则会阻止发件人。特别地,以下邮箱只能与零mailbox-push-timeout-time
一起使用。
BoundedMailbox
- 由
java.util.concurrent.LinkedBlockingQueue
支持 - 是否阻塞:如果与非零
mailbox-push-timeout-time
一起使用,则为Yes
,否则为NO
- 是否有界:
Yes
- 配置名称:
bounded
或akka.dispatch.BoundedMailbox
- 由
BoundedPriorityMailbox
- 由包装在
akka.util.BoundedBlockingQueue
中的java.util.PriorityQueue
提供支持 - 优先级相同的邮件的传递顺序未定义,与
BoundedStablePriorityMailbox
相反 - 是否阻塞:如果与非零
mailbox-push-timeout-time
一起使用,则为Yes
,否则为NO
- 是否有界:
Yes
- 配置名称:
akka.dispatch.BoundedPriorityMailbox
- 由包装在
BoundedStablePriorityMailbox
- 由包装在
akka.util.PriorityQueueStabilizer
和akka.util.BoundedBlockingQueue
中的java.util.PriorityQueue
提供支持 - 对于优先级相同的消息保留
FIFO
顺序,与BoundedPriorityMailbox
相反 - 是否阻塞:如果与非零
mailbox-push-timeout-time
一起使用,则为Yes
,否则为NO
- 是否有界:
Yes
- 配置名称:
akka.dispatch.BoundedStablePriorityMailbox
- 由包装在
BoundedControlAwareMailbox
- 传递以更高优先级扩展
akka.dispatch.ControlMessage
的消息 - 由两个
java.util.concurrent.ConcurrentLinkedQueue
支持,如果达到容量,则在排队时阻塞 - 是否阻塞:如果与非零
mailbox-push-timeout-time
一起使用,则为Yes
,否则为NO
- 是否有界:
Yes
- 配置名称:
akka.dispatch.BoundedControlAwareMailbox
- 传递以更高优先级扩展
邮箱配置示例
PriorityMailbox
如何创建PriorityMailbox
:
static class MyPrioMailbox extends UnboundedStablePriorityMailbox {// needed for reflective instantiationpublic MyPrioMailbox(ActorSystem.Settings settings, Config config) {// Create a new PriorityGenerator, lower prio means more importantsuper(new PriorityGenerator() {@Overridepublic int gen(Object message) {if (message.equals("highpriority"))return 0; // 'highpriority messages should be treated first if possibleelse if (message.equals("lowpriority"))return 2; // 'lowpriority messages should be treated last if possibleelse if (message.equals(PoisonPill.getInstance()))return 3; // PoisonPill when no other leftelse return 1; // By default they go between high and low prio}});}
}
然后将其添加到配置中:
prio-dispatcher {mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"//Other dispatcher configuration goes here
}
下面是一个关于如何使用它的示例:
class Demo extends AbstractActor {LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);{for (Object msg :new Object[] {"lowpriority","lowpriority","highpriority","pigdog","pigdog2","pigdog3","highpriority",PoisonPill.getInstance()}) {getSelf().tell(msg, getSelf());}}@Overridepublic Receive createReceive() {return receiveBuilder().matchAny(message -> {log.info(message.toString());}).build();}
}// We create a new Actor that just prints out what it processes
ActorRef myActor =system.actorOf(Props.create(Demo.class, this).withDispatcher("prio-dispatcher"));/*
Logs:'highpriority'highpriority'pigdog'pigdog2'pigdog3'lowpriority'lowpriority
*/
也可以这样直接配置邮箱类型(这是顶级配置项):
prio-mailbox {mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"//Other mailbox configuration goes here
}akka.actor.deployment {/priomailboxactor {mailbox = prio-mailbox}
}
然后从这样的部署中使用它:
ActorRef myActor = system.actorOf(Props.create(MyActor.class), "priomailboxactor");
或者这样的代码:
ActorRef myActor = system.actorOf(Props.create(MyActor.class).withMailbox("prio-mailbox"));
ControlAwareMailbox
如果 Actor 需要立即接收控制消息,无论邮箱中已经有多少其他消息,ControlAwareMailbox
都非常有用。
可以这样配置:
control-aware-dispatcher {mailbox-type = "akka.dispatch.UnboundedControlAwareMailbox"//Other dispatcher configuration goes here
}
控制消息需要扩展ControlMessage
特性:
static class MyControlMessage implements ControlMessage {}
下面是一个关于如何使用它的示例:
class Demo extends AbstractActor {LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);{for (Object msg :new Object[] {"foo", "bar", new MyControlMessage(), PoisonPill.getInstance()}) {getSelf().tell(msg, getSelf());}}@Overridepublic Receive createReceive() {return receiveBuilder().matchAny(message -> {log.info(message.toString());}).build();}
}// We create a new Actor that just prints out what it processes
ActorRef myActor =system.actorOf(Props.create(Demo.class, this).withDispatcher("control-aware-dispatcher"));/*
Logs:'MyControlMessage'foo'bar
*/
创建自己的邮箱类型
示例如下:
// Marker interface used for mailbox requirements mapping
public interface MyUnboundedMessageQueueSemantics {}
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.Envelope;
import akka.dispatch.MailboxType;
import akka.dispatch.MessageQueue;
import akka.dispatch.ProducesMessageQueue;
import com.typesafe.config.Config;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.Queue;
import scala.Option;public class MyUnboundedMailboximplements MailboxType, ProducesMessageQueue<MyUnboundedMailbox.MyMessageQueue> {// This is the MessageQueue implementationpublic static class MyMessageQueue implements MessageQueue, MyUnboundedMessageQueueSemantics {private final Queue<Envelope> queue = new ConcurrentLinkedQueue<Envelope>();// these must be implemented; queue used as examplepublic void enqueue(ActorRef receiver, Envelope handle) {queue.offer(handle);}public Envelope dequeue() {return queue.poll();}public int numberOfMessages() {return queue.size();}public boolean hasMessages() {return !queue.isEmpty();}public void cleanUp(ActorRef owner, MessageQueue deadLetters) {for (Envelope handle : queue) {deadLetters.enqueue(owner, handle);}}}// This constructor signature must exist, it will be called by Akkapublic MyUnboundedMailbox(ActorSystem.Settings settings, Config config) {// put your initialization code here}// The create method is called to create the MessageQueuepublic MessageQueue create(Option<ActorRef> owner, Option<ActorSystem> system) {return new MyMessageQueue();}
}
然后,将MailboxType
的 FQCN 指定为调度器配置或邮箱配置中mailbox-type
的值。
- 注释:请确保包含一个采用
akka.actor.ActorSystem.Settings
和com.typesafe.config.Config
参数的构造函数,因为此构造函数是通过反射调用来构造邮箱类型的。作为第二个参数传入的配置是配置中描述使用此邮箱类型的调度器或邮箱设置的部分;邮箱类型将为使用它的每个调度器或邮箱设置实例化一次。
你还可以使用邮箱作为调度器的要求(requirement
),如下所示:
custom-dispatcher {mailbox-requirement ="jdocs.dispatcher.MyUnboundedMessageQueueSemantics"
}akka.actor.mailbox.requirements {"jdocs.dispatcher.MyUnboundedMessageQueueSemantics" =custom-dispatcher-mailbox
}custom-dispatcher-mailbox {mailbox-type = "jdocs.dispatcher.MyUnboundedMailbox"
}
或者像这样定义 Actor 类的要求:
static class MySpecialActor extends AbstractActorimplements RequiresMessageQueue<MyUnboundedMessageQueueSemantics> {// ...
}
system.actorOf 的特殊语义
为了使system.actorOf
既同步又不阻塞,同时保持返回类型ActorRef
(以及返回的ref
完全起作用的语义),对这种情况进行了特殊处理。在幕后,构建了一种空的 Actor 引用,将其发送给系统的守护者 Actor,该 Actor 实际上创建了 Actor 及其上下文,并将其放入引用中。在这之前,发送到ActorRef
的消息将在本地排队,只有在交换真正的填充之后,它们才会被传输到真正的邮箱中。因此,
final Props props = ...
// this actor uses MyCustomMailbox, which is assumed to be a singleton
system.actorOf(props.withDispatcher("myCustomMailbox").tell("bang", sender);
assert(MyCustomMailbox.getInstance().getLastEnqueued().equals("bang"));
可能会失败;你必须留出一段时间通过并重试检查TestKit.awaitCond
。
关注公众号 数据工匠记
,专注于大数据领域离线、实时技术干货定期分享!个人网站 www.lllpan.top
9、Akka中邮箱(MailBox)相关推荐
- akka一些邮箱的实现
private[akka] abstract class Mailbox(val messageQueue: MessageQueue)extends SystemMessageQueue with ...
- 邮箱-Mailbox
Actor之间发送消息,并不是直接通过API的方式直接实现,而是依赖于自己的邮箱(mailbox).这就好比在旧社会中,大家进行信件交流,发件人把自己的信件放到自己的邮箱中,送件人每天早上从你的邮箱中 ...
- (30)System Verilog进程间同步(邮箱mailbox)
(30)System Verilog进程间同步(邮箱mailbox) 1.1 目录 1)目录 2)FPGA简介 3)System Verilog简介 4)System Verilog进程间同步(邮箱m ...
- Akka系列(二):Akka中的Actor系统
Actor模型作为Akka中最核心的概念,所以Actor在Akka中的组织结构也至关重要,本文主要介绍Akka中Actor系统. Actor系统 Actor作为一种封装状态和行为的对象,总是需要一个系 ...
- akka es/cqrs_在Akka中实现主从/网格计算模式
akka es/cqrs 主从模式是容错和并行计算的主要示例. 模式背后的想法是将工作划分为相同的子任务,然后将其委派给从属. 这些从属节点或实例将处理工作任务,并将结果发送回主节点. 然后主节点将编 ...
- 在Akka中实现主从/网格计算模式
主从模式是容错和并行计算的主要示例. 模式背后的想法是将工作划分为相同的子任务,然后将其委派给从属. 这些从节点或实例将处理工作任务,并将结果发送回主节点. 然后主节点将编译从所有从节点接收到的结果. ...
- jquery中邮箱地址 URL网站地址正则验证实例代码
jquery中邮箱地址 URL网站地址正则验证实例代码 QQ网站有一个网站举报的功能,看了一些js代码觉得写得很不错,我就拿下来了,下面是一个email验证与url网址验证js代码,分享给大家 ema ...
- Java中邮箱的相关使用
Java中邮箱的相关使用 1 Java中邮箱的简介 2 邮箱的使用 1 Java原生操作邮箱 1 导入maven坐标 2 添加邮件工具类 2 SpringBoot中操作邮箱 0 准备一个好的Sprin ...
- 批量验证邮箱地址php代码,php结合正则批量抓取网页中邮箱地址
php如何抓取网页中邮箱地址,下面我就给大家分享一个用php抓取网页中电子邮箱的实例. $url='http://www.jb51.net'; //要采集的网址 $content=file_get_c ...
最新文章
- onnxruntime安装
- (51)SSDT HOOK 实现进程保护
- python 网络编程 套接字的初使用 基于TCP协议的socket
- php 获得当月时间戳,php获取当前月与上个月月初及月末时间戳的方法
- c语言十佳运动员有奖评选系统_2019年沃德十佳内饰解读
- 设计模式笔记——Bridge
- java中怎么判断相等_Java中判断相等 (== 与 .equals())
- 为了减少接口的响应时间,有哪些优化措施?(可以从架构、代码等各个角度谈)?
- eclipse修改工作的目录顺序
- 前端每日实战:73# 视频演示如何用纯 CSS 创作一只卡通狐狸
- 使用Kotlin的Android ListView
- c++ static 关键字总结
- CATIA软件有限元分析功能详解及使用教程
- Python连接MySQL数据库
- 网络统考计算机操作题分数占比,计算机一级office考试 word占多少分值?
- pg8168改mac命令_Realtek 8168网卡改MAC地址教程
- 玩转基因组浏览器之初识IGV
- tensorflow学习 矩阵乘法和元素乘法
- pdf中矢量图提取出来,插入visio 或者 word,保持矢量图特性,十分清晰;
- BAT超强IOS面试题116道,助你拿到高薪offer