文章目录

  • 一、 Producer端重试
  • 二、 Consumer端重试
    • 1. Exception
    • 2. Timeout
  • 总结

对于MQ,可能存在各种异常情况,导致消息无法最终被Consumer消费掉,因此就有了消息失败重试机制。很显示,消息重试分为2种:Producer端重试和Consumer端重试。

一、 Producer端重试

生产者端的消息失败,也就是Producer往MQ上发消息没有发送成功,比如网络抖动导致生产者发送消息到MQ失败。
这种消息失败重试我们可以手动设置发送失败重试的次数,看一下代码:

/*** Producer,发送消息*/
public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("group_name");producer.setNamesrvAddr("192.168.2.222:9876;192.168.2.223:9876");producer.setRetryTimesWhenSendFailed(3);producer.start();for (int i = 0; i < 100; i++) {try {Message msg = new Message("TopicTest",               // topic"TagA",                                   // tag("HelloWorld - RocketMQ" + i).getBytes()   // body);SendResult sendResult = producer.send(msg, 1000);System.out.println(sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}producer.shutdown();}
}

*生产者端失败重试*

上图代码示例的处理手段是:如果该条消息在1S内没有发送成功,那么重试3次。

producer.setRetryTimesWhenSendFailed(3); //失败的情况重发3次
producer.send(msg, 1000); //消息在1S内没有发送成功,就会重试


二、 Consumer端重试

消费者端的失败,分为2种情况,一个是exception,一个是timeout。

1. Exception

消息正常的到了消费者,结果消费者发生异常,处理失败了。例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。
这里涉及到一些问题,需要我们思考下,比如,消费者消费消息的状态有哪些定义?如果失败,MQ将采取什么策略进行重试?假设一次性批量PUSH了10条,其中某条数据消费异常,那么消息重试是10条呢,还是1条呢?而且在重试的过程中,需要保证不重复消费吗?

public enum ConsumeConcurrentlyStatus {/*** Success consumption*/CONSUME_SUCCESS,/*** Failure consumption,later try to consume*/RECONSUME_LATER;
}

*ConsumeConcurrentlyStatus枚举的源码*

通过查看源码,消息消费的状态,有2种,一个是成功(CONSUME_SUCCESS),一个是失败&稍后重试(RECONSUME_LATER)

RECONSUME_LATER的策略

在启动broker的过程中,可以观察到上图日志,你会发现RECONSUME_LATER的策略:如果消费失败,那么1S后再次消费,如果失败,那么5S后,再次消费,…直至2H后如果消费还失败,那么该条消息就会终止发送给消费者了!
RocketMQ为我们提供了这么多次数的失败重试,但是在实际中也许我们并不需要这么多重试,比如重试3次,还没有成功,我们希望把这条消息存储起来并采用另一种方式处理,而且希望RocketMQ不要再重试呢,因为重试解决不了问题了!这该如何做呢?
看一段代码:

/*** Consumer,订阅消息*/
public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");consumer.setNamesrvAddr("192.168.2.222:9876;192.168.2.223:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {try {MessageExt msg = msgs.get(0);String msgbody = new String(msg.getBody(), "utf-8");System.out.println(msgbody + " Receive New Messages: " + msgs);if (msgbody.equals("HelloWorld - RocketMQ4")) {System.out.println("======错误=======");int a = 1 / 0;}} catch (Exception e) {e.printStackTrace();if (msgs.get(0).getReconsumeTimes() == 3) {// 该条消息可以存储到DB或者LOG日志中,或其他处理方式return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功} else {return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("Consumer Started.");}
}

RECONSUME_LATER的重试测试代码

生产端发送了10条消息,看一下消费端的运行效果:

RECONSUME_LATER的重试效果

观察上图发现,HelloWorld - RocketMQ4的消息的***reconsumeTimes***属性值发生了变化,其实这个属性就代表了消息重试的次数!因此我们可以通过reconsumeTimes属性,让MQ超过了多少次之后让他不再重试,而是记录日志等处理,也就是上面代码catch中的内容。

2. Timeout

比如由于网络原因导致消息压根就没有从MQ到消费者上,那么在RocketMQ内部会不断的尝试发送这条消息,直至发送成功为止!(比如集群中一个broker失败,就尝试另一个broker)
延续Exception的思路,也就是消费端没有给RocketMQ返回消费的状态,即没有return ConsumeConcurrentlyStatus.CONSUME_SUCCESS或return ConsumeConcurrentlyStatus.RECONSUME_LATER,这样的就认为没有到达Consumer端。
下面进行模拟:

1)消费端有consumer1和consumer2这样一个集群。
2)consumer1端的业务代码中暂停1分钟并且不发送接收状态给RocketMQ。

public class Consumer1 {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");consumer.setNamesrvAddr("192.168.2.222:9876;192.168.2.223:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("Topic1", "Tag1 || Tag2 || Tag3");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {try {                    String topic = msg.getTopic();String msgBody = new String(msg.getBody(),"utf-8");String tags = msg.getTags();System.out.println("收到消息:" + " topic:" + topic + " ,tags:" + tags + " ,msg:" + msgBody);// 表示业务处理时间System.out.println("=========开始暂停==========");Thread.sleep(60000);}} catch (Exception e) {e.printStackTrace();return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("Consumer Started.");}
}

Consumer1端Timeout异常测试代码

3)启动consumer1和consumer2。
4)启动Producer,只发送一条数据。
看一下此时consumer1和consumer2的运行结果:

Consumer1

Consumer2-未接收到消息

发现consumer1接收到消息并且暂停,consumer2未接收到消息。

5)关闭consumer1。
观察consumer2的运行结果:

Consumer2-接收到消息

总结

Producer端没什么好说的,Consumer端值得注意。对于消费消息而言,存在2种指定的状态(成功 OR 失败重试),如果一条消息在消费端处理没有返回这2个状态,那么相当于这条消息没有达到消费者,势必会再次发送给消费者!也即是消息的处理必须有返回值,否则就进行重发。

RocketMQ(四)——消息重试相关推荐

  1. RocketMQ之消息重试。

    RocketMQ使用过程中,如何进行消息重试. 首先,我们需要明确,只有当消费模式为 MessageModel.CLUSTERING(集群模式) 时,Broker才会自动进行重试,对于广播消息是不会重 ...

  2. 【RocketMQ】消息重试、重试次数设置、死信队列

    文章目录 1. 死信队列 1.1 死信特性 1.2 查看死信消息 2.重试次数参数 2.1 Producer端重试 2.2 Consumer端重试 3.1 异常重试 3.2 超时重试 参考 1. 死信 ...

  3. RocketMQ错误消息重试策略之Consumer的重试机制(Exception情况)

    consumer端重试 消费者端的失败,分为2种情况,一个是exception,一个是timeout. exception 消息正常的到了消费者,结果消费者发生异常,处理失败了.例如反序列化失败,消息 ...

  4. RocketMQ错误消息重试策略之Consumer的重试机制(timeout情况)

    timeout 比如由于网络原因导致消息压根就没有从MQ到消费者上,那么在RocketMQ内部会不断的尝试发送这条消息,直至发送成功为止! 也就是说,服务端没有接收到消息的反馈,既不是成功也不是失败, ...

  5. RocketMQ错误消息重试策略之重试情况的分析

  6. RocketMQ的消费者消息重试和生产者消息重投

    详细介绍了RocketMQ的消息重试机制,RocketMQ的消息重试可以分为生产者重试和消费者重试两个部分. 文章目录 1 生产者重试 2 消费者重试 2.1 异常重试 2.1.1 并发消费的重试 2 ...

  7. 深入理解RocketMQ延迟消息

    延迟消息是实际开发中一个非常有用的功能,本文第一部分从整体上介绍秒级精度延迟消息的实现思路,在第二部分结合RocketMQ的延迟消息实现,进行细致的讲解,点出关键部分的源码.第三步介绍延迟消息与消息重 ...

  8. rocketmq广播消息为什么不能重试_几分钟带你看懂“消息队列和RocketMQ”的入门总结

    消息队列扫盲 消息队列顾名思义就是存放消息的队列,队列我就不解释了,别告诉我你连队列都不知道似啥吧? 所以问题并不是消息队列是什么,而是 消息队列为什么会出现?消息队列能用来干什么?用它来干这些事会带 ...

  9. RocketMQ同步消息、异步消息、单向消息详解

    一.RocketMQ 支持 3 种消息发送方式 : 1.同步消息(sync message ) producer向 broker 发送消息,执行 API 时同步等待, 直到broker 服务器返回发送 ...

最新文章

  1. 2021年春季学期-信号与系统-第七次作业参考答案-第八小题
  2. 如何成为领袖? 学习任正非小沃森郭士纳
  3. Kafka解惑之Old Producer(1)—— Beginning
  4. KMP字符串匹配算法理解(转)
  5. 差分隐私与可穿戴式设备调查【笔记】
  6. python 冒泡排序 时间复杂度
  7. camera(16)---双摄持续扩散 摄像头供应链阵营变动加剧
  8. python经典实例-Python机器学习经典实例
  9. 转载:eclipse 搭建SSH项目(第二篇,有具体的项目例子)
  10. 3dmax2020下载3dmax2020下载安装详细教程
  11. 如何用Java解压缩WAR文件
  12. for循环及判断语句的20个经典习题
  13. xp计算机无法正常启动,修复XP系统无法正常启动进入不了计算机的解决办法
  14. android 自动测光,Android Camera1中的对焦与测光
  15. 前端网页水印实现方法
  16. 面试-数据知识点准备
  17. 写给女儿高中编程课老师的一封信
  18. 加速linux开机速度 systemctl
  19. 用循环不定式来证明冒泡排序的正确性
  20. 【数据结构】— 「时间复杂度」与「空间复杂度」

热门文章

  1. jquery 这些小技巧你懂吗
  2. 电动汽车有序充电调度优化
  3. 架构师之路18年精选100篇
  4. OpenCV检测和追踪车辆
  5. Lex Yacc (一) 入门
  6. 每日自动签到工具的制作思路
  7. matlab中继电器叫什么,解析汽车继电器中继电器各脚的区别及接线
  8. 【设计模式】2、创建型模式
  9. ubuntu1804搭建git服务器
  10. dell 服务器硬盘failed,DELL服务器PERC-5-6RAID操作系统下诊断硬盘故障(tty)手册.pdf