欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。

欢迎跳转到本文的原文链接:https://honeypps.com/mq/kafka-analysis-of-topic-creation-3-validity-verification/


前文摘要

在《Kafka解析之Topic创建(1)》这篇文章中,我们讲述了创建Topic的方式有两种:

  1. 如果kafka broker中的config/server.properties配置文件中配置了auto.create.topics.enable参数为true(默认值就是true),那么当生产者向一个尚未创建的topic发送消息时,会自动创建一个num.partitions(默认值为1)个分区和default.replication.factor(默认值为1)个副本的对应topic。
  2. 通过kafka提供的kafka-topics.sh脚本来创建,或者相关的变种方式(比如在zookeeper中的/config/topics/路径下直接创建)。

在学习了KafkaAdminClient之后我们发现它也可以用来创建Topic,即通过发送CreateTopicsRequest请求的方式来创建。KafkaAdminClient的详细内容可以参考:《集群管理工具KafkaAdminClient——原理与示例》和《集群管理工具KafkaAdminClient——改造》。


一般情况下,Kafka生产环境中的 auto.create.topics.enable参数会被修改为false,即自动创建Topic这条路会被堵住。kafka-topics.sh脚本创建的方式一般由运维人员操作,普通用户无权过问。那么KafkaAdminClient就为普通用户提供了一个口子,或者将其集成到公司内部的资源申请、审核系统中更加的方便。普通用户在创建Topic的时候,有可能由于误操作或者其他原因而创建了不符合运维规范的Topic,比如命名不规范,副本因子数太低等,这些都会影响后期的系统运维。如果创建Topic的操作是封装在资源申请、审核系统中的话,那么可以在前端就可以根据规则过滤掉不符合规范的申请操作。然而如果用户就是用了KafkaAdminClient或者类似的工具来创建了一个错误的Topic,我们有什么办法可以做相应的规范处理呢?

在Kafka服务端中提供了这样一个参数:create.topic.policy.class.name,其提供了一个入口用来验证Topic创建的合法性。使用方式是自定义实现org.apache.kafka.server.policy.CreateTopicPolicy接口,比如下面的PolicyDemo,然后在kafka broker中的config/server.properties配置文件中配置参数create.topic.policy.class.name=org.apache.kafka.server.policy.PolicyDemo,然后启动Kafka服务即可。PolicyDemo的代码参考如下,主要实现接口中的configure、close以及validate方法,configure方法会在Kafka服务启动的时候执行,validate方法用来鉴定Topic参数的合法性,其在创建Topic的时候执行,close方法在关闭Kafka服务的时候执行。

public class PolicyDemo implements CreateTopicPolicy{public void configure(Map<String, ?> configs) {}public void close() throws Exception {}public void validate(RequestMetadata requestMetadata)throws PolicyViolationException {if(requestMetadata.numPartitions()!=null || requestMetadata.replicationFactor()!=null){if(requestMetadata.numPartitions()< 5){throw new PolicyViolationException("Topic should have at least 5 partitions, received: "+ requestMetadata.numPartitions());}if(requestMetadata.replicationFactor()<= 1){throw new PolicyViolationException("Topic should have at least 2 replication factor, recevied: "+ requestMetadata.replicationFactor());}}}}

采用文章《集群管理工具KafkaAdminClient——原理与示例》中的所提及的关于KafkaAdminClient来创建Topic,测试代码如下,创建一个分区数为4,副本数为1的Topic:

@Test
public void createTopics() {NewTopic newTopic = new NewTopic(NEW_TOPIC,4, (short) 1);Collection<NewTopic> newTopicList = new ArrayList<>();newTopicList.add(newTopic);CreateTopicsResult result = adminClient.createTopics(newTopicList);try {result.all().get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}
}

测试结果如期报错:

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.PolicyViolationException: Topic should have at least 5 partitions, received: 4

相应的Kafka服务端的日志如下:

CreateTopicPolicy.RequestMetadata(topic=topic-test2, numPartitions=4, replicationFactor=1, replicasAssignments=null, configs={})
[2018-04-18 19:52:02,747] INFO [Admin Manager on Broker 0]: Error processing create topic request for topic topic-test2 with arguments (numPartitions=4, replicationFactor=1, replicasAssignments={}, configs={}) (kafka.server.AdminManager)
org.apache.kafka.common.errors.PolicyViolationException: Topic should have at least 5 partitions, received: 4

客户端向Kafka服务端发送了CreateTopicsRequest请求之后,会经过KafkaApis:

case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)

然后调用handleCreateTopicsRequest()方法,Topic最终在服务端的创建是在AdminManager中的createTopics方法中实现的。而CreateTopicPolicy的作用域也限定在这个createTopics方法之内,故只有通过CreateTopicsRequest请求的方式才能促使CreateTopicPolicy有效,而对于类似于kafka-topics.sh脚本的创建方式无效。不过在正文开头就提及了在运维规范的情况下,一般是通过KafkaAdminClient进行操作,或者更加规范的话直接通过申请页面来创建,这样就可以在前端规避风险,这样显得更加的专业。

欢迎跳转到本文的原文链接:https://honeypps.com/mq/kafka-analysis-of-topic-creation-3-validity-verification/


欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。


Kafka解析之topic创建(3)——合法性验证相关推荐

  1. Kafka解析之topic创建(2)

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

  2. kafka常用命令 topic创建 删除 设置保存时间 消费情况 zookeeper 启动停止

    启动kafka sh kafka-server-start.sh -daemon ../config/server.properties 停止kafka sh kafka-server-stop.sh ...

  3. 4.2.10 Kafka源码剖析, 阅读环境搭建, broker启动流程, topic创建流程, Producer生产者流程, Consumer消费者流程,

    目录 4.1 Kafka源码剖析之源码阅读环境搭建 4.1.1 安装配置Gradle 4.1.2 Scala的安装和配置 4.1.3 Idea配置 4.1.4 源码操作 4.2 Kafka源码剖析之B ...

  4. 使用Java API创建(create),查看(describe),列举(list),删除(delete)Kafka主题(Topic)--转载...

    原文:http://blog.csdn.net/changong28/article/details/39325079 使用Kafka的同学都知道,我们每次创建Kafka主题(Topic)的时候可以指 ...

  5. 8.解析Kafka中的 Topic 和 Partition

    目录 1.什么是Topic 2.什么是Partition 3.Consumer Group 消费者组 4.Topic 和 Partition 的存储 5.producer消息分发策略 6.消费者如何消 ...

  6. 解析Kafka中的 Topic 和 Partition

    topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 producer 生产的数据 1.什么是Topic ...

  7. tp6token进行合法性验证(中间件)

    第一个composer:下载jwt插件 第二个composer:必须降级jwt才能使用 composer require lcobucci/jwt composer require lcobucci/ ...

  8. Kafka解析之失效副本

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

  9. kafka监听topic消费_分布式专题|最近一直死磕kafka设计原理,都肝吐了

    kafka架构图 kafka核心控制器 定义 在kafka集群中,会选举出一个broker作为控制器(controller),负责管理集群中所有的分区和副本的状态: 职责 监听broker变化,通过监 ...

最新文章

  1. 用户密码加密存储十问十答,一文说透密码安全存储
  2. 活见鬼,明明删除了数据,空间却没减少!
  3. 设计一个带有getmin功能的栈,保证时间复杂度在O(1)
  4. oracle存储查询出来的值_技术分享|浅析ORACLE数据库物理体系结构及其对应优化策略...
  5. sockaddr与 sockaddr_in
  6. JSP 登录案例实现
  7. jpannel设置位置xy_实用的摄影技巧!10种常见摄影场景的单反相机设置技巧!
  8. python列表元素下标是什么_python列表中元素插入位置总结
  9. 计算机制图的平行投影,工程制图与计算机辅助设计课件-20210406204806.ppt-原创力文档...
  10. linux运行并行计算cuda,并行化计算与CUDA编程
  11. 安卓端gis_通图采集手机版(GIS数据采集软件)V1.31 安卓版
  12. 与“十“俱进 阿里数据库运维10年演进之路 1
  13. vue 获取excel文件表头 .xls
  14. TaintDroid 剖析之 DVM 变量级污点跟踪(下篇)
  15. 51Nod 1048 1383 整数分解为2的幂
  16. STM32 USB应用——VCP
  17. Flex是什么? flex和flash是什么关系?
  18. 4篇cvpr2021 轨迹预测论文
  19. 取消Ubuntu开机硬盘自检
  20. vsCode安装与使用【图文并茂】

热门文章

  1. emacs python plugin_Windows上配置Emacs来开发Python及用Python扩展Emacs
  2. 计算机录入员考试题及答案,计算机录入员理论考题及答案.docx
  3. 020303阶段三 I/O复用 select和epoll的文件描述符管理
  4. USB设备被识别流程【转】
  5. Puppet exec资源介绍(二十六)
  6. 【网络知识点】防火墙主备冗余技术
  7. zabbix JMX监控Tomcat及错误解决方法
  8. 深入浅出Node.js (2) - 模块机制
  9. 用android LinearLayout和RelativeLayout实现精确布局(转)
  10. 大型WEB架构概况(笔记整理)一