scala 异步调用_非阻塞异步Java 8和Scala的Try / Success / Failure
scala 异步调用
受Heinz Kabutz最近的时事通讯以及我在最近的书中研究的Scala的期货的启发,我着手使用Java 8编写了一个示例,该示例如何将工作提交给执行服务并异步地响应其结果,并使用了回调。无需阻止任何线程等待执行服务的结果。
理论认为,调用拦截方法,如get
上java.util.concurrent.Future
是坏的,因为系统会需要超过线程的最佳数量,如果它是不断做工作,并在浪费时间与结果上下文切换 。
在Scala世界中,像Akka这样的框架都使用编程模型,这意味着这些框架永远不会阻塞-线程阻塞的唯一时间是用户对阻塞的对象进行编程时,他们不愿意这样做。 通过永不阻塞,该框架可以避免每个内核使用大约一个线程,这比说说标准JBoss Java EE Application Server(在启动后最多拥有400个线程)要少得多。 很大程度上归功于Akka框架的工作,Scala 2.10添加了Futures和Promises ,但是Java中还不存在这些东西。
以下代码显示了我的预期目标。 它分为三个部分。 首先,使用在类ch.maxant.async.Future
找到的static future
方法将新任务添加到执行服务中。 它返回一个Future
,但不是从java.util.concurrent
包中返回一个Future
,而是从ch.maxant.async
包中返回其子类。 其次, Future
具有一种名为map
的方法,该方法遵循Scala或新的Java 8 Stream
类的功能样式。 map
方法使您可以注册回调,或更准确地说,可以将第一个future包含的值映射(转换)为新值。 在第一个Future
完成后,映射将在将来的其他时间执行,因此会产生新的Future
。 第三,我们在Future
类中使用另一种方法注册一个回调,一旦我们创建的所有期货都完成,该回调将运行。 任何时候都不会使用Future
API的任何阻止方法!
final Random random = new Random();
int numTasks = 10;
List<Future<Integer>> futures = new ArrayList<>();for(int i = 0; i < numTasks; i++){final int j = i;log("adding future " + i);// PART 1//start some work async / in the futureFuture<String> f = future(new Task<String>( () -> {sleep(random.nextInt(1000));if(j < 5){log("working success");return "20";}else{log("working failure");throw new Exception();}}));// PART 2//register a callback, to be called when the work is donelog("adding mapping callback to future");final Future<Integer> f2 = f.map( (Try<String> stringNumber) -> {return stringNumber.map( (String s) -> {log("mapping '" + s + "' to int");return Integer.parseInt(s);}).recover( (Exception e) -> {log("recovering");return -10;}).get(); //wont throw an exception, because we provided a recovery!});futures.add(f2);
}// PART 3
log("registering callback for final result");
Future.registerCallback(futures, (List<Try<Integer>> results) -> {Integer finalResult = results.stream().map( (Try<Integer> t) -> {log("mapping " + t);try {return t.get();} catch (Exception e) {return 0;}}).reduce(0, (Integer i1, Integer i2) -> {log("reducing " + i1 + " and " + i2);return i1 + i2;});log("final result is " + finalResult);Future.shutdown();if(finalResult != 50){throw new RuntimeException("FAILED");}else{log("SUCESS");}
});System.out.println("Completed submitting all tasks on thread " + Thread.currentThread().getId());//this main thread will now die, but the Future executor is still up and running. the callback will shut it down and with it, the jvm.
第11行调用了future
方法来注册一个新Task
,该Task
是使用Work
实例构造的,在这里是使用Java 8 lambda构造的。 工作会睡一会儿,然后要么返回数字20(作为字符串),要么抛出异常,以演示如何处理错误。
使用第11行从执行服务返回的Future
,第25行将其值从字符串映射为整数,从而生成Future<Integer>
而不是Future<String>
。 该结果将添加到第35行的Future
列表中,第3部分在第40行中使用该列表registerCallback
方法将确保在最后一个future完成后调用给定的回调。
第25-33行的映射使用传递给Try
对象的lambda完成。 Try
有点像Java 8 Optional
,它是Success
和Failure
类的抽象(超类),我是根据对Scala的了解而实现的。 与必须显式检查错误相比,它可使程序员更轻松地处理故障。 我对Try
接口的实现如下:
public interface Try<T> {/** returns the value, or throws an exception if its a failure. */T get() throws Exception;/** converts the value using the given function, resulting in a new Try */<S> Try<S> map(Function1<T, S> func);/** can be used to handle recovery by converting the exception into a {@link Try} */Try<T> recover(Recovery<T> r);}
发生的情况是Success
和Failure
的实现可以优雅地处理错误。 例如,如果第一个清单的第11行上的Future
完成但有例外,则将第一个清单的第25行上的lambda传递给Failure
对象,并且在Failure
上调用map
方法绝对没有任何作用。 没有例外,没有任何问题。 为了补偿,您可以调用recover
方法,例如在第一个清单的第29行,该方法允许您处理异常并返回程序可以继续使用的值,例如默认值。
另一方面, Success
类以不同的方式实现Try
接口的map
和recover
方法,这样,调用map
会导致给定的函数被调用,但是调用recover
绝对不会执行任何操作。 map
和recover
方法无需显式编码try / catch块,而是提供了一种更好的语法,该语法在读取或查看代码时更容易验证(与编写相比,这种情况在代码中更常见)。
由于map
和recover
方法将函数的结果包装在Try
s中,因此您可以将调用链接在一起,例如第Try
和32行。Scala的Try
API具有比我在这里实现的三种方法更多的方法。 请注意,我选择在Try
API java.util.function.Function
不使用java.util.function.Function
,因为它的apply
方法不会throw Exception
,这意味着第一个清单中显示的代码不像现在那样好。 相反,我写了
Function1
接口。
难题的第3部分是如何使程序在所有Future
完成之后做一些有用的事情,而又不会像对Future#get()
方法那样讨厌调用。 解决方案是注册一个回调,如第40行所示。该回调与此处显示的所有其他回调一样,都已提交给执行服务。 这意味着我们无法保证哪个线程将运行它,这会带来副作用,即线程本地存储(TLS)不再起作用-某些框架((的较旧版本?)Hibernate依赖TLS,而它们只会胜任)。在这里工作。 Scala有一个很好的方法可以使用implicit
关键字来解决该问题,而Java还没有(但是…?),因此需要使用其他机制。 我在提到它,只是为了让您知道它。
因此,当最后一个Future
完成时,将调用40-60行,并传递包含Integer
而不是Future
的Try
的List
。 registerCallback
方法将期货转换为适当的Success
或Failure
。 但是,我们如何将它们转换成有用的东西呢? 幸运的是,Java 8现在有了一个简单的map / reduce,就支持了Stream
类,该类通过调用stream()
方法从第42行的Try
集合中Try
了。 首先,我将Try
映射(转换)为它们的值,然后在第49行上将流减少为单个值。我本可以使用不使用自己的求和值的lambda实现
Integer::sum
,例如someStream.reduce(0, Integer::sum)
。
我上次运行该程序时,它输出以下内容:
Thread-1 says: adding future 0
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 1
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 2
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 3
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 4
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 5
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 6
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 7
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 8
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 9
Thread-1 says: adding mapping callback to future
Thread-1 says: registering callback for final result
Thread-10 says: working success
Completed submitting all tasks on thread 1
Thread-14 says: working success
Thread-10 says: working failure
Thread-14 says: working failure
Thread-12 says: working success
Thread-10 says: working failure
Thread-10 says: mapping '20' to int
Thread-10 says: mapping '20' to int
Thread-10 says: recovering
Thread-10 says: recovering
Thread-10 says: mapping '20' to int
Thread-10 says: recovering
Thread-11 says: working success
Thread-11 says: mapping '20' to int
Thread-13 says: working success
Thread-10 says: mapping '20' to int
Thread-12 says: working failure
Thread-12 says: recovering
Thread-14 says: working failure
Thread-14 says: recovering
Thread-14 says: mapping Success(20)
Thread-14 says: mapping Success(20)
Thread-14 says: mapping Success(20)
Thread-14 says: mapping Success(20)
Thread-14 says: mapping Success(20)
Thread-14 says: mapping Success(-10)
Thread-14 says: mapping Success(-10)
Thread-14 says: mapping Success(-10)
Thread-14 says: mapping Success(-10)
Thread-14 says: mapping Success(-10)
Thread-14 says: final result is 50
Thread-14 says: SUCESS
如您所见,主线程添加了所有任务并注册了所有映射功能(第1-20行)。 然后,它注册回调(输出的第21行,与清单的第39行相对应),最后从清单的第63行输出文本,此后它死了,因为它无事可做。 然后,输出的第22行和第24-42行显示了池中的各个线程(包含5个线程),这些线程处理工作以及从String到Integer的映射或从异常中恢复。 这是第一个清单的第1部分和第2部分中的代码。 您可以看到它是完全异步的,在所有初始工作完成之前会发生一些映射/恢复(将第38行或第40行分别映射和恢复到输出的第41行,此行随后发生并且是最后一行)最初的工作)。 第43-52行是map / reduce的输出,它是主列表的第3部分。 请注意,没有记录reduce,因为我运行的代码(位于Github上)使用上面提到的Integer::sum
快捷方式,而不是上面显示的第一个清单的第50-51行。
尽管使用Java 6(甚至5?)可以实现所有这些功能,例如通过获取提交到池中的任务来自己提交回调,但是一旦完成,执行该操作所需的代码量就会更大,并且该代码本身将比此处显示的代码更丑陋。 可以使用回调进行映射的Java 8 lambda, Future
以及具有简洁错误处理功能的Try
API都可以使此处所示的解决方案更具可维护性。
上面显示的代码以及ch.maxant.async
包中类的代码在Apache License Version 2.0下可用,并且可以从我的Github帐户下载。
翻译自: https://www.javacodegeeks.com/2013/10/non-blocking-asynchronous-java-8-and-scalas-trysuccessfailure.html
scala 异步调用
scala 异步调用_非阻塞异步Java 8和Scala的Try / Success / Failure相关推荐
- Java中如何使用非阻塞异步编程——CompletableFuture
分享一波:程序员赚外快-必看的巅峰干货 对于Node开发者来说,非阻塞异步编程是他们引以为傲的地方.而在JDK8中,也引入了非阻塞异步编程的概念.所谓非阻塞异步编程,就是一种不需要等待返回结果的多线程 ...
- 深入理解非阻塞同步IO和非阻塞异步IO
这两篇文章分析了Linux下的5种IO模型 http://blog.csdn.net/historyasamirror/article/details/5778378 http://blog.csdn ...
- 同步阻塞 同步非阻塞 异步阻塞 异步非阻塞
今天老师讲了同步阻塞 同步非阻塞 异步阻塞 异步非阻塞.讲完感觉老师自己说的都是前后矛盾的.去网上找了几篇大佬的博客,看完后才有点点感悟.特地小结记下来,若有错误之处,欢迎大家斧正. 首先先弄清楚同步 ...
- java 异步调用接口_Java接口异步调用
java接口调用从调用方式上可以分为3类:同步调用,异步调用,回调:同步调用基本不用说了,它是一种阻塞式的调用,就是A方法中直接调用方法B,从上往下依次执行.今天来说说异步调用. 什么是异步调用? 我 ...
- python2异步编程_最新Python异步编程详解
我们都知道对于I/O相关的程序来说,异步编程可以大幅度的提高系统的吞吐量,因为在某个I/O操作的读写过程中,系统可以先去处理其它的操作(通常是其它的I/O操作),那么Python中是如何实现异步编程的 ...
- Linux下同步模式、异步模式、阻塞调用、非阻塞调用总结
同步和异步:与消息的通知机制有关. 本质区别 现实例子 同步模式 由处理消息者自己去等待消息是否被触发 我去银行办理业务,选择排队等,排到头了就办理. 异步模式 由触发机制来通知处理消息者 我去银行办 ...
- java同步异步调用_详解java 三种调用机制(同步、回调、异步)
1:同步调用:一种阻塞式调用,调用方要等待对方执行完毕才返回,jsPwwCe它是一种单向调用 2:回调:一种双向调用模式,也就是说,被调用方在接口被调用时也会调用对方的接口: 3:异步调用:一种类似消 ...
- 同步异步 阻塞 非阻塞 异步调用 线程队列 协程
阻塞 非阻塞 阻塞:程序遇到了IO操作 导致代码无法继续执行 交出了COU执行权 非阻塞:没有IO操作 或者即使遇到IO操作 也不阻塞代码执行 阻塞 就绪 运行指的是应用程序所处的状态写程序时 尽量减 ...
- java的rest异步调用_使用AsyncRestTemplate进行异步调用
背景: 最近项目中需要并发调用c++服务的http接口,问题是同时调用两个接口时,会发生严重阻塞,导致页面响应慢,还经常遇到接收数据超时,导致RestTemplate报出ReadTimeout错误,一 ...
最新文章
- Android studio 自动导入(全部)包 import
- 输入网址按回车,到底发生了什么
- 【612页】Android 大厂面试题及解析大全(中高级)
- 开源:分享10 个让你笑的合不拢嘴的 GitHub 项目!
- 前端学习(2458):评论模块
- Error:-81024 LR_VUG:The 'QTWeb' type is not supported on win32 platforms
- 苹果手机解压缩软件_装X教科书:买苹果电脑前应该了解哪些东西?
- 排序算法之 插入排序
- Linux下的Vivado安装——以Ubuntu为例
- Junos CLI常用命令
- USB Server助力广汽埃安U盾远程安全管控
- 微信小程序测试需要考虑哪些方面?一分钟快速掌握(小白必看!)
- URAL 1742 Team building 强联通
- ThinkPHP源码解析之控制器
- python正负数取余说明
- linux系统调用:exit()与_exit()函数详解【转】
- 大学生IT博客大赛最技术50强与最生活10强文章
- linux内核的红黑树
- finalize 复活_为什么Sekiro的其他内容将在2020年复活
- 机器人磨内孔_半导体阀块内孔机器人打磨,效果超乎想象!