Kafka是广泛使用消息服务,很多情况下关于认证部分我都是默认的配置,也就是不需要用户名/密码,也不配置证书。在内网或者在项目组内部可以,但是设计的跨部门时一般处于安全考虑都需要加上认证,防止kafka被误用,产生大量垃圾信息,干扰了正常业务的运行。

Kafka提供的多种认证方式,比如SASL, 本文主要介绍启用了SASL_PLAINTEXT时,如何在kafka client配置jaas文件,以却把kafka client客户端能正确连接到kafka server上。

先上官方文档:

加载jaas文件的方式有三种
1, 设置系统属性,java.security.auth.login.config
2, 配置运行程序时用到的jre的安全文件 login.config.url.n属性(路径为加热的 lib/security/java.security ),
3, 编程是调用 javax.security.auth.login.Configuration.setConfiguration(Configuration)设置相关信息
还有一种,直接设置Producer或者Consumer的sasl.jaas.config属性

下面我们直接上最简单的这种方式。(关于jaas文件的内容,本文不再赘述,其他文章已经有提及),本文直接引用。
producer 代码.

注意,确保你的kafka client版本和你的kafka server是匹配的,server版本是kafka_2.11-1.1.0 , client版本是1.1.0
我使用的

        <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>1.1.0</version></dependency>
package com.yq;import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.SaslConfigs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;import static org.slf4j.Logger.*;/*** Simple to Introduction* className: SendMessageMain**/
public class SendMessageMain {private static final Logger logger = LoggerFactory.getLogger(SendMessageMain.class);public static void main(String... args) throws Exception {try {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ubuntu:9092");props.put("acks", "all");props.put("retries", 0);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username=\"producer\" password=\"prod-sec\";");//System.setProperty("java.security.auth.login.config", "D:\workspaceGithub\\kafka_client_jaas.conf"); //配置文件路径System.out.println("create KafkaProducer");Producer<String, String> producer = new KafkaProducer<String, String>(props);String data = "aaa";ProducerRecord<String, String> producerRecord = new ProducerRecord("topic01", data);for (int i = 0; i < 100; i++) {System.out.println(i);producer.send(producerRecord,new org.apache.kafka.clients.producer.Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception e) {if(e != null) {System.out.println("onCompletion exception");e.printStackTrace();}System.out.println("The offset of the record we just sent is: " + metadata);}});System.out.println("flush producer");producer.flush();}System.out.println("close producer");producer.close();}catch (Exception ex) {ex.printStackTrace();System.out.println("when calling kafka output error." + ex.getMessage());}}}

核心就在于下面这段, 其中的sasl.jaas.config内容,参考官方文档,一般为如下格式
org.apache.kafka.common.security.plain.PlainLoginModule required username=“alice” password=“alice-secret”;

props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username=\"producer\" password=\"prod-sec\";");

下面是consumer代码

package com.yq;import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import javax.security.auth.login.AppConfigurationEntry;
import java.util.Arrays;
import java.util.Properties;/*** Simple to Introduction* className: SendMessageMain*/
public class ReceiveMessageMain {private static final Logger logger = LoggerFactory.getLogger(ReceiveMessageMain.class);public static void main(String... args) throws Exception {try {Properties props = new Properties();props.put("bootstrap.servers", "192.168.119.121:9092");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "300000");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username=\"producer\" password=\"prod-sec\";");System.out.println("create KafkaConsumer");System.out.println("receive data");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("topic01"));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);System.out.println("receive data01");for (ConsumerRecord<String, String> record: records) {System.out.printf("offset = %d, key= %s , value = %s\n", record.offset(), record.key(), record.value());}}}catch (Exception ex) {ex.printStackTrace();System.out.println("when calling kafka output error." + ex.getMessage());}}}

从以上代码,我们可以看到,我也是用 System.setProperty(“java.security.auth.login.config”, “D:\workspaceGithub、\kafka_client_jaas.conf”); //配置文件路径, 也就是官方文档中的额设置系统属性。

我们还可以在程序启动参数中设置java.security.auth.login.config, 也就是启动参数加上-Djava.security.auth.login.config=D:/workspaceGithub/kafka_client_jaas.conf。

下面的截图供参考,有网友需要使用kafka自带的 kafka-console-consumer测试配置SASL_PLAINTEXT的效果

Kafka启用SASL_PLAINTEXT动态配置JAAS文件的几种方式相关推荐

  1. 【kafka】Kafka中的动态配置源码分析

    1.概述 2.源码分析 Broker启动加载动态配置 KafkaServer.startup 启动加载动态配置总流程 2.1 动态配置初始化 config.dynamicConfig.initiali ...

  2. jsp:jsp包含文件的两种方式

    第一种:include指令 include指令:当JSP转换成Servlet时引入指定文件(指令元素),这是一种静态包含,它运行的时候不会单独编译成.class文件,它生成一个新的整体.class文件 ...

  3. JVM 在遇到OOM(OutOfMemoryError)时生成Dump文件的三种方式

    JVM 在遇到OOM(OutOfMemoryError)时生成Dump文件的三种方式,以及如何使用Eclips Memory Analyzer(MAT)插件进行堆内存分析. 方法一: jmap -du ...

  4. 映射Mapper.xml文件的几种方式

    第一种: mybatis-config.xml配置文件中的格式: <mappers><mapper resource="org/mybatis/builder/Author ...

  5. java oom dump_JVM 在遇到OOM(OutOfMemoryError)时生成Dump文件的三种方式

    JVM 在遇到OOM(OutOfMemoryError)时生成Dump文件的三种方式,以及如何使用Eclips Memory Analyzer(MAT)插件进行堆内存分析. 方法一: jmap -du ...

  6. python 下载文件-Python下载文件的11种方式

    原标题:Python下载文件的11种方式 在本教程中,你将学习如何使用不同的Python模块从web下载文件.此外,你将下载常规文件.web页面.Amazon S3和其他资源. 最后,你将学习如何克服 ...

  7. eclipse4.6安装tomcate插件 Eclipse4.6(neno)配置Tomcat插件的两种方式

    为什么80%的码农都做不了架构师?>>>    Eclipse4.6(neno)配置Tomcat插件的两种方式 ,自己用的第一种手动安装 ①:下载net.sf.eclipse.tom ...

  8. java读取csv文件的两种方式

    java读取csv文件的两种方式 1.CsvReader读取 import com.csvreader.CsvReader; /*** CsvReader 读取* @param filePath* @ ...

  9. C# 读取XML文件的几种方式

    在开发过程中,我们有时会需要保存到本地一些结构化数据或者配置信息,这时就可以选择用xml文件.当然xml的用途也不仅仅是这些. 这一篇来谈一谈关于读取xml文件的几种方式: 我们有以下两个文件,一个是 ...

最新文章

  1. OC WKWebView的JS与OC交互、Cookie管理
  2. 后台系统可扩展性学习笔记(五)负载均衡
  3. 2018杭州电子科技大学计算机研究生复试笔试编程题第三题
  4. HTML5系列(2)--ol列表的新属性
  5. 之前跳槽面试时整理的一些知识点
  6. 通用计算机遵循什么原则,计算机组成原理——计算机系统概述考研题
  7. Milk-Tea解析工具(DJ音乐解析)
  8. Ubuntu18.04笔记本插入耳机没有声音 解决方案
  9. 易乐游装在云服务器_新手搭建云服务器详细过程
  10. CSDN 添加微信二维码
  11. shp数据中文乱码的一种恢复方法
  12. CSDN博客去除上传的图片水印
  13. 模拟线上应用cpu100%解决方法
  14. linux怎么修改目录时间,详解Linux命令修改文件的三个时间
  15. 从零开始大数据--Hadoop、HDFS、MapReduce、HBase、Hive
  16. N多计算机精品免费视频下载
  17. 分享图文到Facebook、Twitter、Pinterest
  18. 尝试添加 --skip-broken 来跳过无法安装的软件包 或 --nobest 来不只使用最佳选择的软件包
  19. Tomcat报错:waiting for changelog lock.
  20. [Unity]随机装备属性随机数生成物品概率

热门文章

  1. python读文本并写入另一个文本文件中怎么做_使用文本Python将文件中的文本写入另一个文件...
  2. java 实现邮件带附件发送
  3. ZLMediaKit视频推流和播放步骤
  4. 【知识兔】Excel教程之隔行/隔列求和
  5. 坑爹!攻城狮老爸被苹果解雇 只因女儿晒iPhone X!
  6. windows_7_ultimate_with_sp1_x64位旗舰版
  7. 《程序员面试》---你所不了解的offer话术
  8. python写一个飞花令程序
  9. 根据 Excel 中的数据批量修改文件的扩展名
  10. python批量修改图片名