本文来说下SpringBoot整合kafka之kafka分区实战

文章目录

  • 准备工作
  • 程序代码
  • 程序测试
  • 本文小结

准备工作

当然我们也可以不手动创建topic,在执行代码kafkaTemplate.send(“topic1”, normalMessage)发送消息时,kafka会帮我们自动完成topic的创建工作,但这种情况下创建的topic默认只有一个分区,分区也没有副本。所以,我们可以在项目中新建一个配置类专门用来初始化topic,如下

初始化配置信息

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class KafkaInitialConfiguration {/**** 创建TopicName为topic.quick.initial的Topic并设置分区数为8以及副本数为1* 通过bean创建(bean的名字为initialTopic)* @return*/@Beanpublic NewTopic initialTopic() {return new NewTopic("topic.quick.initial",8, (short) 1 );}/*** 此种@Bean的方式,如果topic的名字相同,那么会覆盖以前的那个* //修改后|分区数量会变成11个 注意分区数量只能增加不能减少* @return*/@Beanpublic NewTopic initialTopic2() {return new NewTopic("topic.quick.initial",11, (short) 1 );}}

程序代码

生产者

@Slf4j
@RestController
@RequestMapping("/api/kafka")
@Api(tags = "kafka测试开发")
public class KafkaController {@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;@Resource(name = "initialTopic")private NewTopic newTopic;@GetMapping("/callbackOne")@ApiOperation(value = "带回调的生产者")public void sendMessage2(@RequestParam("message") @ApiParam(value="消息",required = true) String callbackMessage) {log.info("========================================>>>");log.info(newTopic.name());// 带回调的生产者kafkaTemplate.send(newTopic.name(), callbackMessage).addCallback(success -> {// 消息发送到的topicString topic = success.getRecordMetadata().topic();// 消息发送到的分区int partition = success.getRecordMetadata().partition();// 消息在分区内的offsetlong offset = success.getRecordMetadata().offset();log.info("发送消息成功:" + topic + "-" + partition + "-" + offset + "-" + callbackMessage);}, failure -> {log.info("发送消息失败:" + failure.getMessage());});}}

消费者

@Component
@Slf4j
public class KafkaConsumer {// 消费监听@KafkaListener(topics = {"topic.quick.initial"})public void onMessage1(ConsumerRecord<?, ?> record){// 消费的哪个topic、partition的消息,打印出消息内容log.info("==============================================>");StringBuffer sb = new StringBuffer();// 主题sb.append(record.topic() + "-");// 分区sb.append(record.partition() + "-");// 需要消费的值sb.append(record.value() + "-");// 位移sb.append(record.offset());log.info("消费者进行消费:"+ sb);}
}

程序测试

使用swagger来进行程序测试


本文小结

本文简单进行了SpringBoot整合kafka之kafka分区实战。

SpringBoot整合kafka之kafka分区实战相关推荐

  1. 搭建大型分布式服务(二十二)SpringBoot 如何优雅地整合多个kafka数据源?

    系列文章目录 文章目录 系列文章目录 前言 一.本文要点 二.开发环境 三.创建项目 四.修改项目 五.测试一下 六.小结 前言 在日常开发当中,经常会遇到需要消费的topic不在同一个kafka集群 ...

  2. 【七】springboot整合redis(超详细)

    springboot篇章整体栏目: [一]springboot整合swagger(超详细 [二]springboot整合swagger(自定义)(超详细) [三]springboot整合token(超 ...

  3. SpringBoot整合kafka实战之带回调的生产者

    本文来说下SpringBoot整合kafka部分知识内容 文章目录 带回调的生产者 方式一 方式二 本文小结 带回调的生产者 前面我们说了简单的生产和消费,本文说下带回调的生产者.kafkaTempl ...

  4. springboot 整合 kafka demo 顺便看一下源码

    大家好,我是烤鸭: 今天分享下 springboot 整合 kafka. 1.  环境参数: windows + kafka_2.11-2.3.0 + zookeeper-3.5.6 + spring ...

  5. kafka自定义分区实战

    本文来说下kafka自定义分区相关的知识与内容,同时说下springboot整合kafka如何来实现自定义分区 文章目录 Kafka如何实现分区 Kafka集群是如何知道投递到哪个broker中 默认 ...

  6. SpringBoot整合kafka(安装)

    项目路径:https://github.com/zhaopeng01/springboot-study/tree/master/study_14 序言 Kafka 是一种高吞吐的分布式发布订阅消息系统 ...

  7. springboot 整合kafka 实现生产,消费数据

    一 kafka集群的启动 1.1 机器说明 192.168.152.128 master 192.168.152.129 slaver01 192.168.152.130 slaver02 1.2 查 ...

  8. Kafka精品教学(入门,安装,Springboot整合Kafka)

    ps:本文是博主结合视频和博客学习之后,自己实验总结编写的,如果侵权请联系删除. 要学习kafka首先要了解什么是消息队列,因为Kafka 是一个分布式的基于发布 / 订阅模式的消息队列(Messag ...

  9. Kafka原理以及SpringBoot整合Kafka

    1.Kafka原理 1. brokers有多个broker组成,broker是指Kafka服务器(192.168.223.140就是其中的一个broker),上面三台Kafka服务器组成了Kafka集 ...

最新文章

  1. 机器学习(MACHINE LEARNING) 【周志华版-”西瓜书“-笔记】 DAY16-强化学习
  2. 线性规划与网络流24题 运输问题(最裸的费用流了)
  3. 2.2.1 MySQL基本功能与参数文件管理
  4. [TJOI2010]阅读理解
  5. java中挂起和恢复,应用程序“未能及时恢复”并挂起
  6. 提交文件至服务器的设置——表单属性中的 enctype
  7. VisualSVNServer的使用
  8. IIS无组件的解决办法 xp系统组件无IIS iis解决办法 IIS
  9. net 架构师-数据库-sql server-002-工具
  10. grpc通信原理_gRPC原理简析
  11. 游戏迷看过来 年末促销买个华为平板 M3可畅玩
  12. 25.构造ICMP数据包
  13. Bigemap中添加离线地图数据包 教程
  14. 小球碰撞python代码_python开发的小球完全弹性碰撞游戏代码_python_脚本之家
  15. python查看mac的usb信息_Python实现的读取电脑硬件信息功能示例
  16. 远程主机和本地文件互传的2种方法
  17. 整理项目管理中的挣值管理相关计算 AC PV EV BAC CV SV CPI SPI ETC EAC 计算
  18. CSS高手布局:让footer完美处于网页下方
  19. SUST暑期集训题解(可持久化数据结构)
  20. jTemplates异步加载实现与HTML5 video视频开发

热门文章

  1. mfs1.6.x故障一例,血的经验教训 推荐
  2. 在tomcat上全手工部署Servlet3.0
  3. qiniudn.com域名已完全恢复
  4. LayerDrawable层叠样式layer
  5. 淘宝成全球电商第一人气网站
  6. ASP.NET AJAX入门系列
  7. 前端开发要注意的浏览器兼容性问题整理
  8. ERROR 程序出错,错误原因:'bytes' object has no attribute 'read'
  9. cocos2dx迷你地图
  10. hdu 1300(dp)