使用RxJava和SseEmitter进行服务器发送的事件
Spring Framework 4.2 GA即将发布,让我们看一下它提供的一些新功能。 引起我注意的一个事件是一个简单的新类SseEmitter
,它是对Spring MVC控制器中易于使用的发送事件的抽象。 SSE是一项技术,使您可以在一个HTTP连接内沿一个方向将数据从服务器流式传输到浏览器。 听起来像是websocket可以做什么的子集。 但是,由于它是一个简单得多的协议,因此可以在不需要全双工的情况下使用,例如,实时推动股价变化或显示长时间运行的进程。 这将是我们的例子。
假设我们有一个具有以下API的虚拟硬币矿工:
public interface CoinMiner {BigDecimal mine() {//...}
}
每次调用mine()
我们都必须等待几秒钟,才能获得大约1个硬币的回报(平均)。 如果要挖掘多个硬币,我们必须多次调用此方法:
@RestController
public class MiningController {//...@RequestMapping("/mine/{count}")void mine(@PathVariable int count) {IntStream.range(0, count).forEach(x -> coinMiner.mine());}}
这项工作有效,我们可以请求/mine/10
和mine()
方法将执行10次。 到目前为止,一切都很好。 但是挖掘是一项占用大量CPU的任务,将计算分散到多个内核将是有益的。 此外,即使使用并行化,我们的API端点也相当慢,我们必须耐心等待直到所有工作完成而没有任何进度通知。 让我们首先修复并行性–但是,由于并行流无法控制底层线程池,因此我们来使用显式的ExecutorService
:
@Component
class CoinMiner {CompletableFuture<BigDecimal> mineAsync(ExecutorService executorService) {return CompletableFuture.supplyAsync(this::mine, executorService);}//...}
客户端代码必须显式提供ExecutorService
(只是设计选择):
@RequestMapping("/mine/{count}")
void mine(@PathVariable int count) {final List<CompletableFuture<BigDecimal>> futures = IntStream.range(0, count).mapToObj(x -> coinMiner.mineAsync(executorService)).collect(toList());futures.forEach(CompletableFuture::join);
}
首先多次调用mineAsync
,然后(作为第二阶段)等待所有mineAsync
完成并join
这非常重要。 很容易写:
IntStream.range(0, count).mapToObj(x -> coinMiner.mineAsync(executorService)).forEach(CompletableFuture::join);
但是,由于Java 8中流的惰性,该任务将按顺序执行! 如果您还不习惯流的懒惰,请始终从下至上阅读它们:我们要求join
一些将来的内容,以便流上升并只调用一次mineAsync()
(惰性!),并将其传递给join()
。 当join()
完成时,它再次上升并要求另一个Future
。 通过使用collect()
我们强制所有mineAsync()
执行,开始所有异步计算。 稍后我们等待每一个。
介绍
现在该变得更具反应性了(我说过了)。 控制器可以返回SseEmitter
的实例。 从处理程序方法return
后,容器线程将被释放并可以处理更多即将到来的请求。 但是连接没有关闭,客户端一直在等待! 我们应该做的是保留对SseEmitter
实例的引用,并在以后从另一个线程调用其send()
和complete
方法。 例如,我们可以启动一个长时间运行的进程,并保持send()
从任意线程进行进度。 完成该过程后,我们complete()
SseEmitter
,最后关闭HTTP连接(至少从逻辑SseEmitter
,请记住Keep-alive
)。 在下面的示例中,我们有一堆CompletableFuture
,当每个CompletableFuture
完成时,我们只需将1
发送给客户端( notifyProgress()
)。 当所有期货都完成后,我们完成流( thenRun(sseEmitter::complete)
),关闭连接:
@RequestMapping("/mine/{count}")
SseEmitter mine(@PathVariable int count) {final SseEmitter sseEmitter = new SseEmitter();final List<CompletableFuture<BigDecimal>> futures = mineAsync(count);futures.forEach(future ->future.thenRun(() -> notifyProgress(sseEmitter)));final CompletableFuture[] futuresArr = futures.toArray(new CompletableFuture[futures.size()]);CompletableFuture.allOf(futuresArr).thenRun(sseEmitter::complete);return sseEmitter;
}private void notifyProgress(SseEmitter sseEmitter) {try {sseEmitter.send(1);} catch (IOException e) {throw new RuntimeException(e);}
}private List<CompletableFuture<BigDecimal>> mineAsync(@PathVariable int count) {return IntStream.range(0, count).mapToObj(x -> coinMiner.mineAsync(executorService)).collect(toList());
}
调用此方法将产生以下响应(注意Content-Type
):
< HTTP/1.1 200 OK
< Content-Type: text/event-stream;charset=UTF-8
< Transfer-Encoding: chunked
<
data:1data:1data:1data:1* Connection #0 to host localhost left intact
稍后我们将学习如何在客户端解释这种响应。 现在暂时让我们整理一下设计。
与引进RxJava
上面的代码有效,但是看起来很凌乱。 实际上,我们有一系列事件,每个事件都代表计算的进度。 计算最终完成,因此流也应发出信号结束。 听起来就像是Observable
! 我们从重构CoinMiner
开始,以返回Observable<BigDecimal
:
Observable<BigDecimal> mineMany(int count, ExecutorService executorService) {final ReplaySubject<BigDecimal> subject = ReplaySubject.create();final List<CompletableFuture<BigDecimal>> futures = IntStream.range(0, count).mapToObj(x -> mineAsync(executorService)).collect(toList());futures.forEach(future ->future.thenRun(() -> subject.onNext(BigDecimal.ONE)));final CompletableFuture[] futuresArr = futures.toArray(new CompletableFuture[futures.size()]);CompletableFuture.allOf(futuresArr).thenRun(subject::onCompleted);return subject;
}
每当mineMany()
返回的事件出现在Observable
,我们就mineMany()
那么多硬币。 当所有期货都完成后,我们也完成了交易。 在实现方面,这看起来还没有改善,但是从控制器的角度来看,它有多干净:
@RequestMapping("/mine/{count}")
SseEmitter mine(@PathVariable int count) {final SseEmitter sseEmitter = new SseEmitter();coinMiner.mineMany(count, executorService).subscribe(value -> notifyProgress(sseEmitter),sseEmitter::completeWithError,sseEmitter::complete);return sseEmitter;
}
调用coinMiner.mineMany()
我们只需订阅事件。 事实证明Observable
和SseEmitter
方法匹配1:1。 这里发生的事情很不言自明:启动异步计算,每当后台计算发出任何进度信号时,将其转发给客户端。 好的,让我们回到实现。 看起来很乱,因为我们将CompletableFuture
和Observable
混合使用。 我已经描述了如何仅使用一个元素将CompletableFuture
转换为Observable
。 这是一个概述,包括rx.Single
从RxJava 1.0.13开始发现的rx.Single
抽象(此处未使用):
public class Futures {public static <T> Observable<T> toObservable(CompletableFuture<T> future) {return Observable.create(subscriber ->future.whenComplete((result, error) -> {if (error != null) {subscriber.onError(error);} else {subscriber.onNext(result);subscriber.onCompleted();}}));}public static <T> Single<T> toSingle(CompletableFuture<T> future) {return Single.create(subscriber ->future.whenComplete((result, error) -> {if (error != null) {subscriber.onError(error);} else {subscriber.onSuccess(result);}}));}}
将这些实用程序运算符放在某个地方,我们可以改善实现并避免混合使用两个API:
Observable<BigDecimal> mineMany(int count, ExecutorService executorService) {final List<Observable<BigDecimal>> observables = IntStream.range(0, count).mapToObj(x -> mineAsync(executorService)).collect(toList());return Observable.merge(observables);
}Observable<BigDecimal> mineAsync(ExecutorService executorService) {final CompletableFuture<BigDecimal> future = CompletableFuture.supplyAsync(this::mine, executorService);return Futures.toObservable(future);
}
RxJava有一个内置的运算符,用于将多个Observable
合并为一个,我们的每个基础Observable
发出一个事件,这无关紧要。
深入研究RxJava运算符
让我们使用RxJava的功能来稍微改善流式传输。
scan()
当前,每次我们开采一枚硬币时,我们都会send(1)
客户端send(1)
事件。 这意味着每个客户都必须跟踪其已经收到的硬币数量,以便计算总的计算数量。 如果服务器始终发送总金额而不是增量,那就太好了。 但是,我们不想更改实现。 事实证明,使用Observable.scan()
运算符非常简单:
@RequestMapping("/mine/{count}")
SseEmitter mine(@PathVariable int count) {final SseEmitter sseEmitter = new SseEmitter();coinMiner.mineMany(count, executorService).scan(BigDecimal::add).subscribe(value -> notifyProgress(sseEmitter, value),sseEmitter::completeWithError,sseEmitter::complete);return sseEmitter;
}private void notifyProgress(SseEmitter sseEmitter, BigDecimal value) {try {sseEmitter.send(value);} catch (IOException e) {e.printStackTrace();}
}
scan()
运算符接收上一个事件和当前事件,并将它们组合在一起。 通过应用BigDecimal::add
我们只需将所有数字相加即可。 例如1、1 +1,(1 +1)+1等。 scan()
类似于flatMap()
,但保留中间值。
用sample()
采样
可能是因为我们的后端服务产生了太多的进度更新,我们无法使用。 我们不想给客户端增加不相关的更新并饱和带宽。 每秒最多发送两次更新听起来很合理。 幸运的是,RxJava也有一个内置的运算符:
Observable<BigDecimal> obs = coinMiner.mineMany(count, executorService);
obs.scan(BigDecimal::add).sample(500, TimeUnit.MILLISECONDS).subscribe(//...);
sample()
将定期查看底层流,并仅发出最新的项,并丢弃中间的项。 幸运的是,我们使用scan()
即时聚合了项目,因此我们不会丢失任何更新。
window()
–恒定的发射间隔
不过有一个陷阱。 如果在选定的500毫秒内没有新内容出现, sample()
将不会两次发出相同的项目。 很好,但是请记住我们正在通过TCP / IP连接推送这些更新。 最好是定期向客户端发送更新,即使在此期间什么也没发生–只是为了保持连接的正常运行,就像ping
。 可能有多种方法可以满足此要求,例如,涉及timeout()
运算符。 我选择使用window()
运算符每500毫秒对所有事件进行分组:
Observable<BigDecimal> obs = coinMiner.mineMany(count, executorService);
obs.window(500, TimeUnit.MILLISECONDS).flatMap(window -> window.reduce(BigDecimal.ZERO, BigDecimal::add)).scan(BigDecimal::add).subscribe(//...);
这是一个棘手的问题。 首先,我们将所有进度更新分组在500毫秒窗口中。 然后,我们使用reduce
来计算在此时间段内开采的硬币的总数(类似于scan()
)。 如果在此期间未开采任何硬币,我们只需返回ZERO
。 最后,我们使用scan()
汇总每个窗口的小计。 我们不再需要sample()
因为window()
确保每500毫秒发出一个事件。
客户端
JavaScript中有很多SSE用法的示例,因此为您提供一种快速的解决方案,称为我们的控制器:
var source = new EventSource("/mine/10");
source.onmessage = function (event) {console.info(event);
};
我相信SseEmitter
是Spring MVC的一项重大改进,它将使我们能够编写更健壮和更快的Web应用程序,需要即时的单向更新。
翻译自: https://www.javacodegeeks.com/2015/08/server-sent-events-with-rxjava-and-sseemitter.html
使用RxJava和SseEmitter进行服务器发送的事件相关推荐
- rxjava 循环发送事件_使用RxJava和SseEmitter进行服务器发送的事件
rxjava 循环发送事件 Spring Framework 4.2 GA即将发布,让我们看一下它提供的一些新功能. 引起我注意的一个事件是一个简单的新类SseEmitter ,它是对Spring M ...
- jax-rs jax-ws_迟来总比没有好:SSE或服务器发送的事件现在已在JAX-RS中
jax-rs jax-ws 服务器发送的事件 (或简称为SSE )是非常有用的协议,它允许服务器通过HTTP将数据推送到客户端. 这是我们的Web浏览器支持的年龄,但是令人惊讶的是, JAX-RS规范 ...
- 迟来总比没有好:SSE或服务器发送的事件现在已在JAX-RS中
服务器发送的事件 (或简称为SSE )是非常有用的协议,它允许服务器通过HTTP将数据推送到客户端. 这是我们的网络浏览器支持的年龄,但令人惊讶的是, JAX-RS规范在很长一段时间内都忽略了这一点. ...
- WebSockets与服务器发送的事件/ EventSource
WebSockets和服务器发送事件都能够将数据推送到浏览器. 在我看来,它们似乎是竞争技术. 它们之间有什么区别? 您何时会选择一个? #1楼 根据caniuse.com: 96%的全球用户本机支持 ...
- java sse spring_【SpringBoot WEB 系列】SSE 服务器发送事件详解
SSE 全称Server Sent Event,直译一下就是服务器发送事件,一般的项目开发中,用到的机会不多,可能很多小伙伴不太清楚这个东西,到底是干啥的,有啥用 本文主要知识点如下: SSE 扫盲, ...
- HTML5支持服务器发送事件
来源 传统的WEB应用程序通信时的简单时序图: 现在Web App中,大都有Ajax,是这样子: HTML5有一个Server-Sent Events(SSE)功能,允许服务端推送数据到客户端.(通常 ...
- HTML5支持服务器发送事件(Server-Sent Events)-单向消息传递数据推送(C#示例)
传统的WEB应用程序通信时的简单时序图: 现在Web App中,大都有Ajax,是这样子: HTML5有一个Server-Sent Events(SSE)功能,允许服务端推送数据到客户端.(通常叫数据 ...
- sql server无法绑定由多个部分组成的标识符_HTML5服务器推送事件(Server-sent-event)...
在前端开发中,实现界面推送的方式,这里大概总结下三种方式 轮询(ajax),比较耗费服务器资源.COMET方式(COMET 技术并不是 HTML 5 ) websocket 双向数据推送,灵活,功能强 ...
- HTML5 服务器推送事件(Server-sent Events)实战开发
对于一般的 Web 应用开发,大多数开发人员并不陌生.在 Web 应用中,浏览器和服务器之间使用的是请求 / 响应的交互模式.浏览器发出请求,服务器根据收到的请求来生成相应的响应.浏览器再对收到的响应 ...
最新文章
- 高并发大型网站架构设计
- 两周后上线,老板你在开玩笑吗?
- 【PAT - 甲级1045】Favorite Color Stripe(30分)(dp,LIS类问题)
- docker mysql57_docker安装mysql57
- mysql 存储过程 批量导入数据_sql 利用存储过程批量导入数据
- 服务器控件的使用注意事项
- bzoj2038 [2009国家集训队]小Z的袜子(hose)
- java计算机毕业设计劳务外包管理系统源码+系统+mysql数据库+lw文档
- #pragma once用法总结和链接错误
- 火箭08-09赛程列表
- Xilinx HLS 导出IP失败的最新解决方案(2022.1.15)
- C++数值类型极限值的获取
- NiosII 学习过程
- JAVA回文数的判断
- 流放之路进去后显示无法连接登入服务器,流放之路此账号目前无法登录游戏
- sunos与linux区别,linux与solaris的联系与区别总结:命令的异同
- 三级分销如何合规分账?
- 吃豆人 html5 倒计时,ChinaJoy开展倒计时,回忆杀吃豆人ip摩擦康迪克水杯溅火花...
- 什么是银行的SWIFT代码
- *CF1216F. Wi-Fi (dp)
热门文章
- C++描述杭电OJ 2008.数值统计 ||
- Ajax判断用户名是否可用
- 在gitee上创建自己的仓库步骤
- 开发环境 Minio 添加桶的操作流程-页面操作
- 布隆过滤器速度_详解布隆过滤器的原理、使用场景和注意事项
- 递归算法和迭代算法_Java中没有递归的二进制搜索–迭代算法
- neo4j 显示名字_Neo4j:绘制“我的名字是……我在工作”图
- java中使用jython_将Jython嵌入到您的Java代码库中
- java jsp学习指南_JSP教程–最终指南
- jvm内存 大于 xmx_为什么我的JVM访问的内存少于通过-Xmx指定的内存?