什么是异步调用?

异步调用是相对于同步调用而言的,同步调用是指程序按预定顺序一步步执行,每一步必须等到上一步执行完后才能执行,异步调用则无需等待上一步程序执行完即可执行。异步调用指,在程序在执行时,无需等待执行的返回值即可继续执行后面的代码。在我们的应用服务中,有很多业务逻辑的执行操作不需要同步返回(如发送邮件、冗余数据表等),只需要异步执行即可。

本文将介绍 Spring 应用中,如何实现异步调用。在异步调用的过程中,会出现线程上下文信息的丢失,我们该如何解决线程上下文信息的传递。

Spring 应用中实现异步

Spring 为任务调度与异步方法执行提供了注解支持。通过在方法或类上设置 @Async 注解,可使得方法被异步调用。调用者会在调用时立即返回,而被调用方法的实际执行是交给 Spring 的 TaskExecutor 来完成的。所以被注解的方法被调用的时候,会在新的线程中执行,而调用它的方法会在原线程中执行,这样可以避免阻塞,以及保证任务的实时性。

引入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>

引入 Spring 相关的依赖即可。

入口类

@SpringBootApplication
@EnableAsync
public class AsyncApplication {public static void main(String[] args) {SpringApplication.run(AsyncApplication.class, args);}

入口类增加了 @EnableAsync 注解,主要是为了扫描范围包下的所有 @Async 注解。

对外的接口

这里写了一个简单的接口:

@RestController
@Slf4j
public class TaskController {@Autowiredprivate TaskService taskService;@GetMapping("/task")public String taskExecute() {try {taskService.doTaskOne();taskService.doTaskTwo();taskService.doTaskThree();} catch (Exception e) {log.error("error executing task for {}",e.getMessage());}return "ok";}
}

调用 TaskService 执行三个异步方法。

Service 方法

@Component
@Slf4j
//@Async
public class TaskService {@Asyncpublic void doTaskOne() throws Exception {log.info("开始做任务一");long start = System.currentTimeMillis();Thread.sleep(1000);long end = System.currentTimeMillis();log.info("完成任务一,耗时:" + (end - start) + "毫秒");}@Asyncpublic void doTaskTwo() throws Exception {log.info("开始做任务二");long start = System.currentTimeMillis();Thread.sleep(1000);long end = System.currentTimeMillis();log.info("完成任务二,耗时:" + (end - start) + "毫秒");}@Asyncpublic void doTaskThree() throws Exception {log.info("开始做任务三");long start = System.currentTimeMillis();Thread.sleep(1000);long end = System.currentTimeMillis();log.info("完成任务三,耗时:" + (end - start) + "毫秒");}
}

@Async 可以用于类上,标识该类的所有方法都是异步方法,也可以单独用于某些方法。每个方法都会 sleep 1000 ms。

结果展示

运行结果如下:

可以看到 TaskService 中的三个方法是异步执行的,接口的结果快速返回,日志信息异步输出。异步调用,通过开启新的线程调用的方法,不影响主线程。异步方法实际的执行交给了 Spring 的 TaskExecutor 来完成。

Future:获取异步执行的结果

在上面的测试中我们也可以发现主调用方法并没有等到调用方法执行完就结束了当前的任务。如果想要知道调用的三个方法全部执行完该怎么办呢,下面就可以用到异步回调。

异步回调就是让每个被调用的方法返回一个 Future 类型的值,Spring 中提供了一个 Future 接口的子类:AsyncResult,所以我们可以返回 AsyncResult 类型的值。

public class AsyncResult<V> implements ListenableFuture<V> {private final V value;private final ExecutionException executionException;//...
}

AsyncResult 实现了 ListenableFuture 接口,该对象内部有两个属性:返回值和异常信息。

public interface ListenableFuture<T> extends Future<T> {void addCallback(ListenableFutureCallback<? super T> var1);void addCallback(SuccessCallback<? super T> var1, FailureCallback var2);
}

ListenableFuture 接口继承自 Future,在此基础上增加了回调方法的定义。Future 接口定义如下:

public interface Future<V> {// 是否可以打断当前正在执行的任务boolean cancel(boolean mayInterruptIfRunning);// 任务取消的结果boolean isCancelled();// 异步方法中最后返回的那个对象中的值 V get() throws InterruptedException, ExecutionException;// 用来判断该异步任务是否执行完成,如果执行完成,则返回 true,如果未执行完成,则返回falseboolean isDone();// 与 get() 一样,只不过这里参数中设置了超时时间V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
}

#get() 方法,在执行的时候是需要等待回调结果的,阻塞等待。如果不设置超时时间,它就阻塞在那里直到有了任务执行完成。我们设置超时时间,就可以在当前任务执行太久的情况下中断当前任务,释放线程,这样就不会导致一直占用资源。

#cancel(boolean) 方法,参数是一个 boolean 类型的值,用来传入是否可以打断当前正在执行的任务。如果参数是 true 且当前任务没有执行完成 ,说明可以打断当前任务,那么就会返回 true;如果当前任务还没有执行,那么不管参数是 true 还是 false,返回值都是 true;如果当前任务已经完成,那么不管参数是 true 还是 false,那么返回值都是 false;如果当前任务没有完成且参数是 false,那么返回值也是 false。即:

  1. 如果任务还没执行,那么如果想取消任务,就一定返回 true,与参数无关。
  2. 如果任务已经执行完成,那么任务一定是不能取消的,所以此时返回值都是false,与参数无关。
  3. 如果任务正在执行中,那么此时是否取消任务就看参数是否允许打断(true/false)。

获取异步方法返回值的实现

public Future<String> doTaskOne() throws Exception {log.info("开始做任务一");long start = System.currentTimeMillis();Thread.sleep(1000);long end = System.currentTimeMillis();log.info("完成任务一,耗时:" + (end - start) + "毫秒");return new AsyncResult<>("任务一完成,耗时" + (end - start) + "毫秒");}//...其他两个方法类似,省略

我们将 task 方法的返回值改为 Future<String>,将执行的时间拼接为字符串返回。

@GetMapping("/task")public String taskExecute() {try {Future<String> r1 = taskService.doTaskOne();Future<String> r2 = taskService.doTaskTwo();Future<String> r3 = taskService.doTaskThree();while (true) {if (r1.isDone() && r2.isDone() && r3.isDone()) {log.info("execute all tasks");break;}Thread.sleep(200);}log.info("n" + r1.get() + "n" + r2.get() + "n" + r3.get());} catch (Exception e) {log.error("error executing task for {}",e.getMessage());}return "ok";}

在调用异步方法之后,可以通过循环判断异步方法是否执行完成。结果正如我们所预期,future 所 get 到的是 AsyncResult 返回的字符串。

配置线程池

前面是最简单的使用方法,使用默认的 TaskExecutor。如果想使用自定义的 Executor,可以结合 @Configuration 注解的配置方式。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;@Configuration
public class TaskPoolConfig {@Bean("taskExecutor") // bean 的名称,默认为首字母小写的方法名public Executor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(10); // 核心线程数(默认线程数)executor.setMaxPoolSize(20); // 最大线程数executor.setQueueCapacity(200); // 缓冲队列数executor.setKeepAliveSeconds(60); // 允许线程空闲时间(单位:默认为秒)executor.setThreadNamePrefix("taskExecutor-"); // 线程池名前缀// 线程池对拒绝任务的处理策略executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());return executor;}
}

线程池的配置很灵活,对核心线程数、最大线程数等属性进行配置。其中,rejection-policy,当线程池已经达到最大线程数的时候,如何处理新任务。可选策略有 CallerBlocksPolicy、CallerRunsPolicy 等。CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行。我们验证下,线程池的设置是否生效,在 TaskService 中,打印当前的线程名称:

public Future<String> doTaskOne() throws Exception {log.info("开始做任务一");long start = System.currentTimeMillis();Thread.sleep(1000);long end = System.currentTimeMillis();log.info("完成任务一,耗时:" + (end - start) + "毫秒");log.info("当前线程为 {}", Thread.currentThread().getName());return new AsyncResult<>("任务一完成,耗时" + (end - start) + "毫秒");}

通过结果可以看到,线程池配置的线程名前缀已经生效。在 Spring @Async 异步线程使用过程中,需要注意的是以下的用法会使 @Async 失效:

  • 异步方法使用 static 修饰;
  • 异步类没有使用 @Component 注解(或其他注解)导致 Spring 无法扫描到异步类;
  • 异步方法不能与被调用的异步方法在同一个类中;
  • 类中需要使用 @Autowired 或 @Resource 等注解自动注入,不能手动 new 对象;
  • 如果使用 Spring Boot 框架必须在启动类中增加 @EnableAsync 注解。

线程上下文信息传递

很多时候,在微服务架构中的一次请求会涉及多个微服务。或者一个服务中会有多个处理方法,这些方法有可能是异步方法。有些线程上下文信息,如请求的路径,用户唯一的 userId,这些信息会一直在请求中传递。如果不做任何处理,我们看下是否能够正常获取这些信息。

@GetMapping("/task")public String taskExecute() {try {Future<String> r1 = taskService.doTaskOne();Future<String> r2 = taskService.doTaskTwo();Future<String> r3 = taskService.doTaskThree();ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();HttpServletRequest request = requestAttributes.getRequest();log.info("当前线程为 {},请求方法为 {},请求路径为:{}", Thread.currentThread().getName(), request.getMethod(), request.getRequestURL().toString());while (true) {if (r1.isDone() && r2.isDone() && r3.isDone()) {log.info("execute all tasks");break;}Thread.sleep(200);}log.info("n" + r1.get() + "n" + r2.get() + "n" + r3.get());} catch (Exception e) {log.error("error executing task for {}", e.getMessage());}return "ok";}

在 Spring Boot Web 中我们可以通过 RequestContextHolder 很方便的获取 request。在接口方法中,输出请求的方法和请求的路径。

public Future<String> doTaskOne() throws Exception {log.info("开始做任务一");long start = System.currentTimeMillis();Thread.sleep(1000);long end = System.currentTimeMillis();log.info("完成任务一,耗时:" + (end - start) + "毫秒");ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();HttpServletRequest request = requestAttributes.getRequest();log.info("当前线程为 {},请求方法为 {},请求路径为:{}", Thread.currentThread().getName(), request.getMethod(), request.getRequestURL().toString());return new AsyncResult<>("任务一完成,耗时" + (end - start) + "毫秒");}

同时在 TaskService 中,验证是不是也能输出请求的信息。运行程序,结果如下:

在 TaskService 中,每个异步线程的方法获取 RequestContextHolder 中的请求信息时,报了空指针异常。这说明了请求的上下文信息未传递到异步方法的线程中。RequestContextHolder 的实现,里面有两个 ThreadLocal 保存当前线程下的 request。

//得到存储进去的requestprivate static final ThreadLocal<RequestAttributes> requestAttributesHolder =new NamedThreadLocal<RequestAttributes>("Request attributes");//可被子线程继承的requestprivate static final ThreadLocal<RequestAttributes> inheritableRequestAttributesHolder =new NamedInheritableThreadLocal<RequestAttributes>("Request context");

再看 #getRequestAttributes() 方法,相当于直接获取 ThreadLocal 里面的值,这样就使得每一次获取到的 Request 是该请求的 request。如何将上下文信息传递到异步线程呢?Spring 中的 ThreadPoolTaskExecutor 有一个配置属性 TaskDecoratorTaskDecorator 是一个回调接口,采用装饰器模式。装饰模式是动态的给一个对象添加一些额外的功能,就增加功能来说,装饰模式比生成子类更为灵活。因此 TaskDecorator 主要用于任务的调用时设置一些执行上下文,或者为任务执行提供一些监视/统计。

public interface TaskDecorator {Runnable decorate(Runnable runnable);
}

#decorate 方法,装饰给定的 Runnable,返回包装的 Runnable 以供实际执行。

下面我们定义一个线程上下文拷贝的 TaskDecorator

import org.springframework.core.task.TaskDecorator;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;public class ContextDecorator implements TaskDecorator {@Overridepublic Runnable decorate(Runnable runnable) {RequestAttributes context = RequestContextHolder.currentRequestAttributes();return () -> {try {RequestContextHolder.setRequestAttributes(context);runnable.run();} finally {RequestContextHolder.resetRequestAttributes();}};}
}

实现较为简单,将当前线程的 context 装饰到指定的 Runnable,最后重置当前线程上下文。

在线程池的配置中,增加回调的 TaskDecorator 属性的配置:

@Bean("taskExecutor")public Executor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(10);executor.setMaxPoolSize(20);executor.setQueueCapacity(200);executor.setKeepAliveSeconds(60);executor.setThreadNamePrefix("taskExecutor-");executor.setWaitForTasksToCompleteOnShutdown(true);executor.setAwaitTerminationSeconds(60);// 增加 TaskDecorator 属性的配置executor.setTaskDecorator(new ContextDecorator());executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();return executor;}

经过如上配置,我们再次运行服务,并访问接口,控制台日志信息如下:

由结果可知,线程的上下文信息传递成功。

小结

本文结合示例讲解了 Spring 中实现异步方法,获取异步方法的返回值。并介绍了配置 Spring 线程池的方式。最后介绍如何在异步多线程中传递线程上下文信息。线程上下文传递在分布式环境中会经常用到,比如分布式链路追踪中需要一次请求涉及到的 TraceId、SpanId。简单来说,需要传递的信息能够在不同线程中。异步方法是我们在日常开发中用来多线程处理业务逻辑,这些业务逻辑不需要严格的执行顺序。用好异步解决问题的同时,更要用对异步多线程的方式。

源码地址

推荐阅读

微服务合集

订阅最新文章,欢迎关注我的公众号

odciexttableopen 调用出错 error open log_如何在 Spring 异步调用中传递上下文相关推荐

  1. delphi 异步 调用 带参数_如何在 Spring 异步调用中传递上下文

    什么是异步调用? 异步调用是相对于同步调用而言的,同步调用是指程序按预定顺序一步步执行,每一步必须等到上一步执行完后才能执行,异步调用则无需等待上一步程序执行完即可执行.异步调用指,在程序在执行时,无 ...

  2. Spring异步调用传递Request对象问题分析

    一.问题描述 近期在实验室做了个动态插桩工具,在对甲方项目测试过程中,发现对含有线程池异步调用的方法进行插桩时,子线程会报空指针异常. 二.问题原因 动态插桩工具向待测软件注入的代码中包含了如下语句, ...

  3. Spring异步调用原理及SpringAop拦截器链原理

    一.Spring异步调用底层原理 开启异步调用只需一个注解@EnableAsync @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTI ...

  4. 软件中的1、同步调用;2、回调;3、异步调用

    软件模块中存在一定接口,从调用方式上分为三类 1.同步调用:2.回调:3.异步调用 首先,同步调用是一种阻塞式调用,调用方要等待对象执行完毕才返回.它是一种单向调用. 其次,回调是一种双向调用模式,也 ...

  5. 头信息_如何在 Spring REST Controller 中获取 HTTP 头信息

    介绍 在本篇文章中,我们将研究如何在 Spring Rest Controller 中访问 HTTP 头信息. 首先,我们将使用 @RequestHeader 注解分别或同时读取 HTTP 头信息. ...

  6. Spring 异步调用,一行代码实现!舒服,不接受任何反驳~

    本文在提供完整代码示例,可见 https://github.com/YunaiV/SpringBoot-Labs 的 lab-29 目录. 原创不易,给点个 Star 嘿,一起冲鸭! 1. 概述 在日 ...

  7. 如何在Spring Boot App中集成H2数据库

    你好朋友, 在本教程中,我们将尝试探索如何在Spring Boot应用程序中与H2数据库集成. 在进行检查之前,让我们了解有关H2数据库的一些基础知识,如下所述,然后我们将讨论H2数据库与Spring ...

  8. spring 4.3.x_如何在Spring 3.x中使用事件

    spring 4.3.x 创建松耦合应用程序的概念和技术很多,Event是其中之一. 事件可以消除代码中的许多依赖关系. 有时没有事件,很难实施SRP *. Java中的Observable接口可以帮 ...

  9. 如何在Spring 3.x中使用事件

    创建松耦合应用程序的概念和技术很多,Event是其中之一. 事件可以消除代码中的许多依赖关系. 有时没有事件,很难实施SRP *. Java中的Observable接口可以帮助我们实现事件(通过Obs ...

最新文章

  1. TCMalloc(Thread-Caching malloc) 基本设计原理
  2. Android开发之Handler的使用方法(源代码分享)
  3. (转载)配置apue.h
  4. sql中Cast()函数的用法
  5. 信息系统项目管理师论文基础知识
  6. python flask 大文件 下载_python flask 建站之文件上传下载(一)
  7. mysql为什么选innodb_为什么现在的MySQL都要使用innoDB引擎-Go语言中文社区
  8. [Java] 如何学Java
  9. MySQL下bin-log的三种模式(ROW、Statement、Mixed)
  10. Android TextToSpeech TTS中文文本转语音(语音合成)
  11. 查找系统大文件占用WizTree 4.05.64位
  12. 字节跳动CEO梁汝波接棒张一鸣一年:使命是我们前进的动力
  13. 7-9 幂集(回溯法) (20 分)(C语言版)
  14. 实习总结与收获(2021.6.7-2021.8.27)
  15. C#实现jQuery的方法连缀
  16. 奥鹏福师计算机应用基础在线作业答案,福师11秋《计算机应用基础》在线作业一、二...
  17. 一位医疗 AI 创业者的自述:这个行业到底需要什么样的产品?...
  18. DC基础知识介绍-Design Compiler(二)
  19. 微信公众号网页授权产生 错误代码:40029 真正解决
  20. 仿网易163邮箱界面模板

热门文章

  1. How to debug Material delta download
  2. 列出Sell in application所有可以传输的application object
  3. 如何处理错误消息Unable to install breakpoint due to missing line number attributes
  4. SAML request和response的一对典型例子,供将来参考
  5. opencv 4快速入门_基于OpenCV的图像融合
  6. 冒泡排序python例题_零基础学python 15 经典算法:冒泡排序法(课后习题答案)...
  7. python目标检测答案_你好,这里有一份2019年目标检测指南
  8. 禁用内存清理_win10电脑开机内存占用高达80%以上如何解决
  9. python中forward函数的引用_pytorch 调用forward 的具体流程
  10. 计算机为什么检测不到u盘启动项,BIOS设置U盘为第一启动项后检测不到如何解决?...