前言

由于Apache官方一直没有提供AdminClient中对账户这一块的操作,因此这部分大多数时候都是用命令行去操作的,但是命令行毕竟不是很方便。为了解决这部分问题,笔者去读了Kafka Scala的源码,从中梳理出来这部分内容供给大家参考。重要:如果你的版本升级到2.7.0及其以上,请参考【Apache Kafka API AdminClient Scram账户的操作(增删改查)】。更多内容请点击【Apache Kafka API AdminClient 目录】。

Scala版本

为了操作Scala源码,必须有相应版本的包,怎么看你的Scala版本呢?这个就是在Kafka核心包的<artifactId>键值对里面,如下kafka_2.13后面对应的2.13就是Scala的版本,这个2.13版本同样也是Kafka官方推荐使用的版本,因此我们也就以这个版本为例子去操作账户的创建与删除。要提醒的是如果你使用的版本是Scala 2.12大概率会报错。

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.13</artifactId> <!--此处对应的就是Scala版本--><version>2.7.0</version>
</dependency>

获取JAAS认证文件

在开始之前首先得知道什么是Scram账户认证,如果不清楚建议参考【Kafka 如何给集群配置Scram账户认证】,这篇帖子里对Scram认证以及配置有很详细的介绍,这里就不多说废话了,我们已经有了一个kafka-broker-jaas.conf文件用来登陆Zookeeper。

Client{org.apache.zookeeper.server.auth.DigestLoginModule requiredusername="kafka"password="kafka1234";
};

为什么要登陆Zookeeper呢?因为Kafka把认证机制做到了Zookeeper里,而要操作这些需要一个Zookeeper的登陆认证。在这里Zookeeper相当于一个分布式注册中心,随者Kafka的不断升级,Kafka官方也在不断地减少对Zookeeper的依赖。截止到2.7.0版本,当使用命令行创建账户的时候就会收到提示说未来版本可能会下线--zookeeper参数,转而使用参数--broker.server,但是目前还是兼容的。而且Kafka API的2.7.0版本,似乎也引入了操作Scram的API,但是之前的版本还是需要操作Zookeeper,因此我们还是需要这样进行账号的操作。希望2.7.0版本的Scram API会比较好用吧,等笔者弄明白了再分享出来。言归正传,我们可以通过下面几行代码先行把登陆Zookeeper的认证文件kafka-broker-jaas.conf加载到系统中来。

static {//获取文件路径,这里笔者使用的是项目路径,也可以用绝对路径,目的是访问到文件,什么方法自由选择String path = KafkaCreateUser.class.getClass().getResource("/").getPath();//拿到文件对象File f = new File(path+"kafka-broker-jaas.conf");//存储到系统参数对象中,以备后续使用System.setProperty("java.security.auth.login.config", f.getAbsolutePath());
}

使用Scala方法创建账户

获取了文件对象以后,就可以调用Scala中的方法了,我们主要调用的方法是AdminZkClient.changeConfigs(entityType: String, entityName: String, configs: Properties)方法,这个语法是Scala中的语法,有点类似Java,我们直接用就可以了,Sample如下。

public void createAccount() throws NoSuchAlgorithmException {//获取ZookeeperClient对象,这里的/kafka是笔者建了一个zk上的目录,如果直接用192.168.33.101:2181ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");//通过ZookeeperClient对象和JDK自带的JaasUtils加载Sasl认证规则KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);//获取Kafka Scala AdminZkClient对象AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);//构造PropertiesProperties properties=new Properties();//构造Scram认证机制ScramMechanism为SCRAM_SHA_512ScramMechanism scramMechanism=ScramMechanism.valueOf("SCRAM_SHA_512");//构造Scram证书credential,这里"password_1234"就是真实的密码ScramCredential credential = new ScramFormatter(scramMechanism).generateCredential("password_1234", scramMechanism.minIterations());//转化为认证串String credentialString=ScramCredentialUtils.credentialToString(credential);//添加到properties中备用properties.put(scramMechanism.mechanismName(),credentialString);//创建名为kaf_aaa的账户,并且把properties传递进去,那么kaf_aaa账户的密码就是上面设置的password_1234字符串adminZkClient.changeConfigs(ConfigType.User(),"kaf_aaa",properties);
}

使用Scala方法查询存在的账户

和创建一样也需要使用Scala中的方法,这次使用的是依然是AdminZkClient类中的方法,一个是fetchEntityConfig(rootEntityType: String, sanitizedEntityName: String)去查找指定的账户信息,其次是用fetchAllEntityConfigs(entityType: String)查找Kafka服务器中所有的账户信息,Sample如下。

public void findAccount() {//获取ZookeeperClient对象ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");//通过ZookeeperClient对象和JDK自带的JaasUtils加载Sasl认证规则KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);//获取Kafka Scala AdminZkClient对象AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);//为了查看结果,构造一个Properties对象用来承接返回值Properties properties=new Properties();//指定账号查询properties=adminZkClient.fetchEntityConfig(ConfigType.User(),"kaf_aaa");//构建一个接收参数Map<String, Properties> propertiesAll=new HashMap<>();//查询所有信息propertiesAll =adminZkClient.fetchAllEntityConfigs(ConfigType.User());
}

使用Scala方法删除账户

说完创建和查找,那就剩删除了。删除对象其实分为两步,第一步清空Kafka集群上保存的信息,第二部删除Zookeeper上对应的节点。清空信息用的还是changeConfigs()方法,删除节点用的则是Zookeeper包里的delete(final String path, int version)方法,Sample如下。

public void deleteAccount() throws InterruptedException, KeeperException {//获取ZookeeperClient对象ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");//通过ZookeeperClient对象和JDK自带的JaasUtils加载Sasl认证规则KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);//获取Kafka Scala AdminZkClient对象AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);adminZkClient.changeConfigs(ConfigType.User(),"kaf_aaa",new Properties()); //获取Zookeeper对象ZooKeeper zooKeeper=kafkaZkClient.currentZooKeeper();//调用delete方法把对应的节点删除,版本设置为-1是不检测版本号//注意"/config/users/"这个路径是固定的,Zk里面就是这样存的,"kaf_aaa"是自己拼的账号名字zooKeeper.delete("/config/users/" +"kaf_aaa", -1);
}

总结

到此Kafka Scram账户相关的操作告一段落,如果想要删除账户同时清除账号下的权限,可以参考【Apache Kafka API AdminClient 账号对Topic权限赋予与移除】,自己做一个循环删除即可。

附:完整的Sample和注释

public class KafkaUserOperation {//加载zookeeper sasl机制授权登陆的配置文件static {String path = KafkaUserOperation.class.getClass().getResource("/").getPath();File f = new File(path+"zk-client-jaas.conf");System.setProperty("java.security.auth.login.config", f.getAbsolutePath());}public void createAccount() throws NoSuchAlgorithmException {//获取ZookeeperClient对象ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");//通过ZookeeperClient对象和JDK自带的JaasUtils加载Sasl认证规则KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);//获取Kafka Scala AdminZkClient对象AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);//构造PropertiesProperties properties=new Properties();//构造Scram认证机制ScramMechanism为SCRAM_SHA_512ScramMechanism scramMechanism=ScramMechanism.valueOf("SCRAM_SHA_512");//构造Scram证书credential,这里"password_1234"就是真实的密码ScramCredential credential = new ScramFormatter(scramMechanism).generateCredential("password_1234", scramMechanism.minIterations());//转化为认证串String credentialString=ScramCredentialUtils.credentialToString(credential);//添加到properties中备用properties.put(scramMechanism.mechanismName(),credentialString);//创建名为kaf_aaa的账户,并且把properties传递进去,那么kaf_aaa账户的密码就是上面设置的password_1234字符串adminZkClient.changeConfigs(ConfigType.User(),"kaf_aaa",properties);}public void findAccount() {//获取ZookeeperClient对象ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");//通过ZookeeperClient对象和JDK自带的JaasUtils加载Sasl认证规则KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);//获取Kafka Scala AdminZkClient对象AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);//为了查看是否成功,构造一个Properties对象用来承接返回值Properties properties=new Properties();//指定账号查询properties=adminZkClient.fetchEntityConfig(ConfigType.User(),"kaf_aaa");Map<String, Properties> proAll=new HashMap<>();//查询所有信息proAll=adminZkClient.fetchAllEntityConfigs(ConfigType.User());}public void deleteAccount() throws InterruptedException, KeeperException {//获取ZookeeperClient对象ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");//通过ZookeeperClient对象和JDK自带的JaasUtils加载Sasl认证规则KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);//获取Kafka Scala AdminZkClient对象AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);adminZkClient.changeConfigs(ConfigType.User(),"kaf_aaa",new Properties()); //获取Zookeeper对象ZooKeeper zooKeeper=kafkaZkClient.currentZooKeeper();//调用delete方法把对应的节点删除,版本设置为-1是不检测版本号zooKeeper.delete("/config/users/"+"kaf_aaa", -1);}
}

Apache Kafka API AdminClient Scram账户的创建与删除相关推荐

  1. Apache Kafka API AdminClient Scram账户的操作(增删改查)

    前言 很久没有更新Kafka API相关的文档了,因为笔者工作变动Kafka这部分内容在工作中接触的就相对于之前少了一些.但架不住kafka官方还是一如既往的勤奋,官方操作Scram账户的创建与删除这 ...

  2. Kafka 如何给集群配置Scram账户认证

    前言 很早之前的一篇博客[Kafka+ Centos7服务器集群详细安装教程] 详细的说了下一个真正的集群应该如何搭建Kafka环境,由于当时的需求只是能够使用Kafka服务,因此并没有做别的什么配置 ...

  3. Apache Kafka:使用java方式操作消费组和重置分区偏移量(admin api)

    当前版本:kafka_2.12-2.8.0 1. 声明 当前内容主要为本人学习和测试使用java方式操作消费组和重置分区偏移量,主要参考:Apache Kafka官方文档 主要为: 使用java方式实 ...

  4. Spring Boot 和Apache Kafka的集成

    点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! 1. 引言 Apache Kafka 是一个分布式的.容错 ...

  5. Apache Kafka 2.7.0 稳定版发布

    参考地址:https://www.orchome.com/9993 Apache Kafka 2.7.0 于2020年12月21日正式发布,这个版本是目前 Kafka 最新稳定版本,大家可以根据需要自 ...

  6. 【转载】Understanding When to use RabbitMQ or Apache Kafka

    https://content.pivotal.io/rabbitmq/understanding-when-to-use-rabbitmq-or-apache-kafka RabbitMQ: Erl ...

  7. Apache Kafka的流式SQL引擎——KSQL

    1. KSQL 介绍 KSQL 引擎--一个基于流的 SQL.推出 KSQL 是为了降低流式处理的门槛,为处理 Kafka 数据提供简单而完整的可交互式 SQL 接口.KSQL 目前可以支持多种流式操 ...

  8. Apache Kafka 3.0 版本发布

    Apache Kafka 3.0 发布, 发布日志:Kafka 我很高兴代表 Apache Kafka® 社区宣布 Apache Kafka 3.0 的发布.Apache Kafka 3.0 是一个涉 ...

  9. Apache Kafka 3.0.0 稳定版发布,有哪些值得关心的变化?

    Apache Kafka 3.0 于2021年9月21日正式发布.本文将介绍这个版本的新功能.以下文章翻译自 <What's New in Apache Kafka 3.0.0>. 我很高 ...

最新文章

  1. java6 disable ssl2.0_SpringBoot2.0如何启用https协议
  2. 【LuoguP3241】[HNOI2015] 开店
  3. 其实Go 1.17 就支持泛型了,具体该怎么用呢?
  4. 史上最大最贵 iPhone 发布,支持双卡双待,附发布会完整视频!
  5. 大公司笔试面试有哪些经典算法题目?
  6. 计算机控制技术第二版答案于微波,微波技术基础课后参考答案 (田加胜版)
  7. PLSQL和ORACLE客户端安装
  8. Pyside2中嵌入Matplotlib的绘图并保存(指定Graphics View)
  9. 将火狐浏览器视频播放倍速设置为3倍速及其以上
  10. linux命令清理磁盘空间,Unix/Linux中常用的清理磁盘空间的命令
  11. 三相同步电机怎么接线图_实用!41例电动机接线方法-清晰大图(上)
  12. 伊甸园日历游戏 c语言,HDU2149-Good Luck in CET-4 Everybody!(博弈,打表找规律)
  13. 集群出现块丢失,块找回,以及相关底层原理,fsck等
  14. 微信公众号上传文件附件教程
  15. 后端框架之Flask--初识
  16. 图片转素描(初级板)
  17. Nacos系列【25】源码分析篇之Spring Cloud启动器
  18. python调用Agora_Recording_SDK_for_Linux_FULL
  19. 半闲居士视觉SLAM十四讲笔记(2)初识 SLAM- part 2 linux CMake、Kdevelop
  20. 更新Mathtype版本后,在WORD中无法使用的解决方法

热门文章

  1. 操作系统1(OS,operating system)
  2. 解决闭包问题时 setTimeout执行顺序提前的问题
  3. solr mysql 增量索引_solr中实现MySQL数据全量索引和增量索引
  4. 工具类APP的生存之道
  5. DevTools 无法加载源映射:无法加载chrome-extension://ncennffkjdiamlpmcbajkmaiiiddgioo/js/xl-content.js.map 的内容
  6. 多重循环(图形打印2)-C语言
  7. Python 不完全LU分解
  8. Parsing error: No Babel config file detected for
  9. 元旦给计算机老师发贺词,元旦对老师的祝福语
  10. 数据中心对洪水风险应具备应急措施