在实际的开发过程中,有些业务逻辑使用异步的方式处理更为合理。比如在某个业务逻辑中,需要把一些数据存入到redis缓存中,这个操作只是一个辅助的功能,成功或者失败对主业务并不会产生根本影响,这个过程可以通过异步的方法去进行。

Spring中通过在方法上设置@Async注解,可使得方法被异步调用。也就是说该方法会在调用时立即返回,而这个方法的实际执行交给Spring的TaskExecutor去完成。

异步执行的使用

配置类

使用@EnableAsync注解开启异步功能。

package com.morris.spring.config;import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.Executor;@Configuration
@EnableAsync // 开启Async
public class AsyncConfig implements AsyncConfigurer {@Overridepublic Executor getAsyncExecutor() {// 自定义线程池ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(2);executor.setMaxPoolSize(4);executor.setQueueCapacity(10);executor.setThreadNamePrefix("MyExecutor-");executor.initialize();return executor;}}

service层的使用

在需要异步执行的方法上面加上@Async注解。

package com.morris.spring.service;import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;@Slf4j
public class AsyncService {@Asyncpublic void noResult() {log.info("execute noResult");}@Asyncpublic Future<String> hasResult() throws InterruptedException {log.info("execute hasResult");TimeUnit.SECONDS.sleep(5);return new AsyncResult<>("hasResult success");}@Asyncpublic CompletableFuture<String> completableFuture() throws InterruptedException {log.info(" execute completableFuture");TimeUnit.SECONDS.sleep(5);return CompletableFuture.completedFuture("completableFuture success");}}

测试类

package com.morris.spring.demo.async;import com.morris.spring.config.AsyncConfig;
import com.morris.spring.service.AsyncService;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;/*** 异步调用的演示*/
@Slf4j
public class AsyncDemo {@Testpublic void test() throws ExecutionException, InterruptedException {AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext();applicationContext.register(AsyncService.class);applicationContext.register(AsyncConfig.class);applicationContext.refresh();AsyncService asyncService = applicationContext.getBean(AsyncService.class);asyncService.noResult(); // 无结果Future<String> future = asyncService.hasResult();log.info("hasResult: {}", future.get()); // 有结果CompletableFuture<String> completableFuture = asyncService.completableFuture();completableFuture.thenAcceptAsync(System.out::println);// 异步回调log.info("completableFuture call down");}
}

运行结果如下:

INFO  MyExecutor-1 AsyncService:16 - execute noResult
INFO  MyExecutor-2 AsyncService:21 - execute hasResult
INFO  main AsyncDemo:29 - hasResult: hasResult success
INFO  MyExecutor-1 AsyncService:28 -  execute completableFuture
INFO  main AsyncDemo:33 - completableFuture call down

通过日志可以发现AsyncService的方法都是通过线程名为MyExecutor-1的线程执行的,这个名称的前缀是在AsyncConfig中指定的,而不是通过main线程执行的。

两个疑问:

  • 是否可以不配置Executor线程池,Spring会默认创建默认的Executor,还是会报错?
  • Executor线程池中执行任务时如果抛出了异常,可否自定义异常的处理类对异常进行捕获处理?

源码分析

@EnableAsync

@EnableAsync主要是向Spring容器中导入了AsyncConfigurationSelector类。

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

AsyncConfigurationSelector

AsyncConfigurationSelector的主要方法当然是selectImports(),注意这里会先调用父类的selectImports()
org.springframework.context.annotation.AdviceModeImportSelector#selectImports(org.springframework.core.type.AnnotationMetadata)

public final String[] selectImports(AnnotationMetadata importingClassMetadata) {Class<?> annType = GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class);Assert.state(annType != null, "Unresolvable type argument for AdviceModeImportSelector");AnnotationAttributes attributes = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);if (attributes == null) {throw new IllegalArgumentException(String.format("@%s is not present on importing class '%s' as expected",annType.getSimpleName(), importingClassMetadata.getClassName()));}AdviceMode adviceMode = attributes.getEnum(getAdviceModeAttributeName());// 模板方法模式,回调子类的selectImportsString[] imports = selectImports(adviceMode);if (imports == null) {throw new IllegalArgumentException("Unknown AdviceMode: " + adviceMode);}return imports;
}

org.springframework.scheduling.annotation.AsyncConfigurationSelector#selectImports

public String[] selectImports(AdviceMode adviceMode) {switch (adviceMode) {case PROXY:// 奇怪???@Transaction、@EnableCaching都是注入两个类,一个config,一个registrar导入aop的入口类// 而这里只有一个config类ProxyAsyncConfigurationreturn new String[] {ProxyAsyncConfiguration.class.getName()};case ASPECTJ:return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};default:return null;}
}

AsyncConfigurationSelector又导入了配置类ProxyAsyncConfiguration。

ProxyAsyncConfiguration

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {/*** 先看父类AbstractAsyncConfiguration* @return*/@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public AsyncAnnotationBeanPostProcessor asyncAdvisor() {Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");// 实例化AsyncAnnotationBeanPostProcessorAsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();bpp.configure(this.executor, this.exceptionHandler);Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {bpp.setAsyncAnnotationType(customAsyncAnnotation);}bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));return bpp;}}

ProxyAsyncConfiguration向容器中注入了一个AsyncAnnotationBeanPostProcessor。

疑问:这里为啥是BeanPostProcessor,不应该像事务切面或者缓存切面一样,注入一个Advisor和XxxxInterceptor(Advice)吗?

AbstractAsyncConfiguration

AbstractAsyncConfiguration是ProxyAsyncConfiguration的父类。

@Configuration
public abstract class AbstractAsyncConfiguration implements ImportAware {@Nullableprotected AnnotationAttributes enableAsync;@Nullableprotected Supplier<Executor> executor;@Nullableprotected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;/*** 实现了ImportAware.setImportMetadata* 在ProxyAsyncConfiguration初始化后被调用* @param importMetadata*/@Overridepublic void setImportMetadata(AnnotationMetadata importMetadata) {// 取得@EnableAsync注解this.enableAsync = AnnotationAttributes.fromMap(importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));if (this.enableAsync == null) {throw new IllegalArgumentException("@EnableAsync is not present on importing class " + importMetadata.getClassName());}}/*** Collect any {@link AsyncConfigurer} beans through autowiring.*/@Autowired(required = false)void setConfigurers(Collection<AsyncConfigurer> configurers) {// configurers默认为空,除非手动注入AsyncConfigurerif (CollectionUtils.isEmpty(configurers)) {return;}if (configurers.size() > 1) {throw new IllegalStateException("Only one AsyncConfigurer may exist");}AsyncConfigurer configurer = configurers.iterator().next();this.executor = configurer::getAsyncExecutor;this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;}}

从这里可以看出,可以通过向spring容器中注入AsyncConfigurer来指定执行异步任务的线程池和异常处理器。

AsyncAnnotationBeanPostProcessor

AsyncAnnotationBeanPostProcessor的继承结构图:

AsyncAnnotationBeanPostProcessor主要实现了BeanFactoryAware和BeanPostProcessor接口。

org.springframework.scheduling.annotation.AsyncAnnotationBeanPostProcessor#setBeanFactory

public void setBeanFactory(BeanFactory beanFactory) {super.setBeanFactory(beanFactory);// 实例化AdvisorAsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);if (this.asyncAnnotationType != null) {advisor.setAsyncAnnotationType(this.asyncAnnotationType);}advisor.setBeanFactory(beanFactory);this.advisor = advisor;
}

在AsyncAnnotationBeanPostProcessor实例化时实例化了切面AsyncAnnotationAdvisor。

每个bean实例化完后都会调用AsyncAnnotationBeanPostProcessor.postProcessAfterInitialization()判断是否要生成代理对象。

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {... .../*** @see AbstractBeanFactoryAwareAdvisingPostProcessor#isEligible(java.lang.Object, java.lang.String)*/// isEligible会判断哪些bean要生成代理// 就是使用advisor中的pointcut进行匹配if (isEligible(bean, beanName)) {// 创建代理ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);if (!proxyFactory.isProxyTargetClass()) {evaluateProxyInterfaces(bean.getClass(), proxyFactory);}proxyFactory.addAdvisor(this.advisor);customizeProxyFactory(proxyFactory);return proxyFactory.getProxy(getProxyClassLoader());}// No proxy needed.return bean;
}

AsyncAnnotationAdvisor

切面AsyncAnnotationAdvisor包括通知AnnotationAsyncExecutionInterceptor和切点ComposablePointcut。

public AsyncAnnotationAdvisor(@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);asyncAnnotationTypes.add(Async.class);try {asyncAnnotationTypes.add((Class<? extends Annotation>)ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));}catch (ClassNotFoundException ex) {// If EJB 3.1 API not present, simply ignore.}this.advice = buildAdvice(executor, exceptionHandler); // 创建AnnotationAsyncExecutionInterceptorthis.pointcut = buildPointcut(asyncAnnotationTypes); // 创建ComposablePointcut
}protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);interceptor.configure(executor, exceptionHandler);return interceptor;
}protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {ComposablePointcut result = null;for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true); // 类Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true); // 方法if (result == null) {result = new ComposablePointcut(cpc);}else {result.union(cpc); // 类和方法的组合切点}result = result.union(mpc);}return (result != null ? result : Pointcut.TRUE);
}

AnnotationMatchingPointcut切面其实就是查看类或者方法上面有没有@Async注解。

AnnotationAsyncExecutionInterceptor

AnnotationAsyncExecutionInterceptor类主要负责增强逻辑的实现。

org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke

public Object invoke(final MethodInvocation invocation) throws Throwable {Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);// 获得线程池AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);if (executor == null) {throw new IllegalStateException("No executor specified and no default executor set on AsyncExecutionInterceptor either");}// 将目标方法的执行封装为Callable,方便提交到线程池Callable<Object> task = () -> {try {// 执行目标方法Object result = invocation.proceed();if (result instanceof Future) {return ((Future<?>) result).get();}}catch (ExecutionException ex) {handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());}catch (Throwable ex) {handleError(ex, userDeclaredMethod, invocation.getArguments());}return null;};// 提交任务return oSubmit(task, executor, invocation.getMethod().getReturnType());
}

org.springframework.aop.interceptor.AsyncExecutionAspectSupport#determineAsyncExecutor

protected AsyncTaskExecutor determineAsyncExecutor(Method method) {AsyncTaskExecutor executor = this.executors.get(method);if (executor == null) {Executor targetExecutor;/*** @see org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor#getExecutorQualifier(java.lang.reflect.Method)*/// 获得@Async注解中的value属性中指定的taskExecutor名称String qualifier = getExecutorQualifier(method);if (StringUtils.hasLength(qualifier)) {targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);}else {// 获取默认的taskExecutortargetExecutor = this.defaultExecutor.get();}if (targetExecutor == null) {return null;}executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));this.executors.put(method, executor);}return executor;
}

determineAsyncExecutor()负责获取异步任务执行的线程池,线程池的查找步骤如下:

  1. 从spring容器中寻找@Async注解中的value属性中指定的taskExecutor
  2. 寻找默认的defaultExecutor

默认的defaultExecutor是怎么来的?

org.springframework.aop.interceptor.AsyncExecutionAspectSupport#configure

public void configure(@Nullable Supplier<Executor> defaultExecutor,@Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {// defaultExecutor默认为从beanFactory获取TaskExecutor或者bean名字为taskExecutor的Executor,beanFactory.getBean(TaskExecutor.class)this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));// exceptionHandler默认为SimpleAsyncUncaughtExceptionHandlerthis.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);
}

defaultExecutor首先取参数传入的defaultExecutor,这个参数来自接口AsyncConfigurer.getAsyncExecutor(),如果参数为null,那么就调用getDefaultExecutor(),注意这个方法子类AsyncExecutionInterceptor重写了:

org.springframework.aop.interceptor.AsyncExecutionInterceptor#getDefaultExecutor

protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {Executor defaultExecutor = super.getDefaultExecutor(beanFactory);return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}

如果找不到defaultExecutor就会创建一个SimpleAsyncTaskExecutor。

再来看看父类的AsyncExecutionAspectSupport#getDefaultExecutor:
org.springframework.aop.interceptor.AsyncExecutionAspectSupport#getDefaultExecutor

protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {if (beanFactory != null) {try {// Search for TaskExecutor bean... not plain Executor since that would// match with ScheduledExecutorService as well, which is unusable for// our purposes here. TaskExecutor is more clearly designed for it.return beanFactory.getBean(TaskExecutor.class);}catch (NoUniqueBeanDefinitionException ex) {logger.debug("Could not find unique TaskExecutor bean", ex);try {// 找名为taskExecutor的Executorreturn beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);}catch (NoSuchBeanDefinitionException ex2) {if (logger.isInfoEnabled()) {logger.info("More than one TaskExecutor bean found within the context, and none is named " +"'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +"as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());}}}catch (NoSuchBeanDefinitionException ex) {logger.debug("Could not find default TaskExecutor bean", ex);try {return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);}catch (NoSuchBeanDefinitionException ex2) {logger.info("No task executor bean found for async processing: " +"no bean of type TaskExecutor and no bean named 'taskExecutor' either");}// Giving up -> either using local default executor or none at all...}}return null;
}

先从beanFactory中获取TaskExecutor类型的对象,然后再找名为taskExecutor的Executor对象。

org.springframework.aop.interceptor.AsyncExecutionAspectSupport#doSubmit

protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {// 执行任务if (CompletableFuture.class.isAssignableFrom(returnType)) {return CompletableFuture.supplyAsync(() -> {try {return task.call();}catch (Throwable ex) {throw new CompletionException(ex);}}, executor);}else if (ListenableFuture.class.isAssignableFrom(returnType)) {return ((AsyncListenableTaskExecutor) executor).submitListenable(task);}else if (Future.class.isAssignableFrom(returnType)) {return executor.submit(task);}else {executor.submit(task);return null;}
}

doSubmit()负责将任务提交至线程池中,并对各种方法的返回值进行处理。

【spring】spring异步执行的使用与源码分析相关推荐

  1. 【spring】Spring事件监听器ApplicationListener的使用与源码分析

    ApplicationEvent以及Listener是Spring为我们提供的一个事件监听.订阅的实现,内部实现原理是观察者设计模式,设计初衷也是为了系统业务逻辑之间的解耦,提高可扩展性以及可维护性. ...

  2. 聊聊Spring中的数据绑定 --- DataBinder本尊(源码分析)

    每篇一句 唯有热爱和坚持,才能让你在程序人生中屹立不倒,切忌跟风什么语言或就学什么去~ 相关阅读 [小家Spring]聊聊Spring中的数据转换:Converter.ConversionServic ...

  3. 聊聊Spring中的数据绑定 --- DataBinder本尊(源码分析)【享学Spring】

    每篇一句 唯有热爱和坚持,才能让你在程序人生中屹立不倒,切忌跟风什么语言或就学什么去~ 前言 数据绑定 这个概念在任何一个成型的框架中都是特别重要的(尤其是web框架),它能让框架更多的自动化,更好容 ...

  4. 【小家Spring】聊聊Spring中的数据绑定 --- DataBinder本尊(源码分析)

    每篇一句 > 唯有热爱和坚持,才能让你在程序人生中屹立不倒,切忌跟风什么语言或就学什么去~ 相关阅读 [小家Spring]聊聊Spring中的数据绑定 --- 属性访问器PropertyAcce ...

  5. Spring自定义注解驱动开发使用及源码分析

    目录 前言 注解驱动开发使用 需求 代码实现 测试效果 源码分析 BeanDefinitionRegistryPostProcessor接口 解析BeanDefinition 处理Bean上配置的注解 ...

  6. 深入理解Spark 2.1 Core (七):Standalone模式任务执行的原理与源码分析

    这篇博文,我们就来讲讲Executor启动后,是如何在Executor上执行Task的,以及其后续处理. 执行Task 我们在<深入理解Spark 2.1 Core (三):任务调度器的原理与源 ...

  7. spring boot实战(第九篇)Application创建源码分析

    前言 通过前面的文章了解到在spring boot的启动时,利用的是编写的Application类,使用了注解@SpringBootApplication,本篇将阐述该Bean的加载过程. [html ...

  8. Spring IOC和Bean生命周期以及源码分析

    这篇文章主要讲解 IOC 容器的创建过程,让大家对整体有一个全局的认识,文章目录如图: 1. 基础知识 1.1 什么是 Spring IOC ? IOC 不是一种技术,只是一种思想,一个重要的面向对象 ...

  9. 【spring】AOP引入的使用与源码分析

    通知是对目标对象方法的增强,而引入可以动态为目标对象实现新的接口,实现对类的增强. 引入的使用 目标类 public class DogService {public void hi() {Syste ...

最新文章

  1. 设计Optaplanner下实时规划服务的失败经历
  2. 欧盟正研究用三种标准技术应对DNS漏洞
  3. 串口同步通信和串口异步通信
  4. Oracle序列使用:建立、删除
  5. 【iVX 初级工程师培训教程 10篇文拿证】07 08 新闻页制作
  6. 吉林农业科技学院计算机大师,吉林农业科技学院
  7. 如何使用digiKam进行照片管理
  8. 多比图形控件教程:基于Flex/Javascript的网页绘图控件
  9. 大学数学实验习题--统计推断 (附答案)判断alpha,n与mu,sigma的估计区间长度的关系
  10. 看数据模型界两大长老的神仙打架
  11. css美化table的方法
  12. vmware使用显卡 | vmware切换显卡 | vmware显卡配置
  13. AHRS和INS的区别
  14. 钉钉小程序的坑 么有开启通讯录权限,导致后台报错“没有调用该接口的权限”
  15. 江苏计算机一级证书考试试题,2016年江苏省计算机一级考试试题
  16. python蟒蛇画法
  17. FPGA实现UDP传输视频,提供2套verilog工程源码和接收显示上位机程序
  18. Git 官网无法下载 解决
  19. FPGA-出租车计价器的实现
  20. sql server使用DAC连接查询系统表

热门文章

  1. 海康服务器装win7系统,详解win7旗舰版系统必须重装的四种情况
  2. 2016-7-20 奶奶走了
  3. C语言学习笔记第五天_项目训练
  4. 可编程数据平面(论文阅读)
  5. Python 给图片加文字,加图片水印
  6. windows控制台命令合集
  7. 宠物店 java 报告_宠物店社会实践报告通用范文
  8. C# linq的学习及使用
  9. ASP.NET页面在IE缓存的清除办法 (转)
  10. 有道云笔记Markdown图片插入居中方法