如果您曾经需要使用RabbitMQ来串行处理消息,并且有一群监听器来处理消息,那么我所看到的最好方法是在监听器上使用“独占消费者”标志,每个监听器上有1个线程来处理消息。

专用使用者标志可确保只有1个使用者可以从特定队列中读取消息,并且该使用者上的1个线程可确保按顺序处理消息。 但是有一个问题,我待会儿再讲。

让我用基于Spring Boot和Spring Integration的RabbitMQ消息使用者来演示这种行为。

首先,这是用于使用Spring java配置设置队列的配置,请注意,由于这是Spring Boot应用程序,因此在将Spring-amqp库添加到依赖项列表时,它将自动创建RabbitMQ连接工厂:

@Configuration
@Configuration
public class RabbitConfig {@Autowiredprivate ConnectionFactory rabbitConnectionFactory;@Beanpublic Queue sampleQueue() {return new Queue("sample.queue", true, false, false);}}

给定这个示例队列,一个从该队列获取消息并对其进行处理的侦听器如下所示,该流程是使用出色的Spring集成Java DSL库编写的:

@Configuration
public class RabbitInboundFlow {private static final Logger logger = LoggerFactory.getLogger(RabbitInboundFlow.class);@Autowiredprivate RabbitConfig rabbitConfig;@Autowiredprivate ConnectionFactory connectionFactory;@Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer() {SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();listenerContainer.setConnectionFactory(this.connectionFactory);listenerContainer.setQueues(this.rabbitConfig.sampleQueue());listenerContainer.setConcurrentConsumers(1);listenerContainer.setExclusive(true);return listenerContainer;}@Beanpublic IntegrationFlow inboundFlow() {return IntegrationFlows.from(Amqp.inboundAdapter(simpleMessageListenerContainer())).transform(Transformers.objectToString()).handle((m) -> {logger.info("Processed  {}", m.getPayload());}).get();}}

该流程非常简洁地用inboundFlow方法表示,RabbitMQ的消息有效负载从字节数组转换为String,最后只需将消息记录到日志中即可进行处理。

该流程的重要部分是侦听器配置,请注意将使用者设置为独占使用者的标志,并且在该使用者中将线程处理数设置为1。即使仅启动了应用程序的多个实例,该处理数也被设置为1。其中一个监听器将能够连接和处理消息。

现在来看问题,考虑一种情况,消息处理需要一段时间才能完成,并且在消息处理期间会回滚。 如果处理消息的应用程序实例在处理此类消息的过程中被停止,则行为是另一个实例将开始处理队列中的消息,当停止的实例回滚消息时,该回滚然后将邮件传递给新的排他消费者,从而使邮件混乱。

  • 如果您有兴趣进一步探索它,可以使用以下github项目来使用此功能:https://github.com/bijukunjummen/test-rabbit-exclusive。

翻译自: https://www.javacodegeeks.com/2014/12/rabbitmq-processing-messages-serially-using-spring-integration-java-dsl.html

RabbitMQ –使用Spring集成Java DSL串行处理消息相关推荐

  1. spring集成mq_使用Spring Integration Java DSL与Rabbit MQ集成

    spring集成mq 我最近参加了在拉斯维加斯举行的2016年Spring大会 ,很幸运地看到了我在软件世界中长期敬佩的一些人. 我亲自遇到了其中的两个人,他们实际上合并了几年前我与Spring In ...

  2. 使用Spring Integration Java DSL与Rabbit MQ集成

    我最近参加了在拉斯维加斯举行的2016年Spring会议 ,很幸运地看到了我在软件世界中长期敬佩的一些人. 我亲自遇到了其中的两个人,他们实际上合并了几年前我与Spring Integration相关 ...

  3. Spring Integration Java DSL示例–使用Jms名称空间工厂进一步简化

    在较早的博客文章中,我谈到了虚拟卢布·戈德堡流程,该流程通过一系列复杂的步骤将字符串变成大写,本文的前提是引入Spring Integration Java DSL,作为通过xml配置文件定义集成流程 ...

  4. Spring Integration Java DSL示例

    现在已经为Spring Integration引入了新的基于Java的DSL ,这使得可以使用基于纯Java的配置而不是基于Spring XML的配置来定义Spring Integration消息流. ...

  5. 消息中间件系列四:RabbitMQ与Spring集成

    一.RabbitMQ与Spring集成  准备工作: 分别新建名为RabbitMQSpringProducer和RabbitMQSpringConsumer的maven web工程 在pom.xml文 ...

  6. java 多线程 串行 加锁_Java多线程(2)线程锁

    多线程访问同一个资源进行读写操作,就很容易出一些问题(比如我们常见的读者写者,生产者消费者模型)所以我们会选择对他们设置信号量或者加锁,来限制同一个时刻只有一个线程对某个对象进行操作. 多线程是一个蛮 ...

  7. java 多线程 串行 加锁_java多线程 synchronized 与lock锁 实现线程安全

    如果有多个线程在同时运行,而这些线程可能会同时运行这段代码.程序每次运行结果和单线程运行的结果是一样 的,而且其他的变量的值也和预期的是一样的,就是线程安全的. 通过卖火车票的例子 火车站要卖票,我们 ...

  8. java mifare_Mifare 串行读取协议

    Mifare卡是一种非接触式的智能卡,我们了解他们的相关协议. 表一 命令格式如下: 协议头(Head):2字节,固定填充0xAA,0xBB 长度(Length):2字节,从此列后的所有有效字节数(包 ...

  9. php 串行化数据,php中对象的串行化

    我们大家有知道PHP串行化可以把变量包括对象,转化成连续bytes数据,你可以将串行化后的变量存在一个文件里或在网络上传输,然后再反串行化还原为原来的数据.文章这里就PHP串行化为大家详细的介绍.你在 ...

最新文章

  1. gdb汇编调试c程序
  2. 一个小网管的淘金梦----深圳往事(4)
  3. tableau实战系列(三十)- 多细节层次(LOD) 计算产品销售周期延申表达式详解
  4. import caffe失败 No module named caffe
  5. POJ2942-Knights of the Round Table【tarjan】
  6. HDR【openCV实现】
  7. Linux学习笔记13
  8. Object.create()和new object()和{}的区别
  9. 计算机视觉必备框架!Opencv系列学习实战
  10. centos 7 mysql 中文,解决centOS7 下mysql插入中文字符报错相关问题
  11. C++开源代码覆盖率工具OpenCppCoverage介绍(Windows)
  12. Linux系统下文件与目录操作
  13. 动态规划-hdoj-4832-百度之星2014初赛第二场
  14. LayaAir 定时器 Timer
  15. 黑苹果日记六(双系统引导)
  16. 计算机vb输入框函数,VB基本函数大全
  17. 360众筹网_360众筹平台
  18. JAVA基础知识学习全覆盖
  19. windows重装系统之后,开机显示“An operating system wasn't found,Try disconnecting any drives that...”(亲身遇到+解决方法)
  20. 财务复式记账和平行登记有什么区别

热门文章

  1. 使用ueditor实现多图片上传案例——截取字符串层Util(SubString_text)
  2. Java标识符与命名规则
  3. JS中数组的常用方法
  4. android 设置视频音量大小,为cocos2d-x添加调节视频音量的功能(Android)
  5. mysql common是什么_MySQL common_schema简介
  6. idea 2个配置 实时编译 autowire注解错
  7. java泛型程序设计——Varargs 警告+不能实例化类型变量
  8. 在diy的文件系统上创建文件的流程
  9. 使用互联网了解的两个月里_我两个月来对Quarkus的了解
  10. spring boot测试_测试Spring Boot有条件的合理方式