rocketmq消息积压监控java代码实现
最近在做彩信下发,需要下发的内容是以消息的形式存放在rocektMQ,遇上彩信消息未下发的情况,需要实时去查各topic的消息积压量
1、启动时装配监控客户端的bean
@Component
public class MQAdminExtConfig {private static final Logger log = LoggerFactory.getLogger(MQAdminExtConfig.class);@Value("${rocketmq.name-server}")private String nameServer;public static DefaultMQAdminExt defaultMQAdminExt;/*** 启动监控客户端*/@PostConstructpublic void initMqAdminExtConfig(){//初始化一个生产者,用于初始化参数log.info("init rocketMQ monitoer client,nameServer:{}....",nameServer);try {DefaultMQProducer producer = new DefaultMQProducer("GRP_P_MSG_PRIORITY_HIGH_BeiJing_8000");producer.setNamesrvAddr(nameServer);producer.start();} catch (MQClientException e) {e.printStackTrace();}try {defaultMQAdminExt = new DefaultMQAdminExt();defaultMQAdminExt.setNamesrvAddr(nameServer);defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));defaultMQAdminExt.start();} catch (Exception e) {e.printStackTrace();}}
}
/*** * @param consumerGroup 消费者组* @param topic topic* @return 当前topic的积压量*/private static long getBackLogMsg(String consumerGroup,String topic){long diff=0;log.info("BacklogMonitorUtil--getBackLogMsg param:consumerGroup:{},topic:{} ",consumerGroup,topic);try {ConsumeStats consumeStats = MQAdminExtConfig.defaultMQAdminExt.examineConsumeStats(consumerGroup);List<MessageQueue> mqList = new LinkedList();mqList.addAll(consumeStats.getOffsetTable().keySet());Collections.sort(mqList);for(MessageQueue queue :mqList){if(topic.equals(queue.getTopic())){OffsetWrapper offsetWrapper = (OffsetWrapper)consumeStats.getOffsetTable().get(queue);log.info("getBrokerOffset----------------{}",offsetWrapper.getBrokerOffset());log.info("getConsumerOffset-----------------{}",offsetWrapper.getConsumerOffset());diff = offsetWrapper.getBrokerOffset() - offsetWrapper.getConsumerOffset();}}} catch (Exception e) {//当消费者未消费时此除会报错diff=0;log.error("get offset error -----------------{}",e);}return diff;}
这里本来想探究一下为什么当消费者不消费时会报错,先把错误贴出来
上面报的是一个topic路由找不到的错误,且topic是%RETRY%开头的,但是通过查看源码发现defaultMQAdminExt.examineConsumeStats的实现类,查询的topic直接就是
关于%RETRY%开头的topic,
consumer 消费失败,会把消息重新发往 %RETRY% + consumerGroup,这个 retry 消息会在一定时间后,真实送到 retry topic。
但是这里为什么会直接去查 %RETRY% + consumerGroup,没有搞明白,后续再继续跟踪~,有知道的老哥可以在评论区写下答案,感谢
修改
真实使用过程中还是出现了一些问题,即某些情况下
diff = offsetWrapper.getBrokerOffset() - offsetWrapper.getConsumerOffset();
这个diff会返回0,这里直接修改成
diff=consumeStats.computeTotalDiff();
rocketmq消息积压监控java代码实现相关推荐
- 阿里二面:RocketMQ 消息积压了,增加消费者有用吗?
面试官:RocketMQ 消息积压了,增加消费者有用吗? 我:这个要看具体的场景,不同的场景下情况是不一样的. 面试官:可以详细说一下吗? 我:如果消费者的数量小于 MessageQueue 的数量, ...
- stopwatch java_利用StopWatch类监控Java代码执行时间并分析性能
springframework中的StopWatch类可以测量一个时间间隔的运行时间,也可以测量多个时间间隔的总运行时间.一般用来测量代码执行所用的时间或者计算性能数据,在优化代码性能上可以使用Sto ...
- RocketMq消息积压、消息重复、消息完整、消息顺序处理方案
消息积压 当生产端的生产效率大于消费端的消费效率,就会造成消息处理不完的情况. 排查方式:RocketMQ 监控告警:生产环境如何快速通过监控预警发现堆积.收发失败等问题?_rocketmq监控指标_ ...
- Java Kafka 消费积压监控
Java Kafka 消费积压监控 后端代码: Monitor.java代码: package com.suncreate.kafkaConsumerMonitor.service;import co ...
- RocketMQ 消息结构和消息类型
发送消息的一方称为生产者,负责生产消息,一般由业务系统负责生产消息.一个消息生产者会把业务应用系统里产生的消息发送到 broker 服务器.RocketMQ 提供多种发送方式,同步发送.异步发送.顺序 ...
- 诊断神器Arthas,动态跟踪Java代码,实时监控JVM状态
简介 Arthas 是Alibaba开源的Java诊断工具,动态跟踪Java代码:实时监控JVM状态,可以在不中断程序执行的情况下轻松完成JVM相关问题排查工作 .支持JDK 6+,支持Linux/M ...
- java 创建topic,RocketMQ在Java代码之中手动创建Topic
Rocketmq在Java代码之中手动创建Topic [原创,转载请注明出处] 我的 [博客园主页] [CSDN主页] [简书主页] 加V进Java交流群,备注Java交流:w1129574379 * ...
- java代码控制qq、微信发送消息
对于程序QQ.微信的程序控制,本人并不是直操纵他官方提供的接口,而且直接借用github大佬开发的第三方辅助程序,开放出来接口供java代码调用,实现java代码控制qq.微信发消息的功能. 一.代码 ...
- mq补偿机制java代码_RocketMQ源码分析之消息消费机制-消费端消息负载均衡机制与重新分布 - Java 技术驿站-Java 技术驿站...
1.消息消费需要解决的问题 首先再次重复啰嗦一下RocketMQ消息消费的一些基本元素的关系 主题 ---> 消息队列(MessageQueue) 1 对多 主题 ----> 消息生产者, ...
最新文章
- 有存款,才能过得更踏实
- 谷歌40人发表59页长文:为何真实场景中ML模型表现不好?
- Jmeter(三)断言和关联
- JUnit4套件测试
- 一个标签的72变,打造一个纯CSS图标库
- 面向对象的程序设计特点
- 【硬见小百科】数字万用表的工作原理
- 2019泰迪杯C题案例分析-python大数据自动化数据挖掘
- 新手兼职也能月入5000的副业项目,几乎零门槛
- 【javase基础】第六篇:方法的重载与递归
- C语言数字图像处理---ZPHOTOENGINE算法库使用
- android修改自动背光,自动背光算法-Android 8.1
- 黑马程序员——JAVA集合
- 剑指offer笔记(七) 第47题至第53题
- SM30表维护自动更新值
- codegear的希望
- 尚学堂java300集飞机小游戏实战
- [项目管理-4]:软硬件项目管理 - 人月神话:项目时间管理(时间)
- PHP 实现大数据(30w量级)表格导出(导出excel) 提高效率,减少内存消耗,终极解决方案
- Iphone 免费申请App ID