文章目录

  • 1. 技术选型
  • 2. 导入依赖
  • 3. kafka配置
  • 4. 生产者(同步)
  • 5. 生产者(异步)
  • 6. 消费者
1. 技术选型
软件/框架 版本
jdk 1.8.0_202
springboot 2.5.4
kafka server kafka_2.12-2.8.0
kafka client 2.7.1
zookeeper 3.7.0
2. 导入依赖
        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
3. kafka配置

properties版本

spring.application.name=springboot-kafka
server.port=8080
# kafka 配置
spring.kafka.bootstrap-servers=node1:9092# producer 配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 生产者每个批次最多方多少条记录
spring.kafka.producer.batch-size=16384
# 生产者一端总的可用缓冲区大小,此处设置为32M * 1024 * 1024
spring.kafka.producer.buffer-memory=33544432# consumer 配置
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id=springboot-consumer-02
# earliest - 如果找不到当前消费者的有效偏移量,则自动重置向到最开始
spring.kafka.consumer.auto-offset-reset=earliest
# 消费者的偏移量是自动提交还是手动提交,此处自动提交偏移量
spring.kafka.consumer.enable-auto-commit=true
# 消费者偏移量自动提交时间间隔
spring.kafka.consumer.auto-commit-interval=1000

yml版本

server:port: 8080
spring:application:name: springboot-kafkakafka:bootstrap-servers: 192.168.92.104:9092consumer:auto-commit-interval: 1000auto-offset-reset: earliestenable-auto-commit: truegroup-id: springboot-consumer-02key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:batch-size: 16384buffer-memory: 33544432key-serializer: org.apache.kafka.common.serialization.IntegerSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
4. 生产者(同步)
package com.gblfy.demo.controller;import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.concurrent.ExecutionException;@RestController
public class KafkaSyncController {private final static Logger log = LoggerFactory.getLogger(KafkaSyncController.class);@Autowiredprivate KafkaTemplate<Integer, String> kafkaTemplate;@RequestMapping("/send/sync/{message}")public String send(@PathVariable String message) {//同步发送消息ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send("topic-springboot-01", 0, 0, message);try {SendResult<Integer, String> sendResult = future.get();RecordMetadata metadata = sendResult.getRecordMetadata();log.info("发送的主题:{} ,发送的分区:{} ,发送的偏移量:{} ", metadata.topic(), metadata.partition(), metadata.offset());// System.out.println(metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}return "success";}}
5. 生产者(异步)
package com.gblfy.demo.controller;import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaAsyncController {private final static Logger log = LoggerFactory.getLogger(KafkaAsyncController.class);@Autowiredprivate KafkaTemplate<Integer, String> kafkaTemplate;//设置回调函数,异步等待broker端的返回结束@RequestMapping("/send/async/{message}")public String sendAsync(@PathVariable String message) {//同步发送消息ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send("topic-springboot-01", 0, 1, message);future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {@Overridepublic void onFailure(Throwable e) {log.info("发送消息失败: {}", e.getMessage());}@Overridepublic void onSuccess(SendResult<Integer, String> result) {RecordMetadata metadata = result.getRecordMetadata();log.info("发送的主题:{} ,发送的分区:{} ,发送的偏移量:{} ", metadata.topic(), metadata.partition(), metadata.offset());}});return "success";}
}
6. 消费者
package com.gblfy.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class KafkaConsumer {private final static Logger log = LoggerFactory.getLogger(KafkaConsumer.class);@KafkaListener(topics = {"topic-springboot-01"})public void onMessage(ConsumerRecord<Integer, String> record) {log.info("消费者接收到消息主题:{} ,消息的分区:{} ,消息偏移量:{}  ,消息key: {}  ,消息values:{} ",record.topic(), record.partition(), record.offset(), record.key(), record.value());}
}

KAFKA 集成 SpringBoot2 消息发送和消费消息(基础篇)相关推荐

  1. kafka:消息发送以及消费的过程

    摘要 kafka的存储消息,生产者发送消息,消费者消费消息.这些看起来简单,但实际细想,会有很多问题需要解决:消息是单个单个发送还是批量发送?broker的主题里一有消息就立即推送给消费者吗?生产者的 ...

  2. rocketmq 消息 自定义_RocketMQ的消息发送及消费

    RocketMQ消息支持的模式: 消息支持的模式分为三种:NormalProducer(普通同步),消息异步发送,OneWay. 消息同步发送: 普通消息的发送和接收在前面已经演示过了,在前面的案例中 ...

  3. KAFKA SpringBoot2 Nacos 消息异步发送和消费消息(进阶篇)

    文章目录 一.基础集成 1. 技术选型 2. 导入依赖 3. kafka配置 4. auto-offset-reset 简述 5. 新增一个订单类 6. 生产者(异步) 7. 消费者 8. kafka ...

  4. 《SpringBoot2.0 实战》系列-整合kafka实现消息发送、消费

    之前写过一篇关于ActiveMq的博文,有兴趣的小伙伴可以点击查看.但是ActiveMq总体性能表现一般,如果对消息队列性能要求较高,业务量较大,那我们需要重新考量.本文就介绍一款性能高的消息队列- ...

  5. kafka发送及消费消息示例

    发送消息: 消费消息:

  6. RocketMQ消息发送及消费的基本原理

    这是一个比较宏观的部署架构图,rocketmq天然支持高可用,它可以支持多主多从的部署架构,这也是和kafka最大的区别之一 原因是RocketMQ中并没有master选举功能,所以通过配置多个mas ...

  7. kafka Java客户端之 consumer API 消费消息

    背景:我使用docker-compose 搭建的kafka服务 kafka的简单介绍以及docker-compose部署单主机Kafka集群 使用consumer API消费指定Topic里面的消息 ...

  8. Kafka的坑: 消费者无法消费消息

    问题:今天使用Kafka做一个小DEMO,但运行的时候,可以在IDEA控制台上看到生产者发送的消息,无法在IDEA看到消费者在消费消息,但通过连接Linux在命令行可以看到消费者消费的消息. 生产者应 ...

  9. RabbitMQ如何保证消息发送、消费成功

    好记忆不如烂笔头,能记下点东西,就记下点,有时间拿出来看看,也会发觉不一样的感受. 目录 1.发送确认机制设置 2.消息丢失.非信任或失败 3.消息重复消费 4.消费成功通知 5.总结 消息因为其:削 ...

最新文章

  1. Javascript 滑动效果菜单 TreeView [Javascript]
  2. hive查询where join_Hive系列(4):常用函数where,join
  3. Enabling HierarchyViewer on Rooted Android Devices
  4. matplotlb.finance导包报错——ModuleNotFoundError No module named mpl finance
  5. python随机画散点图-Python使用Plotly绘图工具,绘制散点图、线形图
  6. 最佳实践丨三种典型场景下的云上虚拟IDC(私有池)选购指南
  7. Flickr 的开发者的 Web 应用优化技巧(转)
  8. BAT-批量改文件后缀名
  9. antlr 4.7.1_新ANTLR 4.6的重要更改
  10. [react] 写出React动态改变class切换组件样式
  11. [css] box-sizing常用的属性有哪些?分别有什么作用?
  12. 使用SharpZipLib.dll压缩zip
  13. 鸿蒙系统的变化,鸿蒙系统没变化的背后
  14. Bailian2952 循环数【数学】
  15. 安卓listview点击空白事件_王者荣耀安卓苹果ios改空白名;重复名字特殊昵称教程...
  16. NanoPC-T4 RK3399和PC有线本地网络传输摄像头视频python
  17. 单片机驱动DM9000网卡芯片
  18. 【script】python 调用阿里云解析 DNS API 实现 DDNS(动态域名解析)
  19. 十进制与二进制转换(负数+正数)
  20. JQuery解析二维码

热门文章

  1. 张文宏又爆“金句”:上班开会,要和关系最差的人坐一起……
  2. 智商145!比利时神童9岁读完大学,成史上最年轻大学毕业生
  3. 女博士7年不毕业,她破解了“量子计算最基础问题”
  4. 百度2015校园招聘软件开发笔试题及答案
  5. CDH6.x Solr7.x 集成 Ik 分词
  6. MySQL8.0.17 - 初探 Clone Plugin
  7. 如何把创建ECS(CreateInstance)作为触发器来触发函数计算
  8. “阿里巴巴小程序繁星计划”:20亿扶持200万小程序开发者和100万商家
  9. 阿里开发者们的第16个感悟:让阅读源码成为习惯
  10. 阿里云新推出 HiTSDB + IoT套件 物联网设备上云步入快车道