我是RxJava的新手,它试图从支持无损背压的RabbitMQ队列中实现消息的Observable。 我设法从Spring AMQP MessageListener创建了一个Observable。 这样可以在同步环境中很好地处理背压(例如,调用堆栈阻塞),但是一旦引入了多个线程,背压就会如您所愿地消失在窗口之外。 该类如下:

import org.springframework.amqp.core.MessageListener;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;

import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

import org.springframework.amqp.support.converter.MessageConverter;

import org.springframework.stereotype.Component;

import rx.Observable;

import rx.subscriptions.Subscriptions;

import javax.inject.Inject;

@Component

public class CommandExchange {

private final MessageConverter messageConverter;

private final ConnectionFactory connectionFactory;

@Inject

public CommandExchange(MessageConverter messageConverter, ConnectionFactory connectionFactory) {

this.messageConverter = messageConverter;

this.connectionFactory = connectionFactory;

}

public Observable< T > observeQueue(String... queueNames) {

return Observable.create(subscriber -> {

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();

container.setConnectionFactory(connectionFactory);

container.setQueueNames(queueNames);

container.setMessageListener((MessageListener) message -> {

T command = (T) messageConverter.fromMessage(message);

if (!subscriber.isUnsubscribed()) {

System.out.println("Being asked for a message.");

subscriber.onNext(command);

}

});

container.start();

Subscriptions.create(container::shutdown);

});

}

}

我无法在没有阻塞或缓冲的情况下了解如何实现无损反压。 使用缓冲没有任何意义,因为Rabbit MQ队列已经是一个缓冲区-因此,仅当订阅者准备好从队列中使用消息时,才应使用该消息。 解决方案是否是使用基于拉式的可观察的(即停止使用侦听器,而是在订户有需求时获取消息)? 如果是这样,处理队列中当前没有消息的最佳实践是什么?

是的,我将停止使用侦听器,并按需从队列中获取消息。 请求记帐和背压都将为您处理,如果您使用

Observable.create(new SyncOnSubscribe< T >() {...});

在SyncOnSubscribe中,您或多或少只指定获取一条消息所采取的操作(如果没有等待,则不执行任何操作)。

在这方面需要更多帮助吗? 如果您需要,我可以添加更多细节。

public Observable< T > observeQueue(String queueName) { Observable.OnSubscribe< T > stateless = SyncOnSubscribe.createStateless(s -> { s.onNext((T) rabbitTemplate.receiveAndConvert(queueName, -1)); }); return Observable.create(stateless).subscribeOn(Schedulers.newThread()); }这很好用,尽管我不确定在新线程上运行的阻塞观察器是否是最佳实践。

rxjava背压_关于Rx Java:如何在RxJava RabbitMQ Observable中实现背压?相关推荐

  1. java事务超时时间,java – 如何在WebSphere Liberty Batch中配置事务超时?

    > javax.transaction.global.timeout的作用是什么? >我是否需要在CheckpointAlgorithm中实现checkpointTimeout()方法? ...

  2. java替换数组中的元素_如何使用Java 8流快速替换列表中的元素

    java替换数组中的元素 假设您有一个项目清单: List<String> books = Arrays.asList("The Holy Cow: The Bovine Tes ...

  3. java点击按钮结线程_多线程的Java应用程序在调试工具Netbeans中单击“停止”按钮时输出一个奇怪的结果...

    我使用wait()和notify()机制学习了java中的多线程. 但我很好奇输出一个简单的多线程Java应用程序. 代码如下: class Q { int n; boolean valueSet = ...

  4. 0编译器详解_详解Java枚举类型(Enum)中的方法

    文章前记 程序员工作久了便可能整日忙碌于"增删改查"中,迷失方向,毫无进步. 该公众号致力于分享软件开发相关的原创干货,助你完成从程序员到架构师的进阶之路! 努力!做一个NB的Co ...

  5. java html2text_java-如何在html / text内容中获取文本?

    大家好 我有html / text之类的东西: first text one: second texttwo: third textthree: fourth textfive: fifth text ...

  6. java aio实现_深入理解Java AIO(三)—— Linux中的AIO实现

    我们调用的Java AIO底层也是要调用OS的AIO实现,而OS主要也就Windows和Linux这两大类,当然还有Solaris和mac这些小众的. 在 Windows 操作系统中,提供了一个叫做 ...

  7. 合肥Java面试常考题_北大青鸟java 面试--常见面试题(中)

    上一文中,我们总结了java面试的基础,多线程,jvm的常见面试题,本文合肥北大青鸟合工大校区的袁老师继续介绍面试中网络.数据结构和算法.分布式理论和微服务的常见面试题. 一.网络 网络的话,主要集中 ...

  8. java sha1加密ascii码_请问下面java的Sha1加密在c#中对应要怎么写?

    /** * 读取指定文件块数据Sha1 * * @param fis * @return */ private static MessageDigest calSha1(BufferedInputSt ...

  9. 下列关于Java多线程并发控制_下列关于Java多线程并发控制机制的叙述中,错误的是...

    下列叙述成都望江楼的造景手法有(). 竹文化景观应体现科学性与艺术性的和谐统一,关于既要满足植物的生态习性,又能体现美学价值. 在中国传统的审美趣味.多线伦理道德上,竹在造园中被拟人化为( )的代表. ...

  10. android 线性布局 底部,java – 如何在android线性布局中对齐父底部?

    我有一个线性布局 我想在它的底部创建一个切片. 我知道有一些选择,但我有点困惑 1)android:layout_gravity:"bottom" – >由于某种原因,这对我 ...

最新文章

  1. UVa 11624,两次BFS
  2. 你能活多少岁,就让人工智能来告诉你吧
  3. kotlin sealed 中_Kotlin sealed class
  4. 摩托罗拉:未来一切以手机为中心
  5. springboot中使用缓存shiro-ehcache
  6. 求一个任意实数c的算术平方根g_中考总复习实数知识点
  7. python的xml.dom学习笔记
  8. 怎么判断tcp重组完成_网络工程师(8):TCP为什么可靠
  9. ios基础篇(十二)——UINavgationController的使用(三)ToolBar
  10. 【ACL20】让笨重的BERT问答匹配模型变快!
  11. 《程序员》2012年7期精彩内容:智能算法
  12. 2008服务器系统开启ftp,win 2008服务器开启FTP功能
  13. 普及练习场 深度优先搜索 八皇后
  14. Error:Execution failed for task ':app:transformClassesAndResourcesWithProguardForRelease'. Job fai
  15. 让大象飞中的工作法(一)
  16. 如何在Win11调出IE11浏览器?
  17. Spring Cloud Alibaba 实战 | 第十二篇: 微服务整合Sentinel的流控、熔断降级,赋能拥有降级功能的Feign新技能熔断,实现熔断降级双剑合璧(JMeter模拟测试)
  18. 博客从wordpress迁移到hexo
  19. python笛卡尔坐标系_python – n球面坐标系到笛卡尔坐标系
  20. Java九十条经验法则之第二条:遇到多个构造器时要考虑使用构建器

热门文章

  1. python表示倍数_python输出倍数
  2. #if 与 if 的区别
  3. NewStarCTF 公开赛wp
  4. 2015阿里巴巴北京年会——马云“北伐”讲话
  5. 小学信息技术 Linux,小学信息技术教育教学计划
  6. 高数 | chx和shx分别是什么
  7. JAVA中dot的用法_doT学习(一)之语法
  8. Linux块设备驱动(一) _驱动模型
  9. “应用程序无响应”原因汇总
  10. 猫哥教你写爬虫 022--类与对象(下)