This article discusses approaches to monitoring Kafka consumer offset lag, i.e., how far a group's committed offset trails the log end offset (lag = logSize - offset; for example, with a logSize of 1000 and a committed offset of 880, the lag is 120).

Approaches

Use the official tool classes: ConsumerOffsetChecker and ConsumerGroupCommand

Use the JMX metrics Kafka itself exposes

ConsumerOffsetChecker

In version 0.8.2.2 the tool looks as follows:

kafka_2.10-0.8.2.2-sources.jar!/kafka/tools/ConsumerOffsetChecker.scala

object ConsumerOffsetChecker extends Logging {

  private val consumerMap: mutable.Map[Int, Option[SimpleConsumer]] = mutable.Map()
  private val offsetMap: mutable.Map[TopicAndPartition, Long] = mutable.Map()
  private var topicPidMap: immutable.Map[String, Seq[Int]] = immutable.Map()

  private def getConsumer(zkClient: ZkClient, bid: Int): Option[SimpleConsumer] = {
    //...
  }

  private def processPartition(zkClient: ZkClient,
                               group: String, topic: String, pid: Int) {
    //...
  }

  private def processTopic(zkClient: ZkClient, group: String, topic: String) {
    topicPidMap.get(topic) match {
      case Some(pids) =>
        pids.sorted.foreach {
          pid => processPartition(zkClient, group, topic, pid)
        }
      case None => // ignore
    }
  }

  private def printBrokerInfo() {
    println("BROKER INFO")
    for ((bid, consumerOpt) <- consumerMap)
      consumerOpt match {
        case Some(consumer) =>
          println("%s -> %s:%d".format(bid, consumer.host, consumer.port))
        case None => // ignore
      }
  }

  def main(args: Array[String]) {
    //...
    try {
      zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)

      val topicList = topics match {
        case Some(x) => x.split(",").view.toList
        case None => ZkUtils.getChildren(zkClient, groupDirs.consumerGroupDir + "/owners").toList
      }

      topicPidMap = immutable.Map(ZkUtils.getPartitionsForTopics(zkClient, topicList).toSeq:_*)
      val topicPartitions = topicPidMap.flatMap { case(topic, partitionSeq) => partitionSeq.map(TopicAndPartition(topic, _)) }.toSeq
      val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs)

      debug("Sending offset fetch request to coordinator %s:%d.".format(channel.host, channel.port))
      channel.send(OffsetFetchRequest(group, topicPartitions))
      val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer)
      debug("Received offset fetch response %s.".format(offsetFetchResponse))

      offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
        if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
          val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
          // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool
          // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
          try {
            val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong
            offsetMap.put(topicAndPartition, offset)
          } catch {
            case z: ZkNoNodeException =>
              if (ZkUtils.pathExists(zkClient, topicDirs.consumerOffsetDir))
                offsetMap.put(topicAndPartition, -1)
              else
                throw z
          }
        }
        else if (offsetAndMetadata.error == ErrorMapping.NoError)
          offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
        else {
          println("Could not fetch offset for %s due to %s.".format(topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
        }
      }
      channel.disconnect()

      println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format("Group", "Topic", "Pid", "Offset", "logSize", "Lag", "Owner"))
      topicList.sorted.foreach {
        topic => processTopic(zkClient, group, topic)
      }

      if (options.has("broker-info"))
        printBrokerInfo()

      for ((_, consumerOpt) <- consumerMap)
        consumerOpt match {
          case Some(consumer) => consumer.close()
          case None => // ignore
        }
    }
    catch {
      case t: Throwable =>
        println("Exiting due to: %s.".format(t.getMessage))
    }
    finally {
      for (consumerOpt <- consumerMap.values) {
        consumerOpt match {
          case Some(consumer) => consumer.close()
          case None => // ignore
        }
      }
      if (zkClient != null)
        zkClient.close()

      if (channel != null)
        channel.disconnect()
    }
  }
}

The drawback is that this class is designed for command-line invocation: every call constructs a new ZkClient. That is a poor fit for continuous monitoring, so some refactoring is needed to extract the ZkClient and reuse a single instance across polls.
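To make the limitation concrete, here is a minimal sketch that simply schedules the unmodified tool once a minute. It assumes the kafka_2.10-0.8.2.2 jar is on the classpath, and the ZooKeeper address and group name are placeholders. Every tick still builds and tears down its own ZkClient internally, which is exactly the overhead the refactoring would remove.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class OffsetCheckerPoller {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // invoke the tool exactly as the command line would; each run
        // creates (and closes) a fresh ZkClient internally
        scheduler.scheduleAtFixedRate(
                () -> kafka.tools.ConsumerOffsetChecker.main(new String[]{
                        "--zookeeper", "localhost:2181",
                        "--group", "my-group"
                }),
                0, 60, TimeUnit.SECONDS);
    }
}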

ConsumerGroupCommand

In versions after 0.8.2.2, ConsumerOffsetChecker is replaced by ConsumerGroupCommand.

kafka_2.11-0.10.2.1-sources.jar!/kafka/admin/ConsumerGroupCommand.scala

object ConsumerGroupCommand extends Logging {
  //...
  def main(args: Array[String]) {
    val opts = new ConsumerGroupCommandOptions(args)

    if (args.length == 0)
      CommandLineUtils.printUsageAndDie(opts.parser, "List all consumer groups, describe a consumer group, or delete consumer group info.")

    // should have exactly one action
    val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _)
    if (actions != 1)
      CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete")

    opts.checkArgs()

    val consumerGroupService = {
      if (opts.useOldConsumer) {
        System.err.println("Note: This will only show information about consumers that use ZooKeeper (not those using the Java consumer API).\n")
        new ZkConsumerGroupService(opts)
      } else {
        System.err.println("Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).\n")
        new KafkaConsumerGroupService(opts)
      }
    }

    try {
      if (opts.options.has(opts.listOpt))
        consumerGroupService.listGroups().foreach(println(_))
      else if (opts.options.has(opts.describeOpt)) {
        val (state, assignments) = consumerGroupService.describeGroup()
        val groupId = opts.options.valuesOf(opts.groupOpt).asScala.head
        assignments match {
          case None =>
            // applies to both old and new consumer
            printError(s"The consumer group '$groupId' does not exist.")
          case Some(assignments) =>
            if (opts.useOldConsumer)
              printAssignment(assignments, false)
            else
              state match {
                case Some("Dead") =>
                  printError(s"Consumer group '$groupId' does not exist.")
                case Some("Empty") =>
                  System.err.println(s"Consumer group '$groupId' has no active members.")
                  printAssignment(assignments, true)
                case Some("PreparingRebalance") | Some("AwaitingSync") =>
                  System.err.println(s"Warning: Consumer group '$groupId' is rebalancing.")
                  printAssignment(assignments, true)
                case Some("Stable") =>
                  printAssignment(assignments, true)
                case other =>
                  // the control should never reach here
                  throw new KafkaException(s"Expected a valid consumer group state, but found '${other.getOrElse("NONE")}'.")
              }
        }
      }
      else if (opts.options.has(opts.deleteOpt)) {
        consumerGroupService match {
          case service: ZkConsumerGroupService => service.deleteGroups()
          case _ => throw new IllegalStateException(s"delete is not supported for $consumerGroupService.")
        }
      }
    } catch {
      case e: Throwable =>
        printError(s"Executing consumer group command failed due to ${e.getMessage}", Some(e))
    } finally {
      consumerGroupService.close()
    }
  }
}

This tool, too, is designed around command-line invocation.
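That said, it can still be pressed into service for monitoring. The sketch below shows one illustrative (and admittedly hacky) way: call its main method with --describe and capture the per-partition table it prints to stdout. The bootstrap address and group name are placeholders, and the captured output (columns like TOPIC, PARTITION, CURRENT-OFFSET, LOG-END-OFFSET, LAG) would still need parsing.

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

public class GroupCommandPoller {
    // capture the table that ConsumerGroupCommand prints for --describe
    static String describeGroup(String bootstrapServers, String group) {
        PrintStream originalOut = System.out;
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        System.setOut(new PrintStream(buffer));
        try {
            kafka.admin.ConsumerGroupCommand.main(new String[]{
                    "--bootstrap-server", bootstrapServers,
                    "--describe",
                    "--group", group
            });
        } finally {
            System.setOut(originalOut);
        }
        return buffer.toString();
    }

    public static void main(String[] args) {
        System.out.println(describeGroup("localhost:9092", "my-group"));
    }
}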

JMX

This approach reads the metric data Kafka clients already register in JMX, so unlike ConsumerOffsetChecker there is no need to open extra connections and fetch the numbers yourself. For example, enumerating every attribute under the kafka.producer domain on the local platform MBean server:

import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.*;

// query every MBean registered under the kafka.producer domain in this JVM
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
ObjectName oName = new ObjectName("kafka.producer:*");
Set<ObjectName> metricsBeans = mBeanServer.queryNames(oName, null);
for (ObjectName mBeanName : metricsBeans) {
    MBeanInfo metricsBean = mBeanServer.getMBeanInfo(mBeanName);
    MBeanAttributeInfo[] metricsAttrs = metricsBean.getAttributes();
    for (MBeanAttributeInfo metricsAttr : metricsAttrs) {
        // get value
        Object value = mBeanServer.getAttribute(mBeanName, metricsAttr.getName());
        // process ...
    }
}
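For consumer lag specifically, here is a minimal sketch that polls a consumer JVM over remote JMX. It assumes a new (Java) consumer, which registers records-lag-max under kafka.consumer:type=consumer-fetch-manager-metrics, and that the consumer JVM was started with remote JMX enabled on port 9999; MBean and attribute names differ across client versions, so verify them with jconsole first.

import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class ConsumerLagJmx {
    public static void main(String[] args) throws Exception {
        // remote JMX endpoint of the consumer JVM (port is an assumption;
        // enable it with -Dcom.sun.management.jmxremote.port=9999 on the consumer)
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            // fetch-manager metrics of the new consumer, one MBean per client-id
            ObjectName pattern = new ObjectName(
                    "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*");
            Set<ObjectName> names = conn.queryNames(pattern, null);
            for (ObjectName name : names) {
                Object lag = conn.getAttribute(name, "records-lag-max");
                System.out.println(name.getKeyProperty("client-id") + " records-lag-max=" + lag);
            }
        }
    }
}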

Summary

You can adapt ConsumerOffsetChecker or ConsumerGroupCommand yourself and report the resulting lag figures to statsd or Prometheus. Where JMX is available, it is the least effort of all.
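For instance, a minimal sketch of pushing one lag value to statsd as a gauge over UDP (the host, port, and metric name are placeholders for whatever your statsd setup uses):

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

public class StatsdLagReporter {
    // one gauge sample in the plain statsd text protocol: <name>:<value>|g
    static void gauge(String name, long value) throws Exception {
        byte[] payload = (name + ":" + value + "|g").getBytes(StandardCharsets.UTF_8);
        try (DatagramSocket socket = new DatagramSocket()) {
            socket.send(new DatagramPacket(payload, payload.length,
                    InetAddress.getByName("localhost"), 8125));
        }
    }

    public static void main(String[] args) throws Exception {
        // report a lag figure obtained via any of the approaches above
        gauge("kafka.consumer.lag.my-group.my-topic.0", 42L);
    }
}

With Prometheus the flow is reversed: the same number would be set on a Gauge and exposed for scraping rather than pushed.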
