流程图

该图介绍了事务的传播行为

该流通图展示的是TransactionManager具体如何结合事务的传播行为进行事务获取

该流通图展示的是TransactionManager具体如何结合事务的传播行为进行事务提交

该流通图展示的是TransactionManager具体如何结合事务的传播行为进行事务回滚

TransactionManager

public interface TransactionManager {}

PlatformTransactionManager

这是 Spring 事务基础类中的中心接口。

对于实现者,建议从提供的 {@link org.springframework.transaction.support.AbstractPlatformTransactionManager} 类派生,该类实现了事务的传播行为并负责事务同步处理。该类使用了模板模式,子类必须为底层事务的特定状态实现模板方法,例如:begin, suspend, resume, commit。

该策略接口的默认实现是{@link org.springframework.transaction.jta.JtaTransactionManager}{@link org.springframework.jdbc.datasource.DataSourceTransactionManager},可以作为其他事务策略的实现指南。

public interface PlatformTransactionManager extends TransactionManager {}

getTransaction(TransactionDefinition definition)

根据事务的传播特性,返回一个有效的事务或者创建一个新事务

  1. 隔离级别、超时时间,只应用于新创建的事务
  2. 并不是所有的事务定义的属性都会被每一个事务管理器支持,如果事务管理器不支持,应该抛异常
  3. 上述规则的一个例外是只读标志,如果不支持显式只读模式,则忽略。
 TransactionStatus getTransaction(@Nullable TransactionDefinition definition)throws TransactionException;

commit(TransactionStatus status)

提交给定的事务

  1. 如果事务已被标记为仅回滚rollback-only,则执行回滚(对应rollback方法中的1)
  2. 如果先前的事务已被挂起以便能够创建新事务,则在提交新事务后恢复先前的事务。
  3. 当commit调用完成时,无论是正常还是抛出异常,事务都必须完全完成并清理。
 void commit(TransactionStatus status) throws TransactionException;

rollback(TransactionStatus status)

执行跟定事务的回滚操作

  1. 如果事务不是新事务,只需将其设置为仅回滚(rollback-only)以正确参与外层事务。
  2. 如果先前的事务已暂停以便能够创建新事务,则在回滚新事务后恢复先前的事务。
  3. 如果事务commit时抛出异常,则不要在调用回滚;因为当commit返回时,事务已经completed和clean up,在AbstractPlatformTransactionManager#processRollback中会有cleanupAfterCompletion方法,并且在AbstractPlatformTransactionManager#commit中首先会检查事务状态,如果事务已经结束,则抛异常
 void rollback(TransactionStatus status) throws TransactionException;

AbstractPlatformTransactionManager

该抽象类实现了Spring 标准事务工作流,作为具体平台事务管理器的基础,例如 {@link org.springframework.transaction.jta.JtaTransactionManager}

该类使用了模板模式,子类必须为底层事务的特定状态实现模板方法,例如:begin, suspend, resume, commit。

此基类提供以下工作流处理:

  • 确定是否已经存在了一个事务
  • 应用适当的事务传播特性;
  • 必要时挂起和恢复事务;
  • 在commit时检查rollback-only标志;
  • 对rollback应用适当的修改(实际回滚或设置仅回滚标志rollback-only);
  • 触发已注册的同步回调(如果事务同步处于活动状态)。

事务同步是一种通用机制,用于注册在事务完成时调用的回调。

getTransaction

该方法处理事务的传播行为,委托给{@code doGetTransaction},{@code isExistingTransaction}

获取到Object transaction后,根据当前事务是否已经存在和事务的传播行为进行不同的处理

  • 如果存在一个事务,则根据事务的传播属性进行相关处理处理
  • 如果不存在事务,但是有需要强制有事务MANDATORY,则抛异常
  • 如果不存在事务,但是需要有一个事务,则新创建一个事务,同时使用Object transaction开启事务。
 public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)throws TransactionException {// 如果没有给定的TransactionDefinition就使用默认的TransactionDefinitionTransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());//获取底层的数据库事务,该方法由子类实现Object transaction = doGetTransaction();//1.判断是否存在一个事务,该方法由子类实现if (isExistingTransaction(transaction)) {//如果存在一个事务,则检查事务的传播行为以了解行为方式,并创建一个TransactionStatusreturn handleExistingTransaction(def, transaction, debugEnabled);}// Check definition settings for new transaction.if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());}//2.没有找到一个已经存在的事务//2.1 如果没有找到一个已经存在的事务,而且事务的传播特性是MANDATORY,则抛异常if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation 'mandatory'");}//2.2 如果没有找到一个已经存在的事务,而且事务的传播特性是REQUIRED,REQUIRES_NEW,NESTED,则创建新的事务else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {SuspendedResourcesHolder suspendedResources = suspend(null);try {boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);DefaultTransactionStatus status = newTransactionStatus(def, transaction, true, newSynchronization, debugEnabled, suspendedResources);//开启一个事务,该方法由子类实现doBegin(transaction, def);//事务同步prepareSynchronization(status, def);return status;}catch (RuntimeException | Error ex) {resume(null, suspendedResources);throw ex;}}//创建“空”事务:没有实际事务,但可能同步。else {if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {logger.warn("Custom isolation level specified but no actual transaction initiated; " +"isolation level will effectively be ignored: " + def);}boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);}}

handleExistingTransaction

根据事务的传播行为为已经存在的事务创建一个TransactionStatus,也就是说如果当前已经存在了一个事务,则会调用该方法

 private TransactionStatus handleExistingTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled)throws TransactionException {//NEVER总是以非事务执行,不支持当前事务;如果存在事务,则抛出异常。if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {throw new IllegalTransactionStateException("Existing transaction found for transaction marked with propagation 'never'");}//NOT_SUPPORTED总是以非事务执行,不支持当前事务;如果存在事务,则挂起当前事务。if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {Object suspendedResources = suspend(transaction);boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);//创建一个TransactionStatus,newTransaction=falsereturn prepareTransactionStatus(definition, null, false, newSynchronization, debugEnabled, suspendedResources);}//REQUIRES_NEW总是开启新事务;如果存在事务,则将这个存在的事务挂起if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {SuspendedResourcesHolder suspendedResources = suspend(transaction);try {boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);//创建一个新事务状态,newTransaction=falseDefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);doBegin(transaction, definition);prepareSynchronization(status, definition);return status;}catch (RuntimeException | Error beginEx) {resumeAfterBeginException(transaction, suspendedResources, beginEx);throw beginEx;}}//NESTED嵌套事务,创建一个嵌套事务if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {//判断是否允许存在嵌套型事务,不存在则抛异常if (!isNestedTransactionAllowed()) {throw new NestedTransactionNotSupportedException("Transaction manager does not allow nested transactions by default - " +"specify 'nestedTransactionAllowed' property with value 'true'");}//1.通过savepoint支持嵌套型事务if (useSavepointForNestedTransaction()) {//创建一个TransactionStatus,newTransaction=falseDefaultTransactionStatus status =prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);//创建并持有savepointstatus.createAndHoldSavepoint();return status;}//2.通过嵌套的begin,commit,rollback支持嵌套型事务else {// Nested transaction through nested begin and commit/rollback calls.// Usually only for JTA: Spring synchronization might get activated here// in case of a pre-existing JTA transaction.boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, null);doBegin(transaction, definition);prepareSynchronization(status, definition);return status;}}//PROPAGATION_SUPPORTS或者PROPAGATION_REQUIRED.if (debugEnabled) {logger.debug("Participating in existing transaction");}//在参与到存在的事务前,是否需要校验存在的事务,默认不校验if (isValidateExistingTransaction()) {if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {Constants isoConstants = DefaultTransactionDefinition.constants;throw new IllegalTransactionStateException("Participating transaction with definition [" +definition + "] specifies isolation level which is incompatible with existing transaction: " +(currentIsolationLevel != null ?isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :"(unknown)"));}}if (!definition.isReadOnly()) {if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {throw new IllegalTransactionStateException("Participating transaction with definition [" +definition + "] is not marked as read-only but existing transaction is");}}}boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);//准备事务状态,newTransaction=falsereturn prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);}

prepareTransactionStatus

根据给定的参数创建一个新的TransactionStatus,同时适当地初始化事务同步

 protected final DefaultTransactionStatus prepareTransactionStatus(TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {//创建一个DefaultTransactionStatusDefaultTransactionStatus status = newTransactionStatus(definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);//事务同步prepareSynchronization(status, definition);return status;}

newTransactionStatus

根据给定的参数创建一个TransactionStatus,与上面的方法的主要区别在于没有进行事务同步

 protected DefaultTransactionStatus newTransactionStatus(TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {boolean actualNewSynchronization = newSynchronization &&!TransactionSynchronizationManager.isSynchronizationActive();return new DefaultTransactionStatus(transaction, newTransaction, actualNewSynchronization,definition.isReadOnly(), debug, suspendedResources);}

prepareSynchronization

适当的初始化事务同步,因多线程安全问题,这里使用了线程私有变量来存这些属性

 protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {if (status.isNewSynchronization()) {TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?definition.getIsolationLevel() : null);TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());TransactionSynchronizationManager.initSynchronization();}}

rollback(TransactionStatus status)

该方法根据事务的信息执行各种操作,比如回滚事务,或者设置rollbackOnly属性为true

核心方法

  • doRollback:调用具体的事务管理器进行事务回滚
  • doSetRollbackOnly:将事务标为仅回滚,由外层事务进行回滚
 public final void rollback(TransactionStatus status) throws TransactionException {//检查事务状态是否为已结束,如果已经结束则会抛异常if (status.isCompleted()) {throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");}DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;//处理回滚processRollback(defStatus, false);}

processRollback(DefaultTransactionStatus status, boolean unexpected)

 private void processRollback(DefaultTransactionStatus status, boolean unexpected) {try {boolean unexpectedRollback = unexpected;try {//触发事务同步回调方法TransactionSynchronization#beforeCompletion()triggerBeforeCompletion(status);//如果有savepoint,回滚至savepoint,只有一种情况//1.内层PROPAGATION_NESTED事务会开启保存点if (status.hasSavepoint()) {status.rollbackToHeldSavepoint();}//如果是新事务,则回滚事务,有两种情况//1.最外层事务,事务是PROPAGATION_REQUIRED//2.外层事务是PROPAGATION_REQUIRED,内层事务是PROPAGATION_REQUIRES_NEWelse if (status.isNewTransaction()) {//调用具体的事务管理器子类进行实际的事务回滚doRollback(status);}//如果有事务但不是新事务,则把标记事务状态为仅回滚,等到外层事务那一层进行回滚//而是当前事务方法参与到了外层事务中或者没有开启事务,比如//1.内层PROPAGATION_REQUIRED//2.内层PROPAGATION_SUPPORTS//3.内层PROPAGATION_MANDATORY,//这表示内外层方法处于同一个事务中,因此在内层事务设置rollback-only,最终会在外层进行事务回滚else {// 当前事务方法参与了一个更大的事务if (status.hasTransaction()) {//if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {//设置rollbackOnly为true,等到外层事务的那一层进行回滚//注意,这里不是给当前的TransactionStatus设置rollback-only//而是通过TransactionStatus获取到TransactionObject,然后将TransactionObject持有的ConnectionHolder中的表示全局的rollback-only设置为true,//表示全局事务需要进行回滚,外层事务会通过判断GlobalRollback判断是否需要进行回滚doSetRollbackOnly(status);}}// Unexpected rollback only matters here if we're asked to fail earlyif (!isFailEarlyOnGlobalRollbackOnly()) {unexpectedRollback = false;}}}catch (RuntimeException | Error ex) {//触发事务同步回调方法TransactionSynchronization#afterCompletion()triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);throw ex;}//触发事务同步回调方法TransactionSynchronization#afterCompletion()triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);// Raise UnexpectedRollbackException if we had a global rollback-only marker//如果是unexpectedRollback,那么会继续抛异常UnexpectedRollbackException//通常来说全局rollback-only=true,在调用processRollback方法是,会给unexpectedRollback传trueif (unexpectedRollback) {throw new UnexpectedRollbackException("Transaction rolled back because it has been marked as rollback-only");}}//清空记录的资源并将挂起的资源恢复finally {cleanupAfterCompletion(status);}}

commit(TransactionStatus status)

public final void commit(TransactionStatus status) throws TransactionException {//1.如果事务已经完成了,则抛异常    if (status.isCompleted()) {throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");}DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;//2.如果当前事务状态表示事务需要回滚,则对事务进行回滚if (defStatus.isLocalRollbackOnly()) {processRollback(defStatus, false);return;}//3.当前方法的事务是commit,但是全局的事务状态是rollback-only,则也对事务进行回滚//在多层事务方法调用中会出现if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {processRollback(defStatus, true);return;}//4.进行事务提交processCommit(defStatus);}

processCommit(DefaultTransactionStatus status)

处理实际的事务提交commit

 private void processCommit(DefaultTransactionStatus status) throws TransactionException {try {boolean beforeCompletionInvoked = false;try {boolean unexpectedRollback = false;prepareForCommit(status);//触发事务同步回调方法TransactionSynchronization#beforeCommit()triggerBeforeCommit(status);//触发事务同步回调方法TransactionSynchronization#beforeCompletion()triggerBeforeCompletion(status);beforeCompletionInvoked = true;if (status.hasSavepoint()) {if (status.isDebug()) {logger.debug("Releasing transaction savepoint");}unexpectedRollback = status.isGlobalRollbackOnly();status.releaseHeldSavepoint();}else if (status.isNewTransaction()) {if (status.isDebug()) {logger.debug("Initiating transaction commit");}unexpectedRollback = status.isGlobalRollbackOnly();doCommit(status);}else if (isFailEarlyOnGlobalRollbackOnly()) {unexpectedRollback = status.isGlobalRollbackOnly();}// Throw UnexpectedRollbackException if we have a global rollback-only// marker but still didn't get a corresponding exception from commit.if (unexpectedRollback) {throw new UnexpectedRollbackException("Transaction silently rolled back because it has been marked as rollback-only");}}catch (UnexpectedRollbackException ex) {// can only be caused by doCommittriggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);throw ex;}catch (TransactionException ex) {// can only be caused by doCommitif (isRollbackOnCommitFailure()) {doRollbackOnCommitException(status, ex);}else {triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);}throw ex;}catch (RuntimeException | Error ex) {if (!beforeCompletionInvoked) {triggerBeforeCompletion(status);}doRollbackOnCommitException(status, ex);throw ex;}// Trigger afterCommit callbacks, with an exception thrown there// propagated to callers but the transaction still considered as committed.try {//触发事务同步回调方法TransactionSynchronization#afterCommit()triggerAfterCommit(status);}finally {//触发事务同步回调方法TransactionSynchronization#afterCompletion()triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);}}//事务完成后进行cleanupfinally {cleanupAfterCompletion(status);}}

DataSourceTransactionManager

public class DataSourceTransactionManager extends AbstractPlatformTransactionManagerimplements ResourceTransactionManager, InitializingBean {private DataSource dataSource;private boolean enforceReadOnly = false;public DataSourceTransactionManager() {setNestedTransactionAllowed(true);}public DataSourceTransactionManager(DataSource dataSource) {this();setDataSource(dataSource);afterPropertiesSet();}
}

doGetTransaction

获取一个事务对象

 protected Object doGetTransaction() {//创建一个事务对象 transaction objectDataSourceTransactionObject txObject = new DataSourceTransactionObject();//如果支持嵌套事务,则允许设置savepointtxObject.setSavepointAllowed(isNestedTransactionAllowed());//从事务同步管理器中获取一个ConnectionHolder,有可能为nullConnectionHolder conHolder =(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());//将connection holder设置给事务对象txObject.setConnectionHolder(conHolder, false);return txObject;}

isExistingTransaction

ConnectHolder存在,且transactionActive=true,才表示存在一个事务。

 protected boolean isExistingTransaction(Object transaction) {DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());}

doBegin

开启事务

 protected void doBegin(Object transaction, TransactionDefinition definition) {DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;Connection con = null;try {//如果事务没有ConnectionHolder或者该connectionHolder没有与事务同步,则从DataSource中获取一个,并且设置给事务对象if (!txObject.hasConnectionHolder() ||txObject.getConnectionHolder().isSynchronizedWithTransaction()) {Connection newCon = obtainDataSource().getConnection();//将该connection holder设置给事务,并且该connection holder是一个新的connection holdertxObject.setConnectionHolder(new ConnectionHolder(newCon), true);}//将该connection holder设置为与事务同步txObject.getConnectionHolder().setSynchronizedWithTransaction(true);con = txObject.getConnectionHolder().getConnection();Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);txObject.setPreviousIsolationLevel(previousIsolationLevel);txObject.setReadOnly(definition.isReadOnly());//将事务提交设置为手动,非自动if (con.getAutoCommit()) {txObject.setMustRestoreAutoCommit(true);con.setAutoCommit(false);}prepareTransactionalConnection(con, definition);//设置事务为活动的txObject.getConnectionHolder().setTransactionActive(true);int timeout = determineTimeout(definition);if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {txObject.getConnectionHolder().setTimeoutInSeconds(timeout);}// Bind the connection holder to the thread.//如果是新的connection holder,则将其绑定到当前线程上if (txObject.isNewConnectionHolder()) {TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());}}catch (Throwable ex) {if (txObject.isNewConnectionHolder()) {DataSourceUtils.releaseConnection(con, obtainDataSource());txObject.setConnectionHolder(null, false);}throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);}}

prepareTransactionalConnection

在事务begin的时候,准备一个事务Connection,

 protected void prepareTransactionalConnection(Connection con, TransactionDefinition definition)throws SQLException {if (isEnforceReadOnly() && definition.isReadOnly()) {try (Statement stmt = con.createStatement()) {stmt.executeUpdate("SET TRANSACTION READ ONLY");}}}

doSuspend

挂起当前事务,将当前事务的connection holder设置为null,并且与当前线程解绑。

 @Overrideprotected Object doSuspend(Object transaction) {DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;txObject.setConnectionHolder(null);return TransactionSynchronizationManager.unbindResource(obtainDataSource());}

doResume

恢复事务,将挂起的资源重新与当前线程绑定

 @Overrideprotected void doResume(@Nullable Object transaction, Object suspendedResources) {TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);}

doCommit

提交事务,获取到connection,并执行commit操作

 protected void doCommit(DefaultTransactionStatus status) {DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();Connection con = txObject.getConnectionHolder().getConnection();try {con.commit();}catch (SQLException ex) {throw new TransactionSystemException("Could not commit JDBC transaction", ex);}}

doRollback

回滚,获取到connection,并执行rollback

 protected void doRollback(DefaultTransactionStatus status) {DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();Connection con = txObject.getConnectionHolder().getConnection();try {con.rollback();}catch (SQLException ex) {throw new TransactionSystemException("Could not roll back JDBC transaction", ex);}}

doSetRollbackOnly

给TransactionObject对象设置回滚,TransactionObject内部本身没有rollback-only字段,这里是给其持有的ConnectionHolder中的表示全局的rollback-only设置为true,表示全局事务需要进行回滚,外层事务会通过判断GlobalRollback判断是否需要进行回滚。
例如,在两层事务,外层REQUIRED,内层REUIRED,当内层事务遇到异常时,就会通过调用该方法,将全局的事务设置为回滚状态。

 protected void doSetRollbackOnly(DefaultTransactionStatus status) {DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();txObject.setRollbackOnly();}

doCleanupAfterCompletion

 @Overrideprotected void doCleanupAfterCompletion(Object transaction) {DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;// Remove the connection holder from the thread, if exposed.if (txObject.isNewConnectionHolder()) {TransactionSynchronizationManager.unbindResource(obtainDataSource());}// Reset connection.Connection con = txObject.getConnectionHolder().getConnection();try {if (txObject.isMustRestoreAutoCommit()) {con.setAutoCommit(true);}DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel(), txObject.isReadOnly());}catch (Throwable ex) {logger.debug("Could not reset JDBC Connection after transaction", ex);}if (txObject.isNewConnectionHolder()) {if (logger.isDebugEnabled()) {logger.debug("Releasing JDBC Connection [" + con + "] after transaction");}DataSourceUtils.releaseConnection(con, this.dataSource);}txObject.getConnectionHolder().clear();}

DataSourceTransactionObject

DataSource事务对象,是DataSourceTransactionManager的静态内部类,会持有一个ConnectionHolder,被DataSourceTransactionManager当作一个transaction对象使用
DataSourceTransactionManager实现了SmartTransactionObject接口。

 private static class DataSourceTransactionObject extends JdbcTransactionObjectSupport {//该事务对象的connection holder是否是新生成的private boolean newConnectionHolder;private boolean mustRestoreAutoCommit;public void setConnectionHolder(@Nullable ConnectionHolder connectionHolder, boolean newConnectionHolder) {super.setConnectionHolder(connectionHolder);this.newConnectionHolder = newConnectionHolder;}public boolean isNewConnectionHolder() {return this.newConnectionHolder;}public void setMustRestoreAutoCommit(boolean mustRestoreAutoCommit) {this.mustRestoreAutoCommit = mustRestoreAutoCommit;}public boolean isMustRestoreAutoCommit() {return this.mustRestoreAutoCommit;}public void setRollbackOnly() {getConnectionHolder().setRollbackOnly();}//实现了SmartTransactionObject接口的isRollbackOnly方法//通过ConnectionHolder判断rollback-only值public boolean isRollbackOnly() {return getConnectionHolder().isRollbackOnly();}@Overridepublic void flush() {if (TransactionSynchronizationManager.isSynchronizationActive()) {TransactionSynchronizationUtils.triggerFlush();}}}

JdbcTransactionObjectSupport

是DataSourceTransactionObject的抽象父类,包含带有 JDBC {@code Connection}{@link ConnectionHolder},并基于该 {@code ConnectionHolder} 实现 {@link SavepointManager} 接口相关的功能。

public abstract class JdbcTransactionObjectSupport implements SavepointManager, SmartTransactionObject {//数据库连接持有者private ConnectionHolder connectionHolder;@Nullableprivate Integer previousIsolationLevel;private boolean readOnly = false;private boolean savepointAllowed = false;//其他代码省略......public void flush() {}//实现了savepoint的功能//public Object createSavepoint() throws TransactionException {ConnectionHolder conHolder = this.getConnectionHolderForSavepoint();try {if (!conHolder.supportsSavepoints()) {throw new NestedTransactionNotSupportedException("Cannot create a nested transaction because savepoints are not supported by your JDBC driver");} else if (conHolder.isRollbackOnly()) {throw new CannotCreateTransactionException("Cannot create savepoint for transaction which is already marked as rollback-only");} else {return conHolder.createSavepoint();}} catch (SQLException var3) {throw new CannotCreateTransactionException("Could not create JDBC savepoint", var3);}}public void rollbackToSavepoint(Object savepoint) throws TransactionException {ConnectionHolder conHolder = this.getConnectionHolderForSavepoint();try {conHolder.getConnection().rollback((Savepoint)savepoint);conHolder.resetRollbackOnly();} catch (Throwable var4) {throw new TransactionSystemException("Could not roll back to JDBC savepoint", var4);}}public void releaseSavepoint(Object savepoint) throws TransactionException {ConnectionHolder conHolder = this.getConnectionHolderForSavepoint();try {conHolder.getConnection().releaseSavepoint((Savepoint)savepoint);} catch (Throwable var4) {logger.debug("Could not explicitly release JDBC savepoint", var4);}}protected ConnectionHolder getConnectionHolderForSavepoint() throws TransactionException {if (!this.isSavepointAllowed()) {throw new NestedTransactionNotSupportedException("Transaction manager does not allow nested transactions");} else if (!this.hasConnectionHolder()) {throw new TransactionUsageException("Cannot create nested transaction when not exposing a JDBC transaction");} else {return this.getConnectionHolder();}}
}

TransactionStatus

public interface TransactionStatus extends TransactionExecution, SavepointManager, Flushable {//返回此事务内部是否带有保存点savepoint,即是否已创建了基于保存点的嵌套事务。boolean hasSavepoint();//将底层session刷新到数据库(如果适用):例如,所有受影响的 Hibernate/JPA session。@Overridevoid flush();
}

TransactionExecution

事务当前状态通用接口。
每进入到一个事务方法时,都会生成一个TransactionStatus对象,里面的属性值根据事务传播行为进行不同的设置。

public interface TransactionExecution {//返回当前是否是否是一个新的事务,否则就参与到一个已经存在的事务中//如:在两层REQUIRED事务中,外层事务状态的isNewTransaction=true,内层的isNewTransaction=falseboolean isNewTransaction();//设置事务rollback-only,指示事务管理器唯一的可能结果就是回滚事务void setRollbackOnly();boolean isRollbackOnly();//表示该事务是否结束,即是否已经commit或者roll backboolean isCompleted();
}

AbstractTransactionStatus

TransactionStatus抽象类,预先实现rollback-only 和completed标志的处理,以及对底层 {@link org.springframework.transaction.SavepointManager} 的委托。

还提供了在事务中保存保存点的选项。

public abstract class AbstractTransactionStatus implements TransactionStatus {//rollback-only状态值private boolean rollbackOnly = false;//是否是否已经结束,已经结束的事务不能进行commit,rollback等private boolean completed = false;//savepointprivate Object savepoint;@Overridepublic void setRollbackOnly() {this.rollbackOnly = true;}//判断是否是否rollback-only,需要同时判断当前事务的rollback-only和全局rollback-only的值public boolean isRollbackOnly() {return (isLocalRollbackOnly() || isGlobalRollbackOnly());}public void setCompleted() {this.completed = true;}public boolean isLocalRollbackOnly() {return this.rollbackOnly;}//模板方法,确定事务的全局rollback-only标志//子类可以根据情况进行覆盖重写该方法,例如DefaultTransactionStatus就对其进行了重写public boolean isGlobalRollbackOnly() {return false;}@Overridepublic boolean isCompleted() {return this.completed;}//---------------------------------------------------------------------// 处理当前savepoint的状态//---------------------------------------------------------------------@Overridepublic boolean hasSavepoint() {return (this.savepoint != null);}//为事务设置savepoint,对PROPAGATION_NESTED是有用的protected void setSavepoint(@Nullable Object savepoint) {this.savepoint = savepoint;}protected Object getSavepoint() {return this.savepoint;}//创建一个savepoint,并保存到事务中public void createAndHoldSavepoint() throws TransactionException {setSavepoint(getSavepointManager().createSavepoint());}//回滚到为事务保留的savepoint,然后立即释放savepointpublic void rollbackToHeldSavepoint() throws TransactionException {Object savepoint = getSavepoint();if (savepoint == null) {throw new TransactionUsageException("Cannot roll back to savepoint - no savepoint associated with current transaction");}getSavepointManager().rollbackToSavepoint(savepoint);getSavepointManager().releaseSavepoint(savepoint);setSavepoint(null);}//释放savepointpublic void releaseHeldSavepoint() throws TransactionException {Object savepoint = getSavepoint();if (savepoint == null) {throw new TransactionUsageException("Cannot release savepoint - no savepoint associated with current transaction");}getSavepointManager().releaseSavepoint(savepoint);setSavepoint(null);}//---------------------------------------------------------------------// Implementation of SavepointManager//---------------------------------------------------------------------/*** This implementation delegates to a SavepointManager for the* underlying transaction, if possible.* @see #getSavepointManager()* @see SavepointManager#createSavepoint()*/@Overridepublic Object createSavepoint() throws TransactionException {return getSavepointManager().createSavepoint();}/*** This implementation delegates to a SavepointManager for the* underlying transaction, if possible.* @see #getSavepointManager()* @see SavepointManager#rollbackToSavepoint(Object)*/@Overridepublic void rollbackToSavepoint(Object savepoint) throws TransactionException {getSavepointManager().rollbackToSavepoint(savepoint);}/*** This implementation delegates to a SavepointManager for the* underlying transaction, if possible.* @see #getSavepointManager()* @see SavepointManager#releaseSavepoint(Object)*/@Overridepublic void releaseSavepoint(Object savepoint) throws TransactionException {getSavepointManager().releaseSavepoint(savepoint);}//为底层的事务返回一个SavepointManager//默认是抛异常,子类可以进行覆盖protected SavepointManager getSavepointManager() {throw new NestedTransactionNotSupportedException("This transaction does not support savepoints");}

DefaultTransactionStatus

默认的事务状态类
注意:该类对isGlobalRollbackOnly方法进行了重写,通过检查事务对象来确定全局的事务rollback-only值,DataSourceTransactionObject就是SmartTransactionObject类型的对象,DataSourceTransactionObject通过其持有的ConnectionHolder中的rollback-only值进行返回

public class DefaultTransactionStatus extends AbstractTransactionStatus {//事务对象private final Object transaction;//是否是新事务private final boolean newTransaction;//是否是新的事务同步private final boolean newSynchronization;//是否是只读事务private final boolean readOnly;private final boolean debug;//挂起的资源private final Object suspendedResources;public DefaultTransactionStatus(@Nullable Object transaction, boolean newTransaction, boolean newSynchronization,boolean readOnly, boolean debug, @Nullable Object suspendedResources) {this.transaction = transaction;this.newTransaction = newTransaction;this.newSynchronization = newSynchronization;this.readOnly = readOnly;this.debug = debug;this.suspendedResources = suspendedResources;}public Object getTransaction() {Assert.state(this.transaction != null, "No transaction active");return this.transaction;}public boolean hasTransaction() {return (this.transaction != null);}@Overridepublic boolean isNewTransaction() {return (hasTransaction() && this.newTransaction);}public boolean isNewSynchronization() {return this.newSynchronization;}public boolean isReadOnly() {return this.readOnly;}public boolean isDebug() {return this.debug;}@Nullablepublic Object getSuspendedResources() {return this.suspendedResources;}//---------------------------------------------------------------------// Enable functionality through underlying transaction object//---------------------------------------------------------------------//对AbstractTransactionStatus中的该方法进行重写,通过检查事务对象中的rollback-only来确定返回值//如果事务对象是SmartTransactionObject,就从该事务对象中获取rollback-only状态值//DataSourceTransactionObject就是SmartTransactionObject类型的对象,DataSourceTransactionObject通过其持有的ConnectionHolder中的rollback-only值进行返回public boolean isGlobalRollbackOnly() {return ((this.transaction instanceof SmartTransactionObject) &&((SmartTransactionObject) this.transaction).isRollbackOnly());}@Overrideprotected SavepointManager getSavepointManager() {Object transaction = this.transaction;if (!(transaction instanceof SavepointManager)) {throw new NestedTransactionNotSupportedException("Transaction object [" + this.transaction + "] does not support savepoints");}return (SavepointManager) transaction;}public boolean isTransactionSavepointManager() {return (this.transaction instanceof SavepointManager);}@Overridepublic void flush() {if (this.transaction instanceof SmartTransactionObject) {((SmartTransactionObject) this.transaction).flush();}}
}

扩展学习

  • Spring 的注解方式的事务实现机制
  • Spring的BeanPostProcessor扩展点实现类AbstractAutoProxyCreator
  • Spring AOP创建代理类之JdkDynamicAopProxy
  • Spring 的注解方式的事务实现机制
  • Spring的TransactionDefinition中定义的事务的传播特性和隔离级别
  • 详解Spring的事务管理PlatformTransactionManager
  • Spring 5.x 源码之旅七十四事务细节分析三

Spring AOP事务实现原理之事务管理器TransactionManager相关推荐

  1. 自定义事务管理器TransactionManager对象

    自定义事务管理器TransactionManager对象 以aop思想,实现事务管理切面 1. DataSource注册容器 <?xml version="1.0" enco ...

  2. spring AOP自定义注解方式实现日志管理

    转:spring AOP自定义注解方式实现日志管理 今天继续实现AOP,到这里我个人认为是最灵活,可扩展的方式了,就拿日志管理来说,用Spring AOP 自定义注解形式实现日志管理.废话不多说,直接 ...

  3. Spring AOP的实现原理及应用场景(通过动态代理)

    点击关注公众号,利用碎片时间学习 AOP的作用 作用:在不修改源代码的情况下,可以实现功能的增强. 传统的纵向体系代码复用: 横向抽取机制(AOP思想): AOP 思想:基于代理思想,对原来目标对象, ...

  4. Spring AOP底层实现原理(动态代理)

    什么是AOP? AOP(面向切面编程)通过预编译的方式 和 运行期动态代理的方式来实现程序功能统一维护的一种方式,是OOP(面向对象编程)的延续.利用AOP可以对业务逻辑的各个部分进行隔离,从而使得业 ...

  5. Spring AOP 执行流程原理

    AOP 执行流程原理 // 执行目标方法时 会先执行 org.springframework.aop.framework.CglibAopProxy .DynamicAdvisedIntercepto ...

  6. Spring事物管理器TransactionManager解析

    Spring框架支持事务管理的核心是事务管理器抽象,对于不同的数据访问框架(如Hibernate)通过实现策略接口PlatformTransactionManager,从而能支持各种数据访问框架的事务 ...

  7. php进程原理_PHP进程管理器php-fpm的工作原理

    PHP进程管理器php-fpm的工作原理 发布时间:2020-07-21 17:46:39 来源:亿速云 阅读:133 作者:小新 今天小编给大家分享的是PHP进程管理器php-fpm的工作原理,相信 ...

  8. oracle 事务实现原理,数据库事务的实现原理

    1. 前言 都知道数据库事务有ACID特性(原子性.一致性.隔离型.持久性),本文简单聊一下它们的实现原理. 2. 日志文件 2.1. redo log redo log叫做重做日志,是用来实现事务的 ...

  9. Spring AOP 的工作原理

    概念 IOC负责将对象动态的注入到容器,从而达到一种需要谁就注入谁,什么时候需要就什么时候注入的效果,可谓是招之则来,挥之则去.想想都觉得爽,如果现实生活中也有这本事那就爽歪歪了,至于有多爽,各位自己 ...

最新文章

  1. winpcap4.1.2手动清理关键
  2. django前后端结合_一图看懂Django和DRF
  3. Nginx(四):Nginx配置实战
  4. 如何获取Google地图API密钥?(翻译版)
  5. 2018前端学习总结
  6. 【mysql5.6】数据类型
  7. 微软“照片”应用Raw 格式图像编码器漏洞 (CVE-2021-24091)的技术分析
  8. Android 解决qq分享后返回程序出现的Bug
  9. 【mysql】Filesort on too many rows解决方法
  10. 打印机驱动下载后只能打印单面(设置双面打印)解决方法
  11. android在体检报告叫什么,体检报告检测分析
  12. 【DG】物理DG中LNSn、NSS、NSA进程
  13. PYTHON-音视频合并方法
  14. IEEE格式如何使用在线参考文献生成器
  15. vue3 script setup写法
  16. 全球霸榜的Dell EMC VxRail,靠什么赢得超融合客户认可?
  17. python爬虫爬微信数据可信吗_我用 Python 爬取微信好友,最后发现一个大秘密
  18. c语言程序表达语句,《C语言程序设计》讲稿.doc
  19. D神文木源:创业和做生意不同,区块链现在没有什么创业精神
  20. 使用selenium和phantomjs解决爬虫中对渲染页面的爬取

热门文章

  1. coco128-seg数据集分析
  2. SpringCloud微服务基础 Eureka、Feign、Ribbon、Zuul、Hystrix、配置中心的基础使用
  3. 【数学建模学习】偏最小二乘回归PLSR原理和板子
  4. C++数据类型转化使用方法,static_cast,dynamic_cast,dynamic_pointer_cast,dynamic_pointer_cast等
  5. 【Linux】system V 消息队列 | system V 信号量(简单赘述)
  6. pythonjson中list操作_Python中json的简单读写操作
  7. txt文件转为mat文件
  8. L2-003 月饼 (25 分) c语言
  9. Spark之cache ,persist ,checkpoint ,广播变量及其案例 : 根据IP地址(浏览器访问日志获取) / 经度纬度定位地理位置案例(7)
  10. ES10新特性01-ES10新特性