提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

背景

1 RocketMQ介绍

1.1 RocketMQ 特点

1.2 RocketMQ 优势

1.3.RocketMQ环境介绍

2、添加微信群机器人

3、监控rocketMQ消费组,发送到企业微信

3.1原理:

3.2 准备mysql 表:

3.3 python代码如下:

3.4、crontab 定时调度或者加入dolphinsheduler 调度

总结


背景

RocketMQ 是重要的消息中间件,消息中间件的重要作用:解耦,异步,削峰,当前业务是消费RocketMQ数据写入hbase,通过业务hbase和rocketMQ监控发现,如果集群有大任务占用集群资源,hbase写入比较缓慢,rocketMQ消息就会堆积,于是就需要监控rockeMQ消息堆积情况,如果消息堆积较多,就发送到企业微信告警。

1 RocketMQ介绍

RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。

1.1 RocketMQ 特点

支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型

在一个队列中可靠的先进先出(FIFO)和严格的顺序传递 (RocketMQ可以保证严格的消息顺序,而ActiveMQ无法保证)

支持拉(pull)和推(push)两种消息模式

pull其实就是消费者主动从MQ中去拉消息,而push则像rabbit MQ一样,是MQ给消费者推送消息。但是RocketMQ的push其实是基于pull来实现的。
它会先由一个业务代码从MQ中pull消息,然后再由业务代码push给特定的应用/消费者。其实底层就是一个pull模式

单一队列百万消息的堆积能力 (RocketMQ提供亿级消息的堆积能力,这不是重点,重点是堆积了亿级的消息后,依然保持写入低延迟)

支持多种消息协议,如 JMS、MQTT 等

分布式高可用的部署架构,满足至少一次消息传递语义(RocketMQ原生就是支持分布式的,而ActiveMQ原生存在单点性)

提供 docker 镜像用于隔离测试和云集群部署

提供配置、指标和监控等功能丰富的 Dashboard

1.2 RocketMQ 优势

目前主流的 MQ 主要是 RocketMQ、kafka、RabbitMQ,其主要优势有:

支持事务型消息(消息发送和 DB 操作保持两方的最终一致性,RabbitMQ 和 Kafka 不支持)
支持结合 RocketMQ 的多个系统之间数据最终一致性(多方事务,二方事务是前提)
支持 18 个级别的延迟消息(Kafka 不支持)
支持指定次数和时间间隔的失败消息重发(Kafka 不支持,RabbitMQ 需要手动确认)
支持 Consumer 端 Tag 过滤,减少不必要的网络传输(即过滤由MQ完成,而不是由消费者完成。RabbitMQ 和 Kafka 不支持)
支持重复消费(RabbitMQ 不支持,Kafka 支持)
2 RocketMQ 基本概念
RocketMQ主要有四大核心组成部分:NameServer、Broker、Producer以及Consumer四部分。

1.3.RocketMQ环境介绍

当前安装的rocketMQ 版本:3.4.6

架构:2主2从

具体安装步骤请参考:阿里云主机安装RocketMQ 集群支持外网访问

并安装了rocketMQ console:http://10.0.0.130:12581

2、添加微信群机器人

输入名称,添加机器人后会生成webhook地址;

注意webhook地址不要泄露

3、监控rocketMQ消费组,发送到企业微信

3.1原理:

rocketMQ console 消费组消费url:http://10.0.0.130:12581/consumer/groupList.query

通过python访问该地址,获取的数据写入到mysql,当消费延迟超过阈值时,则向企业微信发送一条告警消息。

然后将该程序加入到调度系统,或者使用crontab 每隔5分钟运行一次。

3.2 准备mysql 表:

创建表:


SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;-- ----------------------------
-- Table structure for tbl_rocketmq_diff_m
-- ----------------------------
DROP TABLE IF EXISTS `tbl_rocketmq_diff_m`;
CREATE TABLE `tbl_rocketmq_diff_m`  (`SubscriptionGroup` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,`version` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,`Quantity` bigint(10) NULL DEFAULT NULL,`consumeType` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,`messageModel` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,`consumeTps` bigint(20) NULL DEFAULT NULL,`diffTotal` bigint(20) NULL DEFAULT NULL,`upload_time` datetime(0) NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Compact;SET FOREIGN_KEY_CHECKS = 1;

3.3 python代码如下:

#!/usr/bin/python
# _*_ coding:UTF-8 _*_import requests
import sys
import json
import urllib
import time
import pymysql
from datetime import datetimefrom urllib.parse import quote,unquote
#file = open("./data1.txt", "a",encoding='utf-8')def checkJson(key,json):if (key in json) :return json[key]else:return ''
'''{"status":0,"data":[{"group":"MistakeRecordConsumerGroup","version":"V3_5_9","count":25,"consumeType":"CONSUME_PASSIVELY","messageModel":"CLUSTERING","consumeTps":1,"diffTotal":4},{"group":"consumerGroup_heart","version":"V3_5_9","count":7,"consumeType":"CONSUME_PASSIVELY","messageModel":"CLUSTERING","consumeTps":1182,"diffTotal":776},{"group":"producerGroup_record_upload","version":"V3_5_9","count":4,"consumeType":"CONSUME_PASSIVELY","messageModel":"CLUSTERING","consumeTps":206,"diffTotal":8083},{"group":"producerGroup_heart_data","version":"V3_5_9","count":4,"consumeType":"CONSUME_PASSIVELY","messageModel":"CLUSTERING","consumeTps":1183,"diffTotal":7},{"group":"producerGroup_springclass_data","version":"V3_5_9","count":4,"consumeType":"CONSUME_PASSIVELY","messageModel":"CLUSTERING","consumeTps":0,"diffTotal":0},{"group":"test_consumer_group","version":"V3_5_9","count":1,"consumeType":"CONSUME_PASSIVELY","messageModel":"CLUSTERING","consumeTps":0,"diffTotal":0},{"group":"consumer_test","version":"HigherVersion","count":1,"consumeType":"CONSUME_PASSIVELY","messageModel":"CLUSTERING","consumeTps":0,"diffTotal":0},{"group":"bigdata_noahdata_test_group","version":"V3_5_9","count":1,"consumeType":"CONSUME_PASSIVELY","messageModel":"CLUSTERING","consumeTps":0,"diffTotal":0},{"group":"CID_ONS-HTTP-PROXY","version":null,"count":0,"consumeType":null,"messageModel":null,"consumeTps":0,"diffTotal":-1},{"group":"CID_ONSAPI_PERMISSION","version":null,"count":0,"consumeType":null,"messageModel":null,"consumeTps":0,"diffTotal":-1},{"group":"TOOLS_CONSUMER","version":null,"count":0,"consumeType":null,"messageModel":null,"consumeTps":0,"diffTotal":-1},{"group":"CID_ONSAPI_PULL","version":null,"count":0,"consumeType":null,"messageModel":null,"consumeTps":0,"diffTotal":-1},{"group":"FILTERSRV_CONSUMER","version":null,"count":0,"consumeType":null,"messageModel":null,"consumeTps":0,"diffTotal":-1},{"group":"SELF_TEST_C_GROUP","version":null,"count":0,"consumeType":null,"messageModel":null,"consumeTps":0,"diffTotal":-1},{"group":"CID_ONSAPI_OWNER","version":null,"count":0,"consumeType":null,"messageModel":null,"consumeTps":0,"diffTotal":-1}],"errMsg":null}
'''threshold = 50000
def get_now_time():"""获取当前日期时间:return:当前日期时间"""now =  time.localtime()now_time = time.strftime("%Y-%m-%d %H:%M:%S", now)# now_time = time.strftime("%Y-%m-%d ", now)return now_timedef parseMain():mysql_host = 'localhost'mysql_db = 'hive'mysql_user = 'user****'mysql_password = 'password****'mysql_port = 3306url = 'http://10.0.0.130:12581/consumer/groupList.query'headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.36','cache-control': 'no-cache','content-type': 'application/json',}db = pymysql.connect(host=mysql_host, port=mysql_port, user=mysql_user, password=mysql_password, db=mysql_db,charset='utf8')html = requests.get(url, headers=headers)jobj = json.loads(html.text)jevents = jobj['data']jtmp = []for key in jevents:group = checkJson('group', key)version = checkJson('version', key)count = checkJson('count', key)consumeType = checkJson('consumeType', key)messageModel = checkJson('messageModel', key)consumeTps = checkJson('consumeTps', key)diffTotal = checkJson('diffTotal', key)upload_time = get_now_time()if count !=0:try:cursor = db.cursor()sql = 'INSERT INTO tbl_rocketmq_diff_m(SubscriptionGroup,version,Quantity,consumeType,messageModel,consumeTps,diffTotal,upload_time) values("' + str(group) + '","' + str(version) + '","' + str(count) + '","' + str(consumeType) + '","' + str(messageModel) + '","' + str(consumeTps) + '","' + str(diffTotal) + '","' + str(upload_time) + '")'cursor.execute(sql)db.commit()except Exception as e:print('error sql:' +  sql)if (diffTotal > threshold) :jtmp.append(key)jAlert = "**告警通知:[" + str(len(jtmp)) + "]** \n"for key in jtmp:group = checkJson('group', key)diffTotal = checkJson('diffTotal', key)consumeTps = checkJson('consumeTps',key)upload_time = get_now_time()jAlert = jAlert + ">问题名称:**<font color=\"warning\">" + "消费延迟" + "</font>**\n" \+ ">告警来源:<font color=\"comment\">" + "rockectMQ" + "</font>\n" \+ ">告警时间:<font color=\"comment\">" + str(upload_time) + "</font>\n" \+ ">问题详情:<font color=\"comment\">" + "消费组:" + str(group) + ",消费延迟大于"+str(threshold)+",当前值延迟(" + str(diffTotal) + ")" +" Tps:("+str(consumeTps)+ ")</font>\n" \+ ">问题描述:<font color=\"comment\">" + "消费延迟过大" + "</font>\n" \+ ">目前状态:<font color=\"comment\">" + "firing" + "</font>\n" \+ ">告警级别:<font color=\"comment\">" + "critical" + "</font>\n" \+ "————————————————\n"if len(jtmp) > 0:data = {}markdown = {}markdown['content'] = jAlertdata['msgtype'] = 'markdown'data['markdown'] = markdownjdata = json.dumps(data, sort_keys=False, indent=4, separators=(',', ': '))response = requests.post('https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=***',headers=headers, data=jdata)print('response:' + response.text)response.close()db.close()if __name__ == '__main__':parseMain()

3.4、crontab 定时调度或者加入dolphinsheduler 调度

略:

总结

本文结合实际业务场景介绍了监控rocketmq消费者组消息堆积,将RocketMQ消费者组消费数据写入mysql,并向企业微信发送告警,作为抛砖引玉,大家可以结合自己场景,修改脚本做其他业务的监控。若有疑问,可以留言咨询。

监控RocketMQ消费数据延迟告警发送企业微信相关推荐

  1. Zabbix 系统监控(三)VMware 虚拟平台监控、邮件告警、企业微信告警配置

    Vmware 虚拟平台监控.邮件告警.企业微信告警配置 8 Vmware 虚拟平台监控 阅读 zabbix 官方文档,官方提供了 Vmware 虚拟机监控模板,并对模板进行了解释说明,但未对相应名词做 ...

  2. 网站首页被篡改? 看我使用PhantomJS利器实现网站自动监控修改并截图发送企业微信预警

    欢迎关注「WeiyiGeek」 设为「星标⭐」每天带你 基础入门 到 进阶实践 再到 放弃学习! 涉及 网络安全运维.应用开发.物联网IOT.学习路径 .个人感悟 等知识 "  花开堪折直须 ...

  3. Prometheus配置和使用Alertmanager发送告警至企业微信

    注:本文基于CentOS 7.4编写 1.准备工作 1.1 创建应用 注册企业微信,这个不细说.注册完成后,点击应用管理->应用->创建应用 1.2 获取应用ID和秘钥 按照要求创建应用后 ...

  4. 企业微信机器人推送mysql_进阶功能|将数据推送到企业微信群机器人

    当通过金数据收集到新数据/新客资后,如何才能在企业微信中快速通知特定负责人进行处理呢? 小金特别准备了一份文档,帮助大家了解学习噢- 前期准备工作 1.金数据账户 2.WinDeal账户 3.企业微信 ...

  5. python实现由通知接口发送企业微信通知

    python实现由通知接口发送企业微信通知 问题背景 要点说明 1. 请求接口鉴权并返回token值 2.请求通知接口发送企业微信通知 3.通知内容中传入参数 问题背景 项目中需要通过 python ...

  6. jenkins使用python脚本发送企业微信通知

    如果只是想实现将jenkins的构建结果发送到企业微信进行通知,最简便的方式是安装Qy Wechat Notification Plugin插件,网上教程很多就不重复写了,可参考:https://ww ...

  7. 企业微信推送消息延迟_通过企业微信发送提醒消息 支持markdown

    师太大佬: 最近一直在使用方糖推送,看到LOC大佬的企业微信推送感觉NB,隧稍作修改发上来分享给大家食用~ LOC大佬的GITHUB:https://github.com/kaixin1995/Inf ...

  8. PrometheusAlert+prometheus+Alertmanager实现各种类型告警 (企业微信告警、飞书告警、钉钉告警、)

    Prometheus Alert 是开源的运维告警中心消息转发系统,支持主流的监控系统 Prometheus,日志系统 Graylog 和数据可视化系统 Grafana 发出的预警消息.通知渠道支持钉 ...

  9. 发送企业微信工资条消息

    企业微信新版本不支持获取人员详情信息,可用老版本获取,然后用新版本发送. import pandas as pd import json import requests from os.path im ...

最新文章

  1. 皮一皮:原来微信备注还有这个用...
  2. uglify压缩angular控制器注意
  3. boost::geometry::azimuth用法的测试程序
  4. 查找域名、由名字查找某个熟知的端口、由名字查找协议
  5. git 常用的撤销操作
  6. 由于找不到mfc110.dll,无法继续执行代码的解决方法
  7. java 创建日程到期提醒_Java 多线程与高并发,基础概念回顾
  8. 利用Python K-means实现简单图像聚类
  9. 学习Unix其实就这样简单
  10. 前端是Sencha Touch+ Cordova(转)
  11. mysql insert 性能_MySQL 提高Insert性能
  12. 直觉模糊集的基本要素
  13. 【线性回归】-最小二乘法求一元线性回归公式推导及代码实现
  14. 颠覆式编程:软件2.0
  15. 什么是scrum敏捷项目管理
  16. 步进电机、伺服电机、舵机的原理和区别?
  17. 我们上语文英语音乐计算机和美术英语,“制作课程表”教学设计.ppt
  18. vba 冻结窗格_在VBA中进行调试-2A)代码窗格(F7)
  19. 倒计时c#/unity
  20. 百度小程序坐拥三大亿级流量入口 如何低成本制作百度小程序?

热门文章

  1. 电线曲挠试验机的发展 洛克仪器Labverse
  2. 乱世王者服务器维护,《乱世王者》数据互通计划开启公告
  3. python和c语言哪一个好_python和c语言先学哪个好
  4. 幸福的瑞星卡卡狮 郁闷的江民防火墙
  5. c# 读cpu温度,显卡温度,硬盘温度,风扇转速,硬件信息,cpu占用,附赠项目源码
  6. 【Python】opencv展示汉字及与pillow对比
  7. 创享投资贾珂:我们的“泛娱乐+”时代
  8. Adaboost 人脸检测:Haar特征及积分图、分类器的级联
  9. 西湖论剑 2023 比赛复现
  10. 房企数据中台:核心是提高销售,客户开什么车也有用 | 地产圆桌会⑤