文章目录

  • 一、前言
  • 二、TransactionSynchronization
    • 1. TransactionSynchronization
      • 1.1 TransactionSynchronization 的定义
      • 1.2 TransactionSynchronizationManager
    • 2. TransactionSynchronization 原理简述
      • 2.1 TransactionSynchronization#beforeCommit
      • 2.2 TransactionSynchronization#beforeCompletion
      • 2.3 TransactionSynchronization#afterCommit
      • 2.4 TransactionSynchronization#afterCompletion
    • 3. 总结
  • 三、TransactionalEventListener
    • 1. TransactionalEventListener
    • 2. 执行流程
      • 2.1 EventListenerMethodProcessor
        • 2.1.1 EventListenerMethodProcessor 的注入
        • 2.1.2 EventListenerMethodProcessor 的调用
      • 2.2 EventListenerFactory
      • 2.3 TransactionalApplicationListenerMethodAdapter
        • 2.3.1 TransactionalApplicationListenerSynchronization
        • 2.3.2 ApplicationListenerMethodAdapter#processEvent

一、前言

本文是 Spring源码分析的衍生文章。主要是因为本人菜鸡,在分析源码的过程中还有一些其他的内容不理解,故开设衍生篇来完善内容以学习。

全集目录:Spring源码分析:全集整理


背景不知道写啥

二、TransactionSynchronization

1. TransactionSynchronization

我们这里以一个 Demo 为例,如下:当调用 DemoService#testTransactionSynchronization 方法时会往sys_role 插入role_id = 1 和 role_id = 2 的两条记录。同时该方法通过 @Transactional(rollbackFor = Exception.class) 开启了事务。

@Service
public class DemoServiceImpl implements DemoService {/*** 节省空间 addRole 和 addPermission 的实现就不给出. 就是简单的往表里插入一条记录* @return*/@Transactional(rollbackFor = Exception.class)@Overridepublic String testTransactionSynchronization() {// step1 : 在 sys_role 表中插入一条 role_id = 1 的记录addRole(1);// step2 : 在 sys_role 表中插入一条 role_id = 2 的记录addRole(2);System.out.println("end");return "";}//  在 sys_role 表中插入一条 role_id = id 的记录@Overridepublic void addRole(int id) {...}// step3 : 在 sys_rermission 表中插入一条 permission_id = id 的记录。节省空间不给出实现@Overridepublic void addPermission(int id) {...}}

那么可以预想到,step1 和 step 2 要么同时成功,要么同时失败、现在更改一下需求 :增加 step3 在 DemoService#testTransactionSynchronization 方法的事务提交后才执行。

实现方法有若干种,这里介绍本文的 TransactionSynchronization 的使用。借助 TransactionSynchronization,可以实现在事务挂起、恢复、提交前后、提交后等时机进行完成指定的操作。具体实现如下:

    @Transactional(rollbackFor = Exception.class)@Overridepublic String testTransactionSynchronization() {// step1 : 在 sys_role 表中插入一条 role_id = 1 的记录addRole(1);// step2 : 在 sys_role 表中插入一条 role_id = 2 的记录addRole(2);// 向事务同步管理器注册一个 TransactionSynchronization 匿名实现类,作用是当前事务提交后,执行 addPermission 方法。TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {// 当前事务提交后触发该方法// step3 : 在 sys_rermission 表中插入一条 permission_id = id 的记录。@Overridepublic void afterCommit() {addPermission(1);}});System.out.println("end");return "";}

上面的实际即是:当 testTransactionSynchronization 的事务提交后,才会执行 TransactionSynchronizationManager#registerSynchronization 注册的TransactionSynchronization。这里我们实现了 TransactionSynchronization#afterCommit,即会在事务提交后执行。


看到了上面的 Demo,下面我们来了解一下涉及到的 TransactionSynchronization 和 TransactionSynchronizationManager

1.1 TransactionSynchronization 的定义

TransactionSynchronization 作为事务同步回调的接口,可以实现 Ordered 接口来影响它们的执行顺序。 未实现 Ordered 接口的同步将附加到同步链的末尾。TransactionSynchronization 提供了很多方法,定义如下 :

public interface TransactionSynchronization extends Flushable {/** Completion status in case of proper commit. */// 正确提交时的完成状态int STATUS_COMMITTED = 0;/** Completion status in case of proper rollback. */// 在正确回滚的情况下完成状态int STATUS_ROLLED_BACK = 1;/** Completion status in case of heuristic mixed completion or system errors. */// 在启发式混合完成或系统错误的情况下的完成状态int STATUS_UNKNOWN = 2;// 事务挂起时调用。 如果管理任何资源,应该从 TransactionSynchronizationManager 取消绑定资源。default void suspend() {}// 事务恢复时调用。如果管理任何资源,应该将资源重新绑定到 TransactionSynchronizationManagerdefault void resume() {}// 将底层会话刷新到数据存储区(如果适用):例如,Hibernate/JPA 会话。@Overridedefault void flush() {}// 事务提交前调用。此处若发生异常,会导致回滚。default void beforeCommit(boolean readOnly) {}// 事务提交前, 在 beforeCommit 后调用。此处若发生异常,不会导致回滚。default void beforeCompletion() {}// 事务提交后调用default void afterCommit() {}// 事务提交 或回滚后执行。default void afterCompletion(int status) {}
}

1.2 TransactionSynchronizationManager

TransactionSynchronizationManager 事务同步管理器,保存的是各个线程中的事务信息。Spring 在事务过程中通过此类来管理事务。部分定义如下:

public abstract class TransactionSynchronizationManager {//线程上下文中保存着【线程池对象:ConnectionHolder】的Map对象。线程可以通过该属性获取到同一个Connection对象。private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources");//事务同步器,是Spring交由程序员进行扩展的代码,每个线程可以注册N个事务同步器。private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal<>("Transaction synchronizations");// 事务的名称  private static final ThreadLocal<String> currentTransactionName = new NamedThreadLocal<>("Current transaction name");// 事务是否是只读  private static final ThreadLocal<Boolean> currentTransactionReadOnly = new NamedThreadLocal<>("Current transaction read-only status");// 事务的隔离级别private static final ThreadLocal<Integer> currentTransactionIsolationLevel = new NamedThreadLocal<>("Current transaction isolation level");// 事务是否开启   actual:真实的private static final ThreadLocal<Boolean> actualTransactionActive = new NamedThreadLocal<>("Actual transaction active");....
}

其中 TransactionSynchronizationManager#registerSynchronization 实现如下:

 public static void registerSynchronization(TransactionSynchronization synchronization)throws IllegalStateException {Assert.notNull(synchronization, "TransactionSynchronization must not be null");// 获取当前线程绑定的 TransactionSynchronization 集合。Set<TransactionSynchronization> synchs = synchronizations.get();if (synchs == null) {throw new IllegalStateException("Transaction synchronization is not active");}// 将当前 新增的  TransactionSynchronization 添加到集合中synchs.add(synchronization);}

可以看到,在 TransactionSynchronizationManager#registerSynchronization中会将注册的TransactionSynchronization 添加到 TransactionSynchronizationManager#synchronizations集合中,当事务执行到指定阶段后进行触发,我们下面来详细看一看。

2. TransactionSynchronization 原理简述

我们这里不关注 事务的挂起恢复等场景(太复杂,写一半放弃了 ),只关注 beforeCommit、beforeCompletion、afterCommit、afterCompletion 四个常用的方法 的调用时机 。常规的调用顺序为 :

业务代码  -> beforeCommit -> beforeCompletion -> afterCommit -> afterCompletion

在 Spring 中事务提交会调用 AbstractPlatformTransactionManager#processCommit 方法,上述四个方法都在该方法中有调用,其实现如下:

 private void processCommit(DefaultTransactionStatus status) throws TransactionException {try {boolean beforeCompletionInvoked = false;try {boolean unexpectedRollback = false;// 预留操作prepareForCommit(status);// 调用自定义触发器的方法// 1. 触发 TransactionSynchronization#beforeCommittriggerBeforeCommit(status);// 2.1 触发 TransactionSynchronization#beforeCompletiontriggerBeforeCompletion(status);beforeCompletionInvoked = true;// 如果设置了保存点信息if (status.hasSavepoint()) {// 清除保存点信息unexpectedRollback = status.isGlobalRollbackOnly();status.releaseHeldSavepoint();}// 如果是新事物else if (status.isNewTransaction()) {unexpectedRollback = status.isGlobalRollbackOnly();// 如果是独立事务则直接提交doCommit(status);}// 不是新事物并不会直接提交,而是等最外围事务进行提交。else if (isFailEarlyOnGlobalRollbackOnly()) {unexpectedRollback = status.isGlobalRollbackOnly();}// Throw UnexpectedRollbackException if we have a global rollback-only// marker but still didn't get a corresponding exception from commit.if (unexpectedRollback) {throw new UnexpectedRollbackException("Transaction silently rolled back because it has been marked as rollback-only");}}catch (UnexpectedRollbackException ex) {// can only be caused by doCommit// 4.1 触发 TransactionSynchronization#afterCompletiontriggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);throw ex;}catch (TransactionException ex) {// can only be caused by doCommitif (isRollbackOnCommitFailure()) {// 2.2 触发 TransactionSynchronization#beforeCompletiondoRollbackOnCommitException(status, ex);}else {// 4.2 触发 TransactionSynchronization#afterCompletiontriggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);}throw ex;}catch (RuntimeException | Error ex) {if (!beforeCompletionInvoked) {triggerBeforeCompletion(status);}// 提交过程中会出现异常则回滚doRollbackOnCommitException(status, ex);throw ex;}// Trigger afterCommit callbacks, with an exception thrown there// propagated to callers but the transaction still considered as committed.try {// 3. 触发 TransactionSynchronization#afterCommittriggerAfterCommit(status);}finally {// 4.3 触发 TransactionSynchronization#afterCompletiontriggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);}}finally {// 清理事务信息cleanupAfterCompletion(status);}}

上面的代码中我们标注了每个方法的调用,下面我们具体来看

2.1 TransactionSynchronization#beforeCommit

TransactionSynchronization#beforeCommit 在事务提交前触发,在当前方法抛出异常会导致整个事务回滚。

上面代码注释标明在 AbstractPlatformTransactionManager#triggerBeforeCommit 方法中调用了该方法,其实现如下:

 protected final void triggerBeforeCommit(DefaultTransactionStatus status) {if (status.isNewSynchronization()) {// 触发 TransactionSynchronization#beforeCommit 方法。入参是当前事务是否只读TransactionSynchronizationUtils.triggerBeforeCommit(status.isReadOnly());}}

TransactionSynchronizationUtils#triggerBeforeCommit 实现如下:

 public static void triggerBeforeCommit(boolean readOnly) {// 从 TransactionSynchronizationManager 中获取所有注册的 TransactionSynchronization 触发 beforeCommit 方法for (TransactionSynchronization synchronization : TransactionSynchronizationManager.getSynchronizations()) {synchronization.beforeCommit(readOnly);}}

2.2 TransactionSynchronization#beforeCompletion

TransactionSynchronization#beforeCompletion 在事务提交前,在 TransactionSynchronization#beforeCommit 后调用。在AbstractPlatformTransactionManager#triggerBeforeCompletion 方法调用时自身捕获了异常打印了 debug 级别日志,所以如果该方法抛出异常并不会导致事务回滚。

AbstractPlatformTransactionManager#triggerBeforeCompletion 实现如下:

 protected final void triggerBeforeCompletion(DefaultTransactionStatus status) {if (status.isNewSynchronization()) {if (status.isDebug()) {logger.trace("Triggering beforeCompletion synchronization");}TransactionSynchronizationUtils.triggerBeforeCompletion();}}

TransactionSynchronizationUtils#triggerBeforeCompletion 实现如下

 public static void triggerBeforeCompletion() {for (TransactionSynchronization synchronization : TransactionSynchronizationManager.getSynchronizations()) {try {synchronization.beforeCompletion();}catch (Throwable ex) {// 捕获beforeCompletion抛出的异常,打印 debug 日志、logger.debug("TransactionSynchronization.beforeCompletion threw exception", ex);}}}

这里注意 beforeCompletion 的调用场景有两处。

  • 代码注释 2.1 处是常规流程的执行
  • 代码注释 2.2 处则是执行出现异常时,即当代码执行了 代码注释1处后抛出异常时执行。

2.3 TransactionSynchronization#afterCommit

TransactionSynchronization#afterCommit 在事务提交后调用。该方法抛出异常也不会导致事务回滚但是会触发 TransactionSynchronization#afterCompletion 方法。

AbstractPlatformTransactionManager#triggerAfterCommit 实现如下:

 private void triggerAfterCommit(DefaultTransactionStatus status) {if (status.isNewSynchronization()) {TransactionSynchronizationUtils.triggerAfterCommit();}}

TransactionSynchronizationUtils#triggerAfterCommit 实现如下:

 public static void triggerAfterCommit() {invokeAfterCommit(TransactionSynchronizationManager.getSynchronizations());}public static void invokeAfterCommit(@Nullable List<TransactionSynchronization> synchronizations) {if (synchronizations != null) {for (TransactionSynchronization synchronization : synchronizations) {synchronization.afterCommit();}}}

2.4 TransactionSynchronization#afterCompletion

TransactionSynchronization#afterCompletion 在 TransactionSynchronization#afterCommit 执行之后调用。无论事务正常提交还是异常回滚,都会触发该方法。

需要注意的是 该方法在 AbstractPlatformTransactionManager#processRollback 中也有调用。AbstractPlatformTransactionManager#processRollback 方法是事务回滚时执行的方法,其内部也是通过 AbstractPlatformTransactionManager#triggerAfterCompletion 来触发 afterCompletion 方法,因此在这里就不再展开。


AbstractPlatformTransactionManager#triggerAfterCompletion

 private void triggerAfterCompletion(DefaultTransactionStatus status, int completionStatus) {if (status.isNewSynchronization()) {List<TransactionSynchronization> synchronizations = TransactionSynchronizationManager.getSynchronizations();TransactionSynchronizationManager.clearSynchronization();if (!status.hasTransaction() || status.isNewTransaction()) {// No transaction or new transaction for the current scope ->// invoke the afterCompletion callbacks immediately// 当前作用域没有事务或有新事务, 触发 TransactionSynchronization#afterCompletion  方法invokeAfterCompletion(synchronizations, completionStatus);}else if (!synchronizations.isEmpty()) {// Existing transaction that we participate in, controlled outside// of the scope of this Spring transaction manager -> try to register// an afterCompletion callback with the existing (JTA) transaction.// 我们参与的现有事务,在此 Spring 事务管理器范围之外控制 -> 尝试使用现有(JTA)事务注册 afterCompletion 回调。registerAfterCompletionWithExistingTransaction(status.getTransaction(), synchronizations);}}}protected void registerAfterCompletionWithExistingTransaction(Object transaction, List<TransactionSynchronization> synchronizations) throws TransactionException {// 状态置为未知invokeAfterCompletion(synchronizations, TransactionSynchronization.STATUS_UNKNOWN);}protected final void invokeAfterCompletion(List<TransactionSynchronization> synchronizations, int completionStatus) {TransactionSynchronizationUtils.invokeAfterCompletion(synchronizations, completionStatus);}

3. 总结

总体来说 TransactionSynchronization 的逻辑还是比较简单的,如下:

  1. TransactionSynchronizationManager#registerSynchronization 将 TransactionSynchronization 注册到 TransactionSynchronizationManager#synchronizations 中。
  2. 当当前线程准备提交或回滚时会通过TransactionSynchronizationManager#getSynchronizations 获取到 TransactionSynchronizationManager#synchronizations 中的事务同步类。随后执行相应的方法。

三、TransactionalEventListener

在 Spring framework 4.2 之后还可以使用@TransactionalEventListener处理数据库事务提交成功后再执行操作,这种方式比 TransactionSynchronization 更加优雅。我们仍以上面的Demo为例加以改造。

如下: 同样的功能,事务内在 sys_role 表中插入一条 role_id = 10 和 20 两条记录。事务提交后执行DemoServiceImpl#onApplicationEvent 方法调用 addPermission,在 sys_rermission 表中插入一条记录。需要注意,这里不需要再实现 ApplicationListener 接口,否则会调用两次。

@Service
public class DemoServiceImpl implements DemoService, ApplicationContextAware {@Autowiredprivate SysPermissionDao sysPermissionDao;@Autowiredprivate SysRoleDao sysRoleDao;private ApplicationContext applicationContext;@Transactional(rollbackFor = Exception.class)@Overridepublic String testTransactionalEventListener() {// step1 : 在 sys_role 表中插入一条 role_id = 10 的记录addRole(10);// step2 : 在 sys_role 表中插入一条 role_id = 20 的记录addRole(20);this.applicationContext.publishEvent(new DemoApplicationEvent(10));System.out.println("end");return "";}/*** 在 sys_role 表中插入一条记录*/@Overridepublic void addRole(int id) {...}/*** 在 sys_rermission 表中插入一条记录*/@Overridepublic void addPermission(int id) {...}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}public static class DemoApplicationEvent extends ApplicationEvent {@Getterprivate Integer id;public DemoApplicationEvent(Integer source) {super(source);this.id = source;}}// phase 指定了执行阶段为 事务提交后。@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)public void onApplicationEvent(DemoApplicationEvent demoApplicationEvent) {// step3 : 在 sys_rermission 表中插入一条记录addPermission(demoApplicationEvent.getId());}
}

TransactionalEventListener 通过Spring监听器的方式实现了 TransactionSynchronization 的功能,下面我们来看一看具体原理。

1. TransactionalEventListener

首先来看一下 TransactionalEventListener 的定义,如下:

@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@EventListener
public @interface TransactionalEventListener {// 执行执行阶段,默认事务提交后、除此之外还有 BEFORE_COMMIT、AFTER_ROLLBACK、AFTER_COMPLETIONTransactionPhase phase() default TransactionPhase.AFTER_COMMIT;// 监听器的唯一id,可为空String id() default "";// 如果没有事务正在运行,是否应处理事件boolean fallbackExecution() default false;/*** EventListener classes 属性的别名, 指定监听事件类型* 此侦听器处理的事件类。如果使用单个值指定此属性,则带注释的方法可以选择接受单个参数。 但是,如果此属性指定了多个值,则注释方法不得声明任何参数。*/@AliasFor(annotation = EventListener.class, attribute = "classes")Class<?>[] value() default {};/*** EventListener classes 属性的别名, 指定监听事件类型* 此侦听器处理的事件类。如果使用单个值指定此属性,则带注释的方法可以选择接受单个参数。 但是,如果此属性指定了多个值,则注释方法不得声明任何参数。*/@AliasFor(annotation = EventListener.class, attribute = "classes")Class<?>[] classes() default {};/*** 用于使事件处理有条件的 Spring 表达式语言 (SpEL) 属性。* 默认值为"" ,表示始终处理事件。*/String condition() default "";}

2. 执行流程

下面我们来看看 TransactionalEventListener 注解是如何实现的。

2.1 EventListenerMethodProcessor

首先我们需要知道,一个被 @TransactionalEventListener 注解修饰的普通方法,如何具有监听器的效果?原因就在于 EventListenerMethodProcessor 类。


EventListenerMethodProcessor 的结构如下:

可以看到 EventListenerMethodProcessor 实现了三个关键接口 :

  • SmartInitializingSingleton :afterSingletonsInstantiated 方法会在容器初始化所有非惰性Bean之后调用。EventListenerMethodProcessor 在 afterSingletonsInstantiated 方法中为被 @TransactionalEventListener 注解修饰的方法动态生成了 ApplicationListener 并添加到容器中。当执行的事件发送时,动态生成的 ApplicationListener 监听器则会触发,通过反射调用 注解方法。
  • ApplicationContextAware :EventListenerMethodProcessor 通过 setApplicationContext 方法 获取到 ApplicationContext 实例。
  • BeanFactoryPostProcessor : EventListenerMethodProcessor 通过 postProcessBeanFactory 方法获取到了Spring容器中的所有 EventListenerFactory 实例对象,保存到 EventListenerFactory#eventListenerFactories 中。

下面我们来看看 EventListenerMethodProcessor 的执行过程,如下:

2.1.1 EventListenerMethodProcessor 的注入

Spring 容器启动时会在通过 SpringApplication#createApplicationContext 创建应用上下文,其中调用链如下:

SpringApplication#createApplicationContext ->
AnnotationConfigServletWebServerApplicationContext构造函数 ->
AnnotatedBeanDefinitionReader构造函数 ->
AnnotationConfigUtils#registerAnnotationConfigProcessors

在 AnnotationConfigUtils#registerAnnotationConfigProcessors 中会向容器中注册 EventListenerMethodProcessor 的 BeanDefinition,如下图:

2.1.2 EventListenerMethodProcessor 的调用

Spring 容器启动时会在 AbstractApplicationContext#finishBeanFactoryInitialization 方法中完成非惰性加载 Bean的 的初始化。初始化结束后会调用SmartInitializingSingleton#afterSingletonsInstantiated 方法。EventListenerMethodProcessor#afterSingletonsInstantiated 的实现如下:

 @Overridepublic void afterSingletonsInstantiated() {ConfigurableListableBeanFactory beanFactory = this.beanFactory;Assert.state(this.beanFactory != null, "No ConfigurableListableBeanFactory set");// 获取容器中的所有 beanName String[] beanNames = beanFactory.getBeanNamesForType(Object.class);for (String beanName : beanNames) {if (!ScopedProxyUtils.isScopedTarget(beanName)) {Class<?> type = null;try {// 获取 bean的 类型type = AutoProxyUtils.determineTargetClass(beanFactory, beanName);}catch (Throwable ex) {... 日志打印}if (type != null) {// 如果 bean 是 ScopedObject类型,进一步获取真实类型if (ScopedObject.class.isAssignableFrom(type)) {try {Class<?> targetClass = AutoProxyUtils.determineTargetClass(beanFactory, ScopedProxyUtils.getTargetBeanName(beanName));if (targetClass != null) {type = targetClass;}}catch (Throwable ex) {... 日志打印}}try {// 处理beanprocessBean(beanName, type);}catch (Throwable ex) {... 抛出异常}}}}}

可以看到关键逻辑在 EventListenerMethodProcessor#processBean 中,其实现如下:

 private void processBean(final String beanName, final Class<?> targetType) {// nonAnnotatedClasses 缓存未命中当前类 && 给定的类是可携带指定注释的候选者 && 不是 org.springframework. 包下的类或被@Component 注解修饰if (!this.nonAnnotatedClasses.contains(targetType) &&AnnotationUtils.isCandidateClass(targetType, EventListener.class) &&!isSpringContainerClass(targetType)) {Map<Method, EventListener> annotatedMethods = null;try {// 寻找被 @EventListener 注解修饰的方法annotatedMethods = MethodIntrospector.selectMethods(targetType,(MethodIntrospector.MetadataLookup<EventListener>) method ->AnnotatedElementUtils.findMergedAnnotation(method, EventListener.class));}catch (Throwable ex) {... 日志打印}if (CollectionUtils.isEmpty(annotatedMethods)) {// 缓存没有被 @EventListener 注解修饰的类this.nonAnnotatedClasses.add(targetType);   ... 日志打印}else {// Non-empty set of methods// 到这里则说明当前类一定有方法被  @EventListener 注解修饰ConfigurableApplicationContext context = this.applicationContext;Assert.state(context != null, "No ApplicationContext set");// 在 postProcessBeanFactory 方法中获取到的容器中的 EventListenerFactory 实现类List<EventListenerFactory> factories = this.eventListenerFactories;Assert.state(factories != null, "EventListenerFactory List not initialized");// 遍历 @EventListener 注解修饰的方法for (Method method : annotatedMethods.keySet()) {// 遍历所有的 EventListenerFactory for (EventListenerFactory factory : factories) {// 如果 EventListenerFactory 支持当前方法if (factory.supportsMethod(method)) {// 获取可调用的方法:context.getType(beanName) 获取到的可能是代理类,这里会找到真正要调用的诶类的方法Method methodToUse = AopUtils.selectInvocableMethod(method, context.getType(beanName));// 通过 EventListenerFactory#createApplicationListener 方法为找到的方法创建一个 ApplicationListener实现类ApplicationListener<?> applicationListener =factory.createApplicationListener(beanName, targetType, methodToUse);// ApplicationListenerMethodAdapter类型则调用 init 完成初始化if (applicationListener instanceof ApplicationListenerMethodAdapter) {((ApplicationListenerMethodAdapter) applicationListener).init(context, this.evaluator);}// applicationListener 添加到 Spring 容器中context.addApplicationListener(applicationListener);break;}}}... 日志打印}}}

总结如下:

  1. EventListenerMethodProcessor 在 EventListenerMethodProcessor#postProcessBeanFactory 中获取到了所有 EventListenerFactory 类型的 bean。(在ConfigurationClassPostProcessor 中将所有bean 的BeanDefinition 扫描生成。而 ConfigurationClassPostProcessor 的优先级在EventListenerMethodProcessor 之前 )。
  2. 随后在EventListenerMethodProcessor#afterSingletonsInstantiated 方法中获取所有bean,筛选出被 @EventListener 注解修饰的类或方法的类。
  3. 随后找到 EventListenerFactory,通过 EventListenerFactory#supportsMethod 判断是可以处理当前类,如果可以则通过 EventListenerFactory#createApplicationListener 为当前类生成一个 ApplicationListener (ApplicationListenerMethodAdapter 或 TransactionalApplicationListenerMethodAdapter)类并添加到 容器中。
  4. 当有指定事件触发后触发容器中的 ApplicationListenerMethodAdapter 或 TransactionalApplicationListenerMethodAdapter 的 onApplicationEvent 方法。在这些方法中会通过反射调用被 @EventListener 修饰的方法。

2.2 EventListenerFactory

EventListenerFactory 接口定义如下

public interface EventListenerFactory {// 是否支持处理当前指定方法boolean supportsMethod(Method method);// 为当前指定类的指定方法创建一个 ApplicationListenerApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method);}

默认情况下 EventListenerFactory 有 DefaultEventListenerFactory 和 TransactionalEventListenerFactory 两个实现类。我们这里看的是 @TransactionalEventListener 注解,所以来看一下 TransactionalEventListenerFactory 的实现,如下:

public class TransactionalEventListenerFactory implements EventListenerFactory, Ordered {...@Overridepublic boolean supportsMethod(Method method) {// 判断当前方式是否被 TransactionalEventListener 注解修饰return AnnotatedElementUtils.hasAnnotation(method, TransactionalEventListener.class);}@Overridepublic ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {// 创建一个 TransactionalApplicationListenerMethodAdapter 实现类return new TransactionalApplicationListenerMethodAdapter(beanName, type, method);}
}

这里我们可以了解到 TransactionalEventListenerFactory 会判断:如果方法被TransactionalEventListener 注解修饰,则会为其创建一个 TransactionalApplicationListenerMethodAdapter类作为 ApplicationListener。

2.3 TransactionalApplicationListenerMethodAdapter

TransactionalApplicationListenerMethodAdapter 结构如下:

这里我们简单看一下 TransactionalApplicationListenerMethodAdapter#onApplicationEvent 的实现

 @Overridepublic void onApplicationEvent(ApplicationEvent event) {// 如果当前存在事务. 则通过 TransactionSynchronizationManager 注册TransactionalApplicationListenerSynchronizationif (TransactionSynchronizationManager.isSynchronizationActive() &&TransactionSynchronizationManager.isActualTransactionActive()) {TransactionSynchronizationManager.registerSynchronization(new TransactionalApplicationListenerSynchronization<>(event, this, this.callbacks));}// 没有事务,判断是否执行事件逻辑else if (this.annotation.fallbackExecution()) { // ...processEvent(event);}else {... 日志打印}}

这里我们看到

  1. 如果当前线程存在活跃事务, 则通过 TransactionSynchronizationManager 注册一个 TransactionalApplicationListenerSynchronization 实例。当事务到达指定阶段后会触发TransactionalApplicationListenerSynchronization 中的阶段方法,而这些方法中则还会调用 TransactionalApplicationListenerMethodAdapter#processEvent 方法。
  2. 如果当前线程不存在活跃事务,则根据 TransactionalEventListener#fallbackExecution 判断,如果为 true,则执行 TransactionalApplicationListenerMethodAdapter#processEvent。其中 processEvent 的实现在其父类 ApplicationListenerMethodAdapter中。

下面我们来详细看一看

2.3.1 TransactionalApplicationListenerSynchronization

TransactionalApplicationListenerSynchronization 的实现如下,其中主要实现了 beforeCommit 和 afterCompletion 两个方法。

class TransactionalApplicationListenerSynchronization<E extends ApplicationEvent>implements TransactionSynchronization {...// 事务提交前判断@Overridepublic void beforeCommit(boolean readOnly) {// 如果指定阶段为事务提交前,则执行 processEventWithCallbacksif (this.listener.getTransactionPhase() == TransactionPhase.BEFORE_COMMIT) {processEventWithCallbacks();}}// 事务提交后判断@Overridepublic void afterCompletion(int status) {TransactionPhase phase = this.listener.getTransactionPhase();// 如果执行阶段为事务提交后 && 事务正常提交if (phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {processEventWithCallbacks();}// 如果执行阶段为事务回滚后 && 事务发生回滚else if (phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) {processEventWithCallbacks();}// 如果执行阶段为事务结束 && 事务发生回滚else if (phase == TransactionPhase.AFTER_COMPLETION) {processEventWithCallbacks();}}private void processEventWithCallbacks() {// 执行回调类的回调预处理方法this.callbacks.forEach(callback -> callback.preProcessEvent(this.event));try {// 执行监听事件,这里的 listener 即为上层的 TransactionalApplicationListenerMethodAdapter 实例对象this.listener.processEvent(this.event);}catch (RuntimeException | Error ex) {// 执行回调类的回调后置处理方法this.callbacks.forEach(callback -> callback.postProcessEvent(this.event, ex));throw ex;}// 执行回调类的回调后置处理方法this.callbacks.forEach(callback -> callback.postProcessEvent(this.event, null));}}

2.3.2 ApplicationListenerMethodAdapter#processEvent

ApplicationListenerMethodAdapter#processEvent 实现如下,逻辑比较简单,这里不再赘述。

 public void processEvent(ApplicationEvent event) {// 解析 method 的参数。method 指的是被 TransactionalEventListener  注解修饰的方法Object[] args = resolveArguments(event);// 判断是否可以处理。根据注解 TransactionalEventListener#condition 属性判断if (shouldHandle(event, args)) {// 反射调用 指定的方法,以 args 为入参Object result = doInvoke(args);if (result != null) {// 如果监听方法有返回值,则对返回值处理。会将返回值作为 event 再次发送handleResult(result);}else {logger.trace("No result object given - no result to handle");}}}

以上:如有侵扰,联系删除。 内容仅用于自我记录学习使用。如有错误,欢迎指正

Spring 源码分析衍生篇十三 :事务扩展机制 TransactionSynchronization相关推荐

  1. Spring 源码分析衍生篇十 :Last-Modified 缓存机制

    文章目录 一.前言 二.Last-Modify 三.实现方案 1. 实现 org.springframework.web.servlet.mvc.LastModified接口 1.1. 简单演示 1. ...

  2. Spring 源码分析衍生篇三 : lookup-method 和 replaced-method

    文章目录 一.前言 二.基本使用 1. 作用 三.原理实现 1. 预处理 1.1 AbstractBeanDefinition#prepareMethodOverrides 1.2 Autowired ...

  3. dubbo源码分析系列(1)扩展机制的实现

    1 系列目录 dubbo源码分析系列(1)扩展机制的实现 dubbo源码分析系列(2)服务的发布 dubbo源码分析系列(3)服务的引用 dubbo源码分析系列(4)dubbo通信设计 2 SPI扩展 ...

  4. Spring源码分析前篇

    穷举法:把生活所见所闻全部归纳到我们所学的知识体系中来,加以思考总结变成自己的东西.(举例子) 类比法:用自己熟悉的方法(利用自己已有的知识体系),去对比学习新的知识. 学习最好的方法:就是重复 Sp ...

  5. Dubbo源码分析系列之-深入Dubbo扩展机制

    导语:   在之前的博客中分析过Java的SPI机制,其实Dubbo的扩展点加载机制也是从JDK表中的SPI(Service Provider Interface)机制中开发而来,只不过在原生的基础上 ...

  6. spring源码分析之spring-core总结篇

    1.spring-core概览 spring-core是spring框架的基石,它为spring框架提供了基础的支持. spring-core从源码上看,分为6个package,分别是asm,cgli ...

  7. spring源码分析之BeanDefinition相关

    目录 前言: BeanDefinition的家族系列 1.BeanDefintion的UML类图 2.BeanDefintion家族类详解 2.1.通用接口 2.2.BeanDefintion接口 2 ...

  8. Spring 源码分析 (一)——迈向 Spring 之路

    一切都是从 Bean 开始的 在 1996 年,Java 还只是一个新兴的.初出茅庐的编程语言.人们之所以关注她仅仅是因为,可以使用 Java 的 Applet 来开发 Web 应用.但这些开发者很快 ...

  9. Spring源码分析之Bean的创建过程详解

    前文传送门: Spring源码分析之预启动流程 Spring源码分析之BeanFactory体系结构 Spring源码分析之BeanFactoryPostProcessor调用过程详解 本文内容: 在 ...

最新文章

  1. 2021-2027年中国市医疗电子场投资分析及前景预测报告
  2. 所有类是object的子类,但是又可以继承一个其他类解析
  3. Deep Residual Learning for Image Recognition(MSRA-深度残差学习)
  4. WeChat之小工具:基于C++程序代码设计的查看微信撤销、撤回消息(包括文本、图片、视频等)GUI小工具
  5. python调用百度接口实现ocr识别_Python调用百度OCR实现图片文字识别的示例代码
  6. Tomcat container 内部容器
  7. 20155322 《Java程序设计》课堂实践项目 数据库-3-4
  8. spark structured stream的Update模式
  9. Android新的menu实现——ActionMode
  10. 俄罗斯黑客被指攻击本国的工业组织机构
  11. oracle pfile 注释,Oracle pfile/spfile参数文件详解
  12. linux系统vmd软件如何使用,科学网—VMD (linux下分子可视化软件) - 刘雪静的博文...
  13. 无线投影服务器连接投影仪,无线投屏器怎么与投影机连接
  14. 颈椎前路caspar撑开器_Caspar撑开器和颈椎带锁钢板治疗下颈椎骨折脱位
  15. handsontable使用及遇到的坑-前言
  16. 【Android -- 面试】简历模板
  17. 语雀实现收藏网页的功能(借助印象笔记)
  18. 如何更改Linux的ip地址为静态ip(附克隆主机需要做的更改)
  19. android 区位码转汉字,汉字区位码查询系统
  20. openfoam 源代码解析

热门文章

  1. plupload上传整个文件夹
  2. PowerMILL 2018 MTD机床搭建视频教程
  3. oracle数据回流,页面的回流与重绘
  4. 推荐:50个加强表单的jQuery插件
  5. 计算机网络原理 谢希仁(第8版)第四章习题答案
  6. 【python日用】np.linspace
  7. Fiddler 实现手机抓包详解
  8. 【CocosCreator入门】CocosCreator组件 | Mask(遮罩)组件
  9. Linux C编程中_REENTRANT宏
  10. 第一范式,第二范式,第三范式,BCNF的理解