搭建大型分布式服务(二十二)SpringBoot 如何优雅地整合多个kafka数据源?
系列文章目录
文章目录
- 系列文章目录
- 前言
- 一、本文要点
- 二、开发环境
- 三、创建项目
- 四、修改项目
- 五、测试一下
- 六、小结
前言
在日常开发当中,经常会遇到需要消费的topic不在同一个kafka集群内,这时候需要配置多个数据源,如何优雅地完成这个配置呢?
一、本文要点
接前文,我们已经在项目里集成了kafka。本文将介绍如何优雅地整合多个kafka数据源。系列文章完整目录
- springboot 整合多个kafka数据源
- springboot 整合多个消费者topic
- kafka listener 配置factory
二、开发环境
- jdk 1.8
- maven 3.6.2
- springboot 2.4.3
- kafka 2.12-2.3
- idea 2020
三、创建项目
1、使用早期文章快速创建项目。
《搭建大型分布式服务(十二)Docker搭建开发环境安装Kafka和zookeeper》
《搭建大型分布式服务(十三)SpringBoot整合kafka》
《搭建大型分布式服务(十八)Maven自定义项目脚手架》
2、创建Ticket项目
mvn archetype:generate -DgroupId="com.mmc.lesson" -DartifactId="ticket" -Dversion=1.0-SNAPSHOT -Dpackage="com.mmc.lesson" -DarchetypeArtifactId=member-archetype -DarchetypeGroupId=com.mmc.lesson -DarchetypeVersion=1.0.0-SNAPSHOT -B
四、修改项目
1、编写KafkaPropertiesConfiguration.java,用来接收两个kafka配置。
@Configuration
public class KafkaPropertiesConfiguration {/*** one的kafka配置.*/@Bean("oneKafkaProperties")@ConfigurationProperties("spring.one.kafka")public CustomKafkaProperties oneKafkaProperties() {return new CustomKafkaProperties();}/*** two的kafka配置.*/@Bean("twoKafkaProperties")@ConfigurationProperties("spring.two.kafka")public CustomKafkaProperties twoKafkaProperties() {return new CustomKafkaProperties();}@Datapublic static class CustomKafkaProperties {private final Consumer consumer = new Consumer();/*** Create an initial map of consumer properties from the state of this instance.* <p>* This allows you to add additional properties, if necessary, and override the* default kafkaConsumerFactory bean.** @return the consumer properties initialized with the customizations defined on this* instance*/public Map<String, Object> buildConsumerProperties() {return new HashMap<>(this.consumer.buildProperties());}}
}
2、编写KafkaConsumerConfiguration.java,定义两个消费工厂。
@Configuration
@AutoConfigureAfter(KafkaPropertiesConfiguration.class)
public class KafkaConsumerConfiguration {@Value("${spring.kafka.listener.type:batch}")private String listenerType;/*** 消费one数据.*/@Bean("oneContainerFactory")public ConcurrentKafkaListenerContainerFactory<Object, Object> oneContainerFactory(@Qualifier("oneKafkaProperties")CustomKafkaProperties kafkaProperties) {ConcurrentKafkaListenerContainerFactory<Object, Object> container =new ConcurrentKafkaListenerContainerFactory<>();container.setConsumerFactory(new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()));// 设置并发量,小于或等于Topic的分区数// container.setConcurrency(5);// 设置为批量监听container.setBatchListener("batch".equalsIgnoreCase(listenerType));return container;}/*** 消费two数据.*/@Bean("twoContainerFactory")public ConcurrentKafkaListenerContainerFactory<Object, Object> twoContainerFactory(@Qualifier("twoKafkaProperties")CustomKafkaProperties kafkaProperties) {ConcurrentKafkaListenerContainerFactory<Object, Object> container =new ConcurrentKafkaListenerContainerFactory<>();container.setConsumerFactory(new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()));// 设置并发量,小于或等于Topic的分区数// container.setConcurrency(5);// 设置为批量监听container.setBatchListener("batch".equalsIgnoreCase(listenerType));return container;}
}
3、编写KafkaReceiver.java,消费两个topic的数据。
@Service
@Slf4j
public class KafkaReceiver {@KafkaListener(id = "kafka-one-demo",topics = Const.KAFKA_ONE_DEMO_TOPIC,groupId = "oneGroup",containerFactory = "oneContainerFactory")public void receiveOne(ConsumerRecord<String, String> record) {if (null == record || !StringUtils.hasText(record.value())) {log.warn("KafkaReceiver record is null or record.value is empty.");return;}String reqJson = record.value();log.info("one KafkaReceiver {}", reqJson);}@KafkaListener(id = "kafka-two-demo",topics = Const.KAFKA_TWO_DEMO_TOPIC,groupId = "twoGroup",containerFactory = "twoContainerFactory")public void receiveTwo(ConsumerRecord<String, String> record) {if (null == record || !StringUtils.hasText(record.value())) {log.warn("KafkaReceiver record is null or record.value is empty.");return;}String reqJson = record.value();log.info("two KafkaReceiver {}", reqJson);}
}
4、修改application-dev.properties 增加双kafka配置,其它环境同理(可以放到Apollo托管)。
#################### KAFKA ###################### 以下为消费者配置
spring.kafka.listener.type=single
spring.kafka.listener.missing-topics-fatal=falsespring.ymall.kafka.consumer.bootstrapServers=127.0.0.1:9092
spring.ymall.kafka.consumer.auto-offset-reset=latest
spring.ymall.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.ymall.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.ymall.kafka.consumer.max-poll-records=50spring.center.kafka.consumer.bootstrapServers=127.0.0.1:9092
spring.center.kafka.consumer.auto-offset-reset=latest
spring.center.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.center.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.center.kafka.consumer.max-poll-records=50## 以下为单元测试生产者配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.buffer-memory=524288
spring.kafka.producer.batch-size=65536
spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
五、测试一下
1、修改并运行单元测试
@Slf4j
@ActiveProfiles("dev")
@ExtendWith(SpringExtension.class)
@SpringBootTest
class KafkaSenderTest {@Resourceprivate KafkaSender kafkaSender;@Testvoid sendMessage() throws IOException {String json = "hello";for (int i = 0; i < 1; i++) {kafkaSender.sendMessage(Const.KAFKA_ONE_DEMO_TOPIC, json);kafkaSender.sendMessage(Const.KAFKA_TWO_DEMO_TOPIC, json);}System.in.read();}}
2、测试通过。
[2021-08-19 17:09:02.203] [kafka-two-demo-0-C-1] [INFO] [o.s.kafka.listener.KafkaMessageListenerContainer:292] - twoGroup: partitions assigned: [kafka-single-demo-topic-0]
[2021-08-19 17:09:02.203] [kafka-one-demo-0-C-1] [INFO] [o.s.kafka.listener.KafkaMessageListenerContainer:292] - oneGroup: partitions assigned: [kafka-single-demo-topic-0]
[2021-08-19 17:09:09.721] [kafka-two-demo-0-C-1] [INFO] [com.mmc.lesson.kafka.KafkaReceiver:?] - two KafkaReceiver hello
[2021-08-19 17:09:12.736] [kafka-one-demo-0-C-1] [INFO] [com.mmc.lesson.kafka.KafkaReceiver:?] - one KafkaReceiver hello
六、小结
至此,我们就优雅地整合多个kafka消费者数据源,小伙伴们可以发挥自己的动手能力,配置多个生产者哦。下一篇《搭建大型分布式服务(二十三)SpringBoot 如何整合比GuavaCache性能好n倍的Caffeine并根据名称设置不同的失效时间?》
加我加群一起交流学习!更多干货下载和大厂内推等着你
搭建大型分布式服务(二十二)SpringBoot 如何优雅地整合多个kafka数据源?相关推荐
- 搭建大型分布式服务(十四)SpringBoot整合dubbo starter
一.本文要点 接上文,我们已经把SpringBoot整合mybatis+Hikari+es+redis+kafka了,本文将介绍SpringBoot如何整合dubbo.系列文章完整目录 dubbo注解 ...
- 搭建大型分布式服务(二十五)如何将应用部署到TKE容器集群?
系列文章目录 文章目录 系列文章目录 前言 一.本文要点 二.开发环境 三.部署容器服务 1.制作Nginx镜像,用来打包前端web服务. 2.创建工作负载,用来运行前端web服务. 3.配置serv ...
- 使用AFS, Active Directory和SSSD搭建用于集成电路设计的分布式存储系统 【十二】部署第一台 AFS 服务器 1
使用AFS, Active Directory和SSSD搭建用于集成电路设计的分布式存储系统 [十二]部署第一台 AFS 服务器 1 预备条件检查清单 第一台服务器上将要部署和运行的服务 ptserv ...
- RHEL4- WEB服务(十二)用户访问apache服务器认证
RHEL4- WEB服务(十二)用户访问apache服务器认证 有些时候网站上的内容不是希望所有的用户都可以访问,由于网页内容性质的不同,会对来访的用户有所分类,网站的提供方希望部分网页内容只提供 ...
- uniapp 学习笔记二十二 购物车页面结构搭建
uniapp 学习笔记二十二 购物车页面结构搭建 cart.vue <template><view><view class="flex padding" ...
- 使用O2OA二次开发搭建企业办公平台(十二)流程开发篇:报销审批流程需求和应用创建
本博客为O2OA系列教程.O2OA使用手册,教程目录和各章节天梯将在连载完后更新. 使用O2OA二次开发搭建企业办公平台(一)平台部署篇:平台下载和部署 使用O2OA二次开发搭建企业办公平台(二)平台 ...
- 使用O2OA二次开发搭建企业办公平台(十二)流程开发篇:报销审批流程需求和应用创建...
本博客为O2OA系列教程.O2OA使用手册,教程目录和各章节天梯将在连载完后更新. 使用O2OA二次开发搭建企业办公平台(一)平台部署篇:平台下载和部署 使用O2OA二次开发搭建企业办公平台(二)平台 ...
- 2021年大数据Hadoop(二十二):MapReduce的自定义分组
全网最详细的Hadoop文章系列,强烈建议收藏加关注! 后面更新文章都会列出历史文章目录,帮助大家回顾知识重点. 目录 本系列历史文章 前言 MapReduce的自定义分组 需求 分析 实现 第一步: ...
- (二十二)admin-boot项目之集成just-auth实现第三方授权登录
(二十二)集成just-auth实现第三方授权登录 项目地址:https://gitee.com/springzb/admin-boot 如果觉得不错,给个 star 简介: 这是一个基础的企业级基础 ...
最新文章
- java uml 为什么_Java开发为什么需要UML (转)
- 【分布式事务系列九】聊聊分布式事务
- 如何将CSS应用于iframe?
- 【动态规划】【图论】[NOIP模拟赛]独立集
- 输入输出系统 2--- 中断(未完)
- vim简单命令教程-firstblood
- java final属性
- 2019.4.26今日任务
- 20155320 2016-2017-2 《Java程序设计》第五周学习总结
- 去掉CSDN blog 多余的版权申明部分[转贴]
- 6.3创建自己执行的二进制文件
- MAC编译lame ld: symbol(s) not found for architecture x86_64/_lame_init_old“, referenced from
- 中小企业如何有效利用桌面共享软件降低营运成本
- python读取像素值
- 二级运放压摆率分析(SR)
- 上海-苏州 100公里徒步旅行心情分享(二)
- JavaSE总结知识点 重要的点(并不是详细的教材语录)
- 用HTML制作简单的个人介绍主页
- 【仿淘宝首页】前端网页模板,大学生前端作业分享,html5+css电商网站模板,包含js动效
- linux断掉crt、xshell依然运行命令?查找命令执行是否完成?
热门文章
- ROS使用(10)URDF
- Python实现性能测试(locust)
- Ubuntu-Touch-03:使用SSH连接手机
- 龙族幻想服务器维修到几点,【龙族幻想】4月1日维护公告
- 用计算机解锁ipad密码忘了怎么办,iPad密码忘记了怎么办_iPad密码忘记解锁办法-太平洋IT百科手机版...
- 三国现实主义的巅峰——贾诩
- QQ的春节红包有多火?疑似阿里员工也转粉
- 华为 EchoLife HG522无线猫设置图解 电信封杀路由全部搞定
- dnf服务器维护11.12,2018dnf11上12会不会归零 | 手游网游页游攻略大全
- 全世界禁用谷歌的五大国家_5个国家站在Google与世界统治之间