一、使用的场景

Java多线程的背景下,运行线程的时候可以有两种形式,一种是有返回值的一种是没有返回值的。当我们获取运行的线程的返回值需要结合Future来进行使用。

二、 传统Future

Future主要时在异步执行的过程中可以有返回值,与之相对应的有一个FutureTask,这两个接口的区别是:

  • Future在源码中有一段这样的描述“represents the result of an asynchronous computation”也就是说他代表的是一个异步计算返回的结果
  • 其中Callable代表的是返回一个结果,在源码中是这样描述的“A task that returns a result and may throw an exception
  • 那么一个实现Callable的接口就可以执行一个任务,利用多线程这个任务可以是异步的,他返回的结果可以用Future进行接收
  • Runnable,所有实现了他类的实例都可以被线程执行,实现它主要是实现一个参数的方法
  • FutureTask是一个类,他实现了Runnable、Future,所以说实现他的对象即可以被线程执行(实现了Runnable)也可以有返回值(拥有Future的API接口)

1、FutureTask示列

public class FutureDemo {/***@description: FutureTask 利用线程来进行执行*/@Testpublic void test01(){FutureTask<String> futureTask = new FutureTask<String>(new MyThread());new Thread(futureTask, "t1").start();try {System.out.println(futureTask.get());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
}class MyThread implements Callable<String>{@Overridepublic String call() throws Exception {System.out.println("come in call()");return "hello callable";}
}

2、Future的单独使用

@Test
public void test03(){//其中引出的一个问题就是future与futureTask之间的区别于联系//FutureTask不仅继承了Future还继承了Runable,所以说他具有了双重属性ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);Future<String> stringFuture1 = fixedThreadPool.submit(new MyThread());Future<String> stringFuture2 = fixedThreadPool.submit(() -> {System.out.println("通过语法糖实现的FutureTask");return "FutureTask Success";});
}

作为执行过程中的一个资源单位的Thread,他们重复利用的是线程池,线程池的接口是Executor,里面他的方法是execute(Runnable command),入参是Runable形的,Runable中需要实现的接口是run(),这个方法没有返回值。线程中可以有返回值的是Callable接口,他的需要实现的方法是call()。但是他不能直接被线程执行,因为线程Thread构造函数的入参不包括Callable类型的,如果想要使用它可以配合FutureTask进行使用,他实现了Runnable、Future,所以即可以被线程执行也可以有返回值,那么线程执行的时候执行那个计算呢?那就是通过构造函数的参数传递进去的Callable类型的,他完美的将Thread与Future结合了起来。

//------------
//FutureTask中Runnable类型的入参计算执行了
//Result
public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<String> futureTask = new FutureTask(()->{try {TimeUnit.MILLISECONDS.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("FutureTask中Runnable类型的入参计算执行了");},"Result");new Thread(futureTask).start();System.out.println("------------");System.out.println(futureTask.get());
}

Future使用过程中的优缺点也是十分明显的,有点就是他可以得到返回值,缺点则是在得到返回值的时候会发生阻塞,下面CompletableFuture的出现也就是为了解决Future的缺点,他可以看做是结合

三、CompletableFuture的使用

1、网络数据抓取的模仿示例

如果想要做一比价的程序,那么我们首先需要通过网络请求得到我们目标网站中的价格信息,在获得价格信息的过程中存在网络调用,这是耗时的操作。示例中做的就是结合JDK8中的流操作模拟同步操作与异步操作两种的效率。需要关注的主要是流式编程在网络请求中的应用。

public class CompletableFutureMallDemo {static List<NetMall> list = Arrays.asList(new NetMall("jd"),new NetMall("dangdang"),new NetMall("taobao"));public static List<String> getPrice(List<NetMall> list,String productName){return list.stream().map(netMall->{return String.format(productName+"in %s price is %.2f",netMall.getNetMallName(),netMall.calcPrice(productName));}).collect(Collectors.toList());}public static List<String> getPriceFuture(List<NetMall> list,String productName){return list.stream().map(netMall -> {//将请求的标准信息分装成一部请求的方式return CompletableFuture.supplyAsync(()->{//由于已经被封装成一部请求的方式所以这里对calcPrice的每次调用实际上都是//放在线程池中完成的,可以并行执行System.out.println("生成CompletableFuture");return String.format(productName+"in %s price is %.2f",netMall.getNetMallName(),netMall.calcPrice(productName));});})//如果没有这一步,数据实际上也是会变成并行操作,那么是为什么呢?//1、如果要是不要这一步,数据就是上边的数据传递过来一个最后一行的map执行一个//由于最后一个map中的数据执行是阻塞的,那么就造成了整个流的阻塞//2、添加上了下面的一步那就是先将上面转换CompletableFuture的执行完,之后再遍历调用//最后一个map的?好像也不是很能说的通,但是事实却是如此,从运行的结果上来看也是这样的.collect(Collectors.toList()).stream().map((s)->{System.out.println("从生成CompletableFuture中获取数据");return s.join();}).collect(Collectors.toList());}public static void main(String[] args) {System.out.println("传统串行形式");long startTime = System.currentTimeMillis();getPrice(list,"mysql").forEach(item -> {System.out.println(item);});long endTime = System.currentTimeMillis();System.out.println("---costTime: "+(endTime - startTime)+"毫秒");System.out.println("并行运行");long startTime2 = System.currentTimeMillis();getPriceFuture(list,"mysql").forEach(item -> {System.out.println(item);});long endTime2 = System.currentTimeMillis();System.out.println("---costTime: "+(endTime2 - startTime2)+"毫秒");}
}@Data
@AllArgsConstructor
class NetMall{private String netMallName;public double calcPrice(String productName){try{TimeUnit.SECONDS.sleep(1);}catch (InterruptedException e){e.printStackTrace();}return ThreadLocalRandom.current().nextDouble()*2+productName.charAt(0);}
}

2、获取返回值

@Test//测试返回值
public void test01() throws ExecutionException,InterruptedException, TimeoutException {//获取结果CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}return "abc";});try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}//get方法的执行也是阻塞的,变现为——如果建第一个get注销因为return需要3s才能返回所以//第二个的get就会超时,将会报超时错误TimeoutExceptionSystem.out.println(completableFuture.get());System.out.println(completableFuture.get(1, TimeUnit.SECONDS));//相对于get来说在编译的时候不报错,但是在运行的时候依旧是会报错的System.out.println(completableFuture.join());//如果计算完成直接返回正确值,否则的话返回默认值System.out.println(completableFuture.getNow("XXX"));//打断计算的过程给一个默认值到get、joinSystem.out.println(completableFuture.complete("completeValue")+"\t"+completableFuture.join());
}

3、thenApply——发生异常中断后续执行

@Test//对计算的结果进行处理,如果有异常在当前的阶段就进行终止
public void test02(){ExecutorService threadPool = Executors.newFixedThreadPool(3);CompletableFuture.supplyAsync(()->{try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("111");return 1;}).thenApply(f->{int i = 10/0;System.out.println("222");return f+2;}).thenApply(f->{System.out.println("333");return f+3;}).whenComplete((v,e)->{System.out.println("whenComplete");if (e==null){System.out.println("计算结果为:"+v);}}).exceptionally(e->{e.printStackTrace();System.out.println(e.getMessage());return null;});System.out.println(Thread.currentThread().getName()+"---主线程先去忙其他的任务");//主线程不能够立马关闭否则的话CompletableFuture的线程也会关闭,或者说谁自己建立一个线程池//但是如果是自己建的线程池最终通过shutdown进行关闭在@Test的方法中是不行的try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}
}

4、handle——发生异常继续执行

@Test//对计算的结果进行处理,handle处理中是可以继续朝下执行的
public void test03(){ExecutorService threadPool = Executors.newFixedThreadPool(3);CompletableFuture.supplyAsync(()->{try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("111");return 1;}).handle((f,e)->{int i = 10/0;System.out.println("222");return f+2;}).handle((f,e)->{System.out.println("333");return f+3;}).whenComplete((v,e)->{if (e==null){System.out.println("计算结果为:"+v);}}).exceptionally(e->{e.printStackTrace();System.out.println(e.getMessage());return null;});try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}
}

上面两种说的中断或是不中断都是对处理过程而言的,对于whenComplete、exceptionally代码块中的都是会执行的不过,不过那一阶段发生错误,whenComplete中值的参数都会是null

5、自定义线程池无序手动阻塞主线程

public static void threadPoolTest(){ExecutorService threadPool = Executors.newFixedThreadPool(3);CompletableFuture.supplyAsync(()->{try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("111");return 1;},threadPool).thenApply(f->{System.out.println("222");return f+2;}).thenApply(f->{System.out.println("333");return f+3;}).whenComplete((v,e)->{if (e==null){System.out.println("计算结果为:"+v);}}).exceptionally(e->{e.printStackTrace();System.out.println(e.getMessage());return null;});threadPool.shutdown();
}

6、thenAccept中只消费不返回

join是从他链的上一步获取返回值,如果没有返回值则返回Null

@Test//thenAccept中只消费不返回
public void test04(){System.out.println(CompletableFuture.supplyAsync(() -> {return 1;}).thenApply(f -> {return f + 2;}).thenAccept(f -> {System.out.println(f);}).thenRun(() -> {System.out.println("执行在上链操作的后面但是不接受结果也不进行返回值");}).join());//join是从他链的上一步获取返回值,如果没有返回值则返回Null
}

7、线程池使用的优先级顺序

如果要是使用的是默认的线程,那么同步调用的话就会一直使用这个任务;异步那就肯定也是如此,如果使用的是异步接口那么在执行调用链上的第一个任务的时候用的是自定义的,后面的用的是默认的。基于线程的优化策略,有可能还会直接使用主线程。

@Test
//执行的时候如果没有自定义线程池,那么都默认使用ForkJoinPool
//执行的时候定义了线程池,调用thenRun方法执行第二个任务的时候还会使用与第一个任务
//相同的线程池;如果是调用thenRunAsync执行第二个任务的时候则第一个任务使用的是你
//自己传入的线程池,第二个任务使用的是ForkJoin的线程池
//有可能处理的太快,基于系统的优化,直接使用main线程进行处理
public void test05() {ExecutorService executor = Executors.newFixedThreadPool(5);CompletableFuture.supplyAsync(()->{System.out.println("1号任务"+"\t"+Thread.currentThread().getName());return "abcd";}).thenRun(()->{try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("2号任务"+"\t"+Thread.currentThread().getName());}).thenRun(()->{try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("3号任务"+"\t"+Thread.currentThread().getName());}).thenRun(()->{try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("4号任务"+"\t"+Thread.currentThread().getName());});
}

8、两个中选一个进行执行

@Test//两个中选一个执行速度快的进行执行
public void test06(){CompletableFuture<String> playA = CompletableFuture.supplyAsync(() -> {System.out.println("Play-A come in!");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "Play-A";});CompletableFuture<String> playB = CompletableFuture.supplyAsync(() -> {System.out.println("Play-B come in!");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return "Play-B";});System.out.println(playA.applyToEither(playB, f -> {return f + " is winer !!!";}).join());
}

9、执行结果合并

@Test//对结果进行合并
public void test07(){CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t 启动");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return 10;});CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "\t 启动");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return 20;});long start = System.currentTimeMillis();System.out.println(completableFuture1.thenCombine(completableFuture2, (v1, v2) -> {System.out.println("连个异步的结果进行合并");return v1 + v2;}).join());long end = System.currentTimeMillis();System.out.println("共计耗时:"+(end - start));
}

Future与CompletableFuture相关推荐

  1. 多线程创建方式 线程池、Future和CompletableFuture

    大家好,我是烤鸭: 今天说一下 多线程的几种创建方式及使用. 1. Thread 和 Runnable 继承 Thread 类 和实现 Runnable 接口.     这种就不举例子了. 2.线程池 ...

  2. Flink的异步I/O及Future和CompletableFuture

    1 概述   Flink在做流数据计算时,经常要外部系统进行交互,如Redis.Hive.HBase等等存储系统.系统间通信延迟是否会拖慢整个Flink作业,影响整体吞吐量和实时性.   如需要查询外 ...

  3. Future和CompletableFuture的用法和区别

    Future的用法 多线程场景时,一般是实现runnable接口,覆写run方法,返回值是void类型,因此这种情况下不需要线程的返回结果.  如果需要线程的返回结果,就需要用callable接口来代 ...

  4. 带你了解了解Future和CompletableFuture

    前言 " 文本已收录至我的GitHub仓库,欢迎Star:https://github.com/bin392328206 种一棵树最好的时间是十年前,其次是现在 " 絮叨 并发 多 ...

  5. future java fork_将Java Future转换为CompletableFuture

    小编典典 有一种方法,但是您不喜欢它.以下方法将a Future转换为a CompletableFuture: public static CompletableFuture makeCompleta ...

  6. Future和CompletableFuture的区别和对比,以及Future主要的四个缺点——不能回调会阻塞、批量任务处理彼此依赖会阻塞、不能多个任务级联执行、得不到最先完成的任务

    注意:CompletableFuture内线程池的线程是守护线程,所以主线程执行结束会自动结束 1. Future在执行结束后没法回调,调用get方法会被阻塞:CompletableFuture调用g ...

  7. 【Java并发编程实战】(十七):Future和CompletableFuture的原理及实战——异步编程没有那么难

    文章目录 引言 生活中的例子 场景1 场景2 Java中的Future 如何获取Future Future的主要方法及使用 Future的核心源码 Future模式的高阶版本-- Completabl ...

  8. Future和CompletableFuture

    场景一是同步,场景二是异步,先拿到订单这个返回结果,订单其实就是一个future,即使它现在不能吃,但将来可以得到我们想要的结果 futureData构造速度快,是一个虚拟的数据,让调用者可以不用等待 ...

  9. 并行编程-disruptor与Future(CompletableFuture 和 guava)场景比较

    Disruptor 是 LMAX 公司开发的高性能队列,用于解决内存队列的延迟问题. LMAX 基于 Disruptor 打造的系统单线程能支撑每秒 600 万订单,许多著名的开源项目也使用 Disr ...

最新文章

  1. 《Head First 设计模式》ch.3 装饰(Decorator)模式
  2. Catch Me If You ... Can't Do Otherwise--转载
  3. 移动互联环境下的流程管理
  4. Oracle的row_number函数
  5. 如何利用云服务器挖矿_企业如何选择云服务器
  6. 【数据结构与算法】散列表
  7. Ubuntu安装常用软件
  8. ShellCode初体验
  9. lxde 的安装和卸载以及注意事项,lubuntu
  10. C#高级编程 第十五章 反射
  11. 01_6_SERVLET如何从上一个页面取得参数
  12. 我去!三面字节跳动,竟次次败,带薪摸鱼偷刷阿里老哥的面试宝典,成功上岸!
  13. 分享一个stm8s003单片机的ADC转换,附加一个冒泡算法(用于减少误差)
  14. jQuery(二十二)
  15. Java FlowLayout(流式布局)布局管理器
  16. 红警ol总是服务器满 可服务区显示流程,红警OL:11月27日部分服务器数据互通合服公告...
  17. 程序员推荐!JetBrains IDEs使用技巧与必备插件
  18. 【打印机】局域网连接打印机
  19. unity网络实战开发(丛林战争)-前期知识准备(008-粘包和分包及解决方案)
  20. 操作系统---颠簸(抖动)

热门文章

  1. 爬取一条微博的所有转发链接
  2. artcam2008 stp 教程_ArtCAM 中文版使用教程
  3. openKylin社区SIG组2022年第四季度工作会议顺利召开!
  4. 【简单总结】句子相似度计算的几种方法
  5. python 下载bing壁纸
  6. mac 搭建kafka系列教程
  7. SQL SERVER访问Access数据库,出现错误:无法初始化链接服务器 (null) 的 OLE DB 访问接口 Microsoft.Jet.OLEDB.4.0 的数据源对象
  8. AutoIt自动化实例
  9. 一颗躁动的心---下决心从SLAM开始,不钻研嵌入式底层了
  10. 幼儿在托育集体环境中,到底学到了什么?