14-CompletableFuture异步编排
一,前置知识
1,用户线程和守护线程
1.所有用户线程执行完毕,程序就会退出,不再等待守护线程。
public static void main(String[] args) {new Thread(()-> System.out.println("当前线程"+Thread.currentThread().getName()+"是"+(Thread.currentThread().isDaemon()?"守护线程":"用户线程"))).start();}
2.用户线程不结束,程序就不会退出
public class DemoA {public static void main(String[] args) {Thread thread = new Thread(DemoA::run,"线程1");thread.start();}private static void run() {System.out.println("当前线程" + Thread.currentThread().getName() + "是" + (Thread.currentThread().isDaemon() ? "守护线程" : "用户线程"));while (true) ;}
}
3.不管守护线程结束没有,只要用户线程结束,程序就会退出
public class DemoA {public static void main(String[] args) {Thread thread = new Thread(DemoA::run,"线程1");//将线程1设置为守护线程thread.setDaemon(true);thread.start();}private static void run() {System.out.println("当前线程" + Thread.currentThread().getName() + "是" + (Thread.currentThread().isDaemon() ? "守护线程" : "用户线程"));while (true) ;}
}
2.回首FutureTask
1.FutureTask存在的问题
public class Demob {public static void main(String[] args)throws Exception {FutureTask<String> futureTask = new FutureTask<>(Demob::call);new Thread(futureTask,"t1").start();//异步结果集的展现 ---- 会阻塞//System.out.println(futureTask.get());//轮询的方式去问 --- 消耗cpu资源while (true) {if (futureTask.isDone()){System.out.println(futureTask.get());break;}}System.out.println("main结束");}private static String call() {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "yhd";}
}
从上面的代码可以看出FutureTask存在的问题:
1.get()异步结果集的展现 ---- 会阻塞
2.futureTask.isDone()轮询的方式去问 — 消耗cpu资源
2.想完成一些复杂的任务
1.应对Future的完成时间,完成了可以告诉我,也就是我们的回调通知。
2.将两个异步计算合成一个异步计算,这两个异步计算互相独立,同时第二个又依赖第一个的结果。
3.当Future集合中某个任务最快结束时,返回结果。
4.等待Future集合中的所有任务都完成。
二,CompletableFuture
1.介绍
异步函数式编程,Future接口的扩展与增强版。
可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。
1.完成通知回调
2.类似Linux管道符的形式,可以异步的计算,上一个计算结果可以给下一个,结合lambda表达式和函数式编程串起来转换和组合获得结果。
2.实例化
如果不指定线程池,默认的线程都是守护线程
public class DemoC {private static ThreadPoolExecutor pool=new ThreadPoolExecutor(1,5,1, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1));public static void main(String[] args) throws Exception{//创建一个有返回结果的实例CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> "yhd", pool);//创建一个没有返回结果的实例CompletableFuture<Void> runAsync = CompletableFuture.runAsync(System.out::println);pool.shutdown();}
}
3.异步+回调
whenComplete 和 join 的区别:
一个时执行完返回结果,一个是主动获取结果
whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。
方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)
public class DemoC {private static ThreadPoolExecutor pool=new ThreadPoolExecutor(1,5,1, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1));public static void main(String[] args) throws Exception{//创建一个有返回结果的实例 计算完成时回调 发生异常时执行CompletableFuture.supplyAsync(() -> "yhd", pool).whenComplete(DemoC::accept).exceptionally(Throwable::toString);pool.shutdown();}private static void accept(String result, Throwable e) {System.out.println(result);}
}
4.结果处理
handle 是执行任务完成时对结果的处理。
handle 是在任务完成后再执行,还可以处理异常的任务。
/*** @author yhd* @createtime 2020/11/15 23:10*/
public class DemoB {public static void main(String[] args) {CompletableFuture.supplyAsync(()->"尹会东").handle(DemoB::apply).whenCompleteAsync(((s, throwable) -> System.out.println(s)));}private static String apply(String s, Throwable throwable) {if (null != throwable)return throwable.getMessage();return s + " 牛逼!";}
}
5.线程串行化
thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。
thenAccept方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。
thenRun方法:只要上面的任务执行完成,就开始执行thenRun,只是处理完任务后,执行 thenRun的后续操作
带有Async默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。
public class DemoE {public static void main(String[] args) {CompletableFuture<Integer> thenApply = CompletableFuture.supplyAsync(() -> 1).thenApply(integer -> 1);CompletableFuture<Void> thenAccept = CompletableFuture.supplyAsync(() -> 1).thenAccept(System.out::println);CompletableFuture<Void> thenRun = CompletableFuture.supplyAsync(() -> 1).thenRun(System.out::println);System.out.println(thenApply.join());}
}
6.任务组合
1.两任务组合 - 都要完成
两个任务必须都完成,触发该任务。
thenCombine:组合两个future,获取两个future任务的返回结果,并返回当前任务的返回值
thenAcceptBoth:组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值。
runAfterBoth:组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务。
public class DemoF {public static void main(String[] args) {CompletableFuture.supplyAsync(() -> "hello").thenApplyAsync(t -> t + " world!").thenCombineAsync(CompletableFuture.completedFuture(" CompletableFuture"), (t, u) -> t + u).whenComplete(DemoF::accept);}private static void accept(String t, Throwable u) {System.out.println(t);}
}
2.两任务组合 - 一个完成
当两个任务中,任意一个future任务完成的时候,执行任务。
applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。
acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。
runAfterEither:两个任务有一个执行完成,不需要获取future的结果,处理任务,也没有返回值
3.多任务组合
allOf:等待所有任务完成
anyOf:只要有一个任务完成
7.计算接口性能
1.一淘
/*** @author yhd* @createtime 2020/11/15 15:11* 查询多个电商网站同一个商品的价格* 同步和异步两种方式*/
public class DemoD {static List<Gmall> gmalls= Arrays.asList(new Gmall("京东"),new Gmall("拼多多"),new Gmall("淘宝"),new Gmall("唯品会"),new Gmall("分期乐"));/*** 同步* @param gmalls* @param productName* @return*/public static List<String> getPriceSync(List<Gmall> gmalls,String productName){return gmalls.stream().map(gmall->String.format(productName+"in %s price is %d ",gmall.getGmallName(),gmall.getPriceByProductName(productName))).collect(Collectors.toList());}/*** 异步* @param gmalls* @param productName* @return*/public static List<String> getPriceAsync(List<Gmall> gmalls,String productName){return gmalls.stream().map(gmall->CompletableFuture.supplyAsync(()->String.format(productName+"in %s price is %d ",gmall.getGmallName(),gmall.getPriceByProductName(productName)))).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList());}public static void main(String[] args) {long startTime = System.currentTimeMillis();getPriceSync(gmalls,"金鳞岂是池中物").forEach(System.out::println);long endTime = System.currentTimeMillis();System.out.println("同步"+ (endTime-startTime));System.out.println("------------------------");long s = System.currentTimeMillis();getPriceAsync(gmalls,"金鳞岂是池中物").forEach(System.out::println);long e = System.currentTimeMillis();System.out.println("异步"+ (e-s));}
}
@Data
@NoArgsConstructor
@AllArgsConstructor
class Gmall{private String gmallName;public Integer getPriceByProductName(String productName){try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return (int)(Math.random()*100)+1;}
}
结果:
=================================
金鳞岂是池中物in 京东 price is 100
金鳞岂是池中物in 拼多多 price is 12
金鳞岂是池中物in 淘宝 price is 25
金鳞岂是池中物in 唯品会 price is 35
金鳞岂是池中物in 分期乐 price is 60
同步5081
------------------------
金鳞岂是池中物in 京东 price is 57
金鳞岂是池中物in 拼多多 price is 68
金鳞岂是池中物in 淘宝 price is 84
金鳞岂是池中物in 唯品会 price is 16
金鳞岂是池中物in 分期乐 price is 20
异步1387
=================================
2.检索
/*** @author yhd* @createtime 2020/11/15 23:40*/
public class DemoC {//模拟数据库private static Map<String,Company> db=new HashMap<>();//模拟查询条件private static List<String> persons;//初始化数据static {persons=Arrays.asList("马云","马化腾","李彦宏","张朝阳","刘强东","王兴");db.put("马云",new Company("马云","阿里巴巴"));db.put("马化腾",new Company("马化腾","腾讯"));db.put("李彦宏",new Company("李彦宏","百度"));db.put("张朝阳",new Company("张朝阳","搜狐"));db.put("刘强东",new Company("刘强东","京东"));db.put("王兴",new Company("王兴","美团"));}//模拟去数据库查询public static Company selectDB(String key){try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return db.get(key);}public static void main(String[] args) {Sync();System.out.println();Async();System.out.println();System.out.println(result());}//同步查询public static void Sync(){Long startTime=System.currentTimeMillis();persons.stream().map(key->selectDB(key).getWorks()).collect(Collectors.toList()).forEach(System.out::println);Long endTime=System.currentTimeMillis();System.out.println(endTime-startTime);}//异步查询public static void Async(){Long startTime=System.currentTimeMillis();persons.stream().map(key->CompletableFuture.supplyAsync(()->selectDB(key).getWorks())).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList()).forEach(System.out::println);Long endTime=System.currentTimeMillis();System.out.println(endTime-startTime);}//转换成map返回public static Map<String,String> result(){return persons.stream().map(key -> CompletableFuture.supplyAsync(() -> selectDB(key))).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toMap(Company::getPerson,Company::getWorks));}}
@Data
@NoArgsConstructor
@AllArgsConstructor
class Company{private String person;private String works;
}
结果
阿里巴巴
腾讯
百度
搜狐
京东
美团
6053阿里巴巴
腾讯
百度
搜狐
京东
美团
1406{张朝阳=搜狐, 马云=阿里巴巴, 王兴=美团, 李彦宏=百度, 刘强东=京东, 马化腾=腾讯}
3.商品详情
/*** @author yhd* @createtime 2020/11/16 0:45*/
public class DemoD {private static Map<String, Result> result = new HashMap<>();private static ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 5, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));public static void main(String[] args) {Long s = System.currentTimeMillis();CompletableFuture<Result> skuInfoFuture = CompletableFuture.supplyAsync((DemoD::get), pool);CompletableFuture<Void> skuPriceFuture = CompletableFuture.runAsync(DemoD::run, pool);CompletableFuture<Result> skuImgsFuture = skuInfoFuture.thenApplyAsync(DemoD::apply, pool);CompletableFuture<Result> spuInfoFuture = skuInfoFuture.thenApplyAsync(DemoD::apply2, pool);CompletableFuture.allOf(skuInfoFuture, skuPriceFuture, skuImgsFuture, spuInfoFuture).join();System.out.println(result);Long e = System.currentTimeMillis();System.out.println("查詢總計耗時:"+(e-s));pool.shutdown();}/*** 查询sku基本信息*/public static Result getSkuInfo(Integer key) {sleep();return new Result(1, "skuInfo");}/*** 查询sku图片信息*/public static Result getSkuImgs(Integer key) {sleep();return new Result(1, "skuImgs");}/*** 查询spu销售属性*/public static Result getSpuInfo(Integer key) {sleep();return new Result(1, "spuInfo");}/*** 查询sku价格*/public static Result getSkuPrice(Integer key) {sleep();return new Result(1, "skuPrice");}public static void sleep() {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}}private static Result get() {Result skuInfo = getSkuInfo(1);result.put(skuInfo.getName(), skuInfo);return skuInfo;}private static void run() {Result skuPrice = getSkuPrice(1);result.put(skuPrice.getName(), skuPrice);}private static Result apply(Result res) {Result skuImgs = getSkuImgs(res.getId());result.put(skuImgs.getName(), skuImgs);return res;}private static Result apply2(Result res) {Result spuInfo = getSpuInfo(res.getId());result.put(spuInfo.getName(), spuInfo);return res;}
}@Data
@NoArgsConstructor
@AllArgsConstructor
class Result {private Integer id;private String name;
}
14-CompletableFuture异步编排相关推荐
- 谷粒商城十五商品详情CompletableFuture异步编排
多线程异步任务的问题 例如a,b,c三个异步任务,不是随机运行就可以,它们还有一定的关系,c需要等待a的返回结果,b不需要等待谁的结果. 当异步任务产生一些关系和顺序之后,我们要编排好它们的关系进行调 ...
- Day140-142.尚品汇:AOP+Redis缓存+redssion分布式锁、CompletableFuture异步编排、首页三级分类展示、Nginx静态代理
目录 Day08 一.获取商品详情 加入缓存 二.全局缓存:分布式锁与aop 整合 三.布隆过滤器 四.CompletableFuture 异步编排 jdk1.8 Day09 1. 将item 改为多 ...
- CompletableFuture异步编排(2)
1.handle public <U> CompletableFuture<U> handle( BiFunction<? super T, Throwable, ? e ...
- 九、CompletableFuture异步编排
一.认识Future任务机制和FutureTask 1.Future概述 1.在JDK1.5之后提供了一个多线程的新的处理接口Callable,该接口需要与Future接口整合在一起,而后再进行最终的 ...
- 【多线程】优雅使用线程池结合CompletableFuture实现异步编排
文章目录 参考 1.线程池引入 2.Executors 2.1.概述 2.2.Executors缺陷 3.优雅的创建线程池 3.1.正确挑选方法 3.2.线程池配置类 4.线程池执行流程 5.Comp ...
- 谷粒商城高级篇(38)——异步编排之商品详情查询
异步编排之商品详情查询 异步编排 CompletableFuture介绍 创建异步对象 计算完成时回调方法 handle 方法 线程串行化方法 两任务组合 全部完成 一个完成即可 多任务组合 业务描述 ...
- Java异步编排 2021-09-11
Java 异步并发与池化技术 异步编排 Java 异步并发与池化技术 一.什么是异步调用 二 Future接口 三.异步开启 四.CompletableFuture接口实现异步编排 4.1串行关系执行 ...
- 谷粒商城笔记+踩坑(15)——商品详情搭建+异步编排
导航: 谷粒商城笔记+踩坑汇总篇 目录 1.搭建页面环境 1.1.配置 Nginx 和 网关 1.2.动静资源配置 1.3.搜索页到详情页跳转 2.模型类抽取和controller 2.1.分析首页需 ...
- java 并发 异步_Java并发 CompletableFuture异步编程的实现
前面我们不止一次提到,用多线程优化性能,其实不过就是将串行操作变成并行操作.如果仔细观察,你还会发现在串行转换成并行的过程中,一定会涉及到异步化,例如下面的示例代码,现在是串行的,为了提升性能,我们得 ...
- 尚硅谷谷粒商城第十二天 商品详情页及异步编排
1. 商品详情 当用户搜索到商品,肯定会点击查看,就会进入商品详情页,接下来我们完成商品详情页的展示. 商品详情浏览量比较大,并发高,我们会独立开启一个微服务,用来展示商品详情. 1.1. 创建mod ...
最新文章
- win10鼠标灵敏度怎么调_和平精英最稳压枪灵敏度怎么调教程,适合所有段位以及适合国际版PUBG手游压枪...
- .NET零基础入门之01:开篇及CSharp程序、解决方案的结构
- CM: How is attachment version implemented
- jqGrid细节备注—pager文字的设置,更改默认的提交方式
- Go学习笔记—多线程
- SpringAOP的Aspectj方式*
- Django基本概念、安装、配置到实现框架,Xmind学习笔记
- Oracle 备份与还原
- 流浪地球票房43亿元 今起电影最低票价降10元
- AndroidのTextView之CompoundDrawable那些坑
- Spring之JDBC模板jdbcTemplate
- MySQL数据库反向生成powerdesigner模型
- java成员方法tostring_Java 工具类-toString
- 高等数学公式【上册+下册】
- 智慧交通信息服务平台产品解决方案
- 认识电子计算机评课稿,《走进计算机》评课稿.docx
- Ubuntu下ATI显卡风扇转速调节脚本
- 半圆形进度条(html)
- 使用Excel VBA制作成绩统计图----(嵌入模式)
- Shiro和SpringSecurity