所谓滞后程度,就是指消费者当前落后于生产者的程度。

Lag 应该算是最最重要的监控指标了。它直接反映了一个消费者的运行情况。一个正常工作的消费者,它的 Lag 值应该很小,甚至是接近于 0 的,这表示该消费者能够及时地消费生产者生产出来的消息,滞后程度很小。反之,如果一个消费者 Lag 值很大,通常就表明它无法跟上生产者的速度,最终 Lag 会越来越大,从而拖慢下游消息的处理速度。

通常来说,Lag 的单位是消息数,而且我们一般是在主题这个级别上讨论 Lag 的,但实际上,Kafka 监控 Lag 的层级是在分区上的。如果要计算主题级别的,你需要手动汇总所有主题分区的 Lag,将它们累加起来,合并成最终的 Lag 值。

你在实际业务场景中必须时刻关注消费者的消费进度。

消费进度监控3 种方法。

  1. 使用 Kafka 自带的命令行工具 kafka-consumer-groups 脚本。
  2. 使用 Kafka Java Consumer API 编程。
  3. 使用 Kafka 自带的 JMX 监控指标。

Kafka 自带命令

Kafka 自带的命令行工具 bin/kafka-consumer-groups.sh

kafka-consumer-groups 脚本是 Kafka 为我们提供的最直接的监控消费者消费进度的工具。


$ bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker连接信息> --describe --group <group名称>

Kafka 连接信息就是 < 主机名:端口 > 对,而 group 名称就是你的消费者程序中设置的 group.id 值。

示例: Kafka 集群的连接信息,即 localhost:9092。消费者组名:testgroup

它会按照消费者组订阅主题的分区进行展示,每个分区一行数据;其次,除了主题、分区等信息外,它会汇报每个分区当前最新生产的消息的位移值(即 LOG-END-OFFSET 列值)、该消费者组当前最新消费消息的位移值(即 CURRENT-OFFSET 值)、LAG 值(前两者的差值)、消费者实例 ID消费者连接 Broker 的主机名以及消费者的 CLIENT-ID 信息。

Kafka Java Consumer API [ Kafka 2.0.0 ]

代码示例:

第 1 处是调用 AdminClient.listConsumerGroupOffsets 方法获取给定消费者组的最新消费消息的位移;

第 2 处则是获取订阅分区的最新消息位移;

第3 处就是执行相应的减法操作,获取 Lag 值并封装进一个 Map 对象。


public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {Properties props = new Properties();props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);try (AdminClient client = AdminClient.create(props)) {ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);try {Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自动提交位移props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));}} catch (InterruptedException e) {Thread.currentThread().interrupt();// 处理中断异常// ...return Collections.emptyMap();} catch (ExecutionException e) {// 处理ExecutionException// ...return Collections.emptyMap();} catch (TimeoutException e) {throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);}}}

Kafka JMX 监控指标

Kafka 默认提供的 JMX 监控指标来监控消费者的 Lag 值

Kafka 消费者提供了一个名为 kafka.consumer:type=consumer-fetch-manager-metrics,client-id=“{client-id}”的 JMX 指标,里面有很多属性。和我们今天所讲内容相关的有两组属性:records-lag-maxrecords-lead-min它们分别表示此消费者在测试窗口时间内曾经达到的最大的 Lag 值和最小的 Lead 值。

Lead 值是指消费者最新消费消息的位移与分区当前第一条消息位移的差值。很显然,Lag 和 Lead 是一体的两个方面:Lag 越大的话,Lead 就越小,反之也是同理。

监控到 Lag 越来越大,消费者程序变得越来越慢了,至少是追不上生产者程序了.

Lead 越来越小,甚至是快接近于 0 了,消费者端要丢消息了

Kafka 的消息是有留存时间设置的,默认是 1 周,也就是说 Kafka 默认删除 1 周前的数据。倘若你的消费者程序足够慢,慢到它要消费的数据快被 Kafka 删除了,这时你就必须立即处理,否则一定会出现消息被删除,从而导致消费者程序重新调整位移值的情形。这可能产生两个后果:一个是消费者从头消费一遍数据,另一个是消费者从最新的消息位移处开始消费,之前没来得及消费的消息全部被跳过了,从而造成丢消息的假象。

Kafka 消费者还在分区级别提供了额外的 JMX 指标,用于单独监控分区级别的 Lag 和 Lead 值。JMX 名称为:kafka.consumer:type=consumer-fetch-manager-metrics,partition=“{partition}”,topic=“{topic}”,client-id=“{client-id}”。

引用:

Kafka核心技术与实战 - 胡夕

Kafka学习笔记 : 消费进度监控 [ 消费者 Lag 或 Consumer Lag ]相关推荐

  1. Kafka学习笔记 --- 生产者producer与消费者关系comsumer

    生产者:生产者可以将数据发布到所选择的topic(主题)中.生产者负责将记录分配到topic的哪一个 partition(分区)中.可以使用循环的方式来简单地实现负载均衡,也可以根据某些语义分区函数( ...

  2. kafka_消费者组消费进度监控实现

    对于 Kafka 消费者,最重要的就是监控它们的消费进度,或者说监控它们消费的滞后程度(消费者 Lag 或 Consumer Lag). 所谓滞后程度,就是指消费者当前落后于生产者的程度.Lag 的单 ...

  3. Kafka学习笔记(3)----Kafka的数据复制(Replica)与Failover

    1. CAP理论 1.1 Cosistency(一致性) 通过某个节点的写操作结果对后面通过其他节点的读操作可见. 如果更新数据后,并发访问的情况下可立即感知该更新,称为强一致性 如果允许之后部分或全 ...

  4. 大数据 -- kafka学习笔记:知识点整理(部分转载)

    一 为什么需要消息系统 1.解耦 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束. 2.冗余 消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险.许多 ...

  5. Kafka学习笔记(一):什么是消息队列?什么是Kafka?

    目录 一.消息队列的概述 (一)前置知识点 1.集群和分布式 2.队列(Queue)的含义 3.同步与异步的含义 (二)消息队列的含义与特点 二.Kafka (一) 概述 (二) 常用名词含义 导航栏 ...

  6. 13.zabbix学习笔记:zabbix监控之短信报警

    zabbix学习笔记:zabbix监控之短信报警 zabbix的报警方式有多种,除了常见的邮件报警外,特殊情况下还需要设置短信报警和微信报警等额外方式.本篇文章向大家介绍短信报警. 短信报警设置 短信 ...

  7. Kafka学习笔记(十)kakfa消费组和重平衡

    版权声明:本文为转载文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明. 原文链接:https://blog.csdn.net/weixin_39468305/articl ...

  8. Kafka学习笔记: Kafka 百惑梳理

    1. 消息经常堆积起来,不能消费了,重启服务就能继续消费了. 消息堆积可能原因如下:1. 生产速度大于消费速度,这样可以适当增加分区,增加consumer数量,提升消费TPS:2. consumer消 ...

  9. Kafka学习笔记1

    目录 一.kafka概貌 1.主题与分区 2.压缩与解压缩 3.消费者组 4.位移主题 二.kafka重要参数 1.参数列表 2.如何防止消息丢失 三.幂等生产者与事务生产者 四.TCP连接 通过学习 ...

最新文章

  1. 艾伟:FCKeditor 配置、扩展
  2. [Linux]学习笔记(4)-su及passwd的用法介绍
  3. oracle修改表结构精度,常见问题--oracle10g修改表结构
  4. 从双十一强化体验认知,看苏宁的“自增强回路”增长飞轮
  5. React开发(235):document.body.clientHeight
  6. java学习(161):同步代码块
  7. 信息学奥赛C++语言:调整试题顺序
  8. php执行linux命令的6个函数
  9. (50)IO的延迟约束(输入延迟约束)
  10. Linux之文件属性详解
  11. bzoj1385 [Baltic2000]Division expression
  12. 提高页面渲染速度的建议以及方案
  13. c语言中的EOF是什么意思
  14. 前端之JS篇(七)——Web APIsDOM部分内容
  15. 恩淑 2004-12-18
  16. 基于JAVA宠物喂养资讯分享平台的设计与实现计算机毕业设计源码+系统+lw文档+部署
  17. 安装虚拟机步骤 详细
  18. ROS2极简总结-Nav2-行为树
  19. LINUX IIO子系统分析之一 IIO子系统概述
  20. 使用java进行SSL证书的签名与签验

热门文章

  1. 店宝宝:是什么事情让各大巨头抢着做?
  2. PHP 判断链表是否相交
  3. 嵌入式硬件工程师应具备哪些基本技能?
  4. OpenStack Swift学习笔记
  5. linux系统修改ip地址教程。
  6. postfix+extmail邮件系统完整安装
  7. Framer多车型切换实现的网络功能
  8. 串口服务器 linux,基于Linux的串口服务器设计与实现
  9. 场景特征描述子(全局特征)-GIST
  10. 机器学习算法:kNN和Weighted kNN