有一天,我重写了执行不佳的多线程代码,该代码在Future.get()某个时刻被阻塞:

public void serve() throws InterruptedException, ExecutionException, TimeoutException {final Future<Response> responseFuture = asyncCode();final Response response = responseFuture.get(1, SECONDS);send(response);
}private void send(Response response) {//...
}

这实际上是一个用Java编写的Akka应用程序,具有1000个线程的线程池(原文如此!)–所有这些都在此get()调用中被阻塞。 否则系统无法跟上并发请求的数量。 重构之后,我们摆脱了所有这些线程,只引入了一个,大大减少了内存占用。 让我们简化一下并显示Java 8中的示例。第一步是引入CompletableFuture而不是普通的Future (请参阅提示9 )。 很简单,如果:

  • 您可以控制如何将任务提交给ExecutorService :只需使用CompletableFuture.supplyAsync(..., executorService)而不是executorService.submit(...)
  • 您处理基于回调的API:使用Promise

否则(如果您已经阻塞了API或Future<T> ),将有一些线程被阻塞。 这就是为什么现在诞生了这么多异步API的原因。 假设我们以某种方式重写了代码以接收CompletableFuture

public void serve() throws InterruptedException, ExecutionException, TimeoutException {final CompletableFuture<Response> responseFuture = asyncCode();final Response response = responseFuture.get(1, SECONDS);send(response);
}

显然,这并不能解决任何问题,我们必须利用新的反应式编程风格:

public void serve() {final CompletableFuture<Response> responseFuture = asyncCode();responseFuture.thenAccept(this::send);
}

这在功能上是等效的,但是现在serve()应该立即运行(没有阻塞或等待)。 只要记住, this::send将在完成responseFuture的同一线程中执行。 如果您不想在某个地方重载某些任意线程池或send()昂贵,请考虑为此使用单独的线程池: thenAcceptAsync(this::send, sendPool) 。 很好,但是我们失去了两个重要的属性:错误传播和超时。 由于我们更改了API,因此错误传播很难。 当serve()方法退出时,异步操作可能尚未完成。 如果您关心异常,请考虑返回responseFuture或其他替代机制。 至少,请记录异常,因为否则它将被吞噬:

final CompletableFuture<Response> responseFuture = asyncCode();
responseFuture.exceptionally(throwable -> {log.error("Unrecoverable error", throwable);return null;
});

请注意上面的代码: exceptionally()尝试从故障中恢复 ,并返回替代结果。 它在这里有效,但是如果您将thenAccept() exceptionally()thenAccept() ,即使在失败的情况下, send()也会被调用,但是参数为null (或者我们从exceptionally()返回的值exceptionally()

responseFuture.exceptionally(throwable -> {log.error("Unrecoverable error", throwable);return null;}).thenAccept(this::send);  //probably not what you think

丢失1秒超时的问题非常微妙。 我们的原始代码等待(阻塞)最多1秒钟,直到Future完成。 否则抛出TimeoutException 。 我们失去了此功能,甚至超时的更糟糕的单元测试也不方便并且经常被跳过。 为了在不牺牲事件驱动精神的前提下实现超时,我们需要一个额外的构建块:在给定时间之后始终失败的未来:

public static <T> CompletableFuture<T> failAfter(Duration duration) {final CompletableFuture<T> promise = new CompletableFuture<>();scheduler.schedule(() -> {final TimeoutException ex = new TimeoutException("Timeout after " + duration);return promise.completeExceptionally(ex);}, duration.toMillis(), MILLISECONDS);return promise;
}private static final ScheduledExecutorService scheduler =Executors.newScheduledThreadPool(1,new ThreadFactoryBuilder().setDaemon(true).setNameFormat("failAfter-%d").build());

这很简单:我们创建一个承诺 (没有基础任务或线程池的未来),并在给定java.time.Duration之后使用TimeoutException完成它。 如果您get()某个地方get()这样的未来,则阻塞了至少那么多时间后,将抛出TimeoutException 。 实际上,它将是ExecutionException包装TimeoutException ,没有办法解决。 请注意,我仅使用一个线程使用固定scheduler线程池。 这不仅是出于教育目的:“在这种情况下,“ 1个线程对于任何人都应该足够 ”” [1]failAfter()本身是没有用的,但是将其与我们的responseFuture结合起来,我们就有了解决方案!

final CompletableFuture<Response> responseFuture = asyncCode();
final CompletableFuture<Response> oneSecondTimeout = failAfter(Duration.ofSeconds(1));
responseFuture.acceptEither(oneSecondTimeout, this::send).exceptionally(throwable -> {log.error("Problem", throwable);return null;});

这里发生了很多事情。 在通过我们的后台任务接收到responseFuture ,我们还创建了一个“合成的” oneSecondTimeout将来,它将永远不会成功完成,但总是在1秒后失败。 现在,我们通过调用acceptEither合并两者。 该运算符将针对第一个完成的将来( responseFutureoneSecondTimeout执行代码块,而只是忽略较慢的代码的结果。 如果asyncCode()内1完成第二this::send将被调用,并从异常oneSecondTimeout会被忽略。 然而! 如果asyncCode()确实很慢,则oneSecondTimeout启动。 但是由于它失败并带有异常,因此将调用exceptionally错误处理程序,而不是this::send 。 您可以认为send()exceptionally都将被调用,而不是两者都被调用。 当然,如果我们有两个正常完成的“普通”期货,则将调用前一个的响应来调用send() ,并丢弃后者。

这不是最干净的解决方案。 一个干净的人会包装原始的未来,并确保它在给定的时间内完成。 此类操作符可在com.twitter.util.Future (Scala;称为com.twitter.util.Futurewithin() )中使用,但是在scala.concurrent.Future丢失(可能是受前者启发)。 让我们留下Scala并为CompletableFuture实现类似的运算符。 它以一个Future作为输入,并返回一个在基础底层完成时完成的Future。 但是,如果完成基础未来花费的时间太长,则会引发异常:

public static <T> CompletableFuture<T> within(CompletableFuture<T> future, Duration duration) {final CompletableFuture<T> timeout = failAfter(duration);return future.applyToEither(timeout, Function.identity());
}

这导致了最终,清洁和灵活的解决方案:

final CompletableFuture<Response> responseFuture = within(asyncCode(), Duration.ofSeconds(1));
responseFuture.thenAccept(this::send).exceptionally(throwable -> {log.error("Unrecoverable error", throwable);return null;});

希望您喜欢这篇文章,因为您可以看到Java中的反应式编程已不再是未来的事情(无双关语)。

翻译自: https://www.javacodegeeks.com/2014/12/asynchronous-timeouts-with-completablefuture.html

具有CompletableFuture的异步超时相关推荐

  1. 异步http 超时_具有CompletableFuture的异步超时

    异步http 超时 有一天,我重写了执行不佳的多线程代码,该代码在Future.get()某个时刻被阻塞: public void serve() throws InterruptedExceptio ...

  2. .Net Cancellable Task - APM异步超时机制扩展

    概述 .NET基于委托的APM(Asynchronous Programming Model)模式通过BeginInvoke, EndInvoke, AsyncCallback,IAsyncResul ...

  3. Java8 - 使用CompletableFuture 构建异步应用

    文章目录 概述 同步API VS 异步API 同步的困扰 实现异步API 将同步方法改为异步方法 处理异常错误 概述 为了展示 CompletableFuture 的强大特性, 创建一个名为 best ...

  4. Java 8 (10) CompletableFuture:组合式异步编程

    随着多核处理器的出现,提升应用程序的处理速度最有效的方式就是可以编写出发挥多核能力的软件,我们已经可以通过切分大型的任务,让每个子任务并行运行,使用线程的方式,分支/合并框架(java 7) 和并行流 ...

  5. CompletableFuture 实现异步计算

    在Markdown的语法中,<u>下划线</u>中的文字会被解析器加上下划线,为了不影响阅读,本文中JDK文档涉及到<U>都会替换为<N>,请各位注意. ...

  6. CompletableFuture并行异步处理类使用示例

    等待所有任务执行完, 串行执行和异步执行的高级写法: package com.zhangxueliang.demo.springbootdemo.JUC.c_026_01_ThreadPool;imp ...

  7. CompletableFuture 创建异步对象

    CompletableFuture 提供了四个静态方法来创建一个异步操作. static CompletableFuture<Void> runAsync(Runnable runnabl ...

  8. jooq 执行sql_使用jOOQ和Java 8的CompletableFuture进行异步SQL执行

    jooq 执行sql 响应式编程是一个新的流行词,它实际上仅表示异步编程或消息传递. 事实是,函数语法极大地帮助构建了异步执行链,今天,我们将看到如何使用jOOQ和新的CompletableFutur ...

  9. 具有jOOQ和Java 8的CompletableFuture的异步SQL执行

    响应式编程是一个新的流行词,它实际上仅表示异步编程或消息传递. 事实是,函数语法极大地帮助构建了异步执行链,今天,我们将看到如何使用jOOQ和新的CompletableFuture API在Java ...

最新文章

  1. 多级页表如何节省内存
  2. (翻译)Quartz官方教程——第七课:TriggerListeners 和 JobListeners
  3. 日常生活小技巧 --惠普战66三代 重装系统
  4. Linux Oracle服务启动停止脚本与开机自启动[转]
  5. Win7无线网络共享设置方法
  6. AVS264_FAQ集锦
  7. ios开发 多人语音聊天_手游语音市场的现状、机遇与挑战
  8. 问题 1045: [编程入门]自定义函数之整数处理
  9. 如何攻克 C++ 中复杂的类型转换?
  10. 生产环境下ftp的迁移并构建corosync+pacemaker的高可用
  11. 计算机网络超详细笔记(四):介质访问控制子层
  12. 手机麦克风结构原理图_麦克风的分类和工作原理
  13. a16z 2022 年加密行业研究报告(简)
  14. EasyDSS点播视频添加水印的位置与定义的位置不匹配怎么办?
  15. org.springframework.hateoas.config.HateoasConfiguration required a single bean, but 15 were found:
  16. deeplearning.ai课程作业:Recurrent Neural Networks- Course 5 Week3
  17. 省市县三级联动JS代码
  18. 国内九大垂直类B2C电子商务
  19. html5 jquery 鼠标拖动例子,jquery实现鼠标拖动实现DIV排序示例代码
  20. IDEA搜索上一个下一个搜索关键字的快捷键

热门文章

  1. centos8安装并启动tomcat9
  2. ISO语言代码和国家代码+Locale常量+ISO货币符号
  3. 异常java.lang.Thread.dumpStack(Unknown Source)
  4. JDK8的日期时间类1
  5. 使用Spring Boot和MongoDB构建一个React式应用程序
  6. 设计模式 原型模式_创新设计模式:原型模式
  7. jpa root.join_JPA 2.1和Java EE 7中的JPQL增强功能(第1部分– JOIN ON)
  8. 改变数据类型的装饰器_用装饰器改变收藏
  9. meetup_使用RxNetty访问Meetup的流API
  10. maven的常见问题_Maven常见问题和陷阱