1.概述

转载:https://www.cnblogs.com/xiao987334176/p/10110389.html 想根据这个做个试验,但是现在不行试试。以后用的时候试试

一、简介
在Kafka0.9版本之前,Kafka集群时没有安全机制的。Kafka Client应用可以通过连接Zookeeper地址,例如zk1:2181:zk2:2181,zk3:2181等。来获取存储在Zookeeper中的Kafka元数据信息。拿到Kafka Broker地址后,连接到Kafka集群,就可以操作集群上的所有主题了。由于没有权限控制,集群核心的业务主题时存在风险的。本文主要使用SASL+ACL二、技术关键点
配置文件
修改broker启动所需的server.properties文件,你至少需要配置(或修改)以下这些参数:复制代码
listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://$advertised_hostname:9092
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
#SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
allow.everyone.if.no.acl.found=true
super.users=User:admin
复制代码其他参数讲解,请参考链接:https://www.cnblogs.com/xiao987334176/p/10065844.html这里主要讲解几个重点参数默认情况下,如果资源R没有关联acl,除了超级用户,没有用户允许访问。如果你想改变这种方式你可以做如下配置allow.everyone.if.no.acl.found=true什么意思呢?上面的配置已经启动了acl,除了超级用户之外,其他用户无法访问。那么问题就来了,在kafka集群中,其它节点需要同步数据,需要相互访问。它默认会使用ANONYMOUS的用户名连接集群。在这种情况下,启动kafka集群,必然失败!所以这个参数一定要配置才行!listeners=SASL_PLAINTEXT://:9092这个参数,表示kafka监听的地址。此参数必须要配置,默认是注释掉的。默认会使用listeners=PLAINTEXT://:9092,但是我现在开启了SASL,必须使用SASL协议连接才行。//:9092 这里虽然没有写IP地址,根据官方解释,它会监听所有IP。注意:这里只能是IP地址,不能是域名。否则启动时,会提示无法绑定IP。advertised.listeners 这个参数,表示外部的连接地址。这里可以写域名,也可以写IP地址。建议使用域名,为什么呢?因为IP可能会变动,但是主机名是不会变动的。所以在java代码里面写死,就可以了!注意:必须是SASL协议才行!super.users=User:admin  表示启动超级用户admin,注意:此用户名不允许更改,否则使用生产模式时,会有异常!启动脚本
bin/kafka-server-start.sh 这个是kafka的启动脚本,要使用ACL,需要增加一个参数才行。有2种方法修改,这里分别介绍一下:1. 增加环境变量KAFKA_OPTS(推荐)
先来看一下,默认的bin/kafka-server-start.sh的最后一行exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
只需要在最后一行的上面一行,添加一个环境变量即可export KAFKA_OPTS="-Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/kafka_cluster_jaas.conf"
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"2. 增加参数-Djava.security.auth.login.config
直接将最后一行修改为exec $base_dir/kafka-run-class.sh -Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/kafka_cluster_jaas.conf $EXTRA_ARGS kafka.Kafka "$@"JAAS文件
kafka_cluster_jaas.conf复制代码
KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="123456"
user_admin="123456"
user_reader="123456"
user_writer="123456";
};
复制代码这个文件,是专门用来做认证的。用户名和密码的格式如下:user_用户名="密码"注意:对于超级用户,这几行是固定的username="admin"
password="123456"
user_admin="admin"
这里指定的是admin用户密码为123456,密码可自行更改。下面的,才是普通用户。最后一个用户,要有一个分号才行!三、正式部署
环境介绍
本文采用的环境,参考以下链接https://www.cnblogs.com/xiao987334176/p/10088497.html#autoid-3-0-0使用了3台zookeeper和5台kafka。都是在一台服务器上面运行的!其中zookeeper的镜像,不需要变动,直接启动即可。但是kafka的镜像,需要重新构建,请看下面的内容。创建镜像
创建空目录mkdir /opt/kafka_cluster_acldockerfile复制代码
FROM ubuntu:16.04
# 修改更新源为阿里云
ADD sources.list /etc/apt/sources.list
ADD kafka_2.12-2.1.0.tgz /
ADD kafka_cluster_jaas.conf /
# 安装jdk
RUN apt-get update && apt-get install -y openjdk-8-jdk --allow-unauthenticated && apt-get clean allEXPOSE 9092
# 添加启动脚本
ADD run.sh .
RUN chmod 755 run.sh
ENTRYPOINT [ "/run.sh"]
复制代码kafka_cluster_jaas.conf复制代码
KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="123456"
user_admin="123456"
user_reader="123456"
user_writer="123456";
};
复制代码run.shView Code注意:由于没有DNS,这里临时添加了5条hosts记录。5台kafka之间,必须要相互连通,否则会报错WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)sources.list复制代码
deb http://mirrors.aliyun.com/ubuntu/ xenial main
deb-src http://mirrors.aliyun.com/ubuntu/ xenial maindeb http://mirrors.aliyun.com/ubuntu/ xenial-updates main
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-updates maindeb http://mirrors.aliyun.com/ubuntu/ xenial universe
deb-src http://mirrors.aliyun.com/ubuntu/ xenial universe
deb http://mirrors.aliyun.com/ubuntu/ xenial-updates universe
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-updates universedeb http://mirrors.aliyun.com/ubuntu/ xenial-security main
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-security main
deb http://mirrors.aliyun.com/ubuntu/ xenial-security universe
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-security universe
复制代码此时,目录结构如下:复制代码
./
├── dockerfile
├── kafka_2.12-2.1.0.tgz
├── kafka_cluster_jaas.conf
├── run.sh
└── sources.list
复制代码生成镜像docker build -t kafka_cluster_acl /opt/kafka_cluster_acl启动镜像
请确保已经启动了3台zookeeper的镜像!第一个kafka节点docker run -it -p 9092:9092 -e broker_id=1 -e zookeeper=172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 -e advertised_hostname=kafka-1.default.svc.cluster.local  --network br1 --ip=172.168.0.5 kafka_cluster_acl第二个kafka节点docker run -it -p 9093:9092 -e broker_id=2 -e zookeeper=172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 -e advertised_hostname=kafka-2.default.svc.cluster.local --network br1 --ip=172.168.0.6 kafka_cluster_acl第三个kafka节点docker run -it -p 9094:9092 -e broker_id=3 -e zookeeper=172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 -e advertised_hostname=kafka-3.default.svc.cluster.local --network br1 --ip=172.168.0.7 kafka_cluster_acl第四个kafka节点docker run -it -p 9095:9092 -e broker_id=4 -e zookeeper=172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 -e advertised_hostname=kafka-4.default.svc.cluster.local --network br1 --ip=172.168.0.8 kafka_cluster_acl第五个kafka节点docker run -it -p 9096:9092 -e broker_id=5 -e zookeeper=172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 -e advertised_hostname=kafka-5.default.svc.cluster.local --network br1 --ip=172.168.0.9 kafka_cluster_acl客户端测试
shell脚本客户端
先来查看docker进程复制代码
root@jqb-node128:~# docker ps
CONTAINER ID        IMAGE               COMMAND             CREATED              STATUS              PORTS                    NAMES
a5ff3c8f5c2a        kafka_cluster_acl   "/run.sh"           About a minute ago   Up About a minute   0.0.0.0:9096->9092/tcp   gifted_jones
36a4d94054b5        kafka_cluster_acl   "/run.sh"           3 minutes ago        Up 3 minutes        0.0.0.0:9095->9092/tcp   modest_khorana
f614d734ac8b        kafka_cluster_acl   "/run.sh"           3 minutes ago        Up 3 minutes        0.0.0.0:9094->9092/tcp   tender_kare
29ef9a2edd08        kafka_cluster_acl   "/run.sh"           3 minutes ago        Up 3 minutes        0.0.0.0:9093->9092/tcp   reverent_jepsen
d9cd45c62e86        kafka_cluster_acl   "/run.sh"           3 minutes ago        Up 3 minutes        0.0.0.0:9092->9092/tcp   silly_mcclintock
69dba560bc09        zookeeper_cluster   "/run.sh"           4 minutes ago        Up 4 minutes        0.0.0.0:2183->2181/tcp   confident_fermat
d73a01e76949        zookeeper_cluster   "/run.sh"           4 minutes ago        Up 4 minutes        0.0.0.0:2182->2181/tcp   admiring_snyder
7ccab68252e7        zookeeper_cluster   "/run.sh"           4 minutes ago        Up 4 minutes        0.0.0.0:2181->2181/tcp   gifted_wilson
复制代码确保已经运行了5个kafka和3个zk随便进入一个kafka容器root@jqb-node128:~# docker exec -it a5ff3c8f5c2a /bin/bash
root@a5ff3c8f5c2a:/# cd /kafka_2.12-2.1.0/新增一个配置文件 kafka_client_jaas.conf root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# apt-get install -y vim
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# vi config/kafka_client_jaas.conf内容如下:KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule required
username="writer"
password="123456";
};同理我们也要将配置文件内容传递给JVM, 因此需要修改。生产者
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# vi bin/kafka-console-producer.sh
最后一行的上面,添加 KAFKA_OPTS 变量if [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-Xmx512M"
fi
export KAFKA_OPTS="-Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/kafka_client_jaas.conf"
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"修改生产者配置文件,最后一行追加2行内容security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN使用echo 追加root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# echo 'security.protocol=SASL_PLAINTEXT' >> config/producer.properties
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# echo 'sasl.mechanism=PLAIN' >> config/producer.properties消费者
修改生产者配置文件,使用echo追加root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# echo 'security.protocol=SASL_PLAINTEXT' >> config/consumer.properties
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# echo 'sasl.mechanism=PLAIN' >> config/consumer.properties编辑测试脚本root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# vi bin/kafka-console-consumer.sh
最后一行的上面,添加 KAFKA_OPTS 变量if [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-Xmx512M"
fi
export KAFKA_OPTS="-Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/kafka_client_jaas.conf"
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"测试生产者
目前还没有topic,先来创建一个topicroot@a5ff3c8f5c2a:/kafka_2.12-2.1.0# bin/kafka-topics.sh --create --zookeeper 172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 --topic test --partitions 1 --replication-factor 1Created topic "test".
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# 进入生产者模式,指定kafka的服务器为第一个kafka。当然,只要是5个kafka中的任意一个即可!输入消息 fdsa,回车复制代码
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# bin/kafka-console-producer.sh --broker-list kafka-1.default.svc.cluster.local:9092 --topic test --producer.config config/producer.properties
>fdsa
[2018-12-17 08:45:15,455] ERROR Error when sending message to topic test with key: null, value: 4 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.IllegalSaslStateException: Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are []
[2018-12-17 08:45:15,457] ERROR [Producer clientId=console-producer] Connection to node -1 (d9cd45c62e86.br1/172.168.0.5:9092) failed authentication due to: Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are [] (org.apache.kafka.clients.NetworkClient)
复制代码会出现报错,则说明配置的security 已生效, 要想普通用户能读写消息,需要配置ACL配置ACL
kafka的ACL规则,是存储在zookeeper中的,只需要连接zookeeper即可!topic权限
允许writer用户有所有权限,访问所有topic--operation All 表示所有权限,--topic=* 表示所有topic复制代码
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 --add --allow-principal User:writer --operation All --topic=*
Adding ACLs for resource `Topic:LITERAL:*`: User:writer has Allow permission for operations: All from hosts: * Current ACLs for resource `Topic:LITERAL:*`: User:writer has Allow permission for operations: All from hosts: *
复制代码组权限
复制代码
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 --add --allow-principal User:writer --operation All -group=*
Adding ACLs for resource `Group:LITERAL:*`: User:writer has Allow permission for operations: All from hosts: * Current ACLs for resource `Group:LITERAL:*`: User:writer has Allow permission for operations: All from hosts: *
复制代码再次测试
root@e0bb740ac0ce:/kafka_2.12-2.1.0# bin/kafka-console-producer.sh --broker-list kafka-1.default.svc.cluster.local:9092 --topic test --producer.config config/producer.properties
>123>注意:在config/server.properties 文件中,设置了advertised.listeners=SASL_PLAINTEXT://kafka-1.default.svc.cluster.local:9092
所以连接地址,必须是指定域名才可以!再开一个窗口,连接同样的容器root@jqb-node128:~# docker exec -it a5ff3c8f5c2a /bin/bash
root@a5ff3c8f5c2a:/# cd /kafka_2.12-2.1.0/启动消费者模式
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# bin/kafka-console-consumer.sh --bootstrap-server kafka-1.default.svc.cluster.local:9092 --topic test --from-beginning  --consumer.config config/consumer.properties
123收到123表示成功了!python客户端测试
由于真实主机无法直接连接到网桥的地址172.168.0.5,那么因此代码需要在创建空目录mkdir /opt/py_test放2个文件sources.listView Codeproduer_consumer_acl_test.pyView Code此时目录结构如下:./
├── produer_consumer_acl_test.py
└── sources.list进入容器,更新ubuntu更新源root@jqb-node128:/opt/py_test# docker run -it -v /opt/py_test:/mnt --network br1 --ip=172.168.0.10 ubuntu:16.04
root@064f2f97aad2:/# cp /mnt/sources.list /etc/apt/
root@064f2f97aad2:/# apt-get update  安装python3-piproot@064f2f97aad2:/# apt-get install -y python3-pip安装kafka模块root@064f2f97aad2:/# pip3 install kafka添加hosts记录echo "172.168.0.5 kafka-1.default.svc.cluster.local" >> /etc/hosts
echo "172.168.0.6 kafka-2.default.svc.cluster.local" >> /etc/hosts
echo "172.168.0.7 kafka-3.default.svc.cluster.local" >> /etc/hosts
echo "172.168.0.8 kafka-4.default.svc.cluster.local" >> /etc/hosts
echo "172.168.0.9 kafka-5.default.svc.cluster.local" >> /etc/hosts执行Python文件复制代码
root@064f2f97aad2:/# python3 /mnt/produer_consumer_acl_v1_test.py
执行生产者
执行消费者
等待结果....
b'hello honey'
('kafka验证消息成功,花费时间', '28.59 秒')
复制代码注意:第一次执行时,会非常慢。等待30秒,如果没有输出hello honey。终止掉,再次执行。反复5次。就可以了!之后再次执行几次,就会很快了!复制代码
root@064f2f97aad2:/# python3 /mnt/produer_consumer_acl_v1_test.py
执行生产者
执行消费者
等待结果....
b'hello honey'
('kafka验证消息成功,花费时间', '5.37 秒')
root@064f2f97aad2:/# python3 /mnt/produer_consumer_acl_v1_test.py
执行生产者
执行消费者
等待结果....
b'hello honey'
('kafka验证消息成功,花费时间', '0.43 秒')
复制代码为啥,前面几次会很慢。之后就很快了,什么原因,我也不知道!总之,只要经历过慢的阶段,之后就很快了!

【kafka】Kafka 集群配置SASL+ACL相关推荐

  1. kafka sasl java_Kafka 集群配置SASL+ACL

    ** Kafka 集群配置SASL+ACL 测试环境:** 系统: CentOS 6.5 x86_64 JDK : java version 1.8.0_121 kafka: kafka_2.11-1 ...

  2. kafka分布式集群 SASL_PLAINTEXT 模式的加密部署

    大家好,我是Linux运维工程师 Linke .技术过硬,很少挖坑~ 此刻时间紧迫,来不及扯淡了,直接进入正题. 一.kafka依赖于 zookeeper 的选举机制 二.分布式工作原理 日志的分区p ...

  3. kafka集群开启sasl认证

    kafka集群开启sasl认证 sasl认证 sasl 是扩展C/S模式验证能力的一种认证机制.它可以规范客户端和服务端传输应答和传输内容编码,简而言之sasl决定了认证的规则,即客户端如何存储身份证 ...

  4. Kafka 如何给集群配置Scram账户认证

    前言 很早之前的一篇博客[Kafka+ Centos7服务器集群详细安装教程] 详细的说了下一个真正的集群应该如何搭建Kafka环境,由于当时的需求只是能够使用Kafka服务,因此并没有做别的什么配置 ...

  5. ELK5.3+Kafka集群配置

    [一]资源准备 # 3台4C*8G, 安装Zookeeper.Kafka.Logstash--Broker(input: filebeat; output: Kafka) 10.101.2.23 10 ...

  6. kafka简述与集群配置

    一.kafka简述 1.简介 kafka是一个高吞吐的分布式消息队列系统.特点是生产者消费者模式,先进先出(FIFO)保证顺序,自己不丢数据,默认每隔7天清理数据.消息列队常见场景:系统之间解耦合.峰 ...

  7. 2021年大数据Kafka(三):❤️Kafka的集群搭建以及shell启动命令脚本编写❤️

    全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 Kafka的集群搭建以及shell启动命令脚本编写 一.搭建 ...

  8. 单台主机 kafka + zookeeper 集群搭建

    文章转自: https://www.jianshu.com/p/dc4770fc34b6 在单台主机上搭建kafka + zookeeper 集群. zookeeper 集群搭建 kafka是通过zo ...

  9. 新闻网大数据实时分析可视化系统项目——7、Kafka分布式集群部署

    Kafka是由LinkedIn开发的一个分布式的消息系统,使用Scala编写,它以可水平扩展和高吞吐率而被广泛使用.目前越来越多的开源分布式处理系统如Cloudera.Apache Storm.Spa ...

最新文章

  1. 华山论剑之浅谈iOS的生产线 工厂模式
  2. 20个开源项目托管站点推荐
  3. linux wc -l 对io,linux设备驱动归纳总结(五):2.操作硬件——IO内存
  4. xa协议 mysql_分布式事务XA实现数据一致性的协议与原理
  5. 主板螺丝是机箱配还是主板配_要配新电脑,A520主板和B450主板哪一个更加值得购买?...
  6. 服务器的可维护性,可靠性和可维护性
  7. 作者:张岩,男,北京大学教授、博士生导师。
  8. pdf常用字体包 -baijiahao_如何给PDF文档添加和插入页码?
  9. [ActionScript 3.0] 获取TextFiled字符边框
  10. visio防火墙可以连接什么_画流程图,就是要用Visio软件!
  11. 【李宏毅2020 ML/DL】P80 Generative Adversarial Network | Feature Extraction
  12. Broken Keyboard(悲剧文本)
  13. java中float double利用BigDecimal运算
  14. 枚举算法,常用的三种
  15. 如何使用AForge调用本机摄像头
  16. 成本更低、更优观看体验——自研S265编解码器解析
  17. SQL Server 获取2019年节假日列表(可用于Java、.Net系统实现)
  18. HTML之基础篇--陈小沫
  19. wxpython动态实时绘图_wxPython实现画图板
  20. PR/PO一锅粥,关键信息一图兜

热门文章

  1. 2月8日见!三星Galaxy S22系列新增樱花粉配色
  2. 外媒:苹果高管正拜访亚洲供应商,探讨苹果汽车生产事宜
  3. 卢伟冰暗示Redmi K40轻奢版:搭载联发科天玑1100
  4. 王小川:与龚宇相识14年 爱奇艺造假不可能
  5. 有望年前发布?魅族16s Pro Plus曝光:下半年旗舰担当
  6. 蔡崇信完成对布鲁克林篮网和巴克莱中心的全资收购
  7. 这就是华为Mate 30 Pro真机了,价格或许要高攀不起了?
  8. 下一代iPhone将变成“越南制造”?富士康已在越南买地
  9. 竟然不是马云!第一个使用支付宝的人找到了 :受赠终身钻石会员
  10. 荣耀20系列国行版正式发布 拍照全球第三 屏幕开孔全球最小