SpringBoot2.1.9 多Kafka消费者配置
一、配置文件
pom.xml
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
application.yml
spring:application:name: double-kafka-consumerprofiles:active: devjackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8server:port: 8008sys:kafka:one:bootstrap-servers: 192.168.1.2:5021consumer:group-id: one-groupauto-offset-reset: latestenable-auto-commit: falsekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 5topic: one-kafka-testtwo:bootstrap-servers: 192.168.1.3:5021consumer:group-id: two-groupauto-offset-reset: latestenable-auto-commit: falsekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 5topic: two-kafka-test
二、配置Configuration Bean
(1)第一个kafka配置
public class OneKafkaConfig {@Beanpublic KafkaListenerContainerFactory oneKafkaFactory(@Autowired @Qualifier("oneConsumerFactory") ConsumerFactory oneConsumerFactory) {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(oneConsumerFactory);factory.setConcurrency(10);factory.setBatchListener(true);factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);//手动提交return factory;}@Beanpublic ConsumerFactory oneConsumerFactory(@Autowired @Qualifier("oneKafkaProperties") KafkaProperties oneKafkaProperties){return new DefaultKafkaConsumerFactory(oneKafkaProperties.buildConsumerProperties());}@ConfigurationProperties(prefix = "sys.kafka.one")@Beanpublic KafkaProperties oneKafkaProperties(){return new KafkaProperties();}}
(2)第二个kafka配置(主)
public class TwoKafkaConfig {@Beanpublic KafkaListenerContainerFactory twoKafkaFactory(@Autowired @Qualifier("twoConsumerFactory") ConsumerFactory twoConsumerFactory) {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(twoConsumerFactory);factory.setConcurrency(10);factory.setBatchListener(true);factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}@Primary//必须指定一个默认的消费者工厂@Beanpublic ConsumerFactory twoConsumerFactory(@Autowired @Qualifier("twoKafkaProperties") KafkaProperties twoKafkaProperties){return new DefaultKafkaConsumerFactory(twoKafkaProperties.buildConsumerProperties());}@Primary//必须指定一个默认的kafka配置@ConfigurationProperties(prefix = "sys.kafka.two")@Beanpublic KafkaProperties twoKafkaProperties(){return new KafkaProperties();}}
(3)kakfka配置导入
@Configuration
@Import({OneKafkaConfig.class, TwoKafkaConfig.class})
public class KafkaConfig {@Beanpublic KafkaConsumer kafkaConsumer(){KafkaConsumer consumer = new KafkaConsumer();return consumer;}}
(4) 消费者监听
public class KafkaConsumer {@KafkaListener(containerFactory = "oneKafkaFactory", topics = "${sys.kafka.one.topic}")public void oneKafkaHandle(List<ConsumerRecord<String,String>> consumerRecords, Acknowledgment ack){//do somthing }@KafkaListener(containerFactory = "twoKafkaFactory", topics = "${sys.kafka.two.topic}")public void twoKafkaHandle(List<ConsumerRecord<String,String>> consumerRecords, Acknowledgment ack){//do somthing}}
SpringBoot2.1.9 多Kafka消费者配置相关推荐
- Kafka 实战指南——Kafka 消费者配置
文章目录 1. 消费位点提交 2. 消费位点重置 3. session 超时和心跳监测 4. 拉取大消息 5. 拉取公网 6. 消息重复和消费幂等 7. 消费失败 8. 消费延迟 9. 消费阻塞以及堆 ...
- SpringBoot笔记:SpringBoot2.3集成Kafka组件配置
文章目录 说明 Springboot集成Kafka 依赖配置 配置文件yml配置 Producer生产者 Consumer消费者 测试代码 注意事项 说明 本文是接<Kafka学习:CentOS ...
- Kafka学习(十)--Kafka消费者Consumer消费消息配置实战
一. Kafka消费者Consumer消费消息配置实战 配置: public static Properties getProperties() {Properties props = new Pro ...
- kafka消费者接收分区测试
[README] 本文演示了当有新消费者加入组后,其他消费者接收分区情况: 本文还模拟了 broker 宕机的情况: 本文使用的是最新的 kafka3.0.0 : 本文测试案例,来源于 消费者接收分区 ...
- Kafka 安装配置及快速入门
2019独角兽企业重金招聘Python工程师标准>>> 一.简介 官网:http://kafka.apache.org/ Apache Kafka是分布式发布-订阅消息系统.它最初由 ...
- Kafka消费者详解
一.基本概念 1.消费者和消费组 Kafka消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者会收到不同分区的消息.假设有一个T1主题,该主题有4个分区:同时我们有一个消费组G ...
- Kafka消费者APi
Kafka客户端从集群中消费消息,并透明地处理kafka集群中出现故障服务器,透明地调节适应集群中变化的数据分区.也和服务器交互,平衡均衡消费者. public class KafkaConsumer ...
- kafka消费者如何读同一生产者消息_Kafka入门之生产者消费者
一.Kafka安装与使用 ( kafka介绍 ) 1. 下载Kafka 2. 安装 Kafka是使用scala编写的运行与jvm虚拟机上的程序,虽然也可以在windows上使用,但是kafka ...
- kafka消费者开发方式小结
[README] 1, 本文总结了 kafka消费者开发方式: 2, 本文使用的是最新的kafka版本 3.0.0: [1] kafka消费则 [1.1]消费者与消费者组 1)消费者: 应用程序需要创 ...
最新文章
- Blender车辆绑定动画制作视频教程
- php 管理 mysql 数据库 代码_安装并使用phpMyAdmin管理MySQL数据库_php
- 图表中各个参数的应用( AChartEngine XMultipleSeriesRenderer
- [BZOJ1444]有趣的游戏(AC自动机+矩阵乘法)
- 滑动平均_善杰告诉您初中物理学滑动变阻器的各种作用
- Android学习笔记25-画廊控件Gallery的使用
- 介绍一款很好用的分区软件--分区助手(不用格式化磁盘哟~)
- python for ArcGIS 绘制广州市板块地图
- react 实现展示公司层级,选择人员的功能
- 矢量图标库Font Awesome的SVG新版本图标库5.x
- 计算机网络密码凭据,win7系统共享提示输入网络凭据用户名密码的解决办法
- 如何在VSCode配置PHP开发环境(详细版)
- Mysql数据库学习笔记[完结]
- 宝宝树全自动引流脚本软件高质量活跃粉丝
- 【计算机网络实验】笔记(实验一、二)
- HTTP学习四:SPDY和HTTP/2.0
- LDPC码简介(一)
- 关于在neo4j中使用cypher语句实现NOT IN 的功能
- Js在业务软件中的方法大全
- 微信小程序带来的颠覆
热门文章
- 计算机怎么取消脱敏设置,一种敏感数据自适应的脱敏方法、系统技术方案
- TypeScript,从0到入门带你进入类型的世界
- [RabbitMQ]常用命令
- Java语法基础50题训练(上)
- 中科大软件测试期末复习
- sap 标准委外和工序委外_SAP FICO零基础学习_0035_标准成本估算-主数据-物料主数据...
- 数据结构与索引-- B+树索引
- 360浏览器linux版本_360安全浏览器崩溃解决方案
- Codeforces Round #579 (Div. 3) F1. Complete the Projects (easy version) 排序 + 贪心
- HDU - 1998 奇数阶魔方