springboot 集成kafka 实现多个customer不同group
springboot正常集成kafka
这个网上很多资料都有些集成,我就不浪费太多篇幅和时间了,笔者找了篇还算很容易理解的博客,自行学习
https://blog.csdn.net/tzs_1041218129/article/details/78988439
多个customer不同group
刚那篇博客,很容易就能上手,那么问题来了,group-id在配置文件中设置,我要是想要有不同的groupid怎么办?
莫慌,看看@KafkaListener 注解,里面就有group属性可以设置啊,感觉还是比较人性化的吗?
@KafkaListener(group = "= =!!!")
马上试试。。。。
配置文件设置consumer的group-id为myGroup。
@Component
public class KafkaConsumer {@KafkaListener(id = "test1", topics="test-topic", group = "test1")public void processMessage(String content) throws InterruptedException {System.out.println("收到消息 1=>" + content);}@KafkaListener(id = "test2", topics="test-topic", group = "test2")public void processMessage1(String content) throws InterruptedException {System.out.println("收到消息 2=>" + content);}
}
启动,额,控制台打印
[ main] xxx : Kafka version : 0.10.1.1
[ main] xxx : Kafka commitId : f10ef2720b03b247
[ main] xxx : Tomcat started on port(s): 82 (http)
[ main] xxx : Started App in 3.653 seconds (JVM running for 4.119)
[ test2-0-C-1] xxx : Discovered coordinator 192.168.52.133:9092 (id: 2147483644 rack: null) for group myGroup.
[ test1-0-C-1] xxx : Discovered coordinator 192.168.52.133:9092 (id: 2147483644 rack: null) for group myGroup.
最后几行???????我猜的没错的话,两个listener都被归属到myGroup去了。。
嗯哼??说好的人性化呢?TM压根没反应,没用啊?
再试多几次,好吧,我放弃了,反正是没用。
绝望,好吧,问题还是要解决的。
最后在一阵搜寻下,找到了一个办法,请看:
那就是消费者,不要用配置文件配置的方式
细心的话,会发现@KafkaListener 注解,里面有一个containerFactory参数,就是让你指定容器工厂的
动手吧。
新建一个KafkaConsumerConfig类,代码如下,指定了两个容器,也就两个group
分别为kafkaListenerContainerFactory1和kafkaListenerContainerFactory2
import java.util.HashMap;
import java.util.Map;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;@Configuration
public class KafkaConsumerConfig {private String brokers = "192.168.52.130:9092,192.168.52.131:9092,192.168.52.133:9092";private String group1 = "test1";private String group2 = "test2";@Beanpublic KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory1() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();factory.setConsumerFactory(consumerFactory1());factory.setConcurrency(4);factory.getContainerProperties().setPollTimeout(4000);return factory;}@Beanpublic KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory2() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();factory.setConsumerFactory(consumerFactory2());factory.setConcurrency(4);factory.getContainerProperties().setPollTimeout(4000);return factory;}public Map<String, Object> getCommonPropertis() {Map<String, Object> properties = new HashMap<String, Object>();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.GROUP_ID_CONFIG, group1);properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");return properties;}public ConsumerFactory<String, String> consumerFactory1() {Map<String, Object> properties = getCommonPropertis();properties.put(ConsumerConfig.GROUP_ID_CONFIG, group1);return new DefaultKafkaConsumerFactory<String, String>(properties);}public ConsumerFactory<String, String> consumerFactory2() {Map<String, Object> properties = getCommonPropertis();properties.put(ConsumerConfig.GROUP_ID_CONFIG, group2);return new DefaultKafkaConsumerFactory<String, String>(properties);}
}
上面代码中,其实,很多配置项,你也可以直接用@value的方式,从配置文件中读取过来,那么需要修改参数值的时候,就直接更改配置文件就行了,这点相信就不用教了,不懂的网上一搜一堆。
最后,在@KafkaListener 中指定容器名称
@KafkaListener(id="test1",topics = "test-topic", containerFactory="kafkaListenerContainerFactory1")
@KafkaListener(id="test2",topics = "test-topic", containerFactory="kafkaListenerContainerFactory2")
启动,你就会发现,卧槽,还真可以
[ main] xxx : Kafka version : 0.10.1.1
[ main] xxx : Kafka commitId : f10ef2720b03b247
[ main] xxx : Tomcat started on port(s): 82 (http)
[ main] xxx : Started App in 3.913 seconds (JVM running for 4.321)
[ test2-0-C-1] xxx : Discovered coordinator 192.168.52.133:9092 (id: 2147483644 rack: null) for group test2.
[ test1-0-C-1] xxx : Discovered coordinator 192.168.52.131:9092 (id: 2147483645 rack: null) for group test1.
至此,就实现了多个customer不同group的功能,亲测有效。
感谢您的阅读。
springboot 集成kafka 实现多个customer不同group相关推荐
- springboot集成kafka及kafka web UI的使用
springboot集成kafka application.properties spring.kafka.bootstrap-servers=CentOSA:9092,CentOSB:9092,Ce ...
- 从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例
从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例 前言 加依赖 生产者 加配置 生产者代码示例 消费者 加配置 消费者监听器示例 调用 关于 Serializer ...
- SpringBoot集成Kafka
SpringBoot集成Kafka 知识索引 SpringBoot集成Kafka工程搭建 SpringBoot集成Kafka配置 SpringBoot集成Kafka生产消息 SpringBoot集成K ...
- SpringBoot 集成 kafka,基于注解批量消费设置
网上关于SpringBoot 集成kafka的批量消费功能需要手动创建类(这篇文章不错:[弄nèng - Kafka]应用篇(三) -- Springboot整合Kafka(批量消费)_司马缸砸缸了- ...
- SpringBoot集成Kafka消息队列
1.说明 Spring可以方便的集成使用 Kafka消息队列 , 只需要引入依赖包spring-kafka, 注意版本兼容问题, 本文详细介绍SpringBoot集成Kafka的方法, 以及生产者和消 ...
- 【无废话】SpringBoot集成Kafka消息队列
0.前言 本人整理收藏了22年多家公司面试知识点整理 ,以及各种Java核心知识点免费分享给大家,我认为对面试与学习来说是非常有用的,想要资料的话请点白嫖这份答案←戳我** 1.说明 Spring可以 ...
- springboot集成kafka消费手动启动停止
项目场景: 在月结,或者某些时候,我们需要停掉kafka所有的消费端,让其暂时停止消费,而后等月结完成,再从新对消费监听恢复,进行消费,此动作不需要重启服务,最后源码下载 解决分析 KafkaList ...
- SpringBoot集成Kafka低版本和高版本
SpringBoot集成Kafka低版本和高版本 说明 地址 低版本SpringBoot集成Kafka代码 代码 kafka生产者配置 kafka消费者配置 发送消息给kafka的Controller ...
- SpringBoot集成kafka全面实战
本文是SpringBoot+Kafka的实战讲解,如果对kafka的架构原理还不了解的读者,建议先看一下<大白话kafka架构原理>.<秒懂kafka HA(高可用)>两篇文章 ...
最新文章
- 复习webpack的常用loader
- web开发的java语言步骤_java web开发入门一(servlet和jsp)基于eclispe
- Vue项目中遇到了大文件分片上传的问题
- Lombok的使用方法
- 对996最客观的描述,一叶知秋
- MFC开发IM-第三篇、资源视图--显示在另一个编辑器中打开
- Gtk的entry传递数据到内部程序
- 解析button和input type=”button”的区别
- WordPress中自带的处理AJAX请求的HOOK
- [数据仓库]我理解的数据中台
- 快解析:管家婆辉煌II TOP+异地访问解决方案
- Chives 集群收割机图文说明
- flutter 踩的那些坑 (一) Scheme not starting with alphabetic character
- The Finalless——新学期的scrum
- 正在存储windows支持软件_ibm/lenovoDS3500扩展柜存储管理软件-北京瑞腾世纪科技有限公司...
- 三角形,斜线,表头css实现方法
- 生物特征识别六大技术,你知道多少?
- 名片 - 名片设计的比例
- Mongoose disconnected. Mongoose connection error: MongoError: Authentication failed. (node:1532) Unh
- JavaWeb — 系统结构分析