Kafka API: TopicMetadata
Jusfr 原创,转载请注明来自博客园
TopicMetadataRequest/TopicMetadataResponse
前文简单说过“Kafka是自描述的”,是指其broker、topic、partition 信息可以通过 TopicMetadata API 获取。
TopicMetadataRequest 的内容非常简单,是一个包含 TopicName 的数组,TopicMetadataResponse 则告诉使用者 Broker、Topic、Partition 的分布情况。
使用空数组可以获取完整数据。
在 Chuye.Kafka 里,使用 Connection/Router 对应的发起一个请求:
var section = new KafkaConfigurationSection("jusfr.redis", 9092);var demoTopics = new String[0];var connection = new Router(section); connection.TopicMetadata(demoTopics).Dump("Metadata");
Connection.TopicMetadata() 使用 TopicName 数组作为参数构造了一个 TopicMetadataRequest 实例,将其序列化,发送 KafkaConfigurationSection 指向的主机和端口,读取响应再解析为 TopicMetadataResponse 对象,单机部署的 TopicMetadataResponse 可能有如下结构:
当 Kafka 服务的启动参数auto.create.topics.enable
设置为true的时候,TopicMetadataRequest 传递的 TopicName 不存在时将被自动创建;
集群模式下 Topic 的自动创建复杂一些,Kafka 携带的 bin/kafka-topics.sh 提供了再多参数。
Zookeeper
- 如何使用程序查询、删除 Topic? 如何彻底删除 Topic ?
- 如何在集群模式下管理 Topic
源码阅读得知,Kafka 对 TopicMetadataRequest 的响应是通过引用 Zookeeper 来完成的。Zookeeper 在 .Net 上的实现有 ZooKeeperNet, NuGet 上是3.4.6.2 版本。
Zookeeper 编程又是一大块内容,这里只是略加提及。
ZooKeeper 的两个方法最重要:GetChildren()
和 GetData()
,前者提供了路径查询,后者提供了节点数据获取,可以使用以下代码递归访问:
void Main() {ZooKeeper zk = new ZooKeeper("jusfr.mac", TimeSpan.FromSeconds(10), null);var paths = zk.GetChildren("/", false).ToArray();foreach (var path in paths) {GetChildren(zk, "/" + path);}
}void GetChildren(ZooKeeper zk, String path) {var data = zk.GetData(path, null, null); var paths = zk.GetChildren(path, false).ToArray();if (paths.Length > 0) {foreach (var p in paths) {GetChildren(zk, path + "/" + p);}}
}
在集群环境下部分响应示例
// /brokers/topics/demoTopic1
{"version":1,"partitions":{"0":[2]}}// /brokers/topics/demoTopic1/partitions/0/state
{"controller_epoch":1,"leader":2,"version":1,"leader_epoch":0,"isr":[2]} // /brokers/ids/1
{"jmx_port":-1,"timestamp":"1457431238732","endpoints":["PLAINTEXT://jusfr.kafka-1:9093"],"host":"jusfr.kafka-1","version":2,"port":9093}
路径 /brokers/topics 存储了topic 信息,/admin/delete_topics 存储了被删除的 topic,这只是一个标记,由于 Kafka 是基于文件系统的,你需要等待 Kafka 在某个时机真正移除它们。部分参考
由于 Kafka 通过 Zookeeper 返回元数据,故任何 Broker 节点都能应答 TopicMetadataRequest 并提供完整响应;
可以看到 demoTopic3 的 PartitionId=0 分区所在 Leader=1,即 Broker NodeId=1 的节点 jusfr.kafka-1:9093 ,PartitionId=1 分区所在 Leader=2,即 Broker NodeId=2 的节点 jusfr.kafka-2:9094。读写 demoTopic3 的分区0 需要连接到主机 jusfr.kafka-1、端口9093,读写 demoTopic3 的分区1 需要连接到主机 jusfr.kafka-2、端口9094,此过程我称为 Broker route。错误的 Broker 访问、不正确的 server.properties 配置可能触发状态码为 UnknownTopicOrPartition 的响应。
Chuye.Kafka 的 Router 对象从 IRouter 定义,继续自Connection,重写了 Route 方法,内部便是 Partition-Broker 检查逻辑。集群模式下涉及到 Zookeeper 编程,Chuye.Kafka 可能未能给予支持。
Jusfr 原创,转载请注明来自博客园
转载于:https://www.cnblogs.com/Jusfr/p/5257258.html
Kafka API: TopicMetadata相关推荐
- 4、Kafka API实战
环境准备 启动zookeeper集群和kafka集群,在kafka集群中打开一个消费者 [hadoop@hadoop-100 kafka]$ zkservers start [hadoop@hadoo ...
- 一文详解Kafka API
摘要:Kafka的API有Producer API,Consumer API还有自定义Interceptor (自定义拦截器),以及处理的流使用的Streams API和构建连接器的Kafka Con ...
- Apache Kafka API AdminClient Scram账户的创建与删除
前言 由于Apache官方一直没有提供AdminClient中对账户这一块的操作,因此这部分大多数时候都是用命令行去操作的,但是命令行毕竟不是很方便.为了解决这部分问题,笔者去读了Kafka Scal ...
- Apache Kafka API AdminClient Scram账户的操作(增删改查)
前言 很久没有更新Kafka API相关的文档了,因为笔者工作变动Kafka这部分内容在工作中接触的就相对于之前少了一些.但架不住kafka官方还是一如既往的勤奋,官方操作Scram账户的创建与删除这 ...
- storm和kafka集成报java.lang.ClassNotFoundException: kafka.api.OffsetRequest解决方法
添加依赖 <dependency><groupId>org.apache.storm</groupId><artifactId>storm-kafka& ...
- Kafka API的运用(Producer API)
文章目录 四.Kafka API 1.Producer API 1.1 消息发送流程 1.2 异步发送 API 1.3 分区器 1.4 同步发送 API 四.Kafka API 1.Producer ...
- Kafka API的运用(Consumer API)
文章目录 四.Kafka API 2. Consumer API 2.1 自动提交offset 2.2 重置Offset 2.3手动提交 offset 四.Kafka API 2. Consumer ...
- 如何使用Kafka API入门Spark流和MapR流
这篇文章将帮助您开始使用Apache Spark Streaming通过MapR Streams和Kafka API消费和发布消息. Spark Streaming是核心Spark API的扩展,可实 ...
- spark和kafka_如何使用Kafka API入门Spark流和MapR流
spark和kafka 这篇文章将帮助您开始使用Apache Spark Streaming通过MapR Streams和Kafka API消费和发布消息. Spark Streaming是核心Spa ...
最新文章
- [ubuntu] 摆脱一直敲打‘Y'('yes')的困境
- Xamarin Android项目真机测试闪退
- 0320互联网新闻 | 网易《明日之后》全球营收突破1.25亿美元;阿里AI labs宣布投入1亿元进行方言保护...
- Spring 事务处理参数
- 【Docker】Docker 删除所有容器和镜像
- 提取过程_大米多肽提取过程中如何应用膜分离技术呢?
- .NET Core开发日志——OData
- 实例1.2:获得应用程序主窗口指针
- 【英语学习】【Level 07】U01 Making friends L3 Do you eat here a lot?
- error:Assertion failed ((unsigned)i0 (unsigned)size.p[0]) in cv::Mat::at
- ORA-01034: ORACLE not available
- 1-4-05:整数大小比较
- python模块规定的格式,按照这样写,最规范
- 汇编语言基础之二 - 各种寻址和过程进出简介
- 英特尔傲腾 DC P4800X 固态盘
- (三)Lucene中Index.ANALYZED分词相关
- mac安装虚拟机配置win10系统
- 微信公众号H5开发,实现网页授权(静默登录)
- 腾讯云点播视频存储(Web端视频上传)
- Golang 正则表达式判断手机号或身份证
热门文章
- 简单六步上手spring aop,通过各种类型通知,面向切面编程,实现代码解耦(超详细)
- JAVA读取Properties文件对象常用方法总结
- 来瓶82年拉菲压压惊
- GDAL读取S-57海图数据中文属性值乱码问题解决(续)
- 使用C#调用非托管DLL函数
- 算法详解_常用算法详解——打印杨辉三角形
- 双android手机同步工具,android手机同步数据PC(SyncDroid)
- python itemgetter_Python operator.itemgetter
- 【java学习之路】(java SE篇)012.网络编程
- 小区重选优先级_NR小区重选理论研究