之前写了几篇关于Spring Cloud Stream使用中的常见问题,比如:

如何处理消息重复消费

如何消费自己生产的消息

下面几天就集中来详细聊聊,当消息消费失败之后该如何处理的几种方式。

不过不论哪种方式,都需要与具体业务结合,解决不同业务场景可能出现的问题。

今天第一节,介绍一下Spring Cloud Stream中默认就已经配置了的一个异常解决方案:重试!

应用场景 

依然要明确一点,任何解决方案都要结合具体的业务实现来确定,不要有了锤子看什么问题都是钉子。那么重试可以解决什么问题呢?由于重试的基础逻辑并不会改变,所以通常重试只能解决因环境不稳定等外在因素导致的失败情况,比如:当我们接收到某个消息之后,需要调用一个外部的Web Service做一些事情,这个时候如果与外部系统的网络出现了抖动,导致调用失败而抛出异常。这个时候,通过重试消息消费的具体逻辑,可能在下一次调用的时候,就能完成整合业务动作,从而解决刚才所述的问题。

动手试试 

先通过一个小例子来看看Spring Cloud Stream默认的重试机制是如何运作的。之前在如何消费自己生产的消息一文中的例子,我们可以继续沿用,或者也可以精简一些,都写到一个主类中,比如下面这样:

  1. @EnableBinding(TestApplication.TestTopic.class)

  2. @SpringBootApplication

  3. public class TestApplication {

  4.    public static void main(String[] args) {

  5.        SpringApplication.run(TestApplication.class, args);

  6.    }

  7.    @RestController

  8.    static class TestController {

  9.        @Autowired

  10.        private TestTopic testTopic;

  11.        /**

  12.         * 消息生产接口

  13.         * @param message

  14.         * @return

  15.         */

  16.        @GetMapping("/sendMessage")

  17.        public String messageWithMQ(@RequestParam String message) {

  18.            testTopic.output().send(MessageBuilder.withPayload(message).build());

  19.            return "ok";

  20.        }

  21.    }

  22.    /**

  23.     * 消息消费逻辑

  24.     */

  25.    @Slf4j

  26.    @Component

  27.    static class TestListener {

  28.        @StreamListener(TestTopic.INPUT)

  29.        public void receive(String payload) {

  30.            log.info("Received: " + payload);

  31.            throw new RuntimeException("Message consumer failed!");

  32.        }

  33.    }

  34.    interface TestTopic {

  35.        String OUTPUT = "example-topic-output";

  36.        String INPUT = "example-topic-input";

  37.        @Output(OUTPUT)

  38.        MessageChannel output();

  39.        @Input(INPUT)

  40.        SubscribableChannel input();

  41.    }

  42. }

内容很简单,既包含了消息的生产,也包含了消息消费。与之前例子不同的就是在消息消费逻辑中,主动的抛出了一个异常来模拟消息的消费失败。

在启动应用之前,还要记得配置一下输入输出通道对应的物理目标(exchange或topic名),比如:

  1. spring.cloud.stream.bindings.example-topic-input.destination=test-topic

  2. spring.cloud.stream.bindings.example-topic-output.destination=test-topic

完成了上面配置之后,就可以启动应用,并尝试访问localhost:8080/sendMessage?message=hello接口来发送一个消息到MQ中了。此时可以看到类似下面的日志:

  1. 2018-12-10 11:20:21.345  INFO 30499 --- [w2p2yKethOsqg-1] c.d.stream.TestApplication$TestListener  : Received: hello

  2. 2018-12-10 11:20:22.350  INFO 30499 --- [w2p2yKethOsqg-1] c.d.stream.TestApplication$TestListener  : Received: hello

  3. 2018-12-10 11:20:24.354  INFO 30499 --- [w2p2yKethOsqg-1] c.d.stream.TestApplication$TestListener  : Received: hello

  4. 2018-12-10 11:20:54.651 ERROR 30499 --- [w2p2yKethOsqg-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Exception thrown while invoking com.didispace.stream.TestApplication$TestListener#receive[1 args]; nested exception is java.lang.RuntimeException: Message consumer failed!, failedMessage=GenericMessage [payload=byte[5], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=test-topic, amqp_receivedExchange=test-topic, amqp_deliveryTag=2, deliveryAttempt=3, amqp_consumerQueue=test-topic.anonymous.EuqBJu66Qw2p2yKethOsqg, amqp_redelivered=false, id=a89adf96-7de2-f29d-20b6-2fcb0c64cd8c, amqp_consumerTag=amq.ctag-XFy6vXU2w4RB_NRBzImWTA, contentType=application/json, timestamp=1544412051638}]

  5.    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:63)

  6.    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)

  7.    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)

  8.    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)

  9.    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)

  10.    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)

  11.    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)

  12.    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)

  13.    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)

  14.    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)

  15.    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)

  16.    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)

  17.    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)

  18.    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203)

  19.    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1100(AmqpInboundChannelAdapter.java:60)

  20.    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:214)

  21.    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)

  22.    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180)

  23.    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:211)

  24.    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414)

  25.    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337)

  26.    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324)

  27.    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303)

  28.    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817)

  29.    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801)

  30.    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77)

  31.    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042)

  32.    at java.lang.Thread.run(Thread.java:748)

  33. Caused by: java.lang.RuntimeException: Message consumer failed!

  34.    at com.didispace.stream.TestApplication$TestListener.receive(TestApplication.java:65)

  35.    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

  36.    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

  37.    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

  38.    at java.lang.reflect.Method.invoke(Method.java:498)

  39.    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:181)

  40.    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:114)

  41.    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)

  42.    ... 27 more

从日志中可以看到,一共输出了三次Received: hello,也就是说消息消费逻辑执行了3次,然后抛出了最终执行失败的异常。

设置重复次数 默认情况下Spring Cloud Stream会重试3次,我们也可以通过配置的方式修改这个默认配置,比如下面的配置可以将重试次数调整为1次:

  1. spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1

对于一些纯内部计算逻辑,不需要依赖外部环境,如果出错通常是代码逻辑错误的情况下,不论我们如何重试都会继续错误的业务逻辑可以将该参数设置为0,避免不必要的重试影响消息处理的速度。

深入思考 

完成了上面的基础尝试之后,再思考下面两个问题:

问题一:如果在重试过程中消息处理成功了,还会有异常信息吗?

答案是不会。因为重试过程是消息处理的一个整体,如果某一次重试成功了,会任务对所收到消息的消费成功了。

这个问题可以在上述例子中做一些小改动来验证,比如:

  1. @Slf4j

  2. @Component

  3. static class TestListener {

  4.    int counter = 1;

  5.    @StreamListener(TestTopic.INPUT)

  6.    public void receive(String payload) {

  7.        log.info("Received: " + payload + ", " + counter);

  8.        if (counter == 3) {

  9.            counter = 1;

  10.            return;

  11.        } else {

  12.            counter++;

  13.            throw new RuntimeException("Message consumer failed!");

  14.        }

  15.    }

  16. }

通过加入一个计数器,当重试是第3次的时候,不抛出异常来模拟消费逻辑处理成功了。此时重新运行程序,并调用接口localhost:8080/sendMessage?message=hello,可以获得如下日志结果,并没有异常打印出来。

  1. 2018-12-10 16:07:38.390  INFO 66468 --- [L6MGAj-MAj7QA-1] c.d.stream.TestApplication$TestListener  : Received: hello, 1

  2. 2018-12-10 16:07:39.398  INFO 66468 --- [L6MGAj-MAj7QA-1] c.d.stream.TestApplication$TestListener  : Received: hello, 2

  3. 2018-12-10 16:07:41.402  INFO 66468 --- [L6MGAj-MAj7QA-1] c.d.stream.TestApplication$TestListener  : Received: hello, 3

也就是,虽然前两次消费抛出了异常,但是并不影响最终的结果,也不会打印中间过程的异常,避免了对日志告警产生误报等问题。


问题二:如果重试都失败之后应该怎么办呢?

如果消息在重试了还是失败之后,目前的配置唯一能做的就是将异常信息记录下来,进行告警。由于日志中有消息的消息信息描述,所以应用维护者可以根据这些信息来做一些补救措施。

当然,这样的做法显然不是最好的,因为太过麻烦。那么怎么做才好呢?

扫描下方二维码关注我,且听下回分解!

·END·

 近期热文:

  • Redis几个重要的健康指标

  • 面试投行的20个Java问题

  • 持久化DDD聚合

  • 全面了解 Nginx 到底能做什么

  • Spring Cloud Netflix Zuul中的速率限制

  • 从内部自用到对外服务,配置管理的演进和设计优化实践

  • Spring Cloud Data Flow 中的 ETL

关注我

点击“阅读原文”,看本号其他精彩内容

Spring Cloud Stream消费失败后的处理策略(一):自动重试相关推荐

  1. Spring Cloud Stream消费失败后的处理策略(四):重新入队(RabbitMQ)

    应用场景 之前我们已经通过<Spring Cloud Stream消费失败后的处理策略(一):自动重试>一文介绍了Spring Cloud Stream默认的消息重试功能.本文将介绍Rab ...

  2. Spring Cloud Stream消费失败后的处理策略(三):使用DLQ队列(RabbitMQ)

    应用场景  前两天我们已经介绍了两种Spring Cloud Stream对消息失败的处理策略: 自动重试:对于一些因环境原因(如:网络抖动等不稳定因素)引发的问题可以起到比较好的作用,提高消息处理的 ...

  3. Spring Cloud Stream消费失败后的处理策略(二):自定义错误处理逻辑

    应用场景  上一篇<Spring Cloud Stream消费失败后的处理策略(一):自动重试>介绍了默认就会生效的消息重试功能.对于一些因环境原因.网络抖动等不稳定因素引发的问题可以起到 ...

  4. 【Spring Cloud】OpenFeign和Spring Cloud Loadbalancer调用失败后的重试机制比较

    1 概述 搭建一个微服务系统,有两个服务,Client和Server,Server有三个实例A.B.C,我让Client调用Server,Loadbalancer负载分担默认采用轮询机制,当Serve ...

  5. Spring Cloud Stream 学习小清单

    由于最近一直在写Spring Cloud Stream相关的内容,在2018年最后一天,把之前写过的Spring Cloud Stream内容从基础,到入门,到深入,做一些小清单,方便大家查阅. 如果 ...

  6. Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑

    应用场景 有的时候,我们对于同一通道中的消息处理,会通过判断头信息或者消息内容来做一些差异化处理,比如:可能在消息头信息中带入消息版本号,然后通过if判断来执行不同的处理逻辑,其代码结构可能是这样的: ...

  7. Spring Cloud Stream如何消费自己生产的消息?

    在上一篇<Spring Cloud Stream如何处理消息重复消费?>中,我们通过消费组的配置解决了多实例部署情况下消息重复消费这一入门时的常见问题.本文将继续说说在另外一个被经常问到的 ...

  8. Spring Cloud Stream如何消费自己生产的消息

    在上一篇<Spring Cloud Stream如何处理消息重复消费>中,我们通过消费组的配置解决了多实例部署情况下消息重复消费这一入门时的常见问题.本文将继续说说在另外一个被经常问到的问 ...

  9. Spring Cloud Stream如何处理消息重复消费

    最近收到好几个类似的问题:使用Spring Cloud Stream操作RabbitMQ或Kafka的时候,出现消息重复消费的问题.通过沟通与排查下来主要还是用户对消费组的认识不够.其实,在之前的博文 ...

最新文章

  1. linux磁盘怎么分区比较好,500G的硬盘,怎么分区比较合理?
  2. 6张脑图系统讲透python爬虫和数据分析、数据挖掘
  3. sdn体系的三个平面_软件定义网络基础---SDN控制平面
  4. java akka 实战_Akka实战:分散、聚合模式
  5. Flask-SQLAlchemy 对数据库的增查改删
  6. python设计模式7-桥接模式
  7. 三维坐标 偏转_三维坐标变换原理-平移, 旋转, 缩放
  8. annotationprocessor 提示找不到类_StackOverflow上87万访问量的问题:什么是“找不到符号”?...
  9. spring配合Junit进行单元测试
  10. 51Nod-1018 排序【排序】
  11. 服装设计与工程_百度百科
  12. 数学建模 - 时间序列分析
  13. SPSS-论文常用格式-三线表
  14. Python 给图片加文字或logo水印(附代码) | Python工具
  15. WIN10下没有NVIDIA控制面板的解决办法
  16. 数据挖掘实验二结果(构建cube的三个维度,即三个txt,然后做各种查询)C++实现(代码调试环境为Windows下的CLion使用WSL的Linux)
  17. 【JY】45天缩短到4天,突发性Fluent仿真任务怎么破?
  18. YOLOV5训练代码train.py注释与解析
  19. 讲一个故事:Redis的默认端口是6379
  20. 火狐浏览器打不开,但是进程中有,怎么办?

热门文章

  1. Native wifi API使用
  2. TCHAR char wchar_t PTSTR PCSTR printf() wprintf()——_tprintf()解析
  3. 计算机系统的搭建步骤,电脑搭建Node.js开发环境的操作教程[多图]
  4. android开发读书笔记,android开发权威指南读书笔记
  5. NeHe教程Qt实现——lesson13
  6. 监视mysql 哪些指标_MySQL 监控指标
  7. 机器人x展架制作_门型展架80*180易拉宝x展架海报架广告架立牌展示架地推海报2元优惠券券后价20元...
  8. 陈老师Linux内核概述导学
  9. 装有linux的硬盘装到不同主机,把ubuntu系统安装进移动硬盘,可在不同电脑上运行...
  10. 51单片机串行口c语言编程,51单片机串口通信c语言编程