rxjava背压_关于Rx Java:如何在RxJava RabbitMQ Observable中实现背压?
我是RxJava的新手,它试图从支持无损背压的RabbitMQ队列中实现消息的Observable。 我设法从Spring AMQP MessageListener创建了一个Observable。 这样可以在同步环境中很好地处理背压(例如,调用堆栈阻塞),但是一旦引入了多个线程,背压就会如您所愿地消失在窗口之外。 该类如下:
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.stereotype.Component;
import rx.Observable;
import rx.subscriptions.Subscriptions;
import javax.inject.Inject;
@Component
public class CommandExchange {
private final MessageConverter messageConverter;
private final ConnectionFactory connectionFactory;
@Inject
public CommandExchange(MessageConverter messageConverter, ConnectionFactory connectionFactory) {
this.messageConverter = messageConverter;
this.connectionFactory = connectionFactory;
}
public Observable< T > observeQueue(String... queueNames) {
return Observable.create(subscriber -> {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueNames);
container.setMessageListener((MessageListener) message -> {
T command = (T) messageConverter.fromMessage(message);
if (!subscriber.isUnsubscribed()) {
System.out.println("Being asked for a message.");
subscriber.onNext(command);
}
});
container.start();
Subscriptions.create(container::shutdown);
});
}
}
我无法在没有阻塞或缓冲的情况下了解如何实现无损反压。 使用缓冲没有任何意义,因为Rabbit MQ队列已经是一个缓冲区-因此,仅当订阅者准备好从队列中使用消息时,才应使用该消息。 解决方案是否是使用基于拉式的可观察的(即停止使用侦听器,而是在订户有需求时获取消息)? 如果是这样,处理队列中当前没有消息的最佳实践是什么?
是的,我将停止使用侦听器,并按需从队列中获取消息。 请求记帐和背压都将为您处理,如果您使用
Observable.create(new SyncOnSubscribe< T >() {...});
在SyncOnSubscribe中,您或多或少只指定获取一条消息所采取的操作(如果没有等待,则不执行任何操作)。
在这方面需要更多帮助吗? 如果您需要,我可以添加更多细节。
public Observable< T > observeQueue(String queueName) { Observable.OnSubscribe< T > stateless = SyncOnSubscribe.createStateless(s -> { s.onNext((T) rabbitTemplate.receiveAndConvert(queueName, -1)); }); return Observable.create(stateless).subscribeOn(Schedulers.newThread()); }这很好用,尽管我不确定在新线程上运行的阻塞观察器是否是最佳实践。
rxjava背压_关于Rx Java:如何在RxJava RabbitMQ Observable中实现背压?相关推荐
- java事务超时时间,java – 如何在WebSphere Liberty Batch中配置事务超时?
> javax.transaction.global.timeout的作用是什么? >我是否需要在CheckpointAlgorithm中实现checkpointTimeout()方法? ...
- java替换数组中的元素_如何使用Java 8流快速替换列表中的元素
java替换数组中的元素 假设您有一个项目清单: List<String> books = Arrays.asList("The Holy Cow: The Bovine Tes ...
- java点击按钮结线程_多线程的Java应用程序在调试工具Netbeans中单击“停止”按钮时输出一个奇怪的结果...
我使用wait()和notify()机制学习了java中的多线程. 但我很好奇输出一个简单的多线程Java应用程序. 代码如下: class Q { int n; boolean valueSet = ...
- 0编译器详解_详解Java枚举类型(Enum)中的方法
文章前记 程序员工作久了便可能整日忙碌于"增删改查"中,迷失方向,毫无进步. 该公众号致力于分享软件开发相关的原创干货,助你完成从程序员到架构师的进阶之路! 努力!做一个NB的Co ...
- java html2text_java-如何在html / text内容中获取文本?
大家好 我有html / text之类的东西: first text one: second texttwo: third textthree: fourth textfive: fifth text ...
- java aio实现_深入理解Java AIO(三)—— Linux中的AIO实现
我们调用的Java AIO底层也是要调用OS的AIO实现,而OS主要也就Windows和Linux这两大类,当然还有Solaris和mac这些小众的. 在 Windows 操作系统中,提供了一个叫做 ...
- 合肥Java面试常考题_北大青鸟java 面试--常见面试题(中)
上一文中,我们总结了java面试的基础,多线程,jvm的常见面试题,本文合肥北大青鸟合工大校区的袁老师继续介绍面试中网络.数据结构和算法.分布式理论和微服务的常见面试题. 一.网络 网络的话,主要集中 ...
- java sha1加密ascii码_请问下面java的Sha1加密在c#中对应要怎么写?
/** * 读取指定文件块数据Sha1 * * @param fis * @return */ private static MessageDigest calSha1(BufferedInputSt ...
- 下列关于Java多线程并发控制_下列关于Java多线程并发控制机制的叙述中,错误的是...
下列叙述成都望江楼的造景手法有(). 竹文化景观应体现科学性与艺术性的和谐统一,关于既要满足植物的生态习性,又能体现美学价值. 在中国传统的审美趣味.多线伦理道德上,竹在造园中被拟人化为( )的代表. ...
- android 线性布局 底部,java – 如何在android线性布局中对齐父底部?
我有一个线性布局 我想在它的底部创建一个切片. 我知道有一些选择,但我有点困惑 1)android:layout_gravity:"bottom" – >由于某种原因,这对我 ...
最新文章
- UVa 11624,两次BFS
- 你能活多少岁,就让人工智能来告诉你吧
- kotlin sealed 中_Kotlin sealed class
- 摩托罗拉:未来一切以手机为中心
- springboot中使用缓存shiro-ehcache
- 求一个任意实数c的算术平方根g_中考总复习实数知识点
- python的xml.dom学习笔记
- 怎么判断tcp重组完成_网络工程师(8):TCP为什么可靠
- ios基础篇(十二)——UINavgationController的使用(三)ToolBar
- 【ACL20】让笨重的BERT问答匹配模型变快!
- 《程序员》2012年7期精彩内容:智能算法
- 2008服务器系统开启ftp,win 2008服务器开启FTP功能
- 普及练习场 深度优先搜索 八皇后
- Error:Execution failed for task ':app:transformClassesAndResourcesWithProguardForRelease'. Job fai
- 让大象飞中的工作法(一)
- 如何在Win11调出IE11浏览器?
- Spring Cloud Alibaba 实战 | 第十二篇: 微服务整合Sentinel的流控、熔断降级,赋能拥有降级功能的Feign新技能熔断,实现熔断降级双剑合璧(JMeter模拟测试)
- 博客从wordpress迁移到hexo
- python笛卡尔坐标系_python – n球面坐标系到笛卡尔坐标系
- Java九十条经验法则之第二条:遇到多个构造器时要考虑使用构建器