** Kafka 集群配置SASL+ACL

测试环境:**

系统: CentOS 6.5 x86_64

JDK : java version 1.8.0_121

kafka: kafka_2.11-1.0.0.tgz

zookeeper: 3.4.5

ip: 192.168.49.161 (我们这里在一台机上部署整套环境)

kafka 名词解析:

Broker: Kafka 集群包含一个或多个服务器,这种服务器被称为broker

Topic: 每条发布到Kafka 集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic 的消息

分开存储,逻辑上一个Topic 的消息虽然保存于一个或多个broker 上但用户只需指定消息的Topic 即可生产或

消费数据而不必关心数据存于何处)

Partition: Parition 是物理上的概念,每个Topic 包含一个或多个Partition

Producer: 负责发布消息到Kafka broker 即生产者

Consumer: 消息消费者,向Kafka broker 读取消息的客户端即消费者

Consumer Group:每个Consumer 属于一个特定的Consumer Group(可为每个Consumer 指定group

name,若不指定group name 则属于默认的group)

Kafka 拓扑结构(借用别人的图)

一个典型的Kafka 集群中包含若干Producer(可以是web 前端产生的Page View,或者是服务器日志,系统 CPU、Memory 等),若干broker(Kafka 支持水平扩展,一般broker 数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper 集群。Kafka 通过Zookeeper 管理集群配置,选举leader,以及在Consumer Group 发生变化时进行rebalance。Producer 使用push 模式将消息发布到broker,Consumer 使用pull 模式从broker 订阅并消费消息

kafka 集群部署

一 Zookeeper 集群部署(这里部署的伪集群)

zookeeper 部署比较简单,解压修改配置文件后即可使用, 默认的配置文件为zoo_sample.cfg 这里我们直接新建一个zoo.cfg 的文件内容如下:

各参数意义这里不作详细解释,想了解可以自行参考官网

tickTime=2000

initLimit=5

syncLimit=2

dataDir=/opt/zook/zookeeper1/data //指定数据路径

dataLogDir=/opt/zook/zookeeper1/logs //指定日志路径

clientPort=2181 //监听端口

server.1=127.0.0.1:7771:7772

server.2=127.0.0.1:7771:7772

server.3=127.0.0.1:7771:7772

2. 在zookeeper/data 下创建一个名为myid 的文件,并在文件内输入数字1

3. 集群部署则将上述配置的单节点再复制两份,并将配置文件中

```

dataDir , dataLogDir 两个参数根据实际路径修改

clientPort 分别配为2182,2183

myid 分别配置为2 3

```

4. 启动集群

二 Kafka 集群部署

由于集群最终是对外部网络开放的,出于安全考虑本文采用的是SASL+ACL 控制和授权

1. broker 配置

1.1. 将配置server.properties 复制2 份并分别重命名成server-1.properties server-2.properties server-3.properties 修改server.properties 配置:(请根据实际环境填写)

broker.id = 1 //另外两个节点分别为2 3

host.name=192.168.49.161

log.dirs=/tmp/kafka-logs-1 //另外两个节点分别为kafka-logs-2 kafka-logs-3 可以不

放在/tmp 下,如果放在其他地方必须注意目录权限

zookeeper.connect=192.168.49.161:2181,192.168.49.161:2182,192.168.49.161:2183

port=9092 //另外两个节点分别为9093 9094 也可以自定义

listeners=SASL_PLAINTEXT://192.168.49.161:9092 //另外两个节点分别为9093 9094

1.2 要配置SASL 和ACL,需要在broker 端进行两个方面的设置。

首先是创建包含所有认证用户信息的JAAS 文件,本例中我们有admin qjld 两个用户,其中admin 为管理员用于broker 集群间的认证, qjld 为远程应用的认证用户, 新增认证信息文件

kafka_server_jaas.conf 内容为:

KafkaServer {

org.apache.kafka.common.security.plain.PlainLoginModule required

username="admin"

password="admin-mgr998778123"

user_admin="admin-mgr998778123"

user_qjld="123456";

};

本例中路径为: /opt/kafka_2.11-1.0.0/config/kafka_server_jaas.conf , 我们需要将配置文件中的内容传递给JVM,因此需要修改/opt/kafka_2.11-1.0.0/bin/kafka-server-start.sh 脚本文件如下:

在exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@" 之前一行添加export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka_2.11-1.0.0/config/kafka_server_jaas.conf" 然后保存退出。

其次我们要在所有broker 配置文件(server-x.properties)中增加:

#ACL 入口类

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

#SASL_PLAINTEXT

sasl.enabled.mechanisms=PLAIN

security.inter.broker.protocol=SASL_PLAINTEXT

sasl.mechanism.inter.broker.protocol=PLAIN

allow.everyone.if.no.acl.found=true

#设置admin 为超级用户

super.users=User:admin

现在我们可以启动broker 测试了

bin/kafka-server-start.sh config/server-1.properties &

bin/kafka-server-start.sh config/server-2.properties &

bin/kafka-server-start.sh config/server-3.properties &

启动成功,此时broker 可以接收已认证的客户端连接了,下面我们进行客户端配置

//创建一个topic 名为test

bin/kafka-topics.sh--create --zookeeper 192.168.49.161:2181,192.168.49.161:2182, 192.168.49.161:2183 --replication-factor 1 --partitions 1 --topic test

//查看topic 是否创建成功

bin/kafka-topics.sh --list --zookeeper 192.168.49.161:2181

2. Client 配置 (用于使用kafka命令连接)

2.1 新增一个配置文件如:/opt/kafka_2.11-1.0.0/config/kafka_client_jaas.conf,内容为:

KafkaClient {

org.apache.kafka.common.security.plain.PlainLoginModule required

username="qjld"

password="123456";

};

同理我们也要将配置文件内容传递给JVM, 因此需要修改

/opt/kafka_2.11-1.0.0/bin/kafka-console-producer.sh 脚本文件如下:在exec $base_dir/kafka-run-class.sh

$EXTRA_ARGS kafka.Kafka "$@" 之前一行添加export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka_2.11-1.0.0/config/kafka_client_jaas.conf"

同样的方法修改kafka-console-consumer.sh

2.2 需要在config/consumer.properties 和config/producer.properties 配置文件中追加:

security.protocol=SASL_PLAINTEXT

sasl.mechanism=PLAIN

这两行配置, 完成后再试运行:

bin/kafka-console-producer.sh --broker-list 192.168.49.161:9092 --topic test \

--producer.config config/producer.properties

bin/kafka-console-consumer.sh --bootstrap-server 192.168.49.161:9092 --topic test

--from-beginning --consumer.config config/consumer.properties

接下来测试消息的写入和读取, 开启两个终端分别输入下列

bin/kafka-console-producer.sh --broker-list 192.168.49.161:9092 --topic test //消息写入

bin/kafka-console-consumer.sh --bootstrap-server 192.168.49.161:9092 --topic test --from-beginning 消息读取

如果报错信息如下:

WARN Bootstrap broker 192.168.49.161:9092 disconnected (org.apache.kafka.clients.NetworkClient)

则说明配置的security 已生效, 要想普通用户能读写消息,需要配置ACL

2.3 配置ACL

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties

zookeeper.connect=192.168.49.161:2181,192.168.49.161:2182,192.168.49.161:2183 --add

--allow-principal User:qjld --operation All --topic test

```这是为qjld 这个用户配置所有权限,如果有更细的要求则可参考网上的,如:Read Write 等只需要将--operation all 改为--operation Read 多种则在其后面加一个-- operation write 即可

**三 测试**

这里我选用python2.7 编写的脚本进行测试

先安装pip27 install kafka-python

![](https://s4.51cto.com/images/blog/201801/15/5b755d258b838b83d79753ad5aad3d44.png?x-oss-process=image/watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=)

kafka sasl java_Kafka 集群配置SASL+ACL相关推荐

  1. Kafka 如何给集群配置Scram账户认证

    前言 很早之前的一篇博客[Kafka+ Centos7服务器集群详细安装教程] 详细的说了下一个真正的集群应该如何搭建Kafka环境,由于当时的需求只是能够使用Kafka服务,因此并没有做别的什么配置 ...

  2. kafka简述与集群配置

    一.kafka简述 1.简介 kafka是一个高吞吐的分布式消息队列系统.特点是生产者消费者模式,先进先出(FIFO)保证顺序,自己不丢数据,默认每隔7天清理数据.消息列队常见场景:系统之间解耦合.峰 ...

  3. 【kafka】Kafka 集群配置SASL+ACL

    1.概述 转载:https://www.cnblogs.com/xiao987334176/p/10110389.html 想根据这个做个试验,但是现在不行试试.以后用的时候试试 一.简介 在Kafk ...

  4. Kafka与zookeeper集群配置安装(Zk集群至少三台)

    1.上传解包 tar -zxvf kafka_2.12-2.7.1.tgz -C /usr/hadoop/ 2.Kafka配置 主要的配置项: broker.id=1log.dirs=/usr/had ...

  5. kafka集群开启sasl认证

    kafka集群开启sasl认证 sasl认证 sasl 是扩展C/S模式验证能力的一种认证机制.它可以规范客户端和服务端传输应答和传输内容编码,简而言之sasl决定了认证的规则,即客户端如何存储身份证 ...

  6. ELK5.3+Kafka集群配置

    [一]资源准备 # 3台4C*8G, 安装Zookeeper.Kafka.Logstash--Broker(input: filebeat; output: Kafka) 10.101.2.23 10 ...

  7. Web基础配置篇(十四): Kafka单机、集群的安装配置及使用

    Web基础配置篇(十四): Kafka单机.集群的安装配置及使用 一.概述 Apache Kafka是一个分布式发布 - 订阅消息系统和一个强大的队列,可以处理大量的数据,并使您能够将消息从一个端点传 ...

  8. Kafka:ZK+Kafka+Spark Streaming集群环境搭建(九)安装kafka_2.11-1.1.0

    如何搭建配置centos虚拟机请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(一)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网.& ...

  9. Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十一)NIFI1.7.1安装

    一.nifi基本配置 1. 修改各节点主机名,修改/etc/hosts文件内容. 192.168.0.120master192.168.0.121slave1192.168.0.122 slave2 ...

最新文章

  1. java线程学习之notify方法和notifyAll方法
  2. jQuery滚动指定位置
  3. rdcl 报表设置不分页
  4. 《About Multi-Touch(多点触摸是个什么东西?)》:基于光学原理的多点触摸技术全解析...
  5. android10分区镜像,分区和映像  |  Android 开源项目  |  Android Open Source Project
  6. mysql-增删改(DML)
  7. publiccms实现首页菜单栏下拉的方法
  8. CSS Scrollbar (滚动条)
  9. 机器学习回归问题解答
  10. “重疾险”和“重大疾病医疗保险”的区别
  11. Java 杨辉三角的简单实现
  12. mysql数据库教程官网_数据库MySQL官方推荐教程-MySQL入门到删库
  13. mappedBy的作用
  14. 创建第一个Android app项目
  15. scrapy-redis爬取豆瓣电影短评,使用词云wordcloud展示
  16. java ip搜索功能_局域网ip搜索工具扫描设备的简单Java实现
  17. siteapp–让独立博客变成手机app
  18. Synchronized和Reentrantlock的区别
  19. 【大数据技术】实验3:熟悉常用的Hive操作
  20. 10027Longest Ordered Subsequence Extention

热门文章

  1. Python-10.1-面向对象
  2. 计组--CISC和RISC特点和区别
  3. 全天下最经典的句子,2013重现!
  4. lightGBM使用
  5. Bayesian information criterion和 Akaike information criterion中的模型参数个数(自由度)计算 | 以高斯混合分布为例
  6. sort函数和sorted函数的异同
  7. 前段时间的世界互联网大会
  8. ppt如何替换其他mo ban_PPT如何做视觉化的数据图表?
  9. C语言的数据类型大全,整型数据在内存中的存储方式
  10. 前端工程师面试题详解(二)