应用场景 

前两天我们已经介绍了两种Spring Cloud Stream对消息失败的处理策略:

  • 自动重试:对于一些因环境原因(如:网络抖动等不稳定因素)引发的问题可以起到比较好的作用,提高消息处理的成功率。

  • 自定义错误处理逻辑:如果业务上,消息处理失败之后有明确的降级逻辑可以弥补的,可以采用这种方式,但是2.0.x版本有Bug,2.1.x版本修复。

那么如果代码本身存在逻辑错误,无论重试多少次都不可能成功,也没有具体的降级业务逻辑,之前在深入思考中讨论过,可以通过日志,或者降级逻辑记录的方式把错误消息保存下来,然后事后分析、修复Bug再重新处理。但是很显然,这样做非常原始,并且太过笨拙,处理复杂度过高。所以,本文将介绍利用中间件特性来便捷地处理该问题的方案:使用RabbitMQ的DLQ队列。

动手试试 

准备一个会消费失败的例子,可以直接沿用前文的工程。也可以新建一个,然后创建如下代码的逻辑:

  1. @EnableBinding(TestApplication.TestTopic.class)

  2. @SpringBootApplication

  3. public class TestApplication {

  4.    public static void main(String[] args) {

  5.        SpringApplication.run(TestApplication.class, args);

  6.    }

  7.    @RestController

  8.    static class TestController {

  9.        @Autowired

  10.        private TestTopic testTopic;

  11.        /**

  12.         * 消息生产接口

  13.         *

  14.         * @param message

  15.         * @return

  16.         */

  17.        @GetMapping("/sendMessage")

  18.        public String messageWithMQ(@RequestParam String message) {

  19.            testTopic.output().send(MessageBuilder.withPayload(message).build());

  20.            return "ok";

  21.        }

  22.    }

  23.    /**

  24.     * 消息消费逻辑

  25.     */

  26.    @Slf4j

  27.    @Component

  28.    static class TestListener {

  29.        @StreamListener(TestTopic.INPUT)

  30.        public void receive(String payload) {

  31.            log.info("Received payload : " + payload);

  32.            throw new RuntimeException("Message consumer failed!");

  33.        }

  34.    }

  35.    interface TestTopic {

  36.        String OUTPUT = "example-topic-output";

  37.        String INPUT = "example-topic-input";

  38.        @Output(OUTPUT)

  39.        MessageChannel output();

  40.        @Input(INPUT)

  41.        SubscribableChannel input();

  42.    }

  43. }

内容很简单,既包含了消息的生产,也包含了消息消费。消息消费的时候主动抛出了一个异常来模拟消息的消费失败。

在启动应用之前,还要记得配置一下输入输出通道对应的物理目标(exchange或topic名),并设置一下分组,比如:

  1. spring.cloud.stream.bindings.example-topic-input.destination=test-topic

  2. spring.cloud.stream.bindings.example-topic-input.group=stream-exception-handler

  3. spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1

  4. spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.auto-bind-dlq=true

  5. spring.cloud.stream.bindings.example-topic-output.destination=test-topic

这里加入了一个重要配置spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.auto-bind-dlq=true,用来开启DLQ(死信队列)。完成了上面配置之后,启动应用并访问localhost:8080/sendMessage?message=hello接口来发送一个消息到MQ中了,此时可以看到消费失败后抛出了异常,消息消费失败,记录了日志。此时,可以查看RabbitMQ的控制台如下:

其中,test-topic.stream-exception-handler.dlq队列就是test-topic.stream-exception-handler的dlq(死信)队列,当test-topic.stream-exception-handler队列中的消息消费时候之后,就会将这条消息原封不动的转存到dlq队列中。这样这些没有得到妥善处理的消息就通过简单的配置实现了存储,之后,我们还可以通过简单的操作对这些消息进行重新消费。我们只需要在控制台中点击test-topic.stream-exception-handler.dlq队列的名字进入到详情页面之后,使用Move messages功能,直接将这些消息移动回test-topic.stream-exception-handler队列,这样这些消息就能重新被消费一次。

如果Move messages功能中是如下内容:

  1. To move messages, the shovel plugin must be enabled, try:

  2. $ rabbitmq-plugins enable rabbitmq_shovel rabbitmq_shovel_management

那是由于没有安装对应的插件,只需要根据提示的命令安装就能使用该命令了。

深入思考 

先来总结一下在引入了RabbitMQ的DLQ之后,对于消息异常处理更为完整一些的基本思路:

  1. 瞬时的环境抖动引起的异常,利用重试功能提高处理成功率

  2. 如果重试依然失败的,日志报错,并进入DLQ队列 日志告警通知相关开发人员,分析问题原因

  3. 解决问题(修复程序Bug、扩容等措施)之后,DLQ队列中的消息移回重新处理

  4. 在这样的整体思路中,可能还涉及一些微调,这里举几个常见例子,帮助读者进一步了解一些特殊的场景和配置使用!

场景一:有些消息在业务上存在时效性,进入死信队列之后,过一段时间再处理已经没有意义,这个时候如何过滤这些消息呢?

只需要配置一个参数即可:

  1. spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.dlq-ttl=10000

该参数可以控制DLQ队列中消息的存活时间,当超过配置时间之后,该消息会自动的从DLQ队列中移除。

场景二:可能进入DLQ队列的消息存在各种不同的原因(不同异常造成的),此时如果在做补救措施的时候,还希望根据这些异常做不同的处理时候,我们如何区分这些消息进入DLQ的原因呢?

再来看看这个参数:

  1. spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.republish-to-dlq=true

该参数默认是false,如果设置了死信队列的时候,会将消息原封不动的发送到死信队列(也就是上面例子中的实现),此时大家可以在RabbitMQ控制台中通过Get message(s)功能来看看队列中的消息,应该如下图所示:

这是一条原始消息。

如果我们该配置设置为true的时候,那么该消息在进入到死信队列的时候,会在headers中加入错误信息,如下图所示:

这样,不论我们是通过移回原通道处理还是新增订阅处理这些消息的时候就可以以此作为依据进行分类型处理了。

关于RabbitMQ的binder中还有很多关于DLQ的配置,这里不一一介绍了,上面几个是目前笔者使用过的几个,其他一些暂时认为采用默认配置已经够用,除非还有其他定制要求,或者是存量内容,需要去适配才会去配置。读者可以查看官方文档了解更多详情!

·END·

 近期热文:

  • 面试题:如何求根号2

  • Spring Cloud Stream消费失败后的处理策略(二):自定义错误处理逻辑

  • PolarDB数据库性能大赛:95后徐妈的经验分享

  • 自己写分布式配置中心(上篇)- 单机模式

  • Spring Cloud Stream消费失败后的处理策略(一):自动重试

  • Redis几个重要的健康指标

关注我

点击“阅读原文”,看本号其他精彩内容

Spring Cloud Stream消费失败后的处理策略(三):使用DLQ队列(RabbitMQ)相关推荐

  1. Spring Cloud Stream消费失败后的处理策略(四):重新入队(RabbitMQ)

    应用场景 之前我们已经通过<Spring Cloud Stream消费失败后的处理策略(一):自动重试>一文介绍了Spring Cloud Stream默认的消息重试功能.本文将介绍Rab ...

  2. Spring Cloud Stream消费失败后的处理策略(二):自定义错误处理逻辑

    应用场景  上一篇<Spring Cloud Stream消费失败后的处理策略(一):自动重试>介绍了默认就会生效的消息重试功能.对于一些因环境原因.网络抖动等不稳定因素引发的问题可以起到 ...

  3. Spring Cloud Stream消费失败后的处理策略(一):自动重试

    之前写了几篇关于Spring Cloud Stream使用中的常见问题,比如: 如何处理消息重复消费? 如何消费自己生产的消息? 下面几天就集中来详细聊聊,当消息消费失败之后该如何处理的几种方式. 不 ...

  4. 【Spring Cloud】OpenFeign和Spring Cloud Loadbalancer调用失败后的重试机制比较

    1 概述 搭建一个微服务系统,有两个服务,Client和Server,Server有三个实例A.B.C,我让Client调用Server,Loadbalancer负载分担默认采用轮询机制,当Serve ...

  5. Spring Cloud Stream 学习小清单

    由于最近一直在写Spring Cloud Stream相关的内容,在2018年最后一天,把之前写过的Spring Cloud Stream内容从基础,到入门,到深入,做一些小清单,方便大家查阅. 如果 ...

  6. Spring Cloud Stream中文翻译

    Ditmars.RELEASE 1.Spring Cloud Stream 介绍 Spring Cloud Stream是一个用于构建消息驱动应用的微服务框架.Spring Cloud Stream基 ...

  7. 【本人秃顶程序员】使用Spring Cloud Stream和RabbitMQ实现事件驱动的微服务

    ←←←←←←←←←←←← 快!点关注 让我们展示如何使用Spring Cloud Stream来设计事件驱动的微服务.首先,Spring Cloud Stream首先有什么好处?因为Spring AM ...

  8. Spring系列学习之Spring Cloud Stream App Starters 应用程序启动器

    英文原文:https://cloud.spring.io/spring-cloud-stream-app-starters/ 目录 Spring Cloud Stream App Starters 特 ...

  9. Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑

    应用场景 有的时候,我们对于同一通道中的消息处理,会通过判断头信息或者消息内容来做一些差异化处理,比如:可能在消息头信息中带入消息版本号,然后通过if判断来执行不同的处理逻辑,其代码结构可能是这样的: ...

最新文章

  1. oracl 、mysql在线查看文档
  2. CentOS7上配置ELK
  3. IE打印控件推荐-4fang pazu
  4. ExtJS 的工具条及菜单
  5. Java设计模式(八):外观设计模式
  6. 【LeetCode】LC1672. 最富有客户的资产总量
  7. Spark On K8S 在有赞的实践与经验
  8. 8086汇编贪吃蛇(随机食物+速度递增)
  9. 代码,绘画,设计常用的颜色名称-16进制HEX编码-RGB编码 对照一览表
  10. vue watch 第一次不执行_Vue 实现前进刷新,后退不刷新的效果
  11. arcgis选出点规定范围的面
  12. jquery ui autocomplete输入中文不自动完成的问题
  13. 精通Android自定义View(八)绘制篇Canvas分析之绘制文本
  14. android studio moudel,Android Studio将module变为library
  15. html 图片隐藏 一部分,如何在HTML / CSS中仅显示图像的一部分?
  16. 电商基础(一):跳出率和退出率
  17. POJ--3268 Silver Cow Party(最短路)
  18. html中如何设置艺术字体,html里怎么把字体变成艺术字
  19. 虚拟机查看HWADDR(即MAC)地址
  20. C语言程序设计笔记(浙大翁恺版) 第二周:计算

热门文章

  1. golang web 框架 gin beego iris 对比
  2. cve-2016-6662 mysql远程代码执行/权限提升 漏洞
  3. C语言assert的用法
  4. 搭建Linux0.11系统环境
  5. Openstack-mitakaCentos7.2双节点搭建--(一)基础服务搭建
  6. Deep Learning and Shallow Learning
  7. php session域名共享,实现多域名下共用一个SESSION
  8. java一段时间后执行一块代码_java自带的ScheduledExecutorService定时任务正常执行一段时间后部分任务不执行...
  9. Design Pattern Strategy C
  10. mysql5.5数据库操作_命令行下mysql数据库基本操作