RocketMQ(四)——消息重试
文章目录
- 一、 Producer端重试
- 二、 Consumer端重试
- 1. Exception
- 2. Timeout
- 总结
对于MQ,可能存在各种异常情况,导致消息无法最终被Consumer消费掉,因此就有了消息失败重试机制。很显示,消息重试分为2种:Producer端重试和Consumer端重试。
一、 Producer端重试
生产者端的消息失败,也就是Producer往MQ上发消息没有发送成功,比如网络抖动导致生产者发送消息到MQ失败。
这种消息失败重试我们可以手动设置发送失败重试的次数,看一下代码:
/*** Producer,发送消息*/
public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("group_name");producer.setNamesrvAddr("192.168.2.222:9876;192.168.2.223:9876");producer.setRetryTimesWhenSendFailed(3);producer.start();for (int i = 0; i < 100; i++) {try {Message msg = new Message("TopicTest", // topic"TagA", // tag("HelloWorld - RocketMQ" + i).getBytes() // body);SendResult sendResult = producer.send(msg, 1000);System.out.println(sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}producer.shutdown();}
}
*生产者端失败重试*
上图代码示例的处理手段是:如果该条消息在1S内没有发送成功,那么重试3次。
producer.setRetryTimesWhenSendFailed(3); //失败的情况重发3次
producer.send(msg, 1000); //消息在1S内没有发送成功,就会重试
二、 Consumer端重试
消费者端的失败,分为2种情况,一个是exception,一个是timeout。
1. Exception
消息正常的到了消费者,结果消费者发生异常,处理失败了。例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。
这里涉及到一些问题,需要我们思考下,比如,消费者消费消息的状态有哪些定义?如果失败,MQ将采取什么策略进行重试?假设一次性批量PUSH了10条,其中某条数据消费异常,那么消息重试是10条呢,还是1条呢?而且在重试的过程中,需要保证不重复消费吗?
public enum ConsumeConcurrentlyStatus {/*** Success consumption*/CONSUME_SUCCESS,/*** Failure consumption,later try to consume*/RECONSUME_LATER;
}
*ConsumeConcurrentlyStatus枚举的源码*
通过查看源码,消息消费的状态,有2种,一个是成功(CONSUME_SUCCESS),一个是失败&稍后重试(RECONSUME_LATER)
RECONSUME_LATER的策略
在启动broker的过程中,可以观察到上图日志,你会发现RECONSUME_LATER的策略:如果消费失败,那么1S后再次消费,如果失败,那么5S后,再次消费,…直至2H后如果消费还失败,那么该条消息就会终止发送给消费者了!
RocketMQ为我们提供了这么多次数的失败重试,但是在实际中也许我们并不需要这么多重试,比如重试3次,还没有成功,我们希望把这条消息存储起来并采用另一种方式处理,而且希望RocketMQ不要再重试呢,因为重试解决不了问题了!这该如何做呢?
看一段代码:
/*** Consumer,订阅消息*/
public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");consumer.setNamesrvAddr("192.168.2.222:9876;192.168.2.223:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {try {MessageExt msg = msgs.get(0);String msgbody = new String(msg.getBody(), "utf-8");System.out.println(msgbody + " Receive New Messages: " + msgs);if (msgbody.equals("HelloWorld - RocketMQ4")) {System.out.println("======错误=======");int a = 1 / 0;}} catch (Exception e) {e.printStackTrace();if (msgs.get(0).getReconsumeTimes() == 3) {// 该条消息可以存储到DB或者LOG日志中,或其他处理方式return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功} else {return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("Consumer Started.");}
}
RECONSUME_LATER的重试测试代码
生产端发送了10条消息,看一下消费端的运行效果:
RECONSUME_LATER的重试效果
观察上图发现,HelloWorld - RocketMQ4的消息的***reconsumeTimes***属性值发生了变化,其实这个属性就代表了消息重试的次数!因此我们可以通过reconsumeTimes属性,让MQ超过了多少次之后让他不再重试,而是记录日志等处理,也就是上面代码catch中的内容。
2. Timeout
比如由于网络原因导致消息压根就没有从MQ到消费者上,那么在RocketMQ内部会不断的尝试发送这条消息,直至发送成功为止!(比如集群中一个broker失败,就尝试另一个broker)
延续Exception的思路,也就是消费端没有给RocketMQ返回消费的状态,即没有return ConsumeConcurrentlyStatus.CONSUME_SUCCESS或return ConsumeConcurrentlyStatus.RECONSUME_LATER,这样的就认为没有到达Consumer端。
下面进行模拟:
1)消费端有consumer1和consumer2这样一个集群。
2)consumer1端的业务代码中暂停1分钟并且不发送接收状态给RocketMQ。
public class Consumer1 {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");consumer.setNamesrvAddr("192.168.2.222:9876;192.168.2.223:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("Topic1", "Tag1 || Tag2 || Tag3");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {try { String topic = msg.getTopic();String msgBody = new String(msg.getBody(),"utf-8");String tags = msg.getTags();System.out.println("收到消息:" + " topic:" + topic + " ,tags:" + tags + " ,msg:" + msgBody);// 表示业务处理时间System.out.println("=========开始暂停==========");Thread.sleep(60000);}} catch (Exception e) {e.printStackTrace();return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("Consumer Started.");}
}
Consumer1端Timeout异常测试代码
3)启动consumer1和consumer2。
4)启动Producer,只发送一条数据。
看一下此时consumer1和consumer2的运行结果:
Consumer1
Consumer2-未接收到消息
发现consumer1接收到消息并且暂停,consumer2未接收到消息。
5)关闭consumer1。
观察consumer2的运行结果:
Consumer2-接收到消息
总结
Producer端没什么好说的,Consumer端值得注意。对于消费消息而言,存在2种指定的状态(成功 OR 失败重试),如果一条消息在消费端处理没有返回这2个状态,那么相当于这条消息没有达到消费者,势必会再次发送给消费者!也即是消息的处理必须有返回值,否则就进行重发。
RocketMQ(四)——消息重试相关推荐
- RocketMQ之消息重试。
RocketMQ使用过程中,如何进行消息重试. 首先,我们需要明确,只有当消费模式为 MessageModel.CLUSTERING(集群模式) 时,Broker才会自动进行重试,对于广播消息是不会重 ...
- 【RocketMQ】消息重试、重试次数设置、死信队列
文章目录 1. 死信队列 1.1 死信特性 1.2 查看死信消息 2.重试次数参数 2.1 Producer端重试 2.2 Consumer端重试 3.1 异常重试 3.2 超时重试 参考 1. 死信 ...
- RocketMQ错误消息重试策略之Consumer的重试机制(Exception情况)
consumer端重试 消费者端的失败,分为2种情况,一个是exception,一个是timeout. exception 消息正常的到了消费者,结果消费者发生异常,处理失败了.例如反序列化失败,消息 ...
- RocketMQ错误消息重试策略之Consumer的重试机制(timeout情况)
timeout 比如由于网络原因导致消息压根就没有从MQ到消费者上,那么在RocketMQ内部会不断的尝试发送这条消息,直至发送成功为止! 也就是说,服务端没有接收到消息的反馈,既不是成功也不是失败, ...
- RocketMQ错误消息重试策略之重试情况的分析
- RocketMQ的消费者消息重试和生产者消息重投
详细介绍了RocketMQ的消息重试机制,RocketMQ的消息重试可以分为生产者重试和消费者重试两个部分. 文章目录 1 生产者重试 2 消费者重试 2.1 异常重试 2.1.1 并发消费的重试 2 ...
- 深入理解RocketMQ延迟消息
延迟消息是实际开发中一个非常有用的功能,本文第一部分从整体上介绍秒级精度延迟消息的实现思路,在第二部分结合RocketMQ的延迟消息实现,进行细致的讲解,点出关键部分的源码.第三步介绍延迟消息与消息重 ...
- rocketmq广播消息为什么不能重试_几分钟带你看懂“消息队列和RocketMQ”的入门总结
消息队列扫盲 消息队列顾名思义就是存放消息的队列,队列我就不解释了,别告诉我你连队列都不知道似啥吧? 所以问题并不是消息队列是什么,而是 消息队列为什么会出现?消息队列能用来干什么?用它来干这些事会带 ...
- RocketMQ同步消息、异步消息、单向消息详解
一.RocketMQ 支持 3 种消息发送方式 : 1.同步消息(sync message ) producer向 broker 发送消息,执行 API 时同步等待, 直到broker 服务器返回发送 ...
最新文章
- 2021年春季学期-信号与系统-第七次作业参考答案-第八小题
- 如何成为领袖? 学习任正非小沃森郭士纳
- Kafka解惑之Old Producer(1)—— Beginning
- KMP字符串匹配算法理解(转)
- 差分隐私与可穿戴式设备调查【笔记】
- python 冒泡排序 时间复杂度
- camera(16)---双摄持续扩散 摄像头供应链阵营变动加剧
- python经典实例-Python机器学习经典实例
- 转载:eclipse 搭建SSH项目(第二篇,有具体的项目例子)
- 3dmax2020下载3dmax2020下载安装详细教程
- 如何用Java解压缩WAR文件
- for循环及判断语句的20个经典习题
- xp计算机无法正常启动,修复XP系统无法正常启动进入不了计算机的解决办法
- android 自动测光,Android Camera1中的对焦与测光
- 前端网页水印实现方法
- 面试-数据知识点准备
- 写给女儿高中编程课老师的一封信
- 加速linux开机速度 systemctl
- 用循环不定式来证明冒泡排序的正确性
- 【数据结构】— 「时间复杂度」与「空间复杂度」