kafka sasl java_Kafka安装及开启SASL_PLAINTEXT认证(用户名和密码认证)
前些日子要封装一个kafka的客户端驱动,配置了下kafka环境,发现配置复杂度完爆rabbitmq很多倍啊,而且发布订阅模式使用起来也很麻烦,可能就胜在分布式了吧。
kafka需要java环境,自行安装java sdk 1.8+.
官方加载安装包,当前为kafka_2.11-1.1.0.tgz
(新版kafka包内集成了zookeeper,直接启动即可)
解压缩后kafka目录下文件:
配置:
config路径下配置文件:
consumer.properites 消费者配置,这个配置文件用于配置开启的消费者,默认即可
producer.properties 生产者配置,这个配置文件用于配置开启的生产者,默认即可
server.properties kafka服务器的配置,此配置文件用来配置kafka服务器,核心配置如下:
(1)broker.id 申明当前kafka服务器在集群中的唯一ID,需配置为integer,并且集群中的每一个kafka服务器的id都应是唯一的,默认即可
(2)listeners 申明此kafka服务器需要监听的端口号,如果是在本机上跑虚拟机运行可以不用配置本项,默认会使用localhost的地址,如果是在远程服务器上运行则必须配置,
例如:listeners=PLAINTEXT:// 192.168.1.1:9092。并确保服务器的9092端口能够访问
(3)zookeeper.connect 申明kafka所连接的zookeeper的地址 ,需配置为zookeeper的地址,由于本次使用的是kafka高版本中自带zookeeper,默认即可,
例如:zookeeper.connect=localhost:2181
启动zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties & (&代表后台运行)
启动kafka:
bin/kafka-server-start.sh config/server.properties &
创建一个名为test的topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看已经创建的topic:
bin/kafka-topics.sh --list --zookeeper localhost:2181
更改tioic分区:
bin/kafka-topics.sh --zookeeper localhost:2181 --topic test --alter --partitions 4
查看指定topic信息:
bin/kafka-topics.sh --zookeeper localhost:2181 --topic test --describe
创建一个消息消费者:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
(如果弹出WARN [Producer clientId=console-producer] Connection to node -1 could not be established. Broker may not be available,需将上文中localhost改为队列配置的地址)
创建一个消息生产者:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
出现 > 符号后输入消息发送,消费者连接可以看到输出。
(如果弹出WARN [Consumer clientId=consumer-1, groupId=console-consumer-59948] Connection to node -1 could not be established. Broker may not be available,
需将上文中localhost改为队列配置的地址)
Kafka安装完成。
Kafka开启使用 SASL_PLAINTEXT认证:
输入下面命令,关闭kafka:
bin/kafka-server-stop.sh
输入下面命令,关闭zookeeper:
bin/zookeeper-server-stop.sh
进入config目录,增加如下配置文件:
cd config
(1)touch kafka_server_jaas.conf
配置如下:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin"
user_alice="alice";
};
在KafkaServer部分,username和password是broker用于初始化连接到其他的broker,在上面配置中,admin用户为broker间的通讯,
user_userName定义了所有连接到 broker和 broker验证的所有的客户端连接包括其他 broker的用户密码,user_userName必须配置admin用户,否则报错。
(2)touch kafka_cilent_jaas.conf
配置如下:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin";
};
在KafkaClient部分,username和password是客户端用来配置客户端连接broker的用户,在上面配置中,客户端使用admin用户连接到broker。
更改server.properties配置文件:
listeners=SASL_PLAINTEXT://ip:9092
# 使用的认证协议
security.inter.broker.protocol=SASL_PLAINTEXT
#SASL机制
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
# 完成身份验证的类
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# 如果没有找到ACL(访问控制列表)配置,则允许任何操作。
#allow.everyone.if.no.acl.found=true
super.users=User:admin
修改consuer和producer的配置文件consumer.properties和producer.properties,分别增加如下配置:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
切换到kafka目录下bin路径:
cd ..
cd bin
JAAS文件作为每个broker的jvm参数,在kafka-server-start.sh脚本中增加如下配置(可在最上面):
export KAFKA_OPTS=" -Djava.security.auth.login.config=/data/kafka/kafka_2.11-1.1.0/config/kafka_server_jaas.conf"
在kafka-console-consumer.sh和kafka-console-producer.sh中添加:
export KAFKA_OPTS=" -Djava.security.auth.login.config=/data/kafka/kafka_2.11-1.1.0/config/kafka_client_jaas.conf"
启动zookeeper和kafka:
bin/zookeeper-server-start.sh config/zookeeper.properties & (&代表后台运行)
bin/kafka-server-start.sh config/server.properties &
输入命令,启动生产者:
bin/kafka-console-producer.sh --broker-list 10.100.17.79:9092 --topic test --producer.config config/producer.properties
输入命令,启动消费者
bin/kafka-console-consumer.sh --bootstrap-server 10.100.17.79:9092 --topic test --from-beginning --consumer.config config/consumer.properties
如果消费者启动不了或者无法消费指定topic,尝试设置所使用用户的组权限(当前使用用户admin为超级用户,不需要配置权限):
利用kafka-acls.sh为topic设置ACL:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=ip:2181 --add --allow-principal User:admin --group test-consumer-group --topic test
注意,如果admin要作为消费端连接alice-topic的话,必须对其使用的group(test-consumer-group)也赋权(group 在consumer.properties中有默认配置group.id)
权限操作例子:
增加权限:
# 为用户 alice 在 test(topic)上添加读写的权限
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=ip:2181 --add --allow-principal User:alice --operation Read --operation Write --topic test
# 对于 topic 为 test 的消息队列,拒绝来自 ip 为192.168.1.100账户为 zhangsan 进行 read 操作,其他用户都允许
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=ip:2181 --add --allow-principal User:* --allow-host * --deny-principal User:zhangsan
--deny-host 192.168.1.100 --operation Read --topic test
# 为 zhangsan 和 alice 添加all,以允许来自 ip 为192.168.1.100或者192.168.1.101的读写请求
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=ip:2181 --add --allow-principal User:zhangsan --allow-principal User:alice --allow-host 192.168.1.100 --allow-host 192.168.1.101 --operation Read --operation Write --topic test
获取权限列表:
# 列出 topic 为 test 的所有权限账户
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=ip:2181 --list --topic test
移除权限:
# 移除 acl
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=ip:2181 --remove --allow-principal User:zhangsan --allow-principal User:Alice --allow-host 192.168.1.100 --allow-host 192.168.1.101 --operation Read --operation Write --topic test
以上完成Kafka的SASL_PLAINTEXT认证。
多节点zookeeper下认证:
增加配置文件:
touch kafka_zoo_jaas.conf
配置如下:
ZKServer{
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin";
};
配置应用名称为ZKServer,zookeeper默认使用的JAAS应用名称是Server(或者zookeeper)。
设置的环境变量其实只是传入到JVM的参数。这里设置的环境变量是KAFKA_OPTS。修改zookeeper的启动脚本zookeeper-server-start.sh如下:
export KAFKA_OPTS=" -Djava.security.auth.login.config=/data/kafka/kafka_2.11-1.1.0/config/kafka_zoo_jaas.conf -Dzookeeper.sasl.serverconfig=ZKServer"
如果配置使用默认名称,则只需要添加:
export KAFKA_OPTS=" -Djava.security.auth.login.config=/data/kafka/kafka_2.11-1.1.0/config/kafka_zoo_jaas.conf“”
修改zookeeper.properties配置文件,增加配置:
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
以上配置完成后可启动尝试多节点zookeeper身份认证
kafka sasl java_Kafka安装及开启SASL_PLAINTEXT认证(用户名和密码认证)相关推荐
- kafka sasl java_Kafka 集群配置SASL+ACL
** Kafka 集群配置SASL+ACL 测试环境:** 系统: CentOS 6.5 x86_64 JDK : java version 1.8.0_121 kafka: kafka_2.11-1 ...
- kafka sasl java_kafka 添加SASL鉴权
kafka 版本信息:kafka_2.12-2.3.0 使用kafka自带的zookeeper启动 bin/zookeeper-server-start.sh config/zookeeper.pro ...
- 安装windows服务提示输入用户名和密码
在安装 windows服务的时候提示输入用户名和密码原因是权限问题 在serviceProcessInstaller1右键属性 Account 改为LocalService 在次安装成功,在服务列表中 ...
- mysql新密码认证_MySQL原生密码认证
基于密码的身份认证包括了两个部分: - 服务器端认证信息的存储 - 密码的认证过程 基于密码的身份认证有一个原则:仅使用人知道密码.密码不能被存储在认证服务器中,在认证过程中也不能通过网络明文传输.因 ...
- android dropbear 密码,dropbear key认证,禁止密码认证
dropbear 挺好的sshd 客户端,占用资源小,对于我这种资源控,是再合适不过的了.不过dropbear的文档实在是少.自己琢磨了下key认证.好禁止密码认证.共享给大家 key 认证和open ...
- kafka sasl java_Kafka SASL 安全认证
java client 中添加 SASL 设置信息: Java client consumer properties配置.png 注意 sasl.jaas.config 配置中的分号必不可少. pac ...
- EMQX配置 用户名和密码认证方式连接
1, 找到mqtt 的 /etc/plugins/ 文件夹下的emqx_auth_mnesia.conf 文件 2 ,vim打开编辑该文件 根据例子添加账号密码 并保存 auth.user.1.use ...
- kafak集群部署配置,开启SASL_PLAINTEXT认证以及acl权限控制
SASL_PLAINTEXT认证 本人认为就是consumer连接broker开启了用户名,密码认证 acl权限控制 就是指针对用户 配置拥有哪些操作权限,如 topic的读,写,group的读,to ...
- 大数据Hadoop之——Zookeeper鉴权认证(Kerberos认证+账号密码认证)
文章目录 一.Zookeeper概述与安装 二.Zookeeper Kerberos 鉴权认证 1)Kerberos安装 2)创建用户并生成keytab鉴权文件(前期准备) 3)独立zookeeper ...
最新文章
- CSS之定位(定位/相对定位)
- 一致性哈希算法----- 解决memecache 服务器扩容后的数据丢失。
- 怎么学python-新手如何自学python课程?
- [BTS]使用BizTalk开发应用系统,就是这么简单!
- wchar_t * 与 char * 互相转换小记
- 三角形描边css,[CSS] tips带有描边的小箭头
- mysql dump 1017_MySQL数据库导出 - Can't Wait Any Longer - OSCHINA - 中文开源技术交流社区...
- 7-21 求特殊方程的正整数解 (15 分)
- Windows Azure Cloud Service (25) 使用Startup注册COM组件(下)
- c语言随机抽取函数,Excel随机取值,给你三种解决方案
- RK987A键盘蓝牙连接电脑
- Android源码目录结构
- 古琴【A1】基本指法
- streaming mr
- ibm服务器进去阵列卡状态,IBM阵列卡介绍和服务器对阵列卡的支持情况
- 查询局域网内在线电脑IP地址
- 使用jQuery来锁定HTML表格中的首行和首列
- Adobe acrobat修改批注名字
- LocalDate、LocalTime、LocalDateTime使用与区别
- vue2与vue3的区别