This article discusses how to monitor Kafka consumer offset lag.

Approaches

  • Use the official tool classes

ConsumerOffsetChecker
ConsumerGroupCommand

  • Use the official JMX metrics

ConsumerOffsetChecker

In version 0.8.2.2 it is implemented here:
kafka_2.10-0.8.2.2-sources.jar!/kafka/tools/ConsumerOffsetChecker.scala

object ConsumerOffsetChecker extends Logging {

  private val consumerMap: mutable.Map[Int, Option[SimpleConsumer]] = mutable.Map()
  private val offsetMap: mutable.Map[TopicAndPartition, Long] = mutable.Map()
  private var topicPidMap: immutable.Map[String, Seq[Int]] = immutable.Map()

  private def getConsumer(zkClient: ZkClient, bid: Int): Option[SimpleConsumer] = {
    //...
  }

  private def processPartition(zkClient: ZkClient,
                               group: String, topic: String, pid: Int) {
    //...
  }

  private def processTopic(zkClient: ZkClient, group: String, topic: String) {
    topicPidMap.get(topic) match {
      case Some(pids) =>
        pids.sorted.foreach {
          pid => processPartition(zkClient, group, topic, pid)
        }
      case None => // ignore
    }
  }

  private def printBrokerInfo() {
    println("BROKER INFO")
    for ((bid, consumerOpt) <- consumerMap)
      consumerOpt match {
        case Some(consumer) =>
          println("%s -> %s:%d".format(bid, consumer.host, consumer.port))
        case None => // ignore
      }
  }

  def main(args: Array[String]) {
    //...
    try {
      zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)

      val topicList = topics match {
        case Some(x) => x.split(",").view.toList
        case None => ZkUtils.getChildren(zkClient, groupDirs.consumerGroupDir + "/owners").toList
      }

      topicPidMap = immutable.Map(ZkUtils.getPartitionsForTopics(zkClient, topicList).toSeq:_*)
      val topicPartitions = topicPidMap.flatMap { case (topic, partitionSeq) => partitionSeq.map(TopicAndPartition(topic, _)) }.toSeq
      val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs)

      debug("Sending offset fetch request to coordinator %s:%d.".format(channel.host, channel.port))
      channel.send(OffsetFetchRequest(group, topicPartitions))
      val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer)
      debug("Received offset fetch response %s.".format(offsetFetchResponse))

      offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
        if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
          val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
          // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool
          // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
          try {
            val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong
            offsetMap.put(topicAndPartition, offset)
          } catch {
            case z: ZkNoNodeException =>
              if (ZkUtils.pathExists(zkClient, topicDirs.consumerOffsetDir))
                offsetMap.put(topicAndPartition, -1)
              else
                throw z
          }
        }
        else if (offsetAndMetadata.error == ErrorMapping.NoError)
          offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
        else {
          println("Could not fetch offset for %s due to %s.".format(topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
        }
      }
      channel.disconnect()

      println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format("Group", "Topic", "Pid", "Offset", "logSize", "Lag", "Owner"))
      topicList.sorted.foreach {
        topic => processTopic(zkClient, group, topic)
      }

      if (options.has("broker-info"))
        printBrokerInfo()

      for ((_, consumerOpt) <- consumerMap)
        consumerOpt match {
          case Some(consumer) => consumer.close()
          case None => // ignore
        }
    }
    catch {
      case t: Throwable =>
        println("Exiting due to: %s.".format(t.getMessage))
    }
    finally {
      for (consumerOpt <- consumerMap.values) {
        consumerOpt match {
          case Some(consumer) => consumer.close()
          case None => // ignore
        }
      }
      if (zkClient != null)
        zkClient.close()
      if (channel != null)
        channel.disconnect()
    }
  }
}

The drawback is that this class is designed to be invoked from the command line: each invocation creates a fresh ZkClient, which is a poor fit for continuous monitoring. To reuse it you need to refactor it, pulling the ZkClient out so a single instance can be created up front and shared across polls.
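The refactoring idea can be sketched as follows. This is a minimal illustration, not Kafka code: the `LagSource` interface and the fake implementation are hypothetical stand-ins for the real ZkClient/SimpleConsumer wiring, which would issue the same offset-fetch requests the tool does.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: the expensive client (ZkClient in the real tool) is created once
// and reused by every monitoring tick, instead of once per invocation.
class SharedClientLagMonitor {

    // Hypothetical stand-in for the real ZooKeeper/broker-backed fetch.
    interface LagSource {
        Map<String, Long> fetchLagPerPartition(String group);
    }

    private final LagSource source; // created once, shared by all polls

    SharedClientLagMonitor(LagSource source) {
        this.source = source;
    }

    // One monitoring tick: fetch lag for the group, reusing the shared client.
    Map<String, Long> poll(String group) {
        return source.fetchLagPerPartition(group);
    }

    public static void main(String[] args) {
        // Fake source standing in for the real implementation.
        LagSource fake = group -> {
            Map<String, Long> lag = new HashMap<>();
            lag.put("mytopic-0", 42L);
            lag.put("mytopic-1", 0L);
            return lag;
        };
        SharedClientLagMonitor monitor = new SharedClientLagMonitor(fake);
        for (int i = 0; i < 3; i++) { // several ticks, one shared client
            System.out.println("lag: " + monitor.poll("mygroup"));
        }
    }
}
```

In a real deployment the loop would run on a scheduler and the source would hold the one shared ZkClient, closed only on shutdown.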

ConsumerGroupCommand

In versions after 0.8.2.2, ConsumerGroupCommand replaces ConsumerOffsetChecker:
kafka_2.11-0.10.2.1-sources.jar!/kafka/admin/ConsumerGroupCommand.scala

object ConsumerGroupCommand extends Logging {

  //...

  def main(args: Array[String]) {
    val opts = new ConsumerGroupCommandOptions(args)

    if (args.length == 0)
      CommandLineUtils.printUsageAndDie(opts.parser, "List all consumer groups, describe a consumer group, or delete consumer group info.")

    // should have exactly one action
    val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _)
    if (actions != 1)
      CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete")

    opts.checkArgs()

    val consumerGroupService = {
      if (opts.useOldConsumer) {
        System.err.println("Note: This will only show information about consumers that use ZooKeeper (not those using the Java consumer API).\n")
        new ZkConsumerGroupService(opts)
      } else {
        System.err.println("Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).\n")
        new KafkaConsumerGroupService(opts)
      }
    }

    try {
      if (opts.options.has(opts.listOpt))
        consumerGroupService.listGroups().foreach(println(_))
      else if (opts.options.has(opts.describeOpt)) {
        val (state, assignments) = consumerGroupService.describeGroup()
        val groupId = opts.options.valuesOf(opts.groupOpt).asScala.head
        assignments match {
          case None =>
            // applies to both old and new consumer
            printError(s"The consumer group '$groupId' does not exist.")
          case Some(assignments) =>
            if (opts.useOldConsumer)
              printAssignment(assignments, false)
            else
              state match {
                case Some("Dead") =>
                  printError(s"Consumer group '$groupId' does not exist.")
                case Some("Empty") =>
                  System.err.println(s"Consumer group '$groupId' has no active members.")
                  printAssignment(assignments, true)
                case Some("PreparingRebalance") | Some("AwaitingSync") =>
                  System.err.println(s"Warning: Consumer group '$groupId' is rebalancing.")
                  printAssignment(assignments, true)
                case Some("Stable") =>
                  printAssignment(assignments, true)
                case other =>
                  // the control should never reach here
                  throw new KafkaException(s"Expected a valid consumer group state, but found '${other.getOrElse("NONE")}'.")
              }
        }
      }
      else if (opts.options.has(opts.deleteOpt)) {
        consumerGroupService match {
          case service: ZkConsumerGroupService => service.deleteGroups()
          case _ => throw new IllegalStateException(s"delete is not supported for $consumerGroupService.")
        }
      }
    } catch {
      case e: Throwable =>
        printError(s"Executing consumer group command failed due to ${e.getMessage}", Some(e))
    } finally {
      consumerGroupService.close()
    }
  }
}

This class, too, is designed around command-line invocation rather than long-running monitoring.
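Out of the box it is consumed through the wrapper script shipped with Kafka. A usage sketch, assuming a broker on localhost:9092 and a group named mygroup:

```
# list all consumer groups (new, Java-consumer path)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# describe one group: per-partition committed offset, log end offset and lag
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group mygroup

# old ZooKeeper-based consumers are queried via --zookeeper instead
bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group mygroup
```

A monitoring job could scrape this output periodically, but calling into the service classes directly (as discussed above for ConsumerOffsetChecker) avoids re-parsing text.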

JMX

This approach reads the metric data that Kafka itself registers in JMX, so there is no need to open your own connections and fetch the data the way ConsumerOffsetChecker does. For example:

// mBeanServer is an MBeanServerConnection to the JVM being monitored
ObjectName oName = new ObjectName("kafka.producer:*");
Set<ObjectName> metricsBeans = mBeanServer.queryNames(oName, null);
for (ObjectName mBeanName : metricsBeans) {
    MBeanInfo metricsBean = mBeanServer.getMBeanInfo(mBeanName);
    MBeanAttributeInfo[] metricsAttrs = metricsBean.getAttributes();
    for (MBeanAttributeInfo metricsAttr : metricsAttrs) {
        // get value
        Object value = mBeanServer.getAttribute(mBeanName, metricsAttr.getName());
        // process ...
    }
}
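The same queryNames/getAttribute pattern can be exercised self-contained against the JVM's own platform MBeanServer, which works without a running Kafka process; against a Kafka client JVM you would point the connection at its JMX port and query a Kafka domain (e.g. "kafka.consumer:*") instead. The `java.lang:type=Memory` name below is just a stand-in so the sketch runs anywhere.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Demo of the queryNames/getAttribute pattern against the local platform
// MBeanServer; a real monitor would use a remote MBeanServerConnection.
class JmxQueryDemo {
    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // stand-in pattern; for Kafka metrics query e.g. "kafka.consumer:*"
        ObjectName pattern = new ObjectName("java.lang:type=Memory");
        Set<ObjectName> names = server.queryNames(pattern, null);
        for (ObjectName name : names) {
            for (MBeanAttributeInfo attr : server.getMBeanInfo(name).getAttributes()) {
                try {
                    Object value = server.getAttribute(name, attr.getName());
                    System.out.println(name + " " + attr.getName() + " = " + value);
                } catch (Exception e) {
                    // some attributes are not readable; skip them
                }
            }
        }
    }
}
```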

Summary

You can adapt ConsumerOffsetChecker or ConsumerGroupCommand yourself and report the lag figures to statsd or Prometheus. Where the JMX metrics are available, using them directly is the least work.
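The reporting step itself is simple once you have the numbers. A sketch of computing lag = logSize - offset per partition and formatting statsd-style gauge lines; the metric name prefix and the map layout are illustrative assumptions, not a fixed API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Given per-partition log-end offsets and the group's committed offsets
// (fetched via a refactored checker, the command's service classes, or JMX),
// compute lag and emit statsd gauge lines ("<name>:<value>|g").
class LagReporter {

    static Map<String, Long> computeLag(Map<String, Long> logEndOffsets,
                                        Map<String, Long> committedOffsets) {
        Map<String, Long> lag = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : logEndOffsets.entrySet()) {
            long committed = committedOffsets.getOrDefault(e.getKey(), 0L);
            lag.put(e.getKey(), Math.max(0L, e.getValue() - committed));
        }
        return lag;
    }

    static List<String> toStatsdGauges(String group, Map<String, Long> lag) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, Long> e : lag.entrySet()) {
            // hypothetical metric naming scheme
            lines.add("kafka.lag." + group + "." + e.getKey() + ":" + e.getValue() + "|g");
        }
        return lines;
    }

    public static void main(String[] args) {
        Map<String, Long> logEnd = new LinkedHashMap<>();
        logEnd.put("mytopic-0", 120L);
        logEnd.put("mytopic-1", 80L);
        Map<String, Long> committed = new LinkedHashMap<>();
        committed.put("mytopic-0", 100L);
        committed.put("mytopic-1", 80L);

        // prints the gauge lines that would be sent over UDP to statsd
        toStatsdGauges("mygroup", computeLag(logEnd, committed))
            .forEach(System.out::println);
    }
}
```

For Prometheus the same numbers would instead be exposed as a gauge on a scrape endpoint rather than pushed.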

doc

  • Kafka official docs: JMX monitoring + Reporters
