文章目录

  • 第一阶段
    • 1. 扫描@GlobalTransactional注解,获取全局事务XID
    • 2. TC生成全局事务XID,记录入库
    • 3. 执行业务逻辑,提交本地事务,记录branch_table、undo_log、全局锁loak_table
  • 第二阶段:如果无异常
    • 4. TM向TC发起全局事务提交请求
    • 5. TC异步删除全局事务、分支事务、释放全局锁,并通知RM删除undo_log
  • 第二阶段:如果有异常
    • 6. TM向TC发起全局事务回滚请求
    • 6. TC异步删除global_table、branch_table,RM端回滚数据并删除undo_log
  • 问题:Seata全局锁的作用是什么?

SeataAT模式的核心是对业务无侵入,是一种改进后的两阶段提交,其设计思路如下:

  • 第一阶段:RM端提交本地事务,生成undo日志表,释放本地锁和连接资源。并向TC注册分支事务,通过全局事务的 XID 进行关联。
  • 第二阶段:完全异步
    • 分布式事务操作成功,则TC通知RM异步删除undo日志表
    • 分布式事务操作失败,则TMTC发送回滚请求,RM 收到协调器TC发来的回滚请求,通过 XIDBranch ID 找到相应的回滚日志记录,通过回滚记录生成反向的更新 SQL 并执行,以完成分支的回滚。

接下来结合源码分析SeataAT模式下,两阶段提交的原理

第一阶段

1. 扫描@GlobalTransactional注解,获取全局事务XID

从这一篇文章中,我们已经知道了分布式事务Seata如何使用,只要加上@GlobalTransactional即可开启一个全局事务,保证各服务的数据一致性!那么这个注解是如何生效的呢?

我们在使用seata时,会引入seata的依赖,而熟悉springboot的同学就会明白,在引入一个第三方依赖时,一般会有一个XxxAutoConfiguration类用来集成第三方插件,实现自动配置!Seata也不例外!全文搜索SeataAutoConfiguration,进入Seata的逻辑入口!

@ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
public class SeataAutoConfiguration {private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);//失败处理类@Bean(BEAN_NAME_FAILURE_HANDLER)@ConditionalOnMissingBean(FailureHandler.class)public FailureHandler failureHandler() {return new DefaultFailureHandlerImpl();}//全局事务扫描器@Bean@DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})@ConditionalOnMissingBean(GlobalTransactionScanner.class)public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {if (LOGGER.isInfoEnabled()) {LOGGER.info("Automatically configure Seata");}return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);}
}

可以看到上边的SeataAutoConfiguration注册了一个全局事务扫描器GlobalTransactionScanner,用来扫描@GlobalTransactional注解,而这个扫描器不是单纯的类,主要有以下两个功能!

  • GlobalTransactionScanner实现了InitializingBean接口。

    • 实现InitializingBean接口的类在初始化完毕后必然会执行afterPropertiesSet方法。 GlobalTransactionScanner类在afterPropertiesSet方法中初始化了RM、TM的客户端
        private void initClient() {if (LOGGER.isInfoEnabled()) {LOGGER.info("Initializing Global Transaction Clients ... ");}if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));}//初始化 TM 客户端TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);if (LOGGER.isInfoEnabled()) {LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);}//初始化 RM 客户端RMClient.init(applicationId, txServiceGroup);if (LOGGER.isInfoEnabled()) {LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);}if (LOGGER.isInfoEnabled()) {LOGGER.info("Global Transaction Clients are initialized. ");}registerSpringShutdownHook();}
    
  • GlobalTransactionScanner继承了AbstractAutoProxyCreator类。

    • 看到继承了AbstractAutoProxyCreator类,说明与AOP有关。 Spring的bean对象初始化完毕后,会先检查bean是否被代理过(如果有循环依赖,则在bean实例化之后创建),如果没有,调用AbstractAutoProxyCreator类的 wrapIfNecessary 方法进行代理
     //bean的后置处理器@Overridepublic Object postProcessAfterInitialization(@Nullable Object bean, String beanName) {if (bean != null) {Object cacheKey = getCacheKey(bean.getClass(), beanName);//先检查bean是否在解决循环依赖时代理过了if (this.earlyProxyReferences.remove(cacheKey) != bean) {//如果没有,调用AbstractAutoProxyCreator的 wrapIfNecessary 方法进行代理return wrapIfNecessary(bean, beanName, cacheKey);}}return bean;}
    

主业务逻辑就藏在 wrapIfNecessary 方法中

  • 在此方法中会生成一个全局事务拦截器globalTransactionalInterceptor,然后调用拦截器内部的invoke()方法扫描@GlobalTransactional注解、处理全局事务!invoke()方法如下:

        @Overridepublic Object invoke(final MethodInvocation methodInvocation) throws Throwable {Class<?> targetClass =methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);//获取到所有带有 @GlobalTransactional 注解的方法final GlobalTransactional globalTransactionalAnnotation =getAnnotation(method, targetClass, GlobalTransactional.class);final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);if (!localDisable) {//如果存在带有 @GlobalTransactional 注解的方法if (globalTransactionalAnnotation != null) {//则开始处理全局事务!return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);} else if (globalLockAnnotation != null) {return handleGlobalLock(methodInvocation, globalLockAnnotation);}}}return methodInvocation.proceed();}

开始处理全局事务的方法是handleGlobalTransaction,进入该方法!

    Object handleGlobalTransaction(final MethodInvocation methodInvocation,final GlobalTransactional globalTrxAnno) throws Throwable {boolean succeed = true;//模板方法开始处理全局事务return transactionalTemplate.execute(new TransactionalExecutor() {}....... //省略代码!
    public Object execute(TransactionalExecutor business) throws Throwable {// 1. Get transactionInfoTransactionInfo txInfo = business.getTransactionInfo();if (txInfo == null) {throw new ShouldNeverHappenException("transactionInfo does not exist");}// 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.GlobalTransaction tx = GlobalTransactionContext.getCurrent();// 1.2 Handle the transaction propagation.Propagation propagation = txInfo.getPropagation();SuspendedResourcesHolder suspendedResourcesHolder = null;try {switch (propagation) {....... //省略处理传播行为的逻辑try {// 1.开启全局事务!beginTransaction(txInfo, tx);Object rs;try {// 2.进入业务代码,执行业务逻辑rs = business.execute();} catch (Throwable ex) {// 3. 出现了业务异常进行回滚completeTransactionAfterThrowing(txInfo, tx, ex);throw ex;}// 4. 当所有分支事务无异常,提交全局事务commitTransaction(tx);return rs;} finally {//5. clearresumeGlobalLockConfig(previousConfig);triggerAfterCompletion();cleanUp();}} finally {// If the transaction is suspended, resume it.if (suspendedResourcesHolder != null) {tx.resume(suspendedResourcesHolder);}}}

可以看到execute方法中,囊括了SeataAT模式下处理分布式事务的逻辑:

  • beginTransaction(txInfo, tx):获取全局事务XIDTC服务器把全局事务信息插入glable_table表中
  • business.execute():进入业务代码,执行业务逻辑
  • commitTransaction(tx): 当所有分支事务无异常,提交全局事务

首先来看一下,开启全局事务方法beginTransaction,进入beginTransaction方法内部的tx.begin方法,查看真正的开启事务逻辑

    @Overridepublic void begin(int timeout, String name) throws TransactionException {if (role != GlobalTransactionRole.Launcher) {assertXIDNotNull();if (LOGGER.isDebugEnabled()) {LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);}return;}assertXIDNull();//先获取一次全局事务ID : XIDString currentXid = RootContext.getXID();//如果存在的话,抛异常,应先去事务协调者TC中获取if (currentXid != null) {throw new IllegalStateException("Global transaction already exists," +" can't begin a new global transaction, currentXid = " + currentXid);}//从事务协调者TC中和获取全局事务ID : XIDxid = transactionManager.begin(null, null, name, timeout);status = GlobalStatus.Begin;//与容器绑定RootContext.bind(xid);if (LOGGER.isInfoEnabled()) {LOGGER.info("Begin new global transaction [{}]", xid);}}

其中transactionManager.begin方法就是从事务协调者TC中和获取全局事务ID : XID,由于与TC交互属于RPC通信,所以获取XID步骤如下

  • 构建request请求
  • 以同步的方式向TC发起RPC调用,调用方式采用nety,并从响应结果中获取全局事务XID
    @Overridepublic String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException {//构建request请求GlobalBeginRequest request = new GlobalBeginRequest();request.setTransactionName(name);request.setTimeout(timeout);//同步发起rpc调用GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);if (response.getResultCode() == ResultCode.Failed) {throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());}//从响应结果中获取XIDreturn response.getXid();}

2. TC生成全局事务XID,记录入库

事务协调者TC在接受到事务管理器TM发来的获取XID的请求后,主要做两件事情

  • 生成全局事务XID,格式为:ip:port:随机数
  • dbfile或者redis 存储全局事务ID,这也对应seata的三种存储模式!

TC端源码位置:io.seata.server.coordinator.DefaultCoordinator # doGlobalBegin

    @Overrideprotected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)throws TransactionException {//begin方法是真正的TC端的处理逻辑response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),request.getTransactionName(), request.getTimeout()));if (LOGGER.isInfoEnabled()) {LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());}}


更细的代码就不贴了,源码入口为:DefaultCoordinator # doGlobalBegin(),细节代码逻辑如下 :


TC端上述流程执行完毕后,就会向数据库插入一条记录,保存全局事务ID,如下所示!

3. 执行业务逻辑,提交本地事务,记录branch_table、undo_log、全局锁loak_table

Seata处理全局事务的核心逻辑如下,这段代码摘自上文第1小节的transactionalTemplate.execute方法中

      try {// 1.开启全局事务!beginTransaction(txInfo, tx);Object rs;try {// 2.进入业务代码,执行业务逻辑rs = business.execute();} catch (Throwable ex) {// 3. 出现了业务异常进行回滚completeTransactionAfterThrowing(txInfo, tx, ex);throw ex;}// 4. 当所有分支事务无异常,提交全局事务commitTransaction(tx);return rs;}

在第12小节的分析中,已经通过beginTransaction(txInfo, tx)获取到了全局事务ID,并记录到global_table全局事务表中,接下来会执行 business.execute():进入业务代码,执行业务逻辑

    //业务代码逻辑:下订单 -- 减库存 -- 扣余额@Override@GlobalTransactional(name="createOrder")public Order saveOrder(OrderVo orderVo) {log.info("=============用户下单=================");log.info("当前 XID: {}", RootContext.getXID());// 生成订单对象Order order = new Order();order.setUserId(orderVo.getUserId());order.setCommodityCode(orderVo.getCommodityCode());order.setCount(orderVo.getCount());order.setMoney(orderVo.getMoney());order.setStatus(OrderStatus.INIT.getValue());//保存订单Integer saveOrderRecord = orderMapper.insert(order);//减库存 storageFeignService.deduct(orderVo.getCommodityCode(), orderVo.getCount());//扣余额Boolean debit= accountFeignService.debit(orderVo.getUserId(), orderVo.getMoney());}

经过调试发现,上述 下订单 – 减库存 – 扣余额 的业务代码,每执行完一步,就会在对应服务的数据库中的对应表下生成branch_idundo_log等信息,理论上单纯的orderMapper.insert的语句并不会插入这些东西啊?而这里的原因就在于:我们在配置数据源时对数据源做了一个代理Proxy,这些逻辑都是在代理时做的!

     //创建数据源@Bean@ConfigurationProperties(prefix = "spring.datasource.druid")public DataSource druidDataSource() {DruidDataSource druidDataSource = new DruidDataSource();return druidDataSource;}//为数据源添加代理DataSourceProxy!@Primary@Bean("dataSource")public DataSourceProxy dataSourceProxy(DataSource druidDataSource) {return new DataSourceProxy(druidDataSource);}

如上所示,在项目启动时,需要为DruidDataSource 包装一层DataSourceProxy代理!在执行增删改查逻辑时:

  • 如果方法上带有@GlobalTransactional注解,则走DataSourceProxy数据源的代理逻辑
  • 如果方法上不带@GlobalTransactional注解,则走正常的增删改查逻辑

正常的增删改查逻辑:PreparedStatement.execute()

  @Overridepublic <E> List<E> query(Statement statement, ResultHandler resultHandler) throws SQLException {PreparedStatement ps = (PreparedStatement) statement;//调用PreparedStatement的execute()进行查询数据ps.execute();//处理结果集return resultSetHandler.handleResultSets(ps);}

代理后的增删改查逻辑PreparedStatementProxy.execute()

在执行订单插入方法orderMapper.insert()时,由于方法上加了@GlobalTransactional注解,在插入时会走代理后的插入逻辑!

    public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,StatementProxy<S> statementProxy,StatementCallback<T, S> statementCallback,Object... args) throws SQLException {//如果不需要全局事务锁,并且 不是seata的AT模式,直接走正常的增删改查逻辑                                              if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {// Just work as original statementreturn statementCallback.execute(statementProxy.getTargetStatement(), args);}//否则会走以下代理逻辑String dbType = statementProxy.getConnectionProxy().getDbType();if (CollectionUtils.isEmpty(sqlRecognizers)) {sqlRecognizers = SQLVisitorFactory.get(statementProxy.getTargetSQL(),dbType);}Executor<T> executor;if (CollectionUtils.isEmpty(sqlRecognizers)) {executor = new PlainExecutor<>(statementProxy, statementCallback);} else {if (sqlRecognizers.size() == 1) {SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);switch (sqlRecognizer.getSQLType()) {//返回插入的执行器case INSERT:executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},new Object[]{statementProxy, statementCallback, sqlRecognizer});break;//返回更新的执行器case UPDATE:executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);break;//返回删除的执行器case DELETE:executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);break;//返回查询并更新的执行器case SELECT_FOR_UPDATE:executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);break;default:executor = new PlainExecutor<>(statementProxy, statementCallback);break;}} else {executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);}}T rs;try {//代理逻辑下:拿到上边返回的执行器,并执行execute方法!rs = executor.execute(args);} catch (Throwable ex) {if (!(ex instanceof SQLException)) {// Turn other exception into SQLExceptionex = new SQLException(ex);}throw (SQLException) ex;}return rs;}

可以看到,代理后的逻辑对原始的sql执行进行了扩展,拿到增删改查中的某一个执行器,进入executor.execute()执行增删改查,进入execute()内部!

    @Overridepublic T execute(Object... args) throws Throwable {//由于RootContext中的XID在上文第一步已经被填充//此处可直接从RootContext中获取全局事务XIDString xid = RootContext.getXID();if (xid != null) {//与数据库连接绑定,Connection也是代理后的ConnectionProxy,并不是原来的数据库连接//注意:分布式事务下,每一个服务的数据库连接都要绑定同一个XID//如果调用链的某个分支可以不参与分布式事务,也可使用unbind()进行解绑!statementProxy.getConnectionProxy().bind(xid);}//设置是否需要全局锁lockstatementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());// doExecute执行增删改查return doExecute(args);}

进入doExecute(args)方法,该方法主要有以下逻辑

  • 修改自动提交为false,变为手动提交!
  • 执行sql,并准备undo_log表的内容,设置前置镜像 和 后置镜像。用于回滚操作,所以这些镜像的本质其实也是sql语句
  • 提交本地事务
    protected T executeAutoCommitTrue(Object[] args) throws Throwable {ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();try {//设置为手动提交connectionProxy.setAutoCommit(false);return new LockRetryPolicy(connectionProxy).execute(() -> {//执行sql ,并准备undo_log的内容,设置前置镜像 和 后置镜像!,见下文T result = executeAutoCommitFalse(args);// 提交本地事务connectionProxy.commit();return result;});} catch (Exception e) {// when exception occur in finally,this exception will lost, so just print it hereLOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {connectionProxy.getTargetConnection().rollback();}throw e;} finally {connectionProxy.getContext().reset();connectionProxy.setAutoCommit(true);}}===================== undo_log前置镜像和后置镜像的设置 ========================protected T executeAutoCommitFalse(Object[] args) throws Exception {if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {throw new NotSupportYetException("multi pk only support mysql!");}//得到前置镜像TableRecords beforeImage = beforeImage();//执行目标sql//注意:虽然此时sql已经执行完毕,但是数据库并没有该条数据,因为本地事务还没有提交!T result = statementCallback.execute(statementProxy.getTargetStatement(), args);//得到后置镜像TableRecords afterImage = afterImage(beforeImage);// 准备undo_log:填充前置、后置镜像!prepareUndoLog(beforeImage, afterImage);return result;}

connectionProxy.commit()提交本地事务,

    @Overridepublic void commit() throws SQLException {try {// execute(Callable<T> callable) ,利用Callable回调的方式调用doCommit()方法LOCK_RETRY_POLICY.execute(() -> {//提交事务doCommit();return null;});} catch (SQLException e) {if (targetConnection != null && !getAutoCommit()) {//异常回滚rollback();}throw e;} catch (Exception e) {throw new SQLException(e);}}

doCommit方法如下

    private void doCommit() throws SQLException {if (context.inGlobalTransaction()) {//如果是全局事务,执行全局事务的提交逻辑processGlobalTransactionCommit();} else if (context.isGlobalLockRequire()) {//如果需要事务全局锁,会检查全局锁是否有效!processLocalCommitWithGlobalLocks();} else {//如果不是全局事务,直接committargetConnection.commit();}}

processGlobalTransactionCommit方法逻辑如下:

    private void processGlobalTransactionCommit() throws SQLException {try {//RM 向TC发请求,TC注册分支事务,插入一条分支事务数据到 mysql 、oracle等数据库//同时获取全局锁,收集行锁存储到`lock_table`表中register();} catch (TransactionException e) {recognizeLockKeyConflictException(e, context.buildLockKeys());}try {//生成 undo_log 回滚日志,用于事务回滚UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);//提交 undo_log 回滚日志 和 本地事务targetConnection.commit();} catch (Throwable ex) {LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);//上报异常report(false);throw new SQLException(ex);}if (IS_REPORT_SUCCESS_ENABLE) {report(true);}//清除ConnectionContext 的 xid、branch_id等信息!context.reset();}

上面的代码主要做四件事情

  • RM 向TC发请求,TC端注册分支事务,插入一条分支事务数据到 mysqloracle等数据库的branch_table表中

  • TC端获取全局锁,收集行锁存储到lock_table表中,row_key是唯一的!

  • 生成 undo_log 回滚日志,用于事务回滚

  • 提交本地事务,生成订单信息!

综上所述,一阶段内容如下:

  1. 扫描@GlobalTransactional注解,TMTC发请求,获取全局事务XID
  2. TC生成全局事务XID,并存储到全局事务表global_table
  3. 然后开始执行业务代码,使用数据源代理替换原始的sql执行方式
  4. 准备前置镜像
  5. 执行目标sql,执行但未提交
  6. 准备后置镜像,组装undo_log
  7. TC注册分支事务,TC端获取全局事务锁,把分支事务信息存储到branch_table表,并把全局事务锁信息存储到lock_table表中
  8. RM端提交undo_log信息,把前置镜像、后置镜像存储到对应服务下的 undo_log表中,用于事务回滚!
  9. RM端提交本地事务

第二阶段:如果无异常

4. TM向TC发起全局事务提交请求

第一阶段执行完毕后,如果没有业务异常,TM会向TC发起提交全局事务的请求。Seata处理全局事务的核心逻辑如下,这段代码摘自上文第1小节的transactionalTemplate.execute方法中,第一阶段business.execute()方法执行完毕,如果没有异常会执行commitTransaction(tx);方法

io.seata.tm.api.TransactionalTemplate # commitTransaction

try {// 1.开启全局事务!beginTransaction(txInfo, tx);Object rs;try {// 2.进入业务代码,执行业务逻辑rs = business.execute();} catch (Throwable ex) {// 3. 出现了业务异常进行回滚completeTransactionAfterThrowing(txInfo, tx, ex);throw ex;}// 4. 当所有分支事务无异常,提交全局事务commitTransaction(tx);return rs;}

进入commitTransaction(tx)内部,执行tx.commit()方法如下:

    @Overridepublic void commit() throws TransactionException {//判断当前角色:是TM才会执行//Participant:RM 直接return//Launcher:TMif (role == GlobalTransactionRole.Participant) {// Participant has no responsibility of committingif (LOGGER.isDebugEnabled()) {LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);}return;}assertXIDNotNull();int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;try {//重复发起提交,最多5次while (retry > 0) {try {// 事务管理器向 TC 发起事务提交请求,调用seata-server服务status = transactionManager.commit(xid);break;} catch (Throwable ex) {LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());//每提交一次 减1次retry--;if (retry == 0) {throw new TransactionException("Failed to report global commit", ex);}}}

5. TC异步删除全局事务、分支事务、释放全局锁,并通知RM删除undo_log

TC端在接收到TM的提交请求后会执行doGlobalCommit方法

TC端源码位置:io.seata.server.coordinator.DefaultCoordinator # doGlobalCommit

    @Overrideprotected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)throws TransactionException {MDC.put(RootContext.MDC_KEY_XID, request.getXid());//设置全局事务状态:异步提交response.setGlobalStatus(core.commit(request.getXid()));}

其中core.commit(request.getXid()是提交的核心逻辑

    @Overridepublic GlobalStatus commit(String xid) throws TransactionException {//获取GlobalSession,DB模式会去查hlable_table表GlobalSession globalSession = SessionHolder.findGlobalSession(xid);if (globalSession == null) {//如果GlobalSession为空,返回Finished状态。比如调用超时会清除GlobalSessionreturn GlobalStatus.Finished;}globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {// Highlight: Firstly, close the session, then no more branch can be registered.globalSession.closeAndClean();if (globalSession.getStatus() == GlobalStatus.Begin) {if (globalSession.canBeCommittedAsync()) {//AT模式下异步提交事务globalSession.asyncCommit();return false;} else {globalSession.changeStatus(GlobalStatus.Committing);return true;}}return false;});

globalSession.asyncCommit()逻辑如下:

    public void asyncCommit() throws TransactionException {this.addSessionLifecycleListener(SessionHolder.getAsyncCommittingSessionManager());//设置事务状态为AsyncCommittingthis.setStatus(GlobalStatus.AsyncCommitting);SessionHolder.getAsyncCommittingSessionManager().addGlobalSession(this);}

可以看到TC端在提交事务时,仅仅把globalSessionstatus设置成AsyncCommitting就不管了,那是由谁来完成后续的提交逻辑呢?

是由DefaultCoordinator类的init方法去完成的:通过异步的方式处理GlobalSession的不同状态,提高了系统性能

  • 初始化定时线程池,每隔1秒执行一次
  • global_table中获取全局事务列表,每次取100
  • 如果某条数据的状态为AsyncCommitting,则从global_table中删除这条全局事务信息
  • 遍历branch_table表,根据要删除的global_tableXID找到对应的分支事务,删除分支事务信息,删除全局锁
  • RM同步发送RPC请求,让RM端删除对应的的undo_log,当然RM端也是有一个监听器去执行删除的!

io.seata.server.coordinator.DefaultCoordinator # init 用多个线程池分别处理不同状态的全局事务!

    public void init() {.....  //线程池处理:异常状态的处理逻辑//线程池处理:异步提交状态asyncCommitting 的处理逻辑asyncCommitting.scheduleAtFixedRate(() -> {boolean lock = SessionHolder.asyncCommittingLock();if (lock) {try {handleAsyncCommitting();} catch (Exception e) {LOGGER.info("Exception async committing ... ", e);} finally {SessionHolder.unAsyncCommittingLock();}}}, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);...... //线程池处理:其他状态的处理逻辑}

业务正常,提交全局事务:

第二阶段:如果有异常

6. TM向TC发起全局事务回滚请求

try {// 1.开启全局事务!beginTransaction(txInfo, tx);Object rs;try {// 2.进入业务代码,执行业务逻辑rs = business.execute();} catch (Throwable ex) {// 3. 出现了业务异常进行回滚completeTransactionAfterThrowing(txInfo, tx, ex);throw ex;}// 4. 当所有分支事务无异常,提交全局事务commitTransaction(tx);return rs;}

如果执行业务逻辑出现了异常,会进入completeTransactionAfterThrowing方法进行全局事务回滚!

    private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException {//roll backif (txInfo != null && txInfo.rollbackOn(originalException)) {try {//回滚事务rollbackTransaction(tx, originalException);} catch (TransactionException txe) {// Failed to rollbackthrow new TransactionalExecutor.ExecutionException(tx, txe,TransactionalExecutor.Code.RollbackFailure, originalException);}} else {// not roll back on this exception, so commitcommitTransaction(tx);}}

进入rollbackTransaction(tx, originalException)中的rollback()方法

    @Overridepublic void rollback() throws TransactionException {if (role == GlobalTransactionRole.Participant) {// Participant has no responsibility of rollbackif (LOGGER.isDebugEnabled()) {LOGGER.debug("Ignore Rollback(): just involved in global transaction [{}]", xid);}return;}assertXIDNotNull();int retry = ROLLBACK_RETRY_COUNT <= 0 ? DEFAULT_TM_ROLLBACK_RETRY_COUNT : ROLLBACK_RETRY_COUNT;try {//回滚重试次数默认也是 5 次while (retry > 0) {try {// 事务管理器 TM 发起rollback 请求,调用seata-server服务status = transactionManager.rollback(xid);break;} catch (Throwable ex) {LOGGER.error("Failed to report global rollback [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());retry--;if (retry == 0) {throw new TransactionException("Failed to report global rollback", ex);}}}} finally {if (xid.equals(RootContext.getXID())) {suspend();}}if (LOGGER.isInfoEnabled()) {LOGGER.info("[{}] rollback status: {}", xid, status);}}

如上源码所示:事务管理器 TM 发起rollback 请求,同步sync调用seata-server服务!回滚请求如果失败,默认重试5次,提高容错性!

6. TC异步删除global_table、branch_table,RM端回滚数据并删除undo_log

   public void init() {// 线程池处理:异常状态的处理逻辑retryRollbacking.scheduleAtFixedRate(() -> {boolean lock = SessionHolder.retryRollbackingLock();if (lock) {try {handleRetryRollbacking();} catch (Exception e) {LOGGER.info("Exception retry rollbacking ... ", e);} finally {SessionHolder.unRetryRollbackingLock();}}}, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);//线程池处理:重试状态提交状态的处理逻辑retryCommitting.scheduleAtFixedRate(() -> {boolean lock = SessionHolder.retryCommittingLock();if (lock) {try {handleRetryCommitting();} catch (Exception e) {LOGGER.info("Exception retry committing ... ", e);} finally {SessionHolder.unRetryCommittingLock();}}}, 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);//线程池处理:异步提交状态的处理逻辑asyncCommitting.scheduleAtFixedRate(() -> {boolean lock = SessionHolder.asyncCommittingLock();if (lock) {try {handleAsyncCommitting();} catch (Exception e) {LOGGER.info("Exception async committing ... ", e);} finally {SessionHolder.unAsyncCommittingLock();}}}, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);timeoutCheck.scheduleAtFixedRate(() -> {boolean lock = SessionHolder.txTimeoutCheckLock();if (lock) {try {timeoutCheck();} catch (Exception e) {LOGGER.info("Exception timeout checking ... ", e);} finally {SessionHolder.unTxTimeoutCheckLock();}}}, 0, TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);undoLogDelete.scheduleAtFixedRate(() -> {boolean lock = SessionHolder.undoLogDeleteLock();if (lock) {try {undoLogDelete();} catch (Exception e) {LOGGER.info("Exception undoLog deleting ... ", e);} finally {SessionHolder.unUndoLogDeleteLock();}}}, UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);}
  1. TCglobal_table中拿到全局事务XID,删除对应的全局事务信息
  2. TC通过全局事务XID与分支事务的绑定关系,从branch_table中拿到分支事务。然后同步的向RM发送rpc请求
  3. RM接收到TC的回滚请求后,根据undo_log表中的前置镜像对数据进行回滚
  4. RM端数据回滚完成后,删除undo_log表(注意此处加了行锁,防止写并发),并返回回滚状态 – 已完成
  5. TC端接受到RM端数据回滚完成状态后,删除branch_table中对应的分支信息!

问题:Seata全局锁的作用是什么?

seataat模式主要实现逻辑是数据源代理,而数据源代理将基于如MySQLOracle等关系事务型数据库实现,基于数据库的隔离级别为可重复读read committed。换而言之,本地事务的支持是seata实现at模式的必要条件,这也将限制seata的at模式的使用场景。

Seata全局锁的作用是:写隔离,保证数据的一致性,如果其他线程不需要获取锁可直接修改前置、后置镜像,那么当出现异常回滚时,镜像中的值就和本次事务中的不一致!造成数据一致性问题!,首先,我们理解一下写隔离的流程

分支事务1-开始
|
V 获取 本地锁
|
V 获取 全局锁    分支事务2-开始
|               |
V 释放 本地锁     V 获取 本地锁
|               |
V 释放 全局锁     V 获取 全局锁|V 释放 本地锁|V 释放 全局锁

如上所示,一个分布式事务的锁获取流程是这样的

  1. 先获取到本地锁,这样你已经可以修改本地数据了,只是还不能提交本地事务commit
  2. 而后,能否提交就是看能否获得全局锁
  3. 获得了全局锁,意味着可以修改了,那么提交本地事务,释放本地锁
  4. 当分布式事务提交,释放全局锁。这样就可以让其它事务获取全局锁,并提交它们对本地数据的修改了。

可以看到,这里有两个关键点

  • 本地锁获取之前,不会去争抢全局锁
  • 全局锁获取之前,不会提交本地锁

这就意味着,数据的修改将被互斥开来。也就不会造成写入脏数据。全局锁可以让分布式修改中的写数据隔离。

上面提到:在执行业务sql之前。会生成一个前置数据镜像,也就是beforeImage方法。

while (true) {try {// 执行sqlrs = statementCallback.execute(statementProxy.getTargetStatement(), args);// 构建数据行TableRecords selectPKRows = buildTableRecords(getTableMeta(), selectPKSQL, paramAppenderList);// 构建锁KEYString lockKeys = buildLockKey(selectPKRows);if (StringUtils.isNullOrEmpty(lockKeys)) {break;}if (RootContext.inGlobalTransaction()) {// 校验全局锁statementProxy.getConnectionProxy().checkLock(lockKeys);} else if (RootContext.requireGlobalLock()) {statementProxy.getConnectionProxy().appendLockKey(lockKeys);} else {throw new RuntimeException("Unknown situation!");}break;} catch (LockConflictException lce) {if (sp != null) {conn.rollback(sp);} else {conn.rollback();}// 锁冲突,重试lockRetryController.sleep(lce);}
}

如代码所示,每一个分支事务,其实就是通过占用本地锁,然后重试等待全局锁来达到读写隔离的目的,保证前置后置镜像不被脏写!

分布式事务Seata的AT模式下两阶段提交原理相关推荐

  1. 分布式事务 -- seata框架AT模式实现原理

    Seata AT 模式 上一节中我们提到AT模式是基于XA事务模型演变过来的,所以他的整体机制也是一个改进版本的两阶段提交协议. 第一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和链 ...

  2. 分布式事务SEATA的AT模式的简单使用

    目录 下载 修改配置 创建seata数据库 修改配置 启动nacos 启动seata-server 使用 引入包 配置seata 使用 下载 修改配置 seata-server-0.9.0.zip解压 ...

  3. 分布式事务中的Saga模式

    微服务架构(MSA)已经变得非常流行.但是,一个常见问题是如何跨多个微服务管理分布式事务.当微服务架构将单体系统分解为自封装服务时,意味着单体系统中的本地事务现在分布到将按顺序调用的多个服务中. 说到 ...

  4. 分布式事务(Seata) 四大模式详解

    前言 在上一节中我们讲解了,关于分布式事务和seata的基本介绍和使用,感兴趣的小伙伴可以回顾一下<别再说你不知道分布式事务了!> 最后小农也说了,下期会带给大家关于Seata中关于sea ...

  5. 分布式事务Seata原理

    一.Seata 介绍: 1.Seata 简介: Seata 是一款开源的分布式事务解决方案,致力于提供高性能与简单易用的分布式事务服务,为用户提供了 AT.TCC.SAGA 和 XA 几种不同的事务模 ...

  6. 分布式事务实战---XA两阶段提交(2PC)方案详解

    XA,2PC,two-phase commit protocol,两阶段事务提交采⽤的是 X/OPEN 组织定义的DTP 模型所抽象的: AP 应用程序,Application Program,定义事 ...

  7. 3pc在mysql的实现_面试官:了解分布式事务?讲讲你理解的2PC和3PC原理

    分布式事物基本理论:基本遵循CPA理论,采用柔性事物特征,软状态或者最终一致性特点保证分布式事物一致性问题. 分布式事物常见解决方案:2PC两段提交协议 3PC三段提交协议(弥补两端提交协议缺点) T ...

  8. MySQL 为什么需要两阶段提交?

    文章目录 1. 什么是两阶段提交 1.1 binlog 与 redolog binlog redo log 1.2 两阶段提交 2. 为什么需要两阶段提交 3. 小结 为什么要两阶段提交?一阶段提交不 ...

  9. mysql之两阶段提交

    什么是两阶段提交 当有数据修改时,会先将修改redo log cache和binlog cache然后在刷入到磁盘形成redo log file,当redo log file全都刷入到磁盘时(prep ...

最新文章

  1. 关于外包团队的质量管理
  2. html网页缩小之后div框移动,css – DIV在浏览器中放大和缩小时移动
  3. 设置ComboBox控件的边框颜色.
  4. java外挂源码_2.7 万 Star!Github 项目源码辅助阅读神器
  5. MySQLdb安装的错误说明
  6. 获取数据 - 将Excel文件读入矩阵matrix中 - Python代码
  7. 信息学奥赛一本通 1178:成绩排序 | OpenJudge NOI 1.10 03:成绩排序
  8. Yammer Metrics实现服务指标收集与监控
  9. java配置文件强制更新_对Java配置文件Properties的读取、写入与更新操作
  10. php短链接api,PHP通过调用新浪API生成t.cn格式短网址链接的方法详解
  11. Ubuntu配置maven
  12. php7.4报错:Trying to access array offset on value of type null
  13. 安卓源码下载的环境搭建
  14. 聚类分析软件测试,基于复杂网络的软件测试路径聚类分析-计算机工程与应用.PDF...
  15. pdf加水印怎么加?
  16. [PTA]实验5-6 使用函数判断完全平方数
  17. box-sizing属性介绍
  18. 【Unity】 HTFramework框架(十)Resource资源管理器
  19. 在2003服务器上预览时出现:您未被授权查看该页 您不具备使用所提供的凭据查看该目录或页的权限
  20. HMM(三)维特比算法推测隐藏状态序列

热门文章

  1. python中文界面设定_python绘图界面中文显示
  2. vue——vuex mapState,mapGetters,mapMutations,mapActions
  3. 计算机网络基本操作命令的使用,计算机网络-路由器基本命令操作实验指导书--华为...
  4. 有关凸集的证明例题_第1章引言题解1. 用定义验证下列各集合是凸集: (1) S={(X1 ......
  5. Android Studio:解决DataBinding v4包问题
  6. Android导入第三方静态库.a编译成动态库.so
  7. 几个问题,比较急,知道的大侠,帮帮忙
  8. Object Detection: Face Detection using Haar Cascades
  9. GitHub 版本控制 项目托管 04 创建GitHub远程仓库
  10. Android零基础入门第31节:几乎不用但要了解的AbsoluteLayout绝对布局