九、CompletableFuture异步编排
一、认识Future任务机制和FutureTask
1、Future概述
1、在JDK1.5之后提供了一个多线程的新的处理接口
Callable
,该接口需要与Future
接口整合在一起,而后再进行最终的异步操作,提升了多线程的处理性能。2、Future类位于java.util.concurrent包下,它是JDK1.5新加的一个接口,定义了操作
异步任务执行的一些方法
,比如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。3、如果主线程需要执行一个很耗时的任务,可以通过Future把这个任务放到异步线程中执行,主线程继续处理其他任务或者先行结束,再通过Future的get方法获取任务结果。
public interface Future<V> {/*** 用来取消任务,如果取消任务成功则返回true,否则返回false* mayInterruptIfRunning参数表示是否允许取消正在执行却没有执行完毕的任务,如果设置为true,则表示可以* 取消正在执行中的任务。如果任务已完成,无论设置成true还是false,此方法都返回false,即如果取消已经完成* 的任务会返回false;如果任务正在执行,参数设置为true就返回true,设置成false就返回false;如果任务还没* 有执行,无论设置成true还是false,肯定返回true。*/boolean cancel(boolean mayInterruptIfRunning);//表示任务是否被取消成功,如果在任务正常完成前被取消成功,返回trueboolean isCancelled();//表示任务是否已经完成,若任务完成,返回trueboolean isDone();//用来获取任务执行结果,该方法会产生阻塞,会一直等到任务执行完毕才返回V get() throws InterruptedException, ExecutionException;//用来获取任务执行结果,如果在执行时间内,还没有获取到结果,则直接返回nullV get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
}
2、FutureTask概述
1、FutureTask类实现了RunnableFuture接口,而RunnableFuture接口继承了Runnable、Future接口,所以FutureTask既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。
FutureTask的构造方法:
/*** 创建一个FutureTask ,它将在运行时执行给定的Callable*/
public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable;this.state = NEW; // ensure visibility of callable
}/*** 创建一个FutureTask ,将在运行时执行给定的Runnable ,并安排get将在成功完成后返回给定的结果*/
public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW; // ensure visibility of callable
}
3、FutureTask使用
1、单任务使用步骤:
创建
Callable
接口的实现类,并实现call()
方法,该call()
方法将作为线程执行体,并且有返回值。创建
Callable
实现类的实例,使用FutureTask
类来包装Callable
对象,该FutureTask
对象封装了该Callable
对象的call()
方法的返回值。使用
FutureTask
对象作为Thread
对象的target
创建并启动新线程。调用
FutureTask
对象的get()
方法来获得子线程执行结束后的返回值。2、结合线程池,把FutureTask交给线程池执行,这样能够提高程序的执行效率。
/*** @Date: 2022/7/7* 1、创建Callable接口的实现类,并实现call()方法,该call()方法将作为线程执行体,并且有返回值。* 2、创建Callable实现类的实例,使用FutureTask类来包装Callable对象,该FutureTask对象封装了该Callable对象的call()方法的返回值。* 3、使用FutureTask对象作为Thread对象的target创建并启动新线程。* 4、调用FutureTask对象的get()方法来获得子线程执行结束后的返回值。*/
@Slf4j
public class FutureTest1 {public static void main(String[] args) throws ExecutionException, InterruptedException {log.info(Thread.currentThread().getName() + " 开始执行...");//实例化CallableCallTest callTest = new CallTest();//使用FutureTask类来包装Callable对象FutureTask<String> task = new FutureTask<>(callTest);//使用FutureTask对象作为Thread对象的target创建新线程Thread thread = new Thread(task, "异步线程1");//启动线程thread.start();//调用FutureTask对象的get()方法来获得子线程执行结束后的返回值log.info("task返回结果 " + task.get());log.info(Thread.currentThread().getName() + " 结束运行...");}/*** 创建Callable接口的实现类,并实现call()方法*/static class CallTest implements Callable<String> {@Overridepublic String call() throws Exception {Thread.sleep(5000);return Thread.currentThread().getName() + " 执行call方法,执行耗时5s";}}
}
/*** 23:55:33.618 [main] INFO com.itan.future.FutureTest1 - main 开始执行...* 23:55:38.679 [main] INFO com.itan.future.FutureTest1 - task返回结果 异步线程1 执行call方法,执行耗时5s* 23:55:38.679 [main] INFO com.itan.future.FutureTest1 - main 结束运行...*/
/*** @Author: ye.yanbin* @Date: 2022/7/7* 结合线程池运行*/
@Slf4j
public class FutureThreadPoolTest {public static void main(String[] args) throws ExecutionException, InterruptedException {long start = System.currentTimeMillis();//corePoolSize设置为1,maximumPoolSize设置为2,队列容量设置为3,拒绝策略为AbortPolicy(直接抛出异常)ExecutorService pool = Executors.newFixedThreadPool(3);FutureTask<String> futureTask1 = new FutureTask<>(() -> {Thread.sleep(300);return "任务1结束";});pool.submit(futureTask1);FutureTask<String> futureTask2 = new FutureTask<>(() -> {Thread.sleep(500);return "任务2结束";});pool.submit(futureTask2);System.out.println(futureTask1.get());System.out.println(futureTask2.get());Thread.sleep(300);long end = System.currentTimeMillis();System.out.println("总耗时:" + (end - start) + " 毫秒");pool.shutdown();}
}
/*** 运行结果:* 任务1结束* 任务2结束* 总耗时:989 毫秒*/
4、Future缺点
1、
get()
方法在没有获取到结果的情况下会一直等待任务结果,容易导致程序阻塞。2、
isDone()
方法判断任务是否完成,通常情况下是使用循环(轮询方式)判断任务是否完成,如果任务一直处理,就会一直循环判断,非常消耗CPU资源。3、总结:Future对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果。
二、CompletableFuture
1、出现背景
1、JDK1.5提供的Future可以实现异步计算操作,虽然Future的相关方法提供了异步任务的执行能力,但是对于线程执行结果的获取只能够采用阻塞或轮询的方式进行处理,阻塞的方法与多线程异步处理的初衷产生了分歧,轮询的方式又会造成CPU资源的浪费,同时也无法及时得到结果。
2、为了解决这些设计问题,从JDK1.8开始提供了
Future的扩展实现类CompletableFuture,可以简化异步编程的复杂性,同时又可以结合函数式编程模式,利用回调的方式进行异步处理计算操作
。3、
CompletableFuture
类同时实现Future、CompletionStage
两个接口
2、CompletionStage接口说明
1、CompletionStage代表异步计算过程中的某一阶段,一个阶段完成以后可能会触发另一个阶段。
2、一个阶段执行的计算可以是一个Function、Consumer或Runnable(分别使用名称包括apply、accept或run的方法),具体取决于它是否需要参数和、或产生结果。例如, stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())。
3、一个阶段的执行可以由单个阶段的完成触发,也可以由多个阶段一起触发。
3、创建CompletableFuture方式
1、通过构造函数创建,
但是不推荐使用这种方式
。2、CompletableFuture源码中提供了四个静态方法用来创建一个异步操作:
runAsync无返回值
supplyAsync有返回值
3、
runAsync传入的任务要求是Runnable类型的,所以没有返回值
。因此,runAsync适合创建不需要返回值的计算任务,有如下两种方式:
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
4、
supplyAsync执行带有返回计算结果的计算任务
,有如下两种方式:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
5、Executor executor参数说明:
没有指定Executor的方法,直接使用默认的ForkJoinPool.commonPool()作为它的线程池执行异步任务
。
如果指定Executor,则使用我们自定义的或者特别指定的线程池执行异步任务
。
/*** @Date: 2022/7/7* 1、验证不传入Executor时使用的线程池为ForkJoinPool* 2、验证传入Executor时使用自定义线程池*/
public class CompletableFutureTest1 {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(3);//无返回值的CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {System.out.println("future1 start...");System.out.println(Thread.currentThread().getName());System.out.println("future1 end...");});CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {System.out.println("future2 start...");System.out.println(Thread.currentThread().getName());System.out.println("future2 end...");}, pool);System.out.println(future1.get());//输出nullSystem.out.println(future2.get());//输出null//关闭线程池pool.shutdown();}
}
/*** 运行结果如下:* future1 start...* ForkJoinPool.commonPool-worker-1* future1 end...* null* future2 start...* pool-1-thread-1* future2 end...* null*/
/*** @Date: 2022/7/7* 测试有返回值的*/
public class CompletableFutureTest2 {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(3);//有返回值的CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("future1 start...");System.out.println(Thread.currentThread().getName());System.out.println("future1 end...");return "future1 return";}, pool);System.out.println(future1.get());//future1 return//关闭线程池pool.shutdown();}
}
三、CompletableFuture常用方法
1、获取结果和触发计算
1、
public T get()
:阻塞式获取任务结果。2、
public T get(long timeout, TimeUnit unit)
:带有时间的等待,如果超时则抛错TimeoutException。3、
public T join()
:和get方法一样。4、
public T getNow(T valueIfAbsent)
:立即获取结果(不阻塞),如果完成则返回结果值,否则返回给定的valueIfAbsent。5、
public boolean complete(T value)
:如果任务没有完成,则将返回值的方法的值设置成指定的内容。
/*** @Date: 2022/7/7*/
public class CompletableFutureTest3 {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {ExecutorService pool = Executors.newFixedThreadPool(10);/*** get方法,阻塞式获取结果* join方法,阻塞式获取结果*/CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}return "完成";}, pool);System.out.println("get方法:" + future1.get());System.out.println("join方法:" + future1.join());/*** 带有超时的get方法,超过设定的时间,没有返回值,直接返回null*/CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}return "完成";}, pool);System.out.println(future2.get(2L, TimeUnit.SECONDS));/*** 立即获取结果(不阻塞),如果完成则返回结果值,否则返回给定的valueIfAbsent*/CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}return "完成";}, pool);System.out.println(future3.getNow("自定义getNow的值"));/*** 是否打断get方法,立即返回自定义的值*/System.out.println(future3.complete("自定义complete的值") + "\t结果为:" + future3.join());//关闭线程池pool.shutdown();}
}
2、函数式接口说明
1、CompletableFuture中的方法都使用到了函数式接口,使用lambda必须先了解函数式接口
函数式接口名称 方法名称 参数 返回值 Runnable void run()
无参数 无返回值 Function R apply(T t)
1个参数 有返回值 Consumer void accept(T t)
1个参数 无返回值 Supplier T get()
无参数 有返回值 BiConsumer void accept(T t, U u)
2个参数 无返回值 BiFunction R apply(T t, U u)
2个参数 有返回值
3、计算完成时回调方法
1、
whenComplete
开头的方法可以处理正常和异常的计算结果,有如下方法:2、
whenComplete与whenCompleteAsync的区别
:
whenComplete:是执行当前任务的线程继续执行whenComplete的任务。
whenCompleteAsync:是把whenCompleteAsync这个任务继续提交给线程池来执行。
3、
exceptionally
方法用于处理出现异常的情况,可以指定出现异常后的返回值,如下:4、
总结
:
方法不以
Async
结尾,说明action
使用相同的线程执行。以
Async
结尾可能会使用其他线程执行,如果是使用相同的线程池,也可能会被同一个线程执行。
/*** @Date: 2022/7/7* whenComplete方法测试*/
public class CompletableFutureTest4 {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {ExecutorService pool = Executors.newFixedThreadPool(10);/*** whenComplete方法虽然能获取异常信息和结果,但是无法修改返回结果,* 虽然在完成运算之后,对结果+3操作,但是最后get方法获取到的数据还是2* 运行结果如下:* 当前线程:pool-1-thread-1* 运行结果:2* 当前线程:main* 运行结果:5 ,异常:null* 当前线程:pool-1-thread-1* 运行结果:5 ,异常:null* 返回结果:2*/CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程:" + Thread.currentThread().getName());int i = 10 / 5;System.out.println("运行结果:" + i);return i;}, pool).whenComplete((result, exception) -> {//虽然能获取异常信息和结果,但是无法修改返回结果,并且是main线程来执行此任务System.out.println("当前线程:" + Thread.currentThread().getName());result = result + 3;System.out.println("运行结果:" + result + " ,异常:" + exception);}).whenCompleteAsync((result, exception) -> {//虽然能获取异常信息和结果,但是无法修改返回结果,交由线程池来执行此任务System.out.println("当前线程:" + Thread.currentThread().getName());result = result + 3;System.out.println("运行结果:" + result + " ,异常:" + exception);}, pool);System.out.println("返回结果:" + future1.get());//关闭线程池pool.shutdown();}
}
/*** @Date: 2022/7/7* exceptionally方法测试*/
public class CompletableFutureTest4 {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {ExecutorService pool = Executors.newFixedThreadPool(1);/*** exceptionally方法可以感知到异常,并且还能修改返回结果* 运行结果如下:* 当前线程:pool-1-thread-1* 运行结果:null ,异常:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero* 返回结果:10*/CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程:" + Thread.currentThread().getName());int i = 10 / 0;System.out.println("运行结果:" + i);return i;}, pool).whenComplete((result, exception) -> {//虽然能获取异常信息和结果,但是无法修改返回结果System.out.println("运行结果:" + result + " ,异常:" + exception);}).exceptionally(exception -> {//可以感知异常,同时还能修改返回结果return 10;});System.out.println("返回结果:" + future1.get());//关闭线程池pool.shutdown();}
}
4、handle
1、可对任务结果做处理,
和whenComplete相比,handle能改变返回结果,若出现异常也能继续往下执行
。有如下方法:
/*** @Date: 2022/7/10* handle方法测试*/
public class CompletableFutureTest5 {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {ExecutorService pool = Executors.newFixedThreadPool(1);/*** 能对异常和返回结果处理* 运行结果如下:* 当前线程:pool-1-thread-1* 运行结果:2* 当前线程:main* 上一步运行结果:2 异常信息:null* 返回结果:4*/CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程:" + Thread.currentThread().getName());int i = 10 / 4;System.out.println("运行结果:" + i);return i;}, pool).handle((result, exception) -> {System.out.println("当前线程:" + Thread.currentThread().getName());System.out.println("上一步运行结果:" + result + " 异常信息:" + exception);if (result != null) {return result * 2;}if (exception != null) {return 0;}return 0;});System.out.println("返回结果:" + future1.get());//关闭线程池pool.shutdown();}
}
5、线程串行化方法
1、
thenApply
开头的方法,接收上一个任务返回的结果,并返回当前任务的值
,有如下方法:2、
thenAccept
开头的方法,接收上一个任务返回的结果,并消费,但无返回结果
,有如下方法:3、
thenRun
开头的方法,只要前面的任务执行完成,就开始执行,无法获取上一个任务的返回结果,也无返回结果
,有如下方法:4、
总结
:
以上方法都要前置的任务执行成功,才能运行;如果前置任务出现异常,则停止执行任务
。
带有Async是异步执行的
。
/*** @Date: 2022/7/10* thenApply方法测试*/
public class CompletableFutureTest6 {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {ExecutorService pool = Executors.newFixedThreadPool(1);/*** thenApply方法接收上一个任务的返回结果,并返回thenApply的处理结果,可以看到返回结果两任务拼接* 运行结果如下:* 当前线程:pool-1-thread-1* 运行结果:5* 当前线程:pool-1-thread-1* 上一步运行结果:5* 返回结果:hello5*/CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程:" + Thread.currentThread().getName());int i = 10 / 2;System.out.println("运行结果:" + i);return i;}, pool).thenApplyAsync(result -> {System.out.println("当前线程:" + Thread.currentThread().getName());System.out.println("上一步运行结果:" + result);return "hello" + result;}, pool);System.out.println("返回结果:" + future1.get());//关闭线程池pool.shutdown();}
}
/*** @Date: 2022/7/10* thenAccept方法测试*/
public class CompletableFutureTest7 {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {ExecutorService pool = Executors.newFixedThreadPool(1);/*** thenAccept方法仅接收上一任务的返回结果,并消费,无返回结果;可以看到返回结果为null* 运行结果如下:* 当前线程:pool-1-thread-1* 运行结果:5* 当前线程:pool-1-thread-1* 上一步运行结果:5* 继续处理...* 返回结果:null*/CompletableFuture<Void> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程:" + Thread.currentThread().getName());int i = 10 / 2;System.out.println("运行结果:" + i);return i;}, pool).thenAcceptAsync(result -> {System.out.println("当前线程:" + Thread.currentThread().getName());System.out.println("上一步运行结果:" + result);System.out.println("继续处理...");}, pool);System.out.println("返回结果:" + future1.get());//关闭线程池pool.shutdown();}
}
/*** @Date: 2022/7/10* thenRun方法测试*/
public class CompletableFutureTest8 {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {ExecutorService pool = Executors.newFixedThreadPool(1);/*** thenRun方法,不能获取上一步任务的执行结果,也无返回值* 运行结果如下:* 当前线程:pool-1-thread-1* 运行结果:5* 当前线程:pool-1-thread-1* 继续处理...* 返回结果:null*/CompletableFuture<Void> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程:" + Thread.currentThread().getName());int i = 10 / 2;System.out.println("运行结果:" + i);return i;}, pool).thenRunAsync(() -> {System.out.println("当前线程:" + Thread.currentThread().getName());System.out.println("继续处理...");}, pool);System.out.println("返回结果:" + future1.get());//关闭线程池pool.shutdown();}
}
6、两任务组合都要执行完成
1、当两个任务都执行完成的情况下,才能触发该任务。
2、
thenCombine
开头的方法,组合两个任务,获取两个任务的返回结果,交由thenCombine处理并返回,有如下方法:3、
thenAcceptBoth
开头的方法,组合两个任务,获取两个任务的返回结果,交由thenAcceptBoth处理,没有返回值,有如下方法:4、
runAfterBoth
开头的方法,组合两个任务,不能获取两个任务的返回结果,只需要两个任务执行完成后,交由runAfterBoth处理,没有返回值,有如下方法:
/*** @Date: 2022/7/10* runAfterBoth方法测试* 组合两个任务,无法获取前两个任务的返回值,也没有返回结果,需要前两个任务都执行完成才能触发*/
public class RunAfterBothTest {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {ExecutorService pool = Executors.newFixedThreadPool(3);CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("任务1开始:" + Thread.currentThread().getName());int i = 10 / 2;System.out.println("任务1结束:" + i);return i;}, pool);CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("任务2开始:" + Thread.currentThread().getName());String str = "hello";System.out.println("任务2结束:" + str);return str;}, pool);//组合两任务CompletableFuture<Void> future3 = future1.runAfterBothAsync(future2, () -> {System.out.println("任务3开始:" + Thread.currentThread().getName());}, pool);System.out.println("返回结果:" + future3.get());//关闭线程池pool.shutdown();}
}
/*** 运行结果如下:* 任务1开始:pool-1-thread-1* 任务1结束:5* 任务2开始:pool-1-thread-2* 任务2结束:hello* 任务3开始:pool-1-thread-3* 返回结果:null*/
/*** @Date: 2022/7/10* thenAcceptBoth方法测试* 组合两个任务,能获取两个任务的返回值,没有返回结果,需要前两个任务都执行完成才能触发*/
public class ThenAcceptBothTest {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {ExecutorService pool = Executors.newFixedThreadPool(3);CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("任务1开始:" + Thread.currentThread().getName());int i = 10 / 2;System.out.println("任务1结束:" + i);return i;}, pool);CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("任务2开始:" + Thread.currentThread().getName());String str = "hello";try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务2结束:" + str);return str;}, pool);//组合两任务CompletableFuture<Void> future3 = future1.thenAcceptBothAsync(future2, (f1, f2) -> {System.out.println("任务3开始...任务1的结果:" + f1 + " 任务2的结果:" + f2);}, pool);System.out.println("返回结果:" + future3.get());//关闭线程池pool.shutdown();}
}
/*** 运行结果如下:* 任务1开始:pool-1-thread-1* 任务1结束:5* 任务2开始:pool-1-thread-2* 任务2结束:hello* 任务3开始...任务1的结果:5 任务2的结果:hello* 返回结果:null*/
/*** @Date: 2022/7/10* thenCombine方法测试* 组合两个任务,能获取两个任务的返回值,有返回结果,需要前两个任务都执行完成才能触发*/
public class ThenCombineTest {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {ExecutorService pool = Executors.newFixedThreadPool(3);CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("任务1开始:" + Thread.currentThread().getName());int i = 10 / 2;System.out.println("任务1结束:" + i);return i;}, pool);CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("任务2开始:" + Thread.currentThread().getName());String str = "hello";try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务2结束:" + str);return str;}, pool);//组合两任务CompletableFuture<String> future3 = future1.thenCombineAsync(future2, (f1, f2) -> {System.out.println("任务3开始...任务1的结果:" + f1 + " 任务2的结果:" + f2);return f2 + "-" + f1;}, pool);System.out.println("返回结果:" + future3.get());//关闭线程池pool.shutdown();}
}
/*** 运行结果如下:* 任务1开始:pool-1-thread-1* 任务1结束:5* 任务2开始:pool-1-thread-2* 任务2结束:hello* 任务3开始...任务1的结果:5 任务2的结果:hello* 返回结果:hello-5*/
7、两任务组合任意执行一个
1、当两个任务中,任意一个任务执行完成的时候,触发该任务。
2、
applyToEither
开头的方法,两任务有一个执行完成,获取执行完成的任务的结果,处理任务并且有新的返回值,有如下方法:3、
acceptEither
开头的方法,两任务有一个执行完成,获取执行完成的任务的结果,处理任务,没有新的返回值,有如下方法:4、
runAfterEither
开头的方法,两任务有一个执行完成,无法获取任务的结果,直接处理任务,也没有返回值,有如下方法:
/*** @Date: 2022/7/10* runAfterEither方法测试* 两个任务,只要有一个完成,就执行任务3,无法获取前面任务结果,也无返回值*/
public class RunAfterEitherTest {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {ExecutorService pool = Executors.newFixedThreadPool(3);CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("任务1开始:" + Thread.currentThread().getName());int i = 10 / 2;System.out.println("任务1结束:" + i);return i;}, pool);CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("任务2开始:" + Thread.currentThread().getName());String str = "hello";try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务2结束:" + str);return str;}, pool);//组合两任务CompletableFuture<Void> future3 = future1.runAfterEitherAsync(future2, () -> {System.out.println("任务3开始...");}, pool);System.out.println("返回结果:" + future3.get());//关闭线程池pool.shutdown();}
}
/*** 运行结果如下:* 任务1开始:pool-1-thread-1* 任务1结束:5* 任务2开始:pool-1-thread-2* 任务2结束:hello* 任务3开始...* 返回结果:null*/
/*** @Date: 2022/7/10* AcceptEither方法测试* 两个任务,只要有一个完成,就执行任务3,获取执行完成的任务的结果,无返回值*/
public class AcceptEitherTest {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {ExecutorService pool = Executors.newFixedThreadPool(3);CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("任务1开始:" + Thread.currentThread().getName());int i = 10 / 2;System.out.println("任务1结束:" + i);return i;}, pool);CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("任务2开始:" + Thread.currentThread().getName());String str = "hello";try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务2结束:" + str);return str;}, pool);//组合两任务CompletableFuture<Void> future3 = future1.acceptEitherAsync(future2, (res) -> {System.out.println("任务3开始...之前的任务的结果:" + res);}, pool);System.out.println("返回结果:" + future3.get());//关闭线程池pool.shutdown();}
}
/*** 运行结果如下:* 任务1开始:pool-1-thread-1* 任务1结束:5* 任务2开始:pool-1-thread-2* 任务2结束:hello* 任务3开始...之前的任务的结果:5* 返回结果:null*/
/*** @Date: 2022/7/10* AcceptEither方法测试* 两个任务,只要有一个完成,就执行任务3,获取执行完成的任务的结果,有返回值*/
public class ApplyToEitherTest {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {ExecutorService pool = Executors.newFixedThreadPool(3);CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("任务1开始:" + Thread.currentThread().getName());int i = 10 / 2;System.out.println("任务1结束:" + i);return i;}, pool);CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("任务2开始:" + Thread.currentThread().getName());String str = "hello";try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务2结束:" + str);return str;}, pool);//组合两任务CompletableFuture<Object> future3 = future1.applyToEitherAsync(future2, (res) -> {System.out.println("任务3开始...之前的任务的结果:" + res);return res.toString() + " 成功";}, pool);System.out.println("返回结果:" + future3.get());//关闭线程池pool.shutdown();}
}
/*** 运行结果如下:* 任务1开始:pool-1-thread-1* 任务1结束:5* 任务2开始:pool-1-thread-2* 任务2结束:hello* 任务3开始...之前的任务的结果:5* 返回结果:5 成功*/
8、多任务组合
1、
allOf
等待所有任务完成2、
anyOf
只要有一个任务完成
/*** @Date: 2022/7/10* AllOf方法测试* 多任务组合,等待所有任务完成*/
public class AllOfTest {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {ExecutorService pool = Executors.newFixedThreadPool(3);CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("任务1开始:" + Thread.currentThread().getName());int i = 10 / 2;System.out.println("任务1结束:" + i);return i;}, pool);CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("任务2开始:" + Thread.currentThread().getName());String str = "hello";try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务2结束:" + str);return str;}, pool);CompletableFuture<Object> future3 = CompletableFuture.supplyAsync(() -> {System.out.println("任务3开始:" + Thread.currentThread().getName());return "成功";}, pool);//多任务组合CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);allOf.get();System.out.println("main线程执行完成");//关闭线程池pool.shutdown();}
}
/*** 运行结果如下:* 任务1开始:pool-1-thread-1* 任务1结束:5* 任务2开始:pool-1-thread-2* 任务3开始:pool-1-thread-3* 任务2结束:hello* main线程执行完成*/
/*** @Date: 2022/7/10* AnyOf方法测试* 多任务组合,只要有一个完成就返回*/
public class AnyOfTest {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {ExecutorService pool = Executors.newFixedThreadPool(3);CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("任务1开始:" + Thread.currentThread().getName());int i = 10 / 2;try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务1结束:" + i);return i;}, pool);CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("任务2开始:" + Thread.currentThread().getName());String str = "hello";try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务2结束:" + str);return str;}, pool);CompletableFuture<Object> future3 = CompletableFuture.supplyAsync(() -> {System.out.println("任务3开始:" + Thread.currentThread().getName());return "成功";}, pool);//多任务组合CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2, future3);System.out.println("返回结果:" + anyOf.get());System.out.println("main线程执行完成");//关闭线程池pool.shutdown();}
}
/*** 运行结果如下:* 任务1开始:pool-1-thread-1* 任务2开始:pool-1-thread-2* 任务3开始:pool-1-thread-3* 返回结果:成功* main线程执行完成* 任务1结束:5* 任务2结束:hello*/
9、总结
1、没有传入自定义线程池,都用默认线程池ForkJoinPool。
2、传入了自定义线程池,如果执行第一个任务的时候传入了自定义线程池:
调用不以Async结尾的方法执行第二个任务时,则第二个任务和第一个任务共用同一个线程池。
调用以Async结尾的方法执行第二个任务时,则第一个任务使用的是自定义的线程池,第二个任务使用的是ForkJoinPool。
3、
提醒
:由于系统优化切换原则,有可能处理太快,来不及切换线程,直接使用调用者线程执行任务。
九、CompletableFuture异步编排相关推荐
- 谷粒商城十五商品详情CompletableFuture异步编排
多线程异步任务的问题 例如a,b,c三个异步任务,不是随机运行就可以,它们还有一定的关系,c需要等待a的返回结果,b不需要等待谁的结果. 当异步任务产生一些关系和顺序之后,我们要编排好它们的关系进行调 ...
- Day140-142.尚品汇:AOP+Redis缓存+redssion分布式锁、CompletableFuture异步编排、首页三级分类展示、Nginx静态代理
目录 Day08 一.获取商品详情 加入缓存 二.全局缓存:分布式锁与aop 整合 三.布隆过滤器 四.CompletableFuture 异步编排 jdk1.8 Day09 1. 将item 改为多 ...
- CompletableFuture异步编排(2)
1.handle public <U> CompletableFuture<U> handle( BiFunction<? super T, Throwable, ? e ...
- 【多线程】优雅使用线程池结合CompletableFuture实现异步编排
文章目录 参考 1.线程池引入 2.Executors 2.1.概述 2.2.Executors缺陷 3.优雅的创建线程池 3.1.正确挑选方法 3.2.线程池配置类 4.线程池执行流程 5.Comp ...
- 谷粒商城高级篇(38)——异步编排之商品详情查询
异步编排之商品详情查询 异步编排 CompletableFuture介绍 创建异步对象 计算完成时回调方法 handle 方法 线程串行化方法 两任务组合 全部完成 一个完成即可 多任务组合 业务描述 ...
- Java异步编排 2021-09-11
Java 异步并发与池化技术 异步编排 Java 异步并发与池化技术 一.什么是异步调用 二 Future接口 三.异步开启 四.CompletableFuture接口实现异步编排 4.1串行关系执行 ...
- 谷粒商城笔记+踩坑(15)——商品详情搭建+异步编排
导航: 谷粒商城笔记+踩坑汇总篇 目录 1.搭建页面环境 1.1.配置 Nginx 和 网关 1.2.动静资源配置 1.3.搜索页到详情页跳转 2.模型类抽取和controller 2.1.分析首页需 ...
- java 并发 异步_Java并发 CompletableFuture异步编程的实现
前面我们不止一次提到,用多线程优化性能,其实不过就是将串行操作变成并行操作.如果仔细观察,你还会发现在串行转换成并行的过程中,一定会涉及到异步化,例如下面的示例代码,现在是串行的,为了提升性能,我们得 ...
- 实验九 使用异步方式实现文件读\写
实验九 使用异步方式实现文件读\写 一.实验目的 了解Windows系统异步文件读/写的概念. 熟悉Windows系统文件读/写相关的API. 掌握采用异步方式实现文件读/写的相关参数设置. 二.实验 ...
最新文章
- 假设你有一个多层感知机网络(MLP),输入层有10个节点、一个单隐层共50个神经元,最后是一个3个神经元的输出层。请问网络的结构是什么样子的使用数学形式进行描述?
- [Oracle] CPU/PSU补丁安装详细教程
- 一. MyBatis系列:第一个MyBatis工程
- 个人博客代码_Jekyll + Github Pages 搭建个人免费博客
- Angular Model
- PancakeBunny获160万美元战略融资,Binance Labs领投
- Fix Corrupt Blocks on HDFS
- Vue.js 学习笔记 十二 Vue发起Ajax请求
- 构建自己的NSZombie
- 把自己录制的mp3音乐分享到朋友圈
- 抗光幕布哪个牌子好?
- xp系统蓝屏代码7b_蓝屏代码7b怎么修复
- Elasticsearch APIs解析(一、API规范及约定)
- 数值分析基础工具使用Matlab绘制双曲线
- Sklearn KDTree 的使用
- python not found in axis_关于python:pandas-drop函数错误 (label not contained in axis)
- 用opencv实现两张半透明png图片以一定透明度叠加
- c语言指针作用一句话,C语言指针是什么?C语言指针的概念。
- java入门基础(四)
- web渗透之文件上传漏洞