版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/53929751

前言

在《使用Akka持久化——持久化与快照》一文中介绍了如何使用Akka持久化消息及生成快照。对于集群应用来说,发送者发出消息,只有当收到了接受者的成功回复才应当认为是一次完整的请求和应答(一些RPC框架只提供了远程调用、序列化/反序列化的机制,但是具体调用的成功与否实际是抛给了开发者本人),利用Akka的应答机制很容易实现这些功能。特殊情况下发送者发送了消息,但是最终接受者却没有接收到消息,导致这一情况发生的因素很多(例如:发送者调用完发送接口后,发送者所在进程奔溃了;网络故障;接收者不存在等)。如果这些消息的成功接收与处理对于整个应用而言具有强一致性的要求,那么这些都会导致很多困扰,好在我们可以使用Akka的持久化机制。

发送者在发送消息之前先对消息进行持久化,那么无论任何原因导致没有收到接收者的成功回复时,我们总能有办法从持久化信息中找出那些未被成功回复的消息进行重发(这说明接收者接到的消息有可能会重复,此时需要保证接收者的实现是冥等的)。当接收者收到消息进行处理后需要向发送者发送成功回复,发送者收到回复后的第一个动作应当是对回复内容的持久化,否则有可能在还未真正对成功回复处理时宕机或进程奔溃导致回复消息丢失(在持久化与收到消息之间仍然会存在宕机和进程奔溃的情况,只不过这个时间非常短,因此丢失回复的可能会很低),当持久化回复消息完成后,可以自己慢慢来处理这些确认信息,而不用担心它们丢失了。

本文将根据Akka官网的例子,对其做一些适应性改造后,向大家展示Akka持久化的另一个强大武器——At least once delivery!

消息投递规则

一般而言,消息投递有下面三种情况:

  • at-most-once 意味着每条应用了这种机制的消息会被投递0次或1次。可以说这条消息可能会丢失。
  • at-least-once 意味着每条应用了这种机制的消息潜在的存在多次投递尝试并保证至少会成功一次。就是说这条消息可能会重复但是不会丢失。
  • exactly-once 意味着每条应用了这种机制的消息只会向接收者准确的发送一次。换言之,这种消息既不会丢失也不会重复。
at-most-once的成本最低且性能最高,因为它在发送完消息后不会尝试去记录任何状态,然后这条消息将被他抛之脑后。at-least-once需要发送者必须认识它所发送过的消息,并对没有收到回复的消息进行发送重试。exactly-once的成本是三者中最高的而且性能却又是三者中最差的,它除了要求发送者有记忆和重试能力,还要求接收者能够认识接收过的消息并能过滤出那些重复的消息投递。Akka的Actor模型一般提供的消息都属于at-most-once,那是因为大多数场景都不需要有状态的消息投递,例如web服务器请求。当你有强一致性需求时,才应该启用Akka的at-least-once机制,那就是你的Actor不再继承自UntypedActor,而是继承自UntypedPersistentActorWithAtLeastOnceDelivery。

配置

如果要使用,那么需要在中增加以下的一些配置:
    at-least-once-delivery {redeliver-interval = 20000redelivery-burst-limit = 100}

redeliver-interval用于配置重新进行投递尝试的时间间隔,单位是毫秒。redelivery-burst-limit用于配置每次重新执行投递尝试时发送的最大消息条数。

一致性消息例子

我们首先来看看本例中用到的消息体MsgSent、Msg、Confirm及MsgConfirmed。MsgSent代表将要发送的消息,但是只用于持久化,持久化完成后会将MsgSent转换为Msg进行发送。也就是说Msg才会被真正用于消息发送。接收者收到Msg消息后将向发送者回复Confirm消息,需要注意的是Msg和Confirm都有属性deliveryId,此deliveryId由发送者的持久化功能生成,一条Msg消息和其对应的Confirm回复的deliveryId必须一致,否则在利用UntypedPersistentActorWithAtLeastOnceDelivery对回复消息进行确认时会产生严重的bug。发送者收到接收者的Confirm回复后首先将其转换为MsgConfirmed,然后对MsgConfirmed进行持久化,最后调用UntypedPersistentActorWithAtLeastOnceDelivery提供的confirmDelivery方法对回复进行确认。MsgSent、Msg、Confirm及MsgConfirmed的代码实现如下:

public interface Persistence {public static class Msg implements Serializable {private static final long serialVersionUID = 1L;public final long deliveryId;public final String s;public Msg(long deliveryId, String s) {this.deliveryId = deliveryId;this.s = s;}}public static class Confirm implements Serializable {private static final long serialVersionUID = 1L;public final long deliveryId;public Confirm(long deliveryId) {this.deliveryId = deliveryId;}}public static class MsgSent implements Serializable {private static final long serialVersionUID = 1L;public final String s;public MsgSent(String s) {this.s = s;}}public static class MsgConfirmed implements Serializable {private static final long serialVersionUID = 1L;public final long deliveryId;public MsgConfirmed(long deliveryId) {this.deliveryId = deliveryId;}}}

服务端

本例中的服务端非常简单,是一个接收处理Msg消息,并向发送者回复Confirm消息的Actor,代码如下:

@Named("MyDestination")
@Scope("prototype")
public class MyDestination extends UntypedActor {LoggingAdapter log = Logging.getLogger(getContext().system(), this);public void onReceive(Object message) throws Exception {if (message instanceof Msg) {Msg msg = (Msg) message;log.info("receive msg : " + msg.s + ", deliveryId : " + msg.deliveryId);getSender().tell(new Confirm(msg.deliveryId), getSelf());} else {unhandled(message);}}
}

服务端的启动代码如下:

        logger.info("Start myDestination");final ActorRef myDestination = actorSystem.actorOf(springExt.props("MyDestination"), "myDestination");logger.info("Started myDestination");

客户端

具体介绍客户端之前,先来列出其实现,代码如下:

@Named("MyPersistentActor")
@Scope("prototype")
public class MyPersistentActor extends UntypedPersistentActorWithAtLeastOnceDelivery {LoggingAdapter log = Logging.getLogger(getContext().system(), this);private final ActorSelection destination;@Overridepublic String persistenceId() {return "persistence-id";}public MyPersistentActor(ActorSelection destination) {this.destination = destination;}@Overridepublic void onReceiveCommand(Object message) {if (message instanceof String) {String s = (String) message;log.info("receive msg : " + s);persist(new MsgSent(s), new Procedure<MsgSent>() {public void apply(MsgSent evt) {updateState(evt);}});} else if (message instanceof Confirm) {Confirm confirm = (Confirm) message;log.info("receive confirm with deliveryId : " + confirm.deliveryId);persist(new MsgConfirmed(confirm.deliveryId), new Procedure<MsgConfirmed>() {public void apply(MsgConfirmed evt) {updateState(evt);}});} else if (message instanceof UnconfirmedWarning) {log.info("receive unconfirmed warning : " + message);// After a number of delivery attempts a AtLeastOnceDelivery.UnconfirmedWarning message will be sent to self. The re-sending will still continue, but you can choose to call confirmDelivery to cancel the re-sending. List<UnconfirmedDelivery> list = ((UnconfirmedWarning) message).getUnconfirmedDeliveries();for (UnconfirmedDelivery unconfirmedDelivery : list) {Msg msg = (Msg) unconfirmedDelivery.getMessage();confirmDelivery(msg.deliveryId);}} else {unhandled(message);}}@Overridepublic void onReceiveRecover(Object event) {updateState(event);}void updateState(Object event) {if (event instanceof MsgSent) {final MsgSent evt = (MsgSent) event;deliver(destination, new Function<Long, Object>() {public Object apply(Long deliveryId) {return new Msg(deliveryId, evt.s);}});} else if (event instanceof MsgConfirmed) {final MsgConfirmed evt = (MsgConfirmed) event;confirmDelivery(evt.deliveryId);}}
}

正如我们之前所述——要使用at-least-once的能力,就必须继承UntypedPersistentActorWithAtLeastOnceDelivery。有关MsgSent、Msg、Confirm及MsgConfirmed等消息的处理过程已经介绍过,这里不再赘述。我们注意到onReceiveCommand方法还处理了一种名为UnconfirmedWarning的消息,这类消息将在at-least-once机制下进行无限或者一定数量的投递尝试后发送给当前Actor,这里的数量可以通过在at-least-once-delivery配置中增加配置项warn-after-number-of-unconfirmed-attempts来调整,例如:

    at-least-once-delivery {redeliver-interval = 20000redelivery-burst-limit = 100warn-after-number-of-unconfirmed-attempts = 6}

当你收到UnconfirmedWarning的消息时,说明已经超出了你期望的最大重试次数,此时可以做一些控制了,例如:对于这些消息发送报警、丢弃等。本例中选择了丢弃。

UntypedPersistentActorWithAtLeastOnceDelivery的状态由那些尚未被确认的消息和一个序列号组成。UntypedPersistentActorWithAtLeastOnceDelivery本身不会存储这些状态,依然需要你在调用deliver方法投递消息之前,调用persist方法持久化这些事件或消息,以便于当持久化Actor能够在恢复阶段恢复。在恢复阶段,deliver方法并不会将发出消息,此时持久化Actor一面恢复,一面只能等待接收回复。当恢复完成,deliver将发送那些被缓存的消息(除了收到回复,并调用confirmDelivery方法的消息)。

运行例子

本文将率先启动客户端并向服务端发送hello-1,hello-2,hello-3这三消息,但是由于服务端此时并未启动,所以客户端会不断重试,直到重试达到上限或者受到回复并确认。服务端发送消息的代码如下:

     logger.info("Start myPersistentActor");final String path = "akka.tcp://metadataAkkaSystem@127.0.0.1:2551/user/myDestination";final ActorSelection destination = actorSystem.actorSelection(path);final ActorRef myPersistentActor = actorSystem.actorOf(springExt.props("MyPersistentActor", destination), "myPersistentActor");actorMap.put("myPersistentActor", myPersistentActor);logger.info("Started myPersistentActor");myPersistentActor.tell("hello-1", null);myPersistentActor.tell("hello-2", null);myPersistentActor.tell("hello-3", null);

客户端发送三条消息后,日志中立马打印出了以下内容:

但是一直未受到回复信息,然后我们启动服务端,不一会就看到了以下日志输出:

我们再来看看客户端,发现已经收到了回复,内容如下:

总结

通过使用UntypedPersistentActorWithAtLeastOnceDelivery提供的persist、deliver及confirmDelivery等方法可以对整个应用的at-least-once需求,轻松实现在框架层面上一致的实现。

其它Akka应用的博文如下:

  1. 《Spring与Akka的集成》;
  2. 《使用Akka的远程调用》;
  3. 《使用Akka构建集群(一)》;
  4. 《使用Akka构建集群(二)》;
  5. 《使用Akka持久化——持久化与快照》;
  6. 《使用Akka持久化——消息发送与接收》;

后记:个人总结整理的《深入理解Spark:核心思想与源码分析》一书现在已经正式出版上市,目前京东、当当、天猫等网站均有销售,欢迎感兴趣的同学购买。

京东:http://item.jd.com/11846120.html

当当:http://product.dangdang.com/23838168.html

使用Akka持久化——消息发送与接收相关推荐

  1. python 网络编程之Socket通信案例消息发送与接收

    背景 网络编程是python编程中的一项基本技术.本文将实现一个简单的Socket通信案例消息发送与接收 正文 在python中的socket编程的大致流程图如上所示 我们来首先编写客户端的代码: # ...

  2. go 实现 kafka 消息发送、接收

    引言 网络上关于 go 实现 kafka 消息发送和接收的文章很多,但是实际操作起来又不是很清楚,本文在网络资源的基础上,结合自己搭建过程中遇到的问题进行了总结. 本文的实验主机:Mac笔记本. 一. ...

  3. RabbitMQ消息发送和接收

    1.RabbitMQ的消息发送和接受机制 所有 MQ 产品从模型抽象上来说都是一样的过程: 消费者(consumer)订阅某个队列.生产者(producer)创建消息,然后发布到队列(queue)中, ...

  4. 消息发送和接收基本应用

    添加jar包依赖 <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>roc ...

  5. JAVA ActiveMQ消息发送和接收

    JMS即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信. ...

  6. RabbitMQ——使用Exchange中的fanout交换机实现消息发送和接收

    文章目录: 1.写在前面 2.使用fanout交换机实现消息的发送和接收 2.1 编写消息接收类(有两个) 2.2 编写消息发送类 1.写在前面 所有 MQ 产品从模型抽象上来说都是一样的过程: 消费 ...

  7. 【转】DICOM医学图像处理:DIMSE消息发送与接收“大同小异”之DCMTK fo-dicom mDCM

    转自:https://my.oschina.net/zssure/blog/354816 背景: 从DICOM网络传输一文开始,相继介绍了C-ECHO.C-FIND.C-STORE.C-MOVE等DI ...

  8. linux ibm mq 安装,消息发送与接收

    下载地址 http://public.dhe.ibm.com/ibmdl/export/pub/software/websphere/messaging/mqadv/ 安装 1.2 解压并安装 1.2 ...

  9. Golang实现Kafka消息发送、接收

    一:核心概念 kafka是消息中间件的一种,是一种分布式流平台,是用于构建实时数据管道和流应用程序.具有横向扩展,容错,wicked fast(变态快)等优点. kafka中涉及的名词: 消息记录(r ...

最新文章

  1. Netty 如何做到单机秒级接收 35 万个对象?
  2. mac 安装 nginx
  3. 【c++】27.事件驱动、IO复用、sellect、poll、epoll三者的区别
  4. vue element项目常见实现表格内部可编辑功能
  5. 纷享销客完成新一轮数亿元融资,持续领跑中国CRM产业发展
  6. 利用Attribute扩展MVC的Title和Sitemap
  7. 使用ViewModel模式简化WPF TreeView
  8. 数据挖掘决策树python_机器学习之决策树ID3(python实现)
  9. c语言中局部变量存放在哪里,C语言全局变量存放在哪里?
  10. java毕业设计——基于java+Java Swing+sqlserver的图书馆书库管理系统设计与实现(毕业论文+程序源码)——图书馆书库管理系统
  11. Python自动化测试框架,谁才是你的唯一?
  12. 苹果蓝牙耳机怎么接电话_如何在开车时可以更安全的接电话——ROMAN R6000蓝牙耳机...
  13. 致敬科比,实现查询科比每赛季数据的Web服务器
  14. 配置完dcom需要重启计算机,DCOM电脑自动重启(win7系统一直反复重启)
  15. 分布式系统基础--CAP理论
  16. Luogu P3853 路标设置
  17. 【PHP注入01】PHP语言常见可注入函数(eval、assert、preg_replace、call_user_func、$a($b)等)
  18. Abbkine IFKine驴抗小鼠IgG二抗,绿色荧光标记方案
  19. Docker镜像加速器的配置
  20. C语言程序实现道格拉斯—普克算法(Douglas--Peucker)

热门文章

  1. 内省、JavaBean、PropertyDescriptor类、Introspector类、BeanUtils工具包、注解、Rentention、Target、注解的基本属性和高级属性...
  2. Qt5.6.0+OpenGL 纹理贴图首战告捷
  3. NSArray和NSString的联合使用
  4. [Cocos2d-html5]关于压缩
  5. [轉]VS2010 UML类图生成代码
  6. 解决PowerDesigner中Name与Code同步的问题
  7. SpringBoot集成MyBatis-Plus代码生成器(Dao)
  8. idea远程调试Java应用程序
  9. JAVA设计模式 - 单例模式
  10. PyQt5笔记(01) -- 创建空白窗体