Java Kafka 消费积压监控
Java Kafka 消费积压监控
后端代码:
Monitor.java代码:
package com.suncreate.kafkaConsumerMonitor.service;import com.suncreate.kafkaConsumerMonitor.model.ConsumerInfo; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.text.DecimalFormat; import java.text.SimpleDateFormat; import java.util.*;/*** kafka消费监控** @author suxiang*/ public class Monitor {private static final Logger log = LoggerFactory.getLogger(Monitor.class);private String servers;private String topic;private String groupId;private long lastTime;private long lastTotalLag = 0L;private long lastLogSize = 0L;private long lastOffset = 0L;private double lastRatio = 0;private long speedLogSize = 0L;private long speedOffset = 0L;private String time;private List<ConsumerInfo> list;private final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");public String getTime() {return time;}public void setTime(String time) {this.time = time;}public long getLastTotalLag() {return lastTotalLag;}public double getLastRatio() {return lastRatio;}public String getTopic() {return topic;}public String getGroupId() {return groupId;}public long getSpeedLogSize() {return speedLogSize;}public long getSpeedOffset() {return speedOffset;}public List<ConsumerInfo> getList() {return list;}public void setList(List<ConsumerInfo> list) {this.list = list;}private KafkaConsumer<String, String> consumer;private List<TopicPartition> topicPartitionList;private final DecimalFormat decimalFormat = new DecimalFormat("0.00");private ConsumerGroupsService consumerGroupsService;private String groupIdShort;private boolean needUpdate;/*** kafka消费监控** @param servers* @param consumerGroupsService* @param topic* @param groupId* @param needUpdate true:需要更新 groupId 和 KafkaConsumer,groupId传递前缀即可;false:不需要更新 groupId 和 KafkaConsumer,groupId传递全称*/public Monitor(String servers, ConsumerGroupsService consumerGroupsService, String topic, String groupId, boolean needUpdate) {this.servers = servers;this.topic = topic;this.groupIdShort = groupId;this.groupId = consumerGroupsService.getGroupId(topic, groupId);this.consumerGroupsService = consumerGroupsService;this.needUpdate = needUpdate;this.list = new ArrayList<>();//消费者consumer = createConsumer();//查询 topic partitionstopicPartitionList = new ArrayList<>();List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);for (PartitionInfo partitionInfo : partitionInfoList) {TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());topicPartitionList.add(topicPartition);}}public void monitor(boolean addToList) {try {long startTime = System.currentTimeMillis();//查询 log sizeMap<Integer, Long> endOffsetMap = new HashMap<>();Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitionList);for (TopicPartition partitionInfo : endOffsets.keySet()) {endOffsetMap.put(partitionInfo.partition(), endOffsets.get(partitionInfo));}//查询消费 offsetMap<Integer, Long> commitOffsetMap = new HashMap<>();for (TopicPartition topicAndPartition : topicPartitionList) {OffsetAndMetadata committed = consumer.committed(topicAndPartition);commitOffsetMap.put(topicAndPartition.partition(), committed.offset());}long endTime = System.currentTimeMillis();log.info("查询logSize和offset耗时:" + (new DecimalFormat("0.000")).format((endTime - startTime) / 1000.0) + " 秒");//累加laglong totalLag = 0L;long logSize = 0L;long offset = 0L;if (endOffsetMap.size() == commitOffsetMap.size()) {for (Integer partition : endOffsetMap.keySet()) {long endOffset = endOffsetMap.get(partition);long commitOffset = commitOffsetMap.get(partition);long diffOffset = endOffset - commitOffset;totalLag += diffOffset;logSize += endOffset;offset += commitOffset;}} else {log.error("Topic:" + topic + " consumer:" + consumer + " topic partitions lost");}log.info("Topic:" + topic + " logSize:" + logSize + " offset:" + offset + " totalLag:" + totalLag);if (lastTime > 0) {if (System.currentTimeMillis() - lastTime > 0) {speedLogSize = (long) ((logSize - lastLogSize) / ((System.currentTimeMillis() - lastTime) / 1000.0));speedOffset = (long) ((offset - lastOffset) / ((System.currentTimeMillis() - lastTime) / 1000.0));}if (speedLogSize > 0) {String strRatio = decimalFormat.format(speedOffset * 100 / (speedLogSize * 1.0));lastRatio = Double.parseDouble(strRatio);log.info("Topic:" + topic + " speedLogSize:" + speedLogSize + " speedOffset:" + speedOffset + " 百分比:" + strRatio + "%");}}lastTime = System.currentTimeMillis();lastTotalLag = totalLag;lastLogSize = logSize;lastOffset = offset;if (addToList) {this.setTime(simpleDateFormat.format(new Date()));this.list.add(new ConsumerInfo(this.getTopic(), this.getGroupId(), this.getLastTotalLag(), this.getLastRatio(), this.getSpeedLogSize(), this.getSpeedOffset(), this.getTime()));if (this.list.size() > 500) {this.list.remove(0);}}} catch (Exception e) {log.error("Monitor error", e);}}private KafkaConsumer<String, String> createConsumer() {//消费者Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.servers);properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return new KafkaConsumer<String, String>(properties);}/*** 更新 groupId 和 KafkaConsumer*/public void update() {if (needUpdate) {try {String oldGroupId = this.groupId;this.groupId = consumerGroupsService.getGroupId(topic, groupIdShort);log.info("groupId 已更新 旧groupId=" + oldGroupId + " 新groupId=" + this.groupId);if (this.consumer != null) {try {this.consumer.close();} catch (Exception e) {log.error("consumer close error", e);}this.consumer = null;}this.consumer = createConsumer();log.info("KafkaConsumer 已更新");} catch (Exception e) {log.error("Monitor update error", e);}}}}
MonitorService.java代码:
package com.suncreate.kafkaConsumerMonitor.service;import com.suncreate.kafkaConsumerMonitor.model.ConsumerInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service;import javax.annotation.PostConstruct; import java.util.*;@Service public class MonitorService {private static final Logger log = LoggerFactory.getLogger(MonitorService.class);@Value("${kafka.consumer.servers}")private String servers;@Autowiredprivate ConsumerGroupsService consumerGroupsService;private List<Monitor> monitorList;@PostConstructprivate void Init() {monitorList = new ArrayList<>();monitorList.add(new Monitor(servers, consumerGroupsService, "wifiData", "wifi-kafka-hbase", false));monitorList.add(new Monitor(servers, consumerGroupsService, "KK_PASS_INFO_TYCC", "EXTRACT-SAMPLE", false));monitorList.add(new Monitor(servers, consumerGroupsService, "KK_PASS_INFO_TYCC", "dblrecog-upload2vcn", false));monitorList.add(new Monitor(servers, consumerGroupsService, "KK_PASS_INFO_TYCC_FILTER", "yisa", true));monitorList.add(new Monitor(servers, consumerGroupsService, "KK_PASS_INFO_TYCC_FILTER", "kafka-filter-check", true));monitorList.add(new Monitor(servers, consumerGroupsService, "motorVehicle", "unifiedstorage-downloader", false));monitorList.add(new Monitor(servers, consumerGroupsService, "motorVehicle", "full-vehicle-data-storage-kafka2ch", false));monitorList.add(new Monitor(servers, consumerGroupsService, "motorVehicle", "vehicle_store", false));monitorList.add(new Monitor(servers, consumerGroupsService, "motorVehicle", "vcn-sk-upload-luyang", false));monitorList.add(new Monitor(servers, consumerGroupsService, "motorVehicle", "vcn-sk-upload-yaohai", false));monitorList.add(new Monitor(servers, consumerGroupsService, "motorVehicle", "vcn-sk-upload-baohe", false));monitorList.add(new Monitor(servers, consumerGroupsService, "peopleFace", "kafka-filter-check", true));}public void monitorOnce(boolean addToList) {for (Monitor monitor : monitorList) {monitor.monitor(addToList);}}public List<ConsumerInfo> getConsumerList() {List<ConsumerInfo> list = new ArrayList<>();for (Monitor monitor : monitorList) {list.add(new ConsumerInfo(monitor.getTopic(), monitor.getGroupId(), monitor.getLastTotalLag(), monitor.getLastRatio(), monitor.getSpeedLogSize(), monitor.getSpeedOffset(), monitor.getTime()));}return list;}public List<ConsumerInfo> getDetails(String topic, String groupId) {for (Monitor monitor : monitorList) {if (monitor.getTopic().equals(topic) && monitor.getGroupId().equals(groupId)) {return monitor.getList();}}return new ArrayList<>();}/*** 更新 Monitor 和 consumerGroupsService*/public void update() {consumerGroupsService.update();for (Monitor monitor : monitorList) {monitor.update();}}}
ConsumerGroupsService.java代码:
用于获取kafka的topic下的所有消费者组,new Monitor传的groupId参数可能不是消费者组的全称,所以需要从topic的所有消费者组中匹配到全称。
由于对接的是华为FusionInsight平台的Kafka,所以需要使用带身份认证的端口连接,才能使用AdminClient类获取到所有消费者组。代码里把不带安全认证的端口21005换成带安全认证的端口21007。
package com.suncreate.kafkaConsumerMonitor.service;import kafka.admin.AdminClient; import kafka.coordinator.group.GroupOverview; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import scala.collection.JavaConversions;import javax.annotation.PostConstruct; import java.util.*;@Service public class ConsumerGroupsService {private static final Logger log = LoggerFactory.getLogger(ConsumerGroupsService.class);@Value("${kafka.consumer.servers}")private String servers;private List<GroupOverview> groupListAll;@PostConstructprivate void Init() {try {//身份认证System.setProperty("java.security.auth.login.config", "/home/server/import/conf/jaas.conf");System.setProperty("java.security.krb5.conf", "/home/server/import/conf/krb5.conf");//System.setProperty("java.security.auth.login.config", "D:/Project/shiny/kafka-consumer-monitor/conf/jaas.conf");//System.setProperty("java.security.krb5.conf", "D:/Project/shiny/kafka-consumer-monitor/conf/krb5.conf");groupListAll = getAllGroups();} catch (Exception e) {log.error("ConsumerGroupsService Init 失败", e);}}private List<GroupOverview> getAllGroups() {List<GroupOverview> list = new ArrayList<>();Properties properties = new Properties();properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers.replace("21005", "21007"));properties.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");AdminClient client = AdminClient.create(properties);try {list = scala.collection.JavaConversions.seqAsJavaList(client.listAllGroupsFlattened().toSeq());if (list != null) {log.info("ConsumerGroupsService Init 获取所有消费者组 成功 groupListAll size=" + groupListAll.size());} else {log.error("ConsumerGroupsService Init 获取所有消费者组 失败 groupListAll=null");}} catch (Exception e) {log.error("ConsumerGroupsService Init 获取所有消费者组 失败", e);} finally {client.close();}return list;}public String getGroupId(String topic, String groupId) {java.util.Set<String> groups = getConsumerGroups(topic);for (String item : groups) {if (item.indexOf(groupId) >= 0) {return item;}}return groupId;}private java.util.Set<String> getConsumerGroups(String topic) {Properties properties = new Properties();properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers.replace("21005", "21007"));properties.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");AdminClient client = AdminClient.create(properties);java.util.Set<String> groups = new HashSet<String>();try {if (groupListAll != null) {for (GroupOverview overview : groupListAll) {String groupID = overview.groupId();Map<TopicPartition, Object> offsets = JavaConversions.mapAsJavaMap(client.listGroupOffsets(groupID));for (TopicPartition partition : offsets.keySet()) {if (partition.topic().equals(topic)) {groups.add(groupID);}}}}log.info("Topic:" + topic + " 消费者组集合:" + groups);} catch (Exception e) {log.error("getConsumerGroups error", e);} finally {client.close();}return groups;}public void update() {this.groupListAll = getAllGroups();}}
MonitorConfig.java代码:
package com.suncreate.kafkaConsumerMonitor.task;import com.suncreate.kafkaConsumerMonitor.service.MonitorService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.SchedulingConfigurer; import org.springframework.scheduling.config.ScheduledTaskRegistrar; import org.springframework.scheduling.support.CronTrigger;import java.text.SimpleDateFormat;@Configuration @EnableScheduling public class MonitorConfig implements SchedulingConfigurer {private static final Logger logger = LoggerFactory.getLogger(MonitorConfig.class);private String cronExpression = "0 */3 * * * ?";//private String cronExpression = "*/20 * * * * ?";private final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");@Autowiredprivate MonitorService monitorService;@Overridepublic void configureTasks(ScheduledTaskRegistrar taskRegistrar) {taskRegistrar.addTriggerTask(() -> {monitorService.update();monitorService.monitorOnce(true);}, triggerContext -> new CronTrigger(cronExpression).nextExecutionTime(triggerContext));} }
MonitorController.java代码:
package com.suncreate.kafkaConsumerMonitor.controller;import com.suncreate.kafkaConsumerMonitor.model.ConsumerInfo; import com.suncreate.kafkaConsumerMonitor.model.LayuiData; import com.suncreate.kafkaConsumerMonitor.service.MonitorService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;import java.util.List;@RestController @RequestMapping("/monitor") public class MonitorController {@Autowiredprivate MonitorService monitorService;@GetMapping("/getConsumers")public LayuiData getConsumers() {List<ConsumerInfo> list = monitorService.getConsumerList();LayuiData data = new LayuiData(list);return data;}@GetMapping("/monitorOnce")public void monitorOnce() {monitorService.monitorOnce(false);}@GetMapping("/getDetails")public LayuiData getDetails(String topic, String groupId) {List<ConsumerInfo> list = monitorService.getDetails(topic, groupId);LayuiData data = new LayuiData(list);return data;} }
pom.xml文件(有些东西没用到或者备用,没有删):
pom文件中引用的jar包,跟开源的jar包版本完全一致,但jar包中的内容大不相同,所以必须引用华为平台给的jar包才行。需要注意jar包依赖的jar包也不能使用开源jar包,一定要引用到华为平台给的jar包。
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.6.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.suncreate</groupId><artifactId>kafka-consumer-monitor</artifactId><version>1.0</version><name>kafka-consumer-monitor</name><description>Kafka消费积压监控预警</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.12</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.54</version></dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.0</version></dependency><!-- postgresql --><dependency><groupId>org.postgresql</groupId><artifactId>postgresql</artifactId><scope>runtime</scope></dependency><!-- elasticsearch --><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>6.1.4</version></dependency><!-- oracle --><dependency><groupId>com.oracle</groupId><artifactId>ojdbc6</artifactId><version>11.1.0.7.0</version></dependency><!-- kafka --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.11</artifactId><version>0.11.0.1</version><classifier>huawei</classifier><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></exclusion><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion><exclusion><groupId>net.sf.jopt-simple</groupId><artifactId>jopt-simple</artifactId></exclusion><exclusion><groupId>com.yammer.metrics</groupId><artifactId>metrics-core</artifactId></exclusion><exclusion><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId></exclusion><exclusion><groupId>com.101tec</groupId><artifactId>zkclient</artifactId></exclusion><exclusion><groupId>org.scala-lang.modules</groupId><artifactId>scala-parser-combinators_2.11</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.11.0.1</version><classifier>huawei</classifier><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></exclusion><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion><exclusion><groupId>net.jpountz.lz4</groupId><artifactId>lz4</artifactId></exclusion><exclusion><groupId>org.xerial.snappy</groupId><artifactId>snappy-java</artifactId></exclusion></exclusions></dependency><!-- kafka_2.11 依赖的jar包 --><dependency><groupId>net.sf.jopt-simple</groupId><artifactId>jopt-simple</artifactId><version>5.0.3</version><classifier>huawei</classifier></dependency><dependency><groupId>com.yammer.metrics</groupId><artifactId>metrics-core</artifactId><version>2.2.0</version><classifier>huawei</classifier><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></exclusion><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.11.11</version><classifier>huawei</classifier></dependency><dependency><groupId>com.101tec</groupId><artifactId>zkclient</artifactId><version>0.10</version><classifier>huawei</classifier><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></exclusion><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.scala-lang.modules</groupId><artifactId>scala-parser-combinators_2.11</artifactId><version>1.0.4</version><classifier>huawei</classifier><exclusions><exclusion><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.25</version><classifier>huawei</classifier></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version><classifier>huawei</classifier></dependency><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.5.1</version><classifier>huawei</classifier></dependency><!-- kafka-clients 依赖的jar包 --><dependency><groupId>net.jpountz.lz4</groupId><artifactId>lz4</artifactId><version>1.3.0</version><classifier>huawei</classifier></dependency><dependency><groupId>org.xerial.snappy</groupId><artifactId>snappy-java</artifactId><version>1.1.2.6</version><classifier>huawei</classifier></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build></project>
前端使用了 Layui 和 ECharts 展示表格和图表
index.css代码:
.div-title {font-size: 18px;margin-top: 10px;margin-left: 10px; }.div-right {text-align: right; }.span-red {color: #ff0000; }
index.html代码(展示topic、消费者组Consumer GroupId、Total Lag、Kafka数据生产速度、Kafka数据消费速度等):
<!DOCTYPE html> <html lang="zh"> <head><meta charset="UTF-8"><title>Title</title><link rel="stylesheet" href="css/index.css"><link rel="stylesheet" href="js/layui-v2.6.8/css/layui.css" media="all"><script type="text/javascript" src="js/jquery-1.7.1.js"></script><script type="text/javascript" src="js/layui-v2.6.8/layui.js" charset="utf-8"></script> </head> <body><div class="div-title">Kafka 监控<button type="button" class="layui-btn layui-btn-sm" οnclick="refreshTable()">刷新</button> </div> <table class="layui-hide" id="myTable"></table><script type="text/javascript">var myTable;layui.use('table', function () {var table = layui.table;myTable = table.render({elem: '#myTable',url: '/home/monitor/getConsumers',cellMinWidth: 80, //全局定义常规单元格的最小宽度cols: [[{field: 'topic', width: 300, title: 'topic', sort: true},{field: 'groupId', width: 300, title: 'groupId'},{field: 'totalLag', width: 150, title: 'Total Lag', sort: true, templet: function (d) {if (d.delayDay * 24 > 2) {return '<div class="div-right"><span class="span-red">' + formatLongNum(d.totalLag) + '</span></div>'} else {return '<div class="div-right"><span>' + formatLongNum(d.totalLag) + '</span></div>'}}},{field: 'speedLogSize', width: 150, title: '生产速度(条/秒)', templet: function (d) {return '<div class="div-right">' + d.speedLogSize + '</div>'}},{field: 'speedOffset', width: 150, title: '消费速度(条/秒)', templet: function (d) {return '<div class="div-right">' + d.speedOffset + '</div>'}},{field: 'ratio', width: 100, title: '消费/生产', templet: function (d) {if (d.ratio < 90) {return '<div class="div-right"><span class="span-red">' + d.ratio + '%</span></div>'} else {return '<div class="div-right"><span>' + d.ratio + '%</span></div>'}}},{field: 'delayDay', width: 150, title: '积压(天)', sort: true, templet: function (d) {if (d.delayDay * 24 > 2) {return '<div class="div-right"><span class="span-red">' + d.delayDay + '</span></div>'} else {return '<div class="div-right"><span>' + d.delayDay + '</span></div>'}}},{field: 'ope', width: 100, title: '操作', templet: function (d) {return '<a href="/home/detail.html?topic=' + d.topic + '&groupId=' + d.groupId + '" target="_blank" class="layui-btn layui-btn-sm" >详细</a>';}}]]});});function refreshTable() {if (myTable) {myTable.reload();}}setInterval(function () {refreshTable();}, 30000);function formatLongNum(num) {return (num + '').replace(/(\d{1,4})(?=(\d{4})+(?:$|\.))/g, '$1 ')}// setInterval(function () {// $.get("/home/monitor/monitorOnce");// }, 30000);</script></body> </html>
detail.html代码(展示单个消费者组的Total Lag、生产速度、消费速度以及Total Lag趋势图):
<!DOCTYPE html> <html lang="zh"> <head><meta charset="UTF-8"><title>Title</title><link rel="stylesheet" href="css/index.css"><link rel="stylesheet" href="js/layui-v2.6.8/css/layui.css" media="all"><script type="text/javascript" src="js/jquery-1.7.1.js"></script><script type="text/javascript" src="js/layui-v2.6.8/layui.js" charset="utf-8"></script><script type="text/javascript" src="js/echarts-v4.7.0/echarts.min.js"></script> </head> <body><div class="div-title"><span id="detailTitle"></span> 明细<button type="button" class="layui-btn layui-btn-sm" οnclick="refreshTable()">刷新</button> </div> <div id="main" style="height:400px;"></div> <table class="layui-hide" id="test"></table><script type="text/javascript">var myTable;var topic = getQueryVariable("topic");var groupId = getQueryVariable("groupId");$("#detailTitle").html(topic + " " + groupId);layui.use('table', function () {var table = layui.table;myTable = table.render({elem: '#test',url: '/home/monitor/getDetails?topic=' + topic + '&groupId=' + groupId,cellMinWidth: 80, //全局定义常规单元格的最小宽度initSort: {field: 'time', //排序字段,对应 cols 设定的各字段名type: 'desc' //排序方式 asc: 升序、desc: 降序、null: 默认排序},cols: [[{field: 'topic', width: 300, title: 'topic'},{field: 'groupId', width: 300, title: 'groupId'},{field: 'time', width: 180, title: '时间', sort: true},{field: 'totalLag', width: 150, title: 'Total Lag', templet: function (d) {if (d.delayDay * 24 > 2) {return '<div class="div-right"><span class="span-red">' + formatLongNum(d.totalLag) + '</span></div>'} else {return '<div class="div-right"><span>' + formatLongNum(d.totalLag) + '</span></div>'}}},{field: 'speedLogSize', width: 150, title: '生产速度(条/秒)', templet: function (d) {return '<div class="div-right">' + d.speedLogSize + '</div>'}},{field: 'speedOffset', width: 150, title: '消费速度(条/秒)', templet: function (d) {return '<div class="div-right">' + d.speedOffset + '</div>'}},{field: 'ratio', width: 100, title: '消费/生产', templet: function (d) {if (d.ratio < 90) {return '<div class="div-right"><span class="span-red">' + d.ratio + '%</span></div>'} else {return '<div class="div-right"><span>' + d.ratio + '%</span></div>'}}},{field: 'delayDay', width: 150, title: '积压(天)', templet: function (d) {if (d.delayDay * 24 > 2) {return '<div class="div-right"><span class="span-red">' + d.delayDay + '</span></div>'} else {return '<div class="div-right"><span>' + d.delayDay + '</span></div>'}}}]]});});function refreshTable() {if (myTable) {myTable.reload();}showChart();}setInterval(function () {refreshTable();}, 30000);function formatLongNum(num) {return (num + '').replace(/(\d{1,4})(?=(\d{4})+(?:$|\.))/g, '$1 ')}function getQueryVariable(variable) {var query = window.location.search.substring(1);var vars = query.split("&");for (var i = 0; i < vars.length; i++) {var pair = vars[i].split("=");if (pair[0] == variable) {return pair[1];}}return (false);}function showChart() {$.ajax({type: "GET",url: '/home/monitor/getDetails?topic=' + topic + '&groupId=' + groupId,success: function (data) {if (data && data.data && data.data.length > 1) {debugger;var chartDom = document.getElementById('main');var myChart = echarts.init(chartDom);var option;var xAxis = [];var serseis = [];for (var i = 0; i < data.data.length; i++) {xAxis.push(data.data[i].time);serseis.push(data.data[i].totalLag);}option = {title: {show: true,text: "Total Lag 趋势图",x: 'center'},xAxis: {type: 'category',data: xAxis},yAxis: {type: 'value'},series: [{data: serseis,type: 'line'}]};myChart.setOption(option);}}});}showChart();</script></body> </html>
源码(注意:博客中的代码比压缩包中的代码新):
https://files-cdn.cnblogs.com/files/s0611163/kafka-consumer-monitor.zip
效果图:
消费者组列表:
消费者组明细:
表格列说明:
1.消费/生产:消费速度sql教程除以生产速度,若大于100%,说明当前消费java基础教程速度比生产速度快,可能正在快速python基础教程消费掉积压的数据;若小于100%,说明当c#教程前消费速度比生产速度慢,会导致数据积压;若该列的值在100%上下波动,接近100%,说明服务稳定;若该列的值波动较大,说明服vb.net教程务不稳定
2.积压(天):根据Total Lag和生产速度估算的数据积压天数,由于夜晚数据量少白天数据量多,生产速度并不是一天的平均值,只是当前几分钟的平均值,所以这个值只是参考,并不准确
数据显示为红色说明:
1.当估算的积压(天)这一列大于2小时,即大于0.8333,积压(天)和Total Lag这两列数据显示为红色
2.当消费/生产这一列数据小于90,消费/生产这一列数据显示为红色
Java Kafka 消费积压监控相关推荐
- 解决kafka消费积压问题
kafka消费积压 前文 问题定位 积压造成的原因 解决方法 更改配置 优化消费端 前文 遇到很多问题是因为消费积压导致的数据延迟,数据对校时问题重重.那么今天就记录下解决这个问题. 问题定位 消费积 ...
- java kafka 消费_java编程之Kafka_消费者API详解
1 消息发送 1.异步发送导入依赖 org.apache.kafka kafka-clients 0.11.0.0 编写代码 需要用到的类: KafkaProducer:需要创建一个生产者对象,用来发 ...
- java kafka 消费_java利用kafka生产消费消息
1.producer程序 package com.test.frame.kafka.controller; import kafka.javaapi.producer.Producer; import ...
- Kafka学习笔记 : 消费进度监控 [ 消费者 Lag 或 Consumer Lag ]
所谓滞后程度,就是指消费者当前落后于生产者的程度. Lag 应该算是最最重要的监控指标了.它直接反映了一个消费者的运行情况.一个正常工作的消费者,它的 Lag 值应该很小,甚至是接近于 0 的,这表示 ...
- Zabbix监控Kafka topic积压数据
Kafka Apache Kafka是一个分布式发布-订阅消息系统和一个强大的队列,可以处理大量的数据,并使您能够将消息从一个端点传递到另一个端点. Kafka适合离线和在线消息消费. Kafka消息 ...
- java kafka 多线程消费
我们先来看下简单的kafka生产者和消费者模式代码: 生产者KafkaProducer /** * @author xiaofeng * @version V1.0 * @title: KafkaPr ...
- Bug:Zabbix对Kafka topic积压数据监控
简述 <Zabbix监控Kafka topic积压数据>一文的目的是通过Zabbix自动发现实现对多个消费者组的Topic及Partition的Lag进行监控.因在实际监控中发现有问题,为 ...
- Kafka消费组rebalance原理
消费者组是 Kafka 分布式消息处理的一个重要特征,用于管理消费者并促进扩展应用程序的能力.它们将任何一个主题的消费者组合在一起,并且主题内的分区被分配给这些消费者.当组的参与者发生变化时,消费者组 ...
- Kafka设计解析(十三)Kafka消费组(consumer group)
转载自 huxihx,原文链接 Kafka消费组(consumer group) 一直以来都想写一点关于kafka consumer的东西,特别是关于新版consumer的中文资料很少.最近Kafka ...
最新文章
- 国内首个深度学习工程师认证标准发布
- 图解opengl曲线和曲面绘制
- java date dateformat_Java中SimpleDateFormat的使用方法
- OpenGL画图设备上下文与MFC设备上下文的对应
- 转:RabbitMQ 消息队列特性知多少
- 请上传sku预览图后重新操作_拼多多商家版APP新增商品操作步骤
- jquery解析XML
- Java 线程池ThreadPoolExecutor的应用与源码解析
- 安卓案例:利用帧动画实现游戏特效
- pythonjava有什么区别_Python与JAVA有何区别?
- MySQL8 Zip的下载和安装
- Java的JAR包, EAR包 ,WAR包内部结构
- Bossie Awards 开源大数据工具最佳列表
- Java 面向接口编程
- 卡诺图化简及逻辑函数的规范范式:SOP与POS形式
- 赛微微电子通过注册:拟募资8亿 年营收3.4亿
- 德勤:制造业企业数字化转型方案(PPT)
- 常见的防御DDoS攻击的方式有哪些?
- 技术科普丨平台效果调试篇1—灰区和权重
- 封装link或style中的css规则
热门文章
- JavaScript DOM编程艺术小记(五)---第四章-JavaScript图片库(实例)
- packetTracer作业
- STM32之增量式编码器电机测速
- matlab svm 语音识别,【情感识别】基于matlab支持向量机(SVM)的语音情感识别【含Matlab源码 543期】...
- this.Invoke和this.BeginInvoke的区别
- 如何标定加速度传感器的灵敏度
- 解决windows10下telnet的连接问题
- 理解水仙花数(看这一篇就够了)
- kubernetes布署方式介绍
- UNIX环境高级编程——select、poll和epoll