文章目录

  • 一、基础集成
    • 1. 技术选型
    • 2. 导入依赖
    • 3. kafka配置
    • 4. auto-offset-reset 简述
    • 5. 新增一个订单类
    • 6. 生产者(异步)
    • 7. 消费者
    • 8. kafka配置类
    • 9.单元测试
    • 9. 效果图
    • 10. 源码地址
    • 11.微服务专栏
一、基础集成
1. 技术选型
软件/框架 版本
jdk 1.8.0_202
springboot 2.5.4
kafka server kafka_2.12-2.8.0
kafka client 2.7.1
zookeeper 3.7.0
2. 导入依赖
        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
3. kafka配置

properties版本

spring.application.name=springboot-kafka
server.port=8080
# kafka 配置
spring.kafka.bootstrap-servers=node1:9092# producer 配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 生产者每个批次最多方多少条记录
spring.kafka.producer.batch-size=16384
# 生产者一端总的可用缓冲区大小,此处设置为32M * 1024 * 1024
spring.kafka.producer.buffer-memory=33544432# consumer 配置
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id=springboot-consumer-02
# earliest - 如果找不到当前消费者的有效偏移量,则自动重置向到最开始
spring.kafka.consumer.auto-offset-reset=earliest
# 消费者的偏移量是自动提交还是手动提交,此处自动提交偏移量
spring.kafka.consumer.enable-auto-commit=true
# 消费者偏移量自动提交时间间隔
spring.kafka.consumer.auto-commit-interval=1000

yml版本项目内部配置

server:port: 8002
spring:application:# 应用名称name: ly-kafkaprofiles:# 环境配置active: devcloud:nacos:discovery:# 服务注册地址server-addr: nacos.server.com:8848config:# 配置中心地址server-addr: nacos.server.com:8848# 配置文件格式file-extension: yml# 共享配置shared-configs:- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}

nacos-config 服务端配置

在这里插入代码片
4. auto-offset-reset 简述

关于
auto.offset.reset 配置有3个值可以设置,分别如下:

earliest:当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset时,从头开始消费;
latest:当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,消费新产生的该分区下的数据;
none: topic 各分区都存在已提交的 offset 时,从 offset 后开始消费;只要有一个分区不存在已提交的 offset,则抛出异常;
默认建议用 earliest, 设置该参数后 kafka出错后重启,找到未消费的offset可以继续消费。

而 latest 这个设置容易丢失消息,假如 kafka 出现问题,还有数据往topic中写,这个时候重启kafka,这个设置会从最新的offset开始消费, 中间出问题的哪些就不管了。

none 这个设置没有用过,兼容性太差,经常出问题。

5. 新增一个订单类

模拟业务系统中,用户每下一笔订单,就发送一个消息,供其他服务消费

package com.gblfy.kafka.entity;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.time.LocalDateTime;@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Order {/*** 订单id*/private long orderId;/*** 订单号*/private String orderNum;/*** 订单创建时间*/private LocalDateTime createTime;
}
6. 生产者(异步)
package com.gblfy.lykafka.provider;import com.alibaba.fastjson.JSONObject;
import com.gblfy.common.constant.KafkaTopicConstants;
import com.gblfy.common.entity.Order;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;import java.time.LocalDateTime;/*** Kafka生产者** @author gblfy* @date 2021-09-28*/
@Service
public class KafkaProvider {private final static Logger log = LoggerFactory.getLogger(KafkaProvider.class);@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(long orderId, String orderNum, LocalDateTime createTime) {// 构建一个订单类Order order = Order.builder().orderId(orderId).orderNum(orderNum).createTime(createTime).build();// 发送消息,订单类的 json 作为消息体ListenableFuture<SendResult<String, String>> future =kafkaTemplate.send(KafkaTopicConstants.KAFKA_MSG_TOPIC, JSONObject.toJSONString(order));// 监听回调future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onFailure(Throwable e) {log.info("发送消息失败: {}", e.getMessage());}@Overridepublic void onSuccess(SendResult<String, String> result) {RecordMetadata metadata = result.getRecordMetadata();log.info("发送的主题:{} ,发送的分区:{} ,发送的偏移量:{}  ",metadata.topic(), metadata.partition(), metadata.offset());}});}
}
7. 消费者
package com.gblfy.lykafka.controller;import com.gblfy.lykafka.provider.KafkaProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;@RestController
@RequestMapping("/kafka")
public class KafkaProviderController {@Autowiredprivate KafkaProvider kafkaProvider;@GetMapping("/sendMQ")public String sendMQContent() {kafkaProvider.sendMessage(0001, "10", LocalDateTime.now());return "OK";}
}

通过 @KafkaListener注解,我们可以指定需要监听的 topic 以及 groupId, 注意,这里的 topics 是个数组,意味着我们可以指定多个 topic,如:@KafkaListener(topics = {“topic-springboot-01”, “topic-springboot-02”}, groupId = “group_id”)。

注意:消息发布者的 TOPIC 需要保持与消费者监听的 TOPIC 一致,否者消费不到消息。

8. kafka配置类
package com.gblfy.common.constant;public class KafkaTopicConstants {//kafka发送消息主题public static final String KAFKA_MSG_TOPIC = "topic-springboot-01";// kafka消费者组需要和yml文件中的 kafka.consumer.group-id的值保持一致public static final String KAFKA_MSG_TOPIC_GROUP = "springboot-consumer-02";
}
9.单元测试

新建单元测试,功能测试消息发布,以及消费。

package com.gblfy.kafka;import com.gblfy.kafka.controller.KafkaProvider;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import java.time.LocalDateTime;
import java.util.UUID;
import java.util.concurrent.TimeUnit;@SpringBootTest
class KafkaSpringbootApplicationTests {@Autowiredprivate KafkaProvider kafkaProvider;@Testpublic void sendMessage() throws InterruptedException {// 发送 1000 个消息for (int i = 0; i < 1000; i++) {long orderId = i+1;String orderNum = UUID.randomUUID().toString();kafkaProvider.sendMessage(orderId, orderNum, LocalDateTime.now());}TimeUnit.MINUTES.sleep(1);}
}
9. 效果图


10. 源码地址

https://gitee.com/gb_90/kafka-parent

11.微服务专栏

https://gitee.com/gb_90/micro-service-parent

KAFKA SpringBoot2 Nacos 消息异步发送和消费消息(进阶篇)相关推荐

  1. 如何在优雅地Spring 中实现消息的发送和消费

    本文将对rocktmq-spring-boot的设计实现做一个简单的介绍,读者可以通过本文了解将RocketMQ Client端集成为spring-boot-starter框架的开发细节,然后通过一个 ...

  2. 如何在优雅地Spring 中实现消息的发送和消费 1

    本文将对rocktmq-spring-boot的设计实现做一个简单的介绍,读者可以通过本文了解将RocketMQ Client端集成为spring-boot-starter框架的开发细节,然后通过一个 ...

  3. RocketMQ(十三)——实战-普通消息的发送与消费

    文章目录 普通消息 消息发送 同步发送消息 异步发送消息 单向发送消息 代码示例 导入RocketMQ的依赖 定义同步消息的发送者 定义异步消息的发送者 定义消费者 普通消息 消息发送 同步发送消息 ...

  4. Android 手机卫士--解析json与消息机制发送不同类型消息

    本文地址:http://www.cnblogs.com/wuyudong/p/5900800.html,转载请注明源地址. 1.解析json数据 解析json的代码很简单 JSONObject jso ...

  5. kafka发送及消费消息示例

    发送消息: 消费消息:

  6. 异步发送,那消息可靠性怎么保证?

    消息丢失可能发生在生产者发送消息.MQ本身丢失消息.消费者丢失消息3个方面. 生产者丢失 生产者丢失消息的可能点在于程序发送失败抛异常了没有重试处理,或者发送的过程成功但是过程中网络闪断MQ没收到,消 ...

  7. 查看当前服务器中的所有的topic,创建topic,删除topic,通过shell命令发送消息,通过shell消费消息,查看topic详情,对分区数进行修改

    一. Kafka常用操作命令  查看当前服务器中的所有topic [root@hadoop3 kafka]# bin/kafka-topics.sh --list --zookeeper hadoo ...

  8. kafka Java客户端之 consumer API 多线程消费消息

    kafka consumer 线程设计 Kafka Java Consumer采用的是单线程的设计.其入口类KafkaConsumer是一个双线程的设计,即用户主线程和心跳线程. 用户主线程,指的是启 ...

  9. RocketMQ消息顺序发送和消费问题

    事故现场分析: 由于创新业务产品上线,运营产品想通过一些活动来刺激用户,采用注册邀请机制即可获取积分的相关活动.考虑到后续可能还有其他可能的活动来发放积分,所以设计的时候,采用mq消息模式发放积分,异 ...

最新文章

  1. 达摩院年终预测重磅出炉:AI for Science 高居榜首,2022 十大科技趋势!
  2. Ubuntu Docker安装
  3. (轉貼) Jolt 2007得獎名單 (News) (.NET)
  4. 避免大规模故障的微服务架构设计之道
  5. 两台服务器建立信任关系(root,普通用户)
  6. Linux 入门笔记
  7. python中jupyter notebook 去掉警告
  8. VS2019 OpenCL安装和快速入门
  9. POJ 6184 【三元环 +分治】
  10. A survey on challenges and progresses in blockchain technologies区块链综述
  11. Go 使用consul服务治理 rpc通讯
  12. python分析红楼梦中人物形象_红楼梦中的人物形象及其性格特点
  13. fpga实现dds和混频器
  14. 阿里云拨测:主动探测Web应用质量,助力提升用户体验
  15. 怎么样查看视图+mysql_如何查看视图的sql语句
  16. linux桌面入口文件(.desktop)规范
  17. 所见即所得的 markdown 编辑器:Typora
  18. 浅谈标签传播算法LPA
  19. Trigger触发器常见问题
  20. 【自动驾驶】自动驾驶涉及的知识概览

热门文章

  1. 最高201万!华为高薪招应届生!专业是...
  2. 隐私和网络安全将是未来科技发展的屏障
  3. 自称迪拜十星级酒店,震撼了!
  4. 这些有笑点的故事,只有程序员才能get
  5. java中正则表达式
  6. 阿里云马涛:因云进化的基础软件
  7. Apache Flink 在汽车之家的应用与实践
  8. mPaaS小程序技术架构深度解析
  9. 支撑数千家天猫商家CRM业务,数云高弹性数据库如何做
  10. 友盟+联合EB级云数据 实现友盟域和企业私域数据全面融合