首先看例子,这例子摘抄自开涛的跟我学spring3。

@Test

public void testPlatformTransactionManager() {

DefaultTransactionDefinition def = new DefaultTransactionDefinition();

def.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);

def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);

TransactionStatus status = txManager.getTransaction(def);

try {

jdbcTemplate.update(INSERT_SQL, "test");

txManager.commit(status);

} catch (RuntimeException e) {

txManager.rollback(status);

}

}

重要的代码在上面高亮处。

在执行jdbcTemplate.update的时候使用的是datasource.getConection获取连接。

实际上,

  • 在执行txManager.getTransaction(def);的时候,应该会设置:conection.setAutoConmmit(false)。
  • 在执行txManager.commit(status);的时候,应该是执行conection.commit();
  • 在执行txManager. rollback (status);的时候,应该是执行conection. rollback ();

但是,Spring是如何保证,txManager中的conn就是jdbcTemplate中的conn的呢。从这点出发,开始看源代码。

因为是执行的jdbc操作,这里的txManager是DataSourceTransactionManager。我们来看代码:

getTransaction方法:

getTransaction方法在DataSourceTransactionManager的超类中,也就是AbstractPlatformTransactionManager,我们来看方法:

public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {

Object transaction = doGetTransaction();

// Cache debug flag to avoid repeated checks.

boolean debugEnabled = logger.isDebugEnabled();

if (definition == null) {

// Use defaults if no transaction definition given.

definition = new DefaultTransactionDefinition();

}

if (isExistingTransaction(transaction)) {

// Existing transaction found -> check propagation behavior to find out how to behave.

return handleExistingTransaction(definition, transaction, debugEnabled);

}

// Check definition settings for new transaction.

if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {

throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());

}

// No existing transaction found -> check propagation behavior to find out how to proceed.

if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {

throw new IllegalTransactionStateException(

"No existing transaction found for transaction marked with propagation 'mandatory'");

}

else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||

definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||

definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {

SuspendedResourcesHolder suspendedResources = suspend(null);

if (debugEnabled) {

logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);

}

try {

boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);

DefaultTransactionStatus status = newTransactionStatus(

definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);

doBegin(transaction, definition);

prepareSynchronization(status, definition);

return status;

}

catch (RuntimeException ex) {

resume(null, suspendedResources);

throw ex;

}

catch (Error err) {

resume(null, suspendedResources);

throw err;

}

}

else {

// Create "empty" transaction: no actual transaction, but potentially synchronization.

boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);

return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);

}

}

先看第一句,

Object transaction = doGetTransaction();

方法在AbstractPlatformTransactionManager中,方法为:

protected abstract Object doGetTransaction() throws TransactionException;

这是典型的模板方法设计模式,AbstractPlatformTransactionManager作为抽象类,定义了getTransaction方法,并且设置为final,然后方法内部调用的部分方法是protected abstract的,交给子类去实现。

我们来看在DataSourceTransactionManager类中的doGetTransaction方法的定义:

@Override

protected Object doGetTransaction() {

DataSourceTransactionObject txObject = new DataSourceTransactionObject();

txObject.setSavepointAllowed(isNestedTransactionAllowed());

ConnectionHolder conHolder =

(ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);

txObject.setConnectionHolder(conHolder, false);

return txObject;

}

注意这里,是new了一个DataSourceTransactionObject对象,重要的是高亮的两句。txObject中有一个ConnectionHolder对象,这么说来,在这一步的时候有可能已经在事务对象(DataSourceTransactionObject)中,保存了一个ConnectionHolder对象,顾名思义,ConnectionHolder中必然有Connection。如果是这样,我们只要确定,在执行jdbc操作的时候使用的Connection和这个ConnectionHolder中的是同一个就可以了。我们先看ConnectionHolder的结构。

确实如我们所想。

我们再看TransactionSynchronizationManager.getResource(this.dataSource);代码如何获取ConnectionHolder的。

TransactionSynchronizationManager这个名字,应该是支持多线程并发读取的。我们看代码。

public static Object getResource(Object key) {

Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);

Object value = doGetResource(actualKey);

if (value != null && logger.isTraceEnabled()) {

logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +

Thread.currentThread().getName() + "]");

}

return value;

}

看Object value = doGetResource(actualKey);代码:

private static Object doGetResource(Object actualKey) {

Map<Object, Object> map = resources.get();

if (map == null) {

return null;

}

Object value = map.get(actualKey);

// Transparently remove ResourceHolder that was marked as void...

if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {

map.remove(actualKey);

// Remove entire ThreadLocal if empty...

if (map.isEmpty()) {

resources.remove();

}

value = null;

}

return value;

}

高亮代码,看起来就是从一个map中获取了返回的结果,获取的时候使用的key是上一个方法传入的datasource。

看看这个map是什么。

private static final ThreadLocal<Map<Object, Object>> resources =

new NamedThreadLocal<Map<Object, Object>>("Transactional resources");

看来是ThreadLocal对象。

那么这个对象是在什么时候初始化的呢。

经过查看是在这个方法:

public static void bindResource(Object key, Object value) throws IllegalStateException {

那么那个地方调了这个方法呢?

经过查看,又回到了DataSourceTransactionManager类:

@Override

protected void doResume(Object transaction, Object suspendedResources) {

ConnectionHolder conHolder = (ConnectionHolder) suspendedResources;

TransactionSynchronizationManager.bindResource(this.dataSource, conHolder);

}

但是这个是在事务执行完毕的时候执行的,所以如果我们是第一次在当前线程执行事务,那么回到最初的代码:

public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {

Object transaction = doGetTransaction();

// Cache debug flag to avoid repeated checks.

boolean debugEnabled = logger.isDebugEnabled();

if (definition == null) {

// Use defaults if no transaction definition given.

definition = new DefaultTransactionDefinition();

}

if (isExistingTransaction(transaction)) {

// Existing transaction found -> check propagation behavior to find out how to behave.

return handleExistingTransaction(definition, transaction, debugEnabled);

}

// Check definition settings for new transaction.

if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {

throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());

}

// No existing transaction found -> check propagation behavior to find out how to proceed.

if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {

throw new IllegalTransactionStateException(

"No existing transaction found for transaction marked with propagation 'mandatory'");

}

else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||

definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||

definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {

SuspendedResourcesHolder suspendedResources = suspend(null);

if (debugEnabled) {

logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);

}

try {

boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);

DefaultTransactionStatus status = newTransactionStatus(

definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);

doBegin(transaction, definition);

prepareSynchronization(status, definition);

return status;

}

catch (RuntimeException ex) {

resume(null, suspendedResources);

throw ex;

}

catch (Error err) {

resume(null, suspendedResources);

throw err;

}

}

else {

// Create "empty" transaction: no actual transaction, but potentially synchronization.

boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);

return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);

}

}

Object transaction = doGetTransaction();

这里的transaction中应该是没有connection的。

继续往下看:

if (isExistingTransaction(transaction)) {

// Existing transaction found -> check propagation behavior to find out how to behave.

return handleExistingTransaction(definition, transaction, debugEnabled);

}

其中,isExistingTransaction:

@Override

protected boolean isExistingTransaction(Object transaction) {

DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;

return (txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive());

}

这是是判断txObject种有没有ConnectionHolder,也就是当前线程是否已经执行过事务。

我们忽略有的情况,主要看没有的情况,也就是说当前线程第一次处理事务的情况。

继续看最初的代码,主要看这段:

else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||

definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||

definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {

SuspendedResourcesHolder suspendedResources = suspend(null);

if (debugEnabled) {

logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);

}

try {

boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);

definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);

DefaultTransactionStatus status = newTransactionStatus(

doBegin(transaction, definition);

prepareSynchronization(status, definition);

return status;

}

catch (RuntimeException ex) {

resume(null, suspendedResources);

throw ex;

}

catch (Error err) {

resume(null, suspendedResources);

throw err;

}

}

看doBegin(transaction, definition);

@Override

protected void doBegin(Object transaction, TransactionDefinition definition) {

DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;

Connection con = null;

try {

if (txObject.getConnectionHolder() == null ||

txObject.getConnectionHolder().isSynchronizedWithTransaction()) {

Connection newCon = this.dataSource.getConnection();

if (logger.isDebugEnabled()) {

logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");

}

txObject.setConnectionHolder(new ConnectionHolder(newCon), true);

}

txObject.getConnectionHolder().setSynchronizedWithTransaction(true);

con = txObject.getConnectionHolder().getConnection();

Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);

txObject.setPreviousIsolationLevel(previousIsolationLevel);

// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,

// so we don't want to do it unnecessarily (for example if we've explicitly

// configured the connection pool to set it already).

if (con.getAutoCommit()) {

txObject.setMustRestoreAutoCommit(true);

if (logger.isDebugEnabled()) {

logger.debug("Switching JDBC Connection [" + con + "] to manual commit");

}

con.setAutoCommit(false);

}

txObject.getConnectionHolder().setTransactionActive(true);

int timeout = determineTimeout(definition);

if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {

txObject.getConnectionHolder().setTimeoutInSeconds(timeout);

}

// Bind the session holder to the thread.

if (txObject.isNewConnectionHolder()) {

TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());

}

}

catch (Throwable ex) {

if (txObject.isNewConnectionHolder()) {

DataSourceUtils.releaseConnection(con, this.dataSource);

txObject.setConnectionHolder(null, false);

}

throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);

}

}

这里新建了一个Connection,并且将这个Connection绑定到了TransactionSynchronizationManager中,也就是上面的:

private static final ThreadLocal<Map<Object, Object>> resources =

new NamedThreadLocal<Map<Object, Object>>("Transactional resources");

至此,我们只需要确定,我们使用jdbcTemplate.update的时候,connection也是从TransactionSynchronizationManager获取的就好。

在JdbcTemplate中,我们找到它使用获得Connection的方式是:

Connection con = DataSourceUtils.getConnection(getDataSource());

也就是:

public static Connection doGetConnection(DataSource dataSource) throws SQLException {

Assert.notNull(dataSource, "No DataSource specified");

ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);

if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {

conHolder.requested();

if (!conHolder.hasConnection()) {

logger.debug("Fetching resumed JDBC Connection from DataSource");

conHolder.setConnection(dataSource.getConnection());

}

return conHolder.getConnection();

}

// Else we either got no holder or an empty thread-bound holder here.

logger.debug("Fetching JDBC Connection from DataSource");

Connection con = dataSource.getConnection();

if (TransactionSynchronizationManager.isSynchronizationActive()) {

logger.debug("Registering transaction synchronization for JDBC Connection");

// Use same Connection for further JDBC actions within the transaction.

// Thread-bound object will get removed by synchronization at transaction completion.

ConnectionHolder holderToUse = conHolder;

if (holderToUse == null) {

holderToUse = new ConnectionHolder(con);

}

else {

holderToUse.setConnection(con);

}

holderToUse.requested();

TransactionSynchronizationManager.registerSynchronization(

new ConnectionSynchronization(holderToUse, dataSource));

holderToUse.setSynchronizedWithTransaction(true);

if (holderToUse != conHolder) {

TransactionSynchronizationManager.bindResource(dataSource, holderToUse);

}

}

return con;

}

至此,可以发现:JdbcTemplate在执行sql的时候获取的Conncetion和Transaction的doBegin获取的Conncetion都是从TransactionSynchronizationManager获取的。也就是一个线程对一个Datasource只保持了一个Conn。

这里才发现我的理解错误了。我原以为只要是使用DataSource的getConnection执行的sql都可以被Spring事务管理,还以为Spring对DataSource使用了装饰器模式添加了逻辑,原来是我想错了,只有使用Spirng的JdbcTemplate或者DataSourceUtils.getConnection类获得的连接才会被Spring事务管理。

如下代码:

@Transactional

public void transactionTest() throws SQLException {

Connection conn = DataSourceUtils.getConnection(ds);

try {

PreparedStatement st = conn.prepareStatement("update t_person t set t.age = ? where t.id = 1");

st.setInt(1, 1000);

st.execute();

throw new RuntimeException();

}

finally{

//conn.close();

}

}

因为最后抛出了RuntimeException,测试结果显示,最终Spring会将这个事务回滚。

注意注释的那句代码,常理来说我们应该执行关闭,但是关闭之后Spring怎么执行rollback呢,如果放开这句代码,其实Spring仍然可以执行rollback,因为close只是将conn还给连接池,并没有真正的释放链接。但是如果遇到连接真的被关闭,那么在关闭的时候会触发自动提交。所以这里还是不要关闭。交给Spring事务去关闭。

这种写法很难理解,所以尽量不要使用吧。

如果改为:

Connection conn = ds.getConnection();

经过测试,不能回滚。

使用jdbcTemp的方式很简洁,而且能正常回滚:

jdbcTemplate.execute("update t_person t set t.age = 800 where t.id = 1");

hrow new RuntimeException();

转载于:https://www.cnblogs.com/xiaolang8762400/p/7407283.html

Spring事务源码分析相关推荐

  1. Spring源码分析-Spring事务源码分析

    导语      在配置Spring事务管理的时候会用到一个类TransactionInterceptor,从下面的类关系图中可以看到TransactionInterceptor继承了MethodInt ...

  2. Spring事务源码分析责任链事务链事务不生效

    文章目录 前言 带着问题分析源码 事务源码分析 寻找Spring事务源码类 TransactionInterceptor调用栈 分析Spring AOP责任链 分析TransactionInterce ...

  3. spring事务源码分析结合mybatis源码(二)

    让我们继续上篇,分析下如果有第二个调用进入的过程. 代码部分主要是下面这个: if (isExistingTransaction(transaction)) {return handleExistin ...

  4. springboot 事务_原创002 | 搭上SpringBoot事务源码分析专车

    前言 如果这是你第二次看到师长,说明你在觊觎我的美色! 点赞+关注再看,养成习惯 没别的意思,就是需要你的窥屏^_^ 专车介绍 该趟专车是开往Spring Boot事务源码分析的专车 专车问题 为什么 ...

  5. springboot事务回滚源码_002 | 搭上SpringBoot事务源码分析专车

    发车啦,发车啦,上车要求: 点击左上方的"java进阶架构师"进入页面 选择右上角的"置顶公众号"上车 专车介绍 该趟专车是开往Spring Boot事务源码分 ...

  6. Spring Cloud源码分析(二)Ribbon(续)

    因文章长度限制,故分为两篇.上一篇:<Spring Cloud源码分析(二)Ribbon> 负载均衡策略 通过上一篇对Ribbon的源码解读,我们已经对Ribbon实现的负载均衡器以及其中 ...

  7. Spring AOP 源码分析 - 拦截器链的执行过程

    1.简介 本篇文章是 AOP 源码分析系列文章的最后一篇文章,在前面的两篇文章中,我分别介绍了 Spring AOP 是如何为目标 bean 筛选合适的通知器,以及如何创建代理对象的过程.现在我们的得 ...

  8. Spring AOP 源码分析 - 创建代理对象

    1.简介 在上一篇文章中,我分析了 Spring 是如何为目标 bean 筛选合适的通知器的.现在通知器选好了,接下来就要通过代理的方式将通知器(Advisor)所持有的通知(Advice)织入到 b ...

  9. Spring AOP 源码分析 - 筛选合适的通知器

    1.简介 从本篇文章开始,我将会对 Spring AOP 部分的源码进行分析.本文是 Spring AOP 源码分析系列文章的第二篇,本文主要分析 Spring AOP 是如何为目标 bean 筛选出 ...

最新文章

  1. 23-hadoop-hive的DDL和DML操作
  2. CTOR在区块熵编码中的优点
  3. 组策略 之 驱动器映射
  4. 简单理解:同步、异步、阻塞、非阻塞
  5. MySQL读写分离事务策略实现
  6. 12张图带你彻底理解分布式事务产生的场景和解决方案!!
  7. JavaWeb(十七)——JSP中的九个内置对象
  8. android json 解析图片,JSON解析并获取android中的图像
  9. 采用这套全方位监控方案,立刻规避90%采购风险(附体验demo)
  10. 苹果CMS小俊XG013主题模板
  11. SqlServer数据组织结构
  12. QT中ui更改后不能更新的解决方法
  13. delphi 异步 调用 带参数_Dubbo 关于同步/异步调用的几种方式
  14. 在 Linux 下建立 FTP 搜索引擎
  15. 我的Android进阶之旅------Android【设置】-【语言和输入法】-【语言】列表中找到相应语言所对应的列表项
  16. HTML三只松鼠怎么编写,卖萌的设计体验 ——访三只松鼠首席设计师 李子明(原创文章)...
  17. mac解压缩命令大全
  18. java channel midi_为Java程序中添加播放MIDI音乐功能
  19. 靶机17 GROTESQUE: 3.0.1
  20. 【区块链与密码学】第9-3讲:群签名方案的安全性要求

热门文章

  1. ubuntu下vim的命令及使用方法
  2. 【Linux】一步一步学Linux——gdb命令(258)
  3. android优化最强软件,最强大的安卓优化工具诞生,让手机流畅度提升75%
  4. seq2seq模型_Pytorch学习记录-Seq2Seq模型对比
  5. 【设计模式】C++单例模式
  6. Android 从ImageView中获取Bitmap对象方法
  7. 锐浪报表 多条数据集合到一个二维码中_【小麦课堂】快速查询明细数据的操作...
  8. leetcode(3)---寻找最大字符串
  9. 最小覆盖字串—leetcode76
  10. C++操作符的优先级 及其记忆方法