Kafka解析之topic创建(3)——合法性验证
欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。
欢迎跳转到本文的原文链接:https://honeypps.com/mq/kafka-analysis-of-topic-creation-3-validity-verification/
前文摘要
在《Kafka解析之Topic创建(1)》这篇文章中,我们讲述了创建Topic的方式有两种:
- 如果kafka broker中的config/server.properties配置文件中配置了auto.create.topics.enable参数为true(默认值就是true),那么当生产者向一个尚未创建的topic发送消息时,会自动创建一个num.partitions(默认值为1)个分区和default.replication.factor(默认值为1)个副本的对应topic。
- 通过kafka提供的kafka-topics.sh脚本来创建,或者相关的变种方式(比如在zookeeper中的/config/topics/路径下直接创建)。
在学习了KafkaAdminClient之后我们发现它也可以用来创建Topic,即通过发送CreateTopicsRequest请求的方式来创建。KafkaAdminClient的详细内容可以参考:《集群管理工具KafkaAdminClient——原理与示例》和《集群管理工具KafkaAdminClient——改造》。
一般情况下,Kafka生产环境中的 auto.create.topics.enable参数会被修改为false,即自动创建Topic这条路会被堵住。kafka-topics.sh脚本创建的方式一般由运维人员操作,普通用户无权过问。那么KafkaAdminClient就为普通用户提供了一个口子,或者将其集成到公司内部的资源申请、审核系统中更加的方便。普通用户在创建Topic的时候,有可能由于误操作或者其他原因而创建了不符合运维规范的Topic,比如命名不规范,副本因子数太低等,这些都会影响后期的系统运维。如果创建Topic的操作是封装在资源申请、审核系统中的话,那么可以在前端就可以根据规则过滤掉不符合规范的申请操作。然而如果用户就是用了KafkaAdminClient或者类似的工具来创建了一个错误的Topic,我们有什么办法可以做相应的规范处理呢?
在Kafka服务端中提供了这样一个参数:create.topic.policy.class.name,其提供了一个入口用来验证Topic创建的合法性。使用方式是自定义实现org.apache.kafka.server.policy.CreateTopicPolicy接口,比如下面的PolicyDemo,然后在kafka broker中的config/server.properties配置文件中配置参数create.topic.policy.class.name=org.apache.kafka.server.policy.PolicyDemo,然后启动Kafka服务即可。PolicyDemo的代码参考如下,主要实现接口中的configure、close以及validate方法,configure方法会在Kafka服务启动的时候执行,validate方法用来鉴定Topic参数的合法性,其在创建Topic的时候执行,close方法在关闭Kafka服务的时候执行。
public class PolicyDemo implements CreateTopicPolicy{public void configure(Map<String, ?> configs) {}public void close() throws Exception {}public void validate(RequestMetadata requestMetadata)throws PolicyViolationException {if(requestMetadata.numPartitions()!=null || requestMetadata.replicationFactor()!=null){if(requestMetadata.numPartitions()< 5){throw new PolicyViolationException("Topic should have at least 5 partitions, received: "+ requestMetadata.numPartitions());}if(requestMetadata.replicationFactor()<= 1){throw new PolicyViolationException("Topic should have at least 2 replication factor, recevied: "+ requestMetadata.replicationFactor());}}}}
采用文章《集群管理工具KafkaAdminClient——原理与示例》中的所提及的关于KafkaAdminClient来创建Topic,测试代码如下,创建一个分区数为4,副本数为1的Topic:
@Test
public void createTopics() {NewTopic newTopic = new NewTopic(NEW_TOPIC,4, (short) 1);Collection<NewTopic> newTopicList = new ArrayList<>();newTopicList.add(newTopic);CreateTopicsResult result = adminClient.createTopics(newTopicList);try {result.all().get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}
}
测试结果如期报错:
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.PolicyViolationException: Topic should have at least 5 partitions, received: 4
相应的Kafka服务端的日志如下:
CreateTopicPolicy.RequestMetadata(topic=topic-test2, numPartitions=4, replicationFactor=1, replicasAssignments=null, configs={})
[2018-04-18 19:52:02,747] INFO [Admin Manager on Broker 0]: Error processing create topic request for topic topic-test2 with arguments (numPartitions=4, replicationFactor=1, replicasAssignments={}, configs={}) (kafka.server.AdminManager)
org.apache.kafka.common.errors.PolicyViolationException: Topic should have at least 5 partitions, received: 4
客户端向Kafka服务端发送了CreateTopicsRequest请求之后,会经过KafkaApis:
case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
然后调用handleCreateTopicsRequest()方法,Topic最终在服务端的创建是在AdminManager中的createTopics方法中实现的。而CreateTopicPolicy的作用域也限定在这个createTopics方法之内,故只有通过CreateTopicsRequest请求的方式才能促使CreateTopicPolicy有效,而对于类似于kafka-topics.sh脚本的创建方式无效。不过在正文开头就提及了在运维规范的情况下,一般是通过KafkaAdminClient进行操作,或者更加规范的话直接通过申请页面来创建,这样就可以在前端规避风险,这样显得更加的专业。
欢迎跳转到本文的原文链接:https://honeypps.com/mq/kafka-analysis-of-topic-creation-3-validity-verification/
欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。
Kafka解析之topic创建(3)——合法性验证相关推荐
- Kafka解析之topic创建(2)
欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...
- kafka常用命令 topic创建 删除 设置保存时间 消费情况 zookeeper 启动停止
启动kafka sh kafka-server-start.sh -daemon ../config/server.properties 停止kafka sh kafka-server-stop.sh ...
- 4.2.10 Kafka源码剖析, 阅读环境搭建, broker启动流程, topic创建流程, Producer生产者流程, Consumer消费者流程,
目录 4.1 Kafka源码剖析之源码阅读环境搭建 4.1.1 安装配置Gradle 4.1.2 Scala的安装和配置 4.1.3 Idea配置 4.1.4 源码操作 4.2 Kafka源码剖析之B ...
- 使用Java API创建(create),查看(describe),列举(list),删除(delete)Kafka主题(Topic)--转载...
原文:http://blog.csdn.net/changong28/article/details/39325079 使用Kafka的同学都知道,我们每次创建Kafka主题(Topic)的时候可以指 ...
- 8.解析Kafka中的 Topic 和 Partition
目录 1.什么是Topic 2.什么是Partition 3.Consumer Group 消费者组 4.Topic 和 Partition 的存储 5.producer消息分发策略 6.消费者如何消 ...
- 解析Kafka中的 Topic 和 Partition
topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 producer 生产的数据 1.什么是Topic ...
- tp6token进行合法性验证(中间件)
第一个composer:下载jwt插件 第二个composer:必须降级jwt才能使用 composer require lcobucci/jwt composer require lcobucci/ ...
- Kafka解析之失效副本
欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...
- kafka监听topic消费_分布式专题|最近一直死磕kafka设计原理,都肝吐了
kafka架构图 kafka核心控制器 定义 在kafka集群中,会选举出一个broker作为控制器(controller),负责管理集群中所有的分区和副本的状态: 职责 监听broker变化,通过监 ...
最新文章
- 用户密码加密存储十问十答,一文说透密码安全存储
- 活见鬼,明明删除了数据,空间却没减少!
- 设计一个带有getmin功能的栈,保证时间复杂度在O(1)
- oracle存储查询出来的值_技术分享|浅析ORACLE数据库物理体系结构及其对应优化策略...
- sockaddr与 sockaddr_in
- JSP 登录案例实现
- jpannel设置位置xy_实用的摄影技巧!10种常见摄影场景的单反相机设置技巧!
- python列表元素下标是什么_python列表中元素插入位置总结
- 计算机制图的平行投影,工程制图与计算机辅助设计课件-20210406204806.ppt-原创力文档...
- linux运行并行计算cuda,并行化计算与CUDA编程
- 安卓端gis_通图采集手机版(GIS数据采集软件)V1.31 安卓版
- 与“十“俱进 阿里数据库运维10年演进之路 1
- vue 获取excel文件表头 .xls
- TaintDroid 剖析之 DVM 变量级污点跟踪(下篇)
- 51Nod 1048 1383 整数分解为2的幂
- STM32 USB应用——VCP
- Flex是什么? flex和flash是什么关系?
- 4篇cvpr2021 轨迹预测论文
- 取消Ubuntu开机硬盘自检
- vsCode安装与使用【图文并茂】
热门文章
- emacs python plugin_Windows上配置Emacs来开发Python及用Python扩展Emacs
- 计算机录入员考试题及答案,计算机录入员理论考题及答案.docx
- 020303阶段三 I/O复用 select和epoll的文件描述符管理
- USB设备被识别流程【转】
- Puppet exec资源介绍(二十六)
- 【网络知识点】防火墙主备冗余技术
- zabbix JMX监控Tomcat及错误解决方法
- 深入浅出Node.js (2) - 模块机制
- 用android LinearLayout和RelativeLayout实现精确布局(转)
- 大型WEB架构概况(笔记整理)一