【Java8新特性--->异步处理】CompletableFuture
一、引入
假设一个商品详情页需要以下操作:
- 查询展示商品的基本信息耗时:0.5s
- 查询展示商品的销售信息耗时:0.7s
- 查询展示商品的图片信息耗时:1s
- 查询展示商品销售属性耗时:0.3s
- 查询展示商品规格属性耗时:1.5s
- 查询展示商品详情信息耗时:1s
即使每个查询时间耗时不多,但是加起来却需要很长耗时。为了减少线性执行造成耗时的累积,这就需要引入异步处理做优化。
二、Future介绍
Future是Java 5添加的类,用来描述一个异步计算的结果。
优点:
- 可以使用
isDone
方法检查计算是否完成。 - 使用
get
阻塞住调用线程,直到计算完成返回结果。 - 可以使用
cancel
方法停止任务的执行。
缺点:
- 对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。
- 阻塞的方式与我们想及时得到计算结果的期望相违背。
- 轮询的方式会消耗大量CPU资源,并且不能及时得到计算结果。
功能:
- boolean cancel(boolean mayInterruptIfRunning);
1、尝试取消执行此任务。如果任务已经完成、已被取消或由于其他原因无法取消,则此尝试将失败。如果成功,并且调用
cancel
时此任务尚未启动,则此任务永远不会运行。
2、 如果任务已经开始,则mayInterruptIfRunning
参数确定是否应中断执行此任务的线程以尝试停止任务。 参数mayInterruptIfRunning
为true
,表示执行此任务的线程应该被中断;否则,允许进行中的任务完成。
3、此方法返回后,后续调用isDone
将始终返回true
。
4、如果此方法返回true
,则对isCancelled
的后续调用将始终返回true
。
- boolean isCancelled();
如果此任务在正常完成之前被取消,则返回
true
,否则返回false
。
- boolean isDone();
如果此任务完成,则返回
true
。任务完成可能是由于正常终止、异常或取消——在所有这些情况下,此方法将返回true
。
- V get() throws InterruptedException, ExecutionException;
阻塞直至任务完成,然后检索其结果。
throws CancellationException:如果计算被取消
throws ExecutionException:如果计算抛出异常
throws InterruptedException:如果当前线程在等待时被中断
- V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
如有必要,最多等待计算完成的给定时间,然后检索其结果(如果可用)。
参数timeout
:等待的最长时间
参数unit
:超时参数的时间单位
throws CancellationException:如果计算被取消
throws ExecutionException:如果计算抛出异常
throws InterruptedException:如果当前线程在等待时被中断
throws TimeoutException:如果等待超时
三、CompletableFuture
1、介绍
在Java 8中, 新增加了一个包含50个方法左右的类:CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。
CompletableFuture类实现了Future接口,所以你还是可以像以前一样通过 get方法阻塞或者轮询的方式获得结果,但是这种方式不推荐使用。
2、创建异步对象(runAsync、supplyAsync)
CompletableFuture 提供了四个静态方法来创建一个异步操作。
(带有Async默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。)
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
方法分为两类:
- runAsync 没有返回结果
- supplyAsync 有返回结果
测试代码:
public class CompletableFutureDemo {// corePoolSize:线程池的核心线程数量 线程池创建出来后就会 new Thread() 5个// maximumPoolSize:最大的线程数量,线程池支持的最大的线程数// keepAliveTime:存活时间,当线程数大于核心线程,空闲的线程的存活时间 50-5=45// unit:存活时间的单位// BlockingQueue<Runnable> workQueue:阻塞队列 当线程数超过了核心线程数据,那么新的请求到来的时候会加入到阻塞的队列中// new LinkedBlockingQueue<>() 默认队列的长度是 Integer.MAX 那这个就太大了,所以我们需要指定队列的长度// threadFactory:创建线程的工厂对象// RejectedExecutionHandler handler:当线程数大于最大线程数的时候会执行的淘汰策略private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5,50,10,TimeUnit.SECONDS,new LinkedBlockingDeque(1000),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main方法开始了…………");CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {System.out.println("线程开始了...");System.out.println("当前线程---->" + Thread.currentThread().getName());int i = 100 / 50;System.out.println("线程结束了...");}, executor);System.out.println("main方法结束了…………");System.out.println("-----------------------------");CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {System.out.println("线程开始了...");System.out.println("当前线程---->" + Thread.currentThread().getName());int i = 100 / 50;System.out.println("线程结束了...");return i;}, executor);System.out.println("integerCompletableFuture=" + integerCompletableFuture.get());}
}
测试结果:
3、whenCompleteAsync、exceptionally和handleAsync
当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor);public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn);public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) ;
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) ;
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) ;
3.1、whenCompleteAsync
可以获取异步任务的返回值和抛出的异常信息,但是不能修改返回结果。
测试代码:
public class CompletableFutureDemo1 {private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5,50,10,TimeUnit.SECONDS,new LinkedBlockingDeque(1000),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main方法开始了…………");CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {System.out.println("线程开始了...");System.out.println("当前线程---->" + Thread.currentThread().getName());int i = 100 / 2;System.out.println("线程结束了...");return i;}, executor).whenCompleteAsync((res, e) -> { // 不能修改返回值System.out.println("res= " + res);System.out.println("e=" + e);}, executor);System.out.println("main方法结束了…………");System.out.println("future=" + future.get());}
}
测试结果:
3.2、exceptionally
当异步任务跑出了异常后会触发的方法,如果没有抛出异常该方法不会执行
测试代码:
public class CompletableFutureDemo1 {private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5,50,10,TimeUnit.SECONDS,new LinkedBlockingDeque(1000),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main方法开始了…………");CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {System.out.println("线程开始了...");System.out.println("当前线程---->" + Thread.currentThread().getName());int i = 100 / 0;System.out.println("线程结束了...");return i;}, executor).whenCompleteAsync((res, e) -> { // 不能修改返回值System.out.println("res= " + res);System.out.println("e=" + e);}, executor).exceptionally(e -> {System.out.println("exceptionally执行了,e = " + e);return 100;});System.out.println("main方法结束了…………");System.out.println("future=" + future.get());}
}
测试结果: 主动触发算术异常
测试结果: 将int i = 100 / 0;
改为int i = 100 / 5;
3.2.1、拓展—>利用exceptionally达到显式地捕获相关异常的效果
示例:
@Test
public void test() {CompletableFuture<Integer> future = createNewFile();try {Integer flag = future.get();if (flag == 0) {System.out.println("创建成功!!!");} else if (flag == 1) {System.out.println("捕获到NullPointerException,创建失败!!!");} else if (flag == 2) {System.out.println("捕获到IOException,创建失败!!!");} else {System.out.println("捕获到其他异常,创建失败!!!");}} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}
}private CompletableFuture<Integer> createNewFile() {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {File file = null;try {file.createNewFile();} catch (IOException e) {// 对于编译时异常,在CompletableFuture中不能直接向外抛出,throw new RuntimeException(e.getMessage());} catch (NullPointerException e) {throw e;}return 0;}).exceptionally(th -> {// 走到这里,说明创建失败if (th.getCause() instanceof NullPointerException) {// 显式地修改返回值return 1;} else if (th.getCause() instanceof RuntimeException) {return 2;}return 3;});return future;
}
效果:
3.2.2、拓展—>completeExceptionally
如果完成动作过程中抛出异常,将导致对get()和相关方法的调用引发给定的异常。
简而言之,future.completeExceptionally(e)是在完成过程中主动设置异常信息。(如果future未完成,抛出主动设置的异常信息。反之,则返回完成后的结果。)
案例:
@Testpublic void test03() throws InterruptedException {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {System.out.println("future start……");try {Thread.sleep(2 * 1000);} catch (InterruptedException e) {e.printStackTrace();}int i = 100 / 20;System.out.println("future end……");return i;});Thread.sleep(3 * 1000);future.completeExceptionally(new RuntimeException("未完成时主动抛出异常!!!"));try {Integer res = future.get();System.out.println("res = " + res);} catch (ExecutionException e) {// 可捕获主动抛出的异常System.out.println("捕获主动抛出的异常ExecutionException");e.printStackTrace();}}
效果 (future已完成计算):
如果将主线程Thread.sleep(3 * 1000);改为Thread.sleep(1 * 1000);
结果如下 (future未完成计算):
3.2.3、拓展—>obtrudeException
强制导致方法get()和相关方法的后续调用抛出给定的异常,无论是否已完成。
3.3、handleAsync
可以获取异步任务的返回值和抛出的异常信息,而且可以显示地修改返回的结果
测试代码:
public class CompletableFutureDemo1 {private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5,50,10,TimeUnit.SECONDS,new LinkedBlockingDeque(1000),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main方法开始了…………");CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {System.out.println("线程开始了...");System.out.println("当前线程---->" + Thread.currentThread().getName());int i = 100 / 5;System.out.println("线程结束了...");return i;}, executor).handle((res, e) -> {System.out.println("res = " + res);System.out.println("e = " + e);return 200;});System.out.println("main方法结束了…………");System.out.println("future=" + future.get());}
}
测试效果:
4、线程串行方法
- thenRunAsync 方法:只要之前的执行完成就执行 thenRun的后续操作。(无接受参数,无返回)
- thenAcceptAsync 方法:消费者模式,接受上一个任务处理的结果,并消费处理,无返回结果
- thenApplyAsync 方法:当一个线程依赖另一个线程,获取上一个任务的返回结果,并返回当前任务的结果。
(带有Async默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。)
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);
4.1、thenRunAsync 实现代码
public class CompletableFutureDemo2 {private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5,50,10,TimeUnit.SECONDS,new LinkedBlockingDeque(1000),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main方法开始了…………");CompletableFuture.runAsync(() -> {System.out.println("线程开始了...");System.out.println("当前线程---->" + Thread.currentThread().getName());int i = 100 / 5;System.out.println("线程结束了...");}, executor).thenRunAsync(()->{System.out.println("thenRunAsync我进行操作了……………………");});}}
4.2、thenRunAsync 实现效果
4.3、thenAcceptAsync 实现代码
public class CompletableFutureDemo2 {private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5,50,10,TimeUnit.SECONDS,new LinkedBlockingDeque(1000),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main方法开始了…………");CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {System.out.println("线程开始了...");System.out.println("当前线程---->" + Thread.currentThread().getName());int i = 100 / 5;System.out.println("线程结束了...");return i;}, executor).thenAcceptAsync(res -> {System.out.println("thenAcceptAsync-------->" + res);});}}
4.4、thenAcceptAsync 实现效果
4.5、thenApplyAsync 实现代码
public class CompletableFutureDemo2 {private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5,50,10,TimeUnit.SECONDS,new LinkedBlockingDeque(1000),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main方法开始了…………");CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {System.out.println("线程开始了...");System.out.println("当前线程---->" + Thread.currentThread().getName());int i = 100 / 5;System.out.println("线程结束了...");return i;}, executor).thenApplyAsync(res -> {System.out.println("thenAcceptAsync-------->" + res);return 200;});System.out.println("future--------->" + future.get());}}
4.6、thenApplyAsync 实现效果
5、等待两个任务执行完成后才会触发
- runAfterBothAsync 方法:不可以获取前面两线程的返回结果,本身也没有返回结果。
- thenAcceptBothAsync 方法:可以获取前面两线程的返回结果,本身没有返回结果。
- thenCombineAsync 方法:可以获取前面两线程的返回结果,本身也有返回结果
(带有Async默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。)
5.1、runAfterBothAsync 实现代码
public class CompletableFutureDemo3 {private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5,50,10,TimeUnit.SECONDS,new LinkedBlockingDeque(1000),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main方法开始了…………");CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("future1线程开始了...");System.out.println("当前线程---->" + Thread.currentThread().getName());int i = 100 / 5;System.out.println("future1线程结束了...");return i;}, executor);CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("future2线程开始了...");System.out.println("当前线程---->" + Thread.currentThread().getName());int i = 100 / 20;
// try {// Thread.sleep(3000);
// } catch (InterruptedException e) {// e.printStackTrace();
// }System.out.println("future2线程结束了...");return i;}, executor);future1.runAfterBothAsync(future2,()->{System.out.println("任务3执行了");},executor);}}
5.2、runAfterBothAsync 实现效果
如果放开Thread.sleep(3000);
,那么 runAfterBothAsync
中的代码会等3s后(也就是等待 future1和 future2都执行完)才执行。
5.3、thenAcceptBothAsync 实现代码
public class CompletableFutureDemo3 {private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5,50,10,TimeUnit.SECONDS,new LinkedBlockingDeque(1000),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main方法开始了…………");CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("future1线程开始了...");System.out.println("当前线程---->" + Thread.currentThread().getName());int i = 100 / 5;System.out.println("future1线程结束了...");return i;}, executor);CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("future2线程开始了...");System.out.println("当前线程---->" + Thread.currentThread().getName());int i = 100 / 20;
// try {// Thread.sleep(3000);
// } catch (InterruptedException e) {// e.printStackTrace();
// }System.out.println("future2线程结束了...");return i;}, executor);future1.thenAcceptBothAsync(future2, (res1, res2) -> {System.out.println("thenAcceptBothAsync开始了");System.out.println("res1 = " + res1);System.out.println("res2 = " + res2);System.out.println("thenAcceptBothAsync结束了");},executor);}}
5.4、thenAcceptBothAsync 实现效果
5.5、thenCombineAsync 实现代码
public class CompletableFutureDemo3 {private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5,50,10,TimeUnit.SECONDS,new LinkedBlockingDeque(1000),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main方法开始了…………");CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("future1线程开始了...");System.out.println("当前线程---->" + Thread.currentThread().getName());int i = 100 / 5;System.out.println("future1线程结束了...");return i;}, executor);CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("future2线程开始了...");System.out.println("当前线程---->" + Thread.currentThread().getName());int i = 100 / 20;
// try {// Thread.sleep(3000);
// } catch (InterruptedException e) {// e.printStackTrace();
// }System.out.println("future2线程结束了...");return i;}, executor);CompletableFuture<Integer> future3 = future1.thenCombineAsync(future2, (res1, res2) -> {System.out.println("future3开始了");return res1 + res2;}, executor);System.out.println("future3.get()=" + future3.get());}}
5.6、thenCombineAsync 实现效果
6、两个任务完成一个就会触发
两个任务只要有一个完成就会触发。
(对于acceptEitherAsync
、applyToEitherAsync
可接受前面两任务返回结果来说,如果任务有返回值,哪个任务先执行完先获取其结果作为参数)
- runAfterEitherAsync 方法:不可以获取前面两线程的返回结果,本身也没有返回结果。
- acceptEitherAsync 方法:可以获取前面两线程的返回结果,本身也没有返回结果。
- applyToEitherAsync 方法:可以获取前面两线程的返回结果,本身有返回结果。
(带有Async默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。)
6.1、runAfterEitherAsync 实现代码
public class CompletableFutureDemo4 {private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5,50,10,TimeUnit.SECONDS,new LinkedBlockingDeque(1000),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main方法开始了…………");CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("future1线程开始了...");System.out.println("当前线程---->" + Thread.currentThread().getName());int i = 100 / 5;
// try {// Thread.sleep(3000);
// } catch (InterruptedException e) {// e.printStackTrace();
// }System.out.println("future1线程结束了...");return i;}, executor);CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("future2线程开始了...");System.out.println("当前线程---->" + Thread.currentThread().getName());int i = 100 / 20;
// try {// Thread.sleep(3000);
// } catch (InterruptedException e) {// e.printStackTrace();
// }System.out.println("future2线程结束了...");return i;}, executor);future1.runAfterEitherAsync(future2,()->{System.out.println("runAfterEitherAsync任务执行了");},executor);}}
6.2、runAfterEitherAsync 实现效果
6.3、acceptEitherAsync 实现代码
public class CompletableFutureDemo4 {private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5,50,10,TimeUnit.SECONDS,new LinkedBlockingDeque(1000),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main方法开始了…………");CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("future1线程开始了...");System.out.println("当前线程---->" + Thread.currentThread().getName());int i = 100 / 5;try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("future1线程结束了...");return i;}, executor);CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("future2线程开始了...");System.out.println("当前线程---->" + Thread.currentThread().getName());int i = 100 / 20;
// try {// Thread.sleep(3000);
// } catch (InterruptedException e) {// e.printStackTrace();
// }System.out.println("future2线程结束了...");return i;}, executor);future1.acceptEitherAsync(future2, (res) -> {System.out.println("acceptEitherAsync开始了");System.out.println("res = " + res);},executor);}}
6.4、acceptEitherAsync 实现效果
6.5、applyToEitherAsync 实现代码
public class CompletableFutureDemo4 {private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5,50,10,TimeUnit.SECONDS,new LinkedBlockingDeque(1000),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main方法开始了…………");CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("future1线程开始了...");System.out.println("当前线程---->" + Thread.currentThread().getName());int i = 100 / 5;try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("future1线程结束了...");return i;}, executor);CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("future2线程开始了...");System.out.println("当前线程---->" + Thread.currentThread().getName());int i = 100 / 20;
// try {// Thread.sleep(3000);
// } catch (InterruptedException e) {// e.printStackTrace();
// }System.out.println("future2线程结束了...");return i;}, executor);CompletableFuture<Integer> future3 = future1.applyToEitherAsync(future2, (res) -> {System.out.println("future3开始了");System.out.println("res = " + res);return res;}, executor);System.out.println("future3.get()=" + future3.get());}}
6.6、applyToEitherAsync 实现效果
7、多任务组合
- anyOf 方法:只要有一个任务完成。
- allOf 方法:等待所有任务完成。
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
7.1、anyOf 实现代码
只要有一个任务完成就会触发。
public class CompletableFutureDemo5 {private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5,50,10,TimeUnit.SECONDS,new LinkedBlockingDeque(1000),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());/*** anyOf:只要有一个线程完成,那么就不阻塞* allOf:所有线程都完成,在 get方法阻塞直至所有线程都完成*/public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main方法开始了…………");CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("future1线程开始了...");System.out.println("当前线程---->" + Thread.currentThread().getName());int i = 100 / 5;try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("future1线程结束了...");return i;}, executor);CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("future2线程开始了...");System.out.println("当前线程---->" + Thread.currentThread().getName());int i = 100 / 20;System.out.println("future2线程结束了...");return i;}, executor);CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2);anyOf.get();System.out.println("主任务完成anyOf:" + anyOf.get());}}
7.2、anyOf 实现效果
7.3、allOf 实现代码
阻塞等待所有任务完成才会触发。
public class CompletableFutureDemo5 {private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5,50,10,TimeUnit.SECONDS,new LinkedBlockingDeque(1000),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());/*** anyOf:只要有一个线程完成,那么就不阻塞* allOf:所有线程都完成,在 get方法阻塞直至所有线程都完成*/public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main方法开始了…………");CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("future1线程开始了...");System.out.println("当前线程---->" + Thread.currentThread().getName());int i = 100 / 5;try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("future1线程结束了...");return i;}, executor);CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("future2线程开始了...");System.out.println("当前线程---->" + Thread.currentThread().getName());int i = 100 / 20;System.out.println("future2线程结束了...");return i;}, executor);CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2);allOf.get(); // 阻塞在这个位置,等待所有线程的完成System.out.println("主任务完成allOf:" + future1.get() + "," + future2.get());}}
7.4、allOf 实现效果
8、细节
8.1、thenApply与thenCompose区别
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {return uniApplyStage(null, fn);
}
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {return uniComposeStage(null, fn);
}
- thenApply:返回的是泛型中的类型转化为返回值类型的CompletableFuture对象。
- thenCompose:返回的是一个扁平化的CompletableFuture对象。
(用来连接两个CompletableFuture,是生成一个新的CompletableFuture。特别像stream().flatMap扁平化处理)
区别:当返回是CompletableFuture的话,thenApply是嵌套,而thenCompose扁平化。
public class CompletableFutureTest {public static void main(String[] args) {CompletableFuture<Integer> apply = CompletableFuture.supplyAsync(() -> {int i;try {i = 100 / 2;} catch (Exception e) {throw new RuntimeException("算术异常!!!");}return i;});// thenApply返回的如果是CompletableFuture,就会嵌套起来CompletableFuture<? extends CompletableFuture<?>> apply1 = apply.thenApply(res -> {System.out.println("res1 = " + res);System.out.println("apply1当前还在执行中……");return CompletableFuture.completedFuture(res);});CompletableFuture<? extends CompletableFuture<?>> apply2 = apply.thenApply(res -> {System.out.println("res2 = " + res);System.out.println("apply2当前还在执行中……");return CompletableFuture.completedFuture(res);});// thenCompose不同,返回的扁平化之后的一维CompletableFutureCompletableFuture<?> compose = apply.thenCompose(res -> {System.out.println("res3 = " + res);System.out.println("compose当前还在执行中……");return CompletableFuture.completedFuture(res);});System.out.println(apply1 == apply2);try {apply.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
}
测试效果:
【Java8新特性--->异步处理】CompletableFuture相关推荐
- Java8新的异步编程方式 CompletableFuture(三)
前面两篇文章已经整理了CompletableFuture大部分的特性,本文会整理完CompletableFuture余下的特性,以及将它跟RxJava进行比较. 3.6 Either Either 表 ...
- Java8新特性总结 -7.新API和工具
所有示例代码打包下载 : 点击打开链接 Java8新特性 : 接口新增默认方法和静态方法 Optional类 Lambda表达式 方法引用 Stream API - 函数式操作流元素集合 Date/T ...
- Java系列 - 用Java8新特性进行Java开发太爽了
本人博客文章网址:https://www.peretang.com/using-java8s-new-features-to-coding-is-awesome/ 前言 从开始写博客到现在已经过去3个 ...
- 【JAVA拾遗】Java8新特性合辑
[JAVA拾遗]Java8新特性合辑 文章目录 [JAVA拾遗]Java8新特性合辑 0. 逼逼 [--/--]126 Lambda Expressions & Virtual Extensi ...
- 【Java8新特性】关于Java8的Stream API,看这一篇就够了!!
写在前面 Java8中有两大最为重要的改变.第一个是 Lambda 表达式:另外一个则是 Stream API(java.util.stream.*) ,那什么是Stream API呢?Java8中 ...
- 【Java8新特性】浅谈方法引用和构造器引用
写在前面 Java8中一个很牛逼的新特性就是方法引用和构造器引用,为什么说它很牛逼呢?往下看! 方法引用 当要传递给Lambda体的操作,已经有实现的方法了,可以使用方法引用!这里需要注意的是:实现抽 ...
- java8新特性_Java8新特性之Date API|乐字节
大家好,我是乐字节的小乐,上篇文章讲述了<Java8新特性之Optional>,接下来,小乐将接着讲述Java8新特性之Date API 2019日历 Java8之Date API Jav ...
- Java8 新特性之流式数据处理(转)
转自:https://www.cnblogs.com/shenlanzhizun/p/6027042.html 一. 流式处理简介 在我接触到java8流式处理的时候,我的第一感觉是流式处理让集合操作 ...
- java8新特性_乐字节-Java8新特性-接口默认方法
总概 JAVA8 已经发布很久,而且毫无疑问,java8是自java5(2004年发布)之后的最重要的版本.其中包括语言.编译器.库.工具和JVM等诸多方面的新特性. Java8 新特性列表如下: 接 ...
- java8新特性: lambda表达式:直接获得某个list/array/对象里面的字段集合
java8新特性: lambda表达式:直接获得某个list/array/对象里面的字段集合 比如,我有一张表: entity Category.java service CategoryServic ...
最新文章
- 实现无锁的栈与队列(1)
- AttributeError: module 'tensorflow_core.estimator' has no attribute 'inputs'
- c语言fread读取错误,【求助】C语言fread读取二进制文件时,读取结果全都是零
- 虚拟机 Linux ubuntu16.04 对 windows 设置共享文件夹教程(samba)(ubuntu16.04换源【更换下载源】【更换源】【更新源】)(如果连不上需要开SSH服务)
- python编程对电脑要求_什么是集成开发环境和代码编辑器?优秀Python编程环境的要求(4)...
- qt能使用logback_X04Logback的配置
- 动手学无人驾驶(1):交通标志识别
- Linux目录架构详解
- c语言交通违章编程代码,C语言程序设计之交通处罚单管理系统 报告(内含代码).doc...
- 跟随美国博导12年,我学到最深刻的不是科研,而是这个。。。
- C|C++中的静态全局变量,静态局部变量,全局变量,局部变量的区别
- C++编程技术常见问题有哪些?
- 关于【CSDN免积分下载器】的分析
- 1959年第一台电子计算机,1、 一般认为,世界上第一台电子数字计算机诞生于______。 A.1946年 B.1952年 C.1959年 D.1962年...
- QQ登录界面实现(JAVA)
- Python实现Singleton模式的几种方式
- 计算机鼠标双击怎么,鼠标双击变成属性怎么办 鼠标双击变成属性解决办法【详解】...
- web扫描器之Nessus
- Word文档怎样添加图片?技巧分享!怎么在Word文档中加入图片?
- 微信小程序标题栏放logo 搜索框