概览

首先我们通过@GlobalTransactional这个注解开启一个全局事务,而GlobalTransactionScanner.wrapIfNecessary()会为所有方法上加了这个注解的bean注入一个包装了GlobalTransactionalInterceptor实例的advisor,然后返回一个代理对象。GlobalTransactionalInterceptor会在该bean的方法调用前进行拦截,判断是否开启全局事务

上源码,关键位置我打了注释

@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {//判断全局事务是否启用if (disableGlobalTransaction) {return bean;}try {synchronized (PROXYED_SET) {if (PROXYED_SET.contains(beanName)) {return bean;}interceptor = null;//check TCC proxyif (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCCinterceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));} else {Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);//判断bean里的方法上有没有@GlobalTransactional注解if (!existsAnnotation(new Class[]{serviceInterface})&& !existsAnnotation(interfacesIfJdk)) {return bean;}if (interceptor == null) {//初始化Interceptor,后面会注入代理对象interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);ConfigurationFactory.getInstance().addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener) interceptor);}}//判断当前bean是否已被aop代理过,比如说方法上加了@Transactional就会被spring代理//如果没有被代理,调用父类的模板方法进行代理,advisor通过被重写的//getAdvicesAndAdvisorsForBean返回上面的interceptor进行包装if (!AopUtils.isAopProxy(bean)) {bean = super.wrapIfNecessary(bean, beanName, cacheKey);} else {AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);//把GlobalTransactionalInterceptor包装成advisorAdvisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));for (Advisor avr : advisor) {advised.addAdvisor(0, avr);}}PROXYED_SET.add(beanName);return bean;}} catch (Exception exx) {throw new RuntimeException(exx);}
}@Override
protected Object[] getAdvicesAndAdvisorsForBean(Class beanClass, String beanName,          TargetSource customTargetSource) throws BeansException {//返回interceptor[]return new Object[]{interceptor};
}@Overridepublic void afterPropertiesSet() {if (disableGlobalTransaction) {if (LOGGER.isInfoEnabled()) {LOGGER.info("Global transaction is disabled.");}return;}//使用netty初始化seata client,建立到server端的连接initClient();}

这里可以看出来,只要我们在方法上加了@GlobalTranscational注解,对应的bean就会被seata进行代理,同时重写了afterPropertiesSet,在bean初始化完毕后会进行调用,这个client就是用来跟server端通信的,包括后面会说到的下游服务的事务提交与回滚都与这个有关

GlobalTransactionalInterceptor重写了MethodInterceptor的invoke()方法,在spring执行通知代理对象的通知方法时,最终会调用到这个invoke()

@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {Class<?> targetClass = methodInvocation.getThis() != null ?AopUtils.getTargetClass(methodInvocation.getThis()) : null;Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);//如果全局事务可用并且方法上加了@GlobalTransactional注解if (!disable && globalTransactionalAnnotation != null) {//处理全局事务return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);} else if (!disable && globalLockAnnotation != null) {return handleGlobalLock(methodInvocation);} else {return methodInvocation.proceed();}}

这里就会然后调用handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);

private Object handleGlobalTransaction(final MethodInvocation methodInvocation,final GlobalTransactional globalTrxAnno) throws Throwable {try {return transactionalTemplate.execute(new TransactionalExecutor() {@Overridepublic Object execute() throws Throwable {return methodInvocation.proceed();}public String name() {String name = globalTrxAnno.name();if (!StringUtils.isNullOrEmpty(name)) {return name;}return formatMethod(methodInvocation.getMethod());}@Overridepublic TransactionInfo getTransactionInfo() {TransactionInfo transactionInfo = new TransactionInfo();transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());transactionInfo.setName(name());transactionInfo.setPropagation(globalTrxAnno.propagation());Set<RollbackRule> rollbackRules = new LinkedHashSet<>();for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {rollbackRules.add(new RollbackRule(rbRule));}for (String rbRule : globalTrxAnno.rollbackForClassName()) {rollbackRules.add(new RollbackRule(rbRule));}for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {rollbackRules.add(new NoRollbackRule(rbRule));}for (String rbRule : globalTrxAnno.noRollbackForClassName()) {rollbackRules.add(new NoRollbackRule(rbRule));}transactionInfo.setRollbackRules(rollbackRules);return transactionInfo;}});} catch (TransactionalExecutor.ExecutionException e) {TransactionalExecutor.Code code = e.getCode();switch (code) {case RollbackDone:throw e.getOriginalException();case BeginFailure:failureHandler.onBeginFailure(e.getTransaction(), e.getCause());throw e.getCause();case CommitFailure:failureHandler.onCommitFailure(e.getTransaction(), e.getCause());throw e.getCause();case RollbackFailure:failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());throw e.getCause();case RollbackRetrying:failureHandler.onRollbackRetrying(e.getTransaction(), e.getCause());throw e.getCause();default:throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));}}}

这里transactionalTemplate.execute()传入了一个匿名实现,其execute()就是放行让后续的通知方法继续执行,这个我们不关心,进入transactionalTemplate.execute()

public Object execute(TransactionalExecutor business) throws Throwable {// 1 get transactionInfoTransactionInfo txInfo = business.getTransactionInfo();if (txInfo == null) {throw new ShouldNeverHappenException("transactionInfo does not exist");}// 1.1 get or create a transactionGlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();// 1.2 Handle the Transaction propatation and the branchTypePropagation propagation = txInfo.getPropagation();SuspendedResourcesHolder suspendedResourcesHolder = null;try {switch (propagation) {case NOT_SUPPORTED:suspendedResourcesHolder = tx.suspend(true);return business.execute();case REQUIRES_NEW:suspendedResourcesHolder = tx.suspend(true);break;case SUPPORTS:if (!existingTransaction()) {return business.execute();}break;case REQUIRED:break;case NEVER:if (existingTransaction()) {throw new TransactionException(String.format("Existing transaction found for transaction marked with propagation 'never',xid = %s",RootContext.getXID()));} else {return business.execute();}case MANDATORY:if (!existingTransaction()) {throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");}break;default:throw new TransactionException("Not Supported Propagation:" + propagation);}try {// 2. begin transactionbeginTransaction(txInfo, tx);Object rs = null;try {// Do Your Businessrs = business.execute();} catch (Throwable ex) {// 3.the needed business exception to rollback.completeTransactionAfterThrowing(txInfo, tx, ex);throw ex;}// 4. everything is fine, commit.commitTransaction(tx);return rs;} finally {//5. cleartriggerAfterCompletion();cleanUp();}} finally {tx.resume(suspendedResourcesHolder);}}

这里首先初始化一个GlobalTransaction实例tx,用于保存后续生成的xid跟事务状态等一些属性。然后对事务的传播属性做了些校验。然后我们进入beginTransaction(txInfo, tx);顾名思义,这里快要到核心了

private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {try {//执行hook的begin()方法做一些额外处理triggerBeforeBegin();tx.begin(txInfo.getTimeOut(), txInfo.getName());triggerAfterBegin();} catch (TransactionException txe) {throw new TransactionalExecutor.ExecutionException(tx, txe,TransactionalExecutor.Code.BeginFailure);}}

这里的trigger方法执行我们通过TransactionHookManager.registerHook()注册的一些hook方法,如果我们要在事务开始前后做一些事情,就可以通过这种方式。

进入tx.begin()

    @Overridepublic void begin(int timeout, String name) throws TransactionException {if (role != GlobalTransactionRole.Launcher) {assertXIDNotNull();if (LOGGER.isDebugEnabled()) {LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);}return;}assertXIDNull();if (RootContext.getXID() != null) {throw new IllegalStateException();}//开启事务并拿到xidxid = transactionManager.begin(null, null, name, timeout);//设置事务状态status = GlobalStatus.Begin;//xid跟当前线程做全局绑定RootContext.bind(xid);if (LOGGER.isInfoEnabled()) {LOGGER.info("Begin new global transaction [{}]", xid);}}

继续到 transactionManager.begin

    @Overridepublic String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException {GlobalBeginRequest request = new GlobalBeginRequest();request.setTransactionName(name);request.setTimeout(timeout);//通知seata-server开启全局事务,并拿到全局事务id(xid)GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);if (response.getResultCode() == ResultCode.Failed) {throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());}return response.getXid();}

最后我们就开启了一个全局事务,那么我们的xid是怎么向下游传递的呢,看看对feign的集成是怎么做的?SeataFeignClient.execute

    @Overridepublic Response execute(Request request, Request.Options options) throws IOException {//设置xidRequest modifiedRequest = getModifyRequest(request);//调用下游服务return this.delegate.execute(modifiedRequest, options);}private Request getModifyRequest(Request request) {String xid = RootContext.getXID();if (StringUtils.isEmpty(xid)) {return request;}Map<String, Collection<String>> headers = new HashMap<>(MAP_SIZE);//设置xid到消息头headers.putAll(request.headers());List<String> seataXid = new ArrayList<>();seataXid.add(xid);headers.put(RootContext.KEY_XID, seataXid);return Request.create(request.method(), request.url(), headers, request.body(),request.charset());}

这里用SeataFeignClient替换了默认的feignClient,把xid放到了requestHeader里。那么下游又是怎么拿的呢?SeataHandlerInterceptor.preHandle()

       @Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response,Object handler) {String xid = RootContext.getXID();//从消息头中获取xidString rpcXid = request.getHeader(RootContext.KEY_XID);if (log.isDebugEnabled()) {log.debug("xid in RootContext {} xid in RpcContext {}", xid, rpcXid);}if (xid == null && rpcXid != null) {RootContext.bind(rpcXid);if (log.isDebugEnabled()) {log.debug("bind {} to RootContext", rpcXid);}}return true;}

这里SeataHandlerInterceptor实现了HandlerInterceptor,springMVC会在Controller方法调用之前拿到所有注册到容器中的拦截器链去执行其preHandle()方法,具体可参考DispatcherServlet.doDispatch()。

好了,到这里就大概把seata的全局事务的开启以及xid的传递捋了一遍,后面会聊聊seata跟hystrix做集成时常见的一些坑

seata源码分析之全局事务的开启跟xid的传递相关推荐

  1. 阿里开源一站式分布式事务框架seata源码分析(AT模式下TM与RM分析)

    序言: 对于阿里开源分布式事务框架seata的详细了解可以参考官网,这里不会详细介绍.本章只会介绍seata中AT模式的源码分析(对阿seata有一定了解或者成功完成过demo). seata中一个事 ...

  2. Seata 源码分析 - tm、rm 中 xid 传递过程

    一.Seata 前面文章讲解了对 Seata 的 AT 和 TCC 模式的使用,本篇文章为大家讲解下 Seata 中 TM.RM 中 xid 传递过程,如果不了解 Seata 中的 xid,可以理解为 ...

  3. Seata源码分析之TransactionManager(一)

    目录 一.用户开启事务示例 1.GlobalTransaction的api方式 2.@GlobalTransaction注解方式 二.TransactionManager 三.DefaultTrans ...

  4. java开启一个线程_【jdk源码分析】java多线程开启的三种方式

    1.继承Thread类,新建一个当前类对象,并且运行其start()方法 1 packagecom.xiaostudy.thread;2 3 /** 4 * @desc 第一种开启线程的方式5 *@a ...

  5. Bochs源码分析 - 28:bochs开启x2apic与SMP编译说明

    前言 我们现在来根据<x86/x64体系探索及编程>的第十七章分析apic结构,第一个实验就是检测local apic与x2apic,但是现在bochs最新版本配置文件有些变动,需要添加有 ...

  6. srsLTE 源码分析 UE_08 随机接入 之开启前的准备

    前言 通过前面的文章,读者应该对PLMN的搜索和选择有了一定的了解,接下来的几篇文章,我们会对随机接入的代码和流程进行分析.随机接入是MAC层的一项重要的任务,主要目的是实现UE和基站的上行同步,它从 ...

  7. 开启mybatis日志_Mybatis源码分析之Cache二级缓存原理 (五)

    一:Cache类的介绍 讲解缓存之前我们需要先了解一下Cache接口以及实现MyBatis定义了一个org.apache.ibatis.cache.Cache接口作为其Cache提供者的SPI(Ser ...

  8. Linux内核 eBPF基础:ftrace源码分析:过滤函数和开启追踪

    Linux内核 eBPF基础 ftrace基础:过滤函数和开启追踪 荣涛 2021年5月12日 本文相关注释代码:https://github.com/Rtoax/linux-5.10.13 上篇文章 ...

  9. 源码 状态机_阿里中间件seata源码剖析七:saga模式实现

    saga模式是分布式事务中使用比较多的一种模式,他主要应用在长流程的服务,对一个全局事务,如果某个节点抛出了异常,则从这个节点往前依次回滚或补偿事务.今天我们就来看看它的源码实现. 状态机初始化 在之 ...

最新文章

  1. Linux 下查看文件的命令介绍
  2. C++中多态性学习(上)
  3. 解决类似/usr/lib64/libstdc++.so.6:version `GLIBCXX_3.4.21` not found的问题
  4. SecureCRT远程登录ubuntu
  5. 苹果发布App“一年之最”:快手短视频广东播放最多 山东原创第一
  6. 发动机性能测试软件,发动机的性能测试方法
  7. sudoers修改_为用户增加sudo权限(修改sudoers文件) | 学步园
  8. SpringCloud 配置服务器
  9. 终极分类器(识别器),一个人工智能的美好愿景
  10. 谷歌推出全能扒谱AI:只要听一遍歌曲,钢琴小提琴的乐谱全有了
  11. 土木想往土木软件开发方向发展,应该如何准备
  12. iOS蓝牙连接打印机,打印小票
  13. GCN在交通流预测方面的相关文章
  14. linux数据库删除命令大全,linux删除数据库命令
  15. 水星d128路由器虚拟服务器,幻影D128路由器怎么设置?
  16. html5在线编辑器效果和源码
  17. 按键精灵2014如何插入循环语句--win10专业版
  18. MPU6050加速度传感器学习笔记之实验《获取原始数据》
  19. js第13天(事件绑定方式)
  20. [ZT]系统学习Linux的11点建议

热门文章

  1. Altium Designer20 出现Failed to add class member:xxx 和 Unknown Pin 错误解决办法
  2. system verilog $clog2的使用
  3. 客户端SDK开发使用手册 概述
  4. 解决win10下浏览器提示“浏览器已对此页面进行了修改,以帮助阻止跨站脚本”问题
  5. P2P分布式网络简史
  6. ONNX-Simpler报错:Graph must be in single static assignment (SSA) form
  7. 入职前端工程师你需要学会什么?前端实习生告诉你十大必备技能
  8. 树莓派驱动MG996R
  9. Python最好的Excel第三方库——xlwings快速上手
  10. 我国工业互联网平台建设面临四大瓶颈