主要是记录下 SpringBoot 如何集成 Kafka,完成消息队列的使用,代码包括 Json 序列化消息,生产者,消费者,配置文件。

1、maven 依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-json</artifactId>
</dependency>

2、定义序列化消息

public class OrderMsg{public String getOrderId() {return orderId;}public void setOrderId(String orderId) {this.orderId = orderId;}public String getAccountId() {return accountId;}public void setAccountId(String accountId) {this.accountId = accountId;}private String orderId;private String accountId;public OrderMsg() {}public OrderMsg(String orderId, String accountId) {this.orderId = orderId;this.accountId = accountId;}@Overridepublic String toString() {return "OrderMsg{" +"orderId='" + orderId + '\'' +", accountId='" + accountId + '\'' +'}';}
}

3、定义消息消费者

用于消费主题topic01,这个主题已在Kafka集群搭建的时候创建,这里直接消费;一共定义了两个消费者-consumer01和consumer02,属于orderGroup消费组,用于分担消费topic01主题3个分区里的消息。

@Component
public class OrderConsumer {/*** topic01的消费者,orderGroup消费者组,一共两个消费者* @param msg*/@KafkaListener(id="consumer01", groupId = "orderGroup", topics = {"topic01"})public void processOrder1(ConsumerRecord<String, String> msg) {System.out.println("consumer01-" +"topic:" + msg.topic() +";partition:" + msg.partition() +";key:" + msg.key() +";value:" + msg.value());}@KafkaListener(id="consumer02", groupId = "orderGroup", topics = {"topic01"})public void processOrder2(ConsumerRecord<String, String> msg) {System.out.println("consumer02-" +"topic:" + msg.topic() +";partition:" + msg.partition() +";key:" + msg.key() +";value:" + msg.value());}

}

4、引入KafkaTemplate 来生产消息

@RestController
@RequestMapping("kafka")
public class OrderController {@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;
@GetMapping("/order/{message}")
public void sendMessage(@PathVariable("message") String message) {System.out.println(message);    for (int i = 1; i < 10; i++) {kafkaTemplate.send("topic01", i+"", i+"-"+message);}kafkaTemplate.flush();
}

}

5、application.yml 配置文件

server:port: 8888
spring:kafka:#Kafka集群bootstrap-servers: 192.168.216.118:9092,192.168.216.128:9092,192.168.216.138:9092producer:acks: all# 重试次数retries: 2# 批量大小batch-size: 16384# 生产端缓冲区大小buffer-memory: 33554432# Kafka提供的序列化和反序列化类key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.springframework.kafka.support.serializer.JsonSerializer#开启事务# transaction-id-prefix: transaction-id-# 设置提交延时# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka# linger.ms为0表示每接收到一条消息就交给kafkaproperties:enable:idempotence: truelinger:ms: 0consumer:# 是否自动提交offsetenable-auto-commit: true# 提交offset延时(接收到消息后多久提交offset)auto-commit-interval: 100ms# 当kafka中没有初始offset或offset超出范围时将自动重置offset# earliest:重置为分区中最小的offset;# latest:重置为分区中最新的offset(消费分区中新产生的数据);# none:只要有一个分区不存在已提交的offset,就抛出异常;auto-offset-reset: latest# 默认的消费组IDgroup-id: group01# 消费端key反序列化key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 消费端value反序列化value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer# 批量消费每次最多消费条目数max-poll-records: 10properties:# 消费会话超时时间session:timeout:ms: 120000heartbeat:timeout:ms: 1000# 消费请求超时时间request:timeout:ms: 120000spring:json:trusted:packages: com.studyplan.mq.kafka.bean#isolation:#level: read_committedlistener:# 关闭监听topic不存在的话项目启动报错missing-topics-fatal: false# 设置为批量消费# type: batch6、浏览器输入

http://localhost:8888/kafka/order/world 进行测试,观察控制台输出。

Kafka 消息序列化反序列化相关推荐

  1. Kafka消息序列化和反序列化(下)

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

  2. Kafka消息序列化和反序列化(上)

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

  3. SpringBoot 自定义Kafka消息序列化和反序列化

    1. 概述 Kafka传输自定义的DTO对象时,不能像平时一样使用StringSerializer和StringDeserializer.这种情况需要自己实现对应DTO的序列化器和反序列化器.假设现在 ...

  4. 7. kafka序列化反序列化

    序列化 kafka序列化消息是在生产端,序列化后,消息才能网络传输.而构造KafkaProducer代码如下: Properties props = new Properties(); props.p ...

  5. 10、Kafka 消息订阅系统

    1.Kafka 简介 Kafka 是一个高吞吐.分布式.基于发布订阅的消息系统,利用 Kafka 技术可在廉价 PCServer 上搭建起大规模消息系统. Kafka 和其他组件比较,具有消息持久化. ...

  6. SpringBoot集成Kafka消息队列

    1.说明 Spring可以方便的集成使用 Kafka消息队列 , 只需要引入依赖包spring-kafka, 注意版本兼容问题, 本文详细介绍SpringBoot集成Kafka的方法, 以及生产者和消 ...

  7. 【无废话】SpringBoot集成Kafka消息队列

    0.前言 本人整理收藏了22年多家公司面试知识点整理 ,以及各种Java核心知识点免费分享给大家,我认为对面试与学习来说是非常有用的,想要资料的话请点白嫖这份答案←戳我** 1.说明 Spring可以 ...

  8. linux查看kafka队列消息,Kafka消息队列-从开始到上线

    运行环境 操作系统:Windows 10 : Linux发行版:CentOS Linux release 7.6.1810 (Core) JDK版本:1.8.0_221 说在前面 kafka作为开源的 ...

  9. kafka 安装使用 /springboot整合kafka /消息投递机制以及存储策略 /副本处理机制

    一.背景 1.基本信息 Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写.Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流 ...

最新文章

  1. 同步与异步,阻塞与非阻塞的区别
  2. php企业网站源码安装教程,PHPSCUP企业建站系统v1.4 安装图文教程
  3. Python多线程详解
  4. mysql5.7配置用户名密码_Druid拦截功能的配置与简单绕过
  5. SystemKit 系统分析工具
  6. chrome调试js的小技巧
  7. R语言之离群点检验(part2)--局部离群点因子LOF检验
  8. js实现阶乘算法的三种方法
  9. python把图片存放到数据库_使用Python把图片存入数据库
  10. c语言猜字游戏中期报告,c语言小程序:编写猜字游戏
  11. sql分区表上创建索引_SQL Server中分区表和索引的选项
  12. inject 响应式_Vue 3 组合式 provide/inject
  13. 虚拟化平台cloudstack(7)——新版本的调试
  14. uni-app 使用高德地图
  15. 单片机跑马灯程序c语言,用单片机编写几种跑马灯程序
  16. Maven的依赖(Dependency)
  17. 专门查英语单词的软件_查英语单词的软件_有道翻译
  18. [项目管理] 如何评估工作量
  19. 嵌入式Linux系统的指纹识别,嵌入式指纹识别系统设计
  20. Android 下拉刷新控件

热门文章

  1. Intel 64/x86_64/IA-32/x86处理器 - 指令格式(1) - 概述
  2. Intel 64/x86_64/IA-32/x86处理器 - 通用指令(9/E) - 比特位操控指令(BMI1 BMI2)
  3. C#人脸识别入门篇--提取人脸特征值及人脸识别
  4. 李群与李代数2:李代数求导和李群扰动模型
  5. C++:求第k小的数
  6. flutter从0到1构建大前端应用 pdf_推荐前端热门GitHub代码库「值得收藏」
  7. UIButton的几种触发方式
  8. mysql之 OPTIMIZE TABLE整理碎片
  9. SpringCloud 微服务 (十五) 服务容错 Hystrix
  10. java获得时间和linux系统时间不一致