转载请注明出处:Springboot集成使用阿里云kafka详细步骤

明确连接认证类型

首先要明确使用哪种连接认证类型

Ons模式参考

https://github.com/AliwareMQ/aliware-kafka-demos/tree/master/kafka-java-demo/beta

Ons模式的conf内容

KafkaClient {com.aliyun.openservices.ons.sasl.client.OnsLoginModule requiredAccessKey="XXX"SecretKey="XXX";
};

Plain模式参考

https://github.com/AliwareMQ/aliware-kafka-demos/tree/master/kafka-java-demo/vpc-ssl

Plain模式的conf内容

KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="xxxxxxxxxxxxxxxxxxxxx"password="xxxxxxxxxxxxxxxxxxxxx";
};

分别在这两个帖子中下载对应的jks证书和conf文件。

或者参考代码的相应目录下

注意,这两个配置都不能打包到jar包中,否则容易无法识别和出问题,所以我们需要放在服务的明确路径里。

例如/jar/kafka_client_jaas.conf和/jar/kafka.client.truststore.jks

集成

springboot版本为1.5.2。

引入kafka-client的jar包

在项目的pom文件中添加kafka-clients并且排除spring-kafka中的kafka-clients。

因为spring-kafka目前最新版本为2.1.2,其依赖的kafka-clients是1.0.x,但Kafka 服务端版本是 0.10,Client 版本建议 0.10,所以此处需排除依赖重新引入,否则一直报错:disconnected

如下:

     <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.10.0.0</version></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency>

新建KafkaAliyunConfiguration类

KafkaAliyunConfiguration.java

package com.biologic.util;import java.net.URL;
import java.util.HashMap;
import java.util.Map;import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.util.StringUtils;@Configuration
@EnableKafka
public class KafkaAliyunConfiguration {@Value("${kafka.broker.address}")private String brokerAddress;@Value("${kafka.sample.topic}")private String defaultTopic;@Value("${kafka.jks.location}")private String jksLocation;@Value("${kafka.sample.retrycount}")private String retrycount;public KafkaAliyunConfiguration() {//如果用-D 或者其它方式设置过,这里不再设置if (null == System.getProperty("java.security.auth.login.config")) {//请注意将 XXX 修改为自己的路径//这个路径必须是一个文件系统可读的路径,不能被打包到 jar 中System.setProperty("java.security.auth.login.config", "/jar/kafka_client_jaas.conf");}System.out.println("环境变量中已有config文件,kafka配置为:"+System.getProperty("java.security.auth.login.config"));}public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<String, Object>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);if (StringUtils.isEmpty(jksLocation)) {props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, KafkaAliyunConfiguration.class.getClassLoader().getResource("kafka.client.truststore.jks").getPath());} else {props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, jksLocation);}props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");props.put(ProducerConfig.RETRIES_CONFIG, retrycount);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);return props;}public ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<String, String>(producerConfigs());}@Beanpublic  KafkaTemplate<String, String> kafkaTemplate() {KafkaTemplate kafkaTemplate = new KafkaTemplate<String, String>(producerFactory());kafkaTemplate.setDefaultTopic(defaultTopic);return kafkaTemplate;}
}

此处定义了四个变量,通过配置文件注入:

brokerAddress kafka服务器地址

defaultTopic kafka默认topic

jksLocation JKS文件地址(开发环境无需定义,直接读取resources下的jks,但生产环境需读取jar包外部的jks文件,所以此处需配置路径)

retrycount 重试次数

配置文件properties中增加相应变量

在application-beta.properties中增加对应配置如下:

kafka.broker.address=39.76.22.123:9093,39.175.15.234:9093,39.126.188.165:9093kafka.sample.retrycount=100kafka.sample.topic=save_samplekafka.jks.location=/jar/kafka.client.truststore.jks

新建KafkaService发送消息

KafkaService.java

package com.biologic.api.service;import org.springframework.stereotype.Service;@Service
public interface KafkaService {void sendMessage(String topic, String data);void releaseKafkaMsg(String barcode, String chip);}

KafkaServiceImpl.java

package com.biologic.api.service.impl;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;import com.biologic.api.service.KafkaService;import net.sf.json.JSONArray;
import net.sf.json.JSONObject;@Service
public class KafkaServiceImpl implements KafkaService {@Value("${kafka.sample.topic}")private String sampleTopic;private Logger LOG = LoggerFactory.getLogger(KafkaServiceImpl.class);// private final KafkaTemplate<Integer, String> kafkaTemplate;//// /**// * 注入KafkaTemplate// * @param kafkaTemplate kafka模版类// */// @Autowired// public KafkaServiceImpl(KafkaTemplate kafkaTemplate) {// this.kafkaTemplate = kafkaTemplate;// }@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String topic, String data) {LOG.info("kafka sendMessage start");ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, data);future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onFailure(Throwable ex) {LOG.error("kafka sendMessage error, ex = {}, topic = {}, data = {}", ex, topic, data);}@Overridepublic void onSuccess(SendResult<String, String> result) {LOG.info("kafka sendMessage success topic = {}, data = {}", topic, data);}});LOG.info("kafka sendMessage end");}public void releaseKafkaMsg(String barcode, String chip) {try {JSONArray data = new JSONArray();JSONObject kafka_sample_state = new JSONObject();kafka_sample_state.put("plate_id", chip);kafka_sample_state.put("barcode", barcode);kafka_sample_state.put("status", "release_report");data.add(kafka_sample_state);JSONObject sample_list = new JSONObject();sample_list.put("sample_list", data.toString());sendMessage(sampleTopic, sample_list.toString());} catch (Exception e) {e.printStackTrace();}}}

外部注入路径变量的方式

我们上面的代码中是把conf文件的路径写死的,如果需要变动地址,可以使用以下方式

环境注入conf文件路径

因为代码中会默认获取环境变量中的java.security.auth.login.config配置,所以只需要启动时 赋值路径即可。

-Djava.security.auth.login.config=你的配置绝对路径

完整启动springboot的项目命令如下:

java -jar /jar/report-api-1.0.0-SNAPSHOT.jar --spring.profiles.active=beta  -Djava.security.auth.login.config=/jar/kafka_client_jaas.conf

变量注入conf文件路径

注意 因为类的初始化在注入变量之前,所以conf的路径不能用变量的方式注入,否则会报空指针错误。

如下用法会报错

 @Value("${kafka.conf.location}")private String confLocation;public KafkaAliyunConfiguration() {// 如果用-D 或者其它方式设置过,这里不再设置if (null == System.getProperty("java.security.auth.login.config")) {// 请注意将 XXX 修改为自己的路径// 这个路径必须是一个文件系统可读的路径,不能被打包到 jar 中System.setProperty("java.security.auth.login.config", confLocation);System.out.println("使用配置中的路径,kafka配置为:" + System.getProperty("java.security.auth.login.config"));} else {System.out.println("环境变量中已有config文件,kafka配置为:" + System.getProperty("java.security.auth.login.config"));}}

安全层面加固

因为直接conf文件中包含帐号密码容易被其他人查看到,有一种方式是外部引入模版文件,使用环境变量中的帐号密码修改conf文件。

模版文件如下:

KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="_KAFKA_ALIYUN_USERNAME_"password="_KAFKA_ALIYUN_PASSWORD_";
};

使用shell命令从s3中下载conf文件并修改conf文件如下:

      initContainers:- name: pull-libimage: anigeo/awscli:latestcommand: ["/bin/sh","-c"] args: ['aws s3 cp s3://test-env/kafka.client.truststore.jks /jar/ ;aws s3 cp s3://test-env/kafka_client_jaas.conf /jar/ ;sed -i "s/_KAFKA_ALIYUN_USERNAME_/${KAFKA_SSL_USERNAME}/"  /jar/kafka_client_jaas.conf;sed -i "s/_KAFKA_ALIYUN_PASSWORD_/${KAFKA_SSL_PASSWORLD}/"  /jar/kafka_client_jaas.conf']env:- name: AWS_DEFAULT_REGIONvalue: cn-southwest-2- name: KAFKA_SSL_USERNAMEvalueFrom:secretKeyRef:name: aliyun-kafkakey: username- name: KAFKA_SSL_PASSWORLDvalueFrom:secretKeyRef:name: aliyun-kafkakey: passwordvolumeMounts:- name: workdirmountPath: /jar  

可能遇到的问题–org.apache.kafka.common.errors.UnsupportedSaslMechanismException: Client SASL mechanism ‘ONS’ not enabled in the server, enabled mechanisms are [PLAIN]

原因

代码中使用的配置与conf中设置的安全机制不一致。

解决方式

PLAIN模式
代码中

     props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

对应conf内容

KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="xxxxxxxxxxxxxxxxxxxxx"password="xxxxxxxxxxxxxxxxxxxxx";
};

ONS模式
代码中

     props.put(SaslConfigs.SASL_MECHANISM, "ONS");

对应conf内容

KafkaClient {com.aliyun.openservices.ons.sasl.client.OnsLoginModule requiredAccessKey="XXX"SecretKey="XXX";
};

可能遇到的问题–nested exception is java.lang.NullPointerException

使用代码为

 public KafkaAliyunConfiguration() {if (StringUtils.isEmpty(confLocation)) {URL authLocation = KafkaAliyunConfiguration.class.getClassLoader().getResource("kafka_client_jaas.conf");if (System.getProperty("java.security.auth.login.config") == null) {System.setProperty("java.security.auth.login.config", authLocation.toExternalForm());}System.out.println("kafka配置为:"+authLocation.toExternalForm());} else {System.out.println("kafka配置为:"+confLocation);System.setProperty("java.security.auth.login.config", confLocation);}     }

在进行KafkaAliyunConfiguration初始化时报错空指针。

Caused by: org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'qualityServiceImpl': Unsatisfied dependency expressed through field 'kafkaService'; nested exception is org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'kafkaServiceImpl': Unsatisfied dependency expressed through field 'kafkaTemplate'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'kafkaAliyunConfiguration' defined in URL [jar:file:/jar/report-api-1.0.0-SNAPSHOT.jar!/BOOT-INF/classes!/com/biologic/util/KafkaAliyunConfiguration.class]: Instantiation of bean failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [com.biologic.util.KafkaAliyunConfiguration$$EnhancerBySpringCGLIB$$88e40778]: Constructor threw exception; nested exception is java.lang.NullPointerException

原因 初始化KafkaAliyunConfiguration时,变量加载的顺序问题导致无法识别到变量。

解决方式

方式一 初始化时不使用注入的变量
如下:

 public KafkaAliyunConfiguration() {//如果用-D 或者其它方式设置过,这里不再设置if (null == System.getProperty("java.security.auth.login.config")) {//请注意将 XXX 修改为自己的路径//这个路径必须是一个文件系统可读的路径,不能被打包到 jar 中System.setProperty("java.security.auth.login.config", "/jar/kafka_client_jaas.conf");}System.out.println("环境变量中已有config文件,kafka配置为:"+System.getProperty("java.security.auth.login.config"));}

方式二 将bean方法设置成static静态方法

参考 spring boot整合shiro引用配置文件配置是出现的问题

可能遇到的问题–Caused by: java.io.FileNotFoundException: file:/jar/report-api-1.0.0-SNAPSHOT.jar!/BOOT-INF/classes!/kafka.client.truststore.jks (No such file or directory)

原因

打入jar包的证书和conf文件无法读取,或者没有设置外部路径导致默认读取项目内的配置。

解决方式

通过外部明确的linux路径进行配置。

可能遇到问题–Configuration Error:Line 3: expected [option key]

 ssl.truststore.location = file:/jar/report-api-1.0.0-SNAPSHOT.jar!/BOOT-INF/classes!/kafka.client.truststore.jksServiceExceptionHandler.java[line:30] exception ERROR org.apache.kafka.common.KafkaException: Failed to construct kafka producerCaused by: java.io.IOException: Configuration Error:Line 3: expected [option key]

原因–配置文件无法读取或者参数格式错误。

解决方法

通过外部明确的linux路径进行配置jks和conf文件, 并且注意conf中的参数格式–分号,冒号要与原文件一致。

转载请注明出处:Springboot集成使用阿里云kafka详细步骤

参考链接

https://help.aliyun.com/document_detail/99958.html?spm=a2c4g.11186623.6.563.7b3b1e3bEl5oex

https://yq.aliyun.com/articles/433740

Springboot集成使用阿里云kafka详细步骤相关推荐

  1. Linux入门最终集! Vue SpringBoot Mybatis-Plus Linux 阿里云!

    Linux入门最终集! Vue & SpringBoot & Mybatis-Plus & Linux & 阿里云! 一.Xshell连接时出现警告 The remot ...

  2. springboot项目整合阿里云oss的内容审核

    springboot项目整合阿里云 内容审核 第一 添加依赖 <dependency><groupId>com.aliyun</groupId><artifa ...

  3. SpringBoot集成腾讯云存储COS服务

    前言 该文章会先简单的介绍一下腾讯云的COS存储,然后演示如何在SpringBoot项目中集成COS,每一步都有记录,保证初学者也能看懂. 文章目录 前言 1.腾讯云对象存储介绍 1.1.开通&quo ...

  4. 使用springboot集成腾讯云短信服务,解决配置文件读取乱码问题

    使用springboot集成腾讯云短信服务,解决配置文件读取乱码问题 参考文章: (1)使用springboot集成腾讯云短信服务,解决配置文件读取乱码问题 (2)https://www.cnblog ...

  5. SpringBoot 中连接阿里云rds数据库

    今天首次碰到在SpringBoot中使用阿里云rds数据库,搜索一了一番资料,最后成功实现.然后在这里做下自我总结.(个人用到是SSH方式进行连接) 1.在pom文件中添加jar包依赖 <dep ...

  6. 利用Jenkins自动化部署springboot项目到阿里云服务器(centos8)

    背景: 最近想部署一个springboot项目到阿里云服务器里面.捣鼓了很久,构建了很多次,都失败了!实在是一脸辛酸,有点气馁~ 在此想记录一下构建的过程. 不了解Jenkins之前,觉得Jenkin ...

  7. Springboot+阿里云kafka踩坑实录

    场景描述:上文写到,不断接收数据并存放到OSS,现在要把数据存到MQ的kafka一份. springboot版本为1.5.9. 开工之前先阅读阿里云官方kafka消息接入说明:https://help ...

  8. 项目集成短信验证功能(阿里云)详细实用教程

    阿里云手机短信验证码 第一步 登录阿里云开放平台 1.进入阿里云开放平台---->点击控制台 2.点击AccessKey管理 3.点击之后会弹出提示,选择开始使用子用户 4.新建一个用户组,然后 ...

  9. 【详细】阿里云Accesskey创建步骤记录

    1.登录阿里云,<点此快速传送>,登录后点击控制台. 2.鼠标移动到右上角头像 3.点击AccessKey管理 4.出现了安全提示,继续使用Accesskey. 5.点击创建AccessK ...

  10. 怎么把springboot部署到阿里云_听说“一分钟就能部署阿里云ECS集群”?

    导读 "企业上云"是当下大势所趋,"提效.降成本" 也是企业上云.迁云和管理云必须关注的问题.今天我们将从一个集群部署的场景出发,介绍阿里云如何借助生态工具Te ...

最新文章

  1. 基于TensorFlow理解三大降维技术:PCA、t-SNE 和自编码器
  2. JSP获得客服端MAC地址
  3. 微信小程序云开发用户身份登录_云开发版的微信商城小程序第一章
  4. 【STM32】端口复用和重映射,完全重映射,部分重映射
  5. 前端学习(1801):前端调试之清除浮动练习3
  6. 往java里输入坐标值_java.让用户输入x坐标,和y坐标。当用户输入完x坐标(比如200),敲enter,...
  7. crowd counting_[crowd_counting]-SFCN-CVPR2019amp;amp;GCC dataset
  8. Callable接口、Runable接口、Future接口
  9. wsdl可以访问单但是调不通_Dubbo 一些你不一定知道但是很好用的功能
  10. 深入kubernetes之Pod——一pod多容器
  11. Servlet HTTP 状态码
  12. ubuntu下JNI之HelloWorld相互传String值
  13. 选择排序法之Java实现
  14. 100句自我激励的名言佳句
  15. Hadoop 权威指南 第四版 中文目录
  16. IsValid() 的用法
  17. MINT:蛋白质相互作用数据库简介
  18. Web开发网页颜色搭配
  19. kafka错误代码解释
  20. 软链接解决存储空间不足

热门文章

  1. 关于工作后定居城市的思考以及房价…
  2. K.gather()
  3. GRUB4DOS(九)常用设备类型及map命令用法示例
  4. 卡内基梅隆计算机金融,卡耐基梅隆大学计算金融项目申请要求是什么?
  5. #2297. 战争调度(war)
  6. vue3 + vite +ts 引入静态图片
  7. vim 修改注释颜色 tab键
  8. 云栖干货回顾 | 行业顶级NoSQL成员坐阵,NoSQL数据库专场重点解析!
  9. web在线播放flv视频
  10. 计算机主机地址填什么,什么是IP地址 何设置IP地址