Kafka SASL配置 Demo测试
引言
接到一个任务,调查一下Kafka的权限机制。捣鼓了2天,终于弄出来了。期间走了不少的坑。还有一堆不靠谱的家伙的博客。
- Kafka版本 1.0.0
- Scala 2.11
- ZooKeeper 3.4.10
根据Kafka的官网文档可知,Kafka的权限认证主要有如下三种:
- SSL
- SASL(Kerberos) keytool&opssl脚本配置证书
- SASL/PLAIN
其中SSL会导致数据传输延迟,所以不做推荐。一般常用的是后两种,第三种最方便。
Demo
Demo Provider & Consumer代码在github上,地址如下:
https://github.com/SeanYanxml/bigdata
操作步骤
操作步骤主要包括3点:
- ZooKeepr端的配置
- Kafka端配置
- Kafka Consumer & Producer 端的配置
有的博客ZooKeeper和Kafka的配置配置在一台机器上,ZooKeeper读取的是Kafka的zookeeper.properties的配置,需要明确,不要弄混了。
ZooKeeper端配置
1 zoo.cfg文件配置
添加如下配置:
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
2 在conf/目录下创建文件名为zk_server_jaas.conf
的文件。
Server {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"password="admin-secret"user_admin="admin-secret";
};
配置文件我命名为zk_server_jaas.conf,并放在部署目录的conf/下。文件中定义了身份认证类(org.apache.kafka.common.security.plain.PlainLoginModule),可以看到这个认证类是kafka命名空间,也就是需要加入kafka的插件,所以下面一步非常重要。
这个文件中定义了两个用户,一个是kafka,一个是producer(user_可以定义多个用户,等于的值就是用户密码),这些用user_配置出来的用户都可以提供给生产者程序和消费者程序认证使用。还有两个属性,username和password,其中username是配置Zookeeper节点之间内部认证的用户名,password是对应的密码。
3 拷贝第三方Jar包到ZooKeeper目录。创建一个在conf/目录下,创建sasl_jars目录,放置这些Jar包。
第三方的Jar包包括,可以从https://mvnrepository.com/ 查看需要的包依赖。
kafka-clients-1.0.0.jar
lz4-java-1.4.jar
org.osgi.core-4.3.0.jar
slf4j-api-1.7.25.jar
snappy-java-1.1.4.jar
4 更改脚本bin/zkEnv.sh
,让ZooKeeper启动时,加载Jar包和jaas.conf文件
for i in "$ZOOBINDIR"/../conf/sasl_jars/*.jar; do CLASSPATH="$i:$CLASSPATH"
done
SERVER_JVMFLAGS=" -Djava.security.auth.login.config=$ZOOCFGDIR/zk_server_jaas.conf "
Kafka端配置
1 创建kafka_server_jaas.conf
文件
KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"password="admin-secret"user_admin="admin-secret"user_alice="alice";
};KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="alice"password="alice";
};Client {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"password="admin-secret";
};
2 配置server.properties
配置文件
# server.properties
listeners=SASL_PLAINTEXT://ip(127.0.0.1):port(9092)
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# default false | true to accept all the users to use it.
#allow.everyone.if.no.acl.found=true
super.users=User:admin;User:alice
3 最后需要为 Kafka 添加 java.security.auth.login.config 环境变量。在 bin/kafka-run-class.sh 中添加以下内容
KAFKA_SASL_OPTS='-Djava.security.auth.login.config=/opt/meituan/kafka_2.10-0.10.0.0/config/kafka_server_jaas.conf'
# Launch mode
if [ "x$DAEMON_MODE" = "xtrue" ]; thennohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
elseexec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
fi
4 为你需要使用的用户授权
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.100.62:2181 --add --allow-principal User:alice --operation Read --operation Write --topic sean-security
查询已经授权的用户
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.100.62:2181 --list --topic sean-security
Consumer & Provider端配置
1 创建kafka_client_jaas.conf
文件
KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="alice"password="alice2";
};
2 让程序加载这个配置文件
System.setProperty("java.security.auth.login.config","/Users/Sean/Documents/Gitrep/bigdata/kafka/src/main/resources/kafka_client_jaas.conf"); // 环境变量添加,需要输入配置文件的路径props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "PLAIN");
并且注意要把端口从9092改成你写的PLAIN后面的端口号。
成功标志
[cpic@62rhel kafka_2.11-1.0.0]$ ./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.100.62:2181 --add --allow-principal User:alice --operation Read --operation Write --topic sean-security
Adding ACLs for resource `Topic:sean-security`:User:alice has Allow permission for operations: Read from hosts: *User:alice has Allow permission for operations: Write from hosts: *Current ACLs for resource `Topic:sean-security`:User:alice has Allow permission for operations: Read from hosts: *User:alice has Allow permission for operations: Write from hosts: *
[cpic@62rhel kafka_2.11-1.0.0]$ ./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.100.62:2181 --list --topic sean-security
Current ACLs for resource `Topic:sean-security`:User:alice has Allow permission for operations: Read from hosts: *User:alice has Allow permission for operations: Write from hosts: *
Questions
- Questions 1
ERROR Connection to node 0 failed authentication due to: Authen
解答: 权限控制没有配置 对,主要注意 zk_jass.conf kafka_jaas.conf kafka_client_jaas.conf 这三个文件的配置
- Questions 2
Selector:189 - [Producer clientId=producer-1] Connection with 192.168.100.62/192.168.100.62 disconne
解答: 权限密码没有控制对,或者端口不是配置的端口。我之前写9092,配置PLAIN后面的端口为"9093",调试了半个多小时。
- Question 3
WARN
javax.security.auth.login.LoginException: No JAAS configuration section named ‘Client’ was found in specified JAAS configuration file: ‘/usr/cpic/apps/kafka_2.11-1.0.0/current/packages/kafka_2.11-1.0.0/config/kafka_server_jaas.conf’. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
解答:kafka_jaas.conf 文件没有client子项,写的很清楚了。
- Question 4
2018-03-14 16:42:57,206 [myid:] - ERROR [main:ZooKeeperServerMain@64] - Unexpected exception, exiting abnormally
java.io.IOException: Could not configure server because SASL configuration did not allow the ZooKeeper server to authenticate itself properly: javax.security.auth.login.LoginException: 无法找到 LoginModule 类: org.apache.kafka.common.security.plain.PlainLoginModule
at org.apache.zookeeper.server.ServerCnxnFactory.configureSaslLogin(ServerCnxnFactory.java:211)
at org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:82)
at org.apache.zookeeper.server.ZooKeeperServerMain.runFromConfig(ZooKeeperServerMain.java:117)
at org.apache.zookeeper.server.ZooKeeperServerMain.initializeAndRun(ZooKeeperServerMain.java:87)
at org.apache.zookeeper.server.ZooKeeperServerMain.main(ZooKeeperServerMain.java:53)
at org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:116)
at org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:78)
解答:zookeeper.out的报错 ZooKeeper的classpath没有需要的Jar包
- Question 5
org.I0Itec.zkclient.exception.ZkAuthFailedException: Authentication failure
Kafka启动失败异常,密码不是ZooKeeper内配置的密码。
- Question 6
org.apache.kafka.common.errors.IllegalSaslStateException: Unexpected handshake request with client m
密码不一致错误
Reference
[1] 集群Kafka配置SASL用户名密码认证
[2] Kafka 0.10.0 SASL/PLAIN身份认证及权限实现
[3] Kafka JAAS Plain SASL 安全认证配置
[4] kafka使用SASL验证(官方文档中文版 0.10.1)
[5] Kafka ACLs in Practice – User Authentication and Authorization
[6] (StackOverFlow :kafka-sasl-zookeeper-authentication)
[7] StackOverFlow : kafka-sasl-plain-setup-with-ssl
[8] Kafka JAAS 安全认证流程
Debug
# provider 连接不上 log
19:29:52,228 INFO ConsumerConfig:223 - ConsumerConfig values: auto.commit.interval.ms = 1000auto.offset.reset = latestbootstrap.servers = [192.168.100.62:9093, 192.168.100.63:9093, 192.168.100.64:9093]check.crcs = trueclient.id = connections.max.idle.ms = 540000enable.auto.commit = trueexclude.internal.topics = truefetch.max.bytes = 52428800fetch.max.wait.ms = 500fetch.min.bytes = 1group.id = testheartbeat.interval.ms = 3000interceptor.classes = nullinternal.leave.group.on.close = trueisolation.level = read_uncommittedkey.deserializer = class org.apache.kafka.common.serialization.StringDeserializermax.partition.fetch.bytes = 1048576max.poll.interval.ms = 300000max.poll.records = 500metadata.max.age.ms = 300000metric.reporters = []metrics.num.samples = 2metrics.recording.level = INFOmetrics.sample.window.ms = 30000partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]receive.buffer.bytes = 65536reconnect.backoff.max.ms = 1000reconnect.backoff.ms = 50request.timeout.ms = 305000retry.backoff.ms = 100sasl.jaas.config = nullsasl.kerberos.kinit.cmd = /usr/bin/kinitsasl.kerberos.min.time.before.relogin = 60000sasl.kerberos.service.name = nullsasl.kerberos.ticket.renew.jitter = 0.05sasl.kerberos.ticket.renew.window.factor = 0.8sasl.mechanism = PLAINsecurity.protocol = SASL_PLAINTEXTsend.buffer.bytes = 131072session.timeout.ms = 10000ssl.cipher.suites = nullssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]ssl.endpoint.identification.algorithm = nullssl.key.password = nullssl.keymanager.algorithm = SunX509ssl.keystore.location = nullssl.keystore.password = nullssl.keystore.type = JKSssl.protocol = TLSssl.provider = nullssl.secure.random.implementation = nullssl.trustmanager.algorithm = PKIXssl.truststore.location = nullssl.truststore.password = nullssl.truststore.type = JKSvalue.deserializer = class org.apache.kafka.common.serialization.StringDeserializer19:29:52,233 DEBUG KafkaConsumer:177 - [Consumer clientId=consumer-1, groupId=test] Initializing the Kafka consumer
19:29:52,467 DEBUG Metadata:270 - Updated cluster metadata version 1 to Cluster(id = null, nodes = [192.168.100.63:9093 (id: -2 rack: null), 192.168.100.64:9093 (id: -3 rack: null), 192.168.100.62:9093 (id: -1 rack: null)], partitions = [])
19:29:52,586 INFO AbstractLogin:53 - Successfully logged in.
19:29:52,602 DEBUG Metrics:404 - Added sensor with name fetch-throttle-time
19:29:52,615 DEBUG Metrics:404 - Added sensor with name connections-closed:
19:29:52,620 DEBUG Metrics:404 - Added sensor with name connections-created:
19:29:52,621 DEBUG Metrics:404 - Added sensor with name successful-authentication:
19:29:52,621 DEBUG Metrics:404 - Added sensor with name failed-authentication:
19:29:52,622 DEBUG Metrics:404 - Added sensor with name bytes-sent-received:
19:29:52,623 DEBUG Metrics:404 - Added sensor with name bytes-sent:
19:29:52,625 DEBUG Metrics:404 - Added sensor with name bytes-received:
19:29:52,626 DEBUG Metrics:404 - Added sensor with name select-time:
19:29:52,627 DEBUG Metrics:404 - Added sensor with name io-time:
19:29:52,658 DEBUG Metrics:404 - Added sensor with name heartbeat-latency
19:29:52,661 DEBUG Metrics:404 - Added sensor with name join-latency
19:29:52,667 DEBUG Metrics:404 - Added sensor with name sync-latency
19:29:52,671 DEBUG Metrics:404 - Added sensor with name commit-latency
19:29:52,677 DEBUG Metrics:404 - Added sensor with name bytes-fetched
19:29:52,678 DEBUG Metrics:404 - Added sensor with name records-fetched
19:29:52,678 DEBUG Metrics:404 - Added sensor with name fetch-latency
19:29:52,679 DEBUG Metrics:404 - Added sensor with name records-lag
19:29:52,682 INFO AppInfoParser:109 - Kafka version : 1.0.0
19:29:52,682 INFO AppInfoParser:110 - Kafka commitId : aaa7af6d4a11b29d
19:29:52,684 DEBUG KafkaConsumer:177 - [Consumer clientId=consumer-1, groupId=test] Kafka consumer initialized
19:29:52,684 DEBUG KafkaConsumer:183 - [Consumer clientId=consumer-1, groupId=test] Subscribed to topic(s): sean-security
19:29:52,685 DEBUG AbstractCoordinator:183 - [Consumer clientId=consumer-1, groupId=test] Sending GroupCoordinator request to broker 192.168.100.62:9093 (id: -1 rack: null)
19:29:52,799 DEBUG NetworkClient:183 - [Consumer clientId=consumer-1, groupId=test] Initiating connection to node 192.168.100.62:9093 (id: -1 rack: null)
19:29:52,881 DEBUG SaslClientAuthenticator:256 - Set SASL client state to SEND_APIVERSIONS_REQUEST
19:29:52,884 DEBUG SaslClientAuthenticator:150 - Creating SaslClient: client=null;service=kafka;serviceHostname=192.168.100.62;mechs=[PLAIN]
19:29:52,889 DEBUG Metrics:404 - Added sensor with name node--1.bytes-sent
19:29:52,890 DEBUG Metrics:404 - Added sensor with name node--1.bytes-received
19:29:52,890 DEBUG Metrics:404 - Added sensor with name node--1.latency
19:29:52,893 DEBUG Selector:195 - [Consumer clientId=consumer-1, groupId=test] Created socket with SO_RCVBUF = 66608, SO_SNDBUF = 131768, SO_TIMEOUT = 0 to node -1
19:29:52,898 DEBUG SaslClientAuthenticator:256 - Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE
19:29:52,899 DEBUG NetworkClient:183 - [Consumer clientId=consumer-1, groupId=test] Completed connection to node -1. Fetching API versions.
19:29:52,902 DEBUG SaslClientAuthenticator:256 - Set SASL client state to SEND_HANDSHAKE_REQUEST
19:29:52,903 DEBUG SaslClientAuthenticator:256 - Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE
19:29:52,903 DEBUG NetworkClient:183 - [Consumer clientId=consumer-1, groupId=test] Initialize connection to node 192.168.100.63:9093 (id: -2 rack: null) for sending metadata request
19:29:52,904 DEBUG NetworkClient:183 - [Consumer clientId=consumer-1, groupId=test] Initiating connection to node 192.168.100.63:9093 (id: -2 rack: null)
19:29:52,905 DEBUG SaslClientAuthenticator:256 - Set SASL client state to SEND_APIVERSIONS_REQUEST
19:29:52,905 DEBUG SaslClientAuthenticator:150 - Creating SaslClient: client=null;service=kafka;serviceHostname=192.168.100.63;mechs=[PLAIN]
19:29:52,907 DEBUG SaslClientAuthenticator:256 - Set SASL client state to INITIAL
19:29:52,908 DEBUG SaslClientAuthenticator:256 - Set SASL client state to INTERMEDIATE
19:29:52,908 DEBUG Metrics:404 - Added sensor with name node--2.bytes-sent
19:29:52,909 DEBUG Metrics:404 - Added sensor with name node--2.bytes-received
19:29:52,911 DEBUG Metrics:404 - Added sensor with name node--2.latency
19:29:52,911 DEBUG Selector:195 - [Consumer clientId=consumer-1, groupId=test] Created socket with SO_RCVBUF = 66608, SO_SNDBUF = 131768, SO_TIMEOUT = 0 to node -2
19:29:52,912 DEBUG SaslClientAuthenticator:256 - Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE
19:29:52,912 DEBUG NetworkClient:183 - [Consumer clientId=consumer-1, groupId=test] Completed connection to node -2. Fetching API versions.
19:29:52,913 DEBUG SaslClientAuthenticator:256 - Set SASL client state to FAILED
19:29:52,914 DEBUG Selector:189 - [Consumer clientId=consumer-1, groupId=test] Connection with 192.168.100.62/192.168.100.62 disconnected due to authentication exception
org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: Invalid username or password
19:29:52,922 DEBUG NetworkClient:183 - [Consumer clientId=consumer-1, groupId=test] Node -1 disconnected.
19:29:52,924 ERROR NetworkClient:296 - [Consumer clientId=consumer-1, groupId=test] Connection to node -1 failed authentication due to: Authentication failed: Invalid username or password
Exception in thread "main" org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: Invalid username or password
Kafka SASL配置 Demo测试相关推荐
- Kafka SASL/PLAIN 环境构建(Docker版)
前言 近来,老有朋友询问Kafka SASL配置 & Demo测试 这篇文章的相关内容.近来无事,准备将所以的组件打成镜像.此处,便讲的为 Kafka SASL/PLAIN权限验证镜像模块的构 ...
- Kafka安装配置(SASL/SCRAM动态认证)
SASL/SCRAM验证方法可以在Kafka服务启动之后,动态的新增用户分并配权限,在业务变动频繁,开发人员多的情况下比SASL/PLAIN方法更加灵活. Zookeeper:3.4.13,kafka ...
- WIN10python3.7配置MaskRCNN环境及demo测试(tensorflow-gpu1.x)
WIN10python3.7配置MaskRCNN环境及demo测试(tensorflow-gpu1.x) 一.工具 二. 环境搭建 三.下载mask_R-CNN 四.运行demo进行测试 #本文参考了 ...
- SpringBoot笔记:SpringBoot2.3集成Kafka组件配置
文章目录 说明 Springboot集成Kafka 依赖配置 配置文件yml配置 Producer生产者 Consumer消费者 测试代码 注意事项 说明 本文是接<Kafka学习:CentOS ...
- 高效实用Kafka-深入理解Kafka启动配置(使用kafka自身内置Zookeeper)
导语 在上一篇博客中简单的介绍了关于消息系统消息中间件技术以及Kafka的一些基础.这篇文章主要是来介绍关于Kafka的架构以及如何安装Kafka. 首先先来安装Kafka,对于Kafka的安装 ...
- graylog+kafka+zookeeper(单机测试及源码),graylog设置URL报警方式(五)
graylog+kafka+zookeeper(单机测试及源码),graylog设置URL报警方式(五) 问题背景 graylog+kafka+zookeeper(单机测试及源码),graylog组件 ...
- Kafka SASL/PLAIN加密 及Kafka-Python整合
SASL/PLAIN 前言 SASL/PLAIN是kafka中一种使用用户名/密码的身份验证机制,本文使用Kafka-Python2.02 及kafka3.2.0进行简单的整合操作. 一.配置Kafk ...
- Kafka SASL/SCRAM+ACL实现动态创建用户及权限控制
文章目录 SASL_SCRAM+ACL实现动态创建用户及权限控制 使用SASL / SCRAM进行身份验证 1. 创建SCRAM Credentials 创建broker建通信用户(或称超级用户) 创 ...
- YOLOv4 资源环境配置和测试样例效果
YOLOv4 资源环境配置和测试样例效果 基本环境:cuda=10.0,cudnn>=7.0, opencv>=2.4 一.下载yolov4 git clone https://githu ...
最新文章
- Kanzi常用操作2
- python下载安装教程图解-一招解决:各种版本的Python下载安装教程
- 新建linux 服务器初始化配置
- C++实现trie tree字典树(附完整源码)
- matlab multithreading spyder,spyder和python的关系是什么
- Typora图片上传和加载问题解决方案
- Whoops, looks like something went wrong.
- Unwind 栈回溯详解:libunwind
- android源码分析网上随笔记录
- AX2012 R3 Data upgrade checklist sync database step, failed to create a session;
- Java中的final、static、this、super 关键字
- 【汉诺塔问题】递归算法求解汉诺塔问题
- 在Linux上安装centos 7镜像详细步骤
- Method annotated with @Bean is called directly. Use dependency injection instead.
- 【转载】开源项目推荐:Qt有关的GitHub/Gitee开源项目(★精品收藏★)
- 玖富曲线入股湖北消金,已成第二大股东
- 复旦大学计算机专业好还是浙大好,复旦大学和浙江大学,哪个更强一点?很多人都猜错了...
- 【论文笔记】Catching Both Gray and Black Swans: Open-set Supervised Anomaly Detection*
- 观小林coding图解网络总结
- LinkedList入门教程