文章目录

  • 管理与监控
    • 32 | KafkaAdminClient:Kafka的运维利器
      • 引入原因
      • 如何使用?
      • 工作原理
      • 构造和销毁 AdminClient 实例
      • 常见的 AdminClient 应用实例

管理与监控

32 | KafkaAdminClient:Kafka的运维利器

引入原因

首先,不论是 Windows 平台,还是 Linux 平台,命令行的脚本都只能运行在控制台上。如果想要在应用程序、运维框架或是监控平台中集成它们,会非常得困难。

其次,这些命令行脚本很多都是通过连接 ZooKeeper 来提供服务的。目前,社区已经越来越不推荐任何工具直连 ZooKeeper 了,因为这会带来一些潜在的问题,比如这可能会绕过 Kafka 的安全设置。

最后,运行这些脚本需要使用 Kafka 内部的类实现,也就是 Kafka 服务器端的代码。实际上,社区还是希望用户只使用 Kafka 客户端代码,通过现有的请求机制来运维管理集群。这样的话,所有运维操作都能纳入到统一的处理机制下,方便后面的功能演进。

基于以上原因,社区于 0.11 版本正式推出了 Java 客户端版的 AdminClient,并不断地在后续的版本中对它进行完善。

如何使用?

如果使用的是 Maven,需要增加以下依赖项:

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.3.0</version>
</dependency>

如果使用的是 Gradle,需要增加以下依赖项:

compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.3.0'

功能

在 2.3 版本中,AdminClient 提供的功能有 9 大类。

  1. 主题管理:包括主题的创建、删除和查询。
  2. 权限管理:包括具体权限的配置与删除。
  3. 配置参数管理:包括 Kafka 各种资源的参数设置、详情查询。所谓的 Kafka 资源,主要有
    Broker、主题、用户、Client-id 等。
  4. 副本日志管理:包括副本底层日志路径的变更和详情查询。
  5. 分区管理:即创建额外的主题分区。
  6. 消息删除:即删除指定位移之前的分区消息。
  7. Delegation Token 管理:包括 Delegation Token 的创建、更新、过期和详情查询。
  8. 消费者组管理:包括消费者组的查询、位移查询和删除。
  9. Preferred 领导者选举:推选指定主题分区的 Preferred Broker 为领导者。

工作原理

从设计上来看,AdminClient 是一个双线程的设计:前端主线程和后端 I/O 线程。 前端线程负责将用户要执行的操作转换成对应的请求,然后再将请求发送到后端 I/O 线程的队列中;而后端 I/O 线程从队列中读取相应的请求,然后发送到对应的 Broker 节点上,之后把执行结果保存起来,以便等待前端线程的获取。

如图所示,前端主线程会创建名为 Call 的请求对象实例。该实例有两个主要的任务。

  1. 构建对应的请求对象。 比如,如果要创建主题,那么就创建 CreateTopicsRequest;如果是查询消费者组位移,就创建 OffsetFetchRequest。
  2. 指定响应的回调逻辑。 比如从 Broker 端接收到 CreateTopicsResponse 之后要执行的动作。一旦创建好 Call 实例,前端主线程会将其放入到新请求队列(New Call Queue)中,此时,前端主线程的任务就算完成了。它只需要等待结果返回即可。

后端 I/O 线程使用了 3 个队列来承载不同时期的请求对象,它们分别是新请求队列、待发送请求队列和处理中请求队列。

为什么要使用 3 个呢? 因为目前新请求队列的线程安全是由 Java 的 monitor 锁来保证的。为了确保前端主线程不会因为 monitor 锁被阻塞,后端 I/O 线程会定期地将新请求队列中的所有 Call 实例全部搬移到待发送请求队列中进行处理。待发送请求队列和处理中请求队列只由后端 I/O 线程处理,因此无需任何锁机制来保证线程安全。

当 I/O 线程在处理某个请求时,它会显式地将该请求保存在处理中请求队列。一旦处理完成,I/O 线程会自动地调用 Call 对象中的回调逻辑完成最后的处理。把这些都做完之后,I/O 线程会通知前端主线程说结果已经准备完毕,这样前端主线程能够及时获取到执行操作的结果。AdminClient 是使用 Java Object 对象的 wait 和 notify 实现的这种通知机制。

严格来说,AdminClient 并没有使用 Java 已有的队列去实现上面的请求队列,它是使用 ArrayList 和 HashMap 这样的简单容器类,再配以 monitor 锁来保证线程安全的。

后端 I/O 线程的名字的前缀是 kafka-admin-client-thread如果发现 AdminClient 程序貌似在正常工作,但执行的操作没有返回结果,或者 hang 住了,这可能是因为 I/O 线程出现问题导致的,可以使用 jstack 命令去查看一下 AdminClient 程序,确认下 I/O 线程是否在正常工作。

构造和销毁 AdminClient 实例

AdminClient 对象的完整类路径是 org.apache.kafka.clients.admin.AdminClient,而不是 kafka.admin.AdminClient。后者就是服务器端的 AdminClient,它已经不被推荐使用了。

创建 AdminClient 实例和创建 KafkaProducer 或 KafkaConsumer 实例的方法是类似的,需要手动构造一个 Properties 对象或 Map 对象,然后传给对应的方法。社区专门为 AdminClient 提供了几十个专属参数,最常见而且必须要指定的参数,是 bootstrap.servers 参数。如果要销毁 AdminClient 实例,需要显式调用 AdminClient 的 close 方法。

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-host:port");
props.put("request.timeout.ms", 600000);try (AdminClient client = AdminClient.create(props)) {// 执行要做的操作……
}

常见的 AdminClient 应用实例

创建主题

String newTopicName = "test-topic";
try (AdminClient client = AdminClient.create(props)) {NewTopic newTopic = new NewTopic(newTopicName, 10, (short) 3);CreateTopicsResult result = client.createTopics(Arrays.asList(newTopic));result.all().get(10, TimeUnit.SECONDS);
}

这段代码调用 AdminClient 的 createTopics 方法创建对应的主题。构造主题的类是 NewTopic 类,它接收主题名称、分区数和副本数三个字段。

目前,AdminClient 各个方法的返回类型都是名为 ***Result 的对象。这类对象会将结果以 Java Future 的形式封装起来。如果要获取运行结果,需要调用相应的方法来获取对应的 Future 对象,然后再调用相应的 get 方法来取得执行结果。

查询消费者组位移

String groupID = "test-group";
try (AdminClient client = AdminClient.create(props)) {ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);Map<TopicPartition, OffsetAndMetadata> offsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);System.out.println(offsets);
}

调用 AdminClient 的 listConsumerGroupOffsets 方法去获取指定消费者组的位移数据。

获取 Broker 磁盘占用

try (AdminClient client = AdminClient.create(props)) {DescribeLogDirsResult ret = client.describeLogDirs(Collections.singletonList(targetBrokerId)); // 指定Broker idlong size = 0L;for (Map<String, DescribeLogDirsResponse.LogDirInfo> logDirInfoMap : ret.all().get().values()) {size += logDirInfoMap.values().stream().map(logDirInfo -> logDirInfo.replicaInfos).flatMap(topicPartitionReplicaInfoMap ->topicPartitionReplicaInfoMap.values().stream().map(replicaInfo -> replicaInfo.size)).mapToLong(Long::longValue).sum();}System.out.println(size);
}

这段代码的主要思想是,使用 AdminClient 的 describeLogDirs 方法获取指定 Broker 上所有分区主题的日志路径信息,然后把它们累积在一起,得出总的磁盘占用量。

32 | KafkaAdminClient:Kafka的运维利器相关推荐

  1. 运维利器:万能的 strace

    第一章 运维利器:万能的 strace 高效运维 | 2016-05-24 20:10 个人简介: 王子勇,腾讯高级业务运维工程师,有8年研发与运维工作经验.崇尚开源,喜欢钻研系统技术,曾给rsysl ...

  2. 运维利器万能的 strace

    转载至运维利器万能的 strace | 运维生存时间 原文地址http://www.ttlsa.com/linux-command/winner-versatile-strace/ strace是什么 ...

  3. 运维利器-ClusterShell集群管理操作记录

    在运维实战中,如果有若干台数据库服务器,想对这些服务器进行同等动作,比如查看它们当前的即时负载情况,查看它们的主机名,分发文件等等,这个时候该怎么办?一个个登陆服务器去操作,太傻帽了!写个shell去 ...

  4. DevOps实战 —— 如何高效地远程部署?自动化运维利器 Fabric 教程

    如何高效地远程部署?自动化运维利器 Fabric 教程 关于 Python 自动化的话题,在上一篇文章中,我介绍了 Invoke 库,它是 Fabric 的最重要组件之一.Fabric 也是一个被广泛 ...

  5. 云计算时代,企业IT资产安全运维利器——行云管家堡垒机

    为了保障网络和数据不受来自外部和内部用户的入侵和破坏,企业通常通过部署堡垒机来管理企业内部IT资产.但随着云计算逐渐发展成为企业IT架构的基础设施,传统堡垒机很难适应云的变化,已经无法对企业IT资产进 ...

  6. Kafka常用运维操作命令

    目录 1. 集群管理 2. topic管理 kafka-topic.sh脚本 2.1 列出集群上所有topic 2.2 创建topic 2.3 查看topic详细信息 2.4 修改(增加)topic分 ...

  7. strace跟踪java,linux运维利器—–strace命令

    strace常用来跟踪进程执行时的系统调用和所接收的信号. 在Linux世界,进程不能直接访问硬件设备,当进程需要访问硬件设备(比如读取磁盘文件,接收网络数据等等)时,必须由用户态模式切换至内核态模式 ...

  8. 运维利器:钉钉机器人脚本告警(Linux Shell 篇)

    写在前面的话 目前换了几家公司,且最近几家都是以钉钉作为公司 OA 聊天工具,总的来说还是很不错的.最近去了新公司,由于公司以前没有运维,所以监控,做自动化等方面都没有实施,恰逢这个机会把最近做的关于 ...

  9. nvme驱动_用户态NVMe运维利器 SPDK NVMe 字符设备

    ------------ 作者简介 刘孝冬 Intel 高级软件工程师 专注于开源存储SPDK及ISA-L软件的开发. ------------ 随着数据中心规模的不断扩大与延展,硬件设备的运行维护已 ...

最新文章

  1. QIIME 2教程. 29参考数据库DataResources(2021.2)
  2. javaMP3转pcm 百度语音识别
  3. IC/FPGA笔试/面试题分析(十一)基础概念(三态门等)
  4. ASP.NET Core WebApi 返回统一格式参数
  5. 基于Xml 的IOC 容器-分配路径处理策略
  6. 如何以子类的形式运行多进程?
  7. 好朋友的爬虫共享资料,真佩服
  8. zsacm20120226省赛前个人赛第1场(结题报告)
  9. 【渝粤教育】国家开放大学2018年春季 7406-22T金融统计分析 参考试题
  10. 2022-2-20stream流的复习
  11. 互联⽹名词⼤全——商业模式篇
  12. 99%的手机Root方法都在这里
  13. 扁平化ui设计界面的方式以及扁平化ui图标设计特点
  14. 计算机应用基础考试试题及答案 在word中,用户建立的文件默认,2009年10月全国自考计算机应用基础历年真题...
  15. 解决目标检测中密集遮挡问题——Repulsion loss
  16. GPU阵列 安特卫普大学展示桌面超级计算机Fastra II
  17. 星速配资:煤炭概念股大幅拉升 看好投资机会
  18. WML语言基础(WAP建站)五
  19. 抓住人性的套路出牌,实现利益的最大化
  20. Java计算1到100阶乘和

热门文章

  1. vue实现点击按钮调用摄像头扫码
  2. 川教版八年级计算机教学计划,川教版八年级下信息技术教学计划.doc
  3. 超详细分析Windows变慢原因及解决方法
  4. Windows 2003 变慢原因分析及解决
  5. 【学习笔记】ACP敏捷项目管理
  6. linux 命令 是mmc 大小,u-boot中mmc命令使用
  7. 阿里云短信平台实现手机验证码登录
  8. 使用ffmpeg把mp4与m3u8相互转换的操作
  9. PTA 1031 查验身份证 (c语言)
  10. 如何找出zeppelin的登入帳號密碼?