KAFKA 集成 SpringBoot2 消息发送和消费消息(基础篇)
文章目录
- 1. 技术选型
- 2. 导入依赖
- 3. kafka配置
- 4. 生产者(同步)
- 5. 生产者(异步)
- 6. 消费者
1. 技术选型
软件/框架 | 版本 |
---|---|
jdk | 1.8.0_202 |
springboot | 2.5.4 |
kafka server | kafka_2.12-2.8.0 |
kafka client | 2.7.1 |
zookeeper | 3.7.0 |
2. 导入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
3. kafka配置
properties版本
spring.application.name=springboot-kafka
server.port=8080
# kafka 配置
spring.kafka.bootstrap-servers=node1:9092# producer 配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 生产者每个批次最多方多少条记录
spring.kafka.producer.batch-size=16384
# 生产者一端总的可用缓冲区大小,此处设置为32M * 1024 * 1024
spring.kafka.producer.buffer-memory=33544432# consumer 配置
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id=springboot-consumer-02
# earliest - 如果找不到当前消费者的有效偏移量,则自动重置向到最开始
spring.kafka.consumer.auto-offset-reset=earliest
# 消费者的偏移量是自动提交还是手动提交,此处自动提交偏移量
spring.kafka.consumer.enable-auto-commit=true
# 消费者偏移量自动提交时间间隔
spring.kafka.consumer.auto-commit-interval=1000
yml版本
server:port: 8080
spring:application:name: springboot-kafkakafka:bootstrap-servers: 192.168.92.104:9092consumer:auto-commit-interval: 1000auto-offset-reset: earliestenable-auto-commit: truegroup-id: springboot-consumer-02key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:batch-size: 16384buffer-memory: 33544432key-serializer: org.apache.kafka.common.serialization.IntegerSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
4. 生产者(同步)
package com.gblfy.demo.controller;import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.concurrent.ExecutionException;@RestController
public class KafkaSyncController {private final static Logger log = LoggerFactory.getLogger(KafkaSyncController.class);@Autowiredprivate KafkaTemplate<Integer, String> kafkaTemplate;@RequestMapping("/send/sync/{message}")public String send(@PathVariable String message) {//同步发送消息ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send("topic-springboot-01", 0, 0, message);try {SendResult<Integer, String> sendResult = future.get();RecordMetadata metadata = sendResult.getRecordMetadata();log.info("发送的主题:{} ,发送的分区:{} ,发送的偏移量:{} ", metadata.topic(), metadata.partition(), metadata.offset());// System.out.println(metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}return "success";}}
5. 生产者(异步)
package com.gblfy.demo.controller;import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaAsyncController {private final static Logger log = LoggerFactory.getLogger(KafkaAsyncController.class);@Autowiredprivate KafkaTemplate<Integer, String> kafkaTemplate;//设置回调函数,异步等待broker端的返回结束@RequestMapping("/send/async/{message}")public String sendAsync(@PathVariable String message) {//同步发送消息ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send("topic-springboot-01", 0, 1, message);future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {@Overridepublic void onFailure(Throwable e) {log.info("发送消息失败: {}", e.getMessage());}@Overridepublic void onSuccess(SendResult<Integer, String> result) {RecordMetadata metadata = result.getRecordMetadata();log.info("发送的主题:{} ,发送的分区:{} ,发送的偏移量:{} ", metadata.topic(), metadata.partition(), metadata.offset());}});return "success";}
}
6. 消费者
package com.gblfy.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class KafkaConsumer {private final static Logger log = LoggerFactory.getLogger(KafkaConsumer.class);@KafkaListener(topics = {"topic-springboot-01"})public void onMessage(ConsumerRecord<Integer, String> record) {log.info("消费者接收到消息主题:{} ,消息的分区:{} ,消息偏移量:{} ,消息key: {} ,消息values:{} ",record.topic(), record.partition(), record.offset(), record.key(), record.value());}
}
KAFKA 集成 SpringBoot2 消息发送和消费消息(基础篇)相关推荐
- kafka:消息发送以及消费的过程
摘要 kafka的存储消息,生产者发送消息,消费者消费消息.这些看起来简单,但实际细想,会有很多问题需要解决:消息是单个单个发送还是批量发送?broker的主题里一有消息就立即推送给消费者吗?生产者的 ...
- rocketmq 消息 自定义_RocketMQ的消息发送及消费
RocketMQ消息支持的模式: 消息支持的模式分为三种:NormalProducer(普通同步),消息异步发送,OneWay. 消息同步发送: 普通消息的发送和接收在前面已经演示过了,在前面的案例中 ...
- KAFKA SpringBoot2 Nacos 消息异步发送和消费消息(进阶篇)
文章目录 一.基础集成 1. 技术选型 2. 导入依赖 3. kafka配置 4. auto-offset-reset 简述 5. 新增一个订单类 6. 生产者(异步) 7. 消费者 8. kafka ...
- 《SpringBoot2.0 实战》系列-整合kafka实现消息发送、消费
之前写过一篇关于ActiveMq的博文,有兴趣的小伙伴可以点击查看.但是ActiveMq总体性能表现一般,如果对消息队列性能要求较高,业务量较大,那我们需要重新考量.本文就介绍一款性能高的消息队列- ...
- kafka发送及消费消息示例
发送消息: 消费消息:
- RocketMQ消息发送及消费的基本原理
这是一个比较宏观的部署架构图,rocketmq天然支持高可用,它可以支持多主多从的部署架构,这也是和kafka最大的区别之一 原因是RocketMQ中并没有master选举功能,所以通过配置多个mas ...
- kafka Java客户端之 consumer API 消费消息
背景:我使用docker-compose 搭建的kafka服务 kafka的简单介绍以及docker-compose部署单主机Kafka集群 使用consumer API消费指定Topic里面的消息 ...
- Kafka的坑: 消费者无法消费消息
问题:今天使用Kafka做一个小DEMO,但运行的时候,可以在IDEA控制台上看到生产者发送的消息,无法在IDEA看到消费者在消费消息,但通过连接Linux在命令行可以看到消费者消费的消息. 生产者应 ...
- RabbitMQ如何保证消息发送、消费成功
好记忆不如烂笔头,能记下点东西,就记下点,有时间拿出来看看,也会发觉不一样的感受. 目录 1.发送确认机制设置 2.消息丢失.非信任或失败 3.消息重复消费 4.消费成功通知 5.总结 消息因为其:削 ...
最新文章
- Javascript 滑动效果菜单 TreeView [Javascript]
- hive查询where join_Hive系列(4):常用函数where,join
- Enabling HierarchyViewer on Rooted Android Devices
- matplotlb.finance导包报错——ModuleNotFoundError No module named mpl finance
- python随机画散点图-Python使用Plotly绘图工具,绘制散点图、线形图
- 最佳实践丨三种典型场景下的云上虚拟IDC(私有池)选购指南
- Flickr 的开发者的 Web 应用优化技巧(转)
- BAT-批量改文件后缀名
- antlr 4.7.1_新ANTLR 4.6的重要更改
- [react] 写出React动态改变class切换组件样式
- [css] box-sizing常用的属性有哪些?分别有什么作用?
- 使用SharpZipLib.dll压缩zip
- 鸿蒙系统的变化,鸿蒙系统没变化的背后
- Bailian2952 循环数【数学】
- 安卓listview点击空白事件_王者荣耀安卓苹果ios改空白名;重复名字特殊昵称教程...
- NanoPC-T4 RK3399和PC有线本地网络传输摄像头视频python
- 单片机驱动DM9000网卡芯片
- 【script】python 调用阿里云解析 DNS API 实现 DDNS(动态域名解析)
- 十进制与二进制转换(负数+正数)
- JQuery解析二维码
热门文章
- 张文宏又爆“金句”:上班开会,要和关系最差的人坐一起……
- 智商145!比利时神童9岁读完大学,成史上最年轻大学毕业生
- 女博士7年不毕业,她破解了“量子计算最基础问题”
- 百度2015校园招聘软件开发笔试题及答案
- CDH6.x Solr7.x 集成 Ik 分词
- MySQL8.0.17 - 初探 Clone Plugin
- 如何把创建ECS(CreateInstance)作为触发器来触发函数计算
- “阿里巴巴小程序繁星计划”:20亿扶持200万小程序开发者和100万商家
- 阿里开发者们的第16个感悟:让阅读源码成为习惯
- 阿里云新推出 HiTSDB + IoT套件 物联网设备上云步入快车道