CompletionService

Callable + Future 可以实现多个task并行执行,但是如果遇到前面的task执行较慢时需要阻塞等待前面的task执行完后面task才能取得结果。

而CompletionService的主要功能就是一边生成任务,一边获取任务的返回值。让两件事分开执行,任务之间不会互相阻塞,可以实现先执行完的先取结果,不再依赖任务顺序了。

CompletionService原理

内部通过阻塞队列+FutureTask,实现了任务先完成可优先获取到,即结果按照完成先后顺序排序,内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的take方法或poll方法可以获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果

使用案例

询价应用:向不同电商平台询价,并保存价格

采用“ThreadPoolExecutor+Future”的方案:异步执行询价然后再保存

使用CompletionService实现先获取的报价先保存到数据库

@Slf4j
public class CompletionServiceDemo {public static void main(String[] args) throws InterruptedException, ExecutionException {//创建线程池ExecutorService executor = Executors.newFixedThreadPool(10);//创建CompletionServiceCompletionService<Integer> cs = new ExecutorCompletionService<>(executor);//异步向电商S1询价cs.submit(() -> getPriceByS1());//异步向电商S2询价cs.submit(() -> getPriceByS2());//异步向电商S3询价cs.submit(() -> getPriceByS3());//将询价结果异步保存到数据库for (int i = 0; i < 3; i++) {//从阻塞队列获取futureTaskInteger r = cs.take().get();executor.execute(() -> save(r));}executor.shutdown();}private static void save(Integer r) {log.debug("保存询价结果:{}",r);}private static Integer getPriceByS1() throws InterruptedException {TimeUnit.MILLISECONDS.sleep(5000);log.debug("电商S1询价信息1200");return 1200;}private static Integer getPriceByS2() throws InterruptedException {TimeUnit.MILLISECONDS.sleep(8000);log.debug("电商S2询价信息1000");return 1000;}private static Integer getPriceByS3()  throws InterruptedException {TimeUnit.MILLISECONDS.sleep(3000);log.debug("电商S3询价信息800");return 800;}
}

实现类似 Dubbo 的 Forking Cluster场景

Dubbo 中有一种叫做 Forking 的集群模式,这种集群模式下,支持并行地调用多个服务实例,只要有一个成功就返回结果。

geocoder(addr) {//并行执行以下3个查询服务,r1=geocoderByS1(addr);r2=geocoderByS2(addr);r3=geocoderByS3(addr);//只要r1,r2,r3有一个返回//则返回return r1|r2|r3;
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 创建CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
// 用于保存Future对象
List<Future<Integer>> futures = new ArrayList<>(3);
//提交异步任务,并保存future到futures
futures.add(cs.submit(()->geocoderByS1()));
futures.add(cs.submit(()->geocoderByS2()));
futures.add(cs.submit(()->geocoderByS3()));
// 获取最快返回的任务执行结果
Integer r = 0;
try {// 只要有一个成功返回,则breakfor (int i = 0; i < 3; ++i) {r = cs.take().get();//简单地通过判空来检查是否成功返回if (r != null) {break;}}
} finally {//取消所有任务for(Future<Integer> f : futures)f.cancel(true);
}
// 返回结果

应用场景总结

  • 当需要批量提交异步任务的时候建议你使用CompletionService。CompletionService将线程池Executor和阻塞队列BlockingQueue的功能融合在了一起,能够让批量异步任务的管理更简单。

  • CompletionService能够让异步任务的执行结果有序化。先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如Forking Cluster这样的需求。

  • 线程池隔离。CompletionService支持自己创建线程池,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。

CompletableFuture

简单的任务,用Future获取结果还好,但我们并行提交的多个异步任务,往往并不是独立的,很多时候业务逻辑处理存在串行[依赖]、并行、聚合的关系。如果要我们手动用 Fueture 实现,是非常麻烦的。

CompletableFuture是Future接口的扩展和增强。 CompletableFuture实现了Future接口,并在此基础上进行了丰富地扩展,完美地弥补了Future上述的种种问题。更为重要的是,CompletableFuture实现了对任务的编排能力。借助这项能力,我们可以轻松地组织不同任务的运行顺序、规则以及方式。从某种程度上说,这项能力是它的核心能力。而在以往,虽然通过CountDownLatch等工具类也可以实现任务的编排,但需要复杂的逻辑处理,不仅耗费精力且难以维护。

jdk8 API文档:https://docs.oracle.com/javase/8/docs/api/

CompletionStage接口: 执行某一个阶段,可向下执行后续阶段。异步执行,默认线程池是ForkJoinPool.commonPool()

应用场景

1. 描述依赖关系:

  • thenApply() 把前面异步任务的结果,交给后面的Function

  • thenCompose()用来连接两个有依赖关系的任务,结果由第二个任务返回

2. 描述and聚合关系:

  • thenCombine:任务合并,有返回值

  • thenAccepetBoth:两个任务执行完成后,将结果交给thenAccepetBoth消耗,无返回值。

  • runAfterBoth:两个任务都执行完成后,执行下一步操作(Runnable)。

3. 描述or聚合关系:

  • applyToEither:两个任务谁执行的快,就使用那一个结果,有返回值。

  • acceptEither: 两个任务谁执行的快,就消耗那一个结果,无返回值。

  • runAfterEither: 任意一个任务执行完成,进行下一步操作(Runnable)。

4. 并行执行:

  • CompletableFuture类自己也提供了anyOf()和allOf()用于支持多个CompletableFuture并行执行

创建异步操作

CompletableFuture 提供了四个静态方法来创建一个异步操作:

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

这四个方法区别在于:

  • runAsync 方法以Runnable函数式接口类型为参数,没有返回结果,supplyAsync 方法Supplier函数式接口类型为参数,返回结果类型为U;Supplier 接口的 get() 方法是有返回值的(会阻塞)

  • 没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。

  • 默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数)。如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰

runAsync&supplyAsync

Runnable runnable = () -> System.out.println("执行无返回结果的异步任务");
CompletableFuture.runAsync(runnable);CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {System.out.println("执行有返回值的异步任务");try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}return "Hello World";
});
String result = future.get();

获取结果

join&get

join()和get()方法都是用来获取CompletableFuture异步之后的返回值。join()方法抛出的是uncheck异常(即未经检查的异常),不会强制开发者抛出。get()方法抛出的是经过检查的异常,ExecutionException, InterruptedException 需要用户手动处理(抛出或者 try catch)

结果处理

当CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的 Action。主要是下面的方法:

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
  • Action的类型是BiConsumer<? super T,? super Throwable>,它可以处理正常的计算结果,或者异常情况。

  • 方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。

  • 这几个方法都会返回CompletableFuture,当Action执行完毕后它的结果返回原始的CompletableFuture的计算结果或者返回异常

whenComplete & exceptionally

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {}if (new Random().nextInt(10) % 2 == 0) {int i = 12 / 0;}System.out.println("执行结束!");return "test";
});//BiConsumer 消费型结果 不需要join
future.whenComplete(new BiConsumer<String, Throwable>() {@Overridepublic void accept(String t, Throwable action) {System.out.println(t+" 执行完成!");}
});
//函数型需要 join
future.exceptionally(new Function<Throwable, String>() {@Overridepublic String apply(Throwable t) {System.out.println("执行失败:" + t.getMessage());return "异常xxxx";}
}).join();
执行结束!
test 执行完成!
或者
执行失败:java.lang.ArithmeticException: / by zero
null 执行完成!

结果转换

所谓结果转换,就是将上一段任务的执行结果作为下一阶段任务的入参参与重新计算,产生新的结果。

thenApply

thenApply 接收一个函数作为参数,使用该函数处理上一个CompletableFuture 调用的结果,并返回一个具有处理结果的Future对象。

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {int result = 100;System.out.println("一阶段:" + result);return result;
}).thenApply(number -> {int result = number * 3;System.out.println("二阶段:" + result);return result;
});
一阶段:100
二阶段:300
最终结果:300

thenCompose

thenCompose 的参数为一个返回 CompletableFuture 实例的函数,该函数的参数是先前计算步骤的结果。

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(30);System.out.println("第一阶段:" + number);return number;}}).thenCompose(new Function<Integer, CompletionStage<Integer>>() {@Overridepublic CompletionStage<Integer> apply(Integer param) {return CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = param * 2;System.out.println("第二阶段:" + number);return number;}});}});
第一阶段:10
第二阶段:20
最终结果: 20

thenApply 和 thenCompose 的区别

thenApply 转换的是泛型中的类型,返回的是同一个CompletableFuture;

thenCompose 将内部的 CompletableFuture 调用展开来并使用上一个CompletableFutre 调用的结果在下一步的 CompletableFuture 调用中进行运算,是生成一个新的CompletableFuture。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");CompletableFuture<String> result1 = future.thenApply(param -> param + " World");CompletableFuture<String> result2 = future.thenCompose(param -> CompletableFuture.supplyAsync(() -> param + " World"));System.out.println(result1.get());
System.out.println(result2.get());
Hello World
Hello World

结果消费

与结果处理和结果转换系列函数返回一个新的 CompletableFuture 不同,结果消费系列函数 只对结果执行Action,而不返回新的计算值。

根据对结果的处理方式,结果消费函数又分为:

  • thenAccept系列:对单个结果进行消费

  • thenAcceptBoth系列:对两个结果进行消费

  • thenRun系列:不关心结果,只对结果执行Action

thenAccept

通过观察该系列函数的参数类型可知,它们是函数式接口Consumer,这个接口只有输入,没有返回值。

public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {int number = new Random().nextInt(10);System.out.println("第一阶段:" + number);return number;}).thenAccept(number ->System.out.println("第二阶段:" + number * 5));
第一阶段:8
第二阶段:40
最终结果:null

thenAcceptBoth

thenAcceptBoth 函数的作用是,当两个 CompletionStage 都正常完成计算的时候,就会执行提供的action消费两个异步的结果。

public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
CompletableFuture<Integer> futrue1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(3) + 1;try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一阶段:" + number);return number;}
});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(3) + 1;try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二阶段:" + number);return number;}
});futrue1.thenAcceptBoth(future2, new BiConsumer<Integer, Integer>() {@Overridepublic void accept(Integer x, Integer y) {System.out.println("最终结果:" + (x + y));}
第二阶段:1
第一阶段:2
最终结果:3

thenRun

thenRun 也是对线程任务结果的一种消费函数,与thenAccept不同的是,thenRun 会在上一阶段 CompletableFuture 计算完成的时候执行一个Runnable,Runnable并不使用该 CompletableFuture 计算的结果。

public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {int number = new Random().nextInt(10);System.out.println("第一阶段:" + number);return number;
}).thenRun(() ->System.out.println("thenRun 执行"));
第一阶段:2
thenRun 执行
最终结果:null

结果组合

thenCombine

thenCombine 方法,合并两个线程任务的结果,并进一步处理。

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10);System.out.println("第一阶段:" + number);return number;}});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10);System.out.println("第二阶段:" + number);return number;}});
CompletableFuture<Integer> result = future1.thenCombine(future2, new BiFunction<Integer, Integer, Integer>() {@Overridepublic Integer apply(Integer x, Integer y) {return x + y;}});
第一阶段:9
第二阶段:5
最终结果:14

任务交互

所谓线程交互,是指将两个线程任务获取结果的速度相比较,按一定的规则进行下一步处理。

applyToEither

两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的转化操作。

public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10);System.out.println("第一阶段start:" + number);try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一阶段end:" + number);return number;}});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10);System.out.println("第二阶段start:" + number);try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二阶段end:" + number);return number;}});future1.applyToEither(future2, new Function<Integer, Integer>() {@Overridepublic Integer apply(Integer number) {System.out.println("最快结果:" + number);return number * 2;}
第一阶段start:6
第二阶段start:5
第二阶段end:5
最快结果:5

acceptEither

两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的消费操作。

public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10) + 1;try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一阶段:" + number);return number;}});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10) + 1;try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二阶段:" + number);return number;}});future1.acceptEither(future2, new Consumer<Integer>() {@Overridepublic void accept(Integer number) {System.out.println("最快结果:" + number);}
第二阶段:3
最快结果:3

runAfterEither

两个线程任务相比较,有任何一个执行完成,就进行下一步操作,不关心运行结果。

public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(5);try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一阶段:" + number);return number;}});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(5);try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二阶段:" + number);return number;}});future1.runAfterEither(future2, new Runnable() {@Overridepublic void run() {System.out.println("已经有一个任务完成了");}
}).join();
第一阶段:3
已经有一个任务完成了

runAfterBoth

两个线程任务相比较,两个全部执行完成,才进行下一步操作,不关心运行结果。

public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一阶段:1");return 1;}
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二阶段:2");return 2;}
});future1.runAfterBoth(future2, new Runnable() {@Overridepublic void run() {System.out.println("上面两个任务都执行完成了。");}
第一阶段:1
第二阶段:2
上面两个任务都执行完成了。

anyOf

anyOf 方法的参数是多个给定的 CompletableFuture,当其中的任何一个完成时,方法返回这个 CompletableFuture。

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
Random random = new Random();
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(random.nextInt(5));} catch (InterruptedException e) {e.printStackTrace();}return "hello";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(random.nextInt(1));} catch (InterruptedException e) {e.printStackTrace();}return "world";});
CompletableFuture<Object> result = CompletableFuture.anyOf(future1, future2);
world

allOf

allOf方法用来实现多 CompletableFuture 的同时返回。

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("future1完成!");return "future1完成!";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("future2完成!");return "future2完成!";});CompletableFuture<Void> combindFuture = CompletableFuture.allOf(future1, future2);
try {combindFuture.get();
} catch (InterruptedException e) {e.printStackTrace();
} catch (ExecutionException e) {e.printStackTrace();
}
future2完成!
future1完成!
future1: true,future2: true

CompletableFuture常用方法总结

使用案例:实现最优的“烧水泡茶”程序

著名数学家华罗庚先生在《统筹方法》这篇文章里介绍了一个烧水泡茶的例子,文中提到最优的工序应该是下面这样:

对于烧水泡茶这个程序,一种最优的分工方案:用两个线程 T1 和 T2 来完成烧水泡茶程序,T1 负责洗水壶、烧开水、泡茶这三道工序,T2 负责洗茶壶、洗茶杯、拿茶叶三道工序,其中 T1 在执行泡茶这道工序时需要等待 T2 完成拿茶叶的工序。

基于Future实现

public class FutureTaskDemo3{public static void main(String[] args) throws ExecutionException, InterruptedException {// 创建任务T2的FutureTaskFutureTask<String> ft2 = new FutureTask<>(new T2Task());// 创建任务T1的FutureTaskFutureTask<String> ft1 = new FutureTask<>(new T1Task(ft2));// 线程T1执行任务ft1Thread T1 = new Thread(ft1);T1.start();// 线程T2执行任务ft2Thread T2 = new Thread(ft2);T2.start();// 等待线程T1执行结果System.out.println(ft1.get());}
}// T1Task需要执行的任务:
// 洗水壶、烧开水、泡茶
class T1Task implements Callable<String> {FutureTask<String> ft2;// T1任务需要T2任务的FutureTaskT1Task(FutureTask<String> ft2){this.ft2 = ft2;}@Overridepublic String call() throws Exception {System.out.println("T1:洗水壶...");TimeUnit.SECONDS.sleep(1);System.out.println("T1:烧开水...");TimeUnit.SECONDS.sleep(15);// 获取T2线程的茶叶String tf = ft2.get();System.out.println("T1:拿到茶叶:"+tf);System.out.println("T1:泡茶...");return "上茶:" + tf;}
}
// T2Task需要执行的任务:
// 洗茶壶、洗茶杯、拿茶叶
class T2Task implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("T2:洗茶壶...");TimeUnit.SECONDS.sleep(1);System.out.println("T2:洗茶杯...");TimeUnit.SECONDS.sleep(2);System.out.println("T2:拿茶叶...");TimeUnit.SECONDS.sleep(1);return "龙井";}

基于CompletableFuture实现

public class CompletableFutureDemo2 {public static void main(String[] args) {//任务1:洗水壶->烧开水CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {System.out.println("T1:洗水壶...");sleep(1, TimeUnit.SECONDS);System.out.println("T1:烧开水...");sleep(15, TimeUnit.SECONDS);});//任务2:洗茶壶->洗茶杯->拿茶叶CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {System.out.println("T2:洗茶壶...");sleep(1, TimeUnit.SECONDS);System.out.println("T2:洗茶杯...");sleep(2, TimeUnit.SECONDS);System.out.println("T2:拿茶叶...");sleep(1, TimeUnit.SECONDS);return "龙井";});//任务3:任务1和任务2完成后执行:泡茶CompletableFuture<String> f3 = f1.thenCombine(f2, (__, tf) -> {System.out.println("T1:拿到茶叶:" + tf);System.out.println("T1:泡茶...");return "上茶:" + tf;});//等待任务3执行结果System.out.println(f3.join());}static void sleep(int t, TimeUnit u){try {u.sleep(t);} catch (InterruptedException e) {}}

工作任务

@Slf4j
public class CompletableFutureDemo {public static void main(String[] args) {ExecutorService executorService = Executors.newFixedThreadPool(10);log.debug("monkey进入餐厅,点了份西红柿炒番茄");CompletableFuture<String> cf = CompletableFuture.supplyAsync(()->{log.debug("厨师炒菜");sleep(2,TimeUnit.SECONDS);return "西红柿炒番茄好了";},executorService).thenCombine(CompletableFuture.supplyAsync(()->{log.debug("服务员蒸饭");sleep(3,TimeUnit.SECONDS);return "米饭好了";}),(dish,rice)->{log.debug("服务员打饭");sleep(1,TimeUnit.SECONDS);return dish+","+rice;});log.debug("monkey在刷抖音");log.debug("{},monkey开吃",cf.join());log.debug("monkey吃完饭去结账,要求开发票");cf = CompletableFuture.supplyAsync(()->{log.debug("服务员收款");sleep(1,TimeUnit.SECONDS);return "20";}).thenApply(money->{log.debug("服务员开发票,面额{}元",money);sleep(2,TimeUnit.SECONDS);return money+"元发票";});log.debug("monkey接到朋友电话");log.debug("monkey拿到{},准备回家", cf.join());log.debug("monkey走出餐厅,来到公交车站,等待301路或者918路公交到来");cf = CompletableFuture.supplyAsync(() -> {log.debug("301路公交正在赶来");sleep(2,TimeUnit.SECONDS);return "301路到了";}).applyToEither(CompletableFuture.supplyAsync(() -> {log.debug("918路公交正在赶来");sleep(1,TimeUnit.SECONDS);return "918路到了";}), bus -> {if(bus.startsWith("918")){throw new RuntimeException("918撞树了.......");}return bus;}).exceptionally(e->{log.debug(e.getMessage());log.debug("monkey叫出租车");sleep(3,TimeUnit.SECONDS);return "出租车到了";});log.debug("{},monkey坐车回家", cf.join());}static void sleep(int t, TimeUnit u){try {u.sleep(t);} catch (InterruptedException e) {}}
}
/Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home/bin/java -javaagent:/Applications/IntelliJ IDEA.app/Contents/lib/idea_rt.jar=56490:/Applications/IntelliJ IDEA.app/Contents/bin -Dfile.encoding=UTF-8 -classpath /Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home/jre/lib/deploy.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home/jre/lib/ext/cldrdata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home/jre/lib/ext/dnsns.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home/jre/lib/ext/jaccess.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home/jre/lib/ext/jfxrt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home/jre/lib/ext/localedata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home/jre/lib/ext/nashorn.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home/jre/lib/ext/sunec.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home/jre/lib/ext/sunjce_provider.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home/jre/lib/ext/sunpkcs11.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home/jre/lib/ext/zipfs.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home/jre/lib/javaws.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home/jre/lib/jfxswt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home/jre/lib/management-agent.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home/jre/lib/plugin.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home/jre/lib/rt.jar:/Users/menxu/Workspace/java/source/thread.2020.8.23/juc-demo(并发专题完整代码)/target/classes:/Users/menxu/.m2/repository/org/projectlombok/lombok/1.18.20/lombok-1.18.20.jar:/Users/menxu/.m2/repository/org/openjdk/jol/jol-core/0.10/jol-core-0.10.jar:/Users/menxu/.m2/repository/com/lmax/disruptor/3.3.4/disruptor-3.3.4.jar:/Users/menxu/.m2/repository/junit/junit/4.12/junit-4.12.jar:/Users/menxu/.m2/repository/org/hamcrest/hamcrest-core/2.2/hamcrest-core-2.2.jar:/Users/menxu/.m2/repository/org/springframework/boot/spring-boot-starter-data-redis/2.5.4/spring-boot-starter-data-redis-2.5.4.jar:/Users/menxu/.m2/repository/org/springframework/data/spring-data-redis/2.5.4/spring-data-redis-2.5.4.jar:/Users/menxu/.m2/repository/org/springframework/data/spring-data-keyvalue/2.5.4/spring-data-keyvalue-2.5.4.jar:/Users/menxu/.m2/repository/org/springframework/data/spring-data-commons/2.5.4/spring-data-commons-2.5.4.jar:/Users/menxu/.m2/repository/org/springframework/spring-tx/5.3.9/spring-tx-5.3.9.jar:/Users/menxu/.m2/repository/org/springframework/spring-oxm/5.3.9/spring-oxm-5.3.9.jar:/Users/menxu/.m2/repository/org/springframework/spring-aop/5.3.9/spring-aop-5.3.9.jar:/Users/menxu/.m2/repository/org/springframework/spring-context-support/5.3.9/spring-context-support-5.3.9.jar:/Users/menxu/.m2/repository/org/slf4j/slf4j-api/1.7.32/slf4j-api-1.7.32.jar:/Users/menxu/.m2/repository/io/lettuce/lettuce-core/6.1.4.RELEASE/lettuce-core-6.1.4.RELEASE.jar:/Users/menxu/.m2/repository/io/netty/netty-common/4.1.67.Final/netty-common-4.1.67.Final.jar:/Users/menxu/.m2/repository/io/netty/netty-handler/4.1.67.Final/netty-handler-4.1.67.Final.jar:/Users/menxu/.m2/repository/io/netty/netty-resolver/4.1.67.Final/netty-resolver-4.1.67.Final.jar:/Users/menxu/.m2/repository/io/netty/netty-buffer/4.1.67.Final/netty-buffer-4.1.67.Final.jar:/Users/menxu/.m2/repository/io/netty/netty-codec/4.1.67.Final/netty-codec-4.1.67.Final.jar:/Users/menxu/.m2/repository/io/netty/netty-transport/4.1.67.Final/netty-transport-4.1.67.Final.jar:/Users/menxu/.m2/repository/io/projectreactor/reactor-core/3.4.9/reactor-core-3.4.9.jar:/Users/menxu/.m2/repository/org/reactivestreams/reactive-streams/1.0.3/reactive-streams-1.0.3.jar:/Users/menxu/.m2/repository/org/springframework/boot/spring-boot-starter-web/2.5.4/spring-boot-starter-web-2.5.4.jar:/Users/menxu/.m2/repository/org/springframework/boot/spring-boot-starter-json/2.5.4/spring-boot-starter-json-2.5.4.jar:/Users/menxu/.m2/repository/com/fasterxml/jackson/core/jackson-databind/2.12.4/jackson-databind-2.12.4.jar:/Users/menxu/.m2/repository/com/fasterxml/jackson/core/jackson-annotations/2.12.4/jackson-annotations-2.12.4.jar:/Users/menxu/.m2/repository/com/fasterxml/jackson/core/jackson-core/2.12.4/jackson-core-2.12.4.jar:/Users/menxu/.m2/repository/com/fasterxml/jackson/datatype/jackson-datatype-jdk8/2.12.4/jackson-datatype-jdk8-2.12.4.jar:/Users/menxu/.m2/repository/com/fasterxml/jackson/datatype/jackson-datatype-jsr310/2.12.4/jackson-datatype-jsr310-2.12.4.jar:/Users/menxu/.m2/repository/com/fasterxml/jackson/module/jackson-module-parameter-names/2.12.4/jackson-module-parameter-names-2.12.4.jar:/Users/menxu/.m2/repository/org/springframework/boot/spring-boot-starter-tomcat/2.5.4/spring-boot-starter-tomcat-2.5.4.jar:/Users/menxu/.m2/repository/org/apache/tomcat/embed/tomcat-embed-core/9.0.52/tomcat-embed-core-9.0.52.jar:/Users/menxu/.m2/repository/org/apache/tomcat/embed/tomcat-embed-el/9.0.52/tomcat-embed-el-9.0.52.jar:/Users/menxu/.m2/repository/org/apache/tomcat/embed/tomcat-embed-websocket/9.0.52/tomcat-embed-websocket-9.0.52.jar:/Users/menxu/.m2/repository/org/springframework/spring-web/5.3.9/spring-web-5.3.9.jar:/Users/menxu/.m2/repository/org/springframework/spring-beans/5.3.9/spring-beans-5.3.9.jar:/Users/menxu/.m2/repository/org/springframework/spring-webmvc/5.3.9/spring-webmvc-5.3.9.jar:/Users/menxu/.m2/repository/org/springframework/spring-context/5.3.9/spring-context-5.3.9.jar:/Users/menxu/.m2/repository/org/springframework/spring-expression/5.3.9/spring-expression-5.3.9.jar:/Users/menxu/.m2/repository/org/springframework/boot/spring-boot-starter/2.5.4/spring-boot-starter-2.5.4.jar:/Users/menxu/.m2/repository/org/springframework/boot/spring-boot/2.5.4/spring-boot-2.5.4.jar:/Users/menxu/.m2/repository/org/springframework/boot/spring-boot-autoconfigure/2.5.4/spring-boot-autoconfigure-2.5.4.jar:/Users/menxu/.m2/repository/org/springframework/boot/spring-boot-starter-logging/2.5.4/spring-boot-starter-logging-2.5.4.jar:/Users/menxu/.m2/repository/ch/qos/logback/logback-classic/1.2.5/logback-classic-1.2.5.jar:/Users/menxu/.m2/repository/ch/qos/logback/logback-core/1.2.5/logback-core-1.2.5.jar:/Users/menxu/.m2/repository/org/apache/logging/log4j/log4j-to-slf4j/2.14.1/log4j-to-slf4j-2.14.1.jar:/Users/menxu/.m2/repository/org/apache/logging/log4j/log4j-api/2.14.1/log4j-api-2.14.1.jar:/Users/menxu/.m2/repository/org/slf4j/jul-to-slf4j/1.7.32/jul-to-slf4j-1.7.32.jar:/Users/menxu/.m2/repository/jakarta/annotation/jakarta.annotation-api/1.3.5/jakarta.annotation-api-1.3.5.jar:/Users/menxu/.m2/repository/org/springframework/spring-core/5.3.9/spring-core-5.3.9.jar:/Users/menxu/.m2/repository/org/springframework/spring-jcl/5.3.9/spring-jcl-5.3.9.jar:/Users/menxu/.m2/repository/org/yaml/snakeyaml/1.28/snakeyaml-1.28.jar:/Users/menxu/.m2/repository/org/hamcrest/hamcrest/2.2/hamcrest-2.2.jar com.tuling.future.CompletableFutureDemo
18:18:12.555 [main] DEBUG com.tuling.future.CompletableFutureDemo - monkey进入餐厅,点了份西红柿炒番茄
18:18:12.641 [pool-1-thread-1] DEBUG com.tuling.future.CompletableFutureDemo - 厨师炒菜
18:18:12.643 [ForkJoinPool.commonPool-worker-1] DEBUG com.tuling.future.CompletableFutureDemo - 服务员蒸饭
18:18:12.644 [main] DEBUG com.tuling.future.CompletableFutureDemo - monkey在刷抖音
18:18:15.648 [ForkJoinPool.commonPool-worker-1] DEBUG com.tuling.future.CompletableFutureDemo - 服务员打饭
18:18:16.648 [main] DEBUG com.tuling.future.CompletableFutureDemo - 西红柿炒番茄好了,米饭好了,monkey开吃
18:18:16.651 [main] DEBUG com.tuling.future.CompletableFutureDemo - monkey吃完饭去结账,要求开发票
18:18:16.652 [ForkJoinPool.commonPool-worker-1] DEBUG com.tuling.future.CompletableFutureDemo - 服务员收款
18:18:16.653 [main] DEBUG com.tuling.future.CompletableFutureDemo - monkey接到朋友电话
18:18:17.658 [ForkJoinPool.commonPool-worker-1] DEBUG com.tuling.future.CompletableFutureDemo - 服务员开发票,面额20元
18:18:19.660 [main] DEBUG com.tuling.future.CompletableFutureDemo - monkey拿到20元发票,准备回家
18:18:19.660 [main] DEBUG com.tuling.future.CompletableFutureDemo - monkey走出餐厅,来到公交车站,等待301路或者918路公交到来
18:18:19.660 [ForkJoinPool.commonPool-worker-1] DEBUG com.tuling.future.CompletableFutureDemo - 301路公交正在赶来
18:18:19.661 [ForkJoinPool.commonPool-worker-2] DEBUG com.tuling.future.CompletableFutureDemo - 918路公交正在赶来
18:18:20.662 [ForkJoinPool.commonPool-worker-2] DEBUG com.tuling.future.CompletableFutureDemo - java.lang.RuntimeException: 918撞树了.......
18:18:20.662 [ForkJoinPool.commonPool-worker-2] DEBUG com.tuling.future.CompletableFutureDemo - monkey叫出租车
18:18:23.662 [main] DEBUG com.tuling.future.CompletableFutureDemo - 出租车到了,monkey坐车回家

并发编程之深入理解十三:CompletionService CompletableFuture相关推荐

  1. 并发编程之深入理解java线程

    并发编程之深入理解java线程 一.线程基础知识 1.1 进程和线程 1.1.1 进程 1.1.2 线程 1.1.3 进程与线程的区别 1.1.4 进程间通信的方式 1.2 线程的同步互斥 1.3 上 ...

  2. 并发编程之深入理解synchronized

    并发编程之深入理解synchronized 一.java共享内存带来的线程安全问题 1.1 问题分析 1.2 临界区 1.3 竞态条件 二.synchronized使用 2.1 解决之前的共享问题 三 ...

  3. 并发编程之深入理解JMM并发三大特性volatile

    并发编程之深入理解JMM&并发三大特性&volatile 并发和并行 并发三大特性 可见性 有序性 原子性 Java内存模型(JMM) JMM定义 JMM与硬件内存架构的关系 内存交互 ...

  4. java线程钥匙_Java多线程并发编程/锁的理解

    一.前言 最近项目遇到多线程并发的情景(并发抢单&恢复库存并行),代码在正常情况下运行没有什么问题,在高并发压测下会出现:库存超发/总库存与sku库存对不上等各种问题. 在运用了 限流/加锁等 ...

  5. Java 并发编程概念深入理解

    why-为什么要有多线程? 单线程情况下: 在有IO操作的情况下,线程是在阻塞的,cpu什么事情也不干,直到IO操作完成 如果没有IO操作且是单核cpu,可以是单线程 多线程的情况下: 有IO操作的情 ...

  6. 呸 渣男!八股文不让看,非得让看并发编程全彩图册,这下又进厂了

    前言 我不知道你有没有发现,很多Java 的高级知识点,其实在我们平时的工作中,用到的场景并不是很多.这样下来就造成了一个知识漏洞的现象,自己平时在工作中表现和成绩都是不错的,但是在没有进行复习.准备 ...

  7. Java 并发编程CAS、volatile、synchronized原理详解

    CAS(CompareAndSwap) 什么是CAS? 在Java中调用的是Unsafe的如下方法来CAS修改对象int属性的值(借助C来调用CPU底层指令实现的): /*** * @param o ...

  8. 阿里大手子评:入门到大成!GitHub新上线并发编程深度解析实战PDF

    前言: 众所周知,在很多一二线互联网公司的面试中,并发编程几乎是必然会问的问题,而绝大部分程序员对并发编程的理解也都停留在使用阶段. 市面上几乎所有有关并发编程的书,通过搜索引擎查找了几乎所有的并发编 ...

  9. 开篇:并发编程核心[分工、协作、互斥]

    线程和进程图解 工厂.车间.工人 并发编程可抽象成三个核心问题: 分工.同步/协作.互斥 学并发编程,透彻理解这三个核心是关键 分工 将当前 Sprint 的 Story 拆分成「合适」大小的 Tas ...

最新文章

  1. Qt---布局,设置控件边距,拉伸因子
  2. “僵尸病毒”入侵全球电脑,7.5万部电脑中招(来源:广州日报)
  3. 算法导论之动态规划(最长公共子序列和最优二叉查找树)
  4. unity3d AssetBundle包加密
  5. AngularJS $http 异步后台无法获取请求参数
  6. 模板:半平面交(计算几何)
  7. 阿里修冶:微服务拆分之道
  8. 常见排序算法的原理与实现(js)
  9. onmouseover|onmouseout和onmouseenter|onmouseleave的区别
  10. introduce to Installsheild X
  11. python支持双向索引_python3 deque 双向队列创建与使用方法分析
  12. 三星滑盖手机java游戏_三星滑盖手机大全简介
  13. 35岁学python爬虫_学习python12小时后,告诉想学爬虫的你,别怕,爬虫,没那么难抓...
  14. c语言指针选择题库及答案,C语言指针练习习题及答案.doc
  15. c语言parse是什么意思英语,it/parse是什么意思
  16. uniapp之app自动更新
  17. java推送叮叮消息,叮叮叮!请及时签收入门学习Java导航路线
  18. 正确简单地安装Tensorflow和Keras
  19. Java从小白到大牛第1篇 Java基础-关东升-专题视频课程
  20. Linux快速查看文件内容中包含的字符

热门文章

  1. 各大主流招聘平台的优缺点和适合人群【总结】
  2. oracle同步数据adg_[adg数据库同步机制]三分钟读懂Oracle数据库容灾架之DataGuard
  3. OpenSSL BIO源码简析
  4. SLIC 超像素分割详解(三):应用
  5. Android Qcom USB Driver学习(一)
  6. 计算机维修基本技能考试试题,高级计算机维修工操作技能考核试卷
  7. oracle安装与使用
  8. 基础-数学-最大后验概率(MAP)maximum a posteriori
  9. Ajax实现图片上传并预览
  10. 1.MATLAB图像处理基础知识