多个消费者监听消费同一个Topic主题

  • 一、需求介绍
  • 二、@kafkaListener注解
  • 三、代码实现
    • 3.1 第一个消费者
    • 3.2 第二个消费者
    • 3.3 生产者
  • 四、测试

一、需求介绍

有一个Topic:hw_data 有3个分区 3个副本
组:hw-data-group
将这个主题的消息分发给两个(或者多个)消费者消费,(不能消费相同的消息)

二、@kafkaListener注解

@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })@Retention(RetentionPolicy.RUNTIME)@MessageMapping@Documented@Repeatable(KafkaListeners.class)public @interface KafkaListener {/*** 消费者的id,当GroupId没有被配置的时候,默认id为GroupId*/String id() default "";/*** 监听容器工厂,当监听时需要区分单数据还是多数据消费需要配置containerFactory      属性*/String containerFactory() default "";/*** 需要监听的Topic,可监听多个,和 topicPattern 属性互斥*/String[] topics() default {};/*** 需要监听的Topic的正则表达。和 topics,topicPartitions属性互斥*/String topicPattern() default "";/*** 可配置更加详细的监听信息,必须监听某个Topic中的指定分区,或者从offset为200的偏移量开始监听,可配置该参数, 和 topicPattern 属性互斥*/TopicPartition[] topicPartitions() default {};/***侦听器容器组 */String containerGroup() default "";/*** 监听异常处理器,配置BeanName*/String errorHandler() default "";/*** 消费组ID */String groupId() default "";/*** id是否为GroupId*/boolean idIsGroup() default true;/*** 消费者Id前缀*/String clientIdPrefix() default "";/*** 真实监听容器的BeanName,需要在 BeanName前加 "__"*/String beanRef() default "__listener";
}

三、代码实现

3.1 第一个消费者

package com.dataWarehouseOss.consumer;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;/*** @author :LiuShihao* @date :Created in 2020/9/16 4:15 下午* @desc :* containerGroup:侦听器容器组* topicPartitions:可配置更加详细的监听信息,必须监听某个Topic中的指定分区,或者从offset为200的偏移量开始监听,可配置该参数, 和 topicPattern 属性互斥*/
@Slf4j
@Component
public class Consumer1 {@KafkaListener(containerGroup="first-group",topicPartitions = {@TopicPartition(topic = "first",partitions = {"0","1"})})public void m1(ConsumerRecord<String, String> record){log.info("分区0,1 :"+record.topic()+" : "+record.value());}
}

3.2 第二个消费者

package com.dataWarehouseOss.consumer;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;/*** @author :LiuShihao* @date :Created in 2020/9/16 4:15 下午* @desc :* containerGroup:侦听器容器组* topicPartitions:可配置更加详细的监听信息,必须监听某个Topic中的指定分区,或者从offset为200的偏移量开始监听,可配置该参数, 和 topicPattern 属性互斥*/
@Slf4j
@Component
public class Consumer2 {@KafkaListener(containerGroup="first-group",topicPartitions = {@TopicPartition(topic = "first",partitions = {"2"})})public void m1(ConsumerRecord<String, String> record){log.info("分区2 :"+record.topic()+" : "+record.value());}
}

3.3 生产者

@Component
@Slf4j
public class SendKafkaToFirst {@AutowiredKafkaTemplate kafkaTemplate;public static final String  TOPIC = "first";@Scheduled(cron = "0 */2 * * * ?")public void sendKafka(){log.info("---====定时任务执行了:向first发送10条数据====---");for (int i = 1; i <=10 ; i++) {kafkaTemplate.send(TOPIC,i+"");log.info("---==="+i+"===---");}}
}

四、测试

创建first主题 、 三个分区 、 三个副本
first主题中发送10条消息,会到first 的三个分区中

可以看到,我们发送了10条消息到 first 主题的三个分区,
然后第一个消费者消费的 0和1 分区的消息,第二个分区 消费的是 2 分区的消息。
通过日志显示,消息并没有被重复消费。

Kafka多个消费者监听消费同一个Topic主题相关推荐

  1. rabbitmq多个消费者监听一个队列_RabbitMQ的六种工作模式

    一.基于erlang语言:是一种支持高并发的语言 RabbitMQ的六种工作模式: 1.1 simple简单模式 消息产生着§将消息放入队列 消息的消费者(consumer) 监听(while) 消息 ...

  2. SpringCloudStream整合Kafka,解决两个通道对应同一个topic报错问题。

    总结 一个通道(如:evad_input)只能唯一对应一个topic,否则会报错 消费者组则可以被多个通道共同使用 报错日志 2022-05-25 14:46:03.697 ERROR 17108 - ...

  3. 【Flink】Flink 单个任务 多个流的消费同一个topic的时候其中一个流卡死 不消费

    文章目录 1.场景1 1.1.概述 1.2.新思路 1.3.一个怀疑 2. 场景 1.场景1 1.1.概述 本次文章主要参考: 场景再现5 本次场景更加奇怪.因为一个规则不触发数据了,我们将现场的数据 ...

  4. sparkstreaming监听hdfs目录_Spark Streaming消费Kafka数据的两种方案

    下午的时候翻微信看到大家在讨论Spark消费Kafka的方式,官网中就有答案,只不过是英文的,当然很多博客也都做了介绍,正好我的收藏夹中有一篇文章供大家参考.文章写的通俗易懂,搭配代码,供大家参考. ...

  5. 单个进程监听多个端口及多个进程监听同一个端口

    单个进程监听多个端口 单个进程创建多个 socket 绑定不同的端口,TCP, UDP 都行 多个进程监听同一个端口(multiple processes listen on same port) 方 ...

  6. Kafka生产者与消费者详解

    什么是 Kafka Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区.多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系 ...

  7. java调用kafka接口发送数据_Java调用Kafka生产者,消费者Api及相关配置说明

    本次的记录内容包括: 1.Java调用生产者APi流程 2.Kafka生产者Api的使用及说明 3.Kafka消费者Api的使用及说明 4.Kafka消费者自动提交Offset和手动提交Offset ...

  8. 聊聊RabbitMq动态监听这点事

    很长时间没有分享过学习心得了,看了下发布记录,最后一篇文章的时间都在2020-12-10年了,今天抽时间整理下一个很早就想整理的技术分享.顺便说句题外话,因为我一直没时间整理,再加上开发的小伙伴对Mq ...

  9. RabbitMQ消息监听(多种模式-fanout/topic)

    1.rabbitmq消息监听,兼容多种模式的消息,fanout/topic等模式 MQ消息配置监听: package com.test.ddyin.conf;import java.util.Hash ...

最新文章

  1. 怎么优雅的处理Java异常?
  2. Android中图表AChartEngine学习使用与例子
  3. LTE中QPSK、16QAM、64QAM
  4. 在上位计算机控制时不能将s7-200,PLC控制统编程题库.doc
  5. IDOCALE常用tcode
  6. Dynamics AX 2012 R2 外部程序运行在没有AD的环境(如PDA) 调用AX服务
  7. oracle 清理跟踪文件trc,trm
  8. php js对话框,JavaScript_js弹出框、对话框、提示框、弹窗实现方法总结(推荐),一、JS的三种最常见的对话框- phpStudy...
  9. SpringCloud Consul Config 配置中心(一)
  10. mySQL常用操作及基础知识
  11. 精通使用K米短信教程
  12. 服务器已爆满 请前往最新区服,斗破苍穹手游服务器达到上限不能创建角色怎么办_斗破苍穹手游服务器达到上限不能创建角色解决方法介绍_游戏吧...
  13. 智能家居新体验:什么样的数据让语音交互更智慧
  14. 微信小程序创建一个空白页面
  15. domino服务器库文件,Domino 服务器设置程序 涉及的文件
  16. JVM常见命令行及图形工具
  17. STM32 keyboard USB键盘功能的实现
  18. 《APUE》在Ubuntu上使用apue.h
  19. 安卓微信小程序https抓包
  20. 《深入理解Android 卷III》第七章 深入理解SystemUI(完整版)

热门文章

  1. JS 封装一个判断闰年平年的方法 aa(nian)
  2. 电源知识——LDO线性电源、开关电源(基础)
  3. 12.5米分辨率DEM
  4. 美通企业日报 | 内容质量是亚太媒体最重视的指标;“豆蔻青”将成2020年度色彩...
  5. mac通过跳板机对服务器上传下载文件
  6. 英语老师唱歌软件测试,【出彩教育人】课上打电话,课下能K歌,这样的英语课给我来一打!...
  7. android_usb_msd,MSD6A828安卓智能电视主板
  8. html 自动 浏览器窗口,一种html文件实现显示浏览器窗口内容的方法
  9. maya python教程下载_[转载]技术教程-MayaPython教程四之实战篇
  10. linux操作系统---信号