通过之前的几篇文章我相信您已经搭建好了运行环境,本次的项目实战是依照happylifeplat-tcc-demo项目来演练,也是非常经典的分布式事务场景:支付成功,进行订单状态的更新,扣除用户账户,库存扣减这几个模块来进行tcc分布式事务。话不多说,让我们一起进入体验吧!

  • 首先我们找到 PaymentServiceImpl.makePayment 方法,这是tcc分布式事务的发起者
   //tcc分布式事务的发起者@Tcc(confirmMethod = "confirmOrderStatus", cancelMethod = "cancelOrderStatus")public void makePayment(Order order) {order.setStatus(OrderStatusEnum.PAYING.getCode());orderMapper.update(order);//扣除用户余额 远端的rpc调用AccountDTO accountDTO = new AccountDTO();accountDTO.setAmount(order.getTotalAmount());accountDTO.setUserId(order.getUserId());accountService.payment(accountDTO);//进入扣减库存操作 远端的rpc调用InventoryDTO inventoryDTO = new InventoryDTO();inventoryDTO.setCount(order.getCount());inventoryDTO.setProductId(order.getProductId());inventoryService.decrease(inventoryDTO);}//更新订单的confirm方法public void confirmOrderStatus(Order order) {order.setStatus(OrderStatusEnum.PAY_SUCCESS.getCode());orderMapper.update(order);LOGGER.info("=========进行订单confirm操作完成================");}//更新订单的cancel方法public void cancelOrderStatus(Order order) {order.setStatus(OrderStatusEnum.PAY_FAIL.getCode());orderMapper.update(order);LOGGER.info("=========进行订单cancel操作完成================");}复制代码
  • makePayment 方法是tcc分布式事务的发起者,它里面有更新订单状态(本地),进行库存扣减(rpc扣减),资金扣减(rpc调用)

  • confirmOrderStatus 为订单状态confirm方法,cancelOrderStatus 为订单状态cancel方法。

  • 我们重点关注 @Tcc(confirmMethod = "confirmOrderStatus", cancelMethod = "cancelOrderStatus") 这个注解,为什么加了这个注解就有神奇的作用。

@Tcc注解切面

  • 我们找到在happylifeplat-tcc-core包中找到
    TccTransactionAspect,这里定义@Tcc的切点。
@Aspect
public abstract class TccTransactionAspect {private TccTransactionInterceptor tccTransactionInterceptor;public void setTccTransactionInterceptor(TccTransactionInterceptor tccTransactionInterceptor) {this.tccTransactionInterceptor = tccTransactionInterceptor;}@Pointcut("@annotation(com.happylifeplat.tcc.annotation.Tcc)")public void txTransactionInterceptor() {}@Around("txTransactionInterceptor()")public Object interceptCompensableMethod(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {return tccTransactionInterceptor.interceptor(proceedingJoinPoint);}public abstract int getOrder();
}复制代码
  • 我们可以知道Spring实现类的方法凡是加了@Tcc注解的,在调用的时候,都会进行 tccTransactionInterceptor.interceptor 调用。

  • 其次我们发现该类是一个抽象类,肯定会有其他类继承它,你猜的没错,对应dubbo用户,他的继承类为:

@Aspect
@Component
public class DubboTccTransactionAspect extends TccTransactionAspect implements Ordered {@Autowiredpublic DubboTccTransactionAspect(DubboTccTransactionInterceptor dubboTccTransactionInterceptor) {super.setTccTransactionInterceptor(dubboTccTransactionInterceptor);}@Overridepublic int getOrder() {return Ordered.HIGHEST_PRECEDENCE;}
}复制代码
  • springcloud的用户,它的继承类为:
@Aspect
@Component
public class SpringCloudTxTransactionAspect extends TccTransactionAspect implements Ordered {@Autowiredpublic SpringCloudTxTransactionAspect(SpringCloudTxTransactionInterceptor  springCloudTxTransactionInterceptor) {this.setTccTransactionInterceptor(springCloudTxTransactionInterceptor);}public void init() {}@Overridepublic int getOrder() {return Ordered.HIGHEST_PRECEDENCE;}
}复制代码
  • 我们注意到他们都实现了Spring的Ordered接口,并重写了 getOrder 方法,都返回了 Ordered.HIGHEST_PRECEDENCE 那么可以知道,他是优先级最高的切面。

TccCoordinatorMethodAspect 切面

@Aspect
@Component
public class TccCoordinatorMethodAspect implements Ordered {private final TccCoordinatorMethodInterceptor tccCoordinatorMethodInterceptor;@Autowiredpublic TccCoordinatorMethodAspect(TccCoordinatorMethodInterceptor tccCoordinatorMethodInterceptor) {this.tccCoordinatorMethodInterceptor = tccCoordinatorMethodInterceptor;}@Pointcut("@annotation(com.happylifeplat.tcc.annotation.Tcc)")public void coordinatorMethod() {}@Around("coordinatorMethod()")public Object interceptCompensableMethod(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {return tccCoordinatorMethodInterceptor.interceptor(proceedingJoinPoint);}@Overridepublic int getOrder() {return Ordered.HIGHEST_PRECEDENCE + 1;}
}复制代码
  • 该切面是第二优先级的切面,意思就是当我们对有@Tcc注解的方法发起调用的时候,首先会进入 TccTransactionAspect 切面,然后再进入 TccCoordinatorMethodAspect 切面。
    该切面主要是用来获取@Tcc注解上的元数据,比如confrim方法名称等等。

  • 到这里如果大家基本能明白的话,差不多就了解了整个框架原理,现在让我们来跟踪调用吧。

现在我们回过头,我们来调用 PaymentServiceImpl.makePayment 方法

  • dubbo用户,我们会进入如下拦截器:

@Component
public class DubboTccTransactionInterceptor implements TccTransactionInterceptor {private final TccTransactionAspectService tccTransactionAspectService;@Autowiredpublic DubboTccTransactionInterceptor(TccTransactionAspectService tccTransactionAspectService) {this.tccTransactionAspectService = tccTransactionAspectService;}@Overridepublic Object interceptor(ProceedingJoinPoint pjp) throws Throwable {final String context = RpcContext.getContext().getAttachment(Constant.TCC_TRANSACTION_CONTEXT);TccTransactionContext tccTransactionContext = null;if (StringUtils.isNoneBlank(context)) {tccTransactionContext =GsonUtils.getInstance().fromJson(context, TccTransactionContext.class);}return tccTransactionAspectService.invoke(tccTransactionContext, pjp);}
}复制代码
  • 我们继续跟踪 tccTransactionAspectService.invoke(tccTransactionContext, pjp) ,发现通过判断过后,我们会进入 StartTccTransactionHandler.handler 方法,我们已经进入了分布式事务处理的入口了!
/*** 分布式事务处理接口** @param point   point 切点* @param context 信息* @return Object* @throws Throwable 异常*/@Overridepublic Object handler(ProceedingJoinPoint point, TccTransactionContext context) throws Throwable {Object returnValue;try {//开启分布式事务tccTransactionManager.begin();try {//发起调用 执行try方法returnValue = point.proceed();} catch (Throwable throwable) {//异常执行canceltccTransactionManager.cancel();throw throwable;}//try成功执行confirm confirm 失败的话,那就只能走本地补偿tccTransactionManager.confirm();} finally {tccTransactionManager.remove();}return returnValue;}复制代码
  • 我们进行跟进 tccTransactionManager.begin() 方法:
/*** 该方法为发起方第一次调用* 也是tcc事务的入口*/void begin() {LogUtil.debug(LOGGER, () -> "开始执行tcc事务!start");TccTransaction tccTransaction = CURRENT.get();if (Objects.isNull(tccTransaction)) {tccTransaction = new TccTransaction();tccTransaction.setStatus(TccActionEnum.TRYING.getCode());tccTransaction.setRole(TccRoleEnum.START.getCode());}//保存当前事务信息coordinatorCommand.execute(new CoordinatorAction(CoordinatorActionEnum.SAVE, tccTransaction));CURRENT.set(tccTransaction);//设置tcc事务上下文,这个类会传递给远端TccTransactionContext context = new TccTransactionContext();context.setAction(TccActionEnum.TRYING.getCode());//设置执行动作为trycontext.setTransId(tccTransaction.getTransId());//设置事务idTransactionContextLocal.getInstance().set(context);}复制代码
  • 这里我们保存了事务信息,并且开启了事务上下文,并把它保存在了ThreadLoacl里面,大家想想这里为什么一定要保存在ThreadLocal里面。

  • begin方法执行完后,我们回到切面,现在我们来执行 point.proceed(),当执行这一句代码的时候,会进入第二个切面,即进入了 TccCoordinatorMethodInterceptor


public Object interceptor(ProceedingJoinPoint pjp) throws Throwable {final TccTransaction currentTransaction = tccTransactionManager.getCurrentTransaction();if (Objects.nonNull(currentTransaction)) {final TccActionEnum action = TccActionEnum.acquireByCode(currentTransaction.getStatus());switch (action) {case TRYING:registerParticipant(pjp, currentTransaction.getTransId());break;case CONFIRMING:break;case CANCELING:break;}}return pjp.proceed(pjp.getArgs());}复制代码
  • 这里由于是在try阶段,直接就进入了 registerParticipant 方法
private void registerParticipant(ProceedingJoinPoint point, String transId) throws NoSuchMethodException {MethodSignature signature = (MethodSignature) point.getSignature();Method method = signature.getMethod();Class<?> clazz = point.getTarget().getClass();Object[] args = point.getArgs();final Tcc tcc = method.getAnnotation(Tcc.class);//获取协调方法String confirmMethodName = tcc.confirmMethod();/* if (StringUtils.isBlank(confirmMethodName)) {confirmMethodName = method.getName();}*/String cancelMethodName = tcc.cancelMethod();/* if (StringUtils.isBlank(cancelMethodName)) {cancelMethodName = method.getName();}
*///设置模式final TccPatternEnum pattern = tcc.pattern();tccTransactionManager.getCurrentTransaction().setPattern(pattern.getCode());TccInvocation confirmInvocation = null;if (StringUtils.isNoneBlank(confirmMethodName)) {confirmInvocation = new TccInvocation(clazz,confirmMethodName, method.getParameterTypes(), args);}TccInvocation cancelInvocation = null;if (StringUtils.isNoneBlank(cancelMethodName)) {cancelInvocation = new TccInvocation(clazz,cancelMethodName,method.getParameterTypes(), args);}//封装调用点final Participant participant = new Participant(transId,confirmInvocation,cancelInvocation);tccTransactionManager.enlistParticipant(participant);}复制代码
  • 这里就获取了 @Tcc(confirmMethod = "confirmOrderStatus", cancelMethod = "cancelOrderStatus")
    的信息,并把他封装成了 Participant,存起来,然后真正的调用 PaymentServiceImpl.makePayment 业务方法,在业务方法里面,我们首先执行的是更新订单状态(相信现在你还记得0.0):
       order.setStatus(OrderStatusEnum.PAYING.getCode());orderMapper.update(order);复制代码
  • 接下来,我们进行扣除用户余额,注意扣除余额这里是一个rpc方法:
       AccountDTO accountDTO = new AccountDTO();accountDTO.setAmount(order.getTotalAmount());accountDTO.setUserId(order.getUserId());accountService.payment(accountDTO)复制代码
  • 现在我们来关注 accountService.payment(accountDTO) ,这个接口的定义。

dubbo接口:


public interface AccountService {/*** 扣款支付** @param accountDTO 参数dto* @return true*/@Tccboolean payment(AccountDTO accountDTO);
}复制代码

springcloud接口

@FeignClient(value = "account-service", configuration = MyConfiguration.class)
public interface AccountClient {@PostMapping("/account-service/account/payment")@TccBoolean payment(@RequestBody AccountDTO accountDO);}复制代码

很明显这里我们都在接口上加了@Tcc注解,我们知道springAop的特性,在接口上加注解,是无法进入切面的,所以我们在这里,要采用rpc框架的某些特性来帮助我们获取到 @Tcc注解信息。 这一步很重要。当我们发起

accountService.payment(accountDTO) 调用的时候:

  • dubbo用户,会走dubbo的filter接口,TccTransactionFilter:
private void registerParticipant(Class clazz, String methodName, Object[] arguments, Class... args) throws TccRuntimeException {try {Method method = clazz.getDeclaredMethod(methodName, args);Tcc tcc = method.getAnnotation(Tcc.class);if (Objects.nonNull(tcc)) {//获取事务的上下文final TccTransactionContext tccTransactionContext =TransactionContextLocal.getInstance().get();if (Objects.nonNull(tccTransactionContext)) {//dubbo rpc传参数RpcContext.getContext().setAttachment(Constant.TCC_TRANSACTION_CONTEXT,GsonUtils.getInstance().toJson(tccTransactionContext));}if (Objects.nonNull(tccTransactionContext)) {if(TccActionEnum.TRYING.getCode()==tccTransactionContext.getAction()){//获取协调方法String confirmMethodName = tcc.confirmMethod();if (StringUtils.isBlank(confirmMethodName)) {confirmMethodName = method.getName();}String cancelMethodName = tcc.cancelMethod();if (StringUtils.isBlank(cancelMethodName)) {cancelMethodName = method.getName();}//设置模式final TccPatternEnum pattern = tcc.pattern();tccTransactionManager.getCurrentTransaction().setPattern(pattern.getCode());TccInvocation confirmInvocation = new TccInvocation(clazz,confirmMethodName,args, arguments);TccInvocation cancelInvocation = new TccInvocation(clazz,cancelMethodName,args, arguments);//封装调用点final Participant participant = new Participant(tccTransactionContext.getTransId(),confirmInvocation,cancelInvocation);tccTransactionManager.enlistParticipant(participant);}}}} catch (NoSuchMethodException e) {throw new TccRuntimeException("not fount method " + e.getMessage());}}复制代码
  • springcloud 用户,则会进入 TccFeignHandler
public class TccFeignHandler implements InvocationHandler {/*** logger*/private static final Logger LOGGER = LoggerFactory.getLogger(TccFeignHandler.class);private Target<?> target;private Map<Method, MethodHandler> handlers;public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {if (Object.class.equals(method.getDeclaringClass())) {return method.invoke(this, args);} else {final Tcc tcc = method.getAnnotation(Tcc.class);if (Objects.isNull(tcc)) {return this.handlers.get(method).invoke(args);}final TccTransactionContext tccTransactionContext =TransactionContextLocal.getInstance().get();if (Objects.nonNull(tccTransactionContext)) {final TccTransactionManager tccTransactionManager =SpringBeanUtils.getInstance().getBean(TccTransactionManager.class);if (TccActionEnum.TRYING.getCode() == tccTransactionContext.getAction()) {//获取协调方法String confirmMethodName = tcc.confirmMethod();if (StringUtils.isBlank(confirmMethodName)) {confirmMethodName = method.getName();}String cancelMethodName = tcc.cancelMethod();if (StringUtils.isBlank(cancelMethodName)) {cancelMethodName = method.getName();}//设置模式final TccPatternEnum pattern = tcc.pattern();tccTransactionManager.getCurrentTransaction().setPattern(pattern.getCode());final Class<?> declaringClass = method.getDeclaringClass();TccInvocation confirmInvocation = new TccInvocation(declaringClass,confirmMethodName,method.getParameterTypes(), args);TccInvocation cancelInvocation = new TccInvocation(declaringClass,cancelMethodName,method.getParameterTypes(), args);//封装调用点final Participant participant = new Participant(tccTransactionContext.getTransId(),confirmInvocation,cancelInvocation);tccTransactionManager.enlistParticipant(participant);}}return this.handlers.get(method).invoke(args);}}public void setTarget(Target<?> target) {this.target = target;}public void setHandlers(Map<Method, MethodHandler> handlers) {this.handlers = handlers;}}复制代码

重要提示 在这里我们获取了在第一步设置的事务上下文,然后进行dubbo的rpc传参数,细心的你可能会发现,这里获取远端confirm,cancel方法其实就是自己本身啊?,那发起者还怎么来调用远端的confrim,cancel方法呢?这里的调用,我们通过事务上下文的状态来控制,比如在confrim阶段,我们就调用confrim方法等。。

到这里我们就完成了整个消费端的调用,下一篇分析提供者的调用流程!

转载于:https://juejin.im/post/59e080126fb9a0450f20f408

tcc分布式事务框架源码解析系列(四)之项目实战相关推荐

  1. Myth源码解析系列之二-项目结构介绍

    上一篇我们了解了myth是什么及能做什么,下面我们来了解下项目主体结构吧 项目工程结构图 项目工程详解 myth-annotation myth分布式事务框架注解(如 @myth注解),业务层主要通过 ...

  2. Android 常用开源框架源码解析 系列 (四)Glide

    一.定义  Glide 一个被google所推荐的图片加载库,作者是bumptech.对Android SDk 最低要求是 API 10  与之功能类似的是Square公司的picasso  二.基本 ...

  3. Android 常用开源框架源码解析 系列 (九)dagger2 呆哥兔 依赖注入库

    一.前言 依赖注入定义 目标类中所依赖的其他的类的初始化过程,不是通过手动编码的方式创建的. 是将其他的类已经初始化好的实例自动注入的目标类中. "依赖注入"也是面向对象编程的 设 ...

  4. 分布式事务Seata源码解析十:AT模式回滚日志undo log详细构建过程

    文章目录 一.前言 二.准备undo log 0.undo log 样例 1)undo log表结构 2)rollback_info(回滚日志数据) 1.before image的构建 1)业务表元数 ...

  5. Myth源码解析系列之九-总结

    本人并非开源产品作者,但深受作者感染,我们都知道开源作品实属不易, 想想 当人家在玩农药时,而你在码代码, 当人家在陪女朋友约会时,你还是在码代码 ~ ~ 这不仅是需要用心投入更多的是需要消耗和牺牲很 ...

  6. java 并发框架源码_Java并发编程高阶技术-高性能并发框架源码解析与实战

    Java并发编程高阶技术-高性能并发框架源码解析与实战 1 _0 Z' @+ l: s3 f6 r% t|____资料3 Z9 P- I2 x8 T6 ^ |____coding-275-master ...

  7. openGauss数据库源码解析系列文章--openGauss简介(一)

    openGauss数据库是华为深度融合在数据库领域多年经验,结合企业级场景要求推出的新一代企业级开源数据库.此前,Gauss松鼠会已经发布了openGauss数据库核心技术系列文章,介绍了openGa ...

  8. Redux 源码解析系列(一) -- Redux的实现思想

    文章来源: IMweb前端社区 黄qiong(imweb.io) IMweb团队正在招聘啦,简历发至jayccchen@tencent.com Redux 其实是用来帮我们管理状态的一个框架,它暴露给 ...

  9. TiKV 源码解析系列文章(二)raft-rs proposal 示例情景分析

    作者:屈鹏 本文为 TiKV 源码解析系列的第二篇,按照计划首先将为大家介绍 TiKV 依赖的周边库 raft-rs .raft-rs 是 Raft 算法的 Rust 语言实现.Raft 是分布式领域 ...

最新文章

  1. HTML5十五大新特性
  2. 【OpenCV 4开发详解】视频加载与摄像头调用
  3. Java通过Maven使用RoaringBitmap
  4. jQuery循环使用相同类的元素
  5. 原生js实现Object.assign和Object.create
  6. 1.6 多项式回归-机器学习笔记-斯坦福吴恩达教授
  7. 【转】求一个类的sizeof应考虑的问题
  8. java 当一个文本框有值时另一个文本框置灰_【农行DevOps进行时】基于PaaS的持续集成/持续交付实践 | IDCF...
  9. 深入开展计算机设备保密检查,江西安远县检察院深入开展网络安全保密自查工作...
  10. java综合案例_综合实例 - Java House - BlogJava
  11. 内存总是不够?HBaseGeoMesa配置优化了解一下
  12. 从python入门到放弃_《Python3从入门到放弃》视频教程
  13. git add 所有修改文件_Git 技术干货!工作中quot;Gitquot;的使用实践和常用命令合集!
  14. Linux防火墙端口设置和mysql端口开放的navicat整合
  15. java中类模型_java中的数据模型类
  16. Leetcode---1818绝对差值和
  17. q-flashplus怎么使用_技嘉主板使用Q-FLASH刷BIOS详解
  18. java 存根_存根键值存储
  19. flutter doctor --android-licenses后Android sdkmanager not found.
  20. flutter 获取定位_Flutter 获取定位

热门文章

  1. linux postgresql 恢复数据库,PostgreSQL数据库备份和恢复
  2. redis 中一个字段 修改map_Redis中bitmap的妙用
  3. html 实现表格控制器,在html动态表格中将数据发布到带有ajax的控制器
  4. idea 生成自己项目 API (跟jdk api 一样的界面) 香吧
  5. php h5视频录制上传,基于koa的h5视频录制异步上传
  6. 学计算机的学期计划书,学习计划表
  7. 批处理定时执行任务_[Abaqus tips ] 分析任务的定时执行
  8. linux克隆后没有eth0,解决linux机器克隆后eth0不见的问题
  9. android 存储空间监控,浅谈 Android 内存监控(中)
  10. php redis 队列抢红包_redis 队列操作的例子(php)