Java Kafka 消费积压监控

后端代码:

Monitor.java代码:

package com.suncreate.kafkaConsumerMonitor.service;import com.suncreate.kafkaConsumerMonitor.model.ConsumerInfo;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.*;/*** kafka消费监控** @author suxiang*/
public class Monitor {private static final Logger log = LoggerFactory.getLogger(Monitor.class);private String servers;private String topic;private String groupId;private long lastTime;private long lastTotalLag = 0L;private long lastLogSize = 0L;private long lastOffset = 0L;private double lastRatio = 0;private long speedLogSize = 0L;private long speedOffset = 0L;private String time;private List<ConsumerInfo> list;private final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");public String getTime() {return time;}public void setTime(String time) {this.time = time;}public long getLastTotalLag() {return lastTotalLag;}public double getLastRatio() {return lastRatio;}public String getTopic() {return topic;}public String getGroupId() {return groupId;}public long getSpeedLogSize() {return speedLogSize;}public long getSpeedOffset() {return speedOffset;}public List<ConsumerInfo> getList() {return list;}public void setList(List<ConsumerInfo> list) {this.list = list;}private KafkaConsumer<String, String> consumer;private List<TopicPartition> topicPartitionList;private final DecimalFormat decimalFormat = new DecimalFormat("0.00");private ConsumerGroupsService consumerGroupsService;private String groupIdShort;private boolean needUpdate;/*** kafka消费监控** @param servers* @param consumerGroupsService* @param topic* @param groupId* @param needUpdate            true:需要更新 groupId 和 KafkaConsumer,groupId传递前缀即可;false:不需要更新 groupId 和 KafkaConsumer,groupId传递全称*/public Monitor(String servers, ConsumerGroupsService consumerGroupsService, String topic, String groupId, boolean needUpdate) {this.servers = servers;this.topic = topic;this.groupIdShort = groupId;this.groupId = consumerGroupsService.getGroupId(topic, groupId);this.consumerGroupsService = consumerGroupsService;this.needUpdate = needUpdate;this.list = new ArrayList<>();//消费者consumer = createConsumer();//查询 topic partitionstopicPartitionList = new ArrayList<>();List<PartitionInfo> partitionInfoList = 
consumer.partitionsFor(topic);for (PartitionInfo partitionInfo : partitionInfoList) {TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());topicPartitionList.add(topicPartition);}}public void monitor(boolean addToList) {try {long startTime = System.currentTimeMillis();//查询 log sizeMap<Integer, Long> endOffsetMap = new HashMap<>();Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitionList);for (TopicPartition partitionInfo : endOffsets.keySet()) {endOffsetMap.put(partitionInfo.partition(), endOffsets.get(partitionInfo));}//查询消费 offsetMap<Integer, Long> commitOffsetMap = new HashMap<>();for (TopicPartition topicAndPartition : topicPartitionList) {OffsetAndMetadata committed = consumer.committed(topicAndPartition);commitOffsetMap.put(topicAndPartition.partition(), committed.offset());}long endTime = System.currentTimeMillis();log.info("查询logSize和offset耗时:" + (new DecimalFormat("0.000")).format((endTime - startTime) / 1000.0) + " 秒");//累加laglong totalLag = 0L;long logSize = 0L;long offset = 0L;if (endOffsetMap.size() == commitOffsetMap.size()) {for (Integer partition : endOffsetMap.keySet()) {long endOffset = endOffsetMap.get(partition);long commitOffset = commitOffsetMap.get(partition);long diffOffset = endOffset - commitOffset;totalLag += diffOffset;logSize += endOffset;offset += commitOffset;}} else {log.error("Topic:" + topic + "  consumer:" + consumer + "  topic partitions lost");}log.info("Topic:" + topic + "  logSize:" + logSize + "  offset:" + offset + "  totalLag:" + totalLag);if (lastTime > 0) {if (System.currentTimeMillis() - lastTime > 0) {speedLogSize = (long) ((logSize - lastLogSize) / ((System.currentTimeMillis() - lastTime) / 1000.0));speedOffset = (long) ((offset - lastOffset) / ((System.currentTimeMillis() - lastTime) / 1000.0));}if (speedLogSize > 0) {String strRatio = decimalFormat.format(speedOffset * 100 / (speedLogSize * 1.0));lastRatio = 
Double.parseDouble(strRatio);log.info("Topic:" + topic + "  speedLogSize:" + speedLogSize + "  speedOffset:" + speedOffset + "  百分比:" + strRatio + "%");}}lastTime = System.currentTimeMillis();lastTotalLag = totalLag;lastLogSize = logSize;lastOffset = offset;if (addToList) {this.setTime(simpleDateFormat.format(new Date()));this.list.add(new ConsumerInfo(this.getTopic(), this.getGroupId(), this.getLastTotalLag(), this.getLastRatio(), this.getSpeedLogSize(), this.getSpeedOffset(), this.getTime()));if (this.list.size() > 500) {this.list.remove(0);}}} catch (Exception e) {log.error("Monitor error", e);}}private KafkaConsumer<String, String> createConsumer() {//消费者Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.servers);properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return new KafkaConsumer<String, String>(properties);}/*** 更新 groupId 和 KafkaConsumer*/public void update() {if (needUpdate) {try {String oldGroupId = this.groupId;this.groupId = consumerGroupsService.getGroupId(topic, groupIdShort);log.info("groupId 已更新 旧groupId=" + oldGroupId + " 新groupId=" + this.groupId);if (this.consumer != null) {try {this.consumer.close();} catch (Exception e) {log.error("consumer close error", e);}this.consumer = null;}this.consumer = createConsumer();log.info("KafkaConsumer 已更新");} catch (Exception e) {log.error("Monitor update error", e);}}}}

MonitorService.java代码:

package com.suncreate.kafkaConsumerMonitor.service;import com.suncreate.kafkaConsumerMonitor.model.ConsumerInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.util.*;@Service
/**
 * Registry of all monitored (topic, consumer group) pairs and the entry point
 * the scheduler and controller use to sample them.
 */
public class MonitorService {
    private static final Logger log = LoggerFactory.getLogger(MonitorService.class);

    @Value("${kafka.consumer.servers}")
    private String servers;

    @Autowired
    private ConsumerGroupsService consumerGroupsService;

    /** One Monitor per (topic, groupId) pair, built at startup. */
    private List<Monitor> monitorList;

    @PostConstruct
    private void Init() {
        monitorList = new ArrayList<>();
        register("wifiData", "wifi-kafka-hbase", false);
        register("KK_PASS_INFO_TYCC", "EXTRACT-SAMPLE", false);
        register("KK_PASS_INFO_TYCC", "dblrecog-upload2vcn", false);
        register("KK_PASS_INFO_TYCC_FILTER", "yisa", true);
        register("KK_PASS_INFO_TYCC_FILTER", "kafka-filter-check", true);
        register("motorVehicle", "unifiedstorage-downloader", false);
        register("motorVehicle", "full-vehicle-data-storage-kafka2ch", false);
        register("motorVehicle", "vehicle_store", false);
        register("motorVehicle", "vcn-sk-upload-luyang", false);
        register("motorVehicle", "vcn-sk-upload-yaohai", false);
        register("motorVehicle", "vcn-sk-upload-baohe", false);
        register("peopleFace", "kafka-filter-check", true);
    }

    /** Adds one (topic, groupId) pair to the monitored set. */
    private void register(String topic, String groupId, boolean needUpdate) {
        monitorList.add(new Monitor(servers, consumerGroupsService, topic, groupId, needUpdate));
    }

    /**
     * Samples every monitor once.
     *
     * @param addToList true to append each sample to its monitor's history
     */
    public void monitorOnce(boolean addToList) {
        for (Monitor m : monitorList) {
            m.monitor(addToList);
        }
    }

    /** Returns the latest sample of every monitor as a fresh list. */
    public List<ConsumerInfo> getConsumerList() {
        List<ConsumerInfo> snapshot = new ArrayList<>(monitorList.size());
        for (Monitor m : monitorList) {
            ConsumerInfo info = new ConsumerInfo(m.getTopic(), m.getGroupId(), m.getLastTotalLag(), m.getLastRatio(), m.getSpeedLogSize(), m.getSpeedOffset(), m.getTime());
            snapshot.add(info);
        }
        return snapshot;
    }

    /**
     * Returns the sample history for one (topic, groupId) pair, or an empty
     * list when no such monitor is registered.
     */
    public List<ConsumerInfo> getDetails(String topic, String groupId) {
        for (Monitor m : monitorList) {
            boolean matches = m.getTopic().equals(topic) && m.getGroupId().equals(groupId);
            if (matches) {
                return m.getList();
            }
        }
        return new ArrayList<>();
    }

    /**
     * Refreshes the consumer-group cache, then every monitor's groupId/consumer.
     */
    public void update() {
        consumerGroupsService.update();
        for (Monitor m : monitorList) {
            m.update();
        }
    }
}

ConsumerGroupsService.java代码:

用于获取kafka的topic下的所有消费者组,new Monitor传的groupId参数可能不是消费者组的全称,所以需要从topic的所有消费者组中匹配到全称。

由于对接的是华为FusionInsight平台的Kafka,所以需要使用带身份认证的端口连接,才能使用AdminClient类获取到所有消费者组。代码里把不带安全认证的端口21005换成带安全认证的端口21007。

package com.suncreate.kafkaConsumerMonitor.service;import kafka.admin.AdminClient;
import kafka.coordinator.group.GroupOverview;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import scala.collection.JavaConversions;import javax.annotation.PostConstruct;
import java.util.*;@Service
public class ConsumerGroupsService {private static final Logger log = LoggerFactory.getLogger(ConsumerGroupsService.class);@Value("${kafka.consumer.servers}")private String servers;private List<GroupOverview> groupListAll;@PostConstructprivate void Init() {try {//身份认证System.setProperty("java.security.auth.login.config", "/home/server/import/conf/jaas.conf");System.setProperty("java.security.krb5.conf", "/home/server/import/conf/krb5.conf");//System.setProperty("java.security.auth.login.config", "D:/Project/shiny/kafka-consumer-monitor/conf/jaas.conf");//System.setProperty("java.security.krb5.conf", "D:/Project/shiny/kafka-consumer-monitor/conf/krb5.conf");groupListAll = getAllGroups();} catch (Exception e) {log.error("ConsumerGroupsService Init 失败", e);}}private List<GroupOverview> getAllGroups() {List<GroupOverview> list = new ArrayList<>();Properties properties = new Properties();properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers.replace("21005", "21007"));properties.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");AdminClient client = AdminClient.create(properties);try {list = scala.collection.JavaConversions.seqAsJavaList(client.listAllGroupsFlattened().toSeq());if (list != null) {log.info("ConsumerGroupsService Init 获取所有消费者组 成功 groupListAll size=" + groupListAll.size());} else {log.error("ConsumerGroupsService Init 获取所有消费者组 失败 groupListAll=null");}} catch (Exception e) {log.error("ConsumerGroupsService Init 获取所有消费者组 失败", e);} finally {client.close();}return list;}public String getGroupId(String topic, String groupId) {java.util.Set<String> groups = getConsumerGroups(topic);for (String item : groups) {if (item.indexOf(groupId) >= 0) {return item;}}return groupId;}private java.util.Set<String> getConsumerGroups(String topic) {Properties properties = new Properties();properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers.replace("21005", "21007"));properties.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, 
"SASL_PLAINTEXT");AdminClient client = AdminClient.create(properties);java.util.Set<String> groups = new HashSet<String>();try {if (groupListAll != null) {for (GroupOverview overview : groupListAll) {String groupID = overview.groupId();Map<TopicPartition, Object> offsets = JavaConversions.mapAsJavaMap(client.listGroupOffsets(groupID));for (TopicPartition partition : offsets.keySet()) {if (partition.topic().equals(topic)) {groups.add(groupID);}}}}log.info("Topic:" + topic + " 消费者组集合:" + groups);} catch (Exception e) {log.error("getConsumerGroups error", e);} finally {client.close();}return groups;}public void update() {this.groupListAll = getAllGroups();}}

MonitorConfig.java代码:

package com.suncreate.kafkaConsumerMonitor.task;import com.suncreate.kafkaConsumerMonitor.service.MonitorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.support.CronTrigger;import java.text.SimpleDateFormat;@Configuration
@EnableScheduling
public class MonitorConfig implements SchedulingConfigurer {private static final Logger logger = LoggerFactory.getLogger(MonitorConfig.class);private String cronExpression = "0 */3 * * * ?";//private String cronExpression = "*/20 * * * * ?";private final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");@Autowiredprivate MonitorService monitorService;@Overridepublic void configureTasks(ScheduledTaskRegistrar taskRegistrar) {taskRegistrar.addTriggerTask(() -> {monitorService.update();monitorService.monitorOnce(true);}, triggerContext -> new CronTrigger(cronExpression).nextExecutionTime(triggerContext));}
}

MonitorController.java代码:

package com.suncreate.kafkaConsumerMonitor.controller;import com.suncreate.kafkaConsumerMonitor.model.ConsumerInfo;
import com.suncreate.kafkaConsumerMonitor.model.LayuiData;
import com.suncreate.kafkaConsumerMonitor.service.MonitorService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.List;@RestController
@RequestMapping("/monitor")
/**
 * REST endpoints exposing the lag monitors to the Layui front-end.
 */
public class MonitorController {
    @Autowired
    private MonitorService monitorService;

    /** Current lag snapshot for every monitored consumer group. */
    @GetMapping("/getConsumers")
    public LayuiData getConsumers() {
        return new LayuiData(monitorService.getConsumerList());
    }

    /** Triggers a single sampling pass without recording history. */
    @GetMapping("/monitorOnce")
    public void monitorOnce() {
        monitorService.monitorOnce(false);
    }

    /** Sample history for one (topic, groupId) pair. */
    @GetMapping("/getDetails")
    public LayuiData getDetails(String topic, String groupId) {
        return new LayuiData(monitorService.getDetails(topic, groupId));
    }
}

pom.xml文件(有些东西没用到或者备用,没有删):

pom文件中引用的jar包,跟开源的jar包版本完全一致,但jar包中的内容大不相同,所以必须引用华为平台给的jar包才行。需要注意jar包依赖的jar包也不能使用开源jar包,一定要引用到华为平台给的jar包。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.6.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.suncreate</groupId><artifactId>kafka-consumer-monitor</artifactId><version>1.0</version><name>kafka-consumer-monitor</name><description>Kafka消费积压监控预警</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.12</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.54</version></dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.0</version></dependency><!-- postgresql --><dependency><groupId>org.postgresql</groupId><artifactId>postgresql</artifactId><scope>runtime</scope></dependency><!-- elasticsearch --><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>6.1.4</version></dependency><!-- oracle --><dependency><groupId>com.oracle</groupId><artifactId>ojdbc6</artifactId><version>11.1.0.7.0</version></dependency><!-- kafka 
--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.11</artifactId><version>0.11.0.1</version><classifier>huawei</classifier><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></exclusion><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion><exclusion><groupId>net.sf.jopt-simple</groupId><artifactId>jopt-simple</artifactId></exclusion><exclusion><groupId>com.yammer.metrics</groupId><artifactId>metrics-core</artifactId></exclusion><exclusion><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId></exclusion><exclusion><groupId>com.101tec</groupId><artifactId>zkclient</artifactId></exclusion><exclusion><groupId>org.scala-lang.modules</groupId><artifactId>scala-parser-combinators_2.11</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.11.0.1</version><classifier>huawei</classifier><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></exclusion><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion><exclusion><groupId>net.jpountz.lz4</groupId><artifactId>lz4</artifactId></exclusion><exclusion><groupId>org.xerial.snappy</groupId><artifactId>snappy-java</artifactId></exclusion></exclusions></dependency><!-- kafka_2.11 依赖的jar包 
--><dependency><groupId>net.sf.jopt-simple</groupId><artifactId>jopt-simple</artifactId><version>5.0.3</version><classifier>huawei</classifier></dependency><dependency><groupId>com.yammer.metrics</groupId><artifactId>metrics-core</artifactId><version>2.2.0</version><classifier>huawei</classifier><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></exclusion><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.11.11</version><classifier>huawei</classifier></dependency><dependency><groupId>com.101tec</groupId><artifactId>zkclient</artifactId><version>0.10</version><classifier>huawei</classifier><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></exclusion><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.scala-lang.modules</groupId><artifactId>scala-parser-combinators_2.11</artifactId><version>1.0.4</version><classifier>huawei</classifier><exclusions><exclusion><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.25</version><classifier>huawei</classifier></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version><classifier>huawei</classifier></dependency><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.5.1</version><classifier>huawei</classifier></dependency><!-- kafka-clients 依赖的jar包 
--><dependency><groupId>net.jpountz.lz4</groupId><artifactId>lz4</artifactId><version>1.3.0</version><classifier>huawei</classifier></dependency><dependency><groupId>org.xerial.snappy</groupId><artifactId>snappy-java</artifactId><version>1.1.2.6</version><classifier>huawei</classifier></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build></project>

前端使用了 Layui 和 ECharts 展示表格和图表

index.css代码:

.div-title {font-size: 18px;margin-top: 10px;margin-left: 10px;
}.div-right {text-align: right;
}.span-red {color: #ff0000;
}

index.html代码(展示topic、消费者组Consumer GroupId、Total Lag、Kafka数据生产速度、Kafka数据消费速度等):

<!DOCTYPE html>
<html lang="zh">
<head><meta charset="UTF-8"><title>Title</title><link rel="stylesheet" href="css/index.css"><link rel="stylesheet" href="js/layui-v2.6.8/css/layui.css" media="all"><script type="text/javascript" src="js/jquery-1.7.1.js"></script><script type="text/javascript" src="js/layui-v2.6.8/layui.js" charset="utf-8"></script>
</head>
<body><div class="div-title">Kafka 监控<button type="button" class="layui-btn layui-btn-sm" onclick="refreshTable()">刷新</button>
</div>
<table class="layui-hide" id="myTable"></table><script type="text/javascript">var myTable;layui.use('table', function () {var table = layui.table;myTable = table.render({elem: '#myTable',url: '/home/monitor/getConsumers',cellMinWidth: 80, //全局定义常规单元格的最小宽度cols: [[{field: 'topic', width: 300, title: 'topic', sort: true},{field: 'groupId', width: 300, title: 'groupId'},{field: 'totalLag', width: 150, title: 'Total Lag', sort: true, templet: function (d) {if (d.delayDay * 24 > 2) {return '<div class="div-right"><span class="span-red">' + formatLongNum(d.totalLag) + '</span></div>'} else {return '<div class="div-right"><span>' + formatLongNum(d.totalLag) + '</span></div>'}}},{field: 'speedLogSize', width: 150, title: '生产速度(条/秒)', templet: function (d) {return '<div class="div-right">' + d.speedLogSize + '</div>'}},{field: 'speedOffset', width: 150, title: '消费速度(条/秒)', templet: function (d) {return '<div class="div-right">' + d.speedOffset + '</div>'}},{field: 'ratio', width: 100, title: '消费/生产', templet: function (d) {if (d.ratio < 90) {return '<div class="div-right"><span class="span-red">' + d.ratio + '%</span></div>'} else {return '<div class="div-right"><span>' + d.ratio + '%</span></div>'}}},{field: 'delayDay', width: 150, title: '积压(天)', sort: true, templet: function (d) {if (d.delayDay * 24 > 2) {return '<div class="div-right"><span class="span-red">' + d.delayDay + '</span></div>'} else {return '<div class="div-right"><span>' + d.delayDay + '</span></div>'}}},{field: 'ope', width: 100, title: '操作', templet: function (d) {return '<a href="/home/detail.html?topic=' + d.topic + '&groupId=' + d.groupId + '" target="_blank" class="layui-btn layui-btn-sm" >详细</a>';}}]]});});function refreshTable() {if (myTable) {myTable.reload();}}setInterval(function () {refreshTable();}, 30000);function formatLongNum(num) {return (num + '').replace(/(\d{1,4})(?=(\d{4})+(?:$|\.))/g, '$1 ')}// setInterval(function () {//     $.get("/home/monitor/monitorOnce");// }, 
30000);</script></body>
</html>

detail.html代码(展示单个消费者组的Total Lag、生产速度、消费速度以及Total Lag趋势图):

<!DOCTYPE html>
<html lang="zh">
<head><meta charset="UTF-8"><title>Title</title><link rel="stylesheet" href="css/index.css"><link rel="stylesheet" href="js/layui-v2.6.8/css/layui.css" media="all"><script type="text/javascript" src="js/jquery-1.7.1.js"></script><script type="text/javascript" src="js/layui-v2.6.8/layui.js" charset="utf-8"></script><script type="text/javascript" src="js/echarts-v4.7.0/echarts.min.js"></script>
</head>
<body><div class="div-title"><span id="detailTitle"></span> 明细<button type="button" class="layui-btn layui-btn-sm" onclick="refreshTable()">刷新</button>
</div>
<div id="main" style="height:400px;"></div>
<table class="layui-hide" id="test"></table><script type="text/javascript">var myTable;var topic = getQueryVariable("topic");var groupId = getQueryVariable("groupId");$("#detailTitle").html(topic + "  " + groupId);layui.use('table', function () {var table = layui.table;myTable = table.render({elem: '#test',url: '/home/monitor/getDetails?topic=' + topic + '&groupId=' + groupId,cellMinWidth: 80, //全局定义常规单元格的最小宽度initSort: {field: 'time', //排序字段,对应 cols 设定的各字段名type: 'desc' //排序方式  asc: 升序、desc: 降序、null: 默认排序},cols: [[{field: 'topic', width: 300, title: 'topic'},{field: 'groupId', width: 300, title: 'groupId'},{field: 'time', width: 180, title: '时间', sort: true},{field: 'totalLag', width: 150, title: 'Total Lag', templet: function (d) {if (d.delayDay * 24 > 2) {return '<div class="div-right"><span class="span-red">' + formatLongNum(d.totalLag) + '</span></div>'} else {return '<div class="div-right"><span>' + formatLongNum(d.totalLag) + '</span></div>'}}},{field: 'speedLogSize', width: 150, title: '生产速度(条/秒)', templet: function (d) {return '<div class="div-right">' + d.speedLogSize + '</div>'}},{field: 'speedOffset', width: 150, title: '消费速度(条/秒)', templet: function (d) {return '<div class="div-right">' + d.speedOffset + '</div>'}},{field: 'ratio', width: 100, title: '消费/生产', templet: function (d) {if (d.ratio < 90) {return '<div class="div-right"><span class="span-red">' + d.ratio + '%</span></div>'} else {return '<div class="div-right"><span>' + d.ratio + '%</span></div>'}}},{field: 'delayDay', width: 150, title: '积压(天)', templet: function (d) {if (d.delayDay * 24 > 2) {return '<div class="div-right"><span class="span-red">' + d.delayDay + '</span></div>'} else {return '<div class="div-right"><span>' + d.delayDay + '</span></div>'}}}]]});});function refreshTable() {if (myTable) {myTable.reload();}showChart();}setInterval(function () {refreshTable();}, 30000);function formatLongNum(num) {return (num + '').replace(/(\d{1,4})(?=(\d{4})+(?:$|\.))/g, '$1 ')}function 
getQueryVariable(variable) {var query = window.location.search.substring(1);var vars = query.split("&");for (var i = 0; i < vars.length; i++) {var pair = vars[i].split("=");if (pair[0] == variable) {return pair[1];}}return (false);}function showChart() {$.ajax({type: "GET",url: '/home/monitor/getDetails?topic=' + topic + '&groupId=' + groupId,success: function (data) {if (data && data.data && data.data.length > 1) {debugger;var chartDom = document.getElementById('main');var myChart = echarts.init(chartDom);var option;var xAxis = [];var serseis = [];for (var i = 0; i < data.data.length; i++) {xAxis.push(data.data[i].time);serseis.push(data.data[i].totalLag);}option = {title: {show: true,text: "Total Lag 趋势图",x: 'center'},xAxis: {type: 'category',data: xAxis},yAxis: {type: 'value'},series: [{data: serseis,type: 'line'}]};myChart.setOption(option);}}});}showChart();</script></body>
</html>

源码(注意:博客中的代码比压缩包中的代码新):

https://files-cdn.cnblogs.com/files/s0611163/kafka-consumer-monitor.zip

效果图:

消费者组列表:

消费者组明细:

表格列说明:
1.消费/生产:消费速度除以生产速度,若大于100%,说明当前消费速度比生产速度快,可能正在快速消费掉积压的数据;若小于100%,说明当前消费速度比生产速度慢,会导致数据积压;若该列的值在100%上下波动,接近100%,说明服务稳定;若该列的值波动较大,说明服务不稳定
2.积压(天):根据Total Lag和生产速度估算的数据积压天数,由于夜晚数据量少白天数据量多,生产速度并不是一天的平均值,只是当前几分钟的平均值,所以这个值只是参考,并不准确

数据显示为红色说明:
1.当估算的积压(天)这一列大于2小时,即大于0.0833,积压(天)和Total Lag这两列数据显示为红色
2.当消费/生产这一列数据小于90,消费/生产这一列数据显示为红色

Java Kafka 消费积压监控相关推荐

  1. 解决kafka消费积压问题

    kafka消费积压 前文 问题定位 积压造成的原因 解决方法 更改配置 优化消费端 前文 遇到很多问题是因为消费积压导致的数据延迟,数据对校时问题重重.那么今天就记录下解决这个问题. 问题定位 消费积 ...

  2. java kafka 消费_java编程之Kafka_消费者API详解

    1 消息发送 1.异步发送导入依赖 org.apache.kafka kafka-clients 0.11.0.0 编写代码 需要用到的类: KafkaProducer:需要创建一个生产者对象,用来发 ...

  3. java kafka 消费_java利用kafka生产消费消息

    1.producer程序 package com.test.frame.kafka.controller; import kafka.javaapi.producer.Producer; import ...

  4. Kafka学习笔记 : 消费进度监控 [ 消费者 Lag 或 Consumer Lag ]

    所谓滞后程度,就是指消费者当前落后于生产者的程度. Lag 应该算是最最重要的监控指标了.它直接反映了一个消费者的运行情况.一个正常工作的消费者,它的 Lag 值应该很小,甚至是接近于 0 的,这表示 ...

  5. Zabbix监控Kafka topic积压数据

    Kafka Apache Kafka是一个分布式发布-订阅消息系统和一个强大的队列,可以处理大量的数据,并使您能够将消息从一个端点传递到另一个端点. Kafka适合离线和在线消息消费. Kafka消息 ...

  6. java kafka 多线程消费

    我们先来看下简单的kafka生产者和消费者模式代码: 生产者KafkaProducer /** * @author xiaofeng * @version V1.0 * @title: KafkaPr ...

  7. Bug:Zabbix对Kafka topic积压数据监控

    简述 <Zabbix监控Kafka topic积压数据>一文的目的是通过Zabbix自动发现实现对多个消费者组的Topic及Partition的Lag进行监控.因在实际监控中发现有问题,为 ...

  8. Kafka消费组rebalance原理

    消费者组是 Kafka 分布式消息处理的一个重要特征,用于管理消费者并促进扩展应用程序的能力.它们将任何一个主题的消费者组合在一起,并且主题内的分区被分配给这些消费者.当组的参与者发生变化时,消费者组 ...

  9. Kafka设计解析(十三)Kafka消费组(consumer group)

    转载自 huxihx,原文链接 Kafka消费组(consumer group) 一直以来都想写一点关于kafka consumer的东西,特别是关于新版consumer的中文资料很少.最近Kafka ...

最新文章

  1. 国内首个深度学习工程师认证标准发布
  2. 图解opengl曲线和曲面绘制
  3. java date dateformat_Java中SimpleDateFormat的使用方法
  4. OpenGL画图设备上下文与MFC设备上下文的对应
  5. 转:RabbitMQ 消息队列特性知多少
  6. 请上传sku预览图后重新操作_拼多多商家版APP新增商品操作步骤
  7. jquery解析XML
  8. Java 线程池ThreadPoolExecutor的应用与源码解析
  9. 安卓案例:利用帧动画实现游戏特效
  10. pythonjava有什么区别_Python与JAVA有何区别?
  11. MySQL8 Zip的下载和安装
  12. Java的JAR包, EAR包 ,WAR包内部结构
  13. Bossie Awards 开源大数据工具最佳列表
  14. Java 面向接口编程
  15. 卡诺图化简及逻辑函数的规范范式:SOP与POS形式
  16. 赛微微电子通过注册:拟募资8亿 年营收3.4亿
  17. 德勤:制造业企业数字化转型方案(PPT)
  18. 常见的防御DDoS攻击的方式有哪些?
  19. 技术科普丨平台效果调试篇1—灰区和权重
  20. 封装link或style中的css规则

热门文章

  1. JavaScript DOM编程艺术小记(五)---第四章-JavaScript图片库(实例)
  2. packetTracer作业
  3. STM32之增量式编码器电机测速
  4. matlab svm 语音识别,【情感识别】基于matlab支持向量机(SVM)的语音情感识别【含Matlab源码 543期】...
  5. this.Invoke和this.BeginInvoke的区别
  6. 如何标定加速度传感器的灵敏度
  7. 解决windows10下telnet的连接问题
  8. 理解水仙花数(看这一篇就够了)
  9. kubernetes布署方式介绍
  10. UNIX环境高级编程——select、poll和epoll