Spring异步切面源码解析
spring中提供了一种异步调用的方式去让我们在方法的调用过程中来达到异步调用的结果,比如service1的a方法调用了service2的b方法,正常使用的话是a同步调用b的,但是如果我们想a异步调用b的话,也就是说可能b方法调用耗时比较长,a方法不想阻塞在b方法上面,而是b方法开启一个线程去异步调用,这时我们只需要开启spring提供的异步切面就可以简单地达到我们想要的异步调用的效果了
一.异步切面使用
添加@EnableAsync开启spring异步切面
@EnableAsync
@ComponentScan("com.zyh.springtest.async")
public class AppConfig {private int corePoolSize = 10;private int maxPoolSize = 200;private int queueCapacity = 10;private String ThreadNamePrefix = "JackExecutor-";@Beanpublic Executor executor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(corePoolSize);executor.setMaxPoolSize(maxPoolSize);executor.setQueueCapacity(queueCapacity);executor.setThreadNamePrefix(ThreadNamePrefix);// rejection-policy:当pool已经达到max size的时候,如何处理新任务// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();return executor;}
}
在需要被异步调用的方法或者类上面加上@Async注解
@Service
public class OrderService {@Asyncpublic void queryOrder() {System.out.println("OrderService.queryOrder:" + Thread.currentThread().getName());}
}
UserService调用OrderService
@Service
public class UserService {@Autowiredprivate OrderService orderService;public void queryOrder() {System.out.println("UserOrder.queryOrder:" + Thread.currentThread().getName());orderService.queryOrder();}
}
调用UserService的queryOrder方法
AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext(AppConfig.class);
UserOrder userOrder = applicationContext.getBean(UserOrder.class);
userOrder.queryOrder();
结果:
二.异步切面原理
之所以OrderService的queryOrder方法会被放到一个子线程中去异步调用,它的原理其实就是OrderService被代理了,在调用OrderSservice的被加上了@Async注解的queryOrder的时候,它的代理对象会去寻找适用于增强queryOrder方法异步调用的advisor,如果有advisor就会去对queryOrder方法进行开启线程异步调用的增强,那么这个用于增强异步调用功能的advisor是怎么来的呢?我们还是按照惯例,直接去看@EnableAsync注解里面做了什么
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
在@EnableAsync注解中通过@Import注解往容器中导入了一个AsyncConfigurationSelector组件,很明显这是一个实现了ImportSelect的类,spring在加载bd的时候会去调用它的selectImports方法
public String[] selectImports(AdviceMode adviceMode) {switch (adviceMode) {case PROXY:return new String[] {ProxyAsyncConfiguration.class.getName()};case ASPECTJ:return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};default:return null;}
}
其中在这里又去导入了ProxyAsyncConfiguration这个配置类,直接看这个配置类
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public AsyncAnnotationBeanPostProcessor asyncAdvisor() {Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();bpp.configure(this.executor, this.exceptionHandler);Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {bpp.setAsyncAnnotationType(customAsyncAnnotation);}bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));return bpp;}}
可以看到这个配置类直接给容器放入了一个AsyncAnnotationBeanPostProcessor,而这个类又是一个实现了BeanPostProcessor接口的类,所以我们直接去看它实现的前值方法和后置方法
org.springframework.aop.framework.AbstractAdvisingBeanPostProcessor#postProcessBeforeInitialization
public Object postProcessBeforeInitialization(Object bean, String beanName) {return bean;
}
前置方法并没有做什么,继续看后置方法
org.springframework.aop.framework.AbstractAdvisingBeanPostProcessor#postProcessAfterInitialization
public Object postProcessAfterInitialization(Object bean, String beanName) {if (this.advisor == null || bean instanceof AopInfrastructureBean) {// Ignore AOP infrastructure such as scoped proxies.return bean;}if (bean instanceof Advised) {Advised advised = (Advised) bean;if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {// Add our local Advisor to the existing proxy's Advisor chain...if (this.beforeExistingAdvisors) {advised.addAdvisor(0, this.advisor);}else {advised.addAdvisor(this.advisor);}return bean;}}// 判断该bean是否适用于这个advisorif (isEligible(bean, beanName)) {// 创建一个ProxyFactoryProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);if (!proxyFactory.isProxyTargetClass()) {// 判断生成代理的方式,jdk,cglibevaluateProxyInterfaces(bean.getClass(), proxyFactory);}// 把advisor添加到代理工厂中proxyFactory.addAdvisor(this.advisor);customizeProxyFactory(proxyFactory);// 生成代理对象return proxyFactory.getProxy(getProxyClassLoader());}// No proxy needed.return bean;
}
可以看到上面通过创建了一个ProxyFactory,通过这个创建的ProxyFactory最终给bena生成了代理对象,而在生成代理对象之前,还需要去判断这个bean是否需要被代理,而判断的过程是由advisor里面的pointcut去做的,但是这个advisor是哪里来的呢?
org.springframework.scheduling.annotation.AsyncAnnotationBeanPostProcessor#setBeanFactory
public void setBeanFactory(BeanFactory beanFactory) {super.setBeanFactory(beanFactory);AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);if (this.asyncAnnotationType != null) {advisor.setAsyncAnnotationType(this.asyncAnnotationType);}advisor.setBeanFactory(beanFactory);this.advisor = advisor;
}
在子类AsyncAnnotationBeanPostProcessor中的setBeanFactory方法中创建了一个AsyncAnnotationAdvisor,而这个setBeanFactory方法是在bean初始化的阶段被回调的,也就是在上面的后置方法之前被回调,所以当来到后置方法的时候,这个advisor是肯定已经被实例化的了,所以下面我们重点来看下这个AsyncAnnotationAdvisor
AsyncAnnotationAdvisor
public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {private Advice advice;private Pointcut pointcut;public AsyncAnnotationAdvisor() {this((Supplier<Executor>) null, (Supplier<AsyncUncaughtExceptionHandler>) null);}public AsyncAnnotationAdvisor(@Nullable Executor executor, @Nullable AsyncUncaughtExceptionHandler exceptionHandler) {this(SingletonSupplier.ofNullable(executor), SingletonSupplier.ofNullable(exceptionHandler));}@SuppressWarnings("unchecked")public AsyncAnnotationAdvisor(@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);asyncAnnotationTypes.add(Async.class);try {asyncAnnotationTypes.add((Class<? extends Annotation>)ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));}catch (ClassNotFoundException ex) {// If EJB 3.1 API not present, simply ignore.}this.advice = buildAdvice(executor, exceptionHandler);this.pointcut = buildPointcut(asyncAnnotationTypes);}public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");Set<Class<? extends Annotation>> asyncAnnotationTypes = new HashSet<>();asyncAnnotationTypes.add(asyncAnnotationType);this.pointcut = buildPointcut(asyncAnnotationTypes);}@Overridepublic void setBeanFactory(BeanFactory beanFactory) {if (this.advice instanceof BeanFactoryAware) {((BeanFactoryAware) this.advice).setBeanFactory(beanFactory);}}@Overridepublic Advice getAdvice() {return this.advice;}@Overridepublic Pointcut getPointcut() {return this.pointcut;}protected Advice buildAdvice(@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);interceptor.configure(executor, exceptionHandler);return interceptor;}protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {ComposablePointcut result = null;for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);if (result == null) {result = new ComposablePointcut(cpc);}else {result.union(cpc);}result = result.union(mpc);}return (result != null ? result : Pointcut.TRUE);}
众所周知一个advisor里面肯定会有advice以及pointcut,直接看advice是什么
protected Advice buildAdvice(@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);interceptor.configure(executor, exceptionHandler);return interceptor;
}
可以看到advice就是AnnotationAsyncExecutionInterceptor,不用说AnnotationAsyncExecutionInterceptor肯定是实现了MethodInterceptor接口,最后还调用了configure方法
public void configure(@Nullable Supplier<Executor> defaultExecutor,@Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);
}
configure方法传了两个对象,这两个对象是通过AsyncAnnotationAdvisor的带参构造方法传进来的,逐层往上找可以看到源头就是从ProxyAsyncConfiguration的父类AbstractAsyncConfiguration传过来的
@Configuration
public abstract class AbstractAsyncConfiguration implements ImportAware {@Nullableprotected AnnotationAttributes enableAsync;@Nullableprotected Supplier<Executor> executor;@Nullableprotected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;@Overridepublic void setImportMetadata(AnnotationMetadata importMetadata) {this.enableAsync = AnnotationAttributes.fromMap(importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));if (this.enableAsync == null) {throw new IllegalArgumentException("@EnableAsync is not present on importing class " + importMetadata.getClassName());}}/*** Collect any {@link AsyncConfigurer} beans through autowiring.*/@Autowired(required = false)void setConfigurers(Collection<AsyncConfigurer> configurers) {if (CollectionUtils.isEmpty(configurers)) {return;}if (configurers.size() > 1) {throw new IllegalStateException("Only one AsyncConfigurer may exist");}AsyncConfigurer configurer = configurers.iterator().next();this.executor = configurer::getAsyncExecutor;this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;}}
可以看到在AbstractAsyncConfiguration中通过从容器找到所有实现了AsyncConfigurer接口的bean然后分别调用getAsyncExecutor和getAsyncUncaughtExceptionHandler方法拿到executor和exceptionHandler,所以也就是说如果我们想要给spring的异步调用自定义配置executor和exceptionHandler,可以自己去实现一个类继承AsyncConfigurer接口然后实现里面的方法返回自定义的executor和exceptionHandler就可以了,如下:
@Component
public class MyAsyncConfigurer implements AsyncConfigurer {private int corePoolSize = 10;private int maxPoolSize = 200;private int queueCapacity = 10;private String ThreadNamePrefix = "MyAsyncConfigurer-executor";@Overridepublic Executor getAsyncExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(corePoolSize);executor.setMaxPoolSize(maxPoolSize);executor.setQueueCapacity(queueCapacity);executor.setThreadNamePrefix(ThreadNamePrefix);// rejection-policy:当pool已经达到max size的时候,如何处理新任务// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();return executor;}
}
但是假如我们并没有通过这个方式给提供自定义的executor和exceptionHandler呢?spring是通过会有默认的executor和exceptionHandler提供吗?我们再来看AnnotationAsyncExecutionInterceptor的configure方法
public void configure(@Nullable Supplier<Executor> defaultExecutor,@Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);
}
可以看到如果第一个参数为空的话就会返回第二个参数的值,而对于executor这个组件来说的话,如果我们没有通过上面的方式给spring容器提供一个executor组件的话,那么就会调用getDefaultExecutor方法返回一个executor
org.springframework.aop.interceptor.AsyncExecutionInterceptor#getDefaultExecutor
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {Executor defaultExecutor = super.getDefaultExecutor(beanFactory);return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}
可以看到现实会去调用父类的getDefaultExecutor方法,如果父类返回的executor是空的话,那么就返回一个SimpleAsyncTaskExecutor,我们看下父类中是怎么拿到executor的
org.springframework.aop.interceptor.AsyncExecutionAspectSupport#getDefaultExecutor
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {if (beanFactory != null) {try {return beanFactory.getBean(TaskExecutor.class);}catch (NoUniqueBeanDefinitionException ex) {logger.debug("Could not find unique TaskExecutor bean", ex);try {return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);}catch (NoSuchBeanDefinitionException ex2) {if (logger.isInfoEnabled()) {logger.info("More than one TaskExecutor bean found within the context, and none is named " +"'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +"as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());}}}catch (NoSuchBeanDefinitionException ex) {logger.debug("Could not find default TaskExecutor bean", ex);try {return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);}catch (NoSuchBeanDefinitionException ex2) {logger.info("No task executor bean found for async processing: " +"no bean of type TaskExecutor and no bean named 'taskExecutor' either");}// Giving up -> either using local default executor or none at all...}}return null;
}
可以父类是从spring容器中直接找实现了TaskExecutor接口的bean,如果找不到或者找到了多个bean那么就继续去容器中寻找名称为taskExecutor并且实现了Executor接口的bean,如果还找不到就直接返回空
接下来我们直接看下advice的增强逻辑
org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke
public Object invoke(final MethodInvocation invocation) throws Throwable {Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);// 确定该方法需要使用哪个executorAsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);if (executor == null) {throw new IllegalStateException("No executor specified and no default executor set on AsyncExecutionInterceptor either");}Callable<Object> task = () -> {try {Object result = invocation.proceed();if (result instanceof Future) {return ((Future<?>) result).get();}}catch (ExecutionException ex) {handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());}catch (Throwable ex) {handleError(ex, userDeclaredMethod, invocation.getArguments());}return null;};return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
首先会去找到该方法需要的executor,看下是怎么去找的
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {// 从缓存中拿到该方法对应的executorAsyncTaskExecutor executor = this.executors.get(method);if (executor == null) {Executor targetExecutor;// 获取@Async注解上面的value值String qualifier = getExecutorQualifier(method);if (StringUtils.hasLength(qualifier)) {// 如果@Async注解的value不为空,按照value值去spring容器中找到名称为该value值的executortargetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);}else {// 如果没有在@Async注解上指定value值,就使用自己自定义的executor或者是默认的executortargetExecutor = this.defaultExecutor.get();}if (targetExecutor == null) {return null;}executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));// 放进缓存this.executors.put(method, executor);}return executor;
}
可以看到spring会根据@Async注解的value值去容器中找到对应的executor,也就是说每一个方法都可以使用不用的executor去被执行,只要executor的名称与@Async注解的value属性声明的名称一样就可以了
最后会把方法调用方到一个task中,然后调用doSubmit方法
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {if (CompletableFuture.class.isAssignableFrom(returnType)) {return CompletableFuture.supplyAsync(() -> {try {return task.call();}catch (Throwable ex) {throw new CompletionException(ex);}}, executor);}else if (ListenableFuture.class.isAssignableFrom(returnType)) {return ((AsyncListenableTaskExecutor) executor).submitListenable(task);}else if (Future.class.isAssignableFrom(returnType)) {return executor.submit(task);}else {executor.submit(task);return null;}
}
在doSubmit方法中会根据方法的返回值去选择调用不同的executor执行方法,至此,spring异步切面调用流程就走完了
Spring异步切面源码解析相关推荐
- Spring AOP面向切面源码解析
IoC 和 AOP 被称为 Spring 两大基础模块 AOP(Aspect-Oriented Programming) 在程序设计领域拥有其不可替代的适用场景和地位.Spring AOP 作为 AO ...
- 【Spring源码】AOP切面源码
[Spring源码]AOP切面源码 关键词 后置处理器BeanPostProcessor后置方法:applyBeanPostProcessorsAfterInitialization() 切面后置处理 ...
- Spring 之 @Cacheable 源码解析(上)
一.@EnableCaching 源码解析 当要使用 @Cacheable 注解时需要引入 @EnableCaching 注解开启缓存功能.为什么呢?现在就来看看为什么要加入 @EnableCachi ...
- spring aop 注入源码解析
spring aop 注入源码解析 aop启动 AbstractApplicationContext.java @Overridepublic void refresh() throws BeansE ...
- spring aop 注入源码解析 1
spring aop 注入源码解析 aop启动 AbstractApplicationContext.java @Overridepublic void refresh() throws BeansE ...
- Spring 之 @Cacheable 源码解析(下)
CacheInterceptor 缓存切面处理逻辑 接着上篇 Spring 之 @Cacheable 源码解析(上) 说起,代理对象已经创建成功,接着分析调用流程.那么应该从哪里入手呢?当然是去看 A ...
- FileInputFormat切片源码解析
文章目录 FileInputFormat切片源码解析 1.MapTask并行度决定机制 2.源码步骤 3.FileInputFormat切片机制 3.1 源代码中计算切片大小的公式 3.2 获取切片信 ...
- Spring Cloud Gateway 源码解析(3) —— Predicate
目录 RoutePredicateFactory GatewayPredicate AfterRoutePredicateFactory RoutePredicateHandlerMapping Fi ...
- ThinkPHP门面源码解析
本文主要描述了门面的使用和实现过程以及源码的深度解析. 框架门面解析 前言 一.简单认识一下在框架中的门面的好处 二.学习框架中facade的使用 三.优化在框架中facade的使用 四.门面类源码解 ...
- spring 多线程 事务 源码解析(一)
大家好,我是烤鸭: 今天分享的是spring 多线程事务源码分析. 环境: spring-jdbc 5.0.4.REALEASE 今天分享一下spring事务的方法,这一篇还没涉及到多线程. 简单说一 ...
最新文章
- Swoole实现私聊群聊
- 什么是SAP CRM的Custom Transaction Context
- 俄罗斯将封杀LinkedIn 推动个人数据本地化
- 网络基础一(协议的概念,网络应用程序设计模式)
- ./mysqld: error while loading shared libraries: libnuma.so.1: cannot open shared object file: No suc
- Windows10系统下,彻底删除卸载MySQL
- 动手为王 | Oracle 数据库跨版本升级迁移实践
- 从0成为Facebook广告高手系列教程
- 快捷添加请求头的方法
- 网络常用端口号大全----2
- 阿里云盘初体验——丝滑
- 图形可视化2:matlab画散点图加拟合趋势线
- android停止补间动画,android 帧动画,补间动画,属性动画的简单总结
- SATA硬盘的数据和电源接口定义
- 【Educoder作业】冯·诺依曼体系结构及工作原理理解
- 宋鸿兵在《货币战争5》中谈到的美国医疗
- NRF24L01(1)
- 论文笔记:联邦学习——Federated Learning: Challenges, Methods, and Future Directions
- MAC地址查询 Linux/Unix操作系统mac地址怎么查
- HTML5七夕情人节表白网页_(唯美满天星)多功能展示(网状球状)3D相册_HTML+CSS+JS 求婚 html生日快乐祝福代码网页 520情人节告白代码 程序员表白源码 抖音3D旋转相册
热门文章
- linux下把 python 程序运行的输出结果记录到 log 文件中
- Tor源码分析十 -- 连接和链路
- 剑指offer之扑克牌的顺子
- tcpip路由技术卷一_计算机网络题库考(2020.9.10晚18.320.30 北京卷)
- Spring Cloud随记---分布式配置中心初探--一个单节点的配置中心
- cs231n学习笔记-激活函数-BN-参数优化
- 【机器学习】Cross-Validation(交叉验证)详解
- 服务器上搭shinyApp:shiny-server配置及报错解决
- 如何在 Codeforces 上出题?
- 数位DP算法概述及习题