Jusfr 原创,转载请注明来自博客园

TopicMetadataRequest/TopicMetadataResponse

前文简单说过“Kafka是自描述的”,是指其broker、topic、partition 信息可以通过 TopicMetadata API 获取。

TopicMetadataRequest 的内容非常简单,是一个包含 TopicName 的数组,TopicMetadataResponse 则告诉使用者 Broker、Topic、Partition 的分布情况。

使用空数组可以获取完整数据。

在 Chuye.Kafka 里,使用 Connection/Router 对应的发起一个请求:

    var section = new KafkaConfigurationSection("jusfr.redis", 9092);var demoTopics = new String[0];var connection = new Router(section);    connection.TopicMetadata(demoTopics).Dump("Metadata");

Connection.TopicMetadata() 使用 TopicName 数组作为参数构造了一个 TopicMetadataRequest 实例,将其序列化,发送 KafkaConfigurationSection 指向的主机和端口,读取响应再解析为 TopicMetadataResponse 对象,单机部署的 TopicMetadataResponse 可能有如下结构:

当 Kafka 服务的启动参数auto.create.topics.enable设置为true的时候,TopicMetadataRequest 传递的 TopicName 不存在时将被自动创建;

集群模式下 Topic 的自动创建复杂一些,Kafka 携带的 bin/kafka-topics.sh 提供了再多参数。


Zookeeper

  • 如何使用程序查询、删除 Topic? 如何彻底删除 Topic ?
  • 如何在集群模式下管理 Topic

源码阅读得知,Kafka 对 TopicMetadataRequest 的响应是通过引用 Zookeeper 来完成的。Zookeeper 在 .Net 上的实现有 ZooKeeperNet, NuGet 上是3.4.6.2 版本。

Zookeeper 编程又是一大块内容,这里只是略加提及。

ZooKeeper 的两个方法最重要:GetChildren()GetData(),前者提供了路径查询,后者提供了节点数据获取,可以使用以下代码递归访问:

void Main() {ZooKeeper zk = new ZooKeeper("jusfr.mac", TimeSpan.FromSeconds(10), null);var paths = zk.GetChildren("/", false).ToArray();foreach (var path in paths) {GetChildren(zk, "/" + path);}
}void GetChildren(ZooKeeper zk, String path) {var data = zk.GetData(path, null, null); var paths = zk.GetChildren(path, false).ToArray();if (paths.Length > 0) {foreach (var p in paths) {GetChildren(zk, path + "/" + p);}}
}

在集群环境下部分响应示例

// /brokers/topics/demoTopic1
{"version":1,"partitions":{"0":[2]}}// /brokers/topics/demoTopic1/partitions/0/state
{"controller_epoch":1,"leader":2,"version":1,"leader_epoch":0,"isr":[2]} // /brokers/ids/1
{"jmx_port":-1,"timestamp":"1457431238732","endpoints":["PLAINTEXT://jusfr.kafka-1:9093"],"host":"jusfr.kafka-1","version":2,"port":9093}

路径 /brokers/topics 存储了topic 信息,/admin/delete_topics 存储了被删除的 topic,这只是一个标记,由于 Kafka 是基于文件系统的,你需要等待 Kafka 在某个时机真正移除它们。部分参考

由于 Kafka 通过 Zookeeper 返回元数据,故任何 Broker 节点都能应答 TopicMetadataRequest 并提供完整响应;

可以看到 demoTopic3 的 PartitionId=0 分区所在 Leader=1,即 Broker NodeId=1 的节点 jusfr.kafka-1:9093 ,PartitionId=1 分区所在 Leader=2,即 Broker NodeId=2 的节点 jusfr.kafka-2:9094。读写 demoTopic3 的分区0 需要连接到主机 jusfr.kafka-1、端口9093,读写 demoTopic3 的分区1 需要连接到主机 jusfr.kafka-2、端口9094,此过程我称为 Broker route。错误的 Broker 访问、不正确的 server.properties 配置可能触发状态码为 UnknownTopicOrPartition 的响应。

Chuye.Kafka 的 Router 对象从 IRouter 定义,继续自Connection,重写了 Route 方法,内部便是 Partition-Broker 检查逻辑。集群模式下涉及到 Zookeeper 编程,Chuye.Kafka 可能未能给予支持。

Jusfr 原创,转载请注明来自博客园

转载于:https://www.cnblogs.com/Jusfr/p/5257258.html

Kafka API: TopicMetadata相关推荐

  1. 4、Kafka API实战

    环境准备 启动zookeeper集群和kafka集群,在kafka集群中打开一个消费者 [hadoop@hadoop-100 kafka]$ zkservers start [hadoop@hadoo ...

  2. 一文详解Kafka API

    摘要:Kafka的API有Producer API,Consumer API还有自定义Interceptor (自定义拦截器),以及处理的流使用的Streams API和构建连接器的Kafka Con ...

  3. Apache Kafka API AdminClient Scram账户的创建与删除

    前言 由于Apache官方一直没有提供AdminClient中对账户这一块的操作,因此这部分大多数时候都是用命令行去操作的,但是命令行毕竟不是很方便.为了解决这部分问题,笔者去读了Kafka Scal ...

  4. Apache Kafka API AdminClient Scram账户的操作(增删改查)

    前言 很久没有更新Kafka API相关的文档了,因为笔者工作变动Kafka这部分内容在工作中接触的就相对于之前少了一些.但架不住kafka官方还是一如既往的勤奋,官方操作Scram账户的创建与删除这 ...

  5. storm和kafka集成报java.lang.ClassNotFoundException: kafka.api.OffsetRequest解决方法

    添加依赖 <dependency><groupId>org.apache.storm</groupId><artifactId>storm-kafka& ...

  6. Kafka API的运用(Producer API)

    文章目录 四.Kafka API 1.Producer API 1.1 消息发送流程 1.2 异步发送 API 1.3 分区器 1.4 同步发送 API 四.Kafka API 1.Producer ...

  7. Kafka API的运用(Consumer API)

    文章目录 四.Kafka API 2. Consumer API 2.1 自动提交offset 2.2 重置Offset 2.3手动提交 offset 四.Kafka API 2. Consumer ...

  8. 如何使用Kafka API入门Spark流和MapR流

    这篇文章将帮助您开始使用Apache Spark Streaming通过MapR Streams和Kafka API消费和发布消息. Spark Streaming是核心Spark API的扩展,可实 ...

  9. spark和kafka_如何使用Kafka API入门Spark流和MapR流

    spark和kafka 这篇文章将帮助您开始使用Apache Spark Streaming通过MapR Streams和Kafka API消费和发布消息. Spark Streaming是核心Spa ...

最新文章

  1. [ubuntu] 摆脱一直敲打‘Y'('yes')的困境
  2. Xamarin Android项目真机测试闪退
  3. 0320互联网新闻 | 网易《明日之后》全球营收突破1.25亿美元;阿里AI labs宣布投入1亿元进行方言保护...
  4. Spring 事务处理参数
  5. 【Docker】Docker 删除所有容器和镜像
  6. 提取过程_大米多肽提取过程中如何应用膜分离技术呢?
  7. .NET Core开发日志——OData
  8. 实例1.2:获得应用程序主窗口指针
  9. 【英语学习】【Level 07】U01 Making friends L3 Do you eat here a lot?
  10. error:Assertion failed ((unsigned)i0 (unsigned)size.p[0]) in cv::Mat::at
  11. ORA-01034: ORACLE not available
  12. 1-4-05:整数大小比较
  13. python模块规定的格式,按照这样写,最规范
  14. 汇编语言基础之二 - 各种寻址和过程进出简介
  15. 英特尔傲腾 DC P4800X 固态盘
  16. (三)Lucene中Index.ANALYZED分词相关
  17. mac安装虚拟机配置win10系统
  18. 微信公众号H5开发,实现网页授权(静默登录)
  19. 腾讯云点播视频存储(Web端视频上传)
  20. Golang 正则表达式判断手机号或身份证

热门文章

  1. 简单六步上手spring aop,通过各种类型通知,面向切面编程,实现代码解耦(超详细)
  2. JAVA读取Properties文件对象常用方法总结
  3. 来瓶82年拉菲压压惊
  4. GDAL读取S-57海图数据中文属性值乱码问题解决(续)
  5. 使用C#调用非托管DLL函数
  6. 算法详解_常用算法详解——打印杨辉三角形
  7. 双android手机同步工具,android手机同步数据PC(SyncDroid)
  8. python itemgetter_Python operator.itemgetter
  9. 【java学习之路】(java SE篇)012.网络编程
  10. 小区重选优先级_NR小区重选理论研究