上一篇 Spring框架中的事件和监听器并未对Spring框架中的异步事件涉及太多,所以本篇是对其一个补充。

同步事件有一个主要缺点:它们在所调用线程的本地执行(也就是将所调用线程看成主线程的话,就是在主线程里依次执行)。如果监听器处理同步事件需要5秒钟的响应,则最终结果是用户将在至少5秒内无法看到响应(可以通过Spring框架中的事件和监听器中的例子了解具体)。所以,我们可以通过一个替代方案来解决这个问题 - 异步事件。

接下来也就是介绍Spring框架中的异步事件。老规矩,第一部分深入框架源码,将描述主要组成部分以及它们如何一起协作的。在第二部分,我们将编写一些测试用例来检查异步事件的执行情况。

Spring中的异步事件

在Spring中处理异步事件是基于本地的Java并发解决方案---任务执行器(可以了解下Java Executor框架的内容)。事件由multicastEvent 方法调度。它通过使用java.util.concurrent.Executor接口的实现将事件发送到专用的监听器。Multicaster会调用同步执行器,因为它是默认实现,这点在Spring框架中的事件和监听器有明确的例子,从源码的角度也就是是否设置有SyncTaskExecutor实例。从public void setTaskExecutor(@Nullable Executor taskExecutor)其中,@Nullable 可看出Executor参数可为null,默认不设置的话,multicastEvent也就直接 跳过异步执行了

org.springframework.context.event.SimpleApplicationEventMulticaster

@Overridepublic void multicastEvent(ApplicationEvent event) {multicastEvent(event, resolveDefaultEventType(event));}@Overridepublic void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {Executor executor = getTaskExecutor();if (executor != null) {executor.execute(() -> invokeListener(listener, event));}else {invokeListener(listener, event);}}}private ResolvableType resolveDefaultEventType(ApplicationEvent event) {return ResolvableType.forInstance(event);}/*** Set a custom executor (typically a {@link     org.springframework.core.task.TaskExecutor})* to invoke each listener with.* <p>Default is equivalent to {@link org.springframework.core.task.SyncTaskExecutor},* executing all listeners synchronously in the calling thread.* <p>Consider specifying an asynchronous task executor here to not block the* caller until all listeners have been executed. However, note that asynchronous* execution will not participate in the caller's thread context (class loader,* transaction association) unless the TaskExecutor explicitly supports this.* @see org.springframework.core.task.SyncTaskExecutor* @see org.springframework.core.task.SimpleAsyncTaskExecutor* @Nullable 可看出Executor参数可为null,默认不设置的话,上面multicastEvent也就直接      * 跳过异步执行了*/public void setTaskExecutor(@Nullable Executor taskExecutor) {this.taskExecutor = taskExecutor;}/*** Return the current task executor for this multicaster.*/@Nullableprotected Executor getTaskExecutor() {return this.taskExecutor;}复制代码

异步执行器的实现可以参考org.springframework.core.task.SimpleAsyncTaskExecutor。这个类为每个提交的任务创建新的线程。然而,它不会重用线程,所以如果我们有很多长执行时间的异步任务需要来处理的时候,线程创建的风险就会变得太大了,会占用大量的资源,不光是cpu还包括jvm。具体源码如下:

    /*** Executes the given task, within a concurrency throttle* if configured (through the superclass's settings).* @see #doExecute(Runnable)*/@Overridepublic void execute(Runnable task) {execute(task, TIMEOUT_INDEFINITE);}/*** Executes the given task, within a concurrency throttle* if configured (through the superclass's settings).* <p>Executes urgent tasks (with 'immediate' timeout) directly,* bypassing the concurrency throttle (if active). All other* tasks are subject to throttling.* @see #TIMEOUT_IMMEDIATE* @see #doExecute(Runnable)*/@Overridepublic void execute(Runnable task, long startTimeout) {Assert.notNull(task, "Runnable must not be null");Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {this.concurrencyThrottle.beforeAccess();doExecute(new ConcurrencyThrottlingRunnable(taskToUse));}else {doExecute(taskToUse);}}@Overridepublic Future<?> submit(Runnable task) {//创建FutureTask<Object> future = new FutureTask<>(task, null);//执行execute(future, TIMEOUT_INDEFINITE);return future;}@Overridepublic <T> Future<T> submit(Callable<T> task) {FutureTask<T> future = new FutureTask<>(task);execute(future, TIMEOUT_INDEFINITE);return future;}/*** Template method for the actual execution of a task.* <p>The default implementation creates a new Thread and starts it.* @param task the Runnable to execute* @see #setThreadFactory* @see #createThread* @see java.lang.Thread#start()*/protected void doExecute(Runnable task) {Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));//可以看出,执行也只是简单的将创建的线程start执行下,别提什么重用了thread.start();}复制代码

为了从线程池功能中受益,我们可以使用另一个Spring的Executor实现,org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor。类如其名,这个Executor允许我们使用线程池。关于线程池的源码,请期待我的Java9的书籍,里面会涉及到这里面的细节分析,也可以参考其他博客的博文(哈哈,我就是打个小广告而已)。

org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor

    /*** Return the underlying ThreadPoolExecutor for native access.* @return the underlying ThreadPoolExecutor (never {@code null})* @throws IllegalStateException if the ThreadPoolTaskExecutor hasn't been initialized yet*/public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException {Assert.state(this.threadPoolExecutor != null, "ThreadPoolTaskExecutor not initialized");return this.threadPoolExecutor;}@Overridepublic void execute(Runnable task) {Executor executor = getThreadPoolExecutor();try {executor.execute(task);}catch (RejectedExecutionException ex) {throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);}}@Overridepublic void execute(Runnable task, long startTimeout) {execute(task);}@Overridepublic Future<?> submit(Runnable task) {ExecutorService executor = getThreadPoolExecutor();try {return executor.submit(task);}catch (RejectedExecutionException ex) {throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);}}复制代码

写一个Spring中异步事件的例子

我们来编写一个能够同时处理同步和异步事件的multicaster。同步事件将使用本地同步调度程序进行调度(SyncTaskExecutor),异步使用Spring的ThreadPoolTaskExecutor实现。

/*** 下面的注释意思很明显了,不多说了* {@link TaskExecutor} implementation that executes each task <i>synchronously</i>* in the calling thread.** <p>Mainly intended for testing scenarios.** <p>Execution in the calling thread does have the advantage of participating* in it's thread context, for example the thread context class loader or the* thread's current transaction association. That said, in many cases,* asynchronous execution will be preferable: choose an asynchronous* {@code TaskExecutor} instead for such scenarios.** @author Juergen Hoeller* @since 2.0* @see SimpleAsyncTaskExecutor*/
@SuppressWarnings("serial")
public class SyncTaskExecutor implements TaskExecutor, Serializable {/*** Executes the given {@code task} synchronously, through direct* invocation of it's {@link Runnable#run() run()} method.* @throws IllegalArgumentException if the given {@code task} is {@code null}*/@Overridepublic void execute(Runnable task) {Assert.notNull(task, "Runnable must not be null");task.run();}}复制代码

首先,我们需要为我们的测试用例添加一些bean:

<bean id="syncTaskExecutor" class="org.springframework.core.task.SyncTaskExecutor" />
<bean id="asyncTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"><!-- 10 task will be submitted immediately --><property name="corePoolSize" value="10" /><!-- If 10 task are already submitted and treated, we allow to enlarge pool capacity to 15 (10 from core pool size + 5 from max pool size) --><property name="maxPoolSize" value="15" /><!-- Number of tasks that can be placed into waiting queue --><property name="queueCapacity" value="10" />
</bean><bean id="applicationEventMulticaster" class="com.migo.event.SimpleEventMulticaster"><property name="taskExecutor" ref="syncTaskExecutor" /><property name="asyncTaskExecutor" ref="asyncTaskExecutor" />
</bean>
<bean id="taskStatsHolder" class="com.migo.event.TaskStatsHolder" />复制代码

用于测试任务执行结果的两个类:

// TaskStatsHolder.java
/****** Holder bean for all executed tasks.**/
public class TaskStatsHolder {private Map<String, TaskStatData> tasks = new HashMap<String, TaskStatData>();public void addNewTaskStatHolder(String key, TaskStatData value) {tasks.put(key, value);}public TaskStatData getTaskStatHolder(String key) {return tasks.get(key);}
}// TaskStatData.java
/****** Holder class for all statistic data about already executed tasks.**/
public class TaskStatData {private String threadName;private int executionTime;private long startTime;private long endTime;public TaskStatData(String threadName, long startTime, long endTime) {this.threadName = threadName;this.startTime = startTime;this.endTime = endTime;this.executionTime = Math.round((endTime - startTime) / 1000);}public String getThreadName() {return threadName;}public int getExecutionTime() {return this.executionTime;}public long getStartTime() {return this.startTime;}public long getEndTime() {return this.endTime;}@Overridepublic String toString() {StringBuilder result = new StringBuilder();result.append("TaskStatData {thread name: ").append(this.threadName).append(", start time: ").append(new Date(this.startTime));result.append(", end time: ").append(new Date(this.endTime)).append(", execution time: ").append(this.executionTime).append(" seconds}");return result.toString();}}复制代码

如上代码所示,这些都是简单对象。我们会使用这些对象来检查我们的假设和执行结果是否相匹配。两个要分发的事件也很简单:

// ProductChangeFailureEvent.java
/*** This is synchronous event dispatched when one product is modified in the backoffice. * When product's modification fails (database, validation problem), this event is dispatched to* all listeners. It's synchronous because we want to inform the user that some actions were done * after the failure. Otherwise (asynchronous character of event) we shouldn't be able to* know if something was done or not after the dispatch.**/
public class ProductChangeFailureEvent extends ApplicationContextEvent {private static final long serialVersionUID = -1681426286796814792L;public static final String TASK_KEY = "ProductChangeFailureEvent";public ProductChangeFailureEvent(ApplicationContext source) {super(source);}
}// NotifMailDispatchEvent.java
/*** Event dispatched asynchronously every time when we want to send a notification mail. * Notification mails to send should be stored somewhere (filesystem, database...) but in* our case, we'll handle only one notification mail: when one product out-of-stock becomes available again.**/
public class NotifMailDispatchEvent extends ApplicationContextEvent implements AsyncApplicationEvent {private static final long serialVersionUID = 9202282810553100778L;public static final String TASK_KEY = "NotifMailDispatchEvent";public NotifMailDispatchEvent(ApplicationContext source) {super(source);}
}复制代码

而用于处理相应调度事件的监听器也只需要将数据放入TaskStatsHolder实例类中即可:

// ProductChangeFailureListener.java
@Component
public class ProductChangeFailureListener implements ApplicationListener<ProductChangeFailureEvent>{@Overridepublic void onApplicationEvent(ProductChangeFailureEvent event) {long start = System.currentTimeMillis();long end = System.currentTimeMillis();((TaskStatsHolder) event.getApplicationContext().getBean("taskStatsHolder")).addNewTaskStatHolder(ProductChangeFailureEvent.TASK_KEY, new TaskStatData(Thread.currentThread().getName(), start, end));}}// NotifMailDispatchListener.java
@Component
public class NotifMailDispatchListener implements ApplicationListener<NotifMailDispatchEvent>{@Overridepublic void onApplicationEvent(NotifMailDispatchEvent event) throws InterruptedException {long start = System.currentTimeMillis();// sleep 5 seconds to avoid that two listeners execute at the same momentThread.sleep(5000);long end = System.currentTimeMillis();((TaskStatsHolder) event.getApplicationContext().getBean("taskStatsHolder")).addNewTaskStatHolder(NotifMailDispatchEvent.TASK_KEY, new TaskStatData(Thread.currentThread().getName(), start, end));}
}复制代码

用于测试的controller如下所示:

@Controller
public class ProductController {@Autowiredprivate ApplicationContext context;@RequestMapping(value = "/products/change-failure")public String changeFailure() {try {System.out.println("I'm modifying the product but a NullPointerException will be thrown");String name = null;if (name.isEmpty()) {// show error message herethrow new RuntimeException("NullPointerException");}} catch (Exception e) {context.publishEvent(new ProductChangeFailureEvent(context));}return "success";}@RequestMapping(value = "/products/change-success")public String changeSuccess() {System.out.println("Product was correctly changed");context.publishEvent(new NotifMailDispatchEvent(context));return "success";}
}复制代码

最后,测试用例:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"classpath:applicationContext-test.xml"})
@WebAppConfiguration
public class SpringSyncAsyncEventsTest {@Autowiredprivate WebApplicationContext wac;@Testpublic void test() {MockMvc mockMvc = MockMvcBuilders.webAppContextSetup(this.wac).build();// execute both urls simultaneouslymockMvc.perform(get("/products/change-success"));mockMvc.perform(get("/products/change-failure"));// get stats holder and check if both stats are available:// - mail dispatching shouldn't be available because it's executed after a sleep of 5 seconds// - product failure should be available because it's executed synchronously, almost immediately (no operations in listeners)TaskStatsHolder statsHolder = (TaskStatsHolder) this.wac.getBean("taskStatsHolder");TaskStatData mailStatData = statsHolder.getTaskStatHolder(NotifMailDispatchEvent.TASK_KEY);TaskStatData productFailureData = statsHolder.getTaskStatHolder(ProductChangeFailureEvent.TASK_KEY);assertTrue("Task for mail dispatching is executed after 5 seconds, so at this moment, it taskStatsHolder shouldn't contain it", mailStatData == null);assertTrue("productFailureHolder shouldn't be null but it is", productFailureData != null);assertTrue("Product failure listener should be executed within 0 seconds but took "+productFailureData.getExecutionTime()+" seconds", productFailureData.getExecutionTime() == 0);while (mailStatData == null) {mailStatData = statsHolder.getTaskStatHolder(NotifMailDispatchEvent.TASK_KEY);}// check mail dispatching stats again, when availableassertTrue("Now task for mail dispatching should be at completed state", mailStatData != null);assertTrue("Task for mail dispatching should take 5 seconds but it took "+mailStatData.getExecutionTime()+" seconds", mailStatData.getExecutionTime() == 5);assertTrue("productFailureHolder shouldn't be null but it is", productFailureData != null);assertTrue("Product failure listener should be executed within 0 seconds but took "+productFailureData.getExecutionTime()+" seconds", productFailureData.getExecutionTime() == 0);assertTrue("Thread executing mail dispatch and product failure listeners shouldn't be the same", !productFailureData.getThreadName().equals(mailStatData.getThreadName()));assertTrue("Thread executing product failure listener ("+productFailureData.getThreadName()+") should be the same as current thread ("+Thread.currentThread().getName()+") but it wasn't", Thread.currentThread().getName().equals(productFailureData.getThreadName()));assertTrue("Thread executing mail dispatch listener ("+mailStatData.getThreadName()+") shouldn't be the same as current thread ("+Thread.currentThread().getName()+") but it was", !Thread.currentThread().getName().equals(mailStatData.getThreadName()));// make some output to see the informations about tasksSystem.out.println("Data about mail notif dispatching event: "+mailStatData);System.out.println("Data about product failure dispatching event: "+productFailureData);}
}复制代码

因之前整理的笔记此处SimpleEventMulticaster忘了放进去,也懒得去找了,可以通过xml定义去查看下,这个测试用例可以看出两个listener不是由同一个executor启动的,Product failure 监听器由同步执行器执行。因为他们没有做任何操作,几乎立即返回结果。关于邮件调度事件,通过休眠5秒可以得到其执行时间超过Product failure 监听器的执行时间。通过分析输出可以知道,两者在不同的线程中执行,所以由不同的执行器执行(关于这俩执行器的例子可以再搜下相关博文,其实主要还是想表达SyncTaskExecutor是在主线程里执行,而asyncTaskExecutor由线程池里管理的线程执行)。

Product was correctly changed
I'm modifying the product but a NullPointerException will be thrown
Data about mail notif dispatching event: TaskStatData {thread name: asyncTaskExecutor-1(异步线程), start time: Thu Jun 19 21:14:18 CEST 2016, end time: Thu Jun 19 21:14:23 CEST 2016, execution time: 5 seconds}
Data about product failure dispatching event: TaskStatData {thread name: main(主线程), start time: Thu Jun 19 21:14:21 CEST 2016, end time: Thu Jun 19 21:14:21 CEST 2016, execution time: 0 seconds}复制代码

本文简单介绍了如何在Spring中处理异步事件。当监听器需要执行很长时间,而我们又不想阻塞应用程序执行,就可以使用异步执行。异步执行可以通过异步执行器(如ThreadPoolTaskExecutor或SimpleAsyncTaskExecutor)实现。

Spring5源码解析-Spring中的异步事件相关推荐

  1. Spring5源码 - 12 Spring事件监听机制_异步事件监听应用及源码解析

    文章目录 Pre 实现原理 应用 配置类 Event事件 事件监听 EventListener 发布事件 publishEvent 源码解析 (反推) Spring默认的事件广播器 SimpleApp ...

  2. Spring5源码 - 13 Spring事件监听机制_@EventListener源码解析

    文章目录 Pre 概览 开天辟地的时候初始化的处理器 @EventListener EventListenerMethodProcessor afterSingletonsInstantiated 小 ...

  3. Spring5源码 - 11 Spring事件监听机制_源码篇

    文章目录 pre 事件监听机制的实现原理[观察者模式] 事件 ApplicationEvent 事件监听者 ApplicationEvent 事件发布者 ApplicationEventMultica ...

  4. Spring5源码 - 07 Spring Bean 生命周期流程 源码解读02

    文章目录 Pre 通俗流程 finishBeanFactoryInitialization Pre Spring5源码 - 06 Spring Bean 生命周期流程 概述 01 接上文 通俗流程 下 ...

  5. Spring源码深度解析(郝佳)-学习-源码解析-Spring MVC(三)-Controller 解析

    在之前的博客中Spring源码深度解析(郝佳)-学习-源码解析-Spring MVC(一),己经对 Spring MVC 的框架做了详细的分析,但是有一个问题,发现举的例子不常用,因为我们在实际开发项 ...

  6. JDK源码解析 Comparator 中的策略模式

    JDK源码解析 Comparator 中的策略模式.在Arrays类中有一个 sort() 方法,如下: public class Arrays{public static <T> voi ...

  7. 源码解析 - Spring如何实现IoC的?

    点击↑上方↑蓝色"编了个程"关注我~ 每周至少一篇原创文章 这是本公众号的第 28 篇原创文章 荒腔走板 上周一冲动买了个游戏手柄. 小时候很喜欢玩游戏,那个时候手柄游戏还是插卡的 ...

  8. java观察者模式在spring中的应用_Spring源码之spring中的观察者模式和监听器的使用...

    声明:本文根据鲁班学院子路老师spring中观察者模式课程整理得来 观察者模式特点: 被观察者持有监听的观察者的引用. 被观察者支持增加和删除的观察者. 被观察者状态改变通知观察者. JDK中观察者i ...

  9. unity urp内置lit材质源码解析(中)

    上一篇(https://blog.csdn.net/qq_30100043/article/details/125725934)解析了内置shader lit的主文件和input文件,接下来,我们将视 ...

最新文章

  1. 直播活动丨BMMeetup第1期:大模型Prompt Tuning技术,8场学术报告和Poster提前下载...
  2. 解决 VCENTER ROOT 密码过期无法登陆 USER PASSWORD EXPIRED
  3. Python 数据类型之字典
  4. c# Selenium 如何模拟滑动geetest 验证码
  5. 新手AS常见问题集锦
  6. 应用ForkJoin –从最佳到快速
  7. 笨方法“学习python笔记之元组(tuple)
  8. Markdown演示格式语法
  9. 利用BioPerl将DNA序列翻译成蛋白序列
  10. git 总是要求输入密码的问题
  11. linux常见的三种shell,几种常见的Shell
  12. linux python3 装pip,linux 安装pip 和python3(示例代码)
  13. 勿让“天地图”成绿坝第二
  14. SD卡windows无法格式化
  15. EHCI主机控制器--周期帧列表(periodic frame list)
  16. android 高德地图设置不能旋转_高德地图行车记录仪AR导航怎么设置使用教程
  17. 使用cipher加密配置文件
  18. 拥有有趣灵魂的程序员们,程序员访谈(四)
  19. 数模(2)——多属性决策模型
  20. 又被黑!百度到底做错了什么?

热门文章

  1. 2016百度之星 - 初赛(Astar Round2A)Gym Class(拓扑排序)
  2. 简单配置snmpd.conf
  3. 12月第1周.BIZ域名总量TOP10:仅中德澳3国持续上涨
  4. 更改chrome底色为护目色
  5. POJ2533解题报告
  6. [转载]C#模拟键盘鼠标事件-SendKeys
  7. ASP.NET连接带密码Access的方法
  8. “平头哥”半导体公司
  9. kubernetes集群网络配置方案——flannel部署
  10. SFB 项目经验-12-为某上市企业的Skype for Business购买Godday证书