记录:

zookeeper启动命令:

[root@master-yzjgxh2571705819-1651919082731-99-0727183 bin]# ./zkServer.sh start
[root@master-yzjgxh2571705819-1651919082731-99-0727183 bin]# ./zkServer.sh stop

kafka启动命令:

/data/program/kafka2.12/bin/kafka-server-start.sh /data/program/kafka2.12/config/server.properties

创建SCRAM证书

1)创建broker建通信用户:admin(在使用sasl之前必须先创建,否则启动报错)

bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter 
--add-config 'SCRAM-SHA-256=[password=admin-sec],
SCRAM-SHA-512=[password=admin-sec]' --entity-type users --entity-name admin

2)创建生产用户:producer

bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter 
--add-config 'SCRAM-SHA-256=[iterations=8192,password=prod-sec],
SCRAM-SHA-512=[password=prod-sec]' --entity-type users --entity-name producer

3)创建消费用户:consumer

bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter 
--add-config 'SCRAM-SHA-256=[iterations=8192,password=cons-sec],
SCRAM-SHA-512=[password=cons-sec]' --entity-type users --entity-name consumer

SCRAM-SHA-256/SCRAM-SHA-512是对密码加密的算法,二者有其一即可

查看SCRAM证书

bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name consumer
bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name producer

服务端配置

1)创建JAAS文件

vi config/kafka_server_jaas.conf

内容:

KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-sec";
};

2)将JAAS配置文件位置作为JVM参数传递给每个Kafka Broker【bin/kafka-server-start.sh】添加

exec $base_dir/kafka-run-class.sh 
$EXTRA_ARGS -Djava.security.auth.login.config
=/home/test/kiki/kafka/ka/config/kafka_server_jaas.conf kafka.Kafka "$@"

操作:

vi bin/kafka-server-start.sh

修改最后一行

#exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/data/program/kafka2.12/config/kafka_server_jaas.conf kafka.Kafka "$@"

3)配置server.properties【config/server.properties】

#认证配置
listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256
#ACL配置
allow.everyone.if.no.acl.found=false
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

当前测试kafka端口为8100

vi config/server.properties

#listeners=PLAINTEXT://:8100

listeners=SASL_PLAINTEXT://:8100
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-512

advertised.listeners=SASL_PLAINTEXT://183.56.218.28:8100

#ACL配置
allow.everyone.if.no.acl.found=false
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

个人备注:

log.dirs=/data/program/kafka2.12/data

zookeeper.connect=183.56.218.28:2181

SCRAM-SHA-512与SCRAM-SHA-216可互相更改,看需要什么类型。PLAINTEXT为不需要认证

4)重启Kafka和Zookeeper

客户端配置

1)为我们创建的三个用户分别创建三个JAAS文件:分别命名为
kafka_client_scram_admin_jaas.conf
kafka_client_scram_producer_jaas.conf
kafka_client_scram_consumer_jaas.conf

vi bin/kafka_client_scram_admin_jaas.conf

KafkaClient { org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-sec"; };

vi bin/kafka_client_scram_producer_jaas.conf

KafkaClient { org.apache.kafka.common.security.scram.ScramLoginModule required username="producer" password="prod-sec"; };

vi bin/kafka_client_scram_consumer_jaas.conf

KafkaClient { org.apache.kafka.common.security.scram.ScramLoginModule required username="consumer" password="cons-sec"; };

2)修改启动脚本引入JAAS文件:
生产者配置:
配置bin/kafka-console-producer.sh

exec $(dirname $0)/kafka-run-class.sh 
-Djava.security.auth.login.config
=/data/program/kafka2.12/config/kafka_client_scram_producer_jaas.conf

消费者配置:
配置bin/kafka-console-consumer.sh

exec $(dirname $0)/kafka-run-class.sh 
-Djava.security.auth.login.config
=/data/program/kafka2.12/config/kafka_client_scram_consumer_jaas.conf

3)配置consumer.properties和producer.properties,都要加入以下配置

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512

4)创建主题

[test@police ka]$ bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 2 --replication-factor 1

5)启动生产(ps:结束也未能成功测试该命令是否能用,后面在代码方面配置就好)

bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test --producer.config config/producer.properties

发现会报权限相关的错

6)对生产者赋予写的权限

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer
--authorizer-properties zookeeper.connect=localhost:2181 --add
 --allow-principal User:producer --operation Write --topic test

7)对消费者赋予读的权限

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer
--authorizer-properties zookeeper.connect=localhost:2181 --add
 --allow-principal User:consumer --operation Read --topic test

此时启动消费者(ps:结束也未能成功测试该命令是否能用,后面在代码方面配置就好)

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --consumer.config config/consumer.properties

此时依旧会报错,报未对消费者组授权。给groupId配权

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer
--authorizer-properties zookeeper.connect=localhost:2181 --add 
--allow-principal User:consumer --operation Read --group test-group

此时再启动消费者,可以发现能正常消费生产者的消息

8)查看权限

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer 
--authorizer-properties zookeeper.connect=localhost:2181 --list

springboot整合配置

权限主要配置部分格式:

props.put("security.protocol", "SASL_PLAINTEXT");
      props.put("sasl.mechanism", "SCRAM-SHA-512");
      props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.scram.ScramLoginModule required username='easy' password='easy1234';");

生产者:

//异步发送@Testfun customProducer() {//配置val properties = Properties()//链接kafkaproperties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8100"//指定对应key和value的序列化类型(二选一)
//        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.nameproperties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.nameproperties[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = "SASL_PLAINTEXT"properties[SaslConfigs.SASL_MECHANISM] = "SCRAM-SHA-512"properties[SaslConfigs.SASL_JAAS_CONFIG] = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"producer\" password=\"prod-sec\";"val kafkaProducer = KafkaProducer<String, String>(properties)//发送数据for (i in 0 until 1) {//黏性发送,达到设置的数据最大值/时间后,切换分区(不会是当前分区)kafkaProducer.send(ProducerRecord("test", "我是成功:::${LocalDateTime.now()}"))}//"type":"UPDATE/ADD/DELETE"//关闭资源kafkaProducer.close()}

消费者

package com.umh.medicalbookingplatform.background.configimport com.umh.medicalbookingplatform.core.properties.ApplicationProperties
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory/*** @Description :* @Author  xiaomh* @date  2022/8/30 14:14*/@EnableKafka
@Configuration
class KafkaConsumerConfig {@Autowiredprivate lateinit var appProperties: ApplicationProperties@Beanfun consumerFactory(): ConsumerFactory<String?, String?> {val props: MutableMap<String, Any> = HashMap()props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = appProperties.kafkaBootstrapServersConfig.toString()props[ConsumerConfig.GROUP_ID_CONFIG] = appProperties.kafkaGroupId.toString()props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.javaprops[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.javaprops[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.javaprops[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = appProperties.kafkaSecurityProtocol.toString()props[SaslConfigs.SASL_MECHANISM] = appProperties.kafkaSaslMechanism.toString()props[SaslConfigs.SASL_JAAS_CONFIG] = appProperties.kafkaSaslJaasConfig.toString()return DefaultKafkaConsumerFactory(props)}@Beanfun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String>? {val factory = ConcurrentKafkaListenerContainerFactory<String, String>()factory.setConsumerFactory(consumerFactory())return factory}
}

yml

kafkaBootstrapServersConfig: xxxxxx:8100
kafkaGroupId: test-group
kafkaSecurityProtocol: SASL_PLAINTEXT
kafkaSaslMechanism: SCRAM-SHA-512
kafkaSaslJaasConfig: org.apache.kafka.common.security.scram.ScramLoginModule required username="consumer" password="cons-sec";

参考

Kafka动态认证SASL/SCRAM验证_慕木兮人可的博客-CSDN博客

Kafka动态认证SASL/SCRAM配置+整合springboot配置相关推荐

  1. Kafka动态认证SASL/SCRAM验证

    一.目的 配置SASL/PLAIN验证,实现了对Kafka的权限控制.但SASL/PLAIN验证有一个问题:只能在JAAS文件KafkaServer中配置用户,一但Kafka启动,无法动态新增用户.S ...

  2. kafka集群搭建+权限认证(SASL/SCRAM)+整合springboot

    本文介绍的的是kafka集群搭建.kafka权限认证(SASL/SCRAM).整合springboot项目. 1.创建kafka日志和zookeeper文件目录: /data/kafka/kafka- ...

  3. Kafka SASL_PLAINTEXT权限管理,并整合SpringBoot

    使用SASL/PLAIN进行身份验证 SASL/PLAIN是一种简单的用户名/密码身份验证机制,通常与TLS一起用于加密以实现安全身份验证.Kafka支持SASL/PLAIN的默认实现,可以扩展到生产 ...

  4. Kafka安全认证SASL/PLAINTEXT,账号密码认证

    环境centos7 kafka集群和zookeeper集群默认都是不带用户密码的. 1. 配置zookeeper集群SASL zookeeper所有节点都是对等的,只是各个节点角色可能不相同.以下步骤 ...

  5. Kafka安装配置(SASL/SCRAM动态认证)

    SASL/SCRAM验证方法可以在Kafka服务启动之后,动态的新增用户分并配权限,在业务变动频繁,开发人员多的情况下比SASL/PLAIN方法更加灵活. Zookeeper:3.4.13,kafka ...

  6. Kafka SASL/SCRAM动态认证集群部署

    Kafka SASL/SCRAM动态认证集群部署 目的:配置SASL/PLAIN验证,实现了对Kafka的权限控制.但SASL/PLAIN验证有一个问题:只能在JAAS文件KafkaServer中配置 ...

  7. Kafka SASL SCRAM动态授权实现方案Java版

    效果截图预览 一.pom依赖 <!-- kafka client --> <dependency><groupId>org.apache.kafka</gro ...

  8. 大数据Hadoop之——Kafka安全机制(Kafka SSL认证实现)

    文章目录 一.概述 1)SASL认证概述 2)Delegation Token认证概述 3)SSL认证概述(本章实现) 二.各种安全认证机制对比和使用场景 三.Kafka SSL认证实现 1)创建ss ...

  9. kafka动态权限认证(SASL SCRAM + ACL)

    kafka动态权限认证(SASL SCRAM + ACL) 创建三个测试用户 bin/kafka-configs.sh --zookeeper 192.168.x.x:2181 --alter --a ...

最新文章

  1. T-SQL游标学习总结
  2. Consensus Mechanisms — As Detailed and Concise as possible!
  3. [20161006]windows下bbed使用注意.txt
  4. WPF实现环(圆)形菜单
  5. 论文小综 | 文档级关系抽取方法(下)
  6. 7-31 字符串循环左移 (20 分)
  7. CF788E:New task
  8. Linux 命令(17)—— su 与 sudo 命令
  9. 在Java中将前导零添加到数字? [重复]
  10. centos7 python2换成python3后,yum报错解决
  11. 知识共享平台开发-BUG[2014-11-27]
  12. 单芯片快速以太网MAC控制器DM9000介绍续
  13. 软件工程计算机水平 推荐表,软件工程就业推荐表2014届.doc
  14. 7-22 寻找大富翁 (25 分)
  15. 深入解析云原生网络抖动引起的性能问题 @龙蜥社区eBPF SIG
  16. 头颅ct有伪影_颅脑CT怎么看?正常和异常影像分别是什么
  17. python微信公众号开发音乐功能_python利用微信公众号实现报警功能
  18. 极大似然估计量(θ)
  19. MacBook nice软件
  20. [案例5-1]模拟订单号生成

热门文章

  1. 目标检测回归损失函数:SmoothL1/IoU/GIoU/DIoU/CIoU Loss
  2. P2P贷款全攻略,贷前、贷中、贷后工作事项解析
  3. mysql学习笔记(1)_DQL(Data Query Language)
  4. 华硕p9d服务器主板什么系统,华硕服务器主板P9D-V
  5. 痞子衡嵌入式:盘点国内Cortex-M内核MCU厂商高性能产品
  6. blender 保留贴图转换 mmd 模型到 ue4/ue5 引擎
  7. 1100 Mars Numbers
  8. 2020程序员VS码农,“金三银四”春招指南
  9. Json 格式化工具类 支持Jackson、FastJson、Gson
  10. 职场“女神”,绝不会有的12个习惯