Kafka多个消费者监听消费同一个Topic主题
多个消费者监听消费同一个Topic主题
- 一、需求介绍
- 二、@kafkaListener注解
- 三、代码实现
- 3.1 第一个消费者
- 3.2 第二个消费者
- 3.3 生产者
- 四、测试
一、需求介绍
有一个Topic:hw_data 有3个分区 3个副本
组:hw-data-group
将这个主题的消息分发给两个(或者多个)消费者消费,(不能消费相同的消息)
二、@kafkaListener注解
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })@Retention(RetentionPolicy.RUNTIME)@MessageMapping@Documented@Repeatable(KafkaListeners.class)public @interface KafkaListener {/*** 消费者的id,当GroupId没有被配置的时候,默认id为GroupId*/String id() default "";/*** 监听容器工厂,当监听时需要区分单数据还是多数据消费需要配置containerFactory 属性*/String containerFactory() default "";/*** 需要监听的Topic,可监听多个,和 topicPattern 属性互斥*/String[] topics() default {};/*** 需要监听的Topic的正则表达。和 topics,topicPartitions属性互斥*/String topicPattern() default "";/*** 可配置更加详细的监听信息,必须监听某个Topic中的指定分区,或者从offset为200的偏移量开始监听,可配置该参数, 和 topicPattern 属性互斥*/TopicPartition[] topicPartitions() default {};/***侦听器容器组 */String containerGroup() default "";/*** 监听异常处理器,配置BeanName*/String errorHandler() default "";/*** 消费组ID */String groupId() default "";/*** id是否为GroupId*/boolean idIsGroup() default true;/*** 消费者Id前缀*/String clientIdPrefix() default "";/*** 真实监听容器的BeanName,需要在 BeanName前加 "__"*/String beanRef() default "__listener";
}
三、代码实现
3.1 第一个消费者
package com.dataWarehouseOss.consumer;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;/*** @author :LiuShihao* @date :Created in 2020/9/16 4:15 下午* @desc :* containerGroup:侦听器容器组* topicPartitions:可配置更加详细的监听信息,必须监听某个Topic中的指定分区,或者从offset为200的偏移量开始监听,可配置该参数, 和 topicPattern 属性互斥*/
@Slf4j
@Component
public class Consumer1 {@KafkaListener(containerGroup="first-group",topicPartitions = {@TopicPartition(topic = "first",partitions = {"0","1"})})public void m1(ConsumerRecord<String, String> record){log.info("分区0,1 :"+record.topic()+" : "+record.value());}
}
3.2 第二个消费者
package com.dataWarehouseOss.consumer;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;/*** @author :LiuShihao* @date :Created in 2020/9/16 4:15 下午* @desc :* containerGroup:侦听器容器组* topicPartitions:可配置更加详细的监听信息,必须监听某个Topic中的指定分区,或者从offset为200的偏移量开始监听,可配置该参数, 和 topicPattern 属性互斥*/
@Slf4j
@Component
public class Consumer2 {@KafkaListener(containerGroup="first-group",topicPartitions = {@TopicPartition(topic = "first",partitions = {"2"})})public void m1(ConsumerRecord<String, String> record){log.info("分区2 :"+record.topic()+" : "+record.value());}
}
3.3 生产者
@Component
@Slf4j
public class SendKafkaToFirst {@AutowiredKafkaTemplate kafkaTemplate;public static final String TOPIC = "first";@Scheduled(cron = "0 */2 * * * ?")public void sendKafka(){log.info("---====定时任务执行了:向first发送10条数据====---");for (int i = 1; i <=10 ; i++) {kafkaTemplate.send(TOPIC,i+"");log.info("---==="+i+"===---");}}
}
四、测试
创建first
主题 、 三个分区 、 三个副本
向first
主题中发送10
条消息,会到first
的三个分区中
可以看到,我们发送了10条消息到 first 主题的三个分区,
然后第一个消费者消费的 0和1 分区的消息,第二个分区 消费的是 2 分区的消息。
通过日志显示,消息并没有被重复消费。
Kafka多个消费者监听消费同一个Topic主题相关推荐
- rabbitmq多个消费者监听一个队列_RabbitMQ的六种工作模式
一.基于erlang语言:是一种支持高并发的语言 RabbitMQ的六种工作模式: 1.1 simple简单模式 消息产生着§将消息放入队列 消息的消费者(consumer) 监听(while) 消息 ...
- SpringCloudStream整合Kafka,解决两个通道对应同一个topic报错问题。
总结 一个通道(如:evad_input)只能唯一对应一个topic,否则会报错 消费者组则可以被多个通道共同使用 报错日志 2022-05-25 14:46:03.697 ERROR 17108 - ...
- 【Flink】Flink 单个任务 多个流的消费同一个topic的时候其中一个流卡死 不消费
文章目录 1.场景1 1.1.概述 1.2.新思路 1.3.一个怀疑 2. 场景 1.场景1 1.1.概述 本次文章主要参考: 场景再现5 本次场景更加奇怪.因为一个规则不触发数据了,我们将现场的数据 ...
- sparkstreaming监听hdfs目录_Spark Streaming消费Kafka数据的两种方案
下午的时候翻微信看到大家在讨论Spark消费Kafka的方式,官网中就有答案,只不过是英文的,当然很多博客也都做了介绍,正好我的收藏夹中有一篇文章供大家参考.文章写的通俗易懂,搭配代码,供大家参考. ...
- 单个进程监听多个端口及多个进程监听同一个端口
单个进程监听多个端口 单个进程创建多个 socket 绑定不同的端口,TCP, UDP 都行 多个进程监听同一个端口(multiple processes listen on same port) 方 ...
- Kafka生产者与消费者详解
什么是 Kafka Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区.多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系 ...
- java调用kafka接口发送数据_Java调用Kafka生产者,消费者Api及相关配置说明
本次的记录内容包括: 1.Java调用生产者APi流程 2.Kafka生产者Api的使用及说明 3.Kafka消费者Api的使用及说明 4.Kafka消费者自动提交Offset和手动提交Offset ...
- 聊聊RabbitMq动态监听这点事
很长时间没有分享过学习心得了,看了下发布记录,最后一篇文章的时间都在2020-12-10年了,今天抽时间整理下一个很早就想整理的技术分享.顺便说句题外话,因为我一直没时间整理,再加上开发的小伙伴对Mq ...
- RabbitMQ消息监听(多种模式-fanout/topic)
1.rabbitmq消息监听,兼容多种模式的消息,fanout/topic等模式 MQ消息配置监听: package com.test.ddyin.conf;import java.util.Hash ...
最新文章
- 怎么优雅的处理Java异常?
- Android中图表AChartEngine学习使用与例子
- LTE中QPSK、16QAM、64QAM
- 在上位计算机控制时不能将s7-200,PLC控制统编程题库.doc
- IDOCALE常用tcode
- Dynamics AX 2012 R2 外部程序运行在没有AD的环境(如PDA) 调用AX服务
- oracle 清理跟踪文件trc,trm
- php js对话框,JavaScript_js弹出框、对话框、提示框、弹窗实现方法总结(推荐),一、JS的三种最常见的对话框- phpStudy...
- SpringCloud Consul Config 配置中心(一)
- mySQL常用操作及基础知识
- 精通使用K米短信教程
- 服务器已爆满 请前往最新区服,斗破苍穹手游服务器达到上限不能创建角色怎么办_斗破苍穹手游服务器达到上限不能创建角色解决方法介绍_游戏吧...
- 智能家居新体验:什么样的数据让语音交互更智慧
- 微信小程序创建一个空白页面
- domino服务器库文件,Domino 服务器设置程序 涉及的文件
- JVM常见命令行及图形工具
- STM32 keyboard USB键盘功能的实现
- 《APUE》在Ubuntu上使用apue.h
- 安卓微信小程序https抓包
- 《深入理解Android 卷III》第七章 深入理解SystemUI(完整版)
热门文章
- JS 封装一个判断闰年平年的方法 aa(nian)
- 电源知识——LDO线性电源、开关电源(基础)
- 12.5米分辨率DEM
- 美通企业日报 | 内容质量是亚太媒体最重视的指标;“豆蔻青”将成2020年度色彩...
- mac通过跳板机对服务器上传下载文件
- 英语老师唱歌软件测试,【出彩教育人】课上打电话,课下能K歌,这样的英语课给我来一打!...
- android_usb_msd,MSD6A828安卓智能电视主板
- html 自动 浏览器窗口,一种html文件实现显示浏览器窗口内容的方法
- maya python教程下载_[转载]技术教程-MayaPython教程四之实战篇
- linux操作系统---信号