分布式事务

随着互联网快速发展,微服务,SOA 等服务架构模式正在被大规模的使用,现在分布式系统一般由多个独立的子系统组成,多个子系统通过网络通信互相协作配合完成各个功能。

有很多用例会跨多个子系统才能完成,比较典型的是电子商务网站的下单支付流程,至少会涉及交易系统和支付系统。而且这个过程中会涉及到事务的概念,即保证交易系统和支付系统的数据一致性,此处我们称这种跨系统的事务为分布式事务。
具体一点而言,分布式事务是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。

解决方案

1.两阶段提交(2PC)
2.补偿事务(TCC)
3.本地消息表(异步确保)
4.MQ 事务消息

实现步骤

1.上游服务像消息服务发送一条预提交消息
2.消息服务返回对应的曲剧唯一的消息ID
3.上游服务实现自身业务,执行本地逻辑,根据本地事务决定提交或者回滚
4.消息服务根据上游服务响应的结果提交或者回滚(删除消息)
5.如果上游消息响应提交则吧消息发送到MQ
6.发送消息到MQ后,需要把MQ的Confirm机制打开,针对消息发送的状态进行回调
7.消息服务监听MQ回调,根据业务逻辑判断是否需要回滚或者提交,走第4步
8.当上游消息执行某段业务逻辑可能会抛异常或者其他的错误,会导致消息一直都是待提交的状态,需要启动一个后台定时任务轮询消息表,把所有未提交的消息进行确定,根据结果提交或者回滚

流程图

image

实战代码

1.项目结构

源码会上传到github和csdn的资源,可以自行下载,就不提供像maven等相关依赖、配置文件相关的代码了,项目整体的架构是Springboot、注册和配置中心Nacos、Redis加上RabbitMQ。需要好哥哥们熟悉相关的技术点,后续有时间一个个来整吧

结构

image

2.sql语句
CREATE TABLE `message_record` (`id_` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID',`business_id` varchar(64) DEFAULT NULL COMMENT '业务数据ID',`business_type` tinyint(2) DEFAULT NULL COMMENT '业务类型:具体业务',`message_id` varchar(64) NOT NULL COMMENT '消息ID',`retries_number` tinyint(2) DEFAULT '0' COMMENT '重试次数',`status_` tinyint(2) DEFAULT '0' COMMENT '结果 1 成功  0 失败',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',PRIMARY KEY (`id_`),UNIQUE KEY `inx_message_id` (`message_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8 COMMENT='rabbit消息记录';
3.MQ配置
import com.xjw.config.constant.RabbitmqConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author xiejianwei* @ClassName BusinessOrderRabbitMqConfig*/
@Configuration
public class OrderRabbitMqConfig {/*** 初始化队列** @return*/@Beanpublic Queue orderQueue() {return new Queue(RabbitmqConstant.ORDER_QUEUE, true);}/*** 初始化交换机** @return*/@Beanpublic DirectExchange orderExchange() {return new DirectExchange(RabbitmqConstant.ORDER_EXCHANGE, true, false);}/*** 队列通过路由键绑定到交换机** @return*/@Beanpublic Binding bind() {return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(RabbitmqConstant.ORDER_ROUTING_KEY);}
}
3.实体类
import lombok.Getter;
import lombok.Setter;import java.util.Date;
import java.util.UUID;/*** @author xiejianwei*/
@Getter
@Setter
public class MessageRecord {/*** 主键ID*/private Long id;/*** 业务数据ID*/private String businessId;/*** 业务类型*/private int businessType;/*** 消息ID*/private String messageId;/*** 重试次数*/private int retriesNumber;/*** 消息状态 (0.失败,1成功)*/private int status;/*** 创建时间*/private Date createTime;public MessageRecord() {}public MessageRecord(String businessId, int businessType) {this.businessId = businessId;this.businessType = businessType;this.messageId = UUID.randomUUID().toString().replace("-", "").toLowerCase();this.retriesNumber = 0;this.createTime = new Date();this.status = 0;}
}
import java.math.BigDecimal;/*** @author xiejianwei*/
@Getter
@Setter
public class Order extends SerializableDto {/*** 订单编号*/private String orderId;/*** 订单金额*/private BigDecimal amount;/*** 做简单的例子就不关联业务ID了*/private String productName;
}
4.业务实现
import com.xjw.entity.pojo.MessageRecord;
import com.xjw.entity.pojo.Order;
import com.xjw.service.MessageRecordService;
import com.xjw.service.OrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;/*** @author xiejianwei*/
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {@Autowiredpublic MessageRecordService messageRecordService;/*** 模拟发起一个简单的订单** @param order* @return*/@Override@Transactional(rollbackFor = Exception.class)public boolean start(Order order) {//触发保存本地消息表MessageRecord messageRecord = new MessageRecord(order.getOrderId(), 1);messageRecordService.preCommit(messageRecord);log.info("这里可以做本地业务操作");log.info("下单中,请稍等-----");log.info("恭喜您,下单成功,订单号:{}", order.getOrderId());// 操作本地事务成功则commit 消息,如果处理本地事务异常,则会有定时任务回调messageRecordService.commit(messageRecord.getMessageId(), true);return true;}
}
import com.alibaba.fastjson.JSON;
import com.xjw.config.constant.RabbitmqConstant;
import com.xjw.entity.pojo.MessageRecord;
import com.xjw.mapper.MessageRecordMapper;
import com.xjw.service.MessageRecordService;
import com.xjw.service.RabbitmqService;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.List;/*** @author xiejianwei*/
@Service
public class MessageRecordServiceImpl implements MessageRecordService {@Autowiredpublic MessageRecordMapper messageRecordMapper;@Autowiredpublic RabbitmqService rabbitmqService;@Overridepublic boolean preCommit(MessageRecord messageRecord) {return messageRecordMapper.insert(messageRecord);}@Overridepublic boolean commit(String messageId, boolean commitFlag) {/*** 不提交则代表回滚*/if (!commitFlag) {messageRecordMapper.delete(messageId);return true;}// 提交消息到MQMessageRecord messageRecord = messageRecordMapper.find(messageId);/*** 发送MQ消息* 将唯一消息ID设置给CorrelationData* 回调时可以用这个ID查找到数据对应的消息记录*/rabbitmqService.sendMessage(RabbitmqConstant.ORDER_EXCHANGE, RabbitmqConstant.ORDER_ROUTING_KEY, JSON.toJSONString(messageRecord), new CorrelationData(messageRecord.getMessageId()));return true;}@Overridepublic void update(String messageId) {messageRecordMapper.update(messageId);}@Overridepublic MessageRecord find(String messageId) {return messageRecordMapper.find(messageId);}@Overridepublic List<MessageRecord> findAll(int status) {return messageRecordMapper.findAll(status);}
}
import com.xjw.callback.RabbitMqConfirmCallback;
import com.xjw.service.RabbitmqService;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;/*** @author xiejianwei* @ClassName RabbitmqServiceImpl* @Description 发送mq消息*/
@Service
public class RabbitmqServiceImpl implements RabbitmqService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate RabbitMqConfirmCallback rabbitMqConfirmCallback;/*** 发送消息到mq(单个)** @param exchange   交换机的名称* @param routingKey 路由key值* @param messages   消息的附件消息*/@Overridepublic void sendMessage(String exchange, String routingKey, String messages, CorrelationData correlationData) {/*** 设置回调*/rabbitTemplate.setConfirmCallback(rabbitMqConfirmCallback);rabbitTemplate.convertAndSend(exchange, routingKey, messages, correlationData);}
}
5.接口管理
import com.xjw.entity.pojo.Order;
import com.xjw.entity.vo.R;
import com.xjw.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.math.BigDecimal;
import java.util.UUID;/*** 订单接口管理** @author xiejianwei*/
@RestController
@RequestMapping("/order")
@Validated
public class OrderController {@Autowiredpublic OrderService orderService;@PostMapping("/start")public R page(@RequestBody String productName) {Order order = new Order();order.setAmount(BigDecimal.valueOf(5000));order.setProductName(productName);order.setOrderId(UUID.randomUUID().toString().replace("-", "").toLowerCase());orderService.start(order);return R.success();}
}
6.mq/本地消息回调
import com.alibaba.fastjson.JSON;
import com.xjw.config.constant.RabbitmqConstant;
import com.xjw.entity.pojo.MessageRecord;
import com.xjw.service.MessageRecordService;
import com.xjw.service.RabbitmqService;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @author xiejianwei*/
@Component
public class RabbitMqConfirmCallback implements RabbitTemplate.ConfirmCallback {@Autowiredprivate MessageRecordService messageRecordService;@Autowiredpublic RabbitmqService rabbitmqService;/*** @param correlationData 相关配置信息* @param ack             交换机是否成功收到消息* @param cause           错误信息*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {/*** 这个就是我们发送消息设置的messageId*/String messageId = correlationData.getId();// 未发送成功if (!ack) {MessageRecord messageRecord = messageRecordService.find(messageId);if (null != messageRecord) {// 重发rabbitmqService.sendMessage(RabbitmqConstant.ORDER_EXCHANGE, RabbitmqConstant.ORDER_ROUTING_KEY, JSON.toJSONString(messageRecord), new CorrelationData(messageRecord.getMessageId()));}} else {// 修改消息状态为成功messageRecordService.update(messageId);}}
}
/*** 根据具体的业务,判断是否需要提交或者回滚消息** @author xiejianwei*/
@Component
public class OrderMessageRecordConfirm implements MessageRecordCallback {@Overridepublic boolean confirm(MessageRecord messageRecord) {String messageId = messageRecord.getMessageId();/*** 根据具体的业务,判断是否需要提交或者回滚消息*/if ("1212321".equals(messageId)) {return true;}return false;}
}
7.定时任务
import com.xjw.callback.MessageRecordCallback;
import com.xjw.entity.pojo.MessageRecord;
import com.xjw.service.MessageRecordService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import java.util.List;/*** @author xiejianwei*/
@Component
@EnableScheduling
public class MessageRecordConfirmTask {@Autowiredpublic MessageRecordService messageRecordService;@Autowiredpublic MessageRecordCallback messageRecordCallback;/*** 每隔5分钟轮询消息表*/@Scheduled(cron = "0 0/5 * * * ?")public void confirm() {// 查询所有状态等于0(未提交的状态)List<MessageRecord> all = messageRecordService.findAll(0);if (null != all && all.size() > 0) {all.forEach(messageRecord -> {boolean confirm = messageRecordCallback.confirm(messageRecord);// 根据回调结果执行提交或者回滚messageRecordService.commit(messageRecord.getMessageId(), confirm);});}}
}

http://www.taodudu.cc/news/show-5438788.html

相关文章:

  • 分布式(一)分布式事务解决方案
  • Springboot 参数校验@Valid @Validated(最新最全)
  • 项目二:管理与维护linux系统
  • Docker启动提示:Cannot connect to the Docker daemon...
  • SpringBoot入门(一)
  • uniapp之h5公众号分享和授权
  • 数字信号处理实验 序列的傅里叶变换和离散傅里叶变换及其关系
  • 「项目实战」有一说一这才是RabbitMQ实现分布式事务的正确姿势(附源码)
  • OpenCV4函数合集
  • 第1台电子计算机英文缩写,1世界第一台电子计算机的英文名称是文档.doc
  • STM32——液晶显示中英文
  • 看完跳槽少说涨 5 K,前端面试从准备到谈薪完全指南(近万字精华)
  • 看完跳槽少说涨 5 K,前端面试从准备到谈薪完全指南(全是精华)
  • 涨薪 30 % 以上,看这篇万字干货就行
  • Go业务系统实战总结
  • Go业务系统开发总结
  • ❤️40条软件测试面试常考题目总结(附答案解析)【建议收藏】❤️
  • 学前端开发技术,CSS的文本样式属性值
  • 【带你上手云原生体系】第二部分:Go语言从入门到精通
  • 第十章 C++编程之构造顺序
  • Error: Activity class {com.xxx.xxx/com.xxx.xxx.MainActivity} does not exist.
  • easyExcel 导出文件时,设置单元格样式,自适应列宽
  • ThreadPoolExecutor源码解析
  • 数据成员是reference或const时该如何赋值?
  • JavaScript | call和apply
  • C#面向对象(三)多态
  • XBRL调查资料整理
  • java 共用体_结构体与共用体
  • php fpm www.conf,php-fpm.d 下面www.conf 配置错误,想问下全局的前缀怎么设置
  • C语言学习第八章

有一说一!这才是RabbitMQ实现分布式事务的正确姿势(项目实战)相关推荐

  1. 「项目实战」有一说一这才是RabbitMQ实现分布式事务的正确姿势(附源码)

    不懂就问 灵魂拷问之☞导盲犬禁止入内是给犬看的还是盲人看的? 各位巨佬们把答案留在评论区吧 分布式事务 随着互联网快速发展,微服务,SOA 等服务架构模式正在被大规模的使用,现在分布式系统一般由多个独 ...

  2. 基于RabbitMQ的分布式事务最终一致性解决方案

    1. 分布式事务 所谓事务,通俗一点讲就是一系列操作要么同时成功,要么同时失败.而分布式事务就是这一系列的操作在不同的节点上,那要如何保证事务的ACID特性呢. 原子性(atomicity).一个事务 ...

  3. rabbitmq 查询版本_基于rabbitmq解决分布式事务

    分布式事务要解决的问题是保证二个数据库数据的一致性,本地事务ACID属于刚性事务,基于CAP理论,分布式事务的核心要点柔性事务,最终一致性. 基于rabbitmq解决分布式事务要点如下 生产者采用发送 ...

  4. 这样才是代码管理和 Commit 的正确姿势 | 研发效能提升36计

    简介:效能提升从小习惯开始,这样才是代码管理和 Commit 的正确姿势! 专栏策划|雅纯 志愿编辑|张晟 软件交付是以代码为中心的交付过程,其中代码的作用有几点:第一,最终的制品要交付成什么样,需要 ...

  5. RabbitMQ实现分布式事务,保证数据一致性

    一.实验环境 Lunix系统:Centos7.5 安装软件:rabbitmq 开发工具:IDEA 二.实验目的 Rabbitmq实现多系统间的分布式事务,保证数据一致性 三.实验方案 rabbitmq ...

  6. 别再用 kill -9 了,这才是微服务上下线的正确姿势!

    作者:fredalxin 来源:https://fredal.xin/graceful-soa-updown 对于微服务来说,服务的优雅上下线是必要的. 就上线来说,如果组件或者容器没有启动成功,就不 ...

  7. 这才是JAVA中打印日志的正确姿势

    作者:lrwin 原文链接:http://t.cn/E9BkD7a 使用slf4j 1. 使用门面模式的日志框架,有利于维护和各个类的日志处理方式统一 2. 实现方式统一使用: Logback框架 打 ...

  8. java logger使用_这才是JAVA中打印日志的正确姿势

    作者:lrwin 原文链接:http://t.cn/E9BkD7a 使用slf4j 1. 使用门面模式的日志框架,有利于维护和各个类的日志处理方式统一 2. 实现方式统一使用: Logback框架 打 ...

  9. 这才是实现分布式锁的正确姿势!

    都9102年了,你还在手写分布式锁吗? 经常被问到"如何实现分布式锁",看来这是大家的一个痛点. 其实Java世界的"半壁江山"--Spring早就提供了分布式 ...

最新文章

  1. Spring Cloud构建微服务架构:服务容错保护(Hystrix服务降级)
  2. 结构体成员地址获得结构体起始地址
  3. 【图像分类】基于Pascal VOC2012增强数据的多标签图像分类实战
  4. 这个拖后腿的“in”
  5. requests库之处理响应
  6. java 鼠标精灵_纯Java实现跨平台鼠标键盘模拟、找图找色,Java版按键精灵
  7. mysql配置文件简易代码_MySQL配置文件my.cnf 例子最详细翻译
  8. BZOJ4423: [AMPPZ2013]Bytehattan
  9. from Crypto.Cipher import AES报错
  10. android 电话状态的监听(来电和去电)
  11. 易语言批量替换html,易语言实现批量文本替换操作的代码
  12. PHP5.3.1 安装包VC9/VC6区别
  13. Broadcom WICED Wi-Fi 研究BCM943362WCD4之STM32F205+43362通信
  14. 光电耦合器MOC3041
  15. 各种数据库中的dual表
  16. 人工智能会用绝对的力量,用更高层次的方式直接进入你的工作行业
  17. 25岁研究生拿高薪互联网的offer,意气风发的走出学校
  18. 程序员分析:99%的创业公司都不值得加入
  19. 网上最全的系统服务,让PF降到50以下(转)
  20. 秒杀系统架构设计思路

热门文章

  1. 在Ucloud实施KubernetesDocker集群踩坑实录
  2. 北汽新能源eu5/ex3/ex5系列免登录
  3. 2022年全球与中国前开口统一吊舱(FOUP)行业发展趋势及投资战略分析报告
  4. python实现股票选取
  5. 禾赛冲刺纳斯达克:9个月营收近8亿 小米百度美团是股东
  6. 对话禾赛CEO李一帆:一个激光雷达头部玩家对从1到10的思考!
  7. ucosii 软件定时器
  8. c语言圆的半径为5,运用c语言求解:已知圆的半径为5,求圆,圆的内接正方形,圆的外接正方形的面积和周长。感谢大神。...
  9. 2020年,数字化转型的策略和趋势
  10. OpenWrt 18.06编译龙尚模块驱动