KAFKA SpringBoot2 Nacos 消息异步发送和消费消息(进阶篇)
文章目录
- 一、基础集成
- 1. 技术选型
- 2. 导入依赖
- 3. kafka配置
- 4. auto-offset-reset 简述
- 5. 新增一个订单类
- 6. 生产者(异步)
- 7. 消费者
- 8. kafka配置类
- 9.单元测试
- 9. 效果图
- 10. 源码地址
- 11.微服务专栏
一、基础集成
1. 技术选型
软件/框架 | 版本 |
---|---|
jdk | 1.8.0_202 |
springboot | 2.5.4 |
kafka server | kafka_2.12-2.8.0 |
kafka client | 2.7.1 |
zookeeper | 3.7.0 |
2. 导入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
3. kafka配置
properties版本
spring.application.name=springboot-kafka
server.port=8080
# kafka 配置
spring.kafka.bootstrap-servers=node1:9092# producer 配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 生产者每个批次最多方多少条记录
spring.kafka.producer.batch-size=16384
# 生产者一端总的可用缓冲区大小,此处设置为32M * 1024 * 1024
spring.kafka.producer.buffer-memory=33544432# consumer 配置
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id=springboot-consumer-02
# earliest - 如果找不到当前消费者的有效偏移量,则自动重置向到最开始
spring.kafka.consumer.auto-offset-reset=earliest
# 消费者的偏移量是自动提交还是手动提交,此处自动提交偏移量
spring.kafka.consumer.enable-auto-commit=true
# 消费者偏移量自动提交时间间隔
spring.kafka.consumer.auto-commit-interval=1000
yml版本项目内部配置
server:port: 8002
spring:application:# 应用名称name: ly-kafkaprofiles:# 环境配置active: devcloud:nacos:discovery:# 服务注册地址server-addr: nacos.server.com:8848config:# 配置中心地址server-addr: nacos.server.com:8848# 配置文件格式file-extension: yml# 共享配置shared-configs:- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
nacos-config 服务端配置
在这里插入代码片
4. auto-offset-reset 简述
关于
auto.offset.reset 配置有3个值可以设置,分别如下:
earliest:当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset时,从头开始消费;
latest:当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,消费新产生的该分区下的数据;
none: topic 各分区都存在已提交的 offset 时,从 offset 后开始消费;只要有一个分区不存在已提交的 offset,则抛出异常;
默认建议用 earliest, 设置该参数后 kafka出错后重启,找到未消费的offset可以继续消费。
而 latest 这个设置容易丢失消息,假如 kafka 出现问题,还有数据往topic中写,这个时候重启kafka,这个设置会从最新的offset开始消费, 中间出问题的哪些就不管了。
none 这个设置没有用过,兼容性太差,经常出问题。
5. 新增一个订单类
模拟业务系统中,用户每下一笔订单,就发送一个消息,供其他服务消费
package com.gblfy.kafka.entity;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.time.LocalDateTime;@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Order {/*** 订单id*/private long orderId;/*** 订单号*/private String orderNum;/*** 订单创建时间*/private LocalDateTime createTime;
}
6. 生产者(异步)
package com.gblfy.lykafka.provider;import com.alibaba.fastjson.JSONObject;
import com.gblfy.common.constant.KafkaTopicConstants;
import com.gblfy.common.entity.Order;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;import java.time.LocalDateTime;/*** Kafka生产者** @author gblfy* @date 2021-09-28*/
@Service
public class KafkaProvider {private final static Logger log = LoggerFactory.getLogger(KafkaProvider.class);@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(long orderId, String orderNum, LocalDateTime createTime) {// 构建一个订单类Order order = Order.builder().orderId(orderId).orderNum(orderNum).createTime(createTime).build();// 发送消息,订单类的 json 作为消息体ListenableFuture<SendResult<String, String>> future =kafkaTemplate.send(KafkaTopicConstants.KAFKA_MSG_TOPIC, JSONObject.toJSONString(order));// 监听回调future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onFailure(Throwable e) {log.info("发送消息失败: {}", e.getMessage());}@Overridepublic void onSuccess(SendResult<String, String> result) {RecordMetadata metadata = result.getRecordMetadata();log.info("发送的主题:{} ,发送的分区:{} ,发送的偏移量:{} ",metadata.topic(), metadata.partition(), metadata.offset());}});}
}
7. 消费者
package com.gblfy.lykafka.controller;import com.gblfy.lykafka.provider.KafkaProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;@RestController
@RequestMapping("/kafka")
public class KafkaProviderController {@Autowiredprivate KafkaProvider kafkaProvider;@GetMapping("/sendMQ")public String sendMQContent() {kafkaProvider.sendMessage(0001, "10", LocalDateTime.now());return "OK";}
}
通过 @KafkaListener注解,我们可以指定需要监听的 topic 以及 groupId, 注意,这里的 topics 是个数组,意味着我们可以指定多个 topic,如:@KafkaListener(topics = {“topic-springboot-01”, “topic-springboot-02”}, groupId = “group_id”)。
注意:消息发布者的 TOPIC 需要保持与消费者监听的 TOPIC 一致,否者消费不到消息。
8. kafka配置类
package com.gblfy.common.constant;public class KafkaTopicConstants {//kafka发送消息主题public static final String KAFKA_MSG_TOPIC = "topic-springboot-01";// kafka消费者组需要和yml文件中的 kafka.consumer.group-id的值保持一致public static final String KAFKA_MSG_TOPIC_GROUP = "springboot-consumer-02";
}
9.单元测试
新建单元测试,功能测试消息发布,以及消费。
package com.gblfy.kafka;import com.gblfy.kafka.controller.KafkaProvider;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import java.time.LocalDateTime;
import java.util.UUID;
import java.util.concurrent.TimeUnit;@SpringBootTest
class KafkaSpringbootApplicationTests {@Autowiredprivate KafkaProvider kafkaProvider;@Testpublic void sendMessage() throws InterruptedException {// 发送 1000 个消息for (int i = 0; i < 1000; i++) {long orderId = i+1;String orderNum = UUID.randomUUID().toString();kafkaProvider.sendMessage(orderId, orderNum, LocalDateTime.now());}TimeUnit.MINUTES.sleep(1);}
}
9. 效果图
10. 源码地址
https://gitee.com/gb_90/kafka-parent
11.微服务专栏
https://gitee.com/gb_90/micro-service-parent
KAFKA SpringBoot2 Nacos 消息异步发送和消费消息(进阶篇)相关推荐
- 如何在优雅地Spring 中实现消息的发送和消费
本文将对rocktmq-spring-boot的设计实现做一个简单的介绍,读者可以通过本文了解将RocketMQ Client端集成为spring-boot-starter框架的开发细节,然后通过一个 ...
- 如何在优雅地Spring 中实现消息的发送和消费 1
本文将对rocktmq-spring-boot的设计实现做一个简单的介绍,读者可以通过本文了解将RocketMQ Client端集成为spring-boot-starter框架的开发细节,然后通过一个 ...
- RocketMQ(十三)——实战-普通消息的发送与消费
文章目录 普通消息 消息发送 同步发送消息 异步发送消息 单向发送消息 代码示例 导入RocketMQ的依赖 定义同步消息的发送者 定义异步消息的发送者 定义消费者 普通消息 消息发送 同步发送消息 ...
- Android 手机卫士--解析json与消息机制发送不同类型消息
本文地址:http://www.cnblogs.com/wuyudong/p/5900800.html,转载请注明源地址. 1.解析json数据 解析json的代码很简单 JSONObject jso ...
- kafka发送及消费消息示例
发送消息: 消费消息:
- 异步发送,那消息可靠性怎么保证?
消息丢失可能发生在生产者发送消息.MQ本身丢失消息.消费者丢失消息3个方面. 生产者丢失 生产者丢失消息的可能点在于程序发送失败抛异常了没有重试处理,或者发送的过程成功但是过程中网络闪断MQ没收到,消 ...
- 查看当前服务器中的所有的topic,创建topic,删除topic,通过shell命令发送消息,通过shell消费消息,查看topic详情,对分区数进行修改
一. Kafka常用操作命令 查看当前服务器中的所有topic [root@hadoop3 kafka]# bin/kafka-topics.sh --list --zookeeper hadoo ...
- kafka Java客户端之 consumer API 多线程消费消息
kafka consumer 线程设计 Kafka Java Consumer采用的是单线程的设计.其入口类KafkaConsumer是一个双线程的设计,即用户主线程和心跳线程. 用户主线程,指的是启 ...
- RocketMQ消息顺序发送和消费问题
事故现场分析: 由于创新业务产品上线,运营产品想通过一些活动来刺激用户,采用注册邀请机制即可获取积分的相关活动.考虑到后续可能还有其他可能的活动来发放积分,所以设计的时候,采用mq消息模式发放积分,异 ...
最新文章
- 达摩院年终预测重磅出炉:AI for Science 高居榜首,2022 十大科技趋势!
- Ubuntu Docker安装
- (轉貼) Jolt 2007得獎名單 (News) (.NET)
- 避免大规模故障的微服务架构设计之道
- 两台服务器建立信任关系(root,普通用户)
- Linux 入门笔记
- python中jupyter notebook 去掉警告
- VS2019 OpenCL安装和快速入门
- POJ 6184 【三元环 +分治】
- A survey on challenges and progresses in blockchain technologies区块链综述
- Go 使用consul服务治理 rpc通讯
- python分析红楼梦中人物形象_红楼梦中的人物形象及其性格特点
- fpga实现dds和混频器
- 阿里云拨测:主动探测Web应用质量,助力提升用户体验
- 怎么样查看视图+mysql_如何查看视图的sql语句
- linux桌面入口文件(.desktop)规范
- 所见即所得的 markdown 编辑器:Typora
- 浅谈标签传播算法LPA
- Trigger触发器常见问题
- 【自动驾驶】自动驾驶涉及的知识概览