异步编排之商品详情查询

  • 异步编排
    • CompletableFuture介绍
      • 创建异步对象
      • 计算完成时回调方法
      • handle 方法
      • 线程串行化方法
      • 两任务组合
        • 全部完成
        • 一个完成即可
      • 多任务组合
  • 业务描述
  • 代码
    • 准备工作
      • gulimall-product
        • application.yml
    • 业务代码
      • ItemController
      • SkuInfoService
      • SkuInfoServiceImpl

异步编排

CompletableFuture介绍

Future是Java 5添加的类,用来描述一个异步计算的结果。你可以使用isDone方法检查计算是否完成,或者使用get阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel方法停止任务的执行。

虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?

很多语言,比如Node.js,采用回调的方式实现异步编程。Java的一些框架,比如Netty,自己扩展了Java的 Future接口,提供了addListener等多个扩展方法;Google guava也提供了通用的扩展Future;Scala也提供了简单易用且功能强大的Future/Promise异步编程模式。

作为正统的Java类库,是不是应该做点什么,加强一下自身库的功能呢?

在Java 8中, 新增加了一个包含50个方法左右的类:

CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

CompletableFuture类实现了Future接口,所以你还是可以像以前一样通过get方法阻塞或者轮询的方式获得结果,但是这种方式不推荐使用。

CompletableFuture和FutureTask同属于Future接口的实现类,都可以获取线程的执行结果。

创建异步对象

CompletableFuture 提供了四个静态方法来创建一个异步操作。

static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。

  • runAsync方法不支持返回值。
  • supplyAsync可以支持返回值。

计算完成时回调方法

当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor);
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn);

whenComplete可以处理正常和异常的计算结果,exceptionally处理异常情况。BiConsumer<? super T,? super Throwable>可以定义处理业务

whenComplete 和 whenCompleteAsync 的区别:

whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。 whenCompleteAsync:是执行把
whenCompleteAsync 这个任务继续提交给线程池来进行执行。
方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)

代码示例:

public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture future = CompletableFuture.supplyAsync(new Supplier<Object>() {@Overridepublic Object get() {//异步线程执行并返回运算值System.out.println(Thread.currentThread().getName() + "\t completableFuture");int i = 10 / 0;return 10;}}).whenComplete(new BiConsumer<Object, Throwable>() {@Overridepublic void accept(Object o, Throwable throwable) {//接受上一步返回值以及可能出现的异常System.out.println("接收到的对象:" + o.toString());System.out.println("抛出异常为:" + throwable);}}).exceptionally(new Function<Throwable, Object>() {@Overridepublic Object apply(Throwable throwable) {System.out.println("接受到的异常:" + throwable);//对异常进行处理,返回其他可能因为异常导致的其他结果return 100;}});System.out.println(future.get());}

handle 方法

handle 是执行任务完成时对结果的处理。
handle 是在任务完成后再执行,还可以处理异常的任务。

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

线程串行化方法

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
  • thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。
  • thenAccept方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。
  • thenRun方法:只要上面的任务执行完成,就开始执行thenRun,只是处理完任务后,执行 thenRun的后续操作

带有Async默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。

Function<? super T,? extends U>
T:上一个任务返回结果的类型
U:当前任务的返回值类型

 public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture future = CompletableFuture.supplyAsync(new Supplier<Object>() {@Overridepublic Object get() {//异步线程执行并返回运算值System.out.println(Thread.currentThread().getName() + "\t completableFuture");int i = 10 / 0;return 10;}}).thenApply(new Function<Integer, Integer>() {@Overridepublic Integer apply(Integer o) {System.out.println("thenApply方法,上次返回结果:" + o);return  o * 2;}}).whenComplete(new BiConsumer<Object, Throwable>() {@Overridepublic void accept(Object o, Throwable throwable) {//接受上一步返回值以及可能出现的异常System.out.println("接收到的对象:" + o.toString());System.out.println("抛出异常为:" + throwable);}}).exceptionally(new Function<Throwable, Object>() {@Overridepublic Object apply(Throwable throwable) {System.out.println("接受到的异常:" + throwable);//对异常进行处理,返回其他可能因为异常导致的其他结果return 100;}}).handle(new BiFunction<Integer, Throwable, Integer>() {@Overridepublic Integer apply(Integer integer, Throwable throwable) {System.out.println("接收到的对象:" + integer);System.out.println("接受到的异常:" + throwable);return 8888;}});System.out.println(future.get());}

两任务组合

全部完成

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor);public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action);public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);

两个任务必须都完成,触发该任务。

thenCombine:组合两个future,获取两个future的返回结果,并返回当前任务的返回值

thenAcceptBoth:组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值。

runAfterBoth:组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务。

一个完成即可

public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn);public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor);public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action);public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor);public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action);public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);

当两个任务中,任意一个future任务完成的时候,执行任务。

applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。

acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。

runAfterEither:两个任务有一个执行完成,不需要获取future的结果,处理任务,也没有返回值。

多任务组合

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);

allOf:等待所有任务完成

anyOf:只要有一个任务完成

业务描述

我们这个功能主要是满足,在商品详情页面查询时,通过一个接口去获取商品的有关的所有信息

  1. sku基本信息
  2. sku图片信息
  3. 获取spu销售属性组合
  4. 获取spu介绍
  5. 获取spu规格参数信息

其中有些查询是可以同时进行的,有些操作则需要在其他步骤返回结果后,拿到结果去继续查询,最后返回结果。
所以我们为了优化接口的加载速度可以选择异步编排

代码

准备工作

因为我们准备使用异步编排的代码方式去查询数据
异步编排需要用到线程池我们需要在程序初始化之前创建线程池

gulimall-product

线程池配置类,参数从配置文件获取

@ConfigurationProperties(prefix = "gulimall.thread")
@Component
@Data
public class ThreadPoolConfigProperties {private Integer coreSize;private Integer maxSize;private Integer keepAliveTime;
}

spring容器类,初始化线程池

/*** <p>Title: MyThreadConfig</p>* Description:配置线程池* date:2020/6/25 10:58*/
// 开启这个属性配置
//@EnableConfigurationProperties(ThreadPoolConfigProperties.class)
@Configuration
public class MyThreadConfig {@Beanpublic ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties threadPoolConfigProperties){return new ThreadPoolExecutor(threadPoolConfigProperties.getCoreSize() == null ? 20 : threadPoolConfigProperties.getCoreSize(), threadPoolConfigProperties.getMaxSize() == null ? 200 : threadPoolConfigProperties.getMaxSize(),threadPoolConfigProperties.getKeepAliveTime() == null ? 10 : threadPoolConfigProperties.getKeepAliveTime(), TimeUnit.SECONDS, new LinkedBlockingDeque<>(10000), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());}
}

另外我们可以在pom文件中引入jar包来在配置文件中配置时达到提示的效果

     <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency>

application.yml

配置文件中

glmall:thread:core-size: 20max-size: 200keep-alive-time: 10

业务代码

ItemController

/*** 查询返回商品详情*/
@RestController
@RequestMapping("/item")
public class ItemController {@Autowiredprivate SkuInfoService skuInfoService;@GetMapping("/{skuId}")public R skuItem(@PathVariable("skuId") Long skuId) throws ExecutionException, InterruptedException {SkuItemVo vo = skuInfoService.item(skuId);return R.ok().put("data", vo);}
}

SkuInfoService

SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException;

SkuInfoServiceImpl

@Service("skuInfoService")
public class SkuInfoServiceImpl extends ServiceImpl<SkuInfoDao, SkuInfoEntity> implements SkuInfoService {@Autowiredprivate ThreadPoolExecutor executor;@Autowiredprivate SkuImagesService imagesService;@Autowiredprivate SpuInfoDescService spuInfoDescService;@Autowiredprivate AttrGroupService attrGroupService;@Autowiredprivate SkuSaleAttrValueService skuSaleAttrValueService;@Overridepublic SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException {SkuItemVo skuItemVo = new SkuItemVo();CompletableFuture<SkuInfoEntity> infoFutrue = CompletableFuture.supplyAsync(() -> {//1 sku基本信息SkuInfoEntity info = getById(skuId);skuItemVo.setInfo(info);return info;}, executor);CompletableFuture<Void> ImgageFuture = CompletableFuture.runAsync(() -> {//2 sku图片信息List<SkuImagesEntity> images = imagesService.getImagesBySkuId(skuId);skuItemVo.setImages(images);}, executor);CompletableFuture<Void> saleAttrFuture =infoFutrue.thenAcceptAsync(res -> {//3 获取spu销售属性组合List<ItemSaleAttrVo> saleAttrVos = skuSaleAttrValueService.getSaleAttrsBuSpuId(res.getSpuId());skuItemVo.setSaleAttr(saleAttrVos);},executor);CompletableFuture<Void> descFuture = infoFutrue.thenAcceptAsync(res -> {//4 获取spu介绍SpuInfoDescEntity spuInfo = spuInfoDescService.getById(res.getSpuId());skuItemVo.setDesc(spuInfo);},executor);CompletableFuture<Void> baseAttrFuture = infoFutrue.thenAcceptAsync(res -> {//5 获取spu规格参数信息List<SpuItemAttrGroup> attrGroups = attrGroupService.getAttrGroupWithAttrsBySpuId(res.getSpuId(), res.getCatalogId());skuItemVo.setGroupAttrs(attrGroups);}, executor);// 等待所有任务都完成再返回CompletableFuture.allOf(ImgageFuture,saleAttrFuture,descFuture,baseAttrFuture).get();return skuItemVo;}}

其他代码可以参考github

高级篇代码地址

谷粒商城高级篇(38)——异步编排之商品详情查询相关推荐

  1. 谷粒商城高级篇上(未完待续)

    谷粒商城高级篇(上)保姆级整理 之前整理了基础篇,Typora提示将近20000词,谷粒商城基础篇保姆级整理 在学高级篇的时候,不知不觉又整理了两万多词,做了一阶段,先发出来,剩余部分整理好了再发.自 ...

  2. java 商城 商品查询_Javaweb网上商城项目实战(17)实现商品详情查询

    原理分析 具体实现 下面是商品详情页面product_info.jsp显示的样子,我们最初的模板的静态资源已经写死了, 这里我们需要先对这个页面进行改造,使得到时候主页点击商品能输出对应的商品详情页面 ...

  3. 谷粒商城高级篇笔记1

    这里写自定义目录标题 0.ElasticSearch 1.Nginx配置域名问题 01.Nginx(反向代理) 配置 02.Nginx(负载均衡)+ 网关 配置 03.Nginx动静分离 2.JMet ...

  4. 谷粒商城笔记+踩坑(15)——商品详情搭建+异步编排

    导航: 谷粒商城笔记+踩坑汇总篇 目录 1.搭建页面环境 1.1.配置 Nginx 和 网关 1.2.动静资源配置 1.3.搜索页到详情页跳转 2.模型类抽取和controller 2.1.分析首页需 ...

  5. 【谷粒商城高级篇】商品服务 商品上架

    谷粒商城笔记合集 分布式基础篇 分布式高级篇 高可用集群篇 ===简介&环境搭建=== ===Elasticsearch=== 项目简介与分布式概念(第一.二章) Elasticsearch: ...

  6. 谷粒商城-高级篇-aiueo

    105 初步检索 105.1 _cat GET /_cat/nodes : 查看所有节点 GET /_cat/health : 查看es健康状况 GET /_cat/master : 查看主节点 GE ...

  7. 【谷粒商城高级篇】商城业务:商品检索

    谷粒商城笔记合集 分布式基础篇 分布式高级篇 高可用集群篇 ===简介&环境搭建=== ===Elasticsearch=== 项目简介与分布式概念(第一.二章) Elasticsearch: ...

  8. 【谷粒商城高级篇】Elasticsearch:全文检索

    谷粒商城笔记合集 分布式基础篇 分布式高级篇 高可用集群篇 ===简介&环境搭建=== ===Elasticsearch=== 项目简介与分布式概念(第一.二章) Elasticsearch: ...

  9. 谷粒商城高级篇资料_一文搞定剑指offer面试题【分文别类篇】

    点击上方"蓝字",关注了解更多 数组: 面试题3:数组中重复的数字 面试题4:二维数组中的查找 面试题21:调整数组顺序使奇数位于偶数前面 面试题39:数组中出现次数超过一半的数字 ...

最新文章

  1. 统计简单学_回归分析
  2. mysql数据迁移到teradata_Mysql迁移到达梦数据库-Mysql到DM的应用迁移-给自增列赋值-GroupBy语法不兼容...
  3. python列表功能默写_Python list(列表)功能详解
  4. vuejs怎么在服务器上发布部署
  5. 【译】XNA Shader 程序设计(二)
  6. Qt for Android环境配置
  7. c语言语音控制游戏文献,C语言课程设计-基于C语言推箱子游戏设计-毕业论文文献.doc...
  8. (转载)arcgis for js - 解决加载天地图和WMTS服务,WMTS服务不显示的问题,以及wmts服务密钥。...
  9. C#基础4:函数+ref和out参数
  10. java画板代码_java 画板画图程序
  11. Asp.net自定义控件开发任我行(附1)-属性一览众山小
  12. 10个CSS技巧,极大提升用户体验
  13. pta森森快递(线段树 + 贪心 + 区间修改)
  14. 108个Mac电脑快捷键大全
  15. 火柴人小程序linux,推荐这3款射击类的火柴人小程序,一起冲冲冲吧!
  16. 交互式shell脚本实操
  17. Python爬虫天气预报(小白入门)
  18. CCSv5.3的安装
  19. 记那些心不安分的日子
  20. P110 课时111.多线程更新UI数据

热门文章

  1. android 电量性能优化
  2. 解决 ajax 跨域
  3. 我怎么看技术人员去创业公司这件事
  4. Android 将文本和图片写入到pdf文件以及读取手机里的pdf文件
  5. Keras的BN你真的冻结对了吗
  6. QrCode类生成二维码海报
  7. 使用Python对股票数据进行数据分析(二)-使用ta-lib库获取日线行情、5日均线、10日均线行情并显示
  8. 京东数据分析工具,同行商家数据快速查看对比
  9. 高德地图不显示定位点
  10. android os x86下载,Android-x86 Lineage OS 14.1-r3下载(2019/10/23官方更新版)