欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。

欢迎跳转到本文的原文链接:https://honeypps.com/mq/kafka-admin-client-1-principles-and-demos/


前言

一般情况下,我们都习惯使用Kafka中bin目录下的脚本工具来管理查看Kafka,但是有些时候需要将某些管理查看的功能集成到系统(比如Kafka Manager)中,那么就需要调用一些API来直接操作Kafka了。在Kafka0.11.0.0版本之前,可以通过kafka-core包(Kafka的服务端代码,采用Scala编写)下的AdminClient和AdminUtils来实现部分的集群管理操作,比如笔者之前在Kafka解析之topic创建(1)和Kafka解析之topic创建(2)两篇文章中所讲解的Topic的创建就用到了AdminUtils类。而在Kafka0.11.0.0版本之后,又多了一个AdminClient,这个是在kafka-client包下的,这是一个抽象类,具体的实现是org.apache.kafka.clients.admin.KafkaAdminClient,这个就是本文所要陈述的重点了。

功能与原理介绍

在Kafka官网中这么描述AdminClient:The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects. 具体的KafkaAdminClient包含了一下几种功能(以Kafka1.0.0版本为准):

  1. 创建Topic:createTopics(Collection<NewTopic> newTopics)
  2. 删除Topic:deleteTopics(Collection<String> topics)
  3. 罗列所有Topic:listTopics()
  4. 查询Topic:describeTopics(Collection<String> topicNames)
  5. 查询集群信息:describeCluster()
  6. 查询ACL信息:describeAcls(AclBindingFilter filter)
  7. 创建ACL信息:createAcls(Collection<AclBinding> acls)
  8. 删除ACL信息:deleteAcls(Collection<AclBindingFilter> filters)
  9. 查询配置信息:describeConfigs(Collection<ConfigResource> resources)
  10. 修改配置信息:alterConfigs(Map<ConfigResource, Config> configs)
  11. 修改副本的日志目录:alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment)
  12. 查询节点的日志目录信息:describeLogDirs(Collection<Integer> brokers)
  13. 查询副本的日志目录信息:describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas)
  14. 增加分区:createPartitions(Map<String, NewPartitions> newPartitions)

其内部原理是使用Kafka自定义的一套二进制协议来实现,详细可以参见Kafka协议。主要实现步骤:

  1. 客户端根据方法的调用创建相应的协议请求,比如创建Topic的createTopics方法,其内部就是发送CreateTopicRequest请求。
  2. 客户端发送请求至Kafka Broker。
  3. Kafka Broker处理相应的请求并回执,比如与CreateTopicRequest对应的是CreateTopicResponse。
  4. 客户端接收相应的回执并进行解析处理。
    和协议有关的请求和回执的类基本都在org.apache.kafka.common.requests包中,AbstractRequest和AbstractResponse是这些请求和回执类的两个基本父类。

示例

下面就以创建Topic来举一个简单的KafkaAdminClient的使用案例,【代码清单1】:

private static final String NEW_TOPIC = "topic-test2";
private static final String brokerUrl = "localhost:9092";private static AdminClient adminClient;@BeforeClass
public static void beforeClass(){Properties properties = new Properties();properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);adminClient = AdminClient.create(properties);
}@AfterClass
public static void afterClass(){adminClient.close();
}@Test
public void createTopics() {NewTopic newTopic = new NewTopic(NEW_TOPIC,4, (short) 1);Collection<NewTopic> newTopicList = new ArrayList<>();newTopicList.add(newTopic);adminClient.createTopics(newTopicList);
}

示例中的createTopics()方法就创建了一个分区数为4,副本因子为1的“topic-test2”的Topic。

代码剖析

下面来详细介绍一下KafkaAdminClient中现有的listTopics()方法(这个方法的实现相对干净利落,代码量少、易于讲解)的实现方式,以便可以了解KafkaAdminClient中的大体脉络。listTopics()方法的具体代码如【代码清单2】所示:

public ListTopicsResult listTopics(final ListTopicsOptions options) {final KafkaFutureImpl<Map<String, TopicListing>> topicListingFuture = new KafkaFutureImpl<>();final long now = time.milliseconds();runnable.call(new Call("listTopics", calcDeadlineMs(now, options.timeoutMs()),new LeastLoadedNodeProvider()) {@OverrideAbstractRequest.Builder createRequest(int timeoutMs) {return MetadataRequest.Builder.allTopics();}@Overridevoid handleResponse(AbstractResponse abstractResponse) {MetadataResponse response = (MetadataResponse) abstractResponse;Cluster cluster = response.cluster();Map<String, TopicListing> topicListing = new HashMap<>();for (String topicName : cluster.topics()) {boolean internal = cluster.internalTopics().contains(topicName);if (!internal || options.shouldListInternal())topicListing.put(topicName, new TopicListing(topicName, internal));}topicListingFuture.complete(topicListing);}@Overridevoid handleFailure(Throwable throwable) {topicListingFuture.completeExceptionally(throwable);}}, now);return new ListTopicsResult(topicListingFuture);
}

listTopics()方法接收一个ListTopicsOptions类型的参数,KafkaAdminClient中基本所有的应用类方法都有一个类似XXXOptions类型的参数,这个类型一般只包含timeoutMs这个成员变量,用来设定请求的超时时间,如果没有指定则使用默认的request.timeout.ms参数值,即30000ms。就拿查询Topic信息所对应的DescribeTopicsOptions来说,其就包含一个timeoutMs参数,具体如【代码清单3】所示:

public class DescribeTopicsOptions extends AbstractOptions<DescribeTopicsOptions> {}
public abstract class AbstractOptions<T extends AbstractOptions> {private Integer timeoutMs = null;@SuppressWarnings("unchecked")public T timeoutMs(Integer timeoutMs) {this.timeoutMs = timeoutMs;return (T) this;}public Integer timeoutMs() {return timeoutMs;}
}

不过ListTopicsOptions扩展了一个成员变量listInternal,用来指明是否需要罗列内部Topic,比如在Kafka解析之topic创建(1)中提及的“__consumer_offsets”和“transaction_state”就是两个内部Topic。ListTopicsOptions的代码如【代码清单4】所示:

public class ListTopicsOptions extends AbstractOptions<ListTopicsOptions> {private boolean listInternal = false;public ListTopicsOptions listInternal(boolean listInternal) {this.listInternal = listInternal;return this;}public boolean shouldListInternal() {return listInternal;}
}

listInternal的值默认为false,如果同时要罗列出目前的内部Topic的话就需要将这个listInternal设置为true,示例代码如【代码清单5】所示:

@Test
public void listTopicsIncludeInternal() throws ExecutionException, InterruptedException {ListTopicsOptions listTopicsOptions = new ListTopicsOptions();listTopicsOptions.listInternal(true);ListTopicsResult result = adminClient.listTopics(listTopicsOptions);Collection<TopicListing> list = result.listings().get();System.out.println(list);
}

接下去继续讲解listTopics()方法,其返回值为ListTopicResult类型。与ListTopicsOptions对应,KafkaAdminClient中基本所有的应用类方法都有一个类似XXXResult类型的返回值,其内部一般包含一个KafkaFuture,用于异步发送请求之后等待操作结果。KafkaFuture实现了Java中的Future接口,用来支持链式调用以及其他异步编程模型,可以看成是Java8中CompletableFuture的一个小型版本,其中也有类似thenApply、complete、completeExceptionally的方法。

再来看代码清单2中的 runnable.call(new Call(“listTopics”, calcDeadlineMs(now, options.timeoutMs()),new LeastLoadedNodeProvider()) 这行代码,runnable的类型是AdminClientRunnable,其是KafkaAdminClient负责处理与服务端交互请求的服务线程。AdminClientRunnable中的call方法用作入队一个Call请求,进而对其处理。Call请求代表与服务端的一次请求交互,比如listTopics和createTopics都是一次Call请求,AdminClientRunnable线程负责处理这些Call请求。

Call类是一个抽象类,构造方法接收三个参数:本次请求的名称callName、超时时间deadlineMs、以及节点提供器nodeProvider。nodeProvider是NodeProvider类型,用来提供本次请求所交互的Broker节点。Call类中还有3个抽象方法:createRequest()、handleResponse()、handleFailure(),分别用来创建请求、处理回执和处理失败。在代码清单2中,对于listTopics()方法而言,其内部原理就是发送MetadataRequest请求然后处理MetadataResponse,其处理逻辑峰封装在createRequest()、handleResponse()、handleFailure()这三个方法之中了。

综上,如果要自定义实现一个功能,只需要三个步骤:

  1. 自定义XXXOptions;
  2. 自定义XXXResult返回值;
  3. 自定义Call,然后挑选合适的XXXRequest和XXXResponse来实现Call类中的3个抽象方法。

KafkaAdminClient目前而言尚未形成一个完全体,里面还可以扩展很多功能,就拿上一篇文章《如何获取Kafka的消费者详情——从Scala到Java的切换》中介绍的而言,目前KafkaAdminClient尚未实现describeConsumerGroup和listGroupOffsets的功能,所以需要进一步的升级改造。篇幅限制,这部分内容将在下一篇文章进行介绍,如果想要先睹为快,可以参考下代码实现,详细的逻辑解析敬请期待….

欢迎跳转到本文的原文链接:https://honeypps.com/mq/kafka-admin-client-1-principles-and-demos/


欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。


集群管理工具KafkaAdminClient——原理与示例相关推荐

  1. 集群管理工具KafkaAdminClient——改造

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

  2. Docker swarm集群管理工具

    1. 简介 Docker Swarm 是Docker的集群管理工具,简单方便.易于上手. Swarm集群由以下两个组件构成: Mananger:负责整个集群的管理工作包括集群配置.服务管理等所有跟集群 ...

  3. 大数据领域两大最主流集群管理工具Ambari和Cloudera Manger

    大数据集群管理方式分为手工方式(Apache hadoop)和工具方式(Ambari + hdp 和Cloudera Manger + CDH). 手工部署呢,需配置太多参数,但是,好理解其原理,建议 ...

  4. kafka集群管理工具kafka-manager

    一.kafka-manager简介 kafka-manager是目前最受欢迎的kafka集群管理工具,最早由雅虎开源,用户可以在Web界面执行一些简单的集群管理操作.具体支持以下内容: 管理多个集群 ...

  5. docker集群管理工具_太多选择:如何选择正确的工具来管理Docker集群

    docker集群管理工具 There are all kinds of ways to play the Docker game and, obviously, no one of them is g ...

  6. redis-manger集群管理工具

    redis-manger集群管理工具 源起 安装 优点 源起 缺少一个redis集群管理工具 安装 链接: github地址 优点 安装简单,功能够用

  7. 第二篇supervisor集群管理工具cesi安装详解-如何安装supervisor-cesiwebUI

    第二篇supervisor集群管理工具cesi安装详解-如何安装supervisor-cesiwebUI 介绍 安装 解压 安装依赖 修改配置 注册为系统服务 启动 登录一下,发现报错了 解决方法 介 ...

  8. 第一篇supervisor集群管理工具cesi安装详解-如何安装supervisor

    第一篇supervisor集群管理工具cesi安装详解-如何安装supervisor 环境 准备 安装python3.7.4 问题 解决方法 安装supervisor 配置supervisor服务 启 ...

  9. Redis 集群搭建及集群管理工具

    目录 一.简介 二.架构图 三.搭建集群 3.1.下载 3.2.编译安装 3.3.配置文件修改 3.4.创建集群 四.集群管理工具redis-cli 4.1.查看集群信息 4.2.检查集群 4.3.修 ...

最新文章

  1. python会搞坏电脑吗_搞python,把原本php环境所需的libjpeg搞坏了
  2. IDEA中 30 秒生成 Spring Cloud Alibaba 工程
  3. laravel判断HTTP请求是否ajax
  4. 边工作边刷题:70天一遍leetcode: day 33-3
  5. 分析如下java代码片段,Java内部测试笔试题
  6. HttpClient在传参和返回结果的中文乱码问题
  7. 电脑看不到光驱盘符,应该如何解决
  8. (转)Delaunay三角剖分
  9. laravel redis_解析laravel之redis简单模块操作
  10. Richard Hamming - You and Your Research
  11. PowerMock进行mock测试
  12. Android CheckBoxPreference设置默认值会触发持久化以及其内部实现逻辑
  13. jmeter性能测试步骤实战教程
  14. 卫星导航之如何画出常见的图形世界地图、多路径、天空图等
  15. 使用Python3将word文档和pdf电子书进行格式互转(兼容Windows/Linux)
  16. Android登录客户端,验证码的获取,网页数据抓取与解析,HttpWatch基本使用
  17. Revi开发 - 碰撞检测
  18. 阿里云视觉AI训练营_Class5_实践课:人脸动漫化搭建
  19. ICLR'22上的47页“神仙论文” | 子图聚合图神经网络
  20. 不要再问Python了!

热门文章

  1. java的发展_java的发展
  2. mysql命令导出表结构文件夹_mysql,命令导入\导出表结构或数据
  3. 计算机删除等级列在哪里,插入与删除Excel表格的单元格、行和列
  4. 200908阶段一C++多态
  5. ubuntu双系统把win7设置为默认启动选项
  6. 不只是用于研究:使用Nvivo获取各种定性数据
  7. IBM科学家实现存储器重大突破
  8. IE下iframe跨域session和cookie失效问题的解决方案
  9. ArcGIS教程:Iso 聚类非监督分类
  10. 算法笔记_029:约瑟夫斯问题(Java)