手写基于Spring Cloud的TCC分布式事务框架
如何简单实现TCC分布式事务框架
最近听到很多其他公司的小伙伴谈分布式事务的问题,各种业务场景都有,可能就是这两年很多公司都在往微服务发展,现在各个子系统都拆分、建设的差不多了,实现了模块化开发,但是也遇到了很多分布式事务等问题,大多都用消息重试来保证外部系统的最终一致,或者把外部参与者限制为一个,其他操作全部本地实现、再结合业务场景的方式来解决。
如果业务要求严格一致性、执行时间短、实时性要求高,那么使用补偿事务TCC是比较合适的,但是TCC事务模型虽然说起来简单,好像简单的调用一下Confirm/Cancel业务就可以了,但是如果不了解它的实现原理,直接去使用那些开源的、商用的框架,可能会有一定难度和风险。
然后自己的一些项目也一直有相似的问题,于是周末就尝试写了一个基于Spring Cloud的TCC分布式事务框架,在一个调用现金系统+红包系统完成支付订单的工程中跑了一下可用,demo仓库地址在下面有列出。
背景上面已经说了,然后说一下实现的思路:
目标:
基于spring cloud开发,代码侵入性少,可读性强,结构精简的TCC框架;
技术:
根据Spring Cloud的Fegin等组件的特性,调研了一些厂的做法,使用Spring、JDK的ThreadLocal、AOP、事务管理器、自定义注解等特性。
特点:
一阶段调用和平常的外部调用一样,依次调用外部参与者即可;
二阶段调用由框架自动完成;
独立的事务恢复服务,扩展性好,使用Spring Task实现。
代码已经实现差不多,已上传仓库,TwoStage:https://github.com/anylots/payment
TCC示意图
用语雀画了个图,TCC其实是这样,简单来说就是业务应用(发起者)需要将远程调用拆分为两步:
- 第一布Try锁定资源(比如账户冻结10块钱);
- 第二步Confirm/Cancel操作(比如扣减上一步冻结的10块钱/取消冻结)。
- 其中第二步的Confirm/Cancel操作由事务协调器自动完成,这个事务协调器一般是作为一个模块引入到发起者系统的,发起者只需满足简单的编码规范即可。
- 参与者需要实现两阶段中的Try,Confirm/Cancel三套逻辑,具体的业务场景实现也就不一样。
框架使用
这里写了一个调用现金系统+红包系统完成支付订单的工程demo,两个系统必须都调用成功,才能完成支付。
step1、开启分布式事务
首先在发起者方法上加上Spring事务注解@Transactional,然后在执行支付的代码中添加TwoStageStarter.startTwoStage()即可开启两阶段提交:
/*** 两阶段支付服务** @param payInfo 支付工具信息*/@Override@Transactional(rollbackFor = Exception.class)public void payWithTwoStage(List<Map<String, Object>> payInfo) {//step1.开启两阶段提交TwoStageStarter.startTwoStage();//step2.现金扣减balanceManageService.balanceReduce(buildBalanceReduceInfo(payInfo));//step3.红包使用couponManageService.couponUse(buildCouponUseInfo(payInfo));}
step2、在参与者方法前面加上@TwoStages注解
一阶段调用该方法时,拦截器将存储该方法的信息,在二阶段时自动再次调用。
/*** 现金扣减** @param reduceInfo*/@TwoStages@Overridepublic void balanceReduce(BalanceReduceInfo reduceInfo) {//step 1. balance reduceString result = balanceServiceClient.balanceReduce(reduceInfo);//step 2. assertion resultsAssert.isTrue(ServiceConstants.SUCCESS.equals(result), "couponUse result is fail");}
框架实现原理
1、Spring事务同步器
分布式事务的提交是和本地事务绑定在一起的,第一步的TwoStageStarter.startTwoStage()方法定义了一个事务同步器,并注册到Spring事务上下文中,事务同步器中的twoPhaseProcess逻辑将在本地事务提交时执行:
/*** 开启两阶段提交* <p>* 两阶段提交TwoStage的启动须放在本地Spring事务中,* 且须放在调用外部参与者之前。* <p>* 在一阶段调用时,TwoStagesAspect拦截器将参与者类名、方法名、参数保存在ThreadLocal中,* 在本地事务提交、回滚后,Spring事务同步器将取出一阶段保存的信息,自动调用参与者二阶段方法,完成最终提交/回滚。*/public static void startTwoStage() {//定义spring事务同步器TransactionSynchronizationAdapter tccSynchronizationAdapter = new TransactionSynchronizationAdapter() {//在事务提交/回滚后调用@Overridepublic void afterCompletion(int status) {switch (status) {case 0://transaction status is committwoPhaseProcess(TransactionStatusEnum.STATUS_COMMITTED.getCode());break;case 1://transaction status is rollbacktwoPhaseProcess(TransactionStatusEnum.STATUS_ROLLED_BACK.getCode());break;default:logger.error("tcc transaction status is unknown");throw new RuntimeException("tcc transaction status is unknown");}}};//注册spring事务同步器,spring本地事务提交、回滚时会执行事务同步器中对应的方法;TransactionSynchronizationManager.registerSynchronization(tccSynchronizationAdapter);}/*** 第二阶段处理** @param stage 提交、回滚*/private static void twoPhaseProcess(String stage) throws RuntimeException {//获取一阶段调用时保存的参与者信息Set<TwoStageCompleter> stageCompletes = TwoStagesThreadLocal.get();if (stageCompletes == null) {logger.error("stageCompletes is null");return;}for (TwoStageCompleter completer : stageCompletes) {completer.invokeAfterPrepare(stage);}}
2、参与者拦截器
参与者方法上的@TwoStages注解,会被拦截器TwoStagesAspect的方法advice()拦截到,然后当识别到是一阶段调用时,会将一阶段调用涉及到的类、方法、参数封装成一个TwoStageCompleter对象,保存在线程变量TwoStagesThreadLocal中:
@Around("pointcut() && @annotation(twoStages)")public void advice(ProceedingJoinPoint joinPoint, TwoStages twoStages) {CommonInfo commonInfo = (CommonInfo) joinPoint.getArgs()[0];//一阶段调用if (commonInfo.isOnPrepareStage()) {//保存参与者信息到线程变量Set<TwoStageCompleter> stageCompletes = TwoStagesThreadLocal.get() == null ? new HashSet<>() : TwoStagesThreadLocal.get();stageCompletes.add(buildTwoStageCompleter(joinPoint, commonInfo));//保存事务订单记录OrderRecordService orderRecordService = (OrderRecordService) ApplicationContextGetBeanHelper.getBean(OrderRecordService.class);orderRecordService.saveOrderRecord(buildOrderRecord());}try {joinPoint.proceed();} catch (Throwable throwable) {LoggerUtil.error("tcc invoke error", throwable);throw new RuntimeException("tcc invoke error", throwable);}}
3、两阶段事务完成者TwoStageCompleter
TwoStageCompleter包含了参与者类、参与者方法名、参与者请求参数三个属性,以及二阶段执行方法invokeAfterPrepare():
/*** 两阶段事务完成者** @author anylots* @version $Id: TwoStageSync.java, v 0.1 2020年10月18日 20:14 anylots Exp $*/
public class TwoStageCompleter {private static Logger logger = LoggerFactory.getLogger(TwoStageCompleter.class);
/*** name of the class 参与者类*/private Class targetClass;/*** name of the class 参与者方法名*/private String methodName;/*** 参与者请求参数*/private CommonInfo commonInfo;/*** invoke after prepare** @param stage*/public void invokeAfterPrepare(String stage) {//设置参与者请求阶段commonInfo.setStage(stage);//调用参与者提交、回滚try {Method method = targetClass.getMethod(methodName, new Class[]{CommonInfo.class});method.invoke(ApplicationContextGetBeanHelper.getBean(targetClass), commonInfo);} catch (ReflectiveOperationException e) {logger.error("tcc method invoke error", e);throw new RuntimeException("tcc method invoke error", e);}logger.info("远程参与者事务提交/回滚完成");}
参与者两阶段方法实现
外部系统参与者Service需要实现TwoStageCommonService抽象类,然后根据具体业务实现prepare、commit、cancel方法:
public abstract class TwoStageCommonService {/*** 现金扣减两阶段方法** @param commonInfo*/public void process(CommonInfo commonInfo) {switch (commonInfo.getStage()) {case "prepare":prepare(commonInfo);break;case "commit":commit(commonInfo);break;case "cancel":cancel(commonInfo);break;default:break;}}public abstract void prepare(CommonInfo commonInfo);public abstract void commit(CommonInfo commonInfo);public abstract void cancel(CommonInfo commonInfo);
参与者方法必须实现幂等,以支持事务恢复任务、发起者重试;
4、事务恢复任务TccScheduledTask
如果发生二阶段执行失败,TccScheduledTask将定期捞取未完成的订单,重复调用参与者直到成功:
/*** 每隔十分钟执行, 单位:ms。*/@Scheduled(fixedRate = 10 * 60 * 1000)public void executeFixRate() {//捞取未完成事务记录List<OrderRecord> orderRecords = orderRecordService.findByStatus(OrderStatusEnum.INIT.getCode());for (OrderRecord orderRecord : orderRecords) {//解析二阶段参与者列表List<String> feignList = JSON.parseObject(orderRecord.getContext(), new TypeReference<List<String>>() {});//依次调用参与者,完成二阶段事务for (String feignInfo : feignList) {invokeForFeign(feignInfo, buildCommonInfo(orderRecord));}//更新发起者事务记录表orderRecord.setStatus(OrderStatusEnum.COMPLETE.getCode());orderRecordService.updateByOrderId(orderRecord);}}
调用参与者方法invokeForFeign的逻辑是这样的:
/*** invoke for feign** @param feignInfo* @param commonInfo*/private void invokeForFeign(String feignInfo, CommonInfo commonInfo) {//step1. feignInfo校验if (StringUtils.isEmpty(feignInfo) || !feignInfo.contains("_")) {LoggerUtil.error(String.format("feignInfo is not available,orderId=", commonInfo.getOrderId()));return;}//step2. 获取feign classClass clazz = null;try {clazz = Class.forName(feignInfo.split("_")[0]);} catch (ClassNotFoundException e) {LoggerUtil.error(String.format("feign class is not found,orderId=", commonInfo.getOrderId()), e);}//step3.调用参与者提交、回滚try {Method method = clazz.getMethod(feignInfo.split("_")[1], new Class[]{CommonInfo.class});method.invoke(ApplicationContextGetBeanHelper.getBean(clazz), commonInfo);} catch (ReflectiveOperationException e) {LoggerUtil.error("tcc schedule invoke error", e);}}
项目结构:
说明:本文还未完善,对分布式事务框架还将继续研究,然后继续更新
手写基于Spring Cloud的TCC分布式事务框架相关推荐
- Spring Cloud Alibaba —— Seata 分布式事务框架
导航 一.Seata 介绍 二.Seata 的工作原理 2.1 三个角色 2.2 工作流程 三.Seata AT 工作机制 3.1 一阶段 3.2 二阶段 四.案例演示(待补充) 一.Seata 介绍 ...
- spring cloud 集成 seata 分布式事务
spring cloud 集成 seata 分布式事务 基于 seata-server 1.6.x 序言 下载 seata-server 准备一个数据库 seata 专门为 seata-server ...
- 如何实现一个TCC分布式事务框架
声明:本文转载自:TCC分布式事务框架的一点思考 关于TCC事务机制的介绍,可以参考TCC事务机制简介. TCC事务模型虽然说起来简单,然而要基于TCC实现一个通用的分布式事务框架,却比它看上去要复杂 ...
- tcc分布式事务框架源码解析系列(四)之项目实战
通过之前的几篇文章我相信您已经搭建好了运行环境,本次的项目实战是依照happylifeplat-tcc-demo项目来演练,也是非常经典的分布式事务场景:支付成功,进行订单状态的更新,扣除用户账户,库 ...
- Spring Cloud Alibaba--分布式事务框架Seata
Seata作用:分布式事务框架Seata用于保证微服务间的事务一致性. Seata中涉及的对象种类:事务管理者TM.资源管理者RM.事务协调者TC. 参考博客:阿里开源的分布式事务框架 Seata
- 基于Spring Cloud + MyBatis的分布式架构网约车平台(DD 打车)后端原型系统设计与实现
资源下载地址:https://download.csdn.net/download/sheziqiong/85638879 资源下载地址:https://download.csdn.net/downl ...
- 带你手写基于 Spring 的可插拔式 RPC 框架(二)整体结构
前言 上一篇文章中我们已经知道了什么是 RPC 框架和为什么要做一个 RPC 框架了,这一章我们来从宏观上分析,怎么来实现一个 RPC 框架,这个框架都有那些模块以及这些模块的作用. 总体设计 在我们 ...
- seata xid是什么_使用Seata彻底解决Spring Cloud中的分布式事务问题!
Seata是Alibaba开源的一款分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务,本文将通过一个简单的下单业务场景来对其用法进行详细介绍. 什么是分布式事务问题? 单体应用 单体应用 ...
- 使用Seata彻底解决Spring Cloud中的分布式事务问题!
摘要 Seata是Alibaba开源的一款分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务,本文将通过一个简单的下单业务场景来对其用法进行详细介绍. 什么是分布式事务问题? 单体应用 单 ...
最新文章
- python opencv 视频剪辑
- 数组nn从外围1递增_最完整的PyTorch数据科学家指南(1)
- 创建一个对象时,在一个类当中 静态代码块 和普通代码块构造方法 的顺序?
- indesign如何画弧线_彩铅画入门教程,如何给独角兽设计一款好发型
- 1024程序员节:给DBA们的福音
- 如何解决分布式系统数据事务一致性问题(HBase加Solr)
- oracle 新增加控制文件,Oracle增加控制文件副本
- 揭秘合伙创业做生意成功的密码?
- Pycharm 和 Vs code 字体大小调整(Ctrl + 鼠标滚轮实现)
- 【笔试/面试】—— 不使用大于、小于、if 语句,实现 max 宏
- vs怎么把文字超链接_怎么拥有自己设计的简单个人网站(超细节)
- Hadoop生态系统常用组件导图
- 如何在Mac的内置词典中添加和删除单词
- 高等数学(第七版)同济大学 习题2-1 个人解答
- plt.rcParams[‘font.sans-serif‘] = [‘SimHei‘] 和plt.rcParams[‘axes.unicode_minus‘] = False
- 威纶触摸屏485轮询通讯_【威纶】触摸屏 界面制作软件 EBpro使用手册.pdf
- 树梅派应用27:通过USB蓝牙适配器连接BLE设备
- 如何清除远程桌面连接记录
- 安装perf后,执行perf命令报错。
- 25 岁,毕业写前端的这三年,多益网络java面试
热门文章
- 厉害了!九州云边缘计算管理平台获国家权威认证
- 【C/C++】劫持技术
- 我肝了两周,用react做了一个俄罗斯方块
- UOS系统无线网络手动关闭后无法恢复解决办法
- vue给标签动态添加元素_vue页面动态添加元素
- carsim和matlab有安装顺序吗,CarSim2017免费版
- oracle 数据块修复工具,BBED (Oracle Block Brower and EDitor Tool) :数据块修复工具
- 前端学习案例1-brower路由
- 戴尔游匣g15 5515笔记本电脑g3060锐龙cpu版装入显卡驱动配置pytorch环境踩坑总结
- Internet网际协议---IPv4协议