Apache Kafka API AdminClient Scram账户的创建与删除
前言
由于Apache官方一直没有提供AdminClient中对账户这一块的操作,因此这部分大多数时候都是用命令行去操作的,但是命令行毕竟不是很方便。为了解决这部分问题,笔者去读了Kafka Scala的源码,从中梳理出来这部分内容供给大家参考。重要:如果你的版本升级到2.7.0及其以上,请参考【Apache Kafka API AdminClient Scram账户的操作(增删改查)】。更多内容请点击【Apache Kafka API AdminClient 目录】。
Scala版本
为了操作Scala源码,必须有相应版本的包,怎么看你的Scala版本呢?这个就是在Kafka核心包的<artifactId>
键值对里面,如下kafka_2.13后面对应的2.13就是Scala的版本,这个2.13版本同样也是Kafka官方推荐使用的版本,因此我们也就以这个版本为例子去操作账户的创建与删除。要提醒的是如果你使用的版本是Scala 2.12大概率会报错。
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.13</artifactId> <!--此处对应的就是Scala版本--><version>2.7.0</version>
</dependency>
获取JAAS认证文件
在开始之前首先得知道什么是Scram账户认证,如果不清楚建议参考【Kafka 如何给集群配置Scram账户认证】,这篇帖子里对Scram认证以及配置有很详细的介绍,这里就不多说废话了,我们已经有了一个kafka-broker-jaas.conf
文件用来登陆Zookeeper。
Client{org.apache.zookeeper.server.auth.DigestLoginModule requiredusername="kafka"password="kafka1234";
};
为什么要登陆Zookeeper呢?因为Kafka把认证机制做到了Zookeeper里,而要操作这些需要一个Zookeeper的登陆认证。在这里Zookeeper相当于一个分布式注册中心,随者Kafka的不断升级,Kafka官方也在不断地减少对Zookeeper的依赖。截止到2.7.0版本,当使用命令行创建账户的时候就会收到提示说未来版本可能会下线--zookeeper
参数,转而使用参数--broker.server
,但是目前还是兼容的。而且Kafka API的2.7.0版本,似乎也引入了操作Scram的API,但是之前的版本还是需要操作Zookeeper,因此我们还是需要这样进行账号的操作。希望2.7.0版本的Scram API会比较好用吧,等笔者弄明白了再分享出来。言归正传,我们可以通过下面几行代码先行把登陆Zookeeper的认证文件kafka-broker-jaas.conf
加载到系统中来。
static {//获取文件路径,这里笔者使用的是项目路径,也可以用绝对路径,目的是访问到文件,什么方法自由选择String path = KafkaCreateUser.class.getClass().getResource("/").getPath();//拿到文件对象File f = new File(path+"kafka-broker-jaas.conf");//存储到系统参数对象中,以备后续使用System.setProperty("java.security.auth.login.config", f.getAbsolutePath());
}
使用Scala方法创建账户
获取了文件对象以后,就可以调用Scala中的方法了,我们主要调用的方法是AdminZkClient.changeConfigs(entityType: String, entityName: String, configs: Properties)
方法,这个语法是Scala中的语法,有点类似Java,我们直接用就可以了,Sample如下。
public void createAccount() throws NoSuchAlgorithmException {//获取ZookeeperClient对象,这里的/kafka是笔者建了一个zk上的目录,如果直接用192.168.33.101:2181ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");//通过ZookeeperClient对象和JDK自带的JaasUtils加载Sasl认证规则KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);//获取Kafka Scala AdminZkClient对象AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);//构造PropertiesProperties properties=new Properties();//构造Scram认证机制ScramMechanism为SCRAM_SHA_512ScramMechanism scramMechanism=ScramMechanism.valueOf("SCRAM_SHA_512");//构造Scram证书credential,这里"password_1234"就是真实的密码ScramCredential credential = new ScramFormatter(scramMechanism).generateCredential("password_1234", scramMechanism.minIterations());//转化为认证串String credentialString=ScramCredentialUtils.credentialToString(credential);//添加到properties中备用properties.put(scramMechanism.mechanismName(),credentialString);//创建名为kaf_aaa的账户,并且把properties传递进去,那么kaf_aaa账户的密码就是上面设置的password_1234字符串adminZkClient.changeConfigs(ConfigType.User(),"kaf_aaa",properties);
}
使用Scala方法查询存在的账户
和创建一样也需要使用Scala中的方法,这次使用的是依然是AdminZkClient
类中的方法,一个是fetchEntityConfig(rootEntityType: String, sanitizedEntityName: String)
去查找指定的账户信息,其次是用fetchAllEntityConfigs(entityType: String)
查找Kafka服务器中所有的账户信息,Sample如下。
public void findAccount() {//获取ZookeeperClient对象ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");//通过ZookeeperClient对象和JDK自带的JaasUtils加载Sasl认证规则KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);//获取Kafka Scala AdminZkClient对象AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);//为了查看结果,构造一个Properties对象用来承接返回值Properties properties=new Properties();//指定账号查询properties=adminZkClient.fetchEntityConfig(ConfigType.User(),"kaf_aaa");//构建一个接收参数Map<String, Properties> propertiesAll=new HashMap<>();//查询所有信息propertiesAll =adminZkClient.fetchAllEntityConfigs(ConfigType.User());
}
使用Scala方法删除账户
说完创建和查找,那就剩删除了。删除对象其实分为两步,第一步清空Kafka集群上保存的信息,第二部删除Zookeeper上对应的节点。清空信息用的还是changeConfigs()
方法,删除节点用的则是Zookeeper包里的delete(final String path, int version)
方法,Sample如下。
public void deleteAccount() throws InterruptedException, KeeperException {//获取ZookeeperClient对象ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");//通过ZookeeperClient对象和JDK自带的JaasUtils加载Sasl认证规则KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);//获取Kafka Scala AdminZkClient对象AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);adminZkClient.changeConfigs(ConfigType.User(),"kaf_aaa",new Properties()); //获取Zookeeper对象ZooKeeper zooKeeper=kafkaZkClient.currentZooKeeper();//调用delete方法把对应的节点删除,版本设置为-1是不检测版本号//注意"/config/users/"这个路径是固定的,Zk里面就是这样存的,"kaf_aaa"是自己拼的账号名字zooKeeper.delete("/config/users/" +"kaf_aaa", -1);
}
总结
到此Kafka Scram账户相关的操作告一段落,如果想要删除账户同时清除账号下的权限,可以参考【Apache Kafka API AdminClient 账号对Topic权限赋予与移除】,自己做一个循环删除即可。
附:完整的Sample和注释
public class KafkaUserOperation {//加载zookeeper sasl机制授权登陆的配置文件static {String path = KafkaUserOperation.class.getClass().getResource("/").getPath();File f = new File(path+"zk-client-jaas.conf");System.setProperty("java.security.auth.login.config", f.getAbsolutePath());}public void createAccount() throws NoSuchAlgorithmException {//获取ZookeeperClient对象ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");//通过ZookeeperClient对象和JDK自带的JaasUtils加载Sasl认证规则KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);//获取Kafka Scala AdminZkClient对象AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);//构造PropertiesProperties properties=new Properties();//构造Scram认证机制ScramMechanism为SCRAM_SHA_512ScramMechanism scramMechanism=ScramMechanism.valueOf("SCRAM_SHA_512");//构造Scram证书credential,这里"password_1234"就是真实的密码ScramCredential credential = new ScramFormatter(scramMechanism).generateCredential("password_1234", scramMechanism.minIterations());//转化为认证串String credentialString=ScramCredentialUtils.credentialToString(credential);//添加到properties中备用properties.put(scramMechanism.mechanismName(),credentialString);//创建名为kaf_aaa的账户,并且把properties传递进去,那么kaf_aaa账户的密码就是上面设置的password_1234字符串adminZkClient.changeConfigs(ConfigType.User(),"kaf_aaa",properties);}public void findAccount() {//获取ZookeeperClient对象ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");//通过ZookeeperClient对象和JDK自带的JaasUtils加载Sasl认证规则KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);//获取Kafka Scala AdminZkClient对象AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);//为了查看是否成功,构造一个Properties对象用来承接返回值Properties properties=new Properties();//指定账号查询properties=adminZkClient.fetchEntityConfig(ConfigType.User(),"kaf_aaa");Map<String, Properties> proAll=new HashMap<>();//查询所有信息proAll=adminZkClient.fetchAllEntityConfigs(ConfigType.User());}public void deleteAccount() throws InterruptedException, KeeperException {//获取ZookeeperClient对象ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");//通过ZookeeperClient对象和JDK自带的JaasUtils加载Sasl认证规则KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);//获取Kafka Scala AdminZkClient对象AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);adminZkClient.changeConfigs(ConfigType.User(),"kaf_aaa",new Properties()); //获取Zookeeper对象ZooKeeper zooKeeper=kafkaZkClient.currentZooKeeper();//调用delete方法把对应的节点删除,版本设置为-1是不检测版本号zooKeeper.delete("/config/users/"+"kaf_aaa", -1);}
}
Apache Kafka API AdminClient Scram账户的创建与删除相关推荐
- Apache Kafka API AdminClient Scram账户的操作(增删改查)
前言 很久没有更新Kafka API相关的文档了,因为笔者工作变动Kafka这部分内容在工作中接触的就相对于之前少了一些.但架不住kafka官方还是一如既往的勤奋,官方操作Scram账户的创建与删除这 ...
- Kafka 如何给集群配置Scram账户认证
前言 很早之前的一篇博客[Kafka+ Centos7服务器集群详细安装教程] 详细的说了下一个真正的集群应该如何搭建Kafka环境,由于当时的需求只是能够使用Kafka服务,因此并没有做别的什么配置 ...
- Apache Kafka:使用java方式操作消费组和重置分区偏移量(admin api)
当前版本:kafka_2.12-2.8.0 1. 声明 当前内容主要为本人学习和测试使用java方式操作消费组和重置分区偏移量,主要参考:Apache Kafka官方文档 主要为: 使用java方式实 ...
- Spring Boot 和Apache Kafka的集成
点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! 1. 引言 Apache Kafka 是一个分布式的.容错 ...
- Apache Kafka 2.7.0 稳定版发布
参考地址:https://www.orchome.com/9993 Apache Kafka 2.7.0 于2020年12月21日正式发布,这个版本是目前 Kafka 最新稳定版本,大家可以根据需要自 ...
- 【转载】Understanding When to use RabbitMQ or Apache Kafka
https://content.pivotal.io/rabbitmq/understanding-when-to-use-rabbitmq-or-apache-kafka RabbitMQ: Erl ...
- Apache Kafka的流式SQL引擎——KSQL
1. KSQL 介绍 KSQL 引擎--一个基于流的 SQL.推出 KSQL 是为了降低流式处理的门槛,为处理 Kafka 数据提供简单而完整的可交互式 SQL 接口.KSQL 目前可以支持多种流式操 ...
- Apache Kafka 3.0 版本发布
Apache Kafka 3.0 发布, 发布日志:Kafka 我很高兴代表 Apache Kafka® 社区宣布 Apache Kafka 3.0 的发布.Apache Kafka 3.0 是一个涉 ...
- Apache Kafka 3.0.0 稳定版发布,有哪些值得关心的变化?
Apache Kafka 3.0 于2021年9月21日正式发布.本文将介绍这个版本的新功能.以下文章翻译自 <What's New in Apache Kafka 3.0.0>. 我很高 ...
最新文章
- java6 disable ssl2.0_SpringBoot2.0如何启用https协议
- 【LuoguP3241】[HNOI2015] 开店
- 其实Go 1.17 就支持泛型了,具体该怎么用呢?
- 史上最大最贵 iPhone 发布,支持双卡双待,附发布会完整视频!
- 大公司笔试面试有哪些经典算法题目?
- 计算机控制技术第二版答案于微波,微波技术基础课后参考答案 (田加胜版)
- PLSQL和ORACLE客户端安装
- Pyside2中嵌入Matplotlib的绘图并保存(指定Graphics View)
- 将火狐浏览器视频播放倍速设置为3倍速及其以上
- linux命令清理磁盘空间,Unix/Linux中常用的清理磁盘空间的命令
- 三相同步电机怎么接线图_实用!41例电动机接线方法-清晰大图(上)
- 伊甸园日历游戏 c语言,HDU2149-Good Luck in CET-4 Everybody!(博弈,打表找规律)
- 集群出现块丢失,块找回,以及相关底层原理,fsck等
- 微信公众号上传文件附件教程
- 后端框架之Flask--初识
- 图片转素描(初级板)
- Nacos系列【25】源码分析篇之Spring Cloud启动器
- python调用Agora_Recording_SDK_for_Linux_FULL
- 半闲居士视觉SLAM十四讲笔记(2)初识 SLAM- part 2 linux CMake、Kdevelop
- 更新Mathtype版本后,在WORD中无法使用的解决方法
热门文章
- 操作系统1(OS,operating system)
- 解决闭包问题时 setTimeout执行顺序提前的问题
- solr mysql 增量索引_solr中实现MySQL数据全量索引和增量索引
- 工具类APP的生存之道
- DevTools 无法加载源映射:无法加载chrome-extension://ncennffkjdiamlpmcbajkmaiiiddgioo/js/xl-content.js.map 的内容
- 多重循环(图形打印2)-C语言
- Python 不完全LU分解
- Parsing error: No Babel config file detected for
- 元旦给计算机老师发贺词,元旦对老师的祝福语
- 数据中心对洪水风险应具备应急措施