前言

消息堆积是消息中间件的一大特色,消息中间件的流量削峰、冗余存储等功能正是得益于消息中间件的消息堆积能力。然而消息堆积其实是一把亦正亦邪的双刃剑,如果应用场合不恰当反而会对上下游的业务造成不必要的麻烦,比如消息堆积势必会影响上下游整个调用链的时效性,有些中间件如RabbitMQ在发生消息堆积时在某些情况下还会影响自身的性能。对于Kafka而言,虽然消息堆积不会对其自身性能带来多大的困扰,但难免不会影响上下游的业务,堆积过多有可能会造成磁盘爆满,或者触发日志清除策略而造成消息丢失的情况。如何利用好消息堆积这把双刃剑,监控是最为关键的一步。

正文

消息堆积是消费滞后(Lag)的一种表现形式,消息中间件服务端中所留存的消息与消费掉的消息之间的差值即为消息堆积量,也称之为消费滞后(Lag)量。对于Kafka而言,消息被发送至Topic中,而Topic又分成了多个分区(Partition),每一个Partition都有一个预写式的日志文件,虽然Partition可以继续细分为若干个段文件(Segment),但是对于上层应用来说可以将Partition看成最小的存储单元(一个由多个Segment文件拼接的“巨型文件”)。每个Partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到Partition中。我们来看下图,其就是Partition的一个真实写照:

上图中有四个概念:

  • LogStartOffset:表示一个Partition的起始位移,初始为0,虽然消息的增加以及日志清除策略的影响,这个值会阶段性的增大。
  • ConsumerOffset:消费位移,表示Partition的某个消费者消费到的位移位置。
  • HighWatermark:简称HW,代表消费端所能“观察”到的Partition的最高日志位移,HW大于等于ConsumerOffset的值。
  • LogEndOffset:简称LEO,
  • 代表Partition的最高日志位移,其值对消费者不可见。比如在ISR(In-Sync-Replicas)副本数等于3的情况下(如下图所示),消息发送到Leader
  • A之后会更新LEO的值,Follower B和Follower C也会实时拉取Leader
  • A中的消息来更新自己,HW就表示A、B、C三者同时达到的日志位移,也就是A、B、C三者中LEO最小的那个值。由于B、C拉取A消息之间延时问题,所以HW必然不会一直与Leader的LEO相等,即LEO>=HW。

要计算Kafka中某个消费者的滞后量很简单,首先看看其消费了几个Topic,然后针对每个Topic来计算其中每个Partition的Lag,每个Partition的Lag计算就显得非常的简单了,参考下图:

由图可知消费Lag=HW - ConsumerOffset。对于这里大家有可能有个误区,就是认为Lag应该是LEO与ConsumerOffset之间的差值,笔者在这之前也犯过这样的错误认知,详细可以参考《如何使用JMX监控Kafka》。LEO是对消费者不可见的,既然不可见何来消费滞后一说。

那么这里就引入了一个新的问题,HW和ConsumerOffset的值如何获取呢?

首先来说说ConsumerOffset,Kafka中有两处可以存储,一个是Zookeeper,而另一个是”__consumer_offsets这个内部topic中,前者是0.8.x版本中的使用方式,但是随着版本的迭代更新,现在越来越趋向于后者。就拿1.0.0版本来说,虽然默认是存储在”__consumer_offsets”中,但是保不齐用于就将其存储在了Zookeeper中了。这个问题倒也不难解决,针对两种方式都去拉取,然后哪个有值的取哪个。不过这里还有一个问题,对于消费位移来说,其一般不会实时的更新,而更多的是定时更新,这样可以提高整体的性能。那么这个定时的时间间隔就是ConsumerOffset的误差区间之一。

再来说说HW,其也是Kafka中Partition的一个状态。有可能你会察觉到在Kafka的JMX中可以看到“kafka.log:type=Log,name=LogEndOffset,topic=[topic_name],partition=[partition_num]”这样一个属性,但是这个值不是LEO而是HW。

那么怎样正确的计算消费的Lag呢?对Kafka熟悉的同学可能会想到Kafka中自带的kafka-consumer_groups.sh脚本中就有Lag的信息,示例如下:

1[root@node2 kafka_2.12-1.0.0]# bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group CONSUMER_GROUP_ID

2TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID

3topic-test1 0 1648 1648 0 CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261 /192.168.92.1 CLIENT_ID

4topic-test1 1 1648 1648 0 CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261 /192.168.92.1 CLIENT_ID

5topic-test1 2 1648 1648 0 CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261 /192.168.92.1 CLIENT_ID

6topic-test1 3 1648 1648 0 CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261 /192.168.92.1 CLIENT_ID

71

我们深究一下kafka-consumer_groups.sh脚本,发现只有一句代码:

1exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConsumerGroupCommand "$@"

其含义就是执行kafka.admin.ConsumerGroupCommand而已。进一步深究,在ConsumerGroupCommand内部抓住了2句关键代码:

1val consumerGroupService = new KafkaConsumerGroupService(opts)

2val (state, assignments) = consumerGroupService.describeGroup()

代码详解:consumerGroupService的类型是ConsumerGroupServicesealed trait类型),而KafkaConsumerGroupService只是ConsumerGroupService的一种实现,还有一种实现是ZkConsumerGroupService,分别对应新版的消费方式(消费位移存储在__consumer_offsets中)和旧版的消费方式(消费位移存储在zk中),详细计算步骤参考下一段落的内容。opt参数是指“ –describe –bootstrap-server localhost:9092 –group CONSUMER_GROUP_ID”等参数。第2句代码是调用describeGroup()方法来获取具体的信息,即二元组中的assignments,这个assignments中保存了上面打印信息中的所有内容。

Scala小知识:在Scala中trait(特征)相当于Java的接口,实际上它比接口更大强大。与Java中的接口不同的是,它还可以定义属性和方法的实现(JDK8起的接口默认方法)。一般情况下Scala中的类只能继承单一父类,但是如果是trait的话就可以继承多个,从结果来看是实现了多重继承。被sealed声明的trait仅能被同一文件的类继承。

ZkConsumerGroupService中计算消费lag的步骤如下:

通过zk获取一些基本信息,对应上面打印信息中的:TOPIC、PARTITION、CONSUMER-ID等,不过不会有HOST和CLIENT-ID。

1、通过OffsetFetchRequest请求获取消费位移(offset),如果获取失败则在通过zk获取。

2、通过OffsetReuqest请求获取分区的LogEndOffset(简称为LEO,可见的LEO)。

3、计算LogEndOffset与消费位移的差值来获取lag。

4、KafkaConsumerGroupService中计算消费lag的步骤如下:

通过DescibeGroupsRequest请求获取一些基本信息,不仅包括TOPIC、PARTITION、CONSUMER-ID,还有HOST和CLIENT-ID。其实还有通过

1、FindCoordinatorRequest请求来获取coordinator信息,如果不了解coordinator在这里也没影响。

2、通过OffsetFetchRequest请求获取消费位移。

3、通过OffsetReuqest请求获取分区的LogEndOffset(简称为LEO)。

4、计算LogEndOffset与消费位移的差值来获取lag。

可以看到KafkaConsumerGroupService与ZkConsumerGroupService的计算Lag的方式都差不多,但是KafkaConsumerGroupService能获取更多消费详情,并且ZkConsumerGroupService也被标注为@Deprecated的了,后面内容都针对KafkaConsumerGroupService来做说明。既然Kafka已经为我们提供了线程的方法来获取Lag,那么我们有何必再重复造轮子,这里笔者写了一个调用的KafkaConsumerGroupService的示例(KafkaConsumerGroupService是使用Scala语言编写的,在Java的程序里使用类似scala.collection.Seq这样的全名称以防止混淆):

1String[] agrs = {"--describe

kafka中topic默认属性_分享:Kafka 的 Lag 计算误区及正确实现相关推荐

  1. kafka中topic默认属性_kafka consumer 配置详解

    1.Consumer Group 与 topic 订阅 每个Consumer 进程都会划归到一个逻辑的Consumer Group中,逻辑的订阅者是Consumer Group.所以一条message ...

  2. Kafka 的 Lag 计算误区及正确实现

    前言 消息堆积是消息中间件的一大特色,消息中间件的流量削峰.冗余存储等功能正是得益于消息中间件的消息堆积能力.然而消息堆积其实是一把亦正亦邪的双刃剑,如果应用场合不恰当反而会对上下游的业务造成不必要的 ...

  3. Kafka中topic的Partition,Kafka为什么这么快,Consumer的负载均衡及consumerGroup的概念(来自学习笔记)

    1.1. Kafka中topic的Partition  在Kafka文件存储中,同一个topic下有多个不同partition,每个partition为一个目录,partiton命名规则为topic ...

  4. 【kafka】Kafka中Topic级别配置

    1.概述 Kafka中topic级别配置 1.1 Topic级别配置 配置topic级别参数时,相同(参数)属性topic级别会覆盖全局的,否则默认为全局配置属性值. 创建topic参数可以设置一个或 ...

  5. kafka中Topic、Partition、Groups、Brokers概念辨析

    kafka消息队列有两种消费模式,分别是点对点模式和订阅/发布模式.具体比较可以参考Kafka基础–消息队列与消费模式. 下图是一个点对点的Kafka结构示意图,其中有以下几个部分: producer ...

  6. kafka中topic、partition、broker、consumerGroup、consumer之间的关系、区别及存在意义

    概念理解 topic: 逻辑概念,用于联系Producer 和 Consumer的message生产和消费.Producer 生产的消息放入一个topic中,由Consumer通过对同一个topic的 ...

  7. kafka offset保存在哪里_《Kafka成神之路》- 索引类型

    在Kafka的数据路径下有很多.index和.timeindex后缀文件: .index文件,即Kafka中的位移索引文件 .timeindex文件,即时间戳索引文件. 1 OffsetIndex - ...

  8. android中怎么保存checkbox中的checked属性_第二十四天HTML中的form表单

    form表单 用于收集用户信息,如:登录.注册等场景:所有要提交的数据都必须放在form标签中 action:提交地址.动作,与input标签中typy标签的submit属性相关联. ,提交地址是ac ...

  9. 上下文异常中的上下文属性_在没有适当上下文的情况下引发异常是一种不良习惯...

    上下文异常中的上下文属性 Allison Anders等人的<四个房间>(1995). 我不断重复同样的错误. 因此,该停止并制定规则以防止这种情况了. 错误不是致命的,但很烦人. 当查看 ...

最新文章

  1. 远程计算机管理权限,肿么获得远程计算机管理员权限
  2. [YTU]_2433( C++习题 对象数组求最大值)
  3. 解析Python中的线程与进程
  4. 1线程概念:线程和进程之间的关系,线程间可共享资源,线程间非共享资源,线程的优缺点
  5. React、PHP送书中奖名单,快看有你么!
  6. 线程数究竟设多少合理
  7. 服务器导流板的作用,前保险杠下导流板的作用是什么?
  8. 上帝手中的骰子——无所不能的贝叶斯(上篇)
  9. C语言解释器的实现--让脚本跑起来(六)
  10. Python开发工具PyCharm的web开发教程:创建并运行 Python 项目
  11. apollo简易高精度地图制作
  12. 线代 | 矩阵的迹 向量内积如何转化为迹
  13. 王家林 构建spark集群
  14. 截段八面体堆积 matlab,截角八面体可以充满空间
  15. 干货集中营(分享)-每日分享妹子图和技术干货
  16. STM32F103C8T6在Arduino IDE里编程
  17. 多层嵌套的CSS 3D动画技术详解
  18. HTML+CSS 土豆网鼠标经过显示遮罩
  19. python打开浏览器不显示浏览器页面_python+selenium 浏览器无界面模式运行
  20. 采访:来自于公众号的干货资料,助力兄弟斩获36W年薪岗!

热门文章

  1. 外设驱动库开发笔记39:按键操作驱动
  2. python中有数组吗_python有数组吗
  3. Linux系统常用的基本命令【转载CSDN象在舞】
  4. python中下拉菜单大小_自动化测试——Selenium+Python之下拉菜单的定位
  5. 什么是面向对象_什么是面向对象?新手程序员必掌握的技能
  6. 用python绘制熊猫图案_绘制带有熊猫和Matplotlib的一分钟烛台
  7. java 虚拟机 字节码,JAVA虚拟机:虚拟机字节码执行引擎
  8. js立即执行函数_《JS 函数的执行时机》
  9. linux查看日志命令_查看log日志基础命令
  10. 【LeetCode笔记】79. 单词搜索 剑指 Offer 12 矩阵中的路径(Java、dfs)