邮箱-Mailbox
Actor之间发送消息,并不是直接通过API的方式直接实现,而是依赖于自己的邮箱(mailbox)。这就好比在旧社会中,大家进行信件交流,发件人把自己的信件放到自己的邮箱中,送件人每天早上从你的邮箱中取出信件,在这个过程中,邮箱就起到了中间存储作用,所有的信件都会放在里面。在Actor系统中,邮箱是一个队列结构,默认遵循FIFO,可以根据我们的需要自定义。
Actor可以接受其它多个Actor的消息,默认的消息顺序没有规律,无法保证某个消息一定排在另外一个消息的前面,但是对于同一个Actor发送的多个消息,一定是按照先后顺序接受。例如:actorA给ActorB发送message1和message2,那么ActorB接受消息的顺序一定是message1->message2,否则,执行任务岂不是乱套了。
邮箱的分类
在Akka中,邮箱主要分为两种类型:无界和有界。
类型 |
描述 |
无界(Unbounded) |
邮箱的容量没有上限,大部分无界邮箱都是无阻塞的。也就是说当消息的数量过于庞大时,也不会阻塞消息处理。 |
有界(Bounded) |
邮箱的容量有限制,当消息数量达到邮箱数量时,会根据mailbox-push-timeout-time(消息入队超时时间)的配置进行阻塞(或非阻塞),负数则表示无限超时(慎用)。 |
邮箱的配置
Akka系统中,提供了邮箱配置项,我们可以根据自己的需要进行配置,系统给我们提供了默认配置,例如:
akka.actor.default-mailbox{mailbox-type="akka.dispatch.UnboundedMailbox"mailbox-capacity=1000mailbox-push-timeout-time=10s
}
参数分析:
mailbox-type:邮箱类型,默认采用无界邮箱UnboundedMailbox,表示邮箱队列大小不受限制。
mailbox-capacity:邮箱容量,定义有界邮箱(BoundedMail)的大小,该值只能为正数。
mailbox-push-timeout-time:入队超时时间,也就是消息进入队列的超时时限。
UnboundedMailbox主要依赖java.util.concurrent.ConcurrentLinkedQueue队列来实现,它是一个基于链表的队列结构,遵循FIFO,该队列结构采用CAS无锁算法保证多线程的安全,性能更佳(大家都懂得,锁会导致系统性能和吞吐量的下降,这里就不做过多介绍了)。
优先级邮箱
在实际的项目中,根据业务逻辑的需要,消息处理可能存在一定的优先级,那么我们就需要使用PriorityMailbox。下面我们就来看看优先级邮箱怎样使用:
目标:历史美女按照你喜欢的规则输出
第一步:extends PriorityMailbox,创建自己规则的优先级邮箱
public class PriorityMailbox extends UnboundedStablePriorityMailbox {public PriorityMailbox(ActorSystem.Settings settings, Config config) {super(new PriorityGenerator() {//返回值越小,优先级越高@Overridepublic int gen(Object message) {if ("貂蝉".equals(message)) {return 0;} else if ("西施".equals(message)) {return 1;} else if ("大乔".equals(message)) {return 2;} else {return 3;}}});}public static void main(String[] args) {ActorSystem system = ActorSystem.create("system", ConfigFactory.load("mailbox"));ActorRef priorityActor = system.actorOf(Props.create(PriorityMailboxActor.class).withMailbox("priority-mailbox"), "priorityActor");String[] messages={"李四","大乔","貂蝉","西施"};for(String message:messages){priorityActor.tell(message,ActorRef.noSender());}}
}class PriorityMailboxActor extends AbstractActor {@Overridepublic Receive createReceive() {return receiveBuilder().matchAny(System.out::println).build();}
}
在该构造方法中,我们new了一个ProrityGenerator对象,并重写gen方法,该方法就是我们定义自己优先级规则的地方,返回值越小,优先级越高。
第二步:配置邮箱,如下:
mailbox.conf
priority-mailbox{mailbox-type="com.release.util.akka.mailbox.PriorityMailbox"
}
第三步:让Actor使用withMailbox关联该邮箱类型
ActorSystem system = ActorSystem.create("system", ConfigFactory.load("mailbox"));ActorRef priorityActor = system.actorOf(Props.create(PriorityMailboxActor.class).withMailbox("priority-mailbox"), "priorityActor");String[] messages={"李四","大乔","貂蝉","西施"};for(String message:messages){priorityActor.tell(message,ActorRef.noSender());
}
结果:
貂蝉
西施
大乔
李四
通过输出结果,大家应该看到我们的优先级配置已经起了作用。
控制消息
在大部分的需求中,优先级邮箱已经可以满足,但是当我们需要某个消息每次都拥有最高的优先级时,就需要额外的邮箱支持了。Akka提供了ControAwareMailbox邮箱类型,可以让实现了ControlMessage接口的消息拥有最高优先级。
第一步:定义消息Message,实现ControlMessage
public class ControlMessageBean implements ControlMessage {private final String msg;public ControlMessageBean(String msg) {this.msg = msg;}@Overridepublic String toString() {return this.msg;}
}
第二步:配置ControAwareMailbox邮箱
control-mailbox{mailbox-type = "akka.dispatch.UnboundedControlAwareMailbox"
}
第三步:通过关联withMailBox方式关联该邮箱
public class ControlMessageActor extends AbstractActor {@Overridepublic Receive createReceive() {return receiveBuilder().matchAny(System.out::println).build();}public static void main(String[] args) {ActorSystem system = ActorSystem.create("system", ConfigFactory.load("mailbox"));ActorRef controlActor = system.actorOf(Props.create(ControlMessageActor.class).withMailbox("control-mailbox"), "controlActor");Object[] messages={"张三","李四",new ControlMessageBean("皇上"),new ControlMessageBean("皇后"),"王五"};for(Object message:messages){controlActor.tell(message,ActorRef.noSender());}}
}
输出结果:
皇上
皇后
张三
李四
王五
大家可以发现,ControlMessageBean定义的消息总会被先处理,说明控制消息已经起到作用,实现ControlMessage 消息,在ControAwareMailbox邮箱中拥有最高优先级。
邮箱使用方式
在上述示例中,我们都是通过withMailbox代码方式来关联邮箱类型,其实,邮箱使用还有许多其它的方式,例如:
- 直接配置Actor邮箱
- 将邮箱配置到dispatcher中,然后让Actor关联该dispatcher
直接配置Actor邮箱
我们可以在配置文件中,指定某个Actor的邮箱类型,如下:
akka.actor.deployment{/priorityActor{mailbox=priority-mailbox}
}
上述配置中,priorityActor是创建Actor所指定的名称,,mailbox是我们自定义的优先级邮箱名称。
配置dispatcher邮箱
在消息调度时,dispatcher会依赖Actor邮箱,所以我们可以把邮箱配置dispatcher上,然后让Actor使用该dispatcher即可。
priority-dispatcher{type=Dispatchermailbox-type="com.release.util.akka.mailbox.PriorityMailbox"
}
Actor使用该dispatcher,如下:
ActorRef configActor = system.actorOf(Props.create(PriorityActor.class).withDispatcher("priority-dispatcher"), "priorityActor");
RequiresMessageQueue接口
除了上述方法给某个Actor指定邮箱外,我们还可以让Actor实现RequiresMessageQueue接口,让Actor自动拥有特定类型的邮箱。该接口提供泛型,我们可以给定某个邮箱的语义。关于语义和邮箱的关系,我们可以从akka包下找到reference.conf文件,里面定义了相关映射,如下:
我们根据里面的配置,总结如下表格:
语义接口 |
邮箱类型 |
akka.dispatch.UnboundedMessageQueueSemantics |
akka.dispatch.UnboundedMailbox |
akka.dispatch.BoundedMessageQueueSemantics |
akka.dispatch.BoundedMailbox |
akka.dispatch.DequeBasedMessageQueueSemantics |
akka.dispatch.UnboundedDequeBasedMailbox |
akka.dispatch.UnboundedDequeBasedMessageQueueSemantics |
akka.dispatch.UnboundedDequeBasedMailbox |
akka.dispatch.BoundedDequeBasedMessageQueueSemantics |
akka.dispatch.BoundedDequeBasedMailbox |
akka.dispatch.MultipleConsumerSemantics |
akka.dispatch.UnboundedMailbox |
akka.dispatch.ControlAwareMessageQueueSemantics |
akka.dispatch.UnboundedControlAwareMailbox |
akka.dispatch.UnboundedControlAwareMessageQueueSemantics |
akka.dispatch.UnboundedControlAwareMailbox |
akka.dispatch.BoundedControlAwareMessageQueueSemantics |
akka.dispatch.BoundedControlAwareMailbox |
akka.event.LoggerMessageQueueSemantics |
akka.event.LoggerMailboxType |
如果我们现在想要实现某一类消息优先级发送,需求变得非常简单,示例如下:
public class RequiresMsgActor extends AbstractActor implements RequiresMessageQueue<UnboundedControlAwareMessageQueueSemantics> {@Overridepublic Receive createReceive() {return receiveBuilder().matchAny(System.out::println).build();}
}
如果我们给RequiresMsgActor发送消息,实现ControlMessage的消息将被优先处理。
自定义邮箱
或许,在实际项目中,大家需要做一些业务化的需求,内置邮箱不能满足业务的需要,这时,我们就可以自定义邮箱。
自定义邮箱过程:首先定义一个邮箱队列(实现MessageQueue接口),然后定义邮箱类型并指定邮箱队列,最后配置邮箱给Actor。下面我们详细说一下整个流程。
第一步:定义MailBoxQueue类,实现MessageQueue接口:
public class MailBoxQueue implements MessageQueue {/*** 自定义邮箱队列*/private Queue<Envelope> queue = new ConcurrentLinkedQueue<>();/*** 投递消息,消息入队*/@Overridepublic void enqueue(ActorRef actorRef, Envelope envelope) {queue.offer(envelope);}/*** 取出消息,消息出队* @return Envelope*/@Overridepublic Envelope dequeue() {return queue.poll();}/*** 返回当前消息数量* @return int*/@Overridepublic int numberOfMessages() {return queue.size();}/*** 当前是否存在消息* @return true or false*/@Overridepublic boolean hasMessages() {return !queue.isEmpty();}/*** 消息传递过程中,Actor出现故障,投递的消息将进入到死信队列* @param actorRef actor引用* @param deadLetters 死信队列*/@Overridepublic void cleanUp(ActorRef actorRef, MessageQueue deadLetters) {for (Envelope e : queue) {deadLetters.enqueue(actorRef, e);}}
}
第二步:定义邮箱类型,实现MailboxType接口并且在create方法中指定自定义的MailBoxQueue队列。
public class MailBoxType implements MailboxType, ProducesMessageQueue<MailBoxQueue> {/*** 定义邮箱类型必须给出一个公开的,带有Settings、Config这两个类型的构造函数* 否则会抛出java.lang.NoSuchMethodException: com.release.util.akka.mailbox.MailBoxType.<init>(akka.actor.ActorSystem$Settings, com.typesafe.config.Config)** @param settings* @param config*/public MailBoxType(ActorSystem.Settings settings, Config config) {}@Overridepublic MessageQueue create(Option<ActorRef> option, Option<ActorSystem> option1) {return new MailBoxQueue();}
}
第三步:配置邮箱类型
mySelf-mailbox{mailbox-type= "com.release.util.akka.mailbox.MailBoxType"
}
现在我们就可以把该类型的邮箱指定给我们的Actor使用了,当然,在实际项目中,我们的业务可能复杂多样,并不像这里简单的定义邮箱类型,不过流程都是一样的,只是说对消息的入队、出队等处理方式复杂化。大家可以按照上述使用邮箱的方式,进行测试。
总结
邮箱分为无界和有界两种,前者表示无容量限制,后者表示有容量限制,用来存储消息。Actor都拥有一个邮箱,如果我们没有给Actor配置邮箱,将采用无界默认邮箱。Actor系统已经内置了很多邮箱类型,如果在项目中,这些邮箱不能满足需要,我们可以通过实行MailboxType和MessageQueue自定义邮箱。
邮箱-Mailbox相关推荐
- (30)System Verilog进程间同步(邮箱mailbox)
(30)System Verilog进程间同步(邮箱mailbox) 1.1 目录 1)目录 2)FPGA简介 3)System Verilog简介 4)System Verilog进程间同步(邮箱m ...
- Java中Person类型赋值_Java设计:定义一个Person类和它的子类Employee。Person类有姓名、地址、电话号码和电子邮箱,...
匿名用户 1级 2017-04-26 回答 Person类: public class Person { private String address; private String name; pr ...
- 翻译:AKKA笔记 - Actor消息 -1(二)
消息 我们只是让QuoteRequest到ActorRef去但是我们根本没见过消息类! 它是这样的: (一个最佳实践是把你的消息类包装在一个完整的对象里以利于更好的组织) TeacherProtoco ...
- 拿下计网协议后,我就是公园里最靓的仔
下面我们就要对不同的协议层进行分类介绍了,我们还是采用自上而下的方式来介绍,这种介绍对读者来说更容易接纳,吸收程度更好. 一般情况下,用户不太在意网络应用程序实际上是按照怎样的机制运行的,但我们是程序 ...
- exchange之2003迁移至2007
我的实验环境是这样的:刚开始,我的域控制器操作系统和exchange2003操作系统都是windows2003,在迁移过程中提示:"想要从exchange2003迁移到exchange200 ...
- java actor akka_Actor 模型及Akka简介
Actor 模型 Actor 的基础就是消息传递,一个 Actor 可以认为是一个基本的计算单元,它能接收消息并基于其执行运算,它也可以发送消息给其他 Actor.Actors 之间相互隔离,它们之间 ...
- 几种嵌入式RTOS的分析与比较
几种嵌入式RTOS的分析与比较 http://tech.ddvip.com 2008年07月10日 社区交流 Sailor_forever sailing_9806@163.com 整理 ...
- 底层框架_你有必要了解一下Flink底层RPC使用的框架和原理
1. 前言 对于Flink中各个组件(JobMaster.TaskManager.Dispatcher等),其底层RPC框架基于Akka实现,本文着重分析Flink中的Rpc框架实现机制及梳理其通信流 ...
- 邮件服务器在企业网中的应用
简介: 电子邮件是因特网上最为流行的应用之一.如同邮递员分发投递传统邮件一样,电子邮件也是异步的,也就是说人们是在方便的时候发送和阅读邮件的,无须预先与别人协同.与传统邮件不同的是,电子邮件既迅速,又 ...
最新文章
- boost::system::errc相关的测试程序
- mongodb添加创建修改时间_mongodb副本集生产环境下部署案例,推荐一个主两个从三台机器...
- primefaces_使用PrimeFaces开发数据导出实用程序
- 在一个数组中,如何确定所需元素在数组中的位置.
- springboot导入项目依赖报错_最详细的 Spring Boot 多模块开发与排坑指南
- Scrapy-Link Extractors(链接提取器)
- php改变iframe的src,js动态改变iframe的src属性
- 亲测无限坐席在线客服系统源码
- 完美使用application cache几点心得
- ElasticSearch入门 第五篇:使用C#查询文档
- R之data.table速查手册
- 酷派删除android系统软件,Coolpad酷派8720L哪些系统软件可以删除(精简列表)
- java 怎么使用 设计模式对业务进行解耦(一)
- 运放放大倍数计算公式_运算放大器基本电路大全(转)
- 财税!2020个人银行账户进账多少会被查?
- 快手极速版-青龙羊毛
- 接口测试[PostMan]
- archlinux 安装matlab
- EIP-3523:半同质代币介绍
- Java选择题(十八)