技术停滞

假设您有一个流以不可预测的频率发布事件。 有时您可以预期每秒会有数十条消息,但是偶尔几秒钟都看不到任何事件。 如果您的流是通过Web套接字,SSE或任何其他网络协议传输的,则可能会出现问题。 静默时间过长(停顿)可以解释为网络问题。 因此,我们经常不时发送人造事件( ping ),以确保:

  • 客户还活着
  • 让客户知道我们还活着

举一个更具体的例子,假设我们有一个Flowable<String>流,它会产生一些事件。 如果没有事件超过一秒钟,我们应该发送一个占位符"PING"消息。 当静默时间更长时,应该每秒发出一个"PING"消息。 我们如何在RxJava中实现这样的要求? 最明显但不正确的解决方案是将原始流与ping合并:

Flowable<String> events = //...
Flowable<String> pings = Flowable.interval(1, SECONDS).map(x -> "PING");Flowable<String> eventsWithPings = events.mergeWith(pings);

mergeWith()运算符至关重要:它接受真正的events ,并将它们与恒定的ping流合并。 当然,当没有真实的事件出现时,将出现"PING"消息。 不幸的是,它们与原始流完全无关。 这意味着即使有很多正常事件,我们也会继续发送ping命令。 而且,当静默开始时,我们不会在一秒钟后精确发送"PING" 。 如果您对这种机制感到满意,则可以在此处停止阅读。

一种更复杂的方法需要发现持续超过1秒的静音。 我们可以使用timeout()运算符。 不幸的是,它会产生TimeoutException并从上游退订-行为过于激进。 我们只想收到某种通知。 事实证明,可以使用debounce()运算符。 通常,此操作员会推迟新事件的发出,以防万一有新事件出现,从而覆盖了旧事件。 所以,如果我说:

Flowable<String> events = //...
Flowable<String> delayed = events.debounce(1, SECONDS);

这意味着delayed流仅在1秒内跟随其他事件时才会发出事件。 如果events流使产生事件的速度足够快,从技术上讲, delayed可能永远不会发出任何东西。 我们将使用delayed流通过以下方式发现沉默:

Flowable<String> events = //...
Flowable<String> delayed = events.debounce(1, SECONDS);
Flowable<String> pings = delayed.map(ev -> "PING");
Flowable<String> eventsWithPings = Flowable.merge(events, pings);

请记住, mergeWith()和它的static merge()对应项之间没有区别。 所以我们到了某个地方。 如果流繁忙,则delayed流将永远不会收到任何事件,因此不会发送"PING"消息。 但是,当原始流不发送任何事件超过1秒时, delayed接收到最后看到的事件,将其忽略并转换为"PING" 。 聪明,但坏了。 此实现仅在发现停顿后才发送一个"PING" ,而不是每秒发送一次定期ping。 很容易修复! 除了将最后一次看到的事件转换为单个"PING"我们还可以将其转换为周期性ping序列:

Flowable<String> events = //...
Flowable<String> delayed = events.debounce(1, SECONDS);
Flowable<String> pings = delayed.flatMap(x -> Flowable.interval(0, 1, SECONDS).map(e -> "PING"));
Flowable<String> eventsWithPings = Flowable.merge(events, pings);

您能看到缺陷在哪里吗? 每当原始流中出现一点沉默时,我们就会每秒发出一次ping 。 但是,一旦出现一些真正的事件,我们应该停止这样做。 我们没有。 上游的每个停顿都会导致新的无限ping流出现在最终的合并流中。 我们必须以某种方式告诉pings流,因为原始流发出了真正的事件,所以它应该停止发出ping 。 猜猜是什么,有takeUntil()运算符可以做到这一点!

Flowable<String> events = //...
Flowable<String> delayed = events.debounce(1, SECONDS);
Flowable<String> pings = delayed.flatMap(x -> Flowable.interval(0, 1, SECONDS).map(e -> "PING").takeUntil(events));
Flowable<String> eventsWithPings = Flowable.merge(events, pings);

花一点时间完全掌握上面的代码片段。 每当原始流上超过1秒没有任何React时, delayed流就会发出一个事件。 pings流发射的序列"PING"每秒从发射每个事件的事件delayed 。 但是,一旦事件出现在events流上,便会终止pings流。 您甚至可以将所有这些定义为单个表达式:

Flowable<String> events = //...
Flowable<String> eventsWithPings = events.mergeWith(events.debounce(1, SECONDS).flatMap(x1 -> Flowable.interval(0, 1, SECONDS).map(e -> "PING").takeUntil(events)));

可测性

好的,我们已经编写了所有这些内容,但是我们应该如何测试事件驱动代码的这个三层嵌套的Blob? 我们如何确保ping在正确的时间出现并在静默结束时停止? 如何模拟各种与时间相关的场景? RxJava具有许多杀手级功能,但是测试时间流逝可能是最大的功能。 首先,让我们的ping代码更具可测试性和通用性:

<T> Flowable<T> withPings(Flowable<T> events, Scheduler clock, T ping) {return events.mergeWith(events.debounce(1, SECONDS, clock).flatMap(x1 -> Flowable.interval(0, 1, SECONDS, clock).map(e -> ping).takeUntil(events)));}

此实用程序方法采用任意的T流并添加ping ,以防该流在较长时间内不产生任何事件。 我们在测试中像这样使用它:

PublishProcessor<String> events = PublishProcessor.create();
TestScheduler clock = new TestScheduler();
Flowable<String> eventsWithPings = withPings(events, clock, "PING");

哦,男孩, PublishProcessorTestSchedulerPublishProcessor是一个有趣的类,它是一个亚型Flowable (所以我们可以使用它作为一个普通的流)。 另一方面,我们可以使用其onNext()方法强制发出事件:

events.onNext("A");

如果有人收听events流,他将立即收到"A"事件。 这clock是怎么回事? RxJava中以任何方式处理时间的每个运算符(例如debounce debounce()interval()timeout()window() )都可以采用可选的Scheduler参数。 它充当时间的外部来源。 特殊的TestScheduler是我们完全控制的人为的时间来源。 也就是说,只要我们不显式调用advanceTimeBy()时间就保持静止:

clock.advanceTimeBy(999, MILLISECONDS);

999毫秒不是巧合。 Ping在1秒钟后开始精确显示,因此在999毫秒后将不可见。 现在是时候揭示完整的测试用例了:

@Test
public void shouldAddPings() throws Exception {PublishProcessor<String> events = PublishProcessor.create();final TestScheduler clock = new TestScheduler();final Flowable<String> eventsWithPings = withPings(events, clock, "PING");final TestSubscriber<String> test = eventsWithPings.test();events.onNext("A");test.assertValues("A");clock.advanceTimeBy(999, MILLISECONDS);events.onNext("B");test.assertValues("A", "B");clock.advanceTimeBy(999, MILLISECONDS);test.assertValues("A", "B");clock.advanceTimeBy(1, MILLISECONDS);test.assertValues("A", "B", "PING");clock.advanceTimeBy(999, MILLISECONDS);test.assertValues("A", "B", "PING");events.onNext("C");test.assertValues("A", "B", "PING", "C");clock.advanceTimeBy(1000, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING");clock.advanceTimeBy(999, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING");clock.advanceTimeBy(1, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING", "PING");clock.advanceTimeBy(999, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING", "PING");events.onNext("D");test.assertValues("A", "B", "PING", "C", "PING", "PING", "D");clock.advanceTimeBy(999, MILLISECONDS);events.onNext("E");test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E");clock.advanceTimeBy(999, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E");clock.advanceTimeBy(1, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E", "PING");clock.advanceTimeBy(3_000, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E", "PING", "PING", "PING", "PING");
}

看起来像一堵墙,但这实际上是我们逻辑的完整测试方案。 它可以确保ping在1000毫秒后精确显示,当沉默很长时重复一次,而在真正事件出现时则重复很短。 但最重要的部分是:该测试是100%可预测的并且非常快。 没有Awaitility ,忙等待,轮询,间歇性测试失败和缓慢。 我们完全控制的人工时钟可确保所有这些组合流均按预期工作。

翻译自: https://www.javacodegeeks.com/2017/09/detecting-testing-stalled-streams-rxjava-faq.html

技术停滞

技术停滞_检测和测试停滞的流– RxJava常见问题解答相关推荐

  1. 检测和测试停滞的流– RxJava常见问题解答

    假设您有一个流以不可预测的频率发布事件. 有时您可以预期每秒会有数十条消息,但是偶尔几秒钟都看不到任何事件. 如果您的流是通过Web套接字,SSE或任何其他网络协议传输的,则可能会出现问题. 静默时间 ...

  2. flowable背压 取消_使用Flowable.generate()生成可感知背压的流– RxJava常见问题解答...

    flowable背压 取消 RxJava缺少创建无限自然数流的工厂. 这样的流很有用,例如,当您想通过压缩两个事件的顺序来为可能的无限事件流分配唯一的序列号时: Flowable<Long> ...

  3. openwrt固定速率_固定速率与固定延迟– RxJava常见问题解答

    openwrt固定速率 如果您使用的是纯Java,从版本5开始,我们有一个方便的调度程序类,该类允许以固定速率或固定延迟运行任务: import java.util.concurrent.Execut ...

  4. ashx文件 验证是否登录_汇总丨增值税综合服务平台登录常见问题解答

    按照国家税务总局统一部署,国家税务总局厦门市税务局将于2019年10月31日起上线增值税发票管理系统2.0版.届时原增值税选择确认平台和发票查验平台将升级为增值税发票综合服务平台. 为帮助您提前了解综 ...

  5. 渴望 英语_渴望订阅– RxJava常见问题解答

    渴望 英语 在教学和指导RxJava以及撰写本书之后 ,我注意到某些领域特别有问题. 我决定发布一些简短的提示,以解决最常见的陷阱. 这是第一部分. Observable和Flowable本质上是惰性 ...

  6. rxjava 背压_背压加载文件– RxJava常见问题解答

    rxjava 背压 事实证明,将文件作为流进行处理非常有效且方便. 许多人似乎忘记了,自Java 8(3年以上!)以来,我们可以很容易地将任何文件变成一行代码: String filePath = & ...

  7. python3安装常见问题_有关在 Windows 上使用 Python 的常见问题解答

    有关在 Windows 上使用 Python 的常见问题解答Frequently Asked Questions about using Python on Windows 07/19/2019 本文 ...

  8. 游戏开发物语方案点数分配_游戏开发物语攻略汇总 常见问题解答

    第1页: 展开 今天为大家带来的是游戏开发物语攻略汇总,常见问题解答.这里汇总了一些游戏开发物语的实用攻略,感兴趣的小伙伴一起来看看吧. 实用攻略 1.找一张相性表,一开始选择两组好的相性(八楼的朋友 ...

  9. 学习笔记:SpringCloud 微服务技术栈_实用篇①_基础知识

    若文章内容或图片失效,请留言反馈.部分素材来自网络,若不小心影响到您的利益,请联系博主删除. 前言 学习视频链接 SpringCloud + RabbitMQ + Docker + Redis + 搜 ...

最新文章

  1. 3.innodb内存结构
  2. 本地 php nginx压测试
  3. python CMAKE的使用
  4. android 手机命令大全,adb 命令大全
  5. (转)细说Cookie
  6. ZKWeb网页框架1.3正式发布
  7. aPaaS将如何改变软件行业?
  8. php 取整十整百,php取整数的方法与实例总结
  9. MapReduce Map数 reduce数设置
  10. 必看!建立内部邮件服务器注意要点
  11. 注解配置springMvc及向作用域中赋值
  12. 数据库优化之MySQL
  13. 企业如何利用工业大数据实现价值,这5个真实案例告诉你!
  14. 关于安卓脚本打包apk
  15. iOS 10诸如相机、相册、通讯录、麦克风、定位权限设置,防止奔溃或上架被拒
  16. Clearview Mac电子书阅读器
  17. 群签名技术的理解和总结
  18. usb_cam的ROS2甜点
  19. 微信小程序用户盒子、宫格列表(CSS)
  20. Homekit智能家居DIY之智能通断开关

热门文章

  1. Loj#3130-「COCI 2018.12」Praktični【线性基】
  2. P6672-[清华集训2016]你的生命已如风中残烛【结论】
  3. M. Monster Hunter(树形dp)
  4. 【DP】集合问题(2015特长生 T4/luogu 1466)
  5. vue-cli2、vue-cli3脚手架详细讲解
  6. 一文搞定 Spring Data Redis 详解及实战
  7. Java synchronized 中的while 和 notifyAll
  8. 接口 DataOutput
  9. 越努力越幸运,三年了!!!
  10. 全国教学交流研讨会“教学为本”主题总结