应用场景

之前我们已经通过《Spring Cloud Stream消费失败后的处理策略(一):自动重试》一文介绍了Spring Cloud Stream默认的消息重试功能。本文将介绍RabbitMQ的binder提供的另外一种重试功能:重新入队。

动手试试

准备一个会消费失败的例子,可以直接沿用前文的工程,也可以新建一个,然后创建如下代码的逻辑:

  1. @EnableBinding(TestApplication.TestTopic.class)

  2. @SpringBootApplication

  3. public class TestApplication {

  4.   public static void main(String[] args) {

  5.       SpringApplication.run(TestApplication.class, args);

  6.   }

  7.   @RestController

  8.   static class TestController {

  9.       @Autowired

  10.       private TestTopic testTopic;

  11.       /**

  12.        * 消息生产接口

  13.        *

  14.        * @param message

  15.        * @return

  16.        */

  17.       @GetMapping("/sendMessage")

  18.       public String messageWithMQ(@RequestParam String message) {

  19.           testTopic.output().send(MessageBuilder.withPayload(message).build());

  20.           return "ok";

  21.       }

  22.   }

  23.   /**

  24.    * 消息消费逻辑

  25.    */

  26.   @Slf4j

  27.   @Component

  28.   static class TestListener {

  29.       private int count = 1;

  30.       @StreamListener(TestTopic.INPUT)

  31.       public void receive(String payload) {

  32.           log.info("Received payload : " + payload + ", " + count);

  33.           throw new RuntimeException("Message consumer failed!");

  34.       }

  35.   }

  36.   interface TestTopic {

  37.       String OUTPUT = "example-topic-output";

  38.       String INPUT = "example-topic-input";

  39.       @Output(OUTPUT)

  40.       MessageChannel output();

  41.       @Input(INPUT)

  42.       SubscribableChannel input();

  43.   }

  44. }

内容很简单,既包含了消息的生产,也包含了消息消费。消息消费的时候主动抛出了一个异常来模拟消息的消费失败。

在启动应用之前,还要记得配置一下输入输出通道对应的物理目标(exchange或topic名)、并设置一下分组,比如:

  1. spring.cloud.stream.bindings.example-topic-input.destination=test-topic

  2. spring.cloud.stream.bindings.example-topic-input.group=stream-exception-handler

  3. spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1

  4. spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.requeue-rejected=true

  5. spring.cloud.stream.bindings.example-topic-output.destination=test-topic

完成了上面配置之后,启动应用并访问localhost:8080/sendMessage?message=hello接口来发送一个消息到MQ中了,此时可以看到程序不断的抛出了消息消费异常。这是由于这里我们多加了一个配置:spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.requeue-rejected=true。在该配置作用之下,消息消费失败之后,并不会将该消息抛弃,而是将消息重新放入队列,所以消息的消费逻辑会被重复执行,直到这条消息消费成功为止。

深入思考

在完成了上面的这个例子之后,可能读者会有下面两个常见问题:

问题一:之前介绍的Spring Cloud Stream默认提供的默认功能(spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts)与本文所说的重入队列实现的重试有什么区别?

Spring Cloud Stream默认提供的默认功能只是对处理逻辑的重试,它们的处理逻辑是由同一条消息触发的。而本文所介绍的重新入队史通过重新将消息放入队列而触发的,所以实际上是收到了多次消息而实现的重试。


问题二:如上面的例子那样,消费一直不成功,这些不成功的消息会被不断堆积起来,如何解决这个问题?

对于这个问题,我们可以联合前文介绍的DLQ队列来完善消息的异常处理。

我们只需要增加如下配置,自动绑定dlq队列:

spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.auto-bind-dlq=true

然后改造一下消息处理程序,可以根据业务情况,为进入dlq队列增加一个条件,比如下面的例子:

@StreamListener(TestTopic.INPUT)public void receive(String payload) {    log.info("Received payload : " + payload + ", " + count);    if (count == 3) {        count = 1;        throw new AmqpRejectAndDontRequeueException("tried 3 times failed, send to dlq!");    } else {        count ++;        throw new RuntimeException("Message consumer failed!");    }}

设定了计数器count,当count为3的时候抛出AmqpRejectAndDontRequeueException这个特定的异常。此时,当只有当抛出这个异常的时候,才会将消息放入DLQ队列,从而不会造成严重的堆积问题。

·END·

 近期热文:

  • 想通关「限流」?只要这一篇

  • 面试题:如何求根号2

  • Spring Cloud Stream消费失败后的处理策略(三):使用DLQ队列(RabbitMQ)

  • Spring Cloud Stream消费失败后的处理策略(二):自定义错误处理逻辑

  • PolarDB数据库性能大赛:95后徐妈的经验分享

  • 自己写分布式配置中心(上篇)- 单机模式

  • Spring Cloud Stream消费失败后的处理策略(一):自动重试

  • Redis几个重要的健康指标

点击“阅读原文”,看本号其他精彩内容

Spring Cloud Stream消费失败后的处理策略(四):重新入队(RabbitMQ)相关推荐

  1. Spring Cloud Stream消费失败后的处理策略(三):使用DLQ队列(RabbitMQ)

    应用场景  前两天我们已经介绍了两种Spring Cloud Stream对消息失败的处理策略: 自动重试:对于一些因环境原因(如:网络抖动等不稳定因素)引发的问题可以起到比较好的作用,提高消息处理的 ...

  2. Spring Cloud Stream消费失败后的处理策略(二):自定义错误处理逻辑

    应用场景  上一篇<Spring Cloud Stream消费失败后的处理策略(一):自动重试>介绍了默认就会生效的消息重试功能.对于一些因环境原因.网络抖动等不稳定因素引发的问题可以起到 ...

  3. Spring Cloud Stream消费失败后的处理策略(一):自动重试

    之前写了几篇关于Spring Cloud Stream使用中的常见问题,比如: 如何处理消息重复消费? 如何消费自己生产的消息? 下面几天就集中来详细聊聊,当消息消费失败之后该如何处理的几种方式. 不 ...

  4. 【Spring Cloud】OpenFeign和Spring Cloud Loadbalancer调用失败后的重试机制比较

    1 概述 搭建一个微服务系统,有两个服务,Client和Server,Server有三个实例A.B.C,我让Client调用Server,Loadbalancer负载分担默认采用轮询机制,当Serve ...

  5. Spring Cloud Stream 学习小清单

    由于最近一直在写Spring Cloud Stream相关的内容,在2018年最后一天,把之前写过的Spring Cloud Stream内容从基础,到入门,到深入,做一些小清单,方便大家查阅. 如果 ...

  6. Spring Cloud Stream核心原理介绍

    一.简介  目前,市面上消息中间件产品种类繁多,譬如RabbitMq,RocektMq,Kafka,Azure EventHub, Amazon Kenesis.各种中间件的原理.机制差异很大,但归根 ...

  7. Spring系列学习之Spring Cloud Stream App Starters 应用程序启动器

    英文原文:https://cloud.spring.io/spring-cloud-stream-app-starters/ 目录 Spring Cloud Stream App Starters 特 ...

  8. Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑

    应用场景 有的时候,我们对于同一通道中的消息处理,会通过判断头信息或者消息内容来做一些差异化处理,比如:可能在消息头信息中带入消息版本号,然后通过if判断来执行不同的处理逻辑,其代码结构可能是这样的: ...

  9. Spring Cloud Stream如何消费自己生产的消息?

    在上一篇<Spring Cloud Stream如何处理消息重复消费?>中,我们通过消费组的配置解决了多实例部署情况下消息重复消费这一入门时的常见问题.本文将继续说说在另外一个被经常问到的 ...

最新文章

  1. Android 内存优化
  2. SpeedTree导入到虚幻UE4的注意事项
  3. 【听课笔记】2009 Google OpenSocial-CSDN开放平台交流会笔记
  4. 再次携号转网_湖北省通信管理局召开视频会议 再次强调携号转网服务要求
  5. 从Google Mesa到百度PALO(数仓)
  6. MySql DATE_FORMAT函数用法
  7. 栈应用:判断字符串中括号是否成对出现
  8. 你真的搞懂ES6模块的导入导出规则了吗
  9. MySQL设置mysqld_MySQL指定mysqld启动时所加载的配置文件
  10. java有趣的平方数,蓝桥杯——四数平方(2016JavaB第7题)
  11. Tensorflow2.0学习(八) — tf.dataset自定义图像数据集
  12. HDU2122 Ice_cream’s world III 【最小生成树】
  13. 小程序uv访客怎么刷_微信小程序获取访客数据-使用攻略
  14. Linux:DNS服务
  15. 2020年国内优秀原创IT技术书都在这了
  16. hdfs 元数据维护机制
  17. python的argparse模块add_argument详解
  18. 傲游研发中心在京成立
  19. python 创建空的numpy数组_真假美猴王-Numpy数据与Python数组的区别与联系
  20. http请求 响应数据格式

热门文章

  1. linux tcpdump monitor模式 抓不到包 解决办法
  2. linux centos 没有service命令 安装方法
  3. linux shell 字符串比较相等、不相等
  4. linux docker中gdb调试断点不停
  5. Android导入工程提示Invalid project description
  6. 利用Gallery和ImageView实现图片浏览器
  7. linux 信号 core,Shell 信号发送与捕捉
  8. 初中计算机实践研究计划,初中信息技术个人研修计划
  9. C++ - 编写一个从字符串转变成长整型的函数
  10. java获取jsp 组件,利用Observer模式解决组件间通信问题-JSP教程,Java技巧及代码