RabbitMQ遵循了AMQP规范,用消息确认机制来保证:只要消息发送,就能确保被消费者消费来做到了消息最终一致性。而且开源,文档还异常丰富,貌似是实现分布式事务的良好载体

6.1 RabbitMQ消息确认机制

rabbitmq的整个发送过程如下

1. 生产者发送消息到消息服务
2. 如果消息落地持久化完成,则返回一个标志给生产者。生产者拿到这个确认后,才能放心的说消息终于成功发到消息服务了。否则进入异常处理流程。
 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (!ack) {//try to resend msg} else {//delete msg in db}});
3. 消息服务将消息发送给消费者
4. 消费者接受并处理消息,如果处理成功则手动确认。当消息服务拿到这个确认后,才放心的说终于消费完成了。否则重发,或者进入异常处理。
final Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {//确认收到消息channel.basicAck(envelope.getDeliveryTag(), false);}}};

6.2 异常

我们来看看可能发送异常的四种

1. 直接无法到达消息服务

网络断了,抛出异常,业务直接回滚即可。如果出现connection closed错误,直接增加 connection数即可

connectionFactory.setChannelCacheSize(100);
2. 消息已经到达服务器,但返回的时候出现异常

rabbitmq提供了确认ack机制,可以用来确认消息是否有返回。因此我们可以在发送前在db中(内存或关系型数据库)先存一下消息,如果ack异常则进行重发

 /**confirmcallback用来确认消息是否有送达消息队列*/     rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (!ack) {//try to resend msg} else {//delete msg in db}});/**若消息找不到对应的Exchange会先触发returncallback */rabbitTemplate.setReturnCallback((message, replyCode, replyText, tmpExchange, tmpRoutingKey) -> {try {Thread.sleep(Constants.ONE_SECOND);} catch (InterruptedException e) {e.printStackTrace();}log.info("send message failed: " + replyCode + " " + replyText);rabbitTemplate.send(message);});
3. 消息送达后,消息服务自己挂了

如果设置了消息持久化,那么ack= true是在消息持久化完成后,就是存到硬盘上之后再发送的,确保消息已经存在硬盘上,万一消息服务挂了,消息服务恢复是能够再重发消息

4. 未送达消费者

消息服务收到消息后,消息会处于"UNACK"的状态,直到客户端确认消息

channel.basicQos(1); // accept only one unack-ed message at a time (see below)final Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {//确认收到消息channel.basicAck(envelope.getDeliveryTag(), false);}}};boolean autoAck = false;channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
5. 确认消息丢失

消息返回时假设确认消息丢失了,那么消息服务会重发消息。注意,如果你设置了autoAck= false,但又没应答channel.baskAck也没有应答channel.baskNack,那么会导致非常严重的错误:消息队列会被堵塞住,所以,无论如何都必须应答

6. 消费者业务处理异常

消息监听接受消息并处理,假设抛异常了,第一阶段事物已经完成,如果要配置回滚则过于麻烦,即使做事务补偿也可能事务补偿失效的情况,所以这里可以做一个重复执行,比如guavaretry,设置一个指数时间来循环执行,如果n次后依然失败,发邮件、短信,用人肉来兜底。

转载于:https://blog.51cto.com/4925054/2096781

消息最终一致性解决方案之RabbitMQ实现相关推荐

  1. 消费消息删除_【进阶之路】可靠消息最终一致性解决方案

    导言 大家好,我是南橘,从接触java到现在也有差不多两年时间了,两年时间,从一名连java有几种数据结构都不懂超级小白,到现在懂了一点点的进阶小白,学到了不少的东西.知识越分享越值钱,我这段时间总结 ...

  2. rocket mq 监听端口_MQ消息最终一致性解决方案

    随着分布式服务架构的流行与普及,原来在单体应用中执行的多个逻辑操作,现在被拆分成了多个服务之间的远程调用.虽然服务化为我们的系统带来了水平伸缩的能力,然而随之而来挑战就是分布式事务问题,多个服务之间使 ...

  3. 我说分布式事务之消息最终一致性事务(一):原理及实现

    来源:https://0x9.me/YgaUc 在之前的文章中,我们已经学习总结了分布式事务的两种解决方案. 我说分布式事务之TCC 我说分布式事务之最大努力通知型事务 本文我们将学习到另一种常见的柔 ...

  4. 分布式事务——消息最终一致性方案

    前言 分布式事务一直是服务化拆分后一个绕不开的话题,原来在单体应用中执行的多个逻辑操作,现在被拆分成了多个服务之间的远程调用.虽然服务化为我们的系统带来了水平伸缩的能力,然而随之而来挑战就是分布式事务 ...

  5. 《深入理解分布式事务》第九章 可靠消息最终一致性分布式事务原理

    <深入理解分布式事务>第九章 可靠消息最终一致性分布式事务原理 文章目录 <深入理解分布式事务>第九章 可靠消息最终一致性分布式事务原理 一.基本原理 二.本地消息表 1.实现 ...

  6. (微服务)分布式事务-最大努力交付 消息最终一致性方案

    小插曲 本话题已收入视频讲座<Spring Cloud分布式事务解决方案>大家不妨围观下.开源项目: CoolMQ,项目支持网站: http://rabbitmq.org.cn,最新文章或 ...

  7. 我说分布式事务之消息最终一致性事务(二):RocketMQ的实现

    来源:https://0x9.me/A76YN 号外:最近整理了一下以前编写的一系列Spring Boot内容,整了个<Spring Boot基础教程>的PDF,关注我,回复:001,快来 ...

  8. 可靠消息最终一致性设计_如何最终启动您的设计产品组合

    可靠消息最终一致性设计 It's not a secret that most designers procrastinate on their portfolios whether it is to ...

  9. .Net Core with 微服务 - 可靠消息最终一致性分布式事务

    前面我们讲了分布式事务的2PC.3PCTCC 的原理.这些事务其实都在尽力的模拟数据库的事务,我们可以简单的认为他们是一个同步行的事务.特别是 2PC,3PC 他们完全利用数据库的事务能力,在一阶段开 ...

最新文章

  1. 关于ListView中adapter调用notifyDataSetChanged无效的原因
  2. 基带信号传输之信道均衡
  3. 还在为DST模型刷不动而感到苦恼吗?来试试无监督DST吧,DSI等你来战!
  4. 不支持某些浏览器_水狐:一个支持旧版扩展的火狐复刻版
  5. python的自定义异常类,带参Exception,多个except,断言语句,断点,try...except,try...except...else,try...except...finally处理
  6. 趋势科技企业级杀软产品俩 0day 已遭利用
  7. 蓝桥杯 ALGO-114 算法训练 黑白无常
  8. Mac上emacs gpg: 无法检查签名:没有公钥
  9. arm微软服务器,ARM扬帆 借力微软 杀进服务器市场
  10. php自定义类生成lib,thinkphp引入自定义封装类
  11. Tampermonkey油猴插件
  12. 【读书笔记】浪潮之巅——方法论篇
  13. 结合个人规划对物联网(IOT)的一点思考
  14. thon中的全局变量
  15. nginx: [error] CreateFile() “D:\nginx-1.20.1/logs/nginx.pid“ failed (2: The system cannot find the
  16. Vue----.stop、.prevent、.capture、.self用法以及.stop和.self的区别
  17. SQLSERVER Agent服务无法启动
  18. 基于Verilog HDL的数字秒表
  19. 关于Vue的再次试炼
  20. windows11便签在哪里,win11怎么新建便签

热门文章

  1. Android View 的scrollTo 和 scrollBy方法
  2. java基础的正则表达式
  3. MySQL的隔离级别
  4. 震惊!!!CSS垂直居中竟然有这么多方法~
  5. 王校长一分钟能吃多少热狗?| 小游戏
  6. 西普实验吧-ctf-web-1
  7. Lichee(两) 在sun4i_crane该平台下编译
  8. GNOME下也是Alt+F2,输入gnome-terminal
  9. 关于字符串和字符数组的再讨论
  10. JAVA的类名.this