go kafka 配置SASL认证及实现SASL PLAIN认证功能
用户认证功能,是一个成熟组件不可或缺的功能。在0.9版本以前kafka是没有用户认证模块的(或者说只有SSL),好在kafka0.9版本以后逐渐发布了多种用户认证功能,弥补了这一缺陷(这里仅介绍SASL)。
本篇会介绍部署SASL/PLAIN认证功能的流程。最后再介绍对SASL/PLAIN功能进行二次开发。
kafka 2.x用户认证方式小结
需要先明确的一点是,用户认证和权限控制是两码事。用户认证是确认这个用户能否访问当前的系统,而权限控制是控制用户对当前系统中各种资源的访问权限。用户认证就是今天要讲的内容,而kafka的权限控制,则是对应bin/kafka-acls.sh
工具所提供的一系列功能,这里不详细展开。
标题特地说明kafka2.x是因为kafka2.0的时候推出一种新的用户认证方式,SASL/OAUTHBEARER,在此前的版本是不存在这个东西的。那么加上这个之后,kafka目前共有4种常见的认证方式。
- SASL/GSSAPI(kerberos):kafka0.9版本推出,即借助kerberos实现用户认证,如果公司恰好有kerberos环境,那么用这个是比较合适的。
- SASL/PLAIN:kafka0.10推出,非常简单,简单得有些鸡肋,不建议生产环境使用,除非对这个功能二次开发,这也是我后面要讲的。
- SASL/SCRAM:kafka0.10推出,全名Salted Challenge Response Authentication Mechanism,为解决SASL/PLAIN的不足而生,缺点可能是某些客户端并不支持这种方式认证登陆(使用比较复杂)。
- SASL/OAUTHBEARER:kafka2.0推出,实现较为复杂,目前业内应该较少实践。
其实除了上述四种用户认证功能之外,还有一个叫Delegation Token的东西。这个东西说一个轻量级的工具,是对现有SASL的一个补充,能够提高用户认证的性能(主要针对Kerberos的认证方式)。算是比较高级的用法,一般也用不到,所以也不会多介绍,有兴趣可以看这里Authentication using Delegation Tokens。
SASL/GSSAPI
如果已经有kerberos的环境,那么会比较适合使用这种方式,只需要让管理员分配好principal和对应的keytab,然后在配置中添加对应的选项就可以了。需要注意的是,一般采用这种方案的话,zookeeper也需要配置kerberos认证。
SASL/PLAIN
这种方式其实就是一个用户名/密码的认证方式,不过它有很多缺陷,比如用户名密码是存储在文件中,不能动态添加,明文等等!这些特性决定了它比较鸡肋,但好处是足够简单,这使得我们可以方便地对它进行二次开发。本篇文章后续会介绍SASL/PLAIN的部署方式和二次开发的例子(基于kafka2.x)。
SASL/SCRAM
针对PLAIN方式的不足而提供的另一种认证方式。这种方式的用户名/密码是存储中zookeeper的,因此能够支持动态添加用户。该种认证方式还会使用sha256或sha512对密码加密,安全性相对会高一些。
而且配置起来和SASL/PLAIN差不多同样简单,添加用户/密码的命令官网也有提供,个人比较推荐使用这种方式。不过有些客户端是不支持这个方式认证登陆的,比如python的kafka客户端,这点需要提前调研好。
具体的部署方法官网或网上有很多,这里不多介绍,贴下官网的Authentication using SASL/SCRAM。
SASL/OAUTHBEARER
SASL/OAUTHBEARER是基于OAUTH2.0的一个新的认证框架,这里先说下什么是OAUTH吧,引用维基百科。
OAuth是一个开放标准,允许用户让第三方应用访问该用户在某一网站上存储的私密的资源(如照片,视频,联系人列表),而无需将用户名和密码提供给第三方应用。而 OAUTH2.0算是OAUTH的一个加强版。
说白了,SASL/OAUTHBEARER就是一套让用户使用第三方认证工具认证的标准,通常是需要自己实现一些token认证和创建的接口,所以会比较繁琐。
详情可以通过这个kip了解KIP-255
说了这么多,接下来就说实战了,先介绍下如何配置SASL/PLAIN。
SASL/PLAIN实例(配置及客户端)
Kafka添加SASL_PLAIN安全认证
1,配置修改
2,添加kafka_server_jaas.conf文件 server端的认证文件,放置在/mnt/hdb/ops-ng/kafka/config/中。
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_alice="alice-secret";
};
内容解释:配置文件命名为:kafka_server_jaas.conf,放置在/mnt/hdb/ops-ng/kafka/config/。
使用user_来定义多个用户,供客户端程序(生产者、消费者程序)认证使用,可以定义多个。
上例我定义了两个用户,一个是admin,一个是alice,等号后面是对应用户的密码(如user_admin定义了用户名为admin,密码为admin-secret的用户)。
官方说明:
大概意思是:username="admin"和password="admin-secret"是代理之间使用的用户名和密码,即多个kafka集群使用的用户名和密码,而user_userName则是连接端使用的用户名密码。
3,创建client认证文件kafka_client_jaas.conf,此文件是后面console的生产者和消费者使用,放置在/mnt/hdb/ops-ng/kafka/config/中。(可选,如果是程序是生产者或者消费者,可以不用配置)
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="alice"
password="alice-secret";
};
4,修改启动脚本
vi bin/kafka-server-start.sh
添加一行:export KAFKA_OPTS="-Djava.security.auth.login.config=/mnt/hdb/ops-ng/kafka/config/kafka_server_jaas.conf"
5,console 控制台生产、消费相关配置 (可选)
- 修改/mnt/hdb/ops-ng/kafka/config/producer.properties,在配置最后加入以下两行内容:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
- 修改/mnt/hdb/ops-ng/kafka/config/consumer.properties,要添加的内容和producer的内容一样:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
- 添加kafka-console-producer.sh认证文件路径,后面启动生产者测试时使用:
[root@kafka1 ~]# cat /mnt/hdb/ops-ng/kafka/bin/kafka-console-producer.sh
export KAFKA_OPTS="-Djava.security.auth.login.config=/mnt/hdb/ops-ng/kafka/config/kafka_client_jaas.conf"
控制台生产者命令:
bin/kafka-console-producer.sh --broker-list 192.168.1.20:9092 --topic read
--producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
- 添加kafka-console-consumer.sh认证文件路径,后面启动消费者测试时使用:
[root@kafka1 ~]# cat /mnt/hdb/ops-ng/kafka/bin/kafka-console-consumer.sh
export KAFKA_OPTS="-Djava.security.auth.login.config=/mnt/hdb/ops-ng/kafka/config/kafka_client_jaas.conf"
控制台消费者命令:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.203:9092 --topic test-topicaaa --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN
一般能够发送数据就说明部署完成了~
代码实现SASL/PLAIN认证
github.com/segmentio/kafka-go 消费数据 未指定组
package kafkaimport ("context""github.com/segmentio/kafka-go""github.com/segmentio/kafka-go/sasl""github.com/segmentio/kafka-go/sasl/plain""github.com/segmentio/kafka-go/sasl/scram"ktesting "github.com/segmentio/kafka-go/testing""github.com/wonderivan/logger""testing""time" )const (saslTestConnect = "172.19.1.103:9092" // connect to sasl listenersaslTestTopic = "test-topic" // this topic is guaranteed to exist.username = "alice"password = "alice-secret"version = "0.10.2.0" )func Test_SASL_Read(t *testing.T) {//建立连接d := &kafka.Dialer{SASLMechanism: plain.Mechanism{Username: username,Password: password,},}//消费数据Readers := make([]*kafka.Reader, 0)for _, topic := range []string{saslTestTopic} {rc := kafka.ReaderConfig{Brokers: []string{saslTestConnect},Topic: topic,Partition: 0,MinBytes: 10e3, // 10KBMaxBytes: 10e6, // 10MB}rc.Dialer = dreader := kafka.NewReader(rc)// beginning设置 通过获取当前log值指定消费位置//if !consumer.beginning {// lag, _ := reader.ReadLag(context.Background())// reader.SetOffset(lag) //从当前部开始读取//}Readers = append(Readers, reader)}for {for _, reader := range Readers {m, err := reader.ReadMessage(context.Background())if err == nil {logger.Info(m.Value)} else {logger.Error("read kafka error:%v", err)//lag, _ := reader.ReadLag(context.Background())//logger.Error("read %d close:lag:%d", idx, lag)reader.Close()}}}}
github.com/Shopify/sarama 实现组消费
package kafkaimport ("fmt""github.com/Shopify/sarama"cluster "github.com/bsm/sarama-cluster""github.com/wonderivan/logger""testing""time" )func Test_SASL_Group(t *testing.T) {config := cluster.NewConfig()config.Group.Return.Notifications = trueconfig.Net.ReadTimeout = 10 * time.Second //time.Millisecondconfig.Net.SASL.Enable = trueconfig.Net.SASL.User = usernameconfig.Net.SASL.Password = passwordconfig.Net.SASL.Version = sarama.SASLHandshakeV1 //version//SASLHandshakeV1config.Consumer.Offsets.CommitInterval = 1 * time.Secondconfig.Consumer.Offsets.Initial = sarama.OffsetNewest //初始从最新的offset开始/*// beginning设置 通过获取当前log值指定消费位置if consumer.beginning {config.Consumer.Offsets.Initial = sarama.OffsetOldest}*/c, err := cluster.NewConsumer([]string{saslTestConnect}, "filebeat01", []string{"testtopic"}, config)if err != nil {logger.Info("连接失败:\n %v", err)return} else {logger.Info("连接成功")}defer c.Close()//接受错误消息go func(c *cluster.Consumer) {errors := c.Errors()noti := c.Notifications()for {select {case <-errors:case <-noti:}}}(c)for m := range c.Messages() {fmt.Println("消费:", m.Value)c.MarkOffset(m, "") //MarkOffset 并不是实时写入kafka,有可能在程序crash时丢掉未提交的offset}}
github.com/segmentio/kafka-go 生产数据
package channelimport ("context""fs.com/ezlogic/utils""github.com/segmentio/kafka-go/sasl/plain""sync""time""github.com/segmentio/kafka-go""github.com/wonderivan/logger" )type KafkaProducer struct {writer *kafka.Writer }type Producers struct {Write sync.Map //make(map[string]*channel.KafkaProducer) }func NewProducer(topic, username, password string, brokers []string) *KafkaProducer {defer utils.DeferFunc("NewProducer", nil)var (conn *kafka.Conndialer *kafka.Dialererr error)if username == "" {conn, err = kafka.DialLeader(context.Background(), "tcp", brokers[0], topic, 0)} else {//sasldialer = &kafka.Dialer{SASLMechanism: plain.Mechanism{Username: username,Password: password},}// 读topic,如果topic不存在,则创建topic,因此后续可以正常写conn, err = dialer.DialLeader(context.Background(), "tcp", brokers[0], topic, 0)}if err != nil {logger.Painc("NewProducer err:", topic, brokers, "====", err)}conn.ReadPartitions(topic)conn.SetWriteDeadline(time.Now().Add(10 * time.Second))conn.Close()kp := &KafkaProducer{writer: kafka.NewWriter(kafka.WriterConfig{Brokers: brokers,Dialer: dialer,Topic: topic,Balancer: &kafka.LeastBytes{},Async: true,//0613 提高写性能BatchTimeout: 100 * time.Millisecond,BatchSize: 10000,}),}return kp }func (producer *KafkaProducer) Write(msg []byte) {if err := producer.writer.WriteMessages(context.Background(), kafka.Message{Value: msg,}); err != nil {logger.Error("producer write error:%v", err)} }
go kafka 配置SASL认证及实现SASL PLAIN认证功能相关推荐
- Kafka配置SASL/PLAIN认证
1.安装zk,kafka 2.配置server.properties security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inte ...
- 安装 kafka 配置 sasl 认证
一.安装kafka 1.安装jdk yum serach jdk # 查找jdk yum install java-latest-openjdk.x86_64 # 选择jdk安装,这里选择最新的 ...
- kafka配置SASL
适用于kafka_2.11-1.1.1版本 第1步 将kafka_client_jaas.conf/kafka_server_jaas.conf/kafka_zoo_jaas.conf三个文件放入ka ...
- kafka sasl java_Kafka安装及开启SASL_PLAINTEXT认证(用户名和密码认证)
前些日子要封装一个kafka的客户端驱动,配置了下kafka环境,发现配置复杂度完爆rabbitmq很多倍啊,而且发布订阅模式使用起来也很麻烦,可能就胜在分布式了吧. kafka需要java环境,自行 ...
- kafka SASL认证介绍及自定义SASL PLAIN认证功能
文章目录 kafka 2.x用户认证方式小结 SASL/PLAIN实例(配置及客户端) broker配置 客户端配置 自定义SASL/PLAIN认证(二次开发) kafka2新的callback接口介 ...
- Kafka配置SASL_SSL认证传输加密
Kafka配置SASL_SSL认证传输加密 一.SSL证书配置 1.生成证书 如我输入命令如下:依次是 密码-重输密码-名与姓-组织单位-组织名-城市-省份-国家两位代码-密码-重输密码,后面告警不用 ...
- Kafka配置动态SASL_SCRAM认证
Kafka配置动态SASL_SCRAM认证 1.启动Zookeeper和Kafka 2.创建SCRAM证书 3.维护SCRAM证书 3.1查看SCRAM证书 3.1删除SCRAM证书 4.服务端配置 ...
- Kafka配置安全认证
Kafka配置安全认证 提示:为了对数据的安全考虑,在对kafka进行读取数据时需要添加安全认证,在摸索了大量博主的博客后,自己终于把这安全认证给安排了,废话不多说,往下走. 文章目录 Kafka配置 ...
- [工作中爬过的坑] Kafka配置域名的三种难度
文章目录 1. 背景说明 2. 初级难度 - 无认证Kafka 3. 中级难度 - SASL/PLAIN认证Kafka 4. 有人捣乱的难度 - Ambari中SASL/PLAIN认证Kafka 我曾 ...
最新文章
- WIN7 64位系统下,右下角的声音和电源图标不见的解决办法
- hdu3786 找出直系亲属 水题
- 程序员如何跟领导提离职_如何跟领导谈加薪,做好这几点,成功谈加薪又不失风度...
- windows配置java运行环境
- make_heap(),push_heap(),pop_heap(),sort_heap()用法。
- ffmpeg 常用基本命令和ffmpeg处理RTMP流媒体的常用命令
- web实现数据交互的几种常见方式
- 伪集群zookeeper模式下codis的部署安装
- 解析WeNet云端推理部署代码
- WebDriverException: Cannot find firefox binary in PATH.的解决方法
- 第6章 面向方面编程
- c++多线程——基于锁和条件变量的前程安全队列
- python导入模块的常用方法_(9)python模块的定义、导入、优化,常用模块
- 微服务框架和工具大全
- VR线下体验店群雄并起,超级队长为何能靠IP突围?
- mysql centos_centos7mysql安装
- 结合百度地图城市编码的数据表
- dataframe两个表合并_python处理DataFrame数据的一些常用操作
- 广告行业计费模式专业术语
- Django计算机毕业设计餐饮管理系统(程序+LW)Python
热门文章
- 【zz】陈硕:当析构函数遇到多线程──C++ 中线程安全的对象回调
- flink SourceFunction could not be inferred. Support for synthetic interfaces, lambdas, and generic
- c++11 regex
- 风寒感冒 风热感冒区别
- 终端模拟器 java_程序员必备之终端模拟器,让你的终端世界多一抹“颜色”
- php 微信朋友圈 更新时间,微信朋友圈可见天数怎么设置
- 活体检测——Oulu-NPU数据集
- Fewest Flops
- 关于win10无限重启的解决方案
- 干同一份工作,工资为什么比同事低很多?