CompletableFuture基本介绍

  • 阻塞的方式和异步编程的设计理念相违背,而轮询的方式会消耗无畏的CPU资源。因此,JDK8设计出CompletableFuture

核心的四个静态方法(分为两组)

利用核心的四个静态方法创建一个异步操作 | 不建议用new

关键就是 |有没有返回值|是否用了线程池|

参数说明:

没有指定Executor的方法,直接使用默认的ForkJoinPool.commPool()作为它的线程池执行异步代码。

如果指定线程池,则使用我们定义的或者特别指定的线程池执行异步代码。

1.runAsync无返回值

public class CompletableFutureBuildDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {System.out.println(Thread.currentThread().getName());//停顿几秒线程try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}});System.out.println(voidCompletableFuture.get());}
}
//ForkJoinPool.commonPool-worker-9 //默认的线程池
//null --- 没有返回值

2.runAsync+线程池

public class CompletableFutureBuildDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(3);//加入线程池CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {System.out.println(Thread.currentThread().getName());//停顿几秒线程try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}},executorService);System.out.println(voidCompletableFuture.get());executorService.shutdown(); }
}
//pool-1-thread-1   ----指定的线程池
//null ----没有返回值

supplyAsync有返回值

3 supplyAsync

public class CompletableFutureBuildDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(3);//加入线程池CompletableFuture<String> objectCompletableFuture = CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread().getName());try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "helllo supplyasync";});System.out.println(objectCompletableFuture.get());}
}
//ForkJoinPool.commonPool-worker-9---------默认的线程池
//helllo supplyasync-------------supplyasync有返回值了

4 supplyAsync+线程池

public class CompletableFutureBuildDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(3);//加入线程池CompletableFuture<String> objectCompletableFuture = CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread().getName());try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "helllo supplyasync";},executorService);System.out.println(objectCompletableFuture.get());executorService.shutdown();}
}
//pool-1-thread-1
//helllo supplyasync-------------supplyasync有返回值了

减少阻塞和轮询whenComplete

 CompletableFuture通过whenComplete来减少阻塞和轮询(自动回调)

public class CompletableFutureUseDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread().getName()+"--------副线程come in");int result = ThreadLocalRandom.current().nextInt(10);//产生随机数try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return result;}).whenComplete((v,e) -> {//没有异常,v是值,e是异常if(e == null){System.out.println("------------------计算完成,更新系统updataValue"+v);}}).exceptionally(e->{//有异常的情况e.printStackTrace();System.out.println("异常情况"+e.getCause()+"\t"+e.getMessage());return null;});//线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程//ForkJoinPool 类似于守护线程mian线程结束的太快,CompletableFuture还没执行完也会结束System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务");try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}}
}
//ForkJoinPool.commonPool-worker-9--------副线程come in(这里用的是默认的ForkJoinPool)
//main线程先去忙其他任务
//------------------计算完成,更新系统updataValue3

假如换用自定义线程池/异常情况的展示,设置一个异常 int i = 10 / 0 ;

public class CompletableFutureUseDemo
{public static void main(String[] args) throws ExecutionException, InterruptedException{ExecutorService threadPool = Executors.newFixedThreadPool(3);try{CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "----come in");int result = ThreadLocalRandom.current().nextInt(10);try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }System.out.println("-----1秒钟后出结果:" + result);if(result > 2){int i=10/0;}return result;},threadPool).whenComplete((v,e) -> {if (e == null) {System.out.println("-----计算完成,更新系统UpdateValue:"+v);}}).exceptionally(e -> {e.printStackTrace();System.out.println("异常情况:"+e.getCause()+"\t"+e.getMessage());return null;});System.out.println(Thread.currentThread().getName()+"线程先去忙其它任务");}catch (Exception e){e.printStackTrace();}finally {threadPool.shutdown();}//主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程//try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }}}

join和get对比

  • 功能几乎一样,区别在于编码时是否需要抛出异常

    • get()方法需要抛出异常

    • join()方法不需要抛出异常

public class Chain {public static void main(String[] args) throws ExecutionException, InterruptedException {//抛出异常CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {return "hello 12345";});System.out.println(completableFuture.get());}}public class Chain {public static void main(String[] args)  {//不需要抛出异常CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {return "hello 12345";});System.out.println(completableFuture.join());}
}

CompletableFuture常用API

getNow调用的时候如果计算完了,就拿取这个计算完的值;否则就拿备胎值
1.获得结果和触发计算
     获取结果

public T get() 不见不散,容易阻塞

 public T get(long timeout,TimeUnit unit) 过时不候,超过时间会爆异常

   public T join() 类似于get(),区别在于是否需要抛出异常

 public T getNow(T valueIfAbsent) 没有计算完成的情况下,给一个替代结果

立即获取结果不阻塞

计算完,返回计算完成后的结果

没算完,返回设定的valueAbsent(直接返回了备胎值xxx)

主动触发计算

public boolean complete(T value) 是否立即打断get()方法返回括号值

(执行要2s,等待只有1s,所以还没执行完就被打断了。返回true表示打断了获取这个过程,直接返回了备胎值complete;如果没打断,返回false 和原来的abc)

public class CompletableFutureAPIDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> uCompletableFuture = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);//执行需要2秒} catch (InterruptedException e) {e.printStackTrace();}return "abc";});try {TimeUnit.SECONDS.sleep(1);//等待需要1秒} catch (InterruptedException e) {e.printStackTrace();}// System.out.println(uCompletableFuture.getNow("xxx"));//执2-等1 返回xxx//执2-等1 返回true+备胎值1111111
//  反之 则是 false 输出 abc  System.out.println(uCompletableFuture.complete("1111111")+"\t"+uCompletableFuture.get());}
}

对计算结果进行处理

thenApply 

计算结果存在在依赖关系,使得线程串行化。因为依赖关系,所以一旦有异常,直接叫停。

public class CompletableFutureDemo2
{
public static void main(String[] args) throws ExecutionException, InterruptedException
{ExecutorService threadPool = Executors.newFixedThreadPool(3);//当一个线程依赖另一个线程时用 thenApply 方法来把这两个线程串行化,CompletableFuture.supplyAsync(() -> {//暂停几秒钟线程try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }System.out.println("111");return 1024;},threadPool ).thenApply(f -> {System.out.println("222");return f + 1;}).thenApply(f -> {//int age = 10/0; // 异常情况:那步出错就停在那步。System.out.println("333");return f + 1;}).whenCompleteAsync((v,e) -> {System.out.println("*****v: "+v);}).exceptionally(e -> {e.printStackTrace();return null;});System.out.println("-----主线程结束,END");threadPool .shutdown();}
}
//-----正常情况
//111
//222
//333
//----计算结果: 6//-----异常情况
//111
//异常.....

handle 

 类似于thenApply,但是有异常的话仍然可以往下走一步。

public class CompletableFutureDemo2
{public static void main(String[] args) throws ExecutionException, InterruptedException{//当一个线程依赖另一个线程时用 handle 方法来把这两个线程串行化,// 异常情况:有异常也可以往下一步走,根据带的异常参数可以进一步处理CompletableFuture.supplyAsync(() -> {//暂停几秒钟线程try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }System.out.println("111");return 1024;}).handle((f,e) -> {int age = 10/0;//异常语句System.out.println("222");return f + 1;}).handle((f,e) -> {System.out.println("333");return f + 1;}).whenCompleteAsync((v,e) -> {System.out.println("*****v: "+v);}).exceptionally(e -> {e.printStackTrace();return null;});System.out.println("-----主线程结束,END");// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }}
}
//-----异常情况
//111
//333
//异常,可以看到多走了一步333

对计算结果进行消费

接收任务的处理结果,并消费处理,无返回结果|消费型函数式接口

thenAccept

不需要return 

public static void main(String[] args) throws ExecutionException, InterruptedException
{CompletableFuture.supplyAsync(() -> {return 1;}).thenApply(f -> {return f + 2;}).thenApply(f -> {return f + 3;}).thenAccept(r -> System.out.println(r));
}
//6
//消费一下,直接得到6

补充:Code之任务之间的顺序执行

1.thenRun

thenRun(Runnable runnable)

任务A执行完执行B,并且B不需要A的结果

2.thenAccept

thenAccept(Consumer action)

任务A执行完执行B,B需要A的结果,但是任务B无返回值

3.thenApply

thenApply(Function fn)

任务A执行完执行B,B需要A的结果,同时任务B有返回值


System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join());
//null System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {}).join());
//resultA打印出来的 null因为没有返回值System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB").join());
//resultAresultB 返回值

CompleteFuture和线程池说明(非常重要)

上面的几个方法都有普通版本和后面加Async的版本

以 thenRun 和 thenRunAsync 为例,有什么区别?

先看结论

1. 没有传入自定义线程池,都用默认线程池ForkJoinPool

2. 传入了一个自定义线程池如果你执行第一个任务的时候,传入了一个自定义线程池

1.调用thenRun方法执行第二个任务的时候,则第二个任务和第一个任务是用同一个线程池

2.调用thenRunAsync执行第二个任务的时候,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池

3.也有可能处理太快,系统优化切换原则,直接使用main线程处理(把sleep去掉)

2-1


public class CompletableFutureAPIDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService threadPool = Executors.newFixedThreadPool(5);CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("1号任务"+"\t"+Thread.currentThread().getName());return "abcd";},threadPool).thenRun(()->{try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("2号任务"+"\t"+Thread.currentThread().getName());}).thenRun(()->{try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("3号任务"+"\t"+Thread.currentThread().getName());}).thenRun(()->{try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("4号任务"+"\t"+Thread.currentThread().getName());});}
}
//1号任务  pool-1-thread-1
//2号任务  pool-1-thread-1
//3号任务  pool-1-thread-1
//4号任务  pool-1-thread-1

2-2


public class CompletableFutureAPIDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService threadPool = Executors.newFixedThreadPool(5);CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("1号任务"+"\t"+Thread.currentThread().getName());return "abcd";},threadPool).thenRunAsync(()->{try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("2号任务"+"\t"+Thread.currentThread().getName());}).thenRun(()->{try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("3号任务"+"\t"+Thread.currentThread().getName());}).thenRun(()->{try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("4号任务"+"\t"+Thread.currentThread().getName());});}
}
//1号任务  pool-1-thread-1
//2号任务  ForkJoinPool.commonPool-worker-9---这里另起炉灶重新调用了默认的ForkJoinPool
//3号任务  ForkJoinPool.commonPool-worker-9
//4号任务  ForkJoinPool.commonPool-worker-9

3

public class CompletableFutureAPIDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService threadPool = Executors.newFixedThreadPool(5);CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{
//            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("1号任务"+"\t"+Thread.currentThread().getName());return "abcd";},threadPool).thenRun(()->{// try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("2号任务"+"\t"+Thread.currentThread().getName());}).thenRun(()->{//  try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("3号任务"+"\t"+Thread.currentThread().getName());}).thenRun(()->{//try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("4号任务"+"\t"+Thread.currentThread().getName());});}
}
//1号任务  1号任务  pool-1-thread-1
//2号任务  main
//3号任务  main
//4号任务  main

对计算速度进行选用

  • applyToEither方法,那个快用哪个
public class CompletableFutureDemo2 {public static void main(String[] args) throws ExecutionException, InterruptedException{CompletableFuture<String> play1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");//暂停几秒钟线程try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }return "play1 ";});CompletableFuture<String> play2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }return "play2";});CompletableFuture<String> thenCombineResult = play1.applyToEither(play2, f -> {//对计算速度进行选用return f + " is winner";});System.out.println(Thread.currentThread().getName() + "\t" + thenCombineResult.get());}
}
//ForkJoinPool.commonPool-worker-9  ---come in
//ForkJoinPool.commonPool-worker-2  ---come in
//main  play2 is winner

对计算结果进行合并

  • thenCombine 合并

    • 两个CompletionStage任务都完成后,最终能把两个任务的结果一起交给thenCOmbine来处理

    • 先完成的先等着,等待其它分支任务

public class CompletableFutureDemo2
{public static void main(String[] args) throws ExecutionException, InterruptedException{CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");return 10;});CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");return 20;});CompletableFuture<Integer> thenCombineResult = completableFuture1.thenCombine(completableFuture2, (x, y) -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");return x + y;});System.out.println(thenCombineResult.get());}
}
//30
  • 合并版本
public class CompletableFutureDemo2
{public static void main(String[] args) throws ExecutionException, InterruptedException{CompletableFuture<Integer> thenCombineResult = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in 1");return 10;}).thenCombine(CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in 2");return 20;}), (x,y) -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in 3");return x + y;}).thenCombine(CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in 4");return 30;}),(a,b) -> {System.out.println(Thread.currentThread().getName() + "\t" + "---come in 5");return a + b;});System.out.println("-----主线程结束,END");System.out.println(thenCombineResult.get());}
}

JUC并发编程-CompletableFuture相关推荐

  1. 【尚硅谷】大厂必备技术之JUC并发编程——笔记总结

    [JUC并发编程01]JUC概述 关键字:进程和线程.进程和线程.wait和sleep.并发与并行.管程.用户线程和守护线程 [JUC并发编程02]Lock接口 关键字:synchronized.Lo ...

  2. ❤️《JUC并发编程从入门到高级》(建议收藏)❤️

    JUC并发编程 1.什么是JUC JUC的意思就是java并发编程工具包,与JUC相关的有三个包:java.util.concurrent.java.util.concurrent.atomic.ja ...

  3. 多线程进阶=》JUC并发编程02

    在JUC并发编程01中说到了,什么是JUC.线程和进程.Lock锁.生产者和消费者问题.8锁现象.集合类不安全.Callable(简单).常用辅助类.读写锁 https://blog.csdn.net ...

  4. 基于《狂神说Java》JUC并发编程--学习笔记

    前言: 本笔记仅做学习与复习使用,不存在刻意抄袭. -------------------------------------------------------------------------- ...

  5. Java JUC并发编程详解

    Java JUC并发编程详解 1. JUC概述 1.1 JUC简介 1.2 进程与线程 1.2 并发与并行 1.3 用户线程和守护线程 2. Lock接口 2.1 Synchronized 2.2 什 ...

  6. 多线程进阶=》JUC并发编程

    多线程进阶=>JUC并发编程 1.什么是JUC ​ JUC是java.util.concurrent的简写. ​ 用中文概括一下,JUC的意思就是java并发编程工具包. ​ 并发编程的本质就是 ...

  7. 周阳老师JUC并发编程

    1. 序章 1)JUC是什么? java.util.concurrent在并发编程中使用的工具包 对JUC知识的高阶内容讲解和实战增强 2)为什么学习并用好多线程极其重要? 硬件方面: 摩尔定律: 它 ...

  8. JUC并发编程(java util concurrent)(哔站 狂神说java juc并发编程 摘录笔记)

    JUC并发编程(java util concurrent) 1.什么是JUC JUC并不是一个很神秘的东西(就是 java.util 工具包.包.分类) 业务:普通的线程代码 Thread Runna ...

  9. JUC并发编程小总结

    JUC是Java编发编程中使用的工具类,全称为java.util.concurrent.近期在大厂面试中屡屡被问到关于JUC的相关知识点问题,其重要性不言而喻,学好用好JUC可以说是每一个Java程序 ...

最新文章

  1. 20181213_任务(3D奖品设计+30天单词练习)
  2. JAVA程序设计----java面向对象基础(下)
  3. createdroptargets_使用DUILIB建立项目
  4. 互联网医生-ICMP协议
  5. 山东自考c语言程序设计停考了吗,山东省自考教育类停考专业遗留问题的通知...
  6. 发布 学习进度条 博客要求
  7. Java IO学习7:打印流
  8. 2019杭州云栖大会探营:神龙的秘密
  9. Android中注册一个 BroadcastReceiver的代码
  10. java图书馆借书问题_图书馆借书系统-Java异常的学习和处理
  11. st8s003 c语言编译器,stm8s003f3p6
  12. 第十八届全国大学生智能车竞赛竞速比赛规则(讨论版)
  13. 10套精美而实用的CSS3按钮
  14. linux中磁盘清理方法(简单好用)
  15. 关于模拟器拉取文件的多种方式
  16. 转速/线速度/角速度计算FC
  17. [MySQL]-压力测试之Sysbench
  18. ESP32C3 驱动DS18B20成功
  19. Carla设置同步模式
  20. c语言 任意自然数的分解加法,第三章 1. 代数系,自然数,整数,有理数,实数,复数...

热门文章

  1. Python123 练习6
  2. WPF CheckBox自定义样式
  3. Eclipse如何导入Maven项目
  4. 主动式电容笔好用吗?主动式和被动式电容笔的区别
  5. 大疆Pocket手持记录仪恢复方法
  6. 图形编程中,旋转的三种表示方法
  7. 提高工作效率可以缓解压力吗?
  8. GPS数据格式的分析
  9. Java多线程的同步优化的6种方案
  10. 10个让程序员瞬间炸毛的奇葩需求