分布式事务要解决的问题是保证二个数据库数据的一致性,本地事务ACID属于刚性事务,基于CAP理论,分布式事务的核心要点柔性事务,最终一致性。

基于rabbitmq解决分布式事务要点如下

  • 生产者采用发送方确认机制加上mandatory参数或者备份交换机,保证消息被正确地到达队列中。
  • 队列、交换机、消息都需要持久化(可以考虑镜像队列机制,如果业务不是那么重要,比如短信邮件通知)。
  • 消费端采用手动应答的方式,确认消息已经被正确消费。

rabbitmq配置

spring:application:name: rabbitmq-servicerabbitmq:addresses: 192.168.137.128:5672,192.168.137.129:5672username: rootpassword: rootvirtual-host: transactionpublisher-confirms: true #开启发送端确认机制publisher-returns: true # 开启returnstemplate:mandatory: true # 交换机没有匹配的队列时,会将消息返回给生产者,避免消息丢失listener:simple:acknowledge-mode: manualretry:enabled: truemax-attempts: 5initial-interval: 3000server:port: 8400eureka:client:serviceUrl:defaultZone: http://localhost:8100/eureka/instance:prefer-ip-address: trueinstance-id: ${spring.cloud.client.ip-address}:${server.port}mq:common:exchange: common.exchangequeue: common.queueroutingkey: common.routing

代码案例是下单减库存

生产者代码

package com.liwen.mqservice.producer;import com.liwen.entity.Order;
import com.liwen.feign.IOrderServiceFeign;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class MQOrderProducer implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {@Value("${mq.common.exchange}")private String exchangeName;@Value("${mq.common.routingkey}")private String routingKey;@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate IOrderServiceFeign orderServiceFeign;public void send(JSONObject json){rabbitTemplate.setConfirmCallback(this);//发送到确认机制rabbitTemplate.setReturnCallback(this);//消息Return回调Order order= (Order) JSONObject.toBean(json.getJSONObject("order"), Order.class);orderServiceFeign.saveOrder(order);String orderId = order.getOrderId();String goodId = order.getGoodId();json.put("orderId", orderId);json.put("goodId", goodId);CorrelationData correlationData = new CorrelationData(orderId);rabbitTemplate.convertAndSend(exchangeName,routingKey, json.toString(), new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {MessageProperties messageProperties = message.getMessageProperties();messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);//设置消息持久化return message;}}, correlationData);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if(!ack){/**   处理消息没有到达交换机,数据丢失的情况*   根据订单号查询到订单数据,并将数据保存到异常消息表中,定时补发,并报警人工处理* */String orderId = correlationData.getId();}else{//查询订单号是否在异常消息表,在的话要删除log.info(">>>下单消息发送成功{}<<<",correlationData);}}@Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {//消息到达交换机,没有路由到队列,根据订单号查询到订单数据,并将数据保存到异常消息表中,定时补发,并报警人工处理/**  1 交换机没有绑定队列*  2 交换机根据路由键没有匹配到队列*  3 队列消息已满* */byte[] body = message.getBody();JSONObject json = JSONObject.fromObject(new String(body));System.out.println("return============================");System.out.println(message);}
}

对应生产者发送消息到rabbitmq有以下几种情况:

1 没有发送到交换机的数据,会回调public void confirm(CorrelationData correlationData, boolean ack, String cause) 方法(ack为false),我们可以在发送消息时把业务参数比如订单号设置到correlationData参数中,回调时把相关消息保存到异常消息表,采用定时任务兜底,并报警通知相关人员。

2 对于发送方确认机制,只能保证消息到达rabbitmq的交换机(ack为true),如果此交换机没有匹配的队 列,那么消息会丢失,所以需要结合mandatory(设置为true),当交换机没有匹配的队列时,会回调 public void returnedMessage(Message message, int i, String s, String s1, String s2)将消息返回给生产者,我们保存消息body部分,采用定时任务兜底,并报警通知相关人员。

2 交换机、队列、消息(重要的数据比如支付数据)需要持久化,避免消息在rabbitmq重启、宕机等异常情况下造成消息丢失。

3 对应消费者,需要手动应答,明确告诉rabbitmq服务器,已经正确消费了消息,然后rabbitmq服务器会删除已经被正确消费的消息。如果rabbitmq没有收到应答消息(比如消费者处理超时或者网络不好),rabbitmq会间隔的讲消息重新发给消费者消费,这是消费者需要根据消息id或者全局唯一业务字段做好幂等处理。如果是消费者代码有问题需要重新发布版本解决。兜底方案依然是保存消息到异常消息表,定时处理并报警通知相关人员处理。

总结:基于rabbitmq解决分布式事务,核心是最终一致性,比如电商下单减库存、外卖下单派单等场景,核心是最终一致性。经过一定的时间(具体能容忍多久看业务场景)二个数据库中的数据最终达到一致。

完结,不正确之处,望大佬指正!求点赞鼓励~

rabbitmq 查询版本_基于rabbitmq解决分布式事务相关推荐

  1. 基于消息中间件解决分布式事务的开源框架Myth

    基于消息中间件的解决分布式事务框架:https://github.com/yu199195/myth 1.rpc框架支持 : dubbo,motan,springcloud. 2.消息中间件支持 : ...

  2. .net 实时通信_基于 RabbitMQ 的实时消息推送

    实现服务器端推送的几种方式 Web 应用都是基于 HTTP 协议的请求/响应模式,无法像 TCP 协议那样保持长连接,因此 Web 应用就很难像手机那样实现实时的消息推送.就目前来看,Web 应用的消 ...

  3. 分布式事务 - 如何解决分布式事务问题?

    分布式事物 - 如何解决分布式事务问题? 面试题 分布式事务了解吗?你们是如何解决分布式事务问题的? 面试官心理分析 只要聊到你做了分布式系统,必问分布式事务,你对分布式事务一无所知的话,确实会很坑, ...

  4. 解决分布式事务,Seata真香

    目录 背景介绍 什么是分布式事务 什么叫做逆向补偿呢 互联网最流行的分布式事务组件seata 总结 背景 大家好,今天给大家分享一个在 2022 年出去面试 Java 几乎必问的一个技术,那就是 se ...

  5. SpringCloud分布式事务,版本二:添加 Seata 分布式事务版本

    基于 Seata 1.4.0 版本 首先贴出此项目地址:Seata 分布式事务版本 先了未添加事务项目再看此版本:未添加事务版本 此文章是基于上一篇的项目基础上添加的内容,所以务必先看上一篇 Seat ...

  6. RocketMQ如何解决分布式事务

    本文来说下RocketMQ如何解决分布式事务 文章目录 基本实现思路 RocketMQ的事务消息状态 代码实例 maven导入 yaml文件配置 核心代码 本文小结 基本实现思路 核心思想:事务消息总 ...

  7. 分布式事务模型--基于消息的分布式事务

    本文来说下分布式事务模型之基于消息的分布式事务 文章目录 概述 基于消息的分布式事务 基于事务消息的分布式事务 基于本地消息的分布式事务 特点剖析 本文小结 概述 事务是一组不可分组的操作集合,这些操 ...

  8. 基于activemq的分布式事务解决方案

    1.分布式事务出现场景 场景描述:支付宝转账余额宝 分布式事务必须满足的条件: 1.远程RPC调用,支付宝和余额宝存在接口调用 2.支付宝和余额宝使用不同的数据库 如图: 2.分布式事务解决方案 1. ...

  9. 搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务

    搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务 初步认识RocketMQ的核心模块 rocketmq模块 rocketmq-broker:接受生产者发来的消息并存储(通过调用rocke ...

最新文章

  1. 任正非:管理就要铲除公司夹心层!
  2. java lList Map Set总结
  3. SAP生产订单预留相关的备忘录
  4. CommonJS概述及使用
  5. C语言编程笔记:关于 for循环 的那些不为人知的秘密
  6. python3学习者的福音
  7. Aspose.Cells Smart markers 基于模板导出Excel
  8. 利用javaScript动态增加表格行,删除表格行
  9. RoboMaster电机驱动
  10. 游戏能给QQ一个未来吗?
  11. MATLAB强化学习实战(十) 多智能体的路径跟随控制
  12. Python在数字后端中的应用(一)
  13. 文本分类——KNN算法
  14. 大麦网启动“麦香计划”,将投3亿元布局戏剧内容领域
  15. 驱动蓝屏代码及原因,解决方案
  16. 第六次大灭绝还远吗?––读《大灭绝时代》
  17. java计算机毕业设计营养分析系统源码+数据库+系统+lw文档+部署
  18. wfilters小波滤波器
  19. 高通公司全球首席商务官Jim Cathey先生一行莅临美格智能参观指导
  20. 第一期:代码出现的英文及含义

热门文章

  1. ARM Cortex-M嵌入式C基础编程(下)
  2. PHP 算法题:有多少苹果用来分赃1.1
  3. [JAVA EE] 拦截器
  4. HarmonyOS ScrollView 使用
  5. ConnectionAbortedError: [WinError 10053] 您的主机中的软件中止了一个已建立的连接
  6. Resource entery xx is already defined
  7. buildConfigField 使用
  8. 统计文本中出现的单词个数频率
  9. intel最新的服务器芯片,Intel最新服务器CPU 芯片组Roadmap
  10. 移动端导航页面html,swiper4实现移动端导航切换