JUC并发编程-CompletableFuture
CompletableFuture基本介绍
- 阻塞的方式和异步编程的设计理念相违背,而轮询的方式会消耗无畏的CPU资源。因此,JDK8设计出CompletableFuture
核心的四个静态方法(分为两组)
利用核心的四个静态方法创建一个异步操作 | 不建议用new
关键就是 |有没有返回值|是否用了线程池|
参数说明:
没有指定Executor的方法,直接使用默认的ForkJoinPool.commPool()作为它的线程池执行异步代码。
如果指定线程池,则使用我们定义的或者特别指定的线程池执行异步代码。
1.runAsync无返回值
public class CompletableFutureBuildDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {System.out.println(Thread.currentThread().getName());//停顿几秒线程try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}});System.out.println(voidCompletableFuture.get());}
}
//ForkJoinPool.commonPool-worker-9 //默认的线程池
//null --- 没有返回值
2.runAsync+线程池
public class CompletableFutureBuildDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(3);//加入线程池CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {System.out.println(Thread.currentThread().getName());//停顿几秒线程try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}},executorService);System.out.println(voidCompletableFuture.get());executorService.shutdown(); }
}
//pool-1-thread-1 ----指定的线程池
//null ----没有返回值
supplyAsync有返回值
3 supplyAsync
public class CompletableFutureBuildDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(3);//加入线程池CompletableFuture<String> objectCompletableFuture = CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread().getName());try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "helllo supplyasync";});System.out.println(objectCompletableFuture.get());}
}
//ForkJoinPool.commonPool-worker-9---------默认的线程池
//helllo supplyasync-------------supplyasync有返回值了
4 supplyAsync+线程池
public class CompletableFutureBuildDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(3);//加入线程池CompletableFuture<String> objectCompletableFuture = CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread().getName());try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "helllo supplyasync";},executorService);System.out.println(objectCompletableFuture.get());executorService.shutdown();}
}
//pool-1-thread-1
//helllo supplyasync-------------supplyasync有返回值了
减少阻塞和轮询whenComplete
CompletableFuture
通过whenComplete
来减少阻塞和轮询(自动回调)
public class CompletableFutureUseDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread().getName()+"--------副线程come in");int result = ThreadLocalRandom.current().nextInt(10);//产生随机数try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return result;}).whenComplete((v,e) -> {//没有异常,v是值,e是异常if(e == null){System.out.println("------------------计算完成,更新系统updataValue"+v);}}).exceptionally(e->{//有异常的情况e.printStackTrace();System.out.println("异常情况"+e.getCause()+"\t"+e.getMessage());return null;});//线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程//ForkJoinPool 类似于守护线程mian线程结束的太快,CompletableFuture还没执行完也会结束System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务");try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}}
}
//ForkJoinPool.commonPool-worker-9--------副线程come in(这里用的是默认的ForkJoinPool)
//main线程先去忙其他任务
//------------------计算完成,更新系统updataValue3
假如换用自定义线程池/异常情况的展示,设置一个异常 int i = 10 / 0 ;
public class CompletableFutureUseDemo
{public static void main(String[] args) throws ExecutionException, InterruptedException{ExecutorService threadPool = Executors.newFixedThreadPool(3);try{CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "----come in");int result = ThreadLocalRandom.current().nextInt(10);try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }System.out.println("-----1秒钟后出结果:" + result);if(result > 2){int i=10/0;}return result;},threadPool).whenComplete((v,e) -> {if (e == null) {System.out.println("-----计算完成,更新系统UpdateValue:"+v);}}).exceptionally(e -> {e.printStackTrace();System.out.println("异常情况:"+e.getCause()+"\t"+e.getMessage());return null;});System.out.println(Thread.currentThread().getName()+"线程先去忙其它任务");}catch (Exception e){e.printStackTrace();}finally {threadPool.shutdown();}//主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程//try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }}}
join和get对比
功能几乎一样,区别在于编码时是否需要抛出异常
get()方法需要抛出异常
join()方法不需要抛出异常
public class Chain {public static void main(String[] args) throws ExecutionException, InterruptedException {//抛出异常CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {return "hello 12345";});System.out.println(completableFuture.get());}}public class Chain {public static void main(String[] args) {//不需要抛出异常CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {return "hello 12345";});System.out.println(completableFuture.join());}
}
CompletableFuture常用API
getNow调用的时候如果计算完了,就拿取这个计算完的值;否则就拿备胎值
1.获得结果和触发计算
获取结果
public T get() 不见不散,容易阻塞
public T get(long timeout,TimeUnit unit) 过时不候,超过时间会爆异常
public T join() 类似于get(),区别在于是否需要抛出异常
public T getNow(T valueIfAbsent) 没有计算完成的情况下,给一个替代结果
立即获取结果不阻塞
计算完,返回计算完成后的结果
没算完,返回设定的valueAbsent(直接返回了备胎值xxx)
主动触发计算
public boolean complete(T value) 是否立即打断get()方法返回括号值
(执行要2s,等待只有1s,所以还没执行完就被打断了。返回true表示打断了获取这个过程,直接返回了备胎值complete;如果没打断,返回false 和原来的abc)
public class CompletableFutureAPIDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> uCompletableFuture = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);//执行需要2秒} catch (InterruptedException e) {e.printStackTrace();}return "abc";});try {TimeUnit.SECONDS.sleep(1);//等待需要1秒} catch (InterruptedException e) {e.printStackTrace();}// System.out.println(uCompletableFuture.getNow("xxx"));//执2-等1 返回xxx//执2-等1 返回true+备胎值1111111
// 反之 则是 false 输出 abc System.out.println(uCompletableFuture.complete("1111111")+"\t"+uCompletableFuture.get());}
}
对计算结果进行处理
thenApply
计算结果存在在依赖关系,使得线程串行化。因为依赖关系,所以一旦有异常,直接叫停。
public class CompletableFutureDemo2
{
public static void main(String[] args) throws ExecutionException, InterruptedException
{ExecutorService threadPool = Executors.newFixedThreadPool(3);//当一个线程依赖另一个线程时用 thenApply 方法来把这两个线程串行化,CompletableFuture.supplyAsync(() -> {//暂停几秒钟线程try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }System.out.println("111");return 1024;},threadPool ).thenApply(f -> {System.out.println("222");return f + 1;}).thenApply(f -> {//int age = 10/0; // 异常情况:那步出错就停在那步。System.out.println("333");return f + 1;}).whenCompleteAsync((v,e) -> {System.out.println("*****v: "+v);}).exceptionally(e -> {e.printStackTrace();return null;});System.out.println("-----主线程结束,END");threadPool .shutdown();}
}
//-----正常情况
//111
//222
//333
//----计算结果: 6//-----异常情况
//111
//异常.....
handle
类似于thenApply,但是有异常的话仍然可以往下走一步。
public class CompletableFutureDemo2
{public static void main(String[] args) throws ExecutionException, InterruptedException{//当一个线程依赖另一个线程时用 handle 方法来把这两个线程串行化,// 异常情况:有异常也可以往下一步走,根据带的异常参数可以进一步处理CompletableFuture.supplyAsync(() -> {//暂停几秒钟线程try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }System.out.println("111");return 1024;}).handle((f,e) -> {int age = 10/0;//异常语句System.out.println("222");return f + 1;}).handle((f,e) -> {System.out.println("333");return f + 1;}).whenCompleteAsync((v,e) -> {System.out.println("*****v: "+v);}).exceptionally(e -> {e.printStackTrace();return null;});System.out.println("-----主线程结束,END");// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }}
}
//-----异常情况
//111
//333
//异常,可以看到多走了一步333
对计算结果进行消费
接收任务的处理结果,并消费处理,无返回结果|消费型函数式接口
thenAccept
不需要return
public static void main(String[] args) throws ExecutionException, InterruptedException
{CompletableFuture.supplyAsync(() -> {return 1;}).thenApply(f -> {return f + 2;}).thenApply(f -> {return f + 3;}).thenAccept(r -> System.out.println(r));
}
//6
//消费一下,直接得到6
补充:Code之任务之间的顺序执行
1.thenRun
thenRun(Runnable runnable)
任务A执行完执行B,并且B不需要A的结果
2.thenAccept
thenAccept(Consumer action)
任务A执行完执行B,B需要A的结果,但是任务B无返回值
3.thenApply
thenApply(Function fn)
任务A执行完执行B,B需要A的结果,同时任务B有返回值
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join());
//null System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {}).join());
//resultA打印出来的 null因为没有返回值System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB").join());
//resultAresultB 返回值
CompleteFuture和线程池说明(非常重要)
上面的几个方法都有普通版本和后面加Async的版本
以 thenRun 和 thenRunAsync 为例,有什么区别?
先看结论
1. 没有传入自定义线程池,都用默认线程池ForkJoinPool
2. 传入了一个自定义线程池如果你执行第一个任务的时候,传入了一个自定义线程池
1.调用thenRun方法执行第二个任务的时候,则第二个任务和第一个任务是用同一个线程池
2.调用thenRunAsync执行第二个任务的时候,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池
3.也有可能处理太快,系统优化切换原则,直接使用main线程处理(把sleep去掉)
2-1
public class CompletableFutureAPIDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService threadPool = Executors.newFixedThreadPool(5);CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("1号任务"+"\t"+Thread.currentThread().getName());return "abcd";},threadPool).thenRun(()->{try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("2号任务"+"\t"+Thread.currentThread().getName());}).thenRun(()->{try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("3号任务"+"\t"+Thread.currentThread().getName());}).thenRun(()->{try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("4号任务"+"\t"+Thread.currentThread().getName());});}
}
//1号任务 pool-1-thread-1
//2号任务 pool-1-thread-1
//3号任务 pool-1-thread-1
//4号任务 pool-1-thread-1
2-2
public class CompletableFutureAPIDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService threadPool = Executors.newFixedThreadPool(5);CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("1号任务"+"\t"+Thread.currentThread().getName());return "abcd";},threadPool).thenRunAsync(()->{try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("2号任务"+"\t"+Thread.currentThread().getName());}).thenRun(()->{try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("3号任务"+"\t"+Thread.currentThread().getName());}).thenRun(()->{try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("4号任务"+"\t"+Thread.currentThread().getName());});}
}
//1号任务 pool-1-thread-1
//2号任务 ForkJoinPool.commonPool-worker-9---这里另起炉灶重新调用了默认的ForkJoinPool
//3号任务 ForkJoinPool.commonPool-worker-9
//4号任务 ForkJoinPool.commonPool-worker-9
3
public class CompletableFutureAPIDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService threadPool = Executors.newFixedThreadPool(5);CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{
// try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("1号任务"+"\t"+Thread.currentThread().getName());return "abcd";},threadPool).thenRun(()->{// try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("2号任务"+"\t"+Thread.currentThread().getName());}).thenRun(()->{// try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("3号任务"+"\t"+Thread.currentThread().getName());}).thenRun(()->{//try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("4号任务"+"\t"+Thread.currentThread().getName());});}
}
//1号任务 1号任务 pool-1-thread-1
//2号任务 main
//3号任务 main
//4号任务 main
对计算速度进行选用
applyToEither
方法,那个快用哪个
public class CompletableFutureDemo2 {public static void main(String[] args) throws ExecutionException, InterruptedException{CompletableFuture<String> play1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");//暂停几秒钟线程try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }return "play1 ";});CompletableFuture<String> play2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }return "play2";});CompletableFuture<String> thenCombineResult = play1.applyToEither(play2, f -> {//对计算速度进行选用return f + " is winner";});System.out.println(Thread.currentThread().getName() + "\t" + thenCombineResult.get());}
}
//ForkJoinPool.commonPool-worker-9 ---come in
//ForkJoinPool.commonPool-worker-2 ---come in
//main play2 is winner
对计算结果进行合并
thenCombine
合并两个CompletionStage任务都完成后,最终能把两个任务的结果一起交给thenCOmbine来处理
先完成的先等着,等待其它分支任务
public class CompletableFutureDemo2
{public static void main(String[] args) throws ExecutionException, InterruptedException{CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");return 10;});CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");return 20;});CompletableFuture<Integer> thenCombineResult = completableFuture1.thenCombine(completableFuture2, (x, y) -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");return x + y;});System.out.println(thenCombineResult.get());}
}
//30
- 合并版本
public class CompletableFutureDemo2
{public static void main(String[] args) throws ExecutionException, InterruptedException{CompletableFuture<Integer> thenCombineResult = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in 1");return 10;}).thenCombine(CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in 2");return 20;}), (x,y) -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in 3");return x + y;}).thenCombine(CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in 4");return 30;}),(a,b) -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in 5");return a + b;});System.out.println("-----主线程结束,END");System.out.println(thenCombineResult.get());}
}
JUC并发编程-CompletableFuture相关推荐
- 【尚硅谷】大厂必备技术之JUC并发编程——笔记总结
[JUC并发编程01]JUC概述 关键字:进程和线程.进程和线程.wait和sleep.并发与并行.管程.用户线程和守护线程 [JUC并发编程02]Lock接口 关键字:synchronized.Lo ...
- ❤️《JUC并发编程从入门到高级》(建议收藏)❤️
JUC并发编程 1.什么是JUC JUC的意思就是java并发编程工具包,与JUC相关的有三个包:java.util.concurrent.java.util.concurrent.atomic.ja ...
- 多线程进阶=》JUC并发编程02
在JUC并发编程01中说到了,什么是JUC.线程和进程.Lock锁.生产者和消费者问题.8锁现象.集合类不安全.Callable(简单).常用辅助类.读写锁 https://blog.csdn.net ...
- 基于《狂神说Java》JUC并发编程--学习笔记
前言: 本笔记仅做学习与复习使用,不存在刻意抄袭. -------------------------------------------------------------------------- ...
- Java JUC并发编程详解
Java JUC并发编程详解 1. JUC概述 1.1 JUC简介 1.2 进程与线程 1.2 并发与并行 1.3 用户线程和守护线程 2. Lock接口 2.1 Synchronized 2.2 什 ...
- 多线程进阶=》JUC并发编程
多线程进阶=>JUC并发编程 1.什么是JUC JUC是java.util.concurrent的简写. 用中文概括一下,JUC的意思就是java并发编程工具包. 并发编程的本质就是 ...
- 周阳老师JUC并发编程
1. 序章 1)JUC是什么? java.util.concurrent在并发编程中使用的工具包 对JUC知识的高阶内容讲解和实战增强 2)为什么学习并用好多线程极其重要? 硬件方面: 摩尔定律: 它 ...
- JUC并发编程(java util concurrent)(哔站 狂神说java juc并发编程 摘录笔记)
JUC并发编程(java util concurrent) 1.什么是JUC JUC并不是一个很神秘的东西(就是 java.util 工具包.包.分类) 业务:普通的线程代码 Thread Runna ...
- JUC并发编程小总结
JUC是Java编发编程中使用的工具类,全称为java.util.concurrent.近期在大厂面试中屡屡被问到关于JUC的相关知识点问题,其重要性不言而喻,学好用好JUC可以说是每一个Java程序 ...
最新文章
- 20181213_任务(3D奖品设计+30天单词练习)
- JAVA程序设计----java面向对象基础(下)
- createdroptargets_使用DUILIB建立项目
- 互联网医生-ICMP协议
- 山东自考c语言程序设计停考了吗,山东省自考教育类停考专业遗留问题的通知...
- 发布 学习进度条 博客要求
- Java IO学习7:打印流
- 2019杭州云栖大会探营:神龙的秘密
- Android中注册一个 BroadcastReceiver的代码
- java图书馆借书问题_图书馆借书系统-Java异常的学习和处理
- st8s003 c语言编译器,stm8s003f3p6
- 第十八届全国大学生智能车竞赛竞速比赛规则(讨论版)
- 10套精美而实用的CSS3按钮
- linux中磁盘清理方法(简单好用)
- 关于模拟器拉取文件的多种方式
- 转速/线速度/角速度计算FC
- [MySQL]-压力测试之Sysbench
- ESP32C3 驱动DS18B20成功
- Carla设置同步模式
- c语言 任意自然数的分解加法,第三章 1. 代数系,自然数,整数,有理数,实数,复数...