从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例
从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例
- 前言
- 加依赖
- 生产者
- 加配置
- 生产者代码示例
- 消费者
- 加配置
- 消费者监听器示例
- 调用
- 关于 Serializer 序列化
- 序列化问题
- 有哪些序列化类型?
- 解决方案
前言
关于 Kafka
的介绍就不多写了,网上很多,大部分也都是复制官方文档的,看完就会对 Kafka
有基本的认识
Kafka 官方简介,包含使用用途等
加依赖
生产者与消费者的依赖是一样的
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
生产者
加配置
spring:kafka:# 在发出请求时传递给服务器的 id 字符串(用户自定义)# 这个不起眼的参数可以让我们知道,kafka 的每次调用来自哪个应用client-id: kafka-demo# 连接 kafka 服务器,集群多个用逗号隔开bootstrap-servers: 118.25.215.105:9092# 生产者配置producer:# 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,# 当 retris 为 0 时,produce 不会重复。retirs 重发,此时 replica 节点完全成为 leader 节点,不会产生消息丢失。# 默认 0retries: 0# 每次批量发送消息的数量,produce积累到一定数据,一次发送# 加大批处理,能提高性能,提高吞吐量,但是会增大内存的消耗,减小相反# 默认 16384batch-size: 16384# 用来缓冲等待被发送到服务器的记录的总字节数# 缓存大小达到buffer.memory就发送数据# 默认 33554432buffer-memory: 33554432# 此配置是 Producer 在确认一个请求发送完成之前需要收到的反馈信息的数量。 这个参数是为了保证发送请求的可靠性。# 0:如果设置为0,则 producer 不会等待服务器的反馈。添加到缓存后就当发送了,后续服务器是否请求无法保证,重试也不会生效# 1:leader 节点会将记录写入本地日志,并且在所有 follower 节点反馈之前就先确认成功。 follower 节点复制数据完成之前产生错误,则这条记录会丢失。# all:意味着 leader 节点会等待所有同步中的副本确认之后再确认这条记录是否发送完成。只要至少有一个同步副本存在,记录就不会丢失。这种方式是对请求传递的最有效保证。acks=-1与acks=all是等效的。# 默认 1acks: all# 指定消息key和消息体的编解码方式# 默认都是 org.apache.kafka.common.serialization.StringSerializerkey-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
生产者代码示例
/*** 生产者代码示例* @author: linjinp* @create: 2020-04-30 16:47**/
@RestController
@RequestMapping("/kafka/producer")
public class KafkaProducerController {@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;/*** 发送消息* @return*/@GetMapping("/send")public ErrorMsg send() {byte[] arr = new byte[10];kafkaTemplate.send("test", JSONArray.toJSONString(arr));return ErrorMsg.SUCCESS;}
}
消费者
加配置
可以看到,与生产者序列化 StringSerializer
相对的,消费者则选择对应的 StringDeserializer
因为我生产者与消费者在一个 Demo
里,所以 spring.kafka.client-id
是一样的
spring:kafka:# 在发出请求时传递给服务器的 id 字符串(用户自定义)# 这个不起眼的参数可以让我们知道,kafka 的每次调用来自哪个应用client-id: kafka-demo# 连接 kafka 服务器,集群多个用逗号隔开bootstrap-servers: 118.25.215.105:9092consumer:# 组 ID,在kafka中,同一组中的consumer不会读取到同一个消息# 举个例子,容器部署高可用,即 1 个应用部署多个容器实例# 这时,这几个应用的组 ID 都是一样的,同一条数据就不会被消费多次group-id: kafka-demo-1# 消费者消费数据的偏移量# earliest:当各分区下有已提交的offset时,从提交的 offset 开始消费;无提交的offset时,从头开始消费# latest:当各分区下有已提交的offset时,从提交的 offset 开始消费;无提交的offset时,消费新产生的该分区下的数据# none:当各分区都存在已提交的offset时,从 offset 后开始消费;只要有一个分区不存在已提交的offset,则抛出异常## 这个看业务场景吧,偏移量没有,差不多意思就是数据丢了,不知道读到哪了,像是发送短信通知这种的,latest 就行# 但是像处理一些重要的业务数据,比如财务之类的,如果丢了只能重头算,主要看数据的价值把,这时候就要用 earliest# 默认 latestauto-offset-reset: latest# 自动提交 offsetenable-auto-commit: true# 指定消息key和消息体的编解码方式# 默认都是 org.apache.kafka.common.serialization.StringDeserializerkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
消费者监听器示例
与生产者不同的,生产者需要用户主动去调用传递消息,消费者则是通过监听器,收到消息后消费数据
@Component: 将对象注册成组件
@KafkaListener: 把方法注册成 Kafka
监听器,并且指定监听的 Topic
,这里监听 Topic
为 test
/*** 消费者监听器* @author: linjinp* @create: 2020-04-30 17:31**/
@Component
public class KafkaConsumerListener {/*** 发送消息* @param message 内容* @return*/@KafkaListener(topics = "test")public void getMessage(String message) {byte[] arr = JSONArray.parseObject(message, byte[].class);System.out.println(new Date() + ":" + arr);}
}
调用
先是浏览器调用生产者发送数据
生产者向 Topic
为 test
,发送了一个二进制数组,只是我将数组转为了 Json
PS:如果你 Topic 不存在,需要先创建
发送后,我们看下消费者那监听到了生成者发送的数据
然后将 Json
转回了数组
关于 Serializer 序列化
序列化问题
在 Kafka 中,数据传输时,生产者对应的数据都需要采用对应的序列化方式,消费者也需要根据生产者采用对应的反序列化。
配置的 key-serializer
与 value-serializer
为 org.apache.kafka.common.serialization.StringSerializer
即通过字符串方式序列化
这时候如果你 send
是一个二进制就会报序列化错误,如图:
他提醒你这个序列化不成功,这就是序列化类型不匹配导致的,应该使用 ByteArraySerializer
再用 Double
类型试下
结果
同理,应该使用 DoubleSerializer
进行序列化
有哪些序列化类型?
这个可以看 org.apache.kafka.common.serialization
包下
解决方案
可以看出来,业务系统中类型很多,我们如果每种类型都需要对其专门序列化,就会很复杂,更何况还有对象。
因此我的解决方案就是都是用默认的 StringSerializer
,然后我们通过将数据转为 Json,然后在消费者也对应类型转回来即可,就像我例子上的那样。
从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例相关推荐
- springboot集成kafka及kafka web UI的使用
springboot集成kafka application.properties spring.kafka.bootstrap-servers=CentOSA:9092,CentOSB:9092,Ce ...
- SpringBoot集成Kafka
SpringBoot集成Kafka 知识索引 SpringBoot集成Kafka工程搭建 SpringBoot集成Kafka配置 SpringBoot集成Kafka生产消息 SpringBoot集成K ...
- SpringBoot 集成 kafka,基于注解批量消费设置
网上关于SpringBoot 集成kafka的批量消费功能需要手动创建类(这篇文章不错:[弄nèng - Kafka]应用篇(三) -- Springboot整合Kafka(批量消费)_司马缸砸缸了- ...
- SpringBoot集成Kafka消息队列
1.说明 Spring可以方便的集成使用 Kafka消息队列 , 只需要引入依赖包spring-kafka, 注意版本兼容问题, 本文详细介绍SpringBoot集成Kafka的方法, 以及生产者和消 ...
- 【无废话】SpringBoot集成Kafka消息队列
0.前言 本人整理收藏了22年多家公司面试知识点整理 ,以及各种Java核心知识点免费分享给大家,我认为对面试与学习来说是非常有用的,想要资料的话请点白嫖这份答案←戳我** 1.说明 Spring可以 ...
- springboot集成kafka消费手动启动停止
项目场景: 在月结,或者某些时候,我们需要停掉kafka所有的消费端,让其暂时停止消费,而后等月结完成,再从新对消费监听恢复,进行消费,此动作不需要重启服务,最后源码下载 解决分析 KafkaList ...
- SpringBoot集成Kafka低版本和高版本
SpringBoot集成Kafka低版本和高版本 说明 地址 低版本SpringBoot集成Kafka代码 代码 kafka生产者配置 kafka消费者配置 发送消息给kafka的Controller ...
- SpringBoot集成kafka全面实战
本文是SpringBoot+Kafka的实战讲解,如果对kafka的架构原理还不了解的读者,建议先看一下<大白话kafka架构原理>.<秒懂kafka HA(高可用)>两篇文章 ...
- kafka查看broker上主副本_Kafka基础(一):基本概念及生产者、消费者示例
本文章大部分内容均摘自 朱忠华老师的<深入理解Kafka:核心设计与实践原理>,也特别推荐广大读者购买阅读. 一.概述 1. 简介 Kafka 起初是由 LinkedIn 公司采用 Sca ...
最新文章
- 深度学习有哪些经典数据集?
- 读这部IT小说时,我有一种看英雄大片的莫名感动
- linux下压缩并分割稀疏文件
- 使用Java创建内存泄漏
- 模拟器中文输入法设置
- 简单java题_java
- jdk 9和jdk8_了解有关JDK9紧凑弦乐的信息(视频评论Charlie Hunt)
- socket read time out解决方法_time_after方法对jiffies回绕问题的解决
- 2017.10.19 大陆争霸 思考记录
- js 跳转到 百度指定地址定位点
- 《统计学习方法》—— 聚类方法(层次聚类和K均值聚类)
- python中换行的转义字符_Python语言中表示换行的转义字符是____________。(2.5分)_学小易找答案...
- 在谷歌下面打开之后显示金山毒霸页面
- 益聚星荣:一文看懂,为什么有的投资人讨厌元宇宙,有的却爱死它了
- ansiblea基本使用
- java:编写一个程序完成:完成在银行的存款和取款操作。在定义银行类时,若取款数大于余额则作为异常处理。
- 鸿蒙pc系统镜像,鸿蒙系统有pc版么_鸿蒙系统有pc版安装方法
- 数据分析报告怎么写?
- 【动态系统的建模与分析】一阶系统的单位阶跃响应+时间常数-笔记
- iview表格中,鼠标滑过单元格展示提示信息