RocketMQ错误消息重试策略之Consumer的重试机制(Exception情况)
consumer端重试
消费者端的失败,分为2种情况,一个是exception,一个是timeout。
exception
消息正常的到了消费者,结果消费者发生异常,处理失败了。例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。
消息的状态:
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//package org.apache.rocketmq.client.consumer.listener;public enum ConsumeConcurrentlyStatus {CONSUME_SUCCESS,RECONSUME_LATER;private ConsumeConcurrentlyStatus() {}
}
可以看到,消息的状态分为成功或者失败。如果返回的状态为失败会怎么样呢?
在启动broker的日志中可以看到这样的信息:
INFO main - messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h
2h
这个表示了,如果消息消费失败,那么消息将会在1s、5s、10s后重试,一直到2h后不再重试。
其实,有些时候并不需要重试这么多次,一般重试3~5次即可。这个时候就可以通过msg.getReconsumeTimes()获取重试次数进行控制。
package cn.learn.rocketmq.error;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;import java.io.UnsupportedEncodingException;
import java.util.List;public class ConsumerDemo {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("LEARN_IM");consumer.setNamesrvAddr("localhost:9876");// 订阅topic,接收此Topic下的所有消息consumer.subscribe("my-test-topic", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {try {System.out.println(new String(msg.getBody(), "UTF-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}System.out.println("收到消息->" + msgs);if(msgs.get(0).getReconsumeTimes() >= 3){// 重试3次后,不再进行重试return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});consumer.start();}}
RocketMQ错误消息重试策略之Consumer的重试机制(Exception情况)相关推荐
- RocketMQ错误消息重试策略之Consumer的重试机制(timeout情况)
timeout 比如由于网络原因导致消息压根就没有从MQ到消费者上,那么在RocketMQ内部会不断的尝试发送这条消息,直至发送成功为止! 也就是说,服务端没有接收到消息的反馈,既不是成功也不是失败, ...
- RocketMQ错误消息重试策略之重试情况的分析
- 【RocketMQ】消息重试、重试次数设置、死信队列
文章目录 1. 死信队列 1.1 死信特性 1.2 查看死信消息 2.重试次数参数 2.1 Producer端重试 2.2 Consumer端重试 3.1 异常重试 3.2 超时重试 参考 1. 死信 ...
- 从gRPC的重试策略说起
本文首发在 技术成长之道 博客,访问 hechen0.com 查看更多,或者微信搜索「技术成长之道」关注我的公众号,或者扫描下方二维码关注 公众号获得第一时间更新通知! 本文让你了解 重试解决什么问题 ...
- RabbitMQ消费失败重试策略、及重试策略应用场景详解
前言: RabbitMQ消费者一般情况下,如果消费失败出现异常,那么消费端默认是无限重试消费,这样就会带来非常不好的一个情况,就是陷入死循环,一直报错一直重试.所以我们需要对消费异常重试次数.重试间隔 ...
- 常用的重试技术—如何优雅的重试(Spring-Retry)等
背景 分布式环境下,重试是高可用技术中的一个部分,大家在调用RPC接口或者发送MQ时,针对可能会出现网络抖动请求超时情况采取一下重试操作,自己简单的编写重试大多不够优雅,而重试目前已有很多技术实现和框 ...
- rocketmq广播消息为什么不能重试_几分钟带你看懂“消息队列和RocketMQ”的入门总结
消息队列扫盲 消息队列顾名思义就是存放消息的队列,队列我就不解释了,别告诉我你连队列都不知道似啥吧? 所以问题并不是消息队列是什么,而是 消息队列为什么会出现?消息队列能用来干什么?用它来干这些事会带 ...
- RocketMQ重试策略及与Springboot整合
注:基础请参考上一篇文章RocketMQ简介与安装及入门 1. 重试策略 在消息的发送和消费过程中,都有可能出现错误,如网络异常等,出现了错误就需要进行错误重试,这种消息的重试需要分2种,分别是pro ...
- RocketMQ之消息重试。
RocketMQ使用过程中,如何进行消息重试. 首先,我们需要明确,只有当消费模式为 MessageModel.CLUSTERING(集群模式) 时,Broker才会自动进行重试,对于广播消息是不会重 ...
最新文章
- 移动端最强适配(rem适配之px2rem) 移动端结合Vuex实现简单loading加载效果
- class没有发布到tomcat_基于Tomcat的Websocket范例及permessage-deflate扩展特性的研究
- docker 安装jenkins
- 【Shell】fix 1032报错信息的脚本
- golang 编写的在线redis 内存分析工具 rma4go
- Redis 巧用数据类型实现亿级数据统计
- cocos2d之json使用实例
- JSP 2.2规范 对jsp:useBean的解释
- FreeSwitch中的会议功能
- 服务器电源常见故障判断及处理方法
- docker阿里云镜像加速器
- 心灵终结不显示服务器,红色警戒2心灵终结3.0常见问题及解决方法
- 让IPFS星际文件系统永久保存你的数据
- 计算机考试画箭头,word绘图教程:画箭头、大括号、曲线等常用自选图形-word技巧-电脑技巧收藏家...
- 鸿雁召开智能家居新品发布会,智能面板等多款全屋智能新品亮相
- Discuz修改导读设置,显示更多热帖和精华帖
- 038 Divisible Subsequences
- ddr走线教程_DDR走线规则
- Stream流-分组操作
- 业务和商业才是技术的驱动力