Kafka消费者订阅方式

  • 1、指定主题消费
  • 2、指定分区消费
  • 3、取消订阅
  • 4、总结

Kafka为消费者提供了三种类型的订阅消费方式:订阅主题集合、正则表达式订阅主题、订阅指定主题的分区集合。三种方式只能使用其中一种。

1、指定主题消费

一个消费者可以使用KafkaConsumer提供的subscribe()方法订阅一个或多个主题,订阅主题集合和正则表达式订阅主题都使用此方法实现的。下面两种方式都可以订阅topic_1120主题。

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.subscribe(Collections.singletonList("topic_1120"));
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//正则表达式.*代表后续0个或者多个任意字符。
consumer.subscribe(Pattern.compile("topic.*"));

订阅主题在源码中由4个方法重载实现,其中两个带listener的方法是可以自定义Rebalance重平衡的监听类。

@Override
public void subscribe(Collection<String> topics) {subscribe(topics, new NoOpConsumerRebalanceListener());
}@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {//省略源码
}@Override
public void subscribe(Pattern pattern) {subscribe(pattern, new NoOpConsumerRebalanceListener());
}@Override
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {//省略源码
}

2、指定分区消费

消费者指定分区消费是通过KafkaConsumer提供的assign()方法实现的,assign()方法入参为Collection, 其中TopicPartition有2个属性, topic和partition, 分区从0开始编号。使用assign()方法订阅指定主题test_1120分区0的消息。

/订阅指定分区
consumer.assign(Collections.singleton(new TopicPartition("topic_1120", 0)));

3、取消订阅

取消订阅调用unsubscribe()方法。

consumer.unsubscribe();

4、总结

subscribe()具有自动重平衡的功能,来实现消费负载均衡和故障自动转移,而assign()不具备这种功能。

Kafka消费者订阅方式相关推荐

  1. Kafka消费者消费方式

    consumer消费方式 pull模式,consumer从Broker中主动拉取数据 pull模式不足之处: 如果 kafka 没有数据,消费者可能会陷入循环中,一直返回空数据.针对这一点,Kafka ...

  2. 深入分析Kafka架构(三):消费者消费方式、三种分区分配策略、offset维护

    本文目录 一.前言 二.消费者消费方式 三.分区分配策略 3.1.分配分区的前提条件 3.2.Range分配策略 3.3.RoundRobin分配策略 3.4.Sticky分配策略 四.offset维 ...

  3. Kafka学习-----Kafka消费者Consumer:消费方式,分区分配策略,RangeRoundRobin

    目录 一.消费方式 二.消费者的分配模式 1.分配时机? 2.Range策略 2.RoundRobin 策略 三.代码解释 RangeAssignor: RoundRobinAssignor 一.消费 ...

  4. graylog+kafka+zookeeper(单机测试及源码),graylog收集kafka(脚本创建发布订阅方式)存储的消息(四)

    graylog+kafka+zookeeper(单机测试及源码),graylog收集kafka(脚本创建发布订阅方式)存储的消息(四) 问题背景 graylog+kafka+zookeeper(单机测 ...

  5. Kafka消费者详解

    一.基本概念 1.消费者和消费组 Kafka消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者会收到不同分区的消息.假设有一个T1主题,该主题有4个分区:同时我们有一个消费组G ...

  6. kafka 同步提交 异步_极限MQ (5) Kafka 消费者

    要想知道如何从 Kafka 读取消息,需要先了解消费者和消费者群组的概念. 假设我们有一个应用程序需要从 Kafka 主题读取消息井验证这些消息,然后再把它们保存起来.应用程序需要创建一个消费者对象, ...

  7. Kafka消费者APi

    Kafka客户端从集群中消费消息,并透明地处理kafka集群中出现故障服务器,透明地调节适应集群中变化的数据分区.也和服务器交互,平衡均衡消费者. public class KafkaConsumer ...

  8. kafka消费者如何读同一生产者消息_Kafka入门之生产者消费者

    一.Kafka安装与使用 ( kafka介绍     ) 1. 下载Kafka 2. 安装 Kafka是使用scala编写的运行与jvm虚拟机上的程序,虽然也可以在windows上使用,但是kafka ...

  9. kafka消费者开发方式小结

    [README] 1, 本文总结了 kafka消费者开发方式: 2, 本文使用的是最新的kafka版本 3.0.0: [1] kafka消费则 [1.1]消费者与消费者组 1)消费者: 应用程序需要创 ...

最新文章

  1. 关于牛客网运行超时的原因分析
  2. UNIX网络编程笔记(4):简单的回射程序
  3. 如何清空android ListView控件的内容
  4. Python库:Python OS库
  5. php preg_split,php汉字截取函数_preg_split()
  6. 循环不变式(loop invariant)
  7. RAC动态资源(DRM)管理介绍
  8. SpringBoot 与 Kotlin 完美交融
  9. table表格表头单元格添加斜线
  10. 《python大战机器学习》勘误
  11. hive实战——谷粒影音
  12. mysql 文本类型 深度解析
  13. 无人驾驶不听指挥,交警该怎么办?
  14. 普通电脑可以装苹果系统吗?Windows电脑装Mac系统
  15. 【安卓开发】android studio 学习入门篇
  16. 有了 HTTP 协议,为什么还需要 Websocket?
  17. 5G NR Rel16 两步接入/2-step RACH
  18. 用什么软件测试光纤稳定性,工欲善其事,谈谈光纤的几种常用工具用途及使用方法!...
  19. 解决ubuntu的wifi连接不稳定
  20. Anaconda环境下tensorflow1.12.0保姆式安装及相关奶妈级配置

热门文章

  1. 如何用宏定义的方式实现MAX(A,B),MAX(A,B,C),MAX(A,B,C,D)
  2. iTween之iTweenPath的使用
  3. 2022年金砖国家职业技能大赛(决赛)网络空间安全赛项 | 浙江赛区选拔赛 任务书
  4. java crumb_vuex中,我在index.vue组件设置的值,为什么在另外一个组件crumb.vue获取不到...
  5. VB.NET自制动态桌面
  6. 腾讯2018第一季度财报:微信用户超10亿,线下零售红利已到来
  7. Android HOME键那些事
  8. 如何在Excel中使用公式将生日自动转换成星座?
  9. Dichotomy二分法学习笔记
  10. 已安装各个模块,程序仍报错:ModuleNotFoundError: No module named 'numpy'