0.前言

本人整理收藏了22年多家公司面试知识点整理 ,以及各种Java核心知识点免费分享给大家,我认为对面试与学习来说是非常有用的,想要资料的话请点白嫖这份答案←戳我**

1.说明

Spring可以方便的集成使用 Kafka消息队列 ,
只需要引入依赖包spring-kafka,
注意版本兼容问题,
本文详细介绍SpringBoot集成Kafka的方法,
以及生产者和消费者的使用方法。

2.引入依赖

在 pom.xml 中引入Spring Kafka版本:

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

具体的版本号建议通过spring-boot-dependencies管理:

<properties><spring-boot.version>2.3.1.RELEASE</spring-boot.version>
</properties>
<dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement>

3.配置

新建applicaion.yml,
新增如下配置:

Spring Kafka 通用配置

spring:kafka:# kafka连接接地址bootstrap-servers: localhost:9092

通用配置:spring.kafka.*
下面的admin、producer、consumer、streams配置,
会覆盖通用配置 spring.kafka.* 中相同的属性。

生产者配置

spring:kafka:# 生产者配置producer:# 序列化key的类key-serializer: org.apache.kafka.common.serialization.StringSerializer # 反序列化value的类value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

生产者相关配置:spring.kafka.producer.*

消费者配置

spring:kafka:# 消费者配置consumer:# 消费者所属消息组group-id: testGroup # 反序列化key的类key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 反序列化value的类value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer

消费者相关配置:spring.kafka.consumer.*

默认 value-deserializer 使用 org.apache.kafka.common.serialization.StringDeserializer ,
只支持文本消息。
使用org.springframework.kafka.support.serializer.JsonDeserializer可以让消息支持JSON。

完整applicaion.yml配置:

server:port: 8028 spring:kafka:# kafka连接接地址bootstrap-servers: localhost:9092# 生产者配置producer:# 序列化key的类key-serializer: org.apache.kafka.common.serialization.StringSerializer # 反序列化value的类value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 消费者配置consumer:# 消费者所属消息组group-id: testGroup # 反序列化key的类key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 反序列化value的类value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer

4.开发代码

新建KafkaMQApplication.java启动类,
注意要新增 @EnableKafka 注解:

package com.yuwen.spring.kafka;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;@SpringBootApplication
@EnableKafka
public class KafkaMQApplication {public static void main(String[] args) {SpringApplication.run(KafkaMQApplication.class, args);}
}

生产者发送消息

Spring Kafka 提供KafkaTemplate类发送消息,
在需要的地方注入即可,
新增ProviderService.java生产者服务类:

package com.yuwen.spring.kafka.provider;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class ProviderService {public static final String TOPIC = "testTopic";@Autowiredprivate KafkaTemplate<?, String> kafkaTemplate;public void send(String message) {// 发送消息kafkaTemplate.send(TOPIC, message);System.out.println("Provider= " + message);}
}

注意指定 topic ,
以及要发送的消息内容message。

消费者接收消息

新增ConsumerService.java类,
注意使用 @KafkaListener 注解:

package com.yuwen.spring.kafka.consumer;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;import com.yuwen.spring.kafka.provider.ProviderService;@Service
public class ConsumerService {@KafkaListener(topics = ProviderService.TOPIC, groupId = "testGroup", topicPartitions = {})public void receive(String message) {System.out.println("Consumer= " + message);}
}

参数说明:
topics 与发送消息topic相同,可以指定多个
groupId 消费组唯一id
topicPartitions topic分区,可指定多个

5.自动产生消息

为了测试生产者产生消息,
编写AutoGenerate.java,
自动生成随机字符串,
作为生产者向kafka发送消息:

package com.yuwen.spring.kafka.provider;import java.util.UUID;
import java.util.concurrent.TimeUnit;import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class AutoGenerate implements InitializingBean {@Autowiredprivate ProviderService providerService;@Overridepublic void afterPropertiesSet() throws Exception {Thread t = new Thread(new Runnable() {@Overridepublic void run() {while (true) {String message = UUID.randomUUID().toString();providerService.send(message);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}}});t.start();}
}

6.运行服务

运行KafkaMQApplication.java启动类,
输出如下日志,
可以看到生产者产生的随机字符串,
能够被消费者正确获取到:

.   ____          _            __ _ _/\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \\\/  ___)| |_)| | | | | || (_| |  ) ) ) )'  |____| .__|_| |_|_| |_\__, | / / / /=========|_|==============|___/=/_/_/_/:: Spring Boot ::        (v2.3.1.RELEASE)2022-04-28 19:37:49.687  INFO 14424 --- [           main] c.yuwen.spring.kafka.KafkaMQApplication  : Starting KafkaMQApplication on yuwen-asiainfo with PID 14424 (D:\Code\Learn\SpringBoot\spring-boot-demo\MessageQueue\kafka\target\classes started by yuwen in D:\Code\Learn\SpringBoot\spring-boot-demo\MessageQueue\kafka)
2022-04-28 19:37:49.689  INFO 14424 --- [           main] c.yuwen.spring.kafka.KafkaMQApplication  : No active profile set, falling back to default profiles: default
2022-04-28 19:37:51.282  INFO 14424 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 8028 (http)
2022-04-28 19:37:51.290  INFO 14424 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2022-04-28 19:37:51.291  INFO 14424 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.36]
2022-04-28 19:37:51.371  INFO 14424 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2022-04-28 19:37:51.371  INFO 14424 --- [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1645 ms
2022-04-28 19:37:51.491  INFO 14424 --- [     Thread-119] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: acks = 1batch.size = 16384bootstrap.servers = [10.21.13.14:9092]buffer.memory = 33554432client.dns.lookup = defaultclient.id = producer-1compression.type = noneconnections.max.idle.ms = 540000delivery.timeout.ms = 120000enable.idempotence = falseinterceptor.classes = []key.serializer = class org.apache.kafka.common.serialization.StringSerializerlinger.ms = 0max.block.ms = 60000max.in.flight.requests.per.connection = 5max.request.size = 1048576metadata.max.age.ms = 300000metadata.max.idle.ms = 300000metric.reporters = []metrics.num.samples = 2metrics.recording.level = INFOmetrics.sample.window.ms = 30000partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitionerreceive.buffer.bytes = 32768reconnect.backoff.max.ms = 1000reconnect.backoff.ms = 50request.timeout.ms = 30000retries = 2147483647retry.backoff.ms = 100sasl.client.callback.handler.class = nullsasl.jaas.config = nullsasl.kerberos.kinit.cmd = /usr/bin/kinitsasl.kerberos.min.time.before.relogin = 60000sasl.kerberos.service.name = nullsasl.kerberos.ticket.renew.jitter = 0.05sasl.kerberos.ticket.renew.window.factor = 0.8sasl.login.callback.handler.class = nullsasl.login.class = nullsasl.login.refresh.buffer.seconds = 300sasl.login.refresh.min.period.seconds = 60sasl.login.refresh.window.factor = 0.8sasl.login.refresh.window.jitter = 0.05sasl.mechanism = GSSAPIsecurity.protocol = PLAINTEXTsecurity.providers = nullsend.buffer.bytes = 131072ssl.cipher.suites = nullssl.enabled.protocols = [TLSv1.2]ssl.endpoint.identification.algorithm = httpsssl.key.password = nullssl.keymanager.algorithm = SunX509ssl.keystore.location = nullssl.keystore.password = nullssl.keystore.type = JKSssl.protocol = TLSv1.2ssl.provider = nullssl.secure.random.implementation = nullssl.trustmanager.algorithm = PKIXssl.truststore.location = nullssl.truststore.password = nullssl.truststore.type = JKStransaction.timeout.ms = 60000transactional.id = nullvalue.serializer = class org.springframework.kafka.support.serializer.JsonSerializer2022-04-28 19:37:51.563  INFO 14424 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2022-04-28 19:37:51.590  INFO 14424 --- [     Thread-119] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
2022-04-28 19:37:51.592  INFO 14424 --- [     Thread-119] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
2022-04-28 19:37:51.592  INFO 14424 --- [     Thread-119] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1651145871589
2022-04-28 19:37:51.851  INFO 14424 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: allow.auto.create.topics = trueauto.commit.interval.ms = 5000auto.offset.reset = latestbootstrap.servers = [10.21.13.14:9092]check.crcs = trueclient.dns.lookup = defaultclient.id = client.rack = connections.max.idle.ms = 540000default.api.timeout.ms = 60000enable.auto.commit = falseexclude.internal.topics = truefetch.max.bytes = 52428800fetch.max.wait.ms = 500fetch.min.bytes = 1group.id = testGroupgroup.instance.id = nullheartbeat.interval.ms = 3000interceptor.classes = []internal.leave.group.on.close = trueisolation.level = read_uncommittedkey.deserializer = class org.apache.kafka.common.serialization.StringDeserializermax.partition.fetch.bytes = 1048576max.poll.interval.ms = 300000max.poll.records = 500metadata.max.age.ms = 300000metric.reporters = []metrics.num.samples = 2metrics.recording.level = INFOmetrics.sample.window.ms = 30000partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]receive.buffer.bytes = 65536reconnect.backoff.max.ms = 1000reconnect.backoff.ms = 50request.timeout.ms = 30000retry.backoff.ms = 100sasl.client.callback.handler.class = nullsasl.jaas.config = nullsasl.kerberos.kinit.cmd = /usr/bin/kinitsasl.kerberos.min.time.before.relogin = 60000sasl.kerberos.service.name = nullsasl.kerberos.ticket.renew.jitter = 0.05sasl.kerberos.ticket.renew.window.factor = 0.8sasl.login.callback.handler.class = nullsasl.login.class = nullsasl.login.refresh.buffer.seconds = 300sasl.login.refresh.min.period.seconds = 60sasl.login.refresh.window.factor = 0.8sasl.login.refresh.window.jitter = 0.05sasl.mechanism = GSSAPIsecurity.protocol = PLAINTEXTsecurity.providers = nullsend.buffer.bytes = 131072session.timeout.ms = 10000ssl.cipher.suites = nullssl.enabled.protocols = [TLSv1.2]ssl.endpoint.identification.algorithm = httpsssl.key.password = nullssl.keymanager.algorithm = SunX509ssl.keystore.location = nullssl.keystore.password = nullssl.keystore.type = JKSssl.protocol = TLSv1.2ssl.provider = nullssl.secure.random.implementation = nullssl.trustmanager.algorithm = PKIXssl.truststore.location = nullssl.truststore.password = nullssl.truststore.type = JKSvalue.deserializer = class org.springframework.kafka.support.serializer.JsonDeserializer2022-04-28 19:37:51.888  INFO 14424 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
2022-04-28 19:37:51.888  INFO 14424 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
2022-04-28 19:37:51.888  INFO 14424 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1651145871888
2022-04-28 19:37:51.890  INFO 14424 --- [           main] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Subscribed to topic(s): testTopic
2022-04-28 19:37:51.892  INFO 14424 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService
2022-04-28 19:37:51.911  INFO 14424 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8028 (http) with context path ''
2022-04-28 19:37:51.921  INFO 14424 --- [           main] c.yuwen.spring.kafka.KafkaMQApplication  : Started KafkaMQApplication in 2.582 seconds (JVM running for 2.957)
2022-04-28 19:37:51.939  INFO 14424 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Cluster ID: zdSPCGGvT8qBnM4LSjz9Hw
2022-04-28 19:37:51.939  INFO 14424 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: zdSPCGGvT8qBnM4LSjz9Hw
2022-04-28 19:37:51.940  INFO 14424 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Discovered group coordinator 10.21.13.14:9092 (id: 2147483647 rack: null)
2022-04-28 19:37:51.942  INFO 14424 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] (Re-)joining group
2022-04-28 19:37:51.959  INFO 14424 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Finished assignment for group at generation 5: {consumer-testGroup-1-35e34543-5bf3-4a4a-a590-9a4c6f7e1ae3=Assignment(partitions=[testTopic-0])}
Provider= c30a2e6c-e2e8-419e-865c-04885d1a90b5
2022-04-28 19:37:51.966  INFO 14424 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Successfully joined group with generation 5
2022-04-28 19:37:51.970  INFO 14424 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Adding newly assigned partitions: testTopic-0
2022-04-28 19:37:51.984  INFO 14424 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Setting offset for partition testTopic-0 to the committed offset FetchPosition{offset=310751, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.21.13.14:9092 (id: 0 rack: null)], epoch=absent}}
2022-04-28 19:37:51.985  INFO 14424 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : testGroup: partitions assigned: [testTopic-0]
Consumer= 19210493-d0df-4cd9-993b-f99500523eb2
Consumer= baee3749-307f-4894-ad88-a9610700ab80
Consumer= bea5e807-b003-4c90-89be-20439e2fa921
Consumer= 98258208-8a95-495d-917e-84d30d965e2b
Consumer= 4301851e-ab19-4c9e-89d6-7b604acdf077
Consumer= c30a2e6c-e2e8-419e-865c-04885d1a90b5
Provider= a6d47e9e-de74-481f-82f8-02bd7384fdd8
Consumer= a6d47e9e-de74-481f-82f8-02bd7384fdd8
Provider= bd935ef1-cc61-4014-a971-1ad76c5e82bf
Consumer= bd935ef1-cc61-4014-a971-1ad76c5e82bf

7.最后

下面的面试题答案都整理成文档笔记。也还整理了一些面试资料&最新2022收集的一些大厂的面试真题(都整理成文档,小部分截图),有需要的可以

【无废话】SpringBoot集成Kafka消息队列相关推荐

  1. SpringBoot集成Kafka消息队列

    1.说明 Spring可以方便的集成使用 Kafka消息队列 , 只需要引入依赖包spring-kafka, 注意版本兼容问题, 本文详细介绍SpringBoot集成Kafka的方法, 以及生产者和消 ...

  2. SpringBoot集成RabbitMQ消息队列搭建与ACK消息确认入门

    1.Windows下安装RabbitMQ的步骤详解+图解(erlang+RabbitMQ) 2.SpringBoot集成RabbitMQ参考文章 1.RabbitMQ介绍 RabbitMQ是实现AMQ ...

  3. SpringBoot集成AMQ消息队列

    首先新建一个SpringBoot项目,下面开始正式的集成操作. 第一步: 打开pom.xml文件,添加依赖.这里等待maven仓库加载完成. <dependency><groupId ...

  4. SpringBoot集成ActiveMq消息队列实现即时和延迟处理

    原文链接:https://blog.csdn.net/My_harbor/article/details/81328727 一.安装ActiveMq 具体安装步骤:自己谷歌去 二.新建springbo ...

  5. SpringBoot集成Kafka

    SpringBoot集成Kafka 知识索引 SpringBoot集成Kafka工程搭建 SpringBoot集成Kafka配置 SpringBoot集成Kafka生产消息 SpringBoot集成K ...

  6. kafka 安装使用 /springboot整合kafka /消息投递机制以及存储策略 /副本处理机制

    一.背景 1.基本信息 Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写.Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流 ...

  7. 从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例

    从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例 前言 加依赖 生产者 加配置 生产者代码示例 消费者 加配置 消费者监听器示例 调用 关于 Serializer ...

  8. 19 kafka消息队列

    文章目录 19 kafka消息队列 一.kafka介绍 1.消息队列基本介绍 2.常用的消息队列介绍 3.消息队列的应用场景 4.消息队列的两种模式 5.kafka的基本介绍 6.kafka的架构介绍 ...

  9. springboot集成kafka及kafka web UI的使用

    springboot集成kafka application.properties spring.kafka.bootstrap-servers=CentOSA:9092,CentOSB:9092,Ce ...

最新文章

  1. 面对滚滚而来好政策 安防行业该如何发展
  2. 马云、马斯克同台谈AI:人工智能拯救世界?
  3. 2013\National _C_C++_B\2.连续奇数和
  4. 前端学习(975):bootstrap轮播图
  5. 如何巧妙的申请换部门_如何设置户外广告?市城管局局长体验户外广告审批流程...
  6. C++ new一个数组时,指针移动程序崩溃问题
  7. 手绘线条图画机器人_怎么把照片转换成手绘图画?怎么加水印?
  8. SpringCloud+Docker+Jenkins+GitLab+Maven实现自动化构建与部署实战
  9. 从我的简历再谈测试分类
  10. SQL 已死,但 SQL 将永存!
  11. iScroll框架的修改
  12. java multibox_月光软件站 - 编程文档 - Java - Struts 中 Multibox 的用法
  13. 3dmax烘焙贴图主要设置之Flatiron
  14. 好消息:部分银行磁条卡更换芯片卡
  15. Protocol “https“ not supported or disabled in libcurl
  16. div vue 图片背景_div设置背景(vue设置div的背景图片)
  17. No387FirstUniqueCharacterInAString
  18. Caffe深度学习框架作者贾扬清online内容分享II
  19. Mysql 的自增主键达到最大值,怎么办
  20. 用Hook解决在VC++与C++Builder方面界面设计的一些问题

热门文章

  1. python十进制转换_Python 进制转换
  2. 可视化任务编排拖拉拽 | Scaleph 基于 Apache SeaTunnel的数据集成
  3. 游戏录制,如何实现视频音画同步
  4. i7 10870h和i9 10980hk 有什么区别
  5. 2018年高职组——信息评估与管理赛题(解析)
  6. 周末加餐 使陆地分离的最少天数
  7. 最新章节 第238章 超级计算机的安排,超脑太监(萧舒)_第238章 封神(三更)(2 / 2)_超脑太监最新章节免费阅读无弹窗_努努小说坊...
  8. 【Zbrush教程】怎样用ZBrush去雕刻制作一个维京人,详细讲解教程
  9. 小程序的消息提醒——订阅消息
  10. 01 -Python入门- Lesson1 为何要学习 Python 编程