最近在做彩信下发,需要下发的内容是以消息的形式存放在rocektMQ,遇上彩信消息未下发的情况,需要实时去查各topic的消息积压量

1、启动时装配监控客户端的bean

@Component
public class MQAdminExtConfig {private static final Logger log = LoggerFactory.getLogger(MQAdminExtConfig.class);@Value("${rocketmq.name-server}")private String nameServer;public static DefaultMQAdminExt  defaultMQAdminExt;/*** 启动监控客户端*/@PostConstructpublic void initMqAdminExtConfig(){//初始化一个生产者,用于初始化参数log.info("init rocketMQ monitoer client,nameServer:{}....",nameServer);try {DefaultMQProducer producer = new DefaultMQProducer("GRP_P_MSG_PRIORITY_HIGH_BeiJing_8000");producer.setNamesrvAddr(nameServer);producer.start();} catch (MQClientException e) {e.printStackTrace();}try {defaultMQAdminExt = new DefaultMQAdminExt();defaultMQAdminExt.setNamesrvAddr(nameServer);defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));defaultMQAdminExt.start();} catch (Exception e) {e.printStackTrace();}}
}
  /*** * @param consumerGroup    消费者组* @param topic           topic* @return      当前topic的积压量*/private static long getBackLogMsg(String consumerGroup,String topic){long diff=0;log.info("BacklogMonitorUtil--getBackLogMsg param:consumerGroup:{},topic:{} ",consumerGroup,topic);try {ConsumeStats consumeStats = MQAdminExtConfig.defaultMQAdminExt.examineConsumeStats(consumerGroup);List<MessageQueue> mqList = new LinkedList();mqList.addAll(consumeStats.getOffsetTable().keySet());Collections.sort(mqList);for(MessageQueue queue :mqList){if(topic.equals(queue.getTopic())){OffsetWrapper offsetWrapper = (OffsetWrapper)consumeStats.getOffsetTable().get(queue);log.info("getBrokerOffset----------------{}",offsetWrapper.getBrokerOffset());log.info("getConsumerOffset-----------------{}",offsetWrapper.getConsumerOffset());diff = offsetWrapper.getBrokerOffset() - offsetWrapper.getConsumerOffset();}}} catch (Exception e) {//当消费者未消费时此除会报错diff=0;log.error("get offset error -----------------{}",e);}return diff;}

这里本来想探究一下为什么当消费者不消费时会报错,先把错误贴出来

上面报的是一个topic路由找不到的错误,且topic是%RETRY%开头的,但是通过查看源码发现defaultMQAdminExt.examineConsumeStats的实现类,查询的topic直接就是

关于%RETRY%开头的topic
consumer 消费失败,会把消息重新发往 %RETRY% + consumerGroup,这个 retry 消息会在一定时间后,真实送到 retry topic。
但是这里为什么会直接去查 %RETRY% + consumerGroup,没有搞明白,后续再继续跟踪~,有知道的老哥可以在评论区写下答案,感谢

修改

真实使用过程中还是出现了一些问题,即某些情况下

  diff = offsetWrapper.getBrokerOffset() - offsetWrapper.getConsumerOffset();

这个diff会返回0,这里直接修改成

 diff=consumeStats.computeTotalDiff();

rocketmq消息积压监控java代码实现相关推荐

  1. 阿里二面:RocketMQ 消息积压了,增加消费者有用吗?

    面试官:RocketMQ 消息积压了,增加消费者有用吗? 我:这个要看具体的场景,不同的场景下情况是不一样的. 面试官:可以详细说一下吗? 我:如果消费者的数量小于 MessageQueue 的数量, ...

  2. stopwatch java_利用StopWatch类监控Java代码执行时间并分析性能

    springframework中的StopWatch类可以测量一个时间间隔的运行时间,也可以测量多个时间间隔的总运行时间.一般用来测量代码执行所用的时间或者计算性能数据,在优化代码性能上可以使用Sto ...

  3. RocketMq消息积压、消息重复、消息完整、消息顺序处理方案

    消息积压 当生产端的生产效率大于消费端的消费效率,就会造成消息处理不完的情况. 排查方式:RocketMQ 监控告警:生产环境如何快速通过监控预警发现堆积.收发失败等问题?_rocketmq监控指标_ ...

  4. Java Kafka 消费积压监控

    Java Kafka 消费积压监控 后端代码: Monitor.java代码: package com.suncreate.kafkaConsumerMonitor.service;import co ...

  5. RocketMQ 消息结构和消息类型

    发送消息的一方称为生产者,负责生产消息,一般由业务系统负责生产消息.一个消息生产者会把业务应用系统里产生的消息发送到 broker 服务器.RocketMQ 提供多种发送方式,同步发送.异步发送.顺序 ...

  6. 诊断神器Arthas,动态跟踪Java代码,实时监控JVM状态

    简介 Arthas 是Alibaba开源的Java诊断工具,动态跟踪Java代码:实时监控JVM状态,可以在不中断程序执行的情况下轻松完成JVM相关问题排查工作 .支持JDK 6+,支持Linux/M ...

  7. java 创建topic,RocketMQ在Java代码之中手动创建Topic

    Rocketmq在Java代码之中手动创建Topic [原创,转载请注明出处] 我的 [博客园主页] [CSDN主页] [简书主页] 加V进Java交流群,备注Java交流:w1129574379 * ...

  8. java代码控制qq、微信发送消息

    对于程序QQ.微信的程序控制,本人并不是直操纵他官方提供的接口,而且直接借用github大佬开发的第三方辅助程序,开放出来接口供java代码调用,实现java代码控制qq.微信发消息的功能. 一.代码 ...

  9. mq补偿机制java代码_RocketMQ源码分析之消息消费机制-消费端消息负载均衡机制与重新分布 - Java 技术驿站-Java 技术驿站...

    1.消息消费需要解决的问题 首先再次重复啰嗦一下RocketMQ消息消费的一些基本元素的关系 主题 ---> 消息队列(MessageQueue) 1 对多 主题 ----> 消息生产者, ...

最新文章

  1. 有存款,才能过得更踏实
  2. 谷歌40人发表59页长文:为何真实场景中ML模型表现不好?
  3. Jmeter(三)断言和关联
  4. JUnit4套件测试
  5. 一个标签的72变,打造一个纯CSS图标库
  6. 面向对象的程序设计特点
  7. 【硬见小百科】数字万用表的工作原理
  8. 2019泰迪杯C题案例分析-python大数据自动化数据挖掘
  9. 新手兼职也能月入5000的副业项目,几乎零门槛
  10. 【javase基础】第六篇:方法的重载与递归
  11. C语言数字图像处理---ZPHOTOENGINE算法库使用
  12. android修改自动背光,自动背光算法-Android 8.1
  13. 黑马程序员——JAVA集合
  14. 剑指offer笔记(七) 第47题至第53题
  15. SM30表维护自动更新值
  16. codegear的希望
  17. 尚学堂java300集飞机小游戏实战
  18. [项目管理-4]:软硬件项目管理 - 人月神话:项目时间管理(时间)
  19. PHP 实现大数据(30w量级)表格导出(导出excel) 提高效率,减少内存消耗,终极解决方案
  20. Iphone 免费申请App ID

热门文章

  1. 数值分析内容概览及学习总结
  2. onFinishInflate()
  3. java西游记释厄传super,西游记释厄传super出招表
  4. LogHub 智能日志分析通用公开数据集
  5. Python一炮句搞定网页登录验证码自动输入
  6. Cinema Director使用教程
  7. 基于深度学习的蛋白序列设计方法——proteinMPNN
  8. Xilinx FPGA平台DDR3设计保姆式教程(1)DDR3基础简介
  9. Bootstrap Affix和过渡效果插件的详细使用【前端Bootstrap框架】
  10. trace32专栏——基础调试