如何简单实现TCC分布式事务框架

最近听到很多其他公司的小伙伴谈分布式事务的问题,各种业务场景都有,可能就是这两年很多公司都在往微服务发展,现在各个子系统都拆分、建设的差不多了,实现了模块化开发,但是也遇到了很多分布式事务等问题,大多都用消息重试来保证外部系统的最终一致,或者把外部参与者限制为一个,其他操作全部本地实现、再结合业务场景的方式来解决。

如果业务要求严格一致性、执行时间短、实时性要求高,那么使用补偿事务TCC是比较合适的,但是TCC事务模型虽然说起来简单,好像简单的调用一下Confirm/Cancel业务就可以了,但是如果不了解它的实现原理,直接去使用那些开源的、商用的框架,可能会有一定难度和风险。

然后自己的一些项目也一直有相似的问题,于是周末就尝试写了一个基于Spring Cloud的TCC分布式事务框架,在一个调用现金系统+红包系统完成支付订单的工程中跑了一下可用,demo仓库地址在下面有列出。

背景上面已经说了,然后说一下实现的思路:

目标:

基于spring cloud开发,代码侵入性少,可读性强,结构精简的TCC框架;

技术:

根据Spring Cloud的Fegin等组件的特性,调研了一些厂的做法,使用Spring、JDK的ThreadLocal、AOP、事务管理器、自定义注解等特性。

特点:

一阶段调用和平常的外部调用一样,依次调用外部参与者即可;
二阶段调用由框架自动完成;
独立的事务恢复服务,扩展性好,使用Spring Task实现。

代码已经实现差不多,已上传仓库,TwoStage:https://github.com/anylots/payment

TCC示意图

用语雀画了个图,TCC其实是这样,简单来说就是业务应用(发起者)需要将远程调用拆分为两步:

  • 第一布Try锁定资源(比如账户冻结10块钱);
  • 第二步Confirm/Cancel操作(比如扣减上一步冻结的10块钱/取消冻结)。
  • 其中第二步的Confirm/Cancel操作由事务协调器自动完成,这个事务协调器一般是作为一个模块引入到发起者系统的,发起者只需满足简单的编码规范即可。
  • 参与者需要实现两阶段中的Try,Confirm/Cancel三套逻辑,具体的业务场景实现也就不一样。

框架使用

这里写了一个调用现金系统+红包系统完成支付订单的工程demo,两个系统必须都调用成功,才能完成支付。

step1、开启分布式事务

首先在发起者方法上加上Spring事务注解@Transactional,然后在执行支付的代码中添加TwoStageStarter.startTwoStage()即可开启两阶段提交:

    /*** 两阶段支付服务** @param payInfo 支付工具信息*/@Override@Transactional(rollbackFor = Exception.class)public void payWithTwoStage(List<Map<String, Object>> payInfo) {//step1.开启两阶段提交TwoStageStarter.startTwoStage();//step2.现金扣减balanceManageService.balanceReduce(buildBalanceReduceInfo(payInfo));//step3.红包使用couponManageService.couponUse(buildCouponUseInfo(payInfo));}
step2、在参与者方法前面加上@TwoStages注解

一阶段调用该方法时,拦截器将存储该方法的信息,在二阶段时自动再次调用。

    /*** 现金扣减** @param reduceInfo*/@TwoStages@Overridepublic void balanceReduce(BalanceReduceInfo reduceInfo) {//step 1. balance reduceString result = balanceServiceClient.balanceReduce(reduceInfo);//step 2. assertion resultsAssert.isTrue(ServiceConstants.SUCCESS.equals(result), "couponUse result is fail");}

框架实现原理

1、Spring事务同步器

分布式事务的提交是和本地事务绑定在一起的,第一步的TwoStageStarter.startTwoStage()方法定义了一个事务同步器,并注册到Spring事务上下文中,事务同步器中的twoPhaseProcess逻辑将在本地事务提交时执行:

/*** 开启两阶段提交* <p>* 两阶段提交TwoStage的启动须放在本地Spring事务中,* 且须放在调用外部参与者之前。* <p>* 在一阶段调用时,TwoStagesAspect拦截器将参与者类名、方法名、参数保存在ThreadLocal中,* 在本地事务提交、回滚后,Spring事务同步器将取出一阶段保存的信息,自动调用参与者二阶段方法,完成最终提交/回滚。*/public static void startTwoStage() {//定义spring事务同步器TransactionSynchronizationAdapter tccSynchronizationAdapter = new TransactionSynchronizationAdapter() {//在事务提交/回滚后调用@Overridepublic void afterCompletion(int status) {switch (status) {case 0://transaction status is committwoPhaseProcess(TransactionStatusEnum.STATUS_COMMITTED.getCode());break;case 1://transaction status is rollbacktwoPhaseProcess(TransactionStatusEnum.STATUS_ROLLED_BACK.getCode());break;default:logger.error("tcc transaction status is unknown");throw new RuntimeException("tcc transaction status is unknown");}}};//注册spring事务同步器,spring本地事务提交、回滚时会执行事务同步器中对应的方法;TransactionSynchronizationManager.registerSynchronization(tccSynchronizationAdapter);}/*** 第二阶段处理** @param stage 提交、回滚*/private static void twoPhaseProcess(String stage) throws RuntimeException {//获取一阶段调用时保存的参与者信息Set<TwoStageCompleter> stageCompletes = TwoStagesThreadLocal.get();if (stageCompletes == null) {logger.error("stageCompletes is null");return;}for (TwoStageCompleter completer : stageCompletes) {completer.invokeAfterPrepare(stage);}}
2、参与者拦截器

参与者方法上的@TwoStages注解,会被拦截器TwoStagesAspect的方法advice()拦截到,然后当识别到是一阶段调用时,会将一阶段调用涉及到的类、方法、参数封装成一个TwoStageCompleter对象,保存在线程变量TwoStagesThreadLocal中:

    @Around("pointcut() && @annotation(twoStages)")public void advice(ProceedingJoinPoint joinPoint, TwoStages twoStages) {CommonInfo commonInfo = (CommonInfo) joinPoint.getArgs()[0];//一阶段调用if (commonInfo.isOnPrepareStage()) {//保存参与者信息到线程变量Set<TwoStageCompleter> stageCompletes = TwoStagesThreadLocal.get() == null ? new HashSet<>() : TwoStagesThreadLocal.get();stageCompletes.add(buildTwoStageCompleter(joinPoint, commonInfo));//保存事务订单记录OrderRecordService orderRecordService = (OrderRecordService) ApplicationContextGetBeanHelper.getBean(OrderRecordService.class);orderRecordService.saveOrderRecord(buildOrderRecord());}try {joinPoint.proceed();} catch (Throwable throwable) {LoggerUtil.error("tcc invoke error", throwable);throw new RuntimeException("tcc invoke error", throwable);}}
3、两阶段事务完成者TwoStageCompleter

TwoStageCompleter包含了参与者类、参与者方法名、参与者请求参数三个属性,以及二阶段执行方法invokeAfterPrepare():

/*** 两阶段事务完成者** @author anylots* @version $Id: TwoStageSync.java, v 0.1 2020年10月18日 20:14 anylots Exp $*/
public class TwoStageCompleter {private static Logger logger = LoggerFactory.getLogger(TwoStageCompleter.class);
/*** name of the class 参与者类*/private Class targetClass;/*** name of the class 参与者方法名*/private String methodName;/*** 参与者请求参数*/private CommonInfo commonInfo;/*** invoke after prepare** @param stage*/public void invokeAfterPrepare(String stage) {//设置参与者请求阶段commonInfo.setStage(stage);//调用参与者提交、回滚try {Method method = targetClass.getMethod(methodName, new Class[]{CommonInfo.class});method.invoke(ApplicationContextGetBeanHelper.getBean(targetClass), commonInfo);} catch (ReflectiveOperationException e) {logger.error("tcc method invoke error", e);throw new RuntimeException("tcc method invoke error", e);}logger.info("远程参与者事务提交/回滚完成");}
参与者两阶段方法实现

外部系统参与者Service需要实现TwoStageCommonService抽象类,然后根据具体业务实现prepare、commit、cancel方法:

public abstract class TwoStageCommonService {/*** 现金扣减两阶段方法** @param commonInfo*/public void process(CommonInfo commonInfo) {switch (commonInfo.getStage()) {case "prepare":prepare(commonInfo);break;case "commit":commit(commonInfo);break;case "cancel":cancel(commonInfo);break;default:break;}}public abstract void prepare(CommonInfo commonInfo);public abstract void commit(CommonInfo commonInfo);public abstract void cancel(CommonInfo commonInfo);

参与者方法必须实现幂等,以支持事务恢复任务、发起者重试;

4、事务恢复任务TccScheduledTask

如果发生二阶段执行失败,TccScheduledTask将定期捞取未完成的订单,重复调用参与者直到成功:

    /*** 每隔十分钟执行, 单位:ms。*/@Scheduled(fixedRate = 10 * 60 * 1000)public void executeFixRate() {//捞取未完成事务记录List<OrderRecord> orderRecords = orderRecordService.findByStatus(OrderStatusEnum.INIT.getCode());for (OrderRecord orderRecord : orderRecords) {//解析二阶段参与者列表List<String> feignList = JSON.parseObject(orderRecord.getContext(), new TypeReference<List<String>>() {});//依次调用参与者,完成二阶段事务for (String feignInfo : feignList) {invokeForFeign(feignInfo, buildCommonInfo(orderRecord));}//更新发起者事务记录表orderRecord.setStatus(OrderStatusEnum.COMPLETE.getCode());orderRecordService.updateByOrderId(orderRecord);}}

调用参与者方法invokeForFeign的逻辑是这样的:

    /*** invoke for feign** @param feignInfo* @param commonInfo*/private void invokeForFeign(String feignInfo, CommonInfo commonInfo) {//step1. feignInfo校验if (StringUtils.isEmpty(feignInfo) || !feignInfo.contains("_")) {LoggerUtil.error(String.format("feignInfo is not available,orderId=", commonInfo.getOrderId()));return;}//step2. 获取feign classClass clazz = null;try {clazz = Class.forName(feignInfo.split("_")[0]);} catch (ClassNotFoundException e) {LoggerUtil.error(String.format("feign class is not found,orderId=", commonInfo.getOrderId()), e);}//step3.调用参与者提交、回滚try {Method method = clazz.getMethod(feignInfo.split("_")[1], new Class[]{CommonInfo.class});method.invoke(ApplicationContextGetBeanHelper.getBean(clazz), commonInfo);} catch (ReflectiveOperationException e) {LoggerUtil.error("tcc schedule invoke error", e);}}

项目结构:

说明:本文还未完善,对分布式事务框架还将继续研究,然后继续更新

手写基于Spring Cloud的TCC分布式事务框架相关推荐

  1. Spring Cloud Alibaba —— Seata 分布式事务框架

    导航 一.Seata 介绍 二.Seata 的工作原理 2.1 三个角色 2.2 工作流程 三.Seata AT 工作机制 3.1 一阶段 3.2 二阶段 四.案例演示(待补充) 一.Seata 介绍 ...

  2. spring cloud 集成 seata 分布式事务

    spring cloud 集成 seata 分布式事务 基于 seata-server 1.6.x 序言 下载 seata-server 准备一个数据库 seata 专门为 seata-server ...

  3. 如何实现一个TCC分布式事务框架

    声明:本文转载自:TCC分布式事务框架的一点思考 关于TCC事务机制的介绍,可以参考TCC事务机制简介. TCC事务模型虽然说起来简单,然而要基于TCC实现一个通用的分布式事务框架,却比它看上去要复杂 ...

  4. tcc分布式事务框架源码解析系列(四)之项目实战

    通过之前的几篇文章我相信您已经搭建好了运行环境,本次的项目实战是依照happylifeplat-tcc-demo项目来演练,也是非常经典的分布式事务场景:支付成功,进行订单状态的更新,扣除用户账户,库 ...

  5. Spring Cloud Alibaba--分布式事务框架Seata

    Seata作用:分布式事务框架Seata用于保证微服务间的事务一致性. Seata中涉及的对象种类:事务管理者TM.资源管理者RM.事务协调者TC. 参考博客:阿里开源的分布式事务框架 Seata

  6. 基于Spring Cloud + MyBatis的分布式架构网约车平台(DD 打车)后端原型系统设计与实现

    资源下载地址:https://download.csdn.net/download/sheziqiong/85638879 资源下载地址:https://download.csdn.net/downl ...

  7. 带你手写基于 Spring 的可插拔式 RPC 框架(二)整体结构

    前言 上一篇文章中我们已经知道了什么是 RPC 框架和为什么要做一个 RPC 框架了,这一章我们来从宏观上分析,怎么来实现一个 RPC 框架,这个框架都有那些模块以及这些模块的作用. 总体设计 在我们 ...

  8. seata xid是什么_使用Seata彻底解决Spring Cloud中的分布式事务问题!

    Seata是Alibaba开源的一款分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务,本文将通过一个简单的下单业务场景来对其用法进行详细介绍. 什么是分布式事务问题? 单体应用 单体应用 ...

  9. 使用Seata彻底解决Spring Cloud中的分布式事务问题!

    摘要 Seata是Alibaba开源的一款分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务,本文将通过一个简单的下单业务场景来对其用法进行详细介绍. 什么是分布式事务问题? 单体应用 单 ...

最新文章

  1. python opencv 视频剪辑
  2. 数组nn从外围1递增_最完整的PyTorch数据科学家指南(1)
  3. 创建一个对象时,在一个类当中 静态代码块 和普通代码块构造方法 的顺序?
  4. indesign如何画弧线_彩铅画入门教程,如何给独角兽设计一款好发型
  5. 1024程序员节:给DBA们的福音
  6. 如何解决分布式系统数据事务一致性问题(HBase加Solr)
  7. oracle 新增加控制文件,Oracle增加控制文件副本
  8. 揭秘合伙创业做生意成功的密码?
  9. Pycharm 和 Vs code 字体大小调整(Ctrl + 鼠标滚轮实现)
  10. 【笔试/面试】—— 不使用大于、小于、if 语句,实现 max 宏
  11. vs怎么把文字超链接_怎么拥有自己设计的简单个人网站(超细节)
  12. Hadoop生态系统常用组件导图
  13. 如何在Mac的内置词典中添加和删除单词
  14. 高等数学(第七版)同济大学 习题2-1 个人解答
  15. plt.rcParams[‘font.sans-serif‘] = [‘SimHei‘] 和plt.rcParams[‘axes.unicode_minus‘] = False
  16. 威纶触摸屏485轮询通讯_【威纶】触摸屏 界面制作软件 EBpro使用手册.pdf
  17. 树梅派应用27:通过USB蓝牙适配器连接BLE设备
  18. 如何清除远程桌面连接记录
  19. 安装perf后,执行perf命令报错。
  20. 25 岁,毕业写前端的这三年,多益网络java面试

热门文章

  1. 厉害了!九州云边缘计算管理平台获国家权威认证
  2. 【C/C++】劫持技术
  3. 我肝了两周,用react做了一个俄罗斯方块
  4. UOS系统无线网络手动关闭后无法恢复解决办法
  5. vue给标签动态添加元素_vue页面动态添加元素
  6. carsim和matlab有安装顺序吗,CarSim2017免费版
  7. oracle 数据块修复工具,BBED (Oracle Block Brower and EDitor Tool) :数据块修复工具
  8. 前端学习案例1-brower路由
  9. 戴尔游匣g15 5515笔记本电脑g3060锐龙cpu版装入显卡驱动配置pytorch环境踩坑总结
  10. Internet网际协议---IPv4协议