Introduction

I was given a task: investigate Kafka's authentication and authorization mechanism. After two days of tinkering I finally got it working, stepping into quite a few pitfalls along the way — not helped by a pile of unreliable blog posts.

  • Kafka version 1.0.0
  • Scala 2.11
  • ZooKeeper 3.4.10

According to the official Kafka documentation, Kafka offers three main authentication mechanisms:

  • SSL
  • SASL (Kerberos), with certificates set up via the keytool and openssl scripts
  • SASL/PLAIN

SSL adds latency to data transfers, so it is generally not recommended. The latter two are the common choices, and the third is the most convenient.


Demo

The demo producer & consumer code is on GitHub:
https://github.com/SeanYanxml/bigdata


Setup steps

The setup involves three parts:

  • ZooKeeper-side configuration
  • Kafka (broker)-side configuration
  • Kafka consumer & producer (client)-side configuration

Some blogs run ZooKeeper and Kafka on the same machine, where ZooKeeper reads the zookeeper.properties file shipped with Kafka. Be clear about which configuration file is actually in effect and don't mix them up.

ZooKeeper-side configuration

1 Configure zoo.cfg

Add the following settings:

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

2 Create a file named zk_server_jaas.conf in the conf/ directory.

Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret";
};

I named the file zk_server_jaas.conf and placed it under conf/ in the deployment directory. It specifies the login module org.apache.kafka.common.security.plain.PlainLoginModule. Note that this class lives in the kafka namespace — it comes from Kafka's jars, which is why the classpath step below is essential.
Users are declared with user_<name>="<password>" entries (here a single user, admin, with password admin-secret); you can declare as many as you need, and any user declared this way can be used by producer and consumer programs for authentication. The username and password properties serve a different purpose: username is the account used for authentication between ZooKeeper nodes, and password is its password.
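To make the user_<name> convention concrete, here is an illustrative sketch in plain Java — not Kafka's actual implementation — of how a SASL/PLAIN server resolves credentials from those JAAS options:

```java
import java.util.HashMap;
import java.util.Map;

public class PlainCredentialLookup {
    // Illustrative only: mimics how user_<name>="<password>" JAAS options
    // act as a credential table consulted during SASL/PLAIN authentication.
    static boolean authenticate(Map<String, String> jaasOptions,
                                String user, String password) {
        String expected = jaasOptions.get("user_" + user);
        return expected != null && expected.equals(password);
    }

    public static void main(String[] args) {
        Map<String, String> opts = new HashMap<>();
        opts.put("user_admin", "admin-secret");   // as in zk_server_jaas.conf
        System.out.println(authenticate(opts, "admin", "admin-secret")); // true
        System.out.println(authenticate(opts, "admin", "wrong"));        // false
    }
}
```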

3 Copy the third-party jars into the ZooKeeper directory. Create a sasl_jars directory under conf/ and place the jars there.

The required jars are listed below; you can look up the dependency tree on https://mvnrepository.com/.

kafka-clients-1.0.0.jar
lz4-java-1.4.jar
org.osgi.core-4.3.0.jar
slf4j-api-1.7.25.jar
snappy-java-1.1.4.jar

4 Edit bin/zkEnv.sh so that ZooKeeper loads the jars and the JAAS file at startup:

for i in "$ZOOBINDIR"/../conf/sasl_jars/*.jar; do
  CLASSPATH="$i:$CLASSPATH"
done
SERVER_JVMFLAGS=" -Djava.security.auth.login.config=$ZOOCFGDIR/zk_server_jaas.conf "

Kafka-side configuration

1 Create kafka_server_jaas.conf

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_alice="alice";
};
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="alice"
    password="alice";
};
Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret";
};

2 Configure server.properties

# server.properties
listeners=SASL_PLAINTEXT://127.0.0.1:9092   # replace with your broker's ip:port
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# defaults to false; set to true to allow access to resources with no matching ACL
#allow.everyone.if.no.acl.found=true
super.users=User:admin;User:alice
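The super.users value is a semicolon-separated list of principals, each in User:<name> form. A small illustrative sketch (not the broker's actual parsing code) of how such a value breaks apart:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SuperUsers {
    // Illustrative: split a "super.users=User:admin;User:alice"-style value
    // into individual principals.
    static List<String> parse(String value) {
        return Arrays.stream(value.split(";"))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(parse("User:admin;User:alice")); // [User:admin, User:alice]
    }
}
```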

3 Finally, pass the java.security.auth.login.config property to the Kafka JVM. Add the following to bin/kafka-run-class.sh:

# adjust the path to your own installation
KAFKA_SASL_OPTS='-Djava.security.auth.login.config=/opt/meituan/kafka_2.10-0.10.0.0/config/kafka_server_jaas.conf'
# Launch mode
if [ "x$DAEMON_MODE" = "xtrue" ]; then
  nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
  exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
fi

4 Grant the required permissions to the user you plan to use:

./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.100.62:2181 --add --allow-principal User:alice --operation Read --operation Write --topic sean-security

To list the ACLs already granted:

./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.100.62:2181 --list --topic sean-security

Consumer & Producer client configuration

1 Create kafka_client_jaas.conf

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="alice"
    password="alice";
};

2 Load the configuration file in your program

// Set the JAAS config path as a JVM property; it must point at your kafka_client_jaas.conf
System.setProperty("java.security.auth.login.config",
        "/Users/Sean/Documents/Gitrep/bigdata/kafka/src/main/resources/kafka_client_jaas.conf");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");

Also remember to change the client's port from 9092 to whatever port you put after SASL_PLAINTEXT in the broker's listeners setting.
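As an alternative to the system property, Kafka clients (since 0.10.2) also accept the JAAS section inline through the sasl.jaas.config client property, which avoids shipping a separate file. A minimal sketch — the helper name and bootstrap address are illustrative:

```java
import java.util.Properties;

public class SaslClientProps {
    // Build client properties for SASL/PLAIN, embedding the JAAS config
    // inline instead of relying on java.security.auth.login.config.
    static Properties saslProps(String user, String pass, String bootstrap) {
        Properties p = new Properties();
        p.put("bootstrap.servers", bootstrap);
        p.put("security.protocol", "SASL_PLAINTEXT");
        p.put("sasl.mechanism", "PLAIN");
        p.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + user + "\" password=\"" + pass + "\";");
        return p;
    }

    public static void main(String[] args) {
        Properties p = saslProps("alice", "alice", "192.168.100.62:9093");
        System.out.println(p.getProperty("sasl.mechanism")); // PLAIN
    }
}
```

The returned Properties can then be passed, together with the usual serializer settings, to a KafkaProducer or KafkaConsumer constructor.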


Signs of success

[cpic@62rhel kafka_2.11-1.0.0]$ ./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.100.62:2181 --add --allow-principal User:alice --operation Read --operation Write --topic sean-security
Adding ACLs for resource `Topic:sean-security`:
 	User:alice has Allow permission for operations: Read from hosts: *
 	User:alice has Allow permission for operations: Write from hosts: *
Current ACLs for resource `Topic:sean-security`:
 	User:alice has Allow permission for operations: Read from hosts: *
 	User:alice has Allow permission for operations: Write from hosts: *
[cpic@62rhel kafka_2.11-1.0.0]$ ./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.100.62:2181 --list --topic sean-security
Current ACLs for resource `Topic:sean-security`:
 	User:alice has Allow permission for operations: Read from hosts: *
 	User:alice has Allow permission for operations: Write from hosts: *

Questions

  • Question 1

ERROR Connection to node 0 failed authentication due to: Authen
Answer: The authentication settings are wrong. Pay particular attention to the contents of the three files zk_server_jaas.conf, kafka_server_jaas.conf, and kafka_client_jaas.conf.

  • Question 2

Selector:189 - [Producer clientId=producer-1] Connection with 192.168.100.62/192.168.100.62 disconne
Answer: The password is wrong, or the client is not using the configured port. I originally had 9092 in the client while the SASL_PLAINTEXT listener was on port 9093, and lost more than half an hour debugging it.

  • Question 3

WARN
javax.security.auth.login.LoginException: No JAAS configuration section named ‘Client’ was found in specified JAAS configuration file: ‘/usr/cpic/apps/kafka_2.11-1.0.0/current/packages/kafka_2.11-1.0.0/config/kafka_server_jaas.conf’. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
Answer: The server JAAS file (kafka_server_jaas.conf) is missing the Client section — the warning states this explicitly.

  • Question 4

2018-03-14 16:42:57,206 [myid:] - ERROR [main:ZooKeeperServerMain@64] - Unexpected exception, exiting abnormally
java.io.IOException: Could not configure server because SASL configuration did not allow the ZooKeeper server to authenticate itself properly: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule
at org.apache.zookeeper.server.ServerCnxnFactory.configureSaslLogin(ServerCnxnFactory.java:211)
at org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:82)
at org.apache.zookeeper.server.ZooKeeperServerMain.runFromConfig(ZooKeeperServerMain.java:117)
at org.apache.zookeeper.server.ZooKeeperServerMain.initializeAndRun(ZooKeeperServerMain.java:87)
at org.apache.zookeeper.server.ZooKeeperServerMain.main(ZooKeeperServerMain.java:53)
at org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:116)
at org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:78)
Answer: This error appears in zookeeper.out and means the required jars are missing from ZooKeeper's classpath.

  • Question 5

org.I0Itec.zkclient.exception.ZkAuthFailedException: Authentication failure
Kafka fails to start with this exception: the password does not match the one configured on the ZooKeeper side.

  • Question 6

org.apache.kafka.common.errors.IllegalSaslStateException: Unexpected handshake request with client m
A password mismatch error.




Debug

# Client connection-failure log (consumer side, invalid password)
19:29:52,228  INFO ConsumerConfig:223 - ConsumerConfig values:
	auto.commit.interval.ms = 1000
	auto.offset.reset = latest
	bootstrap.servers = [192.168.100.62:9093, 192.168.100.63:9093, 192.168.100.64:9093]
	check.crcs = true
	client.id =
	connections.max.idle.ms = 540000
	enable.auto.commit = true
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = test
	heartbeat.interval.ms = 3000
	interceptor.classes = null
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 305000
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = PLAIN
	security.protocol = SASL_PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
19:29:52,233 DEBUG KafkaConsumer:177 - [Consumer clientId=consumer-1, groupId=test] Initializing the Kafka consumer
19:29:52,467 DEBUG Metadata:270 - Updated cluster metadata version 1 to Cluster(id = null, nodes = [192.168.100.63:9093 (id: -2 rack: null), 192.168.100.64:9093 (id: -3 rack: null), 192.168.100.62:9093 (id: -1 rack: null)], partitions = [])
19:29:52,586  INFO AbstractLogin:53 - Successfully logged in.
19:29:52,602 DEBUG Metrics:404 - Added sensor with name fetch-throttle-time
19:29:52,615 DEBUG Metrics:404 - Added sensor with name connections-closed:
19:29:52,620 DEBUG Metrics:404 - Added sensor with name connections-created:
19:29:52,621 DEBUG Metrics:404 - Added sensor with name successful-authentication:
19:29:52,621 DEBUG Metrics:404 - Added sensor with name failed-authentication:
19:29:52,622 DEBUG Metrics:404 - Added sensor with name bytes-sent-received:
19:29:52,623 DEBUG Metrics:404 - Added sensor with name bytes-sent:
19:29:52,625 DEBUG Metrics:404 - Added sensor with name bytes-received:
19:29:52,626 DEBUG Metrics:404 - Added sensor with name select-time:
19:29:52,627 DEBUG Metrics:404 - Added sensor with name io-time:
19:29:52,658 DEBUG Metrics:404 - Added sensor with name heartbeat-latency
19:29:52,661 DEBUG Metrics:404 - Added sensor with name join-latency
19:29:52,667 DEBUG Metrics:404 - Added sensor with name sync-latency
19:29:52,671 DEBUG Metrics:404 - Added sensor with name commit-latency
19:29:52,677 DEBUG Metrics:404 - Added sensor with name bytes-fetched
19:29:52,678 DEBUG Metrics:404 - Added sensor with name records-fetched
19:29:52,678 DEBUG Metrics:404 - Added sensor with name fetch-latency
19:29:52,679 DEBUG Metrics:404 - Added sensor with name records-lag
19:29:52,682  INFO AppInfoParser:109 - Kafka version : 1.0.0
19:29:52,682  INFO AppInfoParser:110 - Kafka commitId : aaa7af6d4a11b29d
19:29:52,684 DEBUG KafkaConsumer:177 - [Consumer clientId=consumer-1, groupId=test] Kafka consumer initialized
19:29:52,684 DEBUG KafkaConsumer:183 - [Consumer clientId=consumer-1, groupId=test] Subscribed to topic(s): sean-security
19:29:52,685 DEBUG AbstractCoordinator:183 - [Consumer clientId=consumer-1, groupId=test] Sending GroupCoordinator request to broker 192.168.100.62:9093 (id: -1 rack: null)
19:29:52,799 DEBUG NetworkClient:183 - [Consumer clientId=consumer-1, groupId=test] Initiating connection to node 192.168.100.62:9093 (id: -1 rack: null)
19:29:52,881 DEBUG SaslClientAuthenticator:256 - Set SASL client state to SEND_APIVERSIONS_REQUEST
19:29:52,884 DEBUG SaslClientAuthenticator:150 - Creating SaslClient: client=null;service=kafka;serviceHostname=192.168.100.62;mechs=[PLAIN]
19:29:52,889 DEBUG Metrics:404 - Added sensor with name node--1.bytes-sent
19:29:52,890 DEBUG Metrics:404 - Added sensor with name node--1.bytes-received
19:29:52,890 DEBUG Metrics:404 - Added sensor with name node--1.latency
19:29:52,893 DEBUG Selector:195 - [Consumer clientId=consumer-1, groupId=test] Created socket with SO_RCVBUF = 66608, SO_SNDBUF = 131768, SO_TIMEOUT = 0 to node -1
19:29:52,898 DEBUG SaslClientAuthenticator:256 - Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE
19:29:52,899 DEBUG NetworkClient:183 - [Consumer clientId=consumer-1, groupId=test] Completed connection to node -1. Fetching API versions.
19:29:52,902 DEBUG SaslClientAuthenticator:256 - Set SASL client state to SEND_HANDSHAKE_REQUEST
19:29:52,903 DEBUG SaslClientAuthenticator:256 - Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE
19:29:52,903 DEBUG NetworkClient:183 - [Consumer clientId=consumer-1, groupId=test] Initialize connection to node 192.168.100.63:9093 (id: -2 rack: null) for sending metadata request
19:29:52,904 DEBUG NetworkClient:183 - [Consumer clientId=consumer-1, groupId=test] Initiating connection to node 192.168.100.63:9093 (id: -2 rack: null)
19:29:52,905 DEBUG SaslClientAuthenticator:256 - Set SASL client state to SEND_APIVERSIONS_REQUEST
19:29:52,905 DEBUG SaslClientAuthenticator:150 - Creating SaslClient: client=null;service=kafka;serviceHostname=192.168.100.63;mechs=[PLAIN]
19:29:52,907 DEBUG SaslClientAuthenticator:256 - Set SASL client state to INITIAL
19:29:52,908 DEBUG SaslClientAuthenticator:256 - Set SASL client state to INTERMEDIATE
19:29:52,908 DEBUG Metrics:404 - Added sensor with name node--2.bytes-sent
19:29:52,909 DEBUG Metrics:404 - Added sensor with name node--2.bytes-received
19:29:52,911 DEBUG Metrics:404 - Added sensor with name node--2.latency
19:29:52,911 DEBUG Selector:195 - [Consumer clientId=consumer-1, groupId=test] Created socket with SO_RCVBUF = 66608, SO_SNDBUF = 131768, SO_TIMEOUT = 0 to node -2
19:29:52,912 DEBUG SaslClientAuthenticator:256 - Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE
19:29:52,912 DEBUG NetworkClient:183 - [Consumer clientId=consumer-1, groupId=test] Completed connection to node -2. Fetching API versions.
19:29:52,913 DEBUG SaslClientAuthenticator:256 - Set SASL client state to FAILED
19:29:52,914 DEBUG Selector:189 - [Consumer clientId=consumer-1, groupId=test] Connection with 192.168.100.62/192.168.100.62 disconnected due to authentication exception
org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: Invalid username or password
19:29:52,922 DEBUG NetworkClient:183 - [Consumer clientId=consumer-1, groupId=test] Node -1 disconnected.
19:29:52,924 ERROR NetworkClient:296 - [Consumer clientId=consumer-1, groupId=test] Connection to node -1 failed authentication due to: Authentication failed: Invalid username or password
Exception in thread "main" org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: Invalid username or password
