不懂就问

灵魂拷问之☞导盲犬禁止入内是给犬看的还是盲人看的?
各位巨佬们把答案留在评论区吧

分布式事务

随着互联网快速发展,微服务,SOA 等服务架构模式正在被大规模的使用,现在分布式系统一般由多个独立的子系统组成,多个子系统通过网络通信互相协作配合完成各个功能。有很多用例会跨多个子系统才能完成,比较典型的是电子商务网站的下单支付流程,至少会涉及交易系统和支付系统。而且这个过程中会涉及到事务的概念,即保证交易系统和支付系统的数据一致性,此处我们称这种跨系统的事务为分布式事务。具体一点而言,分布式事务是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。

解决方案

1.两阶段提交(2PC)
2.补偿事务(TCC)
3.本地消息表(异步确保)
4.MQ 事务消息

实现步骤

1.上游服务像消息服务发送一条预提交消息
2.消息服务返回对应的曲剧唯一的消息ID
3.上游服务实现自身业务,执行本地逻辑,根据本地事务决定提交或者回滚
4.消息服务根据上游服务响应的结果提交或者回滚(删除消息)
5.如果上游消息响应提交则吧消息发送到MQ
6.发送消息到MQ后,需要把MQ的Confirm机制打开,针对消息发送的状态进行回调
7.消息服务监听MQ回调,根据业务逻辑判断是否需要回滚或者提交,走第4步
8.当上游消息执行某段业务逻辑可能会抛异常或者其他的错误,会导致消息一直都是待提交的状态,需要启动一个后台定时任务轮询消息表,把所有未提交的消息进行确定,根据结果提交或者回滚

实战代码

源码

github源码
csdn源码

1.项目结构

源码会上传到github和csdn的资源,可以自行下载,就不提供像maven等相关依赖、配置文件相关的代码了,项目整体的架构是Springboot、注册和配置中心Nacos、Redis加上RabbitMQ。需要好哥哥们熟悉相关的技术点,后续有时间一个个来整吧
结构

2.sql语句
CREATE TABLE `message_record` (`id_` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID',`business_id` varchar(64) DEFAULT NULL COMMENT '业务数据ID',`business_type` tinyint(2) DEFAULT NULL COMMENT '业务类型:具体业务',`message_id` varchar(64) NOT NULL COMMENT '消息ID',`retries_number` tinyint(2) DEFAULT '0' COMMENT '重试次数',`status_` tinyint(2) DEFAULT '0' COMMENT '结果 1 成功  0 失败',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',PRIMARY KEY (`id_`),UNIQUE KEY `inx_message_id` (`message_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8 COMMENT='rabbit消息记录';
3.MQ配置
import com.xjw.config.constant.RabbitmqConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author xiejianwei* @ClassName BusinessOrderRabbitMqConfig*/
@Configuration
public class OrderRabbitMqConfig {/*** 初始化队列** @return*/@Beanpublic Queue orderQueue() {return new Queue(RabbitmqConstant.ORDER_QUEUE, true);}/*** 初始化交换机** @return*/@Beanpublic DirectExchange orderExchange() {return new DirectExchange(RabbitmqConstant.ORDER_EXCHANGE, true, false);}/*** 队列通过路由键绑定到交换机** @return*/@Beanpublic Binding bind() {return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(RabbitmqConstant.ORDER_ROUTING_KEY);}
}
3.实体类
import lombok.Getter;
import lombok.Setter;import java.util.Date;
import java.util.UUID;/*** @author xiejianwei*/
@Getter
@Setter
public class MessageRecord {/*** 主键ID*/private Long id;/*** 业务数据ID*/private String businessId;/*** 业务类型*/private int businessType;/*** 消息ID*/private String messageId;/*** 重试次数*/private int retriesNumber;/*** 消息状态 (0.失败,1成功)*/private int status;/*** 创建时间*/private Date createTime;public MessageRecord() {}public MessageRecord(String businessId, int businessType) {this.businessId = businessId;this.businessType = businessType;this.messageId = UUID.randomUUID().toString().replace("-", "").toLowerCase();this.retriesNumber = 0;this.createTime = new Date();this.status = 0;}
}
import java.math.BigDecimal;/*** @author xiejianwei*/
@Getter
@Setter
public class Order extends SerializableDto {/*** 订单编号*/private String orderId;/*** 订单金额*/private BigDecimal amount;/*** 做简单的例子就不关联业务ID了*/private String productName;
}
4.业务实现
import com.xjw.entity.pojo.MessageRecord;
import com.xjw.entity.pojo.Order;
import com.xjw.service.MessageRecordService;
import com.xjw.service.OrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;/*** @author xiejianwei*/
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {@Autowiredpublic MessageRecordService messageRecordService;/*** 模拟发起一个简单的订单** @param order* @return*/@Override@Transactional(rollbackFor = Exception.class)public boolean start(Order order) {//触发保存本地消息表MessageRecord messageRecord = new MessageRecord(order.getOrderId(), 1);messageRecordService.preCommit(messageRecord);log.info("这里可以做本地业务操作");log.info("下单中,请稍等-----");log.info("恭喜您,下单成功,订单号:{}", order.getOrderId());// 操作本地事务成功则commit 消息,如果处理本地事务异常,则会有定时任务回调messageRecordService.commit(messageRecord.getMessageId(), true);return true;}
}
import com.alibaba.fastjson.JSON;
import com.xjw.config.constant.RabbitmqConstant;
import com.xjw.entity.pojo.MessageRecord;
import com.xjw.mapper.MessageRecordMapper;
import com.xjw.service.MessageRecordService;
import com.xjw.service.RabbitmqService;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.List;/*** @author xiejianwei*/
@Service
public class MessageRecordServiceImpl implements MessageRecordService {@Autowiredpublic MessageRecordMapper messageRecordMapper;@Autowiredpublic RabbitmqService rabbitmqService;@Overridepublic boolean preCommit(MessageRecord messageRecord) {return messageRecordMapper.insert(messageRecord);}@Overridepublic boolean commit(String messageId, boolean commitFlag) {/*** 不提交则代表回滚*/if (!commitFlag) {messageRecordMapper.delete(messageId);return true;}// 提交消息到MQMessageRecord messageRecord = messageRecordMapper.find(messageId);/*** 发送MQ消息* 将唯一消息ID设置给CorrelationData* 回调时可以用这个ID查找到数据对应的消息记录*/rabbitmqService.sendMessage(RabbitmqConstant.ORDER_EXCHANGE, RabbitmqConstant.ORDER_ROUTING_KEY, JSON.toJSONString(messageRecord), new CorrelationData(messageRecord.getMessageId()));return true;}@Overridepublic void update(String messageId) {messageRecordMapper.update(messageId);}@Overridepublic MessageRecord find(String messageId) {return messageRecordMapper.find(messageId);}@Overridepublic List<MessageRecord> findAll(int status) {return messageRecordMapper.findAll(status);}
}
import com.xjw.callback.RabbitMqConfirmCallback;
import com.xjw.service.RabbitmqService;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;/*** @author xiejianwei* @ClassName RabbitmqServiceImpl* @Description 发送mq消息*/
@Service
public class RabbitmqServiceImpl implements RabbitmqService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate RabbitMqConfirmCallback rabbitMqConfirmCallback;/*** 发送消息到mq(单个)** @param exchange   交换机的名称* @param routingKey 路由key值* @param messages   消息的附件消息*/@Overridepublic void sendMessage(String exchange, String routingKey, String messages, CorrelationData correlationData) {/*** 设置回调*/rabbitTemplate.setConfirmCallback(rabbitMqConfirmCallback);rabbitTemplate.convertAndSend(exchange, routingKey, messages, correlationData);}
}
5.接口管理
import com.xjw.entity.pojo.Order;
import com.xjw.entity.vo.R;
import com.xjw.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.math.BigDecimal;
import java.util.UUID;/*** 订单接口管理** @author xiejianwei*/
@RestController
@RequestMapping("/order")
@Validated
public class OrderController {@Autowiredpublic OrderService orderService;@PostMapping("/start")public R page(@RequestBody String productName) {Order order = new Order();order.setAmount(BigDecimal.valueOf(5000));order.setProductName(productName);order.setOrderId(UUID.randomUUID().toString().replace("-", "").toLowerCase());orderService.start(order);return R.success();}
}
6.mq/本地消息回调
import com.alibaba.fastjson.JSON;
import com.xjw.config.constant.RabbitmqConstant;
import com.xjw.entity.pojo.MessageRecord;
import com.xjw.service.MessageRecordService;
import com.xjw.service.RabbitmqService;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @author xiejianwei*/
@Component
public class RabbitMqConfirmCallback implements RabbitTemplate.ConfirmCallback {@Autowiredprivate MessageRecordService messageRecordService;@Autowiredpublic RabbitmqService rabbitmqService;/*** @param correlationData 相关配置信息* @param ack             交换机是否成功收到消息* @param cause           错误信息*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {/*** 这个就是我们发送消息设置的messageId*/String messageId = correlationData.getId();// 未发送成功if (!ack) {MessageRecord messageRecord = messageRecordService.find(messageId);if (null != messageRecord) {// 重发rabbitmqService.sendMessage(RabbitmqConstant.ORDER_EXCHANGE, RabbitmqConstant.ORDER_ROUTING_KEY, JSON.toJSONString(messageRecord), new CorrelationData(messageRecord.getMessageId()));}} else {// 修改消息状态为成功messageRecordService.update(messageId);}}
}
/*** 根据具体的业务,判断是否需要提交或者回滚消息** @author xiejianwei*/
@Component
public class OrderMessageRecordConfirm implements MessageRecordCallback {@Overridepublic boolean confirm(MessageRecord messageRecord) {String messageId = messageRecord.getMessageId();/*** 根据具体的业务,判断是否需要提交或者回滚消息*/if ("1212321".equals(messageId)) {return true;}return false;}
}
7.定时任务
import com.xjw.callback.MessageRecordCallback;
import com.xjw.entity.pojo.MessageRecord;
import com.xjw.service.MessageRecordService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import java.util.List;/*** @author xiejianwei*/
@Component
@EnableScheduling
public class MessageRecordConfirmTask {@Autowiredpublic MessageRecordService messageRecordService;@Autowiredpublic MessageRecordCallback messageRecordCallback;/*** 每隔5分钟轮询消息表*/@Scheduled(cron = "0 0/5 * * * ?")public void confirm() {// 查询所有状态等于0(未提交的状态)List<MessageRecord> all = messageRecordService.findAll(0);if (null != all && all.size() > 0) {all.forEach(messageRecord -> {boolean confirm = messageRecordCallback.confirm(messageRecord);// 根据回调结果执行提交或者回滚messageRecordService.commit(messageRecord.getMessageId(), confirm);});}}
}

本期到这里啦,写的不对的地方巨佬们多多指点,喜欢的话来一个一键三连吧

「项目实战」有一说一这才是RabbitMQ实现分布式事务的正确姿势(附源码)相关推荐

  1. 有一说一!这才是RabbitMQ实现分布式事务的正确姿势(项目实战)

    分布式事务 随着互联网快速发展,微服务,SOA 等服务架构模式正在被大规模的使用,现在分布式系统一般由多个独立的子系统组成,多个子系统通过网络通信互相协作配合完成各个功能. 有很多用例会跨多个子系统才 ...

  2. Python爬虫实战,requests+openpyxl模块,爬取小说数据并保存txt文档(附源码)

    前言 今天给大家介绍的是Python爬取小说数据并保存txt文档,在这里给需要的小伙伴们代码,并且给出一点小心得. 首先是爬取之前应该尽可能伪装成浏览器而不被识别出来是爬虫,基本的是加请求头,但是这样 ...

  3. 萌新Java开发实战记录:大数据开发之”IP热力图、地点热门TopN(文章底部附源码)

    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 目录 一. 课程设计背景概述 1. <IP经纬热力图>概述 2. <电商分析系统>概述 二.需求分析 1.&l ...

  4. 「项目实战」一文读懂思科网络设备IOS系统

    今天给大家带来的小知识是一文读懂思科的IOS系统,相信大家都有了解,但是今天呢给大家把完整的流程梳理出来,这样有助于大家记笔记哦! IOS是被用来传送网络服务并启动网络应用的.Cisco路由器的IOS ...

  5. MATLAB应用实战系列(四十四)-基于matlab的支持向量机分类、回归问题(附源码解析)

    Part.1 支持向量机(support vector machines)是一种二分类模型,它的目的是寻找一个超平面来对样本进行分割,分割的原则是间隔最大化,最终转化为一个凸二次规划问题来求解. 对于 ...

  6. 【Python爬虫实战】 不生产小说,只做网站的搬运工,太牛逼了~(附源码)

    前言 遇见你时,漫天星河皆为浮尘 不知从什么时候开始.小说开始掀起了一股浪潮,它让我们平日里的生活不在枯燥乏 味,很多我们做不到的事情在小说里都能轻易实现. 那么话不多说,下面我们就来具体看看它是如何 ...

  7. 「群体遗传学实战」第三课: 如何对SNP位点进行过滤

    往期教程 「群体遗传学实战」第一课: 对SNP位点进行注释 「群体遗传学实战」第二课: 画出和文章几乎一样的PCA图 SNP过滤有两种情况,一种是仅根据位点质量信息(测序深度,回帖质量等)对SNP进行 ...

  8. 5年后,你将如何融入20万亿美元的「项目经济」|ONES 洞见

    「项目经济」来了. 据项目管理协会(PMI)估算,5年后的2027年,全世界至少有8800万人从事项目管理相关工作,以项目为导向的经济活动创造的价值达20万亿美元--2021年,中国 GDP 总量约1 ...

  9. 测速源码_物联网之智能平衡车开发实战项目(附源码)

    自从上次分享了"适合练手的10个前端实战项目(附源码)"之后,很多小伙伴就私信问有没有物联网相关的实战项目教程,那么今天就给大家分享一个物联网工作初期经常接触的项目:智能平衡车开发 ...

最新文章

  1. python迭代器和生成器_python中迭代器和生成器。
  2. 《从问题到程序:用Python学编程和计算》——3.4 定义函数
  3. Linux的项目中积累的实际工作技巧
  4. 代码评审的不可能三角
  5. 锅巴H264播放器地址和说明
  6. 如何提高安卓代码的质量和语法
  7. solaris php,针对 Solaris 的安装提示
  8. java.util.Random 类的 nextInt(int num )
  9. 函数调用方式以及this指向
  10. flashcs3java_Flash CS3组件开发图文教程
  11. C#获取当前时区转换方法
  12. juc-并发工具类源码解析
  13. PHP进销存软件源代码ERP多仓储管理系统源码
  14. 性能测试实战(五):参数化+关联
  15. 前端开发:报错Error in... ”SyntaxError:Unexpected token u in JSON at position 0”…解决方法
  16. (ROC-RK3568-PC) 裸机24_驱动VOP2显示自绘画面
  17. 在线调试工具BTrace 的使用--例子
  18. 「雷锋前线」那昕出任CEO,“什么值得买”会变吗
  19. PC端和移动端微信加入群聊测试用例设计点
  20. 测试驱动开发(TDD)

热门文章

  1. 实时监测网络是否断线的几种办法(转coolend)
  2. App Annie 现更名 data.ai,首家由 AI 赋能的聚合数据公司
  3. crontab中如何设置每30秒执行一次任务
  4. 双重打击:手机碎屏了,换屏过程中可被黑客入侵
  5. excel筛选栏显示各项数量,excel筛选怎么看数量
  6. Dubbo+Flutter在线交友平台教程第三天 今日佳人功能实现
  7. 看过来!「开源者行」——清华大学站精彩图文回顾新鲜出炉!
  8. 国内真正有技术开发能力的不足200人,区块链是否“只是一种传说”?
  9. cdr怎么转plt_CorelDraw如何批量转换PLT CDR文字适合路径的问题 精简增强版的快捷键 “按间距分布”插件...
  10. 二手商品在线拍卖网站系统