flowable背压 取消

RxJava缺少创建无限自然数流的工厂。 这样的流很有用,例如,当您想通过压缩两个事件的顺序来为可能的无限事件流分配唯一的序列号时:

Flowable<Long> naturalNumbers = //???Flowable<Event> someInfiniteEventStream = //...
Flowable<Pair<Long, Event>> sequenced = Flowable.zip(naturalNumbers,someInfiniteEventStream,Pair::of
);

实现naturalNumbers令人惊讶地复杂。 在RxJava 1.x中,您可以短暂地放弃不遵守反压的Observable

import rx.Observable;  //RxJava 1.xObservable<Long> naturalNumbers = Observable.create(subscriber -> {long state = 0;//poor solution :-(while (!subscriber.isUnsubscribed()) {subscriber.onNext(state++);}
});

这样的流没有背压是什么意思? 好吧,基本上,流可以轻松地以CPU内核允许的速度生成事件(不断增加的state变量),每秒数百万。 但是,当使用者无法如此Swift地使用事件时,未处理事件的积压开始出现:

naturalNumbers
//      .observeOn(Schedulers.io()).subscribe(x -> {//slooow, 1 millisecond});

上面的程序(带有observeOn()运算符的注释掉)可以正常运行,因为它具有意外的反压。 默认情况下,所有内容在RxJava中都是单线程的,因此生产者和使用者在同一个线程中工作。 实际上,调用subscriber.onNext()会阻止,因此while循环会自动对其进行限制。 但是,尝试取消注释observeOn() ,灾难会在几毫秒后发生。 订阅回调在设计上是单线程的。 对于每个元素,它至少需要1毫秒,因此该流每秒可以处理不超过1000个事件。 我们有些幸运。 RxJava快速发现这种灾难性状况,并因MissingBackpressureException而快速失败

我们最大的错误是生产事件,而没有考虑消费者的速度。 顺便说一下,这是响应流背后的核心思想:不允许生产者发出比消费者请求更多的事件。 在RxJava 1.x中,即使实现最简单的流(从头开始考虑背压)也不是一件容易的事。 RxJava 2.x带来了几个方便的运算符,这些运算符建立在以前版本的经验基础之上。 首先RxJava 2.x时不允许你实现Flowable (背压-aware)的相同的方式,你可以与Observable 。 创建Flowable会使消费者使消息过载是不可能的:

Flowable<Long> naturalNumbers = Flowable.create(subscriber -> {long state = 0;while (!subscriber.isCancelled()) {subscriber.onNext(state++);}
}, BackpressureStrategy.DROP);

您是否发现了这个额外的DROP参数? 在解释之前,让我们看一下使用慢速用户订阅时的输出:

0
1
2
3
//...continuous numbers...
126
127
101811682
//...where did my 100M events go?!?
101811683
101811684
101811685
//...continuous numbers...
101811776
//...17M events disappeared again...
101811777
//...

你的旅费可能会改变。 怎么了? observeOn()运算符在调度程序(线程池)之间切换。 从未决事件队列中合并的线程池。 该队列是有限的,容量为128个元素。 知道此限制的observeOn()运算符仅从上游请求128个元素(我们的自定义Flowable )。 此时,它使我们的订户可以处理事件,每毫秒1次。 因此,大约100毫秒后, observeOn()发现其内部队列几乎为空,并要求更多。 会得到128、129、130…吗? 没有! 我们的Flowable在这0.1秒内产生了疯狂的事件,并且(令人惊讶地)在该时间段内产生了超过1亿个数字。 他们去哪了 好吧, observeOn()并没有要求它们,因此DROP策略(强制性参数)只是丢弃了不需要的事件。

BackpressureStrategy

听起来不对,还有其他策略吗? 是的,很多:

  • BackpressureStrategy.BUFFER :如果上游产生太多事件,则会将它们缓冲在无界队列中。 没有任何事件丢失,但是您的整个应用程序很可能会丢失。 如果幸运的话, OutOfMemoryError将拯救您。 我被困在5秒以上的长时间GC暂停中。
  • BackpressureStrategy.ERROR :如果发现事件的过度产生,将抛出MissingBackpressureException 。 这是一个理智(且安全)的策略。
  • BackpressureStrategy.LATEST :类似于DROP ,但是记住上次删除的事件。 以防万一需要更多数据,但我们只是删除了所有内容–至少具有最后看到的价值。
  • BackpressureStrategy.MISSING :没有安全措施,请加以处理。 下游运算符之一(如observeOn() )最有可能抛出MissingBackpressureException
  • BackpressureStrategy.DROP :删除未请求的事件。

顺便说一句,当您将Observable变为Flowable还必须提供BackpressureStrategy 。 RxJava必须知道如何限制过量产生的Observable 。 好的,那么简单的序列自然数流的正确实现是什么?

认识

create()generate()之间的区别在于责任。 假设Flowable.create()会在不考虑背压的情况下完整地生成流。 它只是在需要时才产生事件。 另一方面,仅允许Flowable.generate()一次生成一个事件(或完成流)。 背压机制透明地计算出当前需要多少个事件。 generate()调用适当的次数,例如,在observeOn()情况下, observeOn() 128次。

由于此运算符一次生成一个事件,因此通常需要某种状态来确定上次出现的时间1 。 这就是generate()含义:(im)可变状态的持有者和一个基于该状态生成下一个事件的函数:

Flowable<Long> naturalNumbers =Flowable.generate(() -> 0L, (state, emitter) -> {emitter.onNext(state);return state + 1;});

generate()的第一个参数是初始状态(工厂),在本例中为0L 。 现在,每当订户或任何下游运营商请求一些事件时,都会调用lambda表达式。 它的责任是根据提供的状态以某种方式最多调用一次onNext() (最多发出一个事件)。 首次调用lambda时, state等于初始值0L 。 但是,我们可以修改状态并返回其新值。 在此示例中,我们增加了long以便后续lambda表达式的调用接收到state = 1L 。 显然,这种情况不断发生,产生连续的自然数。

这样的编程模型显然比while循环难。 它还从根本上改变了实现事件源的方式。 与其在任何时候都想推送事件,不如只是被动地等待请求。 下游运营商和订户正在从您的流中提取数据。 这种转变可在管道的所有级别上产生背压。

generate()有一些风格。 首先,如果您的状态是可变对象,则可以使用不需要返回新状态值的重载版本。 尽管功能较少,但可变状态往往会产生较少的垃圾。 这假设您的状态不断变化,并且每次都传递相同的状态对象实例。 例如,您可以轻松地将Iterator (也是基于pull的!)变成具有反压奇观的流:

Iterator<Integer> iter = //...Flowable<String> strings = Flowable.generate(() -> iter, (iterator, emitter) -> {if (iterator.hasNext()) {emitter.onNext(iterator.next().toString());} else {emitter.onComplete();}
});

请注意,流的类型( <String> )不必与状态类型( Iterator<Integer> )相同。 当然,如果您有Java Collection并想将其转换为流,则不必先创建迭代器。 使用Flowable.fromIterable()足够了。 甚至更简单的generate()版本都假定您根本没有任何状态。 例如随机数流:

Flowable<Double> randoms = Flowable.generate(emitter -> emitter.onNext(Math.random()));

但老实说,您可能最终将需要一个Random实例:

Flowable.generate(Random::new, (random, emitter) -> {emitter.onNext(random.nextBoolean());
});

摘要

如您所见,RxJava 1.x中的Observable.create()和Flowable.create Flowable.create()有一些缺点。 如果您真的在乎大量并发系统的可伸缩性和运行状况(否则您将不会阅读本文!),则必须了解背压。 如果您真的需要从头开始创建流,而不是使用from*()系列方法或执行繁重工作的各种库,请熟悉generate() 。 本质上,您必须学习如何将某些类型的数据源建模为奇特的迭代器。 可能会有更多文章解释如何实现更多现实生活流。

这类似于无状态HTTP协议,该协议在服务器上使用称为会话*的小块状态来跟踪过去的请求。

翻译自: https://www.javacodegeeks.com/2017/08/generating-backpressure-aware-streams-flowable-generate-rxjava-faq.html

flowable背压 取消

flowable背压 取消_使用Flowable.generate()生成可感知背压的流– RxJava常见问题解答...相关推荐

  1. 使用Flowable.generate()生成可感知背压的流– RxJava常见问题解答

    RxJava缺少创建无限自然数流的工厂. 这样的流很有用,例如,当您想通过压缩两个事件的唯一序列号给可能的无限事件流时: Flowable<Long> naturalNumbers = / ...

  2. 技术停滞_检测和测试停滞的流– RxJava常见问题解答

    技术停滞 假设您有一个流以不可预测的频率发布事件. 有时您可以预期每秒会有数十条消息,但是偶尔几秒钟都看不到任何事件. 如果您的流是通过Web套接字,SSE或任何其他网络协议传输的,则可能会出现问题. ...

  3. rxjava 背压_背压加载文件– RxJava常见问题解答

    rxjava 背压 事实证明,将文件作为流进行处理非常有效且方便. 许多人似乎忘记了,自Java 8(3年以上!)以来,我们可以很容易地将任何文件变成一行代码: String filePath = & ...

  4. 背压加载文件– RxJava常见问题解答

    事实证明,将文件作为流进行处理非常有效且方便. 许多人似乎忘记了,自Java 8(3年多!)以来,我们可以很容易地将任何文件变成一行代码: String filePath = "foobar ...

  5. 渴望 英语_渴望订阅– RxJava常见问题解答

    渴望 英语 在教学和指导RxJava以及撰写本书之后 ,我注意到某些领域特别有问题. 我决定发布一些简短的提示,以解决最常见的陷阱. 这是第一部分. Observable和Flowable本质上是惰性 ...

  6. openwrt固定速率_固定速率与固定延迟– RxJava常见问题解答

    openwrt固定速率 如果您使用的是纯Java,从版本5开始,我们有一个方便的调度程序类,该类允许以固定速率或固定延迟运行任务: import java.util.concurrent.Execut ...

  7. 西门子cpu指示灯含义_【技成周报28期】西门子全系列常见问题解答分享

    点击上方蓝色字关注我们~ 每周的答疑时间又到啦~快来看看本周其他学员咨询的问题,是否也是你的疑惑呢?看看老师怎么回复的吧! 西门子S7-200SMART PLC 常见问题 问:西门子S7-200SMA ...

  8. ab的plc跟西门子哪个好些_周报61期 | 西门子全系列及博图软件常见问题解答

    技成周报:61期 1.通过编程能改变输入或输出端子的功能不? 答:不能,如果该端子功能可以修改,需要在组态里改. 2.两个路由器桥接可以实现PLC无线通信吗? 答:通过路由器及相关设置,可以实现无线通 ...

  9. flowable画图教程_给初学者的RxJava2.0教程(七): Flowable

    作者博客http://www.jianshu.com/u/c50b715ccaeb前言上一节里我们学习了只使用Observable如何去解决上下游流速不均衡的问题,之所以学习这个是因为Observab ...

最新文章

  1. Linux下遍历文件夹的实现
  2. elastic的gc相关
  3. 互联网金融产品做第三方支付平台托管需要注意什么?
  4. basler相机 ip linux,Linux环境中连接Basler相机(Pylon软件的安装),ROS环境中连接Basler相机...
  5. mysql 锁怎么使用_Mysql锁一般使用
  6. python if语句能否判断中文_Python“if”语句被忽略
  7. 冲突问题和核心对象和原生js 和 jquer的区别的两个区别(固定格式的区别和入口函数的区别)
  8. [AGC007 E]Shik and Travel
  9. 防止页面高度不足,引起的底部上浮问题
  10. 在线Excel的前端组件、控件,实现web Excel
  11. HTML页面跳转及传递参数
  12. Testbed静态分析
  13. C++-02、命名空间、构造函数、析构函数
  14. 有趣的Ruby-学习笔记4
  15. Django创建app应用和admin模块
  16. Redis遇到过的问题(Could not get a resource from the pool)
  17. IMX6UL 飞凌 LINUX 学习记录(1)
  18. 2019-2021届蓝桥杯——java真题集锦
  19. win7计算机怎么优化驱动器,Win7优化电脑加快关机速度的方法技巧
  20. 火炬2 贴吧导航(大纲1.0)

热门文章

  1. 数论分块练习([CF830 C]Bamboo Partition + [hdu 6395]Sequence )
  2. 【CF813F】Bipartite Checking(线段树分治+可删除并查集)
  3. 染色(树链剖分 洛谷-P2486)
  4. 【DP】K星人的语言(2020特长生 T3)
  5. 【矩阵乘法】沼泽鳄鱼(ssl 2511)
  6. 【模拟】pjesma(jzoj 1151)
  7. 【动态规划】【递归】取数字问题 (ssl 1644)
  8. 2017-2018 ACM-ICPC Nordic Collegiate Programming Contest (NCPC 2017)
  9. 6、mysql中字段
  10. Spring MVC 到底是如何工作的