SpringBoot Integration with the Kafka Message Queue
1. Overview
Spring integrates conveniently with the Kafka message queue: just add the spring-kafka dependency and watch out for version compatibility. This article walks through integrating Kafka into a Spring Boot application, including how to use the producer and consumer sides.
2. Add the Dependency
Add Spring Kafka to pom.xml:

```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```
It is recommended to let spring-boot-dependencies manage the exact version:

```xml
<properties>
    <spring-boot.version>2.3.1.RELEASE</spring-boot.version>
</properties>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring-boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
```
3. Configuration
Create application.yml and add the following settings.

Common Spring Kafka settings:

```yaml
spring:
  kafka:
    # Kafka broker address
    bootstrap-servers: localhost:9092
```

Common settings go under spring.kafka.*. The admin, producer, consumer, and streams sections below override any property of the same name defined under spring.kafka.*.
Producer settings (spring.kafka.producer.*):

```yaml
spring:
  kafka:
    # Producer settings
    producer:
      # Serializer class for message keys
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Serializer class for message values
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
```
Consumer settings (spring.kafka.consumer.*):

```yaml
spring:
  kafka:
    # Consumer settings
    consumer:
      # Consumer group this consumer belongs to
      group-id: testGroup
      # Deserializer class for message keys
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Deserializer class for message values
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
```

The default value-deserializer is org.apache.kafka.common.serialization.StringDeserializer, which only supports plain-text messages. Use org.springframework.kafka.support.serializer.JsonDeserializer to support JSON messages.
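One caveat with JsonDeserializer: when a message carries type headers, it will only instantiate classes from packages it trusts, otherwise deserialization fails. A minimal sketch of the extra setting (the package name com.yuwen.spring.kafka is taken from this article's code; adjust it to your own, or use "*" to trust everything):

```yaml
spring:
  kafka:
    consumer:
      properties:
        # Packages the JsonDeserializer is allowed to deserialize into
        spring.json.trusted.packages: com.yuwen.spring.kafka.*
```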
The complete application.yml:

```yaml
server:
  port: 8028

spring:
  kafka:
    # Kafka broker address
    bootstrap-servers: localhost:9092
    # Producer settings
    producer:
      # Serializer class for message keys
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Serializer class for message values
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    # Consumer settings
    consumer:
      # Consumer group this consumer belongs to
      group-id: testGroup
      # Deserializer class for message keys
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Deserializer class for message values
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
```
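Before starting the application, the testTopic topic used later in this article should exist on the broker (many broker setups auto-create topics on first use, but relying on that is fragile). A sketch using the stock Kafka CLI, assuming a local broker on localhost:9092:

```shell
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --replication-factor 1 \
  --partitions 1 \
  --topic testTopic
```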
4. Application Code
Create the startup class KafkaMQApplication.java, taking care to add the @EnableKafka annotation:

```java
package com.yuwen.spring.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;

@SpringBootApplication
@EnableKafka
public class KafkaMQApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaMQApplication.class, args);
    }
}
```
Producer: sending messages
Spring Kafka provides the KafkaTemplate class for sending messages; inject it wherever it is needed. Create the producer service class ProviderService.java:

```java
package com.yuwen.spring.kafka.provider;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class ProviderService {
    public static final String TOPIC = "testTopic";

    @Autowired
    private KafkaTemplate<?, String> kafkaTemplate;

    public void send(String message) {
        // Send the message to the topic
        kafkaTemplate.send(TOPIC, message);
        System.out.println("Provider= " + message);
    }
}
```

Note that both the topic and the message payload must be specified.
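KafkaTemplate.send() is asynchronous: it returns before the broker acknowledges the record. In spring-kafka 2.x it returns a ListenableFuture that can be used to react to the outcome. A minimal sketch (the class name CallbackProviderService is invented for illustration):

```java
package com.yuwen.spring.kafka.provider;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class CallbackProviderService {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String message) {
        // addCallback runs one of the two lambdas once the broker responds
        kafkaTemplate.send(ProviderService.TOPIC, message).addCallback(
                result -> System.out.println("Sent, offset="
                        + result.getRecordMetadata().offset()),
                ex -> System.err.println("Send failed: " + ex.getMessage()));
    }
}
```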
Consumer: receiving messages
Create ConsumerService.java, taking care to use the @KafkaListener annotation:

```java
package com.yuwen.spring.kafka.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import com.yuwen.spring.kafka.provider.ProviderService;

@Service
public class ConsumerService {
    @KafkaListener(topics = ProviderService.TOPIC, groupId = "testGroup")
    public void receive(String message) {
        System.out.println("Consumer= " + message);
    }
}
```

Annotation attributes:
- topics: the topic(s) to consume; must match the producer's topic, and more than one may be listed
- groupId: the unique id of the consumer group
- topicPartitions: explicit topic/partition assignments; more than one may be given (mutually exclusive with topics)
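The attributes above can be sketched as follows (the second topic name otherTopic, the group id testGroup2, and the partition number are invented for illustration; note that topicPartitions cannot be combined with topics on the same listener):

```java
package com.yuwen.spring.kafka.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Service;

@Service
public class MultiConsumerService {

    // One listener consuming from several topics
    @KafkaListener(topics = { "testTopic", "otherTopic" }, groupId = "testGroup")
    public void receiveMany(String message) {
        System.out.println("Multi= " + message);
    }

    // A listener pinned to an explicit partition instead of group-managed assignment
    @KafkaListener(groupId = "testGroup2",
            topicPartitions = @TopicPartition(topic = "testTopic", partitions = "0"))
    public void receiveFromPartition(String message) {
        System.out.println("Partition0= " + message);
    }
}
```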
5. Generating Test Messages
To exercise the producer, create AutoGenerate.java, which generates a random string every second and sends it to Kafka through the producer:

```java
package com.yuwen.spring.kafka.provider;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class AutoGenerate implements InitializingBean {
    @Autowired
    private ProviderService providerService;

    @Override
    public void afterPropertiesSet() throws Exception {
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    // Send a random UUID string, then wait one second
                    String message = UUID.randomUUID().toString();
                    providerService.send(message);
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        t.start();
    }
}
```
6. Running the Service
Run the startup class KafkaMQApplication.java. The log output below shows that the random strings generated by the producer are correctly received by the consumer:
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.3.1.RELEASE)
2022-04-28 19:37:49.687 INFO 14424 --- [ main] c.yuwen.spring.kafka.KafkaMQApplication : Starting KafkaMQApplication on yuwen-asiainfo with PID 14424 (D:\Code\Learn\SpringBoot\spring-boot-demo\MessageQueue\kafka\target\classes started by yuwen in D:\Code\Learn\SpringBoot\spring-boot-demo\MessageQueue\kafka)
2022-04-28 19:37:49.689 INFO 14424 --- [ main] c.yuwen.spring.kafka.KafkaMQApplication : No active profile set, falling back to default profiles: default
2022-04-28 19:37:51.282 INFO 14424 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8028 (http)
2022-04-28 19:37:51.290 INFO 14424 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2022-04-28 19:37:51.291 INFO 14424 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.36]
2022-04-28 19:37:51.371 INFO 14424 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2022-04-28 19:37:51.371 INFO 14424 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1645 ms
2022-04-28 19:37:51.491 INFO 14424 --- [ Thread-119] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
	acks = 1
	batch.size = 16384
	bootstrap.servers = [10.21.13.14:9092]
	buffer.memory = 33554432
	client.dns.lookup = default
	client.id = producer-1
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = false
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metadata.max.idle.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.2
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer
2022-04-28 19:37:51.563 INFO 14424 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2022-04-28 19:37:51.590 INFO 14424 --- [ Thread-119] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.0
2022-04-28 19:37:51.592 INFO 14424 --- [ Thread-119] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 66563e712b0b9f84
2022-04-28 19:37:51.592 INFO 14424 --- [ Thread-119] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1651145871589
2022-04-28 19:37:51.851 INFO 14424 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.offset.reset = latest
	bootstrap.servers = [10.21.13.14:9092]
	check.crcs = true
	client.dns.lookup = default
	client.id =
	client.rack =
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = testGroup
	group.instance.id = null
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.2
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.springframework.kafka.support.serializer.JsonDeserializer
2022-04-28 19:37:51.888 INFO 14424 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.0
2022-04-28 19:37:51.888 INFO 14424 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 66563e712b0b9f84
2022-04-28 19:37:51.888 INFO 14424 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1651145871888
2022-04-28 19:37:51.890 INFO 14424 --- [ main] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Subscribed to topic(s): testTopic
2022-04-28 19:37:51.892 INFO 14424 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService
2022-04-28 19:37:51.911 INFO 14424 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8028 (http) with context path ''
2022-04-28 19:37:51.921 INFO 14424 --- [ main] c.yuwen.spring.kafka.KafkaMQApplication : Started KafkaMQApplication in 2.582 seconds (JVM running for 2.957)
2022-04-28 19:37:51.939 INFO 14424 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Cluster ID: zdSPCGGvT8qBnM4LSjz9Hw
2022-04-28 19:37:51.939 INFO 14424 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: zdSPCGGvT8qBnM4LSjz9Hw
2022-04-28 19:37:51.940 INFO 14424 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Discovered group coordinator 10.21.13.14:9092 (id: 2147483647 rack: null)
2022-04-28 19:37:51.942 INFO 14424 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] (Re-)joining group
2022-04-28 19:37:51.959 INFO 14424 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Finished assignment for group at generation 5: {consumer-testGroup-1-35e34543-5bf3-4a4a-a590-9a4c6f7e1ae3=Assignment(partitions=[testTopic-0])}
Provider= c30a2e6c-e2e8-419e-865c-04885d1a90b5
2022-04-28 19:37:51.966 INFO 14424 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Successfully joined group with generation 5
2022-04-28 19:37:51.970 INFO 14424 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Adding newly assigned partitions: testTopic-0
2022-04-28 19:37:51.984 INFO 14424 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Setting offset for partition testTopic-0 to the committed offset FetchPosition{offset=310751, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.21.13.14:9092 (id: 0 rack: null)], epoch=absent}}
2022-04-28 19:37:51.985 INFO 14424 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : testGroup: partitions assigned: [testTopic-0]
Consumer= 19210493-d0df-4cd9-993b-f99500523eb2
Consumer= baee3749-307f-4894-ad88-a9610700ab80
Consumer= bea5e807-b003-4c90-89be-20439e2fa921
Consumer= 98258208-8a95-495d-917e-84d30d965e2b
Consumer= 4301851e-ab19-4c9e-89d6-7b604acdf077
Consumer= c30a2e6c-e2e8-419e-865c-04885d1a90b5
Provider= a6d47e9e-de74-481f-82f8-02bd7384fdd8
Consumer= a6d47e9e-de74-481f-82f8-02bd7384fdd8
Provider= bd935ef1-cc61-4014-a971-1ad76c5e82bf
Consumer= bd935ef1-cc61-4014-a971-1ad76c5e82bf