文章目录

  • FutureTask概述
    • 使用实例
    • 类图结构
    • FutureTask的run()方法
    • FutureTask的局限性
  • CompletableFuture概述
    • CompletableFuture代码实例:通知等待
    • CompletableFuture代码实例:异步计算与结果转换
      • runAsync
      • supplyAsync
      • thenRun
      • thenAccept
      • thenApply
      • whenComplete

FutureTask概述

FutureTask代表了一个可被取消的异步计算任务,该类实现了Future接口,比如提供了启动和取消任务、查询任务是否完成、获取计算结果的接口。 FutureTask任务的结果只有当任务完成后才能获取,并且只能通过get系列方法获取,当结果还没出来时,线程调用get系列方法会被阻塞。另外,一旦任务被执行完成,任务将不能重启,除非运行时使用了runAndReset方法。FutureTask中的任务可以是Callable类型,也可以是Runnable类型(因为FutureTask实现了Runnable接口),FutureTask类型的任务可以被提交到线程池执行。

使用实例

  public static String doSomethingA() {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("--- doSomethingA---");return "TaskAResult";}public static String doSomethingB() {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("--- doSomethingB---");return "TaskBResult";}public static void main(String args[]) throws ExecutionException, InterruptedException {long start = System.currentTimeMillis();// 1.创建future任务FutureTask<String> futureTask = new FutureTask<String>(() -> {String result = null;try {result = doSomethingA();} catch (Exception e) {e.printStackTrace();}return result;});// 2.开启异步单元执行任务AThread thread = new Thread(futureTask, "threadA");thread.start();// 3.执行任务BString taskBResult = doSomethingB();// 4.同步等待线程A运行结束String taskAResult = futureTask.get();// 5.打印两个任务执行结果System.out.println(taskAResult + " " + taskBResult);System.out.println(System.currentTimeMillis() - start);}}


使用main线程执行任务doSomethingB,这时候任务doSomethingB和doSomethingA是并发运行的,等main函数运行doSomethingB完毕后,同步等待doSomethingA任务完成,然后代码5打印两个任务的执行结果。 ·如上可知使用FutureTask可以获取到异步任务的结果。当然我们也可以把FutureTask提交到线程池来执行,使用线程池运行方式的代码如下:

    // 0自定义线程池private final static int AVALIABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();private final static ThreadPoolExecutor POOL_EXECUTOR = new ThreadPoolExecutor(AVALIABLE_PROCESSORS,AVALIABLE_PROCESSORS * 2, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(5),new ThreadPoolExecutor.CallerRunsPolicy());public static void main(String[] args) throws InterruptedException, ExecutionException {long start = System.currentTimeMillis();// 1.创建future任务FutureTask<String> futureTask = new FutureTask<String>(() -> {String result = null;try {result = doSomethingA();} catch (Exception e) {e.printStackTrace();}return result;});// 2.开启异步单元执行任务APOOL_EXECUTOR.execute(futureTask);// 3.执行任务BString taskBResult = doSomethingB();// 4.同步等待线程A运行结束String taskAResult = futureTask.get();// 5.打印两个任务执行结果System.out.println(taskAResult + " " + taskBResult);System.out.println(System.currentTimeMillis() - start);} 

下面代码与上面是等价的:

 public static void main(String[] args) throws InterruptedException, ExecutionException {long start = System.currentTimeMillis();// 1.开启异步单元执行任务AFuture<String> futureTask =POOL_EXECUTOR.submit(() -> {String result = null;try {result = doSomethingA();} catch (Exception e) {e.printStackTrace();}return result;});// 2.执行任务BString taskBResult = doSomethingB();// 3.同步等待线程A运行结束String taskAResult = futureTask.get();// 4.打印两个任务执行结果System.out.println(taskAResult + " " + taskBResult);System.out.println(System.currentTimeMillis() - start);}

类图结构

FutureTask实现了Future接口的所有方法,并且实现了Runnable接口,所以其是可执行任务,可以投递到线程池或者线程来执行。
·FutureTask中变量state是一个使用volatile关键字修饰类型,用来记录任务状态,任务状态枚举值如下:
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
一开始任务状态会被初始化为NEW;当通过set、setException、cancel函数设置任务结果时,任务会转换为终止状态;在任务完成过程中,任务状态可能会变为COMPLETING(当结果被使用set方法设置时),也可能会经过INTERRUPTING状态(当使用cancel(true)方法取消任务并中断任务时)。当任务被中断后,任务状态为INTERRUPTED;当任务被取消后,任务状态为CANCELLED;当任务正常终止时,任务状态为NORMAL;当任务执行异常后,任务状态会变为EXCEPTIONAL。 另外在任务运行过程中,任务可能的状态转换路径如下: ✦✦✦·NEW→COMPLETING→NORMAL:正常终止流程转换。
✦✦✦·NEW→COMPLETING→EXCEPTIONAL:执行过程中发生异常流程转换。
✦✦✦·NEW→CANCELLED:任务还没开始就被取消。 ✦✦✦·NEW→INTERRUPTING→INTERRUPTED:任务被中断。
从上述转换可知,任务最终只有四种终态:NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTED,另外可知任务的状态值是从上到下递增的。

⇨类图中callable是有返回值的可执行任务,创建FutureTask对象时,可以通过构造函数传递该任务。
⇨·类图中outcome是任务运行的结果,可以通过get系列方法来获取该结果。另外,outcome这里没有被修饰为volatile,是因为变量state已经被volatile修饰了,这里是借用volatile的内存语义来保证写入outcome时会把值刷新到主内存,读取时会从主内存读取,从而避免多线程下内存不可见问题
⇨·类图中runner变量,记录了运行该任务的线程,这个是在FutureTask的run方法内使用CAS函数设置的。
⇨·类图中waiters变量是一个WaitNode节点,是用Treiber stack实现的无锁栈,栈顶元素用waiters代表。栈用来记录所有等待任务结果的线程节点,其定义为:

static final class WaitNode {volatile Thread thread;volatile WaitNode next;WaitNode() { thread = Thread.currentThread(); }}

可知其是一个简单的链表,用来记录所有等待结果被阻塞的线程。 最后我们看下其构造函数:

 public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable;this.state = NEW;
}

由上述代码可知,构造函数内保存了传递到callable任务的callable变量,并且将任务状态设置为NEW,这里由于state为volatile修饰,所以写入state的值可以保证callable的写入也会被刷入主内存,以避免多线程下内存不可见问题。 另外还有一个构造函数:

public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW;
}

该函数传入一个Runnable类型的任务,由于该任务是不具有返回值的,所以这里使用Executors.callable方法进行适配,适配为Callable类型的任务。 “Executors.callable(runnable,result);”把Runnable类型任务转换为callable:

另外,FutureTask中使用了UNSAFE机制来操作内存变量:

private static final sun.misc.Unsafe UNSAFE;private static final long stateOffset;//state变量的偏移地址private static final long runnerOffset;//runner变量的偏移地址private static final long waitersOffset;//waiters变量的偏移地址static {try {//获取UNSAFE的实例UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = FutureTask.class;//获取变量state的偏移地址stateOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("state"));//获取变量runner的偏移地址runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner"));//获取变量waiters变量的偏移地址waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"));} catch (Exception e) {throw new Error(e);}
}

如上代码分别获取了FutureTask中几个变量在FutureTask对象内的内存地址偏移量,以便实现用UNSAFE的CAS操作来操作这些变量。

FutureTask的run()方法

该方法是任务的执行体,线程是调用该方法来具体运行任务的,如果任务没有被取消,则该方法会运行任务,并且将结果设置到outcome变量中,其代码如下:

public void run() {//1.如果任务不是初始化的NEW状态,或者使用CAS设置runner为当前线程失败,则直接返回if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,                                     null, Thread.currentThread()))return;//2.如果任务不为null,并且任务状态为NEW,则执行任务try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;//2.1执行任务,如果OK则设置ran标记为truetry {result = c.call();ran = true;} catch (Throwable ex) {//2.2执行任务出现异常,则标记false,并且设置异常result = null;ran = false;setException(ex);}//3.任务执行正常,则设置结果if (ran)set(result);}} finally {runner = null;int s = state;//4.为了保证调用cancel(true)的线程在该run方法返回前中断任务执行的线程if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}
}private void handlePossibleCancellationInterrupt(int s) {//为了保证调用cancel在该run方法返回前中断任务执行的线程//这里使用Thread.yield()让run方法执行线程让出CPU执行权,以便让//cancel(true)的线程执行cancel(true)中的代码中断任务线程if (s == INTERRUPTING)while (state == INTERRUPTING)Thread.yield(); // wait out pending interrupt
}

代码1,如果任务不是初始化的NEW状态,或者使用CAS设置runner为当前线程失败,则直接返回;这个可以防止同一个FutureTask对象被提交给多个线程来执行,导致run方法被多个线程同时执行造成混乱。
代码2,如果任务不为null,并且任务状态为NEW,则执行任务,其中代码2.1调用c.call()具体执行任务,如果任务执行OK,则调用set方法把结果记录到result,并设置ran为true;如果执行任务过程中抛出异常则设置result为null,ran为false,并且调用setException设置异常信息后,任务就处于终止状态,其中setException代码如下:

protected void setException(Throwable t) {//2.2.1if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = t;UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state//2.2.1.1finishCompletion();}

由上述代码可知,使用CAS尝试设置任务状态state为COMPLETING,如果CAS成功,则把异常信息设置到outcome变量,并且设置任务状态为EXCEPTIONAL终止状态,然后调用finishCompletion,其代码如下:

private void finishCompletion() {//a遍历链表节点for (WaitNode q; (q = waiters) != null;) {//a.1 CAS设置当前waiters节点为nullif (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {//a.1.1for (;;) {//唤醒当前q节点对应的线程Thread t = q.thread;if (t != null) {q.thread = null;LockSupport.unpark(t);}//获取q的下一个节点WaitNode next = q.next;if (next == null)break;q.next = null; //help gcq = next;}break;}}// b所有阻塞的线程都被唤醒后,调用done方法done();callable = null;        // callable设置为null
}

以上代码表示当任务处于终态后,激活waiters链表中所有由于等待获取结果而被阻塞的线程,并从waiters链表中移除它们,等待所有由于等待该任务结果的线程被唤醒后调用done()方法

FutureTask的局限性

FutureTask虽然提供了用来检查任务是否执行完成、等待任务执行结果、获取任务执行结果的方法,但是这些特色并不足以让我们写出简洁的并发代码,比如它并不能清楚地表达多个FutureTask之间的关系。另外,为了从Future获取结果,我们必须调用get()方法,而该方法还是会在任务执行完毕前阻塞调用线程,这明显不是我们想要的。 我们真正想要的是:

·可以将两个或者多个异步计算结合在一起变成一个,这包含两个或者多个异步计算是相互独立的情况,也包含第二个异步计算依赖第一个异步计算结果的情况。 ·

对反应式编程的支持,也就是当任务计算完成后能进行通知,并且可以以计算结果作为一个行为动作的参数进行下一步计算,而不是仅仅提供调用线程以阻塞的方式获取计算结果。 ·可以通过编程的方式手动设置(代码的方式)Future的结果;FutureTask不能实现让用户通过函数来设置其计算结果,而是在其任务内部来进行设置。 ·

可以等多个Future对应的计算结果都出来后做一些事情。 为了克服FutureTask的局限性,以及满足我们对异步编程的需要,JDK8中提供了CompletableFuture。

CompletableFuture概述

CompletableFuture是一个可以通过编程方式显式地设置计算结果和状态以便让任务结束的Future,并且其可以作为一个CompletionStage(计算阶段),当它的计算完成时可以触发一个函数或者行为;当多个线程企图调用同一个CompletableFuture的complete、cancel方式时只有一个线程会成功。 CompletableFuture除了含有可以直接操作任务状态和结果的方法外,还实现了CompletionStage接口的一些方法,这些方法遵循:
·当CompletableFuture任务完成后,同步使用任务执行线程来执行依赖任务结果的函数或者行为。
·所有异步的方法在没有显式指定Executor参数的情形下都是复用ForkJoinPool.commonPool()线程池来执行。
·所有CompletionStage方法的实现都是相互独立的,以便一个方法的行为不会因为重载了其他方法而受影响。
一个CompletableFuture任务可能有一些依赖其计算结果的行为方法,这些行为方法被收集到一个无锁基于CAS操作来链接起来的链表组成的栈中;当Completable-Future的计算任务完成后,会自动弹出栈中的行为方法并执行。
需要注意的是,由于是栈结构,在同一个CompletableFuture对象上行为注册的顺序与行为执行的顺序是相反的。 由于默认情况下支撑CompletableFuture异步运行的是 ForkJoinPool,所以这里我们有必要简单讲解下ForkJoinPool。
ForkJoinPool本身也是一种ExecutorService,与其他ExecutorService(比如ThreadPoolExecutor)相比,不同点是它使用了工作窃取算法来提高性能,其内部每个工作线程都关联自己的内存队列,正常情况下每个线程从自己队列里面获取任务并执行,当本身队列没有任务时,当前线程会去其他线程关联的队列里面获取任务来执行。这在很多任务会产生子任务或者有很多小的任务被提交到线程池来执行的情况下非常高效。 ForkJoinPool中有一个静态的线程池commonPool可用且适用大多数情况。commonPool会被任何未显式提交到指定线程池的ForkJoinTask执行使用。使用commonPool通常会减少资源使用(其线程数量会在不活跃时缓慢回收,并在任务数比较多的时候按需增加)。

CompletableFuture代码实例:通知等待

CompletableFuture是一种可以通过编程显式设置结果的future,下面我们通过一个例子来演示下:

//0自定义线程池private final static int AVALIABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();//获取当前可用处理器的java虚拟机数量private final static ThreadPoolExecutor POOL_EXECUTOR = new ThreadPoolExecutor(AVALIABLE_PROCESSORS,AVALIABLE_PROCESSORS * 2, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(5),new ThreadPoolExecutor.CallerRunsPolicy());public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {// 1.创建一个CompletableFuture对象CompletableFuture<String> future = new CompletableFuture<String>();// 2.开启线程计算任务结果,并设置POOL_EXECUTOR.execute(() -> {// 2.1休眠3s,模拟任务计算try {Thread.sleep(3000);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}// 2.2设置计算结果到futureSystem.out.println("----" + Thread.currentThread().getName() + " set future result----");future.complete("hello,jiaduo");
//调用future的complete方法设置future的结果,设置完结果后,所有由于调用future的get()方法而被阻塞的线程会被激活,并返回设置的结果。});// 3.等待计算结果。调用future的get()方法企图获取future的结果,如果future的结果没有被设置,则调用线程会被阻塞。System.out.println("---main thread wait future result---");System.out.println(future.get());// System.out.println(future.get(1000,TimeUnit.MILLISECONDS));System.out.println("---main thread got future result---");}
}

如上所述,这里使用CompletableFuture实现了通知等待模型,主线程调用future的get()方法等待future返回结果,一开始由于future结果没有设置,所以主线程被阻塞挂起,等异步任务休眠3s,然后调用future的complete方法模拟主线程等待的条件完成,这时候主线程就会从get()方法返回。

CompletableFuture代码实例:异步计算与结果转换

runAsync

1)基于runAsync系列方法实现无返回值的异步计算:
当你想异步执行一个任务,并且不需要任务的执行结果时可以使用该方法,比如异步打日志,异步做消息通知等:

public static void runAsync() throws InterruptedException, ExecutionException {// 1.1创建异步任务,并返回futureCompletableFuture future = CompletableFuture.runAsync(new Runnable() {@Overridepublic void run() {// 1.1.1休眠2s模拟任务计算try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("over");}});// 1.2 同步等待异步任务执行结束 由于runAsync方法不会有返回值,所以当任务执行完毕后,设置future的结果为nullSystem.out.println(future.get());}

需要注意的是,在默认情况下,runAsync(Runnable runnable)方法是使用整个JVM内唯一的ForkJoinPool.commonPool()线程池来执行异步任务的,使用runAsync(Runnable runnable,Executor executor)方法允许我们使用自己制定的线程池来执行异步任务。

下面我们创建了一个自己的线程池bizPoolExecutor,在调用runAsync方法提交异步任务时,把其作为第二参数进行传递,则异步任务执行时会使用bizPoolExecutor中的线程执行,具体代码如下所示。

// 0.创建线程池
private static final ThreadPoolExecutor bizPoolExecutor = new ThreadPoolExecutor(8, 8, 1, TimeUnit.MINUTES,new LinkedBlockingQueue<>(10));//没有返回值的异步执行,异步任务由业务自己的线程池执行
public static void runAsyncWithBizExecutor() throws InterruptedException, ExecutionException {// 1.1创建异步任务,并返回futureCompletableFuture future = CompletableFuture.runAsync(new Runnable() {@Overridepublic void run() {// 1.1.1休眠2s模拟任务计算try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("over");}}, bizPoolExecutor);
// 1.2 同步等待异步任务执行结束System.out.println(future.get());
}  

supplyAsync

2)基于supplyAsync系列方法实现有返回值的异步计算:当你想异步执行一个任务,并且需要任务的执行结果时可以使用该方法,比如异步对原始数据进行加工,并需要获取到被加工后的结果等。

 // 2. 有返回值的异步执行
public static void supplyAsync() throws InterruptedException, ExecutionException {// 2.1创建异步任务,并返回futureCompletableFuture future = CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {// 2.1.1休眠2s模拟任务计算try {Thread.sleep(2000);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}// 2.1.2 返回异步计算结果return "hello,jiaduo";//使用supplyAsync开启了一个异步任务,执行后马上返回一个future对象;异步任务内线程休眠2s,然后返回了一个字符串结果,这个结果会被设置到future内部
}});// 2.2 同步等待异步任务执行结束System.out.println(future.get());//使用future的get()方法获取结果,一开始future结果并没有被设置,所以调用线程会被阻塞;等异步任务把结果设置到future后,调用线程就会从get()处返回异步任务执行的结果。}

同样在默认情况下,supplyAsync(Supplier<U>supplier)方法是使用整个JVM内唯一的ForkJoinPool.commonPool()线程池来执行异步任务的,使用supply-Async(Supplier<U>supplier,Executor executor)方法允许我们使用自己制定的线程池来执行异步任务,代码

// 0.创建线程池
private static final ThreadPoolExecutor bizPoolExecutor = new ThreadPoolExecutor(8, 8, 1, TimeUnit.MINUTES,new LinkedBlockingQueue<>(10));// 2. 有返回值的异步执行
public static void supplyAsyncWithBizExecutor() throws InterruptedException, ExecutionException {// 2.1创建异步任务,并返回futureCompletableFuture future = CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {// 2.1.1休眠2s模拟任务计算try {Thread.sleep(2000);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}// 2.1.2 返回异步计算结果return "hello,jiaduo";}}, bizPoolExecutor);// 2.2 同步等待异步任务执行结束System.out.println(future.get());

thenRun

3)基于thenRun实现任务激活
基于thenRun实现异步任务A,执行完毕后,激活异步任务B执行,需要注意的是,这种方式激活的异步任务B是拿不到任务A的执行结果的:

// I thenRun不能访问oneFuture的结果public static void thenRun() throws InterruptedException, ExecutionException {// 1.创建异步任务,并返回futureCompletableFuture<String> oneFuture = CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {// 1.1休眠2s,模拟任务计算try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}// 1.2返回计算结果return "hello";}});// 2.在future上施加事件,当future计算完成后回调该事件,并返回新future  在oneFuture上调用thenRun方法添加异步执行事件,当oneFuture计算完成后回调该事件,并返回twoFuture,另外,在twoFuture上调用get()方法也会返回null,因为回调事件是没有返回值的CompletableFuture twoFuture = oneFuture.thenRun(new Runnable() {@Overridepublic void run() {// 2.1.1当oneFuture任务计算完成后做一件事情try {Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName());System.out.println("---after oneFuture over doSomething---");}});// 3.同步等待twoFuture对应的任务完成,返回结果固定为nullSystem.out.println(twoFuture.get());}

默认情况下oneFuture对应的异步任务和在oneFuture上添加的回调事件都是使用ForkJoinPool.commonPool()中的同一个线程来执行的,也可以使用thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)来指定设置的回调事件使用自定义线程池线程来执行,也就是oneFuture对应的任务与在其上设置的回调执行将不会在同一个线程中执行。

thenAccept

4)基于thenAccept实现异步任务A,执行完毕后,激活异步任务B执行,需要注意的是,这种方式激活的异步任务B是可以拿到任务A的执行结果的:

 public static void thenAccept() throws InterruptedException, ExecutionException {// 1.创建异步任务,并返回futureCompletableFuture<String> oneFuture = CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {// 1.1休眠2s,模拟任务计算try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}// 1.2返回计算结果return "hello";}});// 2.在future上施加事件,当future计算完成后回调该事件,并返回新futureCompletableFuture twoFuture = oneFuture.thenAccept(new Consumer<String>() {@Overridepublic void accept(String t) {//t是从oneFuture中得到的结果// 2.1.1对oneFuture返回的结果进行加工try {Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();}System.out.println("---after oneFuture over doSomething---" + t);}});// 3.同步等待twoFuture对应的任务完成,返回结果固定为nullSystem.out.println(twoFuture.get());
}

thenApply

5)基于thenApply实现异步任务A,执行完毕后,激活异步任务B执行。需要注意的是,这种方式激活的异步任务B是可以拿到任务A的执行结果的,并且可以获取到异步任务B的执行结果。

 public class TestCompletableFutureCallBack {public static void main(String[] args) throws InterruptedException, ExecutionException {// 1.创建异步任务,并返回futureCompletableFuture<String> oneFuture = CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {// 1.1休眠2s,模拟任务计算try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}// 1.2返回计算结果System.out.println(Thread.currentThread().getName());return "hello";}});// 2.在future上施加事件,当future计算完成后回调该事件,并返回新futureCompletableFuture<String> twoFuture = oneFuture.thenApply(new Function<String, String>() {// 2.1在步骤1计算结果基础上进行计算,这里t为步骤1返回的hello@Overridepublic String apply(String t) {// 2.1.1对oneFuture返回的结果进行加工try {Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();}// 2.1.2返回加工后结果System.out.println(Thread.currentThread().getName());return t + " jiduo";}});// 3.同步等待twoFuture对应的任务完成,并获取结果System.out.println(twoFuture.get());}
} 

whenComplete

6)基于whenComplete设置回调函数,当异步任务执行完毕后进行回调,不会阻塞调用线程

 public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {// 1.创建一个CompletableFuture对象CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {// 1.1模拟异步任务执行try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}// 1.2返回计算结果return "hello,jiaduo";}});// 2.添加回调函数future.whenComplete(new BiConsumer<String, Throwable>() {@Overridepublic void accept(String t, Throwable u) {// 2.1如果没有异常,打印异步任务结果if (null == u) {System.out.println(t);} else {// 2.2打印异常信息System.out.println(u.getLocalizedMessage());}}});// 3.挂起当前线程,等待异步任务执行完毕Thread.currentThread().join();//挂起了main函数所在线程,是因为具体执行异步任务的是ForkJoin的commonPool线程池,其中线程都是Deamon线程,所以,当唯一的用户线程main线程退出后整个JVM进程就退出了,会导致异步任务得不到执行}

(关于用户线程与deamon线程的区别可以参考《Java并发编程之美》一书)。

如上所述,当我们使用CompletableFuture实现异步编程时,大多数时候是不需要显式创建线程池,并投递任务到线程池内的。我们只需要简单地调用CompletableFuture的runAsync或者supplyAsync等方法把异步任务作为参数即可,其内部会使用ForkJoinPool线程池来进行异步执行的支持,这大大简化了我们异步编程的负担,实现了声明式编程(告诉程序我要执行异步任务,但是具体怎么实现我不需要管),当然如果你想使用自己的线程池来执行任务,也是可以非常方便地进行设置的。

【异步编程学习笔记】JDK中的FutureTask和CompletableFuture详解(使用示例、源码)相关推荐

  1. FPGA学习之路—接口(3)—SPI详解及Verilog源码分析

    FPGA学习之路--SPI详解及Verilog源码分析 概述 SPI = Serial Peripheral Interface,是串行外围设备接口,是一种高速,全双工,同步的通信总线. 优点 支持全 ...

  2. C语言学习:百钱买百鸡问题详解(附源码)

    问题: 中国古代数学家张丘建在他的<算经>中提出了一个著名的"百钱买百鸡问题":鸡翁一,值钱五,鸡母一,值钱三,鸡雏三,值钱一,百钱买百鸡,问翁.母.雏各几何? 代码: ...

  3. 【PyTorch深度学习项目实战100例目录】项目详解 + 数据集 + 完整源码

    前言 大家好,我是阿光. 本专栏整理了<PyTorch深度学习项目实战100例>,内包含了各种不同的深度学习项目,包含项目原理以及源码,每一个项目实例都附带有完整的代码+数据集. 正在更新 ...

  4. Ext.Net学习笔记22:Ext.Net Tree 用法详解

    上面的图片是一个简单的树,使用Ext.Net来创建这样的树结构非常简单,代码如下: <ext:TreePanel runat="server"><Root> ...

  5. python列表和元组的应用_python学习笔记之列表(list)与元组(tuple)详解

    前言 最近重新再看python的基础知识,感觉自己还是对于这些知识很陌生,需要用的时候还是需要翻书查阅,还是先注重基础吧--我要重新把python的教程阅读一遍,把以前自己忽略的部分学习,加强练习和记 ...

  6. 生成对抗网络入门详解及TensorFlow源码实现--深度学习笔记

    生成对抗网络入门详解及TensorFlow源码实现–深度学习笔记 一.生成对抗网络(GANs) 生成对抗网络是一种生成模型(Generative Model),其背后最基本的思想就是从训练库里获取很多 ...

  7. 【Azure 架构师学习笔记】-Azure Data Factory (4)-触发器详解-事件触发器

    本文属于[Azure 架构师学习笔记]系列. 本文属于[Azure Data Factory]系列. 接上文[Azure 架构师学习笔记]-Azure Data Factory (3)-触发器详解-翻 ...

  8. CNN入门详解及TensorFlow源码实现--深度学习笔记

    CNN入门详解及TensorFlow源码实现–深度学习笔记 ##一.卷积神经网络 ###1.简介 卷积神经网络是一种前馈神经网络,它的人工神经元可以响应一部分覆盖范围内的周围单元,对于大型图像处理有出 ...

  9. 【matcovnet学习笔记】objective,top1error,top5error详解

    [matcovnet学习笔记]objective,top1error,top5error详解 排名前1和前5的错误率是衡量某些解决方案成功与否的重要单位 ,要理解这三个概念,关键是要看懂下面这个多类误 ...

最新文章

  1. AlarmManager深入浅出
  2. python使用redis队列_Python的Flask框架应用调用Redis队列数据的方法
  3. 视频技术详解:语音编解码技术演进和应用选型
  4. 【渝粤教育】 国家开放大学2020年春季 1107传感器与测试技术 参考试题
  5. Linux Unix shell 编程指南学习笔记(第五部分)
  6. ACM 竞赛高校联盟 练习赛 第六场 韩梅梅的抽象画(图论水题)
  7. 拓端tecdat|R语言ggsurvplot绘制生存曲线报错 : object of type ‘symbol‘ is not subsettable
  8. win7系统一键共享工具_开放教育作者共享适用于任何操作系统的有价值的工具
  9. css在线代码生成工具汇总
  10. 离线语音空调插座设计应用案例
  11. 软件工程期末笔记整理
  12. word 计算机内存不足,电脑office打不开显示内存不足怎么办
  13. WebProxy - 网站转发代理
  14. linux wep加密方式,iPad wifi 断网问题通过设置加密方式(Mixed WEP)解决
  15. 虫子满屏爬_三bug多线程示例程序浅析
  16. 导出为excel无法引用解决方法
  17. Vuex、axios以及跨域请求处理
  18. 【图像分类】华为云·垃圾分类亚军方案分享
  19. des加密解密 代码 java_java 实现DES 加密解密的示例
  20. Cox 比例风险模型中HR和置信区间

热门文章

  1. php mysql生成excel文件,PHP导出MySQL数据到Excel文件简单示例
  2. 卷积神经网络 训练的结果是什么_射击训练:卷积神经网络识别解剖结构标志位点...
  3. java求面积_Java之简单图形面积计算
  4. java resources 路径_Java工程读取resources中资源文件路径问题
  5. 西安理工大学计算机考研难吗,西安理工大学考研难吗?一般要什么水平才可以进入?...
  6. python给视频加水印_视频水印_Python SDK_服务端SDK_视频点播 - 阿里云
  7. printf linux 头文件,Linux C 格式化输出时要注意的问题
  8. 两个主机mtu不相同_案例详解:MTU不一致导致主机和RAC不断重启
  9. AChartEngine中的Renderer和DataSet介绍
  10. ubuntu20配置阿里源简单粗暴的方法