系列文章目录


文章目录

  • 系列文章目录
  • 前言
    • 一、本文要点
    • 二、开发环境
    • 三、创建项目
    • 四、修改项目
    • 五、测试一下
    • 六、小结

前言

在日常开发当中,经常会遇到需要消费的topic不在同一个kafka集群内,这时候需要配置多个数据源,如何优雅地完成这个配置呢?


一、本文要点

接前文,我们已经在项目里集成了kafka。本文将介绍如何优雅地整合多个kafka数据源。系列文章完整目录

  • springboot 整合多个kafka数据源
  • springboot 整合多个消费者topic
  • kafka listener 配置factory

二、开发环境

  • jdk 1.8
  • maven 3.6.2
  • springboot 2.4.3
  • kafka 2.12-2.3
  • idea 2020

三、创建项目

1、使用早期文章快速创建项目。

《搭建大型分布式服务(十二)Docker搭建开发环境安装Kafka和zookeeper》
《搭建大型分布式服务(十三)SpringBoot整合kafka》
《搭建大型分布式服务(十八)Maven自定义项目脚手架》

2、创建Ticket项目

mvn archetype:generate  -DgroupId="com.mmc.lesson" -DartifactId="ticket" -Dversion=1.0-SNAPSHOT -Dpackage="com.mmc.lesson" -DarchetypeArtifactId=member-archetype  -DarchetypeGroupId=com.mmc.lesson -DarchetypeVersion=1.0.0-SNAPSHOT -B

四、修改项目

1、编写KafkaPropertiesConfiguration.java,用来接收两个kafka配置。

@Configuration
public class KafkaPropertiesConfiguration {/*** one的kafka配置.*/@Bean("oneKafkaProperties")@ConfigurationProperties("spring.one.kafka")public CustomKafkaProperties oneKafkaProperties() {return new CustomKafkaProperties();}/*** two的kafka配置.*/@Bean("twoKafkaProperties")@ConfigurationProperties("spring.two.kafka")public CustomKafkaProperties twoKafkaProperties() {return new CustomKafkaProperties();}@Datapublic static class CustomKafkaProperties {private final Consumer consumer = new Consumer();/*** Create an initial map of consumer properties from the state of this instance.* <p>* This allows you to add additional properties, if necessary, and override the* default kafkaConsumerFactory bean.** @return the consumer properties initialized with the customizations defined on this*         instance*/public Map<String, Object> buildConsumerProperties() {return new HashMap<>(this.consumer.buildProperties());}}
}

2、编写KafkaConsumerConfiguration.java,定义两个消费工厂。

@Configuration
@AutoConfigureAfter(KafkaPropertiesConfiguration.class)
public class KafkaConsumerConfiguration {@Value("${spring.kafka.listener.type:batch}")private String listenerType;/*** 消费one数据.*/@Bean("oneContainerFactory")public ConcurrentKafkaListenerContainerFactory<Object, Object> oneContainerFactory(@Qualifier("oneKafkaProperties")CustomKafkaProperties kafkaProperties) {ConcurrentKafkaListenerContainerFactory<Object, Object> container =new ConcurrentKafkaListenerContainerFactory<>();container.setConsumerFactory(new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()));// 设置并发量,小于或等于Topic的分区数// container.setConcurrency(5);// 设置为批量监听container.setBatchListener("batch".equalsIgnoreCase(listenerType));return container;}/*** 消费two数据.*/@Bean("twoContainerFactory")public ConcurrentKafkaListenerContainerFactory<Object, Object> twoContainerFactory(@Qualifier("twoKafkaProperties")CustomKafkaProperties kafkaProperties) {ConcurrentKafkaListenerContainerFactory<Object, Object> container =new ConcurrentKafkaListenerContainerFactory<>();container.setConsumerFactory(new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()));// 设置并发量,小于或等于Topic的分区数// container.setConcurrency(5);// 设置为批量监听container.setBatchListener("batch".equalsIgnoreCase(listenerType));return container;}
}

3、编写KafkaReceiver.java,消费两个topic的数据。

@Service
@Slf4j
public class KafkaReceiver {@KafkaListener(id = "kafka-one-demo",topics = Const.KAFKA_ONE_DEMO_TOPIC,groupId = "oneGroup",containerFactory = "oneContainerFactory")public void receiveOne(ConsumerRecord<String, String> record) {if (null == record || !StringUtils.hasText(record.value())) {log.warn("KafkaReceiver record is null or record.value is empty.");return;}String reqJson = record.value();log.info("one KafkaReceiver {}", reqJson);}@KafkaListener(id = "kafka-two-demo",topics = Const.KAFKA_TWO_DEMO_TOPIC,groupId = "twoGroup",containerFactory = "twoContainerFactory")public void receiveTwo(ConsumerRecord<String, String> record) {if (null == record || !StringUtils.hasText(record.value())) {log.warn("KafkaReceiver record is null or record.value is empty.");return;}String reqJson = record.value();log.info("two KafkaReceiver {}", reqJson);}
}

4、修改application-dev.properties 增加双kafka配置,其它环境同理(可以放到Apollo托管)。

#################### KAFKA ###################### 以下为消费者配置
spring.kafka.listener.type=single
spring.kafka.listener.missing-topics-fatal=falsespring.ymall.kafka.consumer.bootstrapServers=127.0.0.1:9092
spring.ymall.kafka.consumer.auto-offset-reset=latest
spring.ymall.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.ymall.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.ymall.kafka.consumer.max-poll-records=50spring.center.kafka.consumer.bootstrapServers=127.0.0.1:9092
spring.center.kafka.consumer.auto-offset-reset=latest
spring.center.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.center.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.center.kafka.consumer.max-poll-records=50## 以下为单元测试生产者配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.buffer-memory=524288
spring.kafka.producer.batch-size=65536
spring.kafka.producer.bootstrap-servers=127.0.0.1:9092

五、测试一下

1、修改并运行单元测试

@Slf4j
@ActiveProfiles("dev")
@ExtendWith(SpringExtension.class)
@SpringBootTest
class KafkaSenderTest {@Resourceprivate KafkaSender kafkaSender;@Testvoid sendMessage() throws IOException {String json = "hello";for (int i = 0; i < 1; i++) {kafkaSender.sendMessage(Const.KAFKA_ONE_DEMO_TOPIC, json);kafkaSender.sendMessage(Const.KAFKA_TWO_DEMO_TOPIC, json);}System.in.read();}}

2、测试通过。

[2021-08-19 17:09:02.203] [kafka-two-demo-0-C-1] [INFO] [o.s.kafka.listener.KafkaMessageListenerContainer:292] - twoGroup: partitions assigned: [kafka-single-demo-topic-0]
[2021-08-19 17:09:02.203] [kafka-one-demo-0-C-1] [INFO] [o.s.kafka.listener.KafkaMessageListenerContainer:292] - oneGroup: partitions assigned: [kafka-single-demo-topic-0]
[2021-08-19 17:09:09.721] [kafka-two-demo-0-C-1] [INFO] [com.mmc.lesson.kafka.KafkaReceiver:?] - two KafkaReceiver hello
[2021-08-19 17:09:12.736] [kafka-one-demo-0-C-1] [INFO] [com.mmc.lesson.kafka.KafkaReceiver:?] - one KafkaReceiver hello

六、小结

至此,我们就优雅地整合多个kafka消费者数据源,小伙伴们可以发挥自己的动手能力,配置多个生产者哦。下一篇《搭建大型分布式服务(二十三)SpringBoot 如何整合比GuavaCache性能好n倍的Caffeine并根据名称设置不同的失效时间?》

加我加群一起交流学习!更多干货下载和大厂内推等着你

搭建大型分布式服务(二十二)SpringBoot 如何优雅地整合多个kafka数据源?相关推荐

  1. 搭建大型分布式服务(十四)SpringBoot整合dubbo starter

    一.本文要点 接上文,我们已经把SpringBoot整合mybatis+Hikari+es+redis+kafka了,本文将介绍SpringBoot如何整合dubbo.系列文章完整目录 dubbo注解 ...

  2. 搭建大型分布式服务(二十五)如何将应用部署到TKE容器集群?

    系列文章目录 文章目录 系列文章目录 前言 一.本文要点 二.开发环境 三.部署容器服务 1.制作Nginx镜像,用来打包前端web服务. 2.创建工作负载,用来运行前端web服务. 3.配置serv ...

  3. 使用AFS, Active Directory和SSSD搭建用于集成电路设计的分布式存储系统 【十二】部署第一台 AFS 服务器 1

    使用AFS, Active Directory和SSSD搭建用于集成电路设计的分布式存储系统 [十二]部署第一台 AFS 服务器 1 预备条件检查清单 第一台服务器上将要部署和运行的服务 ptserv ...

  4. RHEL4- WEB服务(十二)用户访问apache服务器认证

    RHEL4- WEB服务(十二)用户访问apache服务器认证   有些时候网站上的内容不是希望所有的用户都可以访问,由于网页内容性质的不同,会对来访的用户有所分类,网站的提供方希望部分网页内容只提供 ...

  5. uniapp 学习笔记二十二 购物车页面结构搭建

    uniapp 学习笔记二十二 购物车页面结构搭建 cart.vue <template><view><view class="flex padding" ...

  6. 使用O2OA二次开发搭建企业办公平台(十二)流程开发篇:报销审批流程需求和应用创建

    本博客为O2OA系列教程.O2OA使用手册,教程目录和各章节天梯将在连载完后更新. 使用O2OA二次开发搭建企业办公平台(一)平台部署篇:平台下载和部署 使用O2OA二次开发搭建企业办公平台(二)平台 ...

  7. 使用O2OA二次开发搭建企业办公平台(十二)流程开发篇:报销审批流程需求和应用创建...

    本博客为O2OA系列教程.O2OA使用手册,教程目录和各章节天梯将在连载完后更新. 使用O2OA二次开发搭建企业办公平台(一)平台部署篇:平台下载和部署 使用O2OA二次开发搭建企业办公平台(二)平台 ...

  8. 2021年大数据Hadoop(二十二):MapReduce的自定义分组

    全网最详细的Hadoop文章系列,强烈建议收藏加关注! 后面更新文章都会列出历史文章目录,帮助大家回顾知识重点. 目录 本系列历史文章 前言 MapReduce的自定义分组 需求 分析 实现 第一步: ...

  9. (二十二)admin-boot项目之集成just-auth实现第三方授权登录

    (二十二)集成just-auth实现第三方授权登录 项目地址:https://gitee.com/springzb/admin-boot 如果觉得不错,给个 star 简介: 这是一个基础的企业级基础 ...

最新文章

  1. java uml 为什么_Java开发为什么需要UML (转)
  2. 【分布式事务系列九】聊聊分布式事务
  3. 如何将CSS应用于iframe?
  4. 【动态规划】【图论】[NOIP模拟赛]独立集
  5. 输入输出系统 2--- 中断(未完)
  6. vim简单命令教程-firstblood
  7. java final属性
  8. 2019.4.26今日任务
  9. 20155320 2016-2017-2 《Java程序设计》第五周学习总结
  10. 去掉CSDN blog 多余的版权申明部分[转贴]
  11. 6.3创建自己执行的二进制文件
  12. MAC编译lame ld: symbol(s) not found for architecture x86_64/_lame_init_old“, referenced from
  13. 中小企业如何有效利用桌面共享软件降低营运成本
  14. python读取像素值
  15. 二级运放压摆率分析(SR)
  16. 上海-苏州 100公里徒步旅行心情分享(二)
  17. JavaSE总结知识点 重要的点(并不是详细的教材语录)
  18. 用HTML制作简单的个人介绍主页
  19. 【仿淘宝首页】前端网页模板,大学生前端作业分享,html5+css电商网站模板,包含js动效
  20. linux断掉crt、xshell依然运行命令?查找命令执行是否完成?

热门文章

  1. ROS使用(10)URDF
  2. Python实现性能测试(locust)
  3. Ubuntu-Touch-03:使用SSH连接手机
  4. 龙族幻想服务器维修到几点,【龙族幻想】4月1日维护公告
  5. 用计算机解锁ipad密码忘了怎么办,iPad密码忘记了怎么办_iPad密码忘记解锁办法-太平洋IT百科手机版...
  6. 三国现实主义的巅峰——贾诩
  7. QQ的春节红包有多火?疑似阿里员工也转粉
  8. 华为 EchoLife HG522无线猫设置图解 电信封杀路由全部搞定
  9. dnf服务器维护11.12,2018dnf11上12会不会归零 | 手游网游页游攻略大全
  10. 全世界禁用谷歌的五大国家_5个国家站在Google与世界统治之间