一、配置文件

pom.xml

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

application.yml

spring:application:name: double-kafka-consumerprofiles:active: devjackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8server:port: 8008sys:kafka:one:bootstrap-servers: 192.168.1.2:5021consumer:group-id: one-groupauto-offset-reset: latestenable-auto-commit: falsekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 5topic: one-kafka-testtwo:bootstrap-servers: 192.168.1.3:5021consumer:group-id: two-groupauto-offset-reset: latestenable-auto-commit: falsekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 5topic: two-kafka-test

二、配置Configuration Bean

(1)第一个kafka配置


public class OneKafkaConfig {@Beanpublic KafkaListenerContainerFactory oneKafkaFactory(@Autowired @Qualifier("oneConsumerFactory") ConsumerFactory oneConsumerFactory) {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(oneConsumerFactory);factory.setConcurrency(10);factory.setBatchListener(true);factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);//手动提交return factory;}@Beanpublic ConsumerFactory oneConsumerFactory(@Autowired @Qualifier("oneKafkaProperties") KafkaProperties oneKafkaProperties){return new DefaultKafkaConsumerFactory(oneKafkaProperties.buildConsumerProperties());}@ConfigurationProperties(prefix = "sys.kafka.one")@Beanpublic KafkaProperties oneKafkaProperties(){return new KafkaProperties();}}

(2)第二个kafka配置(主)


public class TwoKafkaConfig {@Beanpublic KafkaListenerContainerFactory twoKafkaFactory(@Autowired @Qualifier("twoConsumerFactory") ConsumerFactory twoConsumerFactory) {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(twoConsumerFactory);factory.setConcurrency(10);factory.setBatchListener(true);factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}@Primary//必须指定一个默认的消费者工厂@Beanpublic ConsumerFactory twoConsumerFactory(@Autowired @Qualifier("twoKafkaProperties") KafkaProperties twoKafkaProperties){return new DefaultKafkaConsumerFactory(twoKafkaProperties.buildConsumerProperties());}@Primary//必须指定一个默认的kafka配置@ConfigurationProperties(prefix = "sys.kafka.two")@Beanpublic KafkaProperties twoKafkaProperties(){return new KafkaProperties();}}

(3)kakfka配置导入

@Configuration
@Import({OneKafkaConfig.class, TwoKafkaConfig.class})
public class KafkaConfig {@Beanpublic KafkaConsumer kafkaConsumer(){KafkaConsumer consumer = new KafkaConsumer();return consumer;}}

(4) 消费者监听

public class KafkaConsumer {@KafkaListener(containerFactory = "oneKafkaFactory", topics = "${sys.kafka.one.topic}")public void oneKafkaHandle(List<ConsumerRecord<String,String>> consumerRecords, Acknowledgment ack){//do somthing }@KafkaListener(containerFactory = "twoKafkaFactory", topics = "${sys.kafka.two.topic}")public void twoKafkaHandle(List<ConsumerRecord<String,String>> consumerRecords, Acknowledgment ack){//do somthing}}

SpringBoot2.1.9 多Kafka消费者配置相关推荐

  1. Kafka 实战指南——Kafka 消费者配置

    文章目录 1. 消费位点提交 2. 消费位点重置 3. session 超时和心跳监测 4. 拉取大消息 5. 拉取公网 6. 消息重复和消费幂等 7. 消费失败 8. 消费延迟 9. 消费阻塞以及堆 ...

  2. SpringBoot笔记:SpringBoot2.3集成Kafka组件配置

    文章目录 说明 Springboot集成Kafka 依赖配置 配置文件yml配置 Producer生产者 Consumer消费者 测试代码 注意事项 说明 本文是接<Kafka学习:CentOS ...

  3. Kafka学习(十)--Kafka消费者Consumer消费消息配置实战

    一. Kafka消费者Consumer消费消息配置实战 配置: public static Properties getProperties() {Properties props = new Pro ...

  4. kafka消费者接收分区测试

    [README] 本文演示了当有新消费者加入组后,其他消费者接收分区情况: 本文还模拟了 broker 宕机的情况: 本文使用的是最新的 kafka3.0.0 : 本文测试案例,来源于 消费者接收分区 ...

  5. Kafka 安装配置及快速入门

    2019独角兽企业重金招聘Python工程师标准>>> 一.简介 官网:http://kafka.apache.org/ Apache Kafka是分布式发布-订阅消息系统.它最初由 ...

  6. Kafka消费者详解

    一.基本概念 1.消费者和消费组 Kafka消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者会收到不同分区的消息.假设有一个T1主题,该主题有4个分区:同时我们有一个消费组G ...

  7. Kafka消费者APi

    Kafka客户端从集群中消费消息,并透明地处理kafka集群中出现故障服务器,透明地调节适应集群中变化的数据分区.也和服务器交互,平衡均衡消费者. public class KafkaConsumer ...

  8. kafka消费者如何读同一生产者消息_Kafka入门之生产者消费者

    一.Kafka安装与使用 ( kafka介绍     ) 1. 下载Kafka 2. 安装 Kafka是使用scala编写的运行与jvm虚拟机上的程序,虽然也可以在windows上使用,但是kafka ...

  9. kafka消费者开发方式小结

    [README] 1, 本文总结了 kafka消费者开发方式: 2, 本文使用的是最新的kafka版本 3.0.0: [1] kafka消费则 [1.1]消费者与消费者组 1)消费者: 应用程序需要创 ...

最新文章

  1. Blender车辆绑定动画制作视频教程
  2. php 管理 mysql 数据库 代码_安装并使用phpMyAdmin管理MySQL数据库_php
  3. 图表中各个参数的应用( AChartEngine XMultipleSeriesRenderer
  4. [BZOJ1444]有趣的游戏(AC自动机+矩阵乘法)
  5. 滑动平均_善杰告诉您初中物理学滑动变阻器的各种作用
  6. Android学习笔记25-画廊控件Gallery的使用
  7. 介绍一款很好用的分区软件--分区助手(不用格式化磁盘哟~)
  8. python for ArcGIS 绘制广州市板块地图
  9. react 实现展示公司层级,选择人员的功能
  10. 矢量图标库Font Awesome的SVG新版本图标库5.x
  11. 计算机网络密码凭据,win7系统共享提示输入网络凭据用户名密码的解决办法
  12. 如何在VSCode配置PHP开发环境(详细版)
  13. Mysql数据库学习笔记[完结]
  14. 宝宝树全自动引流脚本软件高质量活跃粉丝
  15. 【计算机网络实验】笔记(实验一、二)
  16. HTTP学习四:SPDY和HTTP/2.0
  17. LDPC码简介(一)
  18. 关于在neo4j中使用cypher语句实现NOT IN 的功能
  19. Js在业务软件中的方法大全
  20. 微信小程序带来的颠覆

热门文章

  1. 计算机怎么取消脱敏设置,一种敏感数据自适应的脱敏方法、系统技术方案
  2. TypeScript,从0到入门带你进入类型的世界
  3. [RabbitMQ]常用命令
  4. Java语法基础50题训练(上)
  5. 中科大软件测试期末复习
  6. sap 标准委外和工序委外_SAP FICO零基础学习_0035_标准成本估算-主数据-物料主数据...
  7. 数据结构与索引-- B+树索引
  8. 360浏览器linux版本_360安全浏览器崩溃解决方案
  9. Codeforces Round #579 (Div. 3) F1. Complete the Projects (easy version) 排序 + 贪心
  10. HDU - 1998 奇数阶魔方