Kafka系列之-Kafka监控工具KafkaOffsetMonitor配置及使用
KafkaOffsetMonitor是一个可以用于监控Kafka的Topic及Consumer消费状况的工具,其配置和使用特别的方便。源项目Github地址为:https://github.com/quantifind/KafkaOffsetMonitor。
最简单的使用方式是从Github上下载一个最新的KafkaOffsetMonitor-assembly-0.2.1.jar,上传到某服务器上,然后执行一句命令就可以运行起来。但是在使用过程中有可能会发现页面反应缓慢或者无法显示相应内容的情况。据说这是由于jar包中的某些js等文件需要连接到网络,或者需要翻墙导致的。网上找的一个修改版的KafkaOffsetMonitor对应jar包,可以完全在本地运行,经过测试效果不错。下载地址是:http://pan.baidu.com/s/1ntzIUPN,在此感谢一下贡献该修改版的原作者。链接失效的话,可以博客下方留言联系我。
一、KafkaOffsetMonitor的使用
因为完全没有安装配置的过程,所以直接从KafkaOffsetMonitor的使用开始。
将KafkaOffsetMonitor-assembly-0.2.0.jar上传到服务器后,可以新建一个脚本用于启动该应用。脚本内容如下:
java -cp KafkaOffsetMonitor-assembly-0.2.0.jar \com.quantifind.kafka.offsetapp.OffsetGetterWeb \--zk m000:2181,m001:2181,m002:2181 \--port 8088 \--refresh 10.seconds \--retain 2.days
各参数的作用可以参考一下Github上的描述:
- offsetStorage valid options are ”zookeeper”, ”kafka” or ”storm”. Anything else falls back to ”zookeeper”
- zk the ZooKeeper hosts
- port on what port will the app be available
- refresh how often should the app refresh and store a point in the DB
- retain how long should points be kept in the DB
- dbName where to store the history (default ‘offsetapp’)
- kafkaOffsetForceFromStart only applies to ”kafka” format. Force KafkaOffsetMonitor to scan the commit messages from start (see notes below)
- stormZKOffsetBase only applies to ”storm” format. Change the offset storage base in zookeeper, default to ”/stormconsumers” (see notes below)
pluginsArgs additional arguments used by extensions (see below)
启动后,访问
m000:8088
端口,可以看到如下页面:
在这个页面上,可以看到当前Kafka集群中现有的Consumer Groups。在上图中有一个Visualizations选项卡,点击其中的Cluster Overview可以查看当前Kafka集群的Broker情况
接下来将继续上一篇Kafka相关的文章Kafka系列之-自定义Producer,在最后对Producer进行包装的基础上,分别实现一个简单的往随机Partition写messge,以及自定义Partitioner的Producer,对KafkaOffsetMonitor其他页面进行展示。
二、简单的Producer
1、新建一个Topic
首先为本次试验新建一个Topic,命令如下:
bin/kafka-topics.sh \--create \--zookeeper m000:2181 \--replication-factor 3 \--partition 3 \--topic kafkamonitor-simpleproducer
2、新建SimpleProducer代码
在上一篇文章中提到的Producer封装Github代码的基础上,写了一个往kafkamonitor-simpleproducer发送message的java代码。
import com.ckm.kafka.producer.impl.KafkaProducerToolImpl;
import com.ckm.kafka.producer.inter.KafkaProducerTool;/*** Created by ckm on 2016/8/30.*/
public class SimpleProducer {public static void main(String[] args) {KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl();int i = 0;String message = "";while (true) {message = "test-simple-producer : " + i ++;kafkaProducerTool.publishMessage("kafkamonitor-simpleproducer", message);}}
}
程序运行效果:
3、ConsoleConsumer消费该topic
用kafka自带的ConsoleConsumer消费kafkamonitor-simpleproducer中的message。
bin/kafka-console-consumer.sh --zookeeper m000:2181 --from-beginning --topic kafkamonitor-simpleproducer
消费截图如下:
4、KafkaOffsetMonitor页面
(1)在Topic List选项卡中,我们可以看到刚才新建的kafkamonitor-simpleproducer
(2)点开后,能看到有一个console-consumer正在消费该topic
(3)继续进入该Consumer,可以查看该Consumer当前的消费状况
这张图片的左上角显示了当前Topic的生产速率,右上角显示了当前Consumer的消费速率。
图片中还有三种颜色的线条,蓝色的表示当前Topic中的Message数目,灰色的表示当前Consumer消费的offset位置,红色的表示蓝色灰色的差值,即当前Consumer滞后于Producer的message数目。
(4)看一眼各partition中的message消费情况
从上图可以看到,当前有3个Partition,每个Partition中的message数目分布很不均匀。这里可以与接下来的自定义Producer的情况进行一个对比。
三、自定义Partitioner的Producer
1、新建一个Topic
bin/kafka-topics.sh \--create \--zookeeper m000:2181 \--replication-factor 3 \--partition 3 \--topic kafkamonitor-partitionedproducer
2、Partitioner代码
逻辑很简单,循环依次往各Partition中发送message。
import kafka.producer.Partitioner;/*** Created by ckm on 2016/8/30.*/
public class TestPartitioner implements Partitioner {public TestPartitioner() {}@Overridepublic int partition(Object key, int numPartitions) {int intKey = (int) key;return intKey % numPartitions;}
}
3、Producer代码
将自定义的Partitioner设置到Producer,其他调用过程和二中类似。
import com.ckm.kafka.producer.impl.KafkaProducerToolImpl;
import com.ckm.kafka.producer.inter.KafkaProducerTool;/*** Created by ckm on 2016/8/30.*/
public class PartitionedProducer {public static void main(String[] args) {KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl();kafkaProducerTool.getProducerProperties().put("partitioner.class", "TestPartitioner");int i = 0;String message = "";while (true) {message = "test-partitioner-producer : " + i;System.out.println(message);kafkaProducerTool.publishPartitionedMessage("kafkamonitor-partitionedproducer", i + "", message);i ++;}}
}
代码运行效果如下图:
4、ConsoleConsumer消费Message
bin/kafka-console-consumer.sh --zookeeper m000:2181 --from-beginning --topic kafkamonitor-partitionedproducer
消费效果如下图:
5、KafkaOffsetMonitor页面
其他页面与上面的类似,这里只观察一下每个partition中的message数目与第二节中的对比。可以看到这里每个Partition中message分别是很均匀的。
注意事项:
注意这里有一个坑,默认情况下Producer往一个不存在的Topic发送message时会自动创建这个Topic。由于在这个封装中,有同时传递message和topic的情况,如果调用方法时传入的参数反了,将会在Kafka集群中自动创建Topic。在正常情况下,应该是先把Topic根据需要创建好,然后Producer往该Topic发送Message,最好把Kafka这个默认自动创建Topic的功能关掉。
那么,假设真的不小心创建了多余的Topic,在删除时,会出现“marked for deletion”提示,只是将该topic标记为删除,使用list命令仍然能看到。如果需要调整这两个功能的话,在server.properties中配置如下两个参数:
参数 | 默认值 | 作用 |
---|---|---|
auto.create.topics.enable | true | Enable auto creation of topic on the server |
delete.topic.enable | false | Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off |
转载自https://blog.csdn.net/dabokele/article/details/52373960
Kafka系列之-Kafka监控工具KafkaOffsetMonitor配置及使用相关推荐
- Kafka系列之-Kafka Protocol实例分析
本文基于A Guide To The Kafka Protocol文档,以及Spark Streaming中实现的org.apache.spark.streaming.kafka.KafkaClust ...
- kafka系列之kafka生产者与分区(3)
概要 当我们发送消息之前,先问几个问题:每条消息都是很关键且不能容忍丢失么?偶尔重复消息可以么?我们关注的是消息延迟还是写入消息的吞吐量? 举个例子,有一个信用卡交易处理系统,当交易发生时会发送一条消 ...
- Kafka监控工具KafkaOffsetMonitor配置及使用
一.KafkaOffsetMonitor简述 KafkaOffsetMonitor是Kafka的一款客户端消费监控工具,用来实时监控Kafka服务的Consumer以及它们所在的Partition中的 ...
- 【kafka系列】kafka之生产者发送消息实践
目录 一.准备工作 二.终端命令 生产者命令 消费者命令 三.Java实践 搭建项目 异步发送-无回调 异步发送-有回调 同步发送 一.准备工作 进入实战之前先熟悉一下topic的相关命令,使用终端命 ...
- 《Kafka系列》Kafka详细教程入门
Kafka 1 消息队列--消息中间件 1.1 消息队列的作用 1.2 消息队列的概念--MQ Message 在互联网中,多台设备产生通信的数据的总称:可以是视频.文本.音频等等. Quene 一种 ...
- Kafka系列 - 10 Kafka副本|分区副本分配|手动调整分区副本|Leader Partition 负载平衡|增加副本因子
文章目录 1. 分区副本分配 2. 手动调整分区副本 3. Leader Partition 负载平衡 4. 增加副本因子 1. 分区副本分配 如果 kafka 服务器只有 4 个节点,那么设置 ka ...
- Kafka系列 - 14 Kafka消费者|分区的分配策略及再平衡|Range|RoundRobin|Sticky|CooperativeSticky
文章目录 1. 分区的分配以及再平衡 2. Range 分区分配以及再平衡 3. RoundRobin 分区分配以及再平衡 4. Sticky 分区分配以及再平衡 1. 分区的分配以及再平衡 一个co ...
- Kafka系列(五)、开启SASL安全认证以及配置ACL权限控制
目录 开启SASL 控制台配置用户 ACL授权 Python客户端访问 ACL常用命令 Kafka系列: kafka 2.4.1单机版部署及使用 kafka监控系统kafka eagle安装使用 滴滴 ...
- Kafka系列(七)、Kafka套件 Confluent Platform 单机/集群部署
目录 简介 单机部署 集群部署 尾巴 Kafka系列: kafka 2.4.1单机版部署及使用 kafka监控系统kafka eagle安装使用 滴滴开源的kafka-manager编译及部署使用 k ...
最新文章
- java开发常见的热词奇解
- 谷歌chrome浏览器的源码分析(三)
- 算法 --- 递归实现多级树展开结构
- jQuery表单校验
- Linux入门之常用命令(15) lsof
- httpd 处理模型
- 进程(并发,并行) join start 进程池 (同步异步)
- java easyui分页源码_SpringMVC+easyUI中datagrid分页实现_2014.5.1
- MySQL MHA详解(一)——基本原理
- mysql默认dba_DBA 基本常识 - 安装完 MySQL 后必须调整的 10 项配置 - iTeknical
- 300本计算机编程的经典书籍下载
- 全球编程厉害的14位大佬
- Python爬虫实例1
- 马哥Python培训怎么样——再次革命性升级
- WPS表格如何快速添加斜线表头
- w ndows摄像头驱动怎么安,电脑中如何安装摄像头驱动
- [产品]博客文章被企业群组收录的方法
- web网页动态分享facebook和twitter
- 无法打开文件“.lib”
- 微软发布2015年7月安全公告 安全狗提醒及时修复
热门文章
- php闪屏程序,节日闪屏的两种构成方式
- 当包装类的要与基本类型进行比较时候 需要先将包装类降级为基本类型
- java8-Optional
- 19.Delete Documents-官方文档摘录
- java io流缓冲理解
- continue break return的区别
- Hibernate框架之HQL查询与Criteria 查询的区别
- Kendo Web UI Grid里时间格式转换
- 桥梁模式 :Bridge(转自Terrylee)
- win10共享打印错误0x0000006_Win7打印机无法共享提示错误代码0x000006d9的解决方法...