欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。

欢迎跳转到本文的原文链接:https://honeypps.com/mq/how-to-monitor-kafka-with-jmx/

使用kafka做消息队列中间件时,为了实时监控其性能时,免不了要使用jmx调取kafka broker的内部数据,不管是自己重新做一个kafka集群的监控系统,还是使用一些开源的产品,比如yahoo的kafka manager, 其都需要使用jmx来监控一些敏感的数据。在kafka官网中 http://kafka.apache.org/082/documentation.html#monitoring 这样说:

Kafka uses Yammer Metrics for metrics reporting in both the server and the client. This can be configured to report stats using pluggable stats reporters to hook up to your monitoring system.
The easiest way to see the available metrics to fire up jconsole and point it at a running kafka client or server; this will all browsing all metrics with JMX.

可见kafka官方也是提倡使用jmx并且提供了jmx的调用给用户以监控kafka.

本博文通过使用jmx调用kafka的几个监测项属性来讲述下如何使用jmx来监控kafka.
有关Jmx的使用可以参考:

  • 从零开始玩转JMX(一)——简介和Standard MBean
  • 从零开始玩转JMX(二)——Condition
  • 从零开始玩转JMX(三)——Model MBean
  • 从零开始玩转JMX(四)——Apache Commons Modeler & Dynamic MBean

在使用jmx之前需要确保kafka开启了jmx监控,kafka启动时要添加JMX_PORT=9999这一项,也就是:

JMX_PORT=9999 bin/kafka-server-start.sh config/server.properties &

博主自行搭建了一个kafka集群,只有两个节点。集群中有一个topic(name=default_channel_kafka_zzh_demo),分为5个partition(0 1 2 3 4).

这里讨论的kafka版本是0.8.1.x和0.8.2.x,这两者在使用jmx监控时会有差异,差异体现在ObjectName之中。熟悉kafka的同学知道,kafka有topic和partition这两个概念,topic中根据一定的策略来分为若干个partitions, 这里就以此举例来看,
在0.8.1.x中有关此项的属性的ObjectName(String值)为:
“kafka.log”:type=“Log”,name=“default_channel_kafka_zzh_demo-*-LogEndOffset”

而在0.8.2.x中有关的属性的ObjectName为:
kafka.log:type=Log,name=LogEndOffset,topic=default_channel_kafka_zzh_demo,partition=0

所以在程序中要区别对待。

这里采用三个监测项来演示如果使用jmx进行监控:

  1. 上面所说的offset (集群中的一个topic下的所有partition的LogEndOffset值,即logSize)
  2. sendCount(集群中的一个topic下的发送总量,这个值是集群中每个broker中此topic的发送量之和)
  3. sendTps(集群中的一个topic下的TPS, 这个值也是集群中每个broker中此topic的发送量之和)

首先是针对单个kafka broker的。

package kafka.jmx;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import javax.management.*;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;/*** Created by hidden on 2016/12/8.*/
public class JmxConnection {private static Logger log = LoggerFactory.getLogger(JmxConnection.class);private MBeanServerConnection conn;private String jmxURL;private String ipAndPort = "localhost:9999";private int port = 9999;private boolean newKafkaVersion = false;public JmxConnection(Boolean newKafkaVersion, String ipAndPort){this.newKafkaVersion = newKafkaVersion;this.ipAndPort = ipAndPort;}public boolean init(){jmxURL = "service:jmx:rmi:///jndi/rmi://" +ipAndPort+ "/jmxrmi";log.info("init jmx, jmxUrl: {}, and begin to connect it",jmxURL);try {JMXServiceURL serviceURL = new JMXServiceURL(jmxURL);JMXConnector connector = JMXConnectorFactory.connect(serviceURL,null);conn = connector.getMBeanServerConnection();if(conn == null){log.error("get connection return null!");return  false;}} catch (MalformedURLException e) {e.printStackTrace();return false;} catch (IOException e) {e.printStackTrace();return false;}return true;}public String getTopicName(String topicName){String s;if (newKafkaVersion) {s = "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=" + topicName;} else {s = "\"kafka.server\":type=\"BrokerTopicMetrics\",name=\"" + topicName + "-MessagesInPerSec\"";}return s;}/*** @param topicName: topic name, default_channel_kafka_zzh_demo* @return 获取发送量(单个broker的,要计算某个topic的总的发送量就要计算集群中每一个broker之和)*/
public long getMsgInCountPerSec(String topicName){String objectName = getTopicName(topicName);Object val = getAttribute(objectName,"Count");String debugInfo = "jmxUrl:"+jmxURL+",objectName="+objectName;if(val !=null){log.info("{}, Count:{}",debugInfo,(long)val);return (long)val;}return 0;
}/*** @param topicName: topic name, default_channel_kafka_zzh_demo* @return 获取发送的tps,和发送量一样如果要计算某个topic的发送量就需要计算集群中每一个broker中此topic的tps之和。*/public double getMsgInTpsPerSec(String topicName){String objectName = getTopicName(topicName);Object val = getAttribute(objectName,"OneMinuteRate");if(val !=null){double dVal = ((Double)val).doubleValue();return dVal;}return 0;}private Object getAttribute(String objName, String objAttr){ObjectName objectName =null;try {objectName = new ObjectName(objName);} catch (MalformedObjectNameException e) {e.printStackTrace();return null;}return getAttribute(objectName,objAttr);}private Object getAttribute(ObjectName objName, String objAttr){if(conn== null){log.error("jmx connection is null");return null;}try {return conn.getAttribute(objName,objAttr);} catch (MBeanException e) {e.printStackTrace();return null;} catch (AttributeNotFoundException e) {e.printStackTrace();return null;} catch (InstanceNotFoundException e) {e.printStackTrace();return null;} catch (ReflectionException e) {e.printStackTrace();return null;} catch (IOException e) {e.printStackTrace();return null;}}/*** @param topicName* @return 获取topicName中每个partition所对应的logSize(即offset)*/public Map<Integer,Long> getTopicEndOffset(String topicName){Set<ObjectName> objs = getEndOffsetObjects(topicName);if(objs == null){return null;}Map<Integer, Long> map = new HashMap<>();for(ObjectName objName:objs){int partId = getParId(objName);Object val = getAttribute(objName,"Value");if(val !=null){map.put(partId,(Long)val);}}return map;}private int getParId(ObjectName objName){if(newKafkaVersion){String s = objName.getKeyProperty("partition");return Integer.parseInt(s);}else {String s = objName.getKeyProperty("name");int to = s.lastIndexOf("-LogEndOffset");String s1 = s.substring(0, to);int from = s1.lastIndexOf("-") + 1;String ss = s.substring(from, to);return Integer.parseInt(ss);}}private Set<ObjectName> getEndOffsetObjects(String topicName){String objectName;if (newKafkaVersion) {objectName = "kafka.log:type=Log,name=LogEndOffset,topic="+topicName+",partition=*";}else{objectName = "\"kafka.log\":type=\"Log\",name=\"" + topicName + "-*-LogEndOffset\"";}ObjectName objName = null;Set<ObjectName> objectNames = null;try {objName = new ObjectName(objectName);objectNames = conn.queryNames(objName,null);} catch (MalformedObjectNameException e) {e.printStackTrace();return  null;} catch (IOException e) {e.printStackTrace();return null;}return objectNames;}
}

注意代码中对于两种不同kafka版本的区别处理。对应前面所说的三个检测项的方法为:

public Map<Integer,Long> getTopicEndOffset(String topicName)
public long getMsgInCountPerSec(String topicName)
public double getMsgInTpsPerSec(String topicName)

对于整个集群的处理需要另外一个类来保证,总体上是对集群中的每一个broker相应的值进行累加,且看代码:

package kafka.jmx;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;/*** Created by hidden on 2016/12/8.*/
public class JmxMgr {private static Logger log = LoggerFactory.getLogger(JmxMgr.class);private static List<JmxConnection> conns = new ArrayList<>();public static boolean init(List<String> ipPortList, boolean newKafkaVersion){for(String ipPort:ipPortList){log.info("init jmxConnection [{}]",ipPort);JmxConnection conn = new JmxConnection(newKafkaVersion, ipPort);boolean bRet = conn.init();if(!bRet){log.error("init jmxConnection error");return false;}conns.add(conn);}return true;}public static long getMsgInCountPerSec(String topicName){long val = 0;for(JmxConnection conn:conns){long temp = conn.getMsgInCountPerSec(topicName);val += temp;}return val;}public static double getMsgInTpsPerSec(String topicName){double val = 0;for(JmxConnection conn:conns){double temp = conn.getMsgInTpsPerSec(topicName);val += temp;}return val;}public static Map<Integer, Long> getEndOffset(String topicName){Map<Integer,Long> map = new HashMap<>();for(JmxConnection conn:conns){Map<Integer,Long> tmp = conn.getTopicEndOffset(topicName);if(tmp == null){log.warn("get topic endoffset return null, topic {}", topicName);continue;}for(Integer parId:tmp.keySet()){//change if biggerif(!map.containsKey(parId) || (map.containsKey(parId) && (tmp.get(parId)>map.get(parId))) ){map.put(parId, tmp.get(parId));}}}return map;}public static void main(String[] args) {List<String> ipPortList = new ArrayList<>();ipPortList.add("xx.101.130.1:9999");ipPortList.add("xx.101.130.2:9999");JmxMgr.init(ipPortList,true);String topicName = "default_channel_kafka_zzh_demo";System.out.println(getMsgInCountPerSec(topicName));System.out.println(getMsgInTpsPerSec(topicName));System.out.println(getEndOffset(topicName));}
}

运行结果:

2016-12-08 19:25:32 -[INFO] - [init jmxConnection [xx.101.130.1:9999]] - [kafka.jmx.JmxMgr:20]
2016-12-08 19:25:32 -[INFO] - [init jmx, jmxUrl: service:jmx:rmi:///jndi/rmi://xx.101.130.1:9999/jmxrmi, and begin to connect it] - [kafka.jmx.JmxConnection:35]
2016-12-08 19:25:33 -[INFO] - [init jmxConnection [xx.101.130.2:9999]] - [kafka.jmx.JmxMgr:20]
2016-12-08 19:25:33 -[INFO] - [init jmx, jmxUrl: service:jmx:rmi:///jndi/rmi://xx.101.130.2:9999/jmxrmi, and begin to connect it] - [kafka.jmx.JmxConnection:35]
2016-12-08 20:45:15 -[INFO] - [jmxUrl:service:jmx:rmi:///jndi/rmi://xx.101.130.1:9999/jmxrmi,objectName=kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=default_channel_kafka_zzh_demo, Count:6000] - [kafka.jmx.JmxConnection:73]
2016-12-08 20:45:15 -[INFO] - [jmxUrl:service:jmx:rmi:///jndi/rmi://xx.101.130.2:9999/jmxrmi,objectName=kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=default_channel_kafka_zzh_demo, Count:4384] - [kafka.jmx.JmxConnection:73]
10384
3.915592283987704E-65
{0=2072, 1=2084, 2=2073, 3=2083, 4=2072}

观察运行结果可以发现 6000+4384 = 10384 = 2072+2084+2073+2083+2072,小伙伴们可以揣摩下原因。
可以通过jconsole连接service:jmx:rmi:///jndi/rmi://xx.101.130.1:9999/jmxrmi或者service:jmx:rmi:///jndi/rmi://xx.101.130.2:9999/jmxrmi来查看相应的数据值。如下图:

也可以通过命令行的形式来查看某项数据,不过这里要借助一个jar包:cmdline-jmxclient-0.xx.3.jar,这个请自行下载,网上很多。
将这个jar放入某一目录,博主这里放在了linux系统下的/root/util目录中,以offset举例:
0.8.1.x版-读取topic=default_channel_kafka_zzh_demo,partition=0的Value值:

java -jar cmdline-jmxclient-0.10.3.jar - xx.101.130.1:9999 '"kafka.log":type="Log",name="default_channel_kafka_zzh_demo-0-LogEndOffset"' Value

0.8.2.x版-读取topic=default_channel_kafka_zzh_demo,partition=0的Value值:

java -jar cmdline-jmxclient-0.10.3.jar - xx.101.130.1:9999 kafka.log:type=Log,name=LogEndOffset,topic=default_channel_kafka_zzh_demo,partition=0

看出规律了嘛?如果还是没有,博主再提示一个小技巧,你可以用Jconsole打开相应的属性,然后将鼠标浮于其上,Jconsole会跳出tooltips来提示怎么拼这些属性的ObjectName.

欢迎跳转到本文的原文链接:https://honeypps.com/mq/how-to-monitor-kafka-with-jmx/


欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。

如何使用JMX监控Kafka相关推荐

  1. 使用JMX监控Kafka

    http://blog.csdn.net/eric_sunah/article/details/44980385?utm_source=tuicool 使用JMX监控Kafka 标签: KafkaJM ...

  2. 【kafka】JMX 监控kafka FINER RMI TCP getConnectionId IOException

    1.背景 使用Jmx监控kafka相关信息,但是运行的时候报错如下 我的代码大致逻辑是 JMXServiceUrl jmx = new JMXServiceUrl(url) JMXConnector ...

  3. 【kafka】JMX 监控kafka kafka rmi NoSuchObjectException no such object in table

    1.背景 使用Jmx监控kafka相关信息,但是运行的时候报错如下 我的代码大致逻辑是 JMXServiceUrl jmx = new JMXServiceUrl(url) JMXConnector ...

  4. 【kafka】java使用jmx 监控Kafka

    1.概述 想使用java jmx监控kafka,关于jmx相关的概念请参考 [Java]java jmx 入门案例 进阶版参考:[Spring]SpringBoot 如何使用JMX 2.kafkal开 ...

  5. Kafka JMX 监控 之 jmxtrans + influxdb + grafana

    目录 效果图 环境准备 安装 influxdb 安装我们刚刚下载 influxdb rpm文件 查看默认配置 修改参数 启动 influxdb 查看启动状态 设置基本配置 influxdb 其他命令扩 ...

  6. 【Kafka】Window下kafka开启JMX监控

    1.概述 因为需要,需要在windows下开启kafka,然后kafka开启JMX监控 同样是修改kafka-server-start文件,但是修改的是kafka-server-start.bat I ...

  7. gc日志怎么看_你应该怎么监控Kafka?

    监控 Kafka,历来都是个老大难的问题.无论是在我维护的微信公众号,还是 Kafka QQ群里面,大家问得最多的问题,一定是 Kafka 的监控.大家提问的内容看似五花八门,但真正想了解的,其实都是 ...

  8. Prometheus监控Kafka集群

    prometheus监控kafka常见的有两种开源方案,一种是传统的部署exporter的方式,一种是通过jmx配置监控,本文将采用第二种方式进行配置. 项目地址: kafka_exporter:ht ...

  9. 10 Confluent_Kafka权威指南 第十章:监控kafka

    文章目录 CHAPTER 10 Monitoring Kafka 监控kafka Metric Basics 度量基础 Where Are the Metrics? 度量标准在哪? Internal ...

最新文章

  1. C语言运算符的优先级
  2. Spring的一些资源
  3. 基于Shodan Python库的批量攻击实践 撒旦网
  4. 用友华表cell的程序发布
  5. B-Trees Concepts B-树介绍(都快忘了:))
  6. 通过composer安装阿里大于接口扩展
  7. Linux下core文件调试方法收藏
  8. 关于计算机哪些学校好,计算机哪些学校好
  9. k8s核心技术-Helm(chart模板的使用下)---K8S_Google工作笔记0049
  10. 大数据平台目前存在的问题
  11. 用adb pull命令从android系统中读取文件失败的原因及解决办法
  12. 前端所有安全问题总结
  13. PPT母版和PPT模板
  14. UOJ275 [清华集训2016] 组合数问题 【Lucas定理】【数位DP】
  15. 如何查看win10电脑系统盘是哪个盘?
  16. 原麦格纳亚洲区总裁布鲁诺兰伯特出任宝沃汽车全球总裁
  17. 将多个word文档的内容合并到一个文档
  18. gethostbyname, gethostbyaddr(原来百度叫shifen 十分?)
  19. 么是形参?什么是实参?
  20. Bootloader的启动与功能

热门文章

  1. 在 IntelliJ IDEA 中误添加自定义的 JavaDoc 标签,如何删除
  2. Cortex-M3-异常与中断-向量表 s
  3. Data Lake Analytics: 使用DataWorks来调度DLA任务
  4. 基于HTML5的WebGL呈现A星算法的3D可视化
  5. iOS 一个开发者账号 多台Mac 共用
  6. Hadoop/HIVE错误解决方案汇总
  7. Android的手势交互
  8. Releasing Contexts 释放上下文
  9. 数据结构的简单理解(1)
  10. 牛客 - 共鸣问题(贪心+思维)