前些日子要封装一个kafka的客户端驱动,配置了下kafka环境,发现配置复杂度完爆rabbitmq很多倍啊,而且发布订阅模式使用起来也很麻烦,可能就胜在分布式了吧。

kafka需要java环境,自行安装java sdk 1.8+.

官方加载安装包,当前为kafka_2.11-1.1.0.tgz

(新版kafka包内集成了zookeeper,直接启动即可)

解压缩后kafka目录下文件:

配置:

config路径下配置文件:

consumer.properites 消费者配置,这个配置文件用于配置开启的消费者,默认即可

producer.properties 生产者配置,这个配置文件用于配置开启的生产者,默认即可

server.properties kafka服务器的配置,此配置文件用来配置kafka服务器,核心配置如下:

(1)broker.id 申明当前kafka服务器在集群中的唯一ID,需配置为integer,并且集群中的每一个kafka服务器的id都应是唯一的,默认即可

(2)listeners 申明此kafka服务器需要监听的端口号,如果是在本机上跑虚拟机运行可以不用配置本项,默认会使用localhost的地址,如果是在远程服务器上运行则必须配置,

例如:listeners=PLAINTEXT:// 192.168.1.1:9092。并确保服务器的9092端口能够访问

(3)zookeeper.connect 申明kafka所连接的zookeeper的地址 ,需配置为zookeeper的地址,由于本次使用的是kafka高版本中自带zookeeper,默认即可,

例如:zookeeper.connect=localhost:2181

启动zookeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties & (&代表后台运行)

启动kafka:

bin/kafka-server-start.sh config/server.properties &

创建一个名为test的topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看已经创建的topic:

bin/kafka-topics.sh --list --zookeeper localhost:2181

更改tioic分区:

bin/kafka-topics.sh --zookeeper localhost:2181 --topic test --alter --partitions 4

查看指定topic信息:

bin/kafka-topics.sh --zookeeper localhost:2181 --topic test --describe

创建一个消息消费者:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

(如果弹出WARN [Producer clientId=console-producer] Connection to node -1 could not be established. Broker may not be available,需将上文中localhost改为队列配置的地址)

创建一个消息生产者:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

出现 > 符号后输入消息发送,消费者连接可以看到输出。

(如果弹出WARN [Consumer clientId=consumer-1, groupId=console-consumer-59948] Connection to node -1 could not be established. Broker may not be available,

需将上文中localhost改为队列配置的地址)

Kafka安装完成。

Kafka开启使用 SASL_PLAINTEXT认证:

输入下面命令,关闭kafka:

bin/kafka-server-stop.sh

输入下面命令,关闭zookeeper:

bin/zookeeper-server-stop.sh

进入config目录,增加如下配置文件:

cd config

(1)touch kafka_server_jaas.conf

配置如下:

KafkaServer {

org.apache.kafka.common.security.plain.PlainLoginModule required

username="admin"

password="admin"

user_admin="admin"

user_alice="alice";

};

在KafkaServer部分,username和password是broker用于初始化连接到其他的broker,在上面配置中,admin用户为broker间的通讯,

user_userName定义了所有连接到 broker和 broker验证的所有的客户端连接包括其他 broker的用户密码,user_userName必须配置admin用户,否则报错。

(2)touch kafka_cilent_jaas.conf

配置如下:

KafkaClient {

org.apache.kafka.common.security.plain.PlainLoginModule required

username="admin"

password="admin";

};

在KafkaClient部分,username和password是客户端用来配置客户端连接broker的用户,在上面配置中,客户端使用admin用户连接到broker。

更改server.properties配置文件:

listeners=SASL_PLAINTEXT://ip:9092

# 使用的认证协议

security.inter.broker.protocol=SASL_PLAINTEXT

#SASL机制

sasl.enabled.mechanisms=PLAIN

sasl.mechanism.inter.broker.protocol=PLAIN

# 完成身份验证的类

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

# 如果没有找到ACL(访问控制列表)配置,则允许任何操作。

#allow.everyone.if.no.acl.found=true

super.users=User:admin

修改consuer和producer的配置文件consumer.properties和producer.properties,分别增加如下配置:

security.protocol=SASL_PLAINTEXT

sasl.mechanism=PLAIN

切换到kafka目录下bin路径:

cd ..

cd bin

JAAS文件作为每个broker的jvm参数,在kafka-server-start.sh脚本中增加如下配置(可在最上面):

export KAFKA_OPTS=" -Djava.security.auth.login.config=/data/kafka/kafka_2.11-1.1.0/config/kafka_server_jaas.conf"

在kafka-console-consumer.sh和kafka-console-producer.sh中添加:

export KAFKA_OPTS=" -Djava.security.auth.login.config=/data/kafka/kafka_2.11-1.1.0/config/kafka_client_jaas.conf"

启动zookeeper和kafka:

bin/zookeeper-server-start.sh config/zookeeper.properties & (&代表后台运行)

bin/kafka-server-start.sh config/server.properties &

输入命令,启动生产者:

bin/kafka-console-producer.sh --broker-list 10.100.17.79:9092 --topic test --producer.config config/producer.properties

输入命令,启动消费者

bin/kafka-console-consumer.sh --bootstrap-server 10.100.17.79:9092 --topic test --from-beginning --consumer.config config/consumer.properties

如果消费者启动不了或者无法消费指定topic,尝试设置所使用用户的组权限(当前使用用户admin为超级用户,不需要配置权限):

利用kafka-acls.sh为topic设置ACL:

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=ip:2181 --add --allow-principal  User:admin  --group test-consumer-group --topic test

注意,如果admin要作为消费端连接alice-topic的话,必须对其使用的group(test-consumer-group)也赋权(group 在consumer.properties中有默认配置group.id)

权限操作例子:

增加权限:

# 为用户 alice 在 test(topic)上添加读写的权限

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=ip:2181 --add --allow-principal User:alice --operation Read --operation Write --topic test

# 对于 topic 为 test 的消息队列,拒绝来自 ip 为192.168.1.100账户为 zhangsan 进行 read 操作,其他用户都允许

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=ip:2181 --add --allow-principal User:* --allow-host * --deny-principal User:zhangsan

--deny-host 192.168.1.100 --operation Read --topic test

# 为 zhangsan 和 alice 添加all,以允许来自 ip 为192.168.1.100或者192.168.1.101的读写请求

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=ip:2181 --add --allow-principal User:zhangsan --allow-principal User:alice --allow-host 192.168.1.100 --allow-host 192.168.1.101 --operation Read --operation Write --topic test

获取权限列表:

# 列出 topic 为 test 的所有权限账户

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=ip:2181 --list --topic test

移除权限:

# 移除 acl

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=ip:2181 --remove --allow-principal User:zhangsan --allow-principal User:Alice --allow-host 192.168.1.100 --allow-host 192.168.1.101 --operation Read --operation Write --topic test

以上完成Kafka的SASL_PLAINTEXT认证。

多节点zookeeper下认证:

增加配置文件:

touch kafka_zoo_jaas.conf

配置如下:

ZKServer{

org.apache.kafka.common.security.plain.PlainLoginModule required

username="admin"

password="admin"

user_admin="admin";

};

配置应用名称为ZKServer,zookeeper默认使用的JAAS应用名称是Server(或者zookeeper)。

设置的环境变量其实只是传入到JVM的参数。这里设置的环境变量是KAFKA_OPTS。修改zookeeper的启动脚本zookeeper-server-start.sh如下:

export KAFKA_OPTS=" -Djava.security.auth.login.config=/data/kafka/kafka_2.11-1.1.0/config/kafka_zoo_jaas.conf  -Dzookeeper.sasl.serverconfig=ZKServer"

如果配置使用默认名称,则只需要添加:

export KAFKA_OPTS=" -Djava.security.auth.login.config=/data/kafka/kafka_2.11-1.1.0/config/kafka_zoo_jaas.conf“”

修改zookeeper.properties配置文件,增加配置:

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider

requireClientAuthScheme=sasl

jaasLoginRenew=3600000

以上配置完成后可启动尝试多节点zookeeper身份认证

kafka sasl java_Kafka安装及开启SASL_PLAINTEXT认证(用户名和密码认证)相关推荐

  1. kafka sasl java_Kafka 集群配置SASL+ACL

    ** Kafka 集群配置SASL+ACL 测试环境:** 系统: CentOS 6.5 x86_64 JDK : java version 1.8.0_121 kafka: kafka_2.11-1 ...

  2. kafka sasl java_kafka 添加SASL鉴权

    kafka 版本信息:kafka_2.12-2.3.0 使用kafka自带的zookeeper启动 bin/zookeeper-server-start.sh config/zookeeper.pro ...

  3. 安装windows服务提示输入用户名和密码

    在安装 windows服务的时候提示输入用户名和密码原因是权限问题 在serviceProcessInstaller1右键属性 Account 改为LocalService 在次安装成功,在服务列表中 ...

  4. mysql新密码认证_MySQL原生密码认证

    基于密码的身份认证包括了两个部分: - 服务器端认证信息的存储 - 密码的认证过程 基于密码的身份认证有一个原则:仅使用人知道密码.密码不能被存储在认证服务器中,在认证过程中也不能通过网络明文传输.因 ...

  5. android dropbear 密码,dropbear key认证,禁止密码认证

    dropbear 挺好的sshd 客户端,占用资源小,对于我这种资源控,是再合适不过的了.不过dropbear的文档实在是少.自己琢磨了下key认证.好禁止密码认证.共享给大家 key 认证和open ...

  6. kafka sasl java_Kafka SASL 安全认证

    java client 中添加 SASL 设置信息: Java client consumer properties配置.png 注意 sasl.jaas.config 配置中的分号必不可少. pac ...

  7. EMQX配置 用户名和密码认证方式连接

    1, 找到mqtt 的 /etc/plugins/ 文件夹下的emqx_auth_mnesia.conf 文件 2 ,vim打开编辑该文件 根据例子添加账号密码 并保存 auth.user.1.use ...

  8. kafak集群部署配置,开启SASL_PLAINTEXT认证以及acl权限控制

    SASL_PLAINTEXT认证 本人认为就是consumer连接broker开启了用户名,密码认证 acl权限控制 就是指针对用户 配置拥有哪些操作权限,如 topic的读,写,group的读,to ...

  9. 大数据Hadoop之——Zookeeper鉴权认证(Kerberos认证+账号密码认证)

    文章目录 一.Zookeeper概述与安装 二.Zookeeper Kerberos 鉴权认证 1)Kerberos安装 2)创建用户并生成keytab鉴权文件(前期准备) 3)独立zookeeper ...

最新文章

  1. CSS之定位(定位/相对定位)
  2. 一致性哈希算法----- 解决memecache 服务器扩容后的数据丢失。
  3. 怎么学python-新手如何自学python课程?
  4. [BTS]使用BizTalk开发应用系统,就是这么简单!
  5. wchar_t * 与 char * 互相转换小记
  6. 三角形描边css,[CSS] tips带有描边的小箭头
  7. mysql dump 1017_MySQL数据库导出 - Can't Wait Any Longer - OSCHINA - 中文开源技术交流社区...
  8. 7-21 求特殊方程的正整数解 (15 分)
  9. Windows Azure Cloud Service (25) 使用Startup注册COM组件(下)
  10. c语言随机抽取函数,Excel随机取值,给你三种解决方案
  11. RK987A键盘蓝牙连接电脑
  12. Android源码目录结构
  13. 古琴【A1】基本指法
  14. streaming mr
  15. ibm服务器进去阵列卡状态,IBM阵列卡介绍和服务器对阵列卡的支持情况
  16. 查询局域网内在线电脑IP地址
  17. 使用jQuery来锁定HTML表格中的首行和首列
  18. Adobe acrobat修改批注名字
  19. LocalDate、LocalTime、LocalDateTime使用与区别
  20. vue2与vue3的区别

热门文章

  1. 一个简单的acm竞赛题
  2. 计组--CISC和RISC特点和区别
  3. 什么是Receptive Field
  4. php api开发书籍,概述 · ThinkPHP API开发入门 · 看云
  5. 图解Topo拓扑排序
  6. 小师妹教我如何轻松拿下BAT、网易、蘑菇街的offer
  7. kb mac压缩图片大小_Mac系统怎么压缩图片
  8. 软件工程复习笔记 类图
  9. 简谈五线制交流道岔控制电路故障的处理方法【铁路信号技术专栏】——转自微信公众号高速铁路信号技术交流
  10. 手表频率测试软件,石英钟表测试仪是一款测量石英钟的灵活的工具