springboot正常集成kafka

这个网上很多资料都有些集成,我就不浪费太多篇幅和时间了,笔者找了篇还算很容易理解的博客,自行学习

https://blog.csdn.net/tzs_1041218129/article/details/78988439

多个customer不同group

刚那篇博客,很容易就能上手,那么问题来了,group-id在配置文件中设置,我要是想要有不同的groupid怎么办?

莫慌,看看@KafkaListener 注解,里面就有group属性可以设置啊,感觉还是比较人性化的吗?

@KafkaListener(group = "= =!!!")

马上试试。。。。

配置文件设置consumer的group-id为myGroup。

@Component
public class KafkaConsumer {@KafkaListener(id = "test1", topics="test-topic", group = "test1")public void processMessage(String content) throws InterruptedException {System.out.println("收到消息 1=>" + content);}@KafkaListener(id = "test2", topics="test-topic", group = "test2")public void processMessage1(String content) throws InterruptedException {System.out.println("收到消息 2=>" + content);}
}

启动,额,控制台打印

[           main] xxx    : Kafka version : 0.10.1.1
[           main] xxx    : Kafka commitId : f10ef2720b03b247
[           main] xxx    : Tomcat started on port(s): 82 (http)
[           main] xxx    : Started App in 3.653 seconds (JVM running for 4.119)
[    test2-0-C-1] xxx    : Discovered coordinator 192.168.52.133:9092 (id: 2147483644 rack: null) for group myGroup.
[    test1-0-C-1] xxx    : Discovered coordinator 192.168.52.133:9092 (id: 2147483644 rack: null) for group myGroup.

最后几行???????我猜的没错的话,两个listener都被归属到myGroup去了。。

嗯哼??说好的人性化呢?TM压根没反应,没用啊?

再试多几次,好吧,我放弃了,反正是没用。

绝望,好吧,问题还是要解决的。

最后在一阵搜寻下,找到了一个办法,请看:

那就是消费者,不要用配置文件配置的方式

细心的话,会发现@KafkaListener 注解,里面有一个containerFactory参数,就是让你指定容器工厂的

动手吧。

新建一个KafkaConsumerConfig类,代码如下,指定了两个容器,也就两个group

分别为kafkaListenerContainerFactory1和kafkaListenerContainerFactory2

import java.util.HashMap;
import java.util.Map;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;@Configuration
public class KafkaConsumerConfig {private String brokers = "192.168.52.130:9092,192.168.52.131:9092,192.168.52.133:9092";private String group1 = "test1";private String group2 = "test2";@Beanpublic KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory1() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();factory.setConsumerFactory(consumerFactory1());factory.setConcurrency(4);factory.getContainerProperties().setPollTimeout(4000);return factory;}@Beanpublic KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory2() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();factory.setConsumerFactory(consumerFactory2());factory.setConcurrency(4);factory.getContainerProperties().setPollTimeout(4000);return factory;}public Map<String, Object> getCommonPropertis() {Map<String, Object> properties = new HashMap<String, Object>();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.GROUP_ID_CONFIG, group1);properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");return properties;}public ConsumerFactory<String, String> consumerFactory1() {Map<String, Object> properties = getCommonPropertis();properties.put(ConsumerConfig.GROUP_ID_CONFIG, group1);return new DefaultKafkaConsumerFactory<String, String>(properties);}public ConsumerFactory<String, String> consumerFactory2() {Map<String, Object> properties = getCommonPropertis();properties.put(ConsumerConfig.GROUP_ID_CONFIG, group2);return new DefaultKafkaConsumerFactory<String, String>(properties);}
}

上面代码中,其实,很多配置项,你也可以直接用@value的方式,从配置文件中读取过来,那么需要修改参数值的时候,就直接更改配置文件就行了,这点相信就不用教了,不懂的网上一搜一堆。

最后,在@KafkaListener 中指定容器名称

@KafkaListener(id="test1",topics = "test-topic", containerFactory="kafkaListenerContainerFactory1")
@KafkaListener(id="test2",topics = "test-topic", containerFactory="kafkaListenerContainerFactory2")

启动,你就会发现,卧槽,还真可以

[           main] xxx     : Kafka version : 0.10.1.1
[           main] xxx     : Kafka commitId : f10ef2720b03b247
[           main] xxx     : Tomcat started on port(s): 82 (http)
[           main] xxx     : Started App in 3.913 seconds (JVM running for 4.321)
[    test2-0-C-1] xxx     : Discovered coordinator 192.168.52.133:9092 (id: 2147483644 rack: null) for group test2.
[    test1-0-C-1] xxx     : Discovered coordinator 192.168.52.131:9092 (id: 2147483645 rack: null) for group test1.

至此,就实现了多个customer不同group的功能,亲测有效。

感谢您的阅读。

springboot 集成kafka 实现多个customer不同group相关推荐

  1. springboot集成kafka及kafka web UI的使用

    springboot集成kafka application.properties spring.kafka.bootstrap-servers=CentOSA:9092,CentOSB:9092,Ce ...

  2. 从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例

    从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例 前言 加依赖 生产者 加配置 生产者代码示例 消费者 加配置 消费者监听器示例 调用 关于 Serializer ...

  3. SpringBoot集成Kafka

    SpringBoot集成Kafka 知识索引 SpringBoot集成Kafka工程搭建 SpringBoot集成Kafka配置 SpringBoot集成Kafka生产消息 SpringBoot集成K ...

  4. SpringBoot 集成 kafka,基于注解批量消费设置

    网上关于SpringBoot 集成kafka的批量消费功能需要手动创建类(这篇文章不错:[弄nèng - Kafka]应用篇(三) -- Springboot整合Kafka(批量消费)_司马缸砸缸了- ...

  5. SpringBoot集成Kafka消息队列

    1.说明 Spring可以方便的集成使用 Kafka消息队列 , 只需要引入依赖包spring-kafka, 注意版本兼容问题, 本文详细介绍SpringBoot集成Kafka的方法, 以及生产者和消 ...

  6. 【无废话】SpringBoot集成Kafka消息队列

    0.前言 本人整理收藏了22年多家公司面试知识点整理 ,以及各种Java核心知识点免费分享给大家,我认为对面试与学习来说是非常有用的,想要资料的话请点白嫖这份答案←戳我** 1.说明 Spring可以 ...

  7. springboot集成kafka消费手动启动停止

    项目场景: 在月结,或者某些时候,我们需要停掉kafka所有的消费端,让其暂时停止消费,而后等月结完成,再从新对消费监听恢复,进行消费,此动作不需要重启服务,最后源码下载 解决分析 KafkaList ...

  8. SpringBoot集成Kafka低版本和高版本

    SpringBoot集成Kafka低版本和高版本 说明 地址 低版本SpringBoot集成Kafka代码 代码 kafka生产者配置 kafka消费者配置 发送消息给kafka的Controller ...

  9. SpringBoot集成kafka全面实战

    本文是SpringBoot+Kafka的实战讲解,如果对kafka的架构原理还不了解的读者,建议先看一下<大白话kafka架构原理>.<秒懂kafka HA(高可用)>两篇文章 ...

最新文章

  1. 复习webpack的常用loader
  2. web开发的java语言步骤_java web开发入门一(servlet和jsp)基于eclispe
  3. Vue项目中遇到了大文件分片上传的问题
  4. Lombok的使用方法
  5. 对996最客观的描述,一叶知秋
  6. MFC开发IM-第三篇、资源视图--显示在另一个编辑器中打开
  7. Gtk的entry传递数据到内部程序
  8. 解析button和input type=”button”的区别
  9. WordPress中自带的处理AJAX请求的HOOK
  10. [数据仓库]我理解的数据中台
  11. 快解析:管家婆辉煌II TOP+异地访问解决方案
  12. Chives 集群收割机图文说明
  13. flutter 踩的那些坑 (一) Scheme not starting with alphabetic character
  14. The Finalless——新学期的scrum
  15. 正在存储windows支持软件_ibm/lenovoDS3500扩展柜存储管理软件-北京瑞腾世纪科技有限公司...
  16. 三角形,斜线,表头css实现方法
  17. 生物特征识别六大技术,你知道多少?
  18. 名片 - 名片设计的比例
  19. Mongoose disconnected. Mongoose connection error: MongoError: Authentication failed. (node:1532) Unh
  20. JavaWeb — 系统结构分析

热门文章

  1. python热更新原理_Python功能点实现:数据热更新
  2. kit_00_001-为创建新的虚拟机做准备
  3. 快端午了,用Python画一盘粽子送给你
  4. vue2.0引入icon.styl不断报错
  5. linux7 etc下的grub2,Centos7安装 grub2 配置技巧:改变启动顺序
  6. iPhone手机必备宝藏APP
  7. KiCad 5.1.6 泪滴插件安装与使用
  8. 关于电脑网络显示红叉的解决方法
  9. 如何做内网穿透,在家里连回公司服务器做操作
  10. 深入理解SHA系列加密算法