一、概述

  1. GSSAPI: 使用的Kerberos认证,可以集成目录服务,比如AD。从Kafka0.9版本开始支持
  2. PLAIN: 使用简单用户名和密码形式。从Kafka0.10版本开始支持
  3. SCRAM: 主要解决PLAIN动态更新问题以及安全机制,从Kafka0.10.2开始支持
  4. OAUTHBEARER: 基于OAuth 2认证框架,从Kafka2.0版本开始支持

二、配置SASL/PLAIN

  1. kafka配置文件server.properties配置
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
auto.create.topics.enable=true
listeners=SASL_PLAINTEXT://<IP Address>:9092
advertised.listeners=SASL_PLAINTEXT://<IP Address>:9092

2. 创建kafka_server_jaas.conf文件

这个文件是配置Broker服务端的JASS,在Kafka程序目录的config目录中创建kafka_servre_jass.conf

KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="admin"
password="admin-secret";
};

username和password配置成具体的账号和密码

KafkaServer字段是用来配置broker间通信使用的用户名和密码以及客户端连接时需要的用户名和密码,其中username和password是broker用于初始化连接到其他的broker,kafka用户为broker间的通讯。在一个Kafka集群中,这个文件的内容要一样,集群中每个Borker去连接其他Broker的时候都使用这个文件中定义的username和password来让对方进行认证。

Client部分是用来设置与Zookeeper的连接的,它还允许broker设置 SASL ACL 到zookeeper 节点,锁定这些节点,只有broker可以修改它。如果Zookeeper与Broker之间不设置认证,那么就可以不配置Client部分。

3. 启动时指定安全配置

export KAFKA_OPTS="-Djava.security.auth.login.config=/KAFKA_HOME/config/kafka_server_jaas.conf"

或将以上指令写入到启动脚本中bin/kafka-server-start.sh

4.  启动kafka server

bin/kafka-server-start.sh -daemon config/server.properties

5. kafka client认证

(1) 新建kafka_client_jaas.conf文件

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret";
};
Client {org.apache.zookeeper.server.auth.DigestLoginModule requiredusername="admin"password="admin-secret";
};

username和password是真实配置的用户名密码

(2) 使用kafka自带脚本消费消息

$ export KAFKA_OPTS="-Djava.security.auth.login.config=/KAFKA_HOME/config/kafka_client_jaas.conf"$ ./bin/kafka-console-consumer.sh --topic test-topic --from-beginning  --consumer.config=config/consumer.properties --bootstrap-server=localhost:9092

(3)  使用kafka自带脚本生产消息

$ export KAFKA_OPTS="-
Djava.security.auth.login.config=/KAFKA_HOME/config/kafka_client_jaas.conf"
$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
--producer.config=config/producer.properties

6. Java程序使用

引入依赖

<!-- kafka-clients -->
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version>
</dependency>

kafka-clients 3.0版本

配置

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret";
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//配置security.protocol
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, config.getSecurity().getProtocol());
//配置sasl.mechanism
properties.put(SaslConfigs.SASL_MECHANISM, config.getSasl().getMechanism());
//配置sasl.jaas.config
properties.put(SaslConfigs.SASL_JAAS_CONFIG, config.getSasl().getJaasConfig());KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

感谢:

https://www.cnblogs.com/rexcheny/articles/12884990.html
https://docs.vmware.com/en/VMware-Smart-Assurance/10.1.0/sa-ui-installation-config-guide-10.1.0/GUID-3E473EC3-732A-4963-81BD-13BCCD3AC700.html

Kafka安全认证(Java)相关推荐

  1. Kafka动态认证SASL/SCRAM配置+整合springboot配置

    记录: zookeeper启动命令: [root@master-yzjgxh2571705819-1651919082731-99-0727183 bin]# ./zkServer.sh start ...

  2. kafka权限认证ssl

    2016-11-14 18:16:01 shengjk1 阅读数 18141更多 分类专栏: 工作之行 版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和 ...

  3. 大数据Hadoop之——Kafka安全机制(Kafka SSL认证实现)

    文章目录 一.概述 1)SASL认证概述 2)Delegation Token认证概述 3)SSL认证概述(本章实现) 二.各种安全认证机制对比和使用场景 三.Kafka SSL认证实现 1)创建ss ...

  4. kafka 0.9 java开发_kafka 0.9 java producer and consumer demo

    实验环境: kafka_2.11-0.9.0.1.tgz zookeeper-3.4.6.tar.gz 样例代码: git clone https://github.com/downgoon/hell ...

  5. python子进程进行kinit认证_使用kafka-python客户端进行kafka kerberos认证

    之前说过python confluent kafka客户端做kerberos认证的过程,如果使用kafka python客户端的话同样也可以进行kerberos的认证,具体的认证机制这里不再描述,主要 ...

  6. scjp java程序员_Sun认证Java程序员SCJP考题常见陷阱

    (1) Two public classes in the same file. (illegal) 同一个文件里有两个public类.(非法) (2) Main method calling a n ...

  7. java的行业认证_Sun认证Java程序员考试介绍

    Sun认证Java程序员考试介绍 Sun Microsystems在行业中被认为是同行中最具创造性的企业之一,它想尝试新的软件方式和定价模式等等.以下是关于Sun认证Java程序员考试介绍,欢迎大家参 ...

  8. storm和kafka集成报java.lang.ClassNotFoundException: kafka.api.OffsetRequest解决方法

    添加依赖 <dependency><groupId>org.apache.storm</groupId><artifactId>storm-kafka& ...

  9. 【ssl认证、证书】SSL双向认证java实战、keytool创建证书

    文章目录 概述 keytool示例 参考 相关文章: //-----------Java SSL begin---------------------- [ssl认证.证书]SSL双向认证和SSL单向 ...

  10. 参加Sun认证Java程序员_sun认证java程序员笔试经验

    sun认证java程序员笔试经验 SCJP测验Java程序设计概念及能力,内容偏重于Java语法及JDK的内容;SCJD则进一步测试用Java开发应用程序的能力,考试者必须先完成一个程序的设计方案,再 ...

最新文章

  1. 把字符串转换为日期时间
  2. 写一个实时监控IP连通性的小脚本
  3. php修改session生存时间,修改session的过期(生存)时间
  4. sqlite3修改表内容python_python之sqlite3使用详解(转载)
  5. 心得复述知识体系:《强化学习》中的蒙特卡洛方法 Monte Carlo Methods in Reinforcement Learning
  6. VC绘制控件如何防止闪烁
  7. testNg官方文档
  8. Linux串口驱动分析及移植
  9. c语言宠物店管理系统,宠物店信息管理系统的设计课程设计报告精选.doc
  10. 【csdn上使用MathType编写公式建议方法】【亲测有效!!!】
  11. The Productive Programmer 读书笔记
  12. html网页实现查询功能实现,CSS与媒体查询实现网页导航功能(附代码)
  13. 撸一个聊天室(vue+koa2+websokect+mongodb)
  14. scanf ---------未完待续
  15. 学习金字塔——谈谈对学习的一点理解
  16. app按钮没反应android,求助,uni-app按钮点击没反应
  17. zookeepr 简介
  18. 您为什么要加入CSDN个人空间
  19. 计算机学硕调剂分数,计算机考研调剂的一些经验
  20. HDLBits—Lemmings3

热门文章

  1. java application作用_1. 下列关于 Java Application 程序在结构上特点的描述中,错误的是 ( )_汽车修理工题库答案_学小易找答案...
  2. 一天一个小算法——最小生成树(kru Prm)
  3. java 鸡尾酒排序_java交换排序之鸡尾酒排序实现方法
  4. 格子玻尔兹曼机(Lattice Boltzmann Method)系列5:LBM多相流实例之Shan-Chen模型
  5. 1236mysql_MySQL1236错误解决方法_MySQL
  6. 解决Ubuntu系统上网速度慢的问题
  7. kafka不消费:9092 (id: 0 rack: null)
  8. java的inputbox,解释InputBox函数(输入函数)的用途
  9. oracle的显示和隐式,oracle的显式受权和隐式授权(转)
  10. 缓解焦虑,这9种食物必不可少!