Kafka 消费者

应用程序使用 KafkaConsumer 向 Kafka 订阅主题,并从订阅的主题上接收消息。

Kafka 消费者从属于消费者群组。一个群组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。

1个消费者接收4个分区的消息:

2个消费者接收4个分区的消息:

4个消费者接收4个分区的消息:

5个消费者接收4个分区的消息:

如果消费者群组的消费者超过主题的分区数量,那么有一部分消费者就会被闲置,不会接收到任何消息。

两个消费者群组对应一个主题:

当一个消费者被关闭或发生崩溃时,它就离开群组,原本由它读取的分区将由群组里的其他消费者来读取。分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡。在再均衡期间,消费者无法读取消息,造成整个群组一小段时间的不可用。

消费者通过向被指派为群组协调器的 broker 发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。消费者会在轮训消息或提交偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡。

如果一个消费者发生崩溃,并停止读取消息,群组协调器会等待几秒钟,确认它死亡了才会触发再均衡。

当消费者要加入群组时,它会向群组协调器发送一个 JoinGroup 请求。第一个加入群组的消费者将成为"群主"。群主从协调器那里获得群组的成员列表,并负责给每一个消费者分配分区。分配完毕之后,群主把分配情况列表发送给群组协调器,协调器再把这些信息发送给所有消费者。每个消费者只能看到自己的分配情况。这个过程会在每次再均衡时重复发生。

Propertites prop = new Propertites();
prop.put("bootstrap.servers", "broker1:9092,borker2:9092");
prop.put("group.id", "CountryCounter");
prop.put("key.deserializer", "org.apache.kafka.common.serialiation.StringDeserializer");
prop.put("value.deserializer", "org.apache.kafka.common.serialiation.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);

轮询

消息轮询是消费者 API 的核心。消费者必须持续对 Kafka 进行轮询,否则会被认为已经死亡。

try {while(true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {log.debug("topic = %s, partition = %s, offset = %d, customer = %s, record.topic(), record.partition(), record.offset(), record.key());}}
} finally {consumer.close();
}

在第一次调用新消费者的 poll() 方法时,它会负责查找 GroupCoordinator,然后加入群组,接受分配的分区。

消费者的配置

fetch.min.bytes

该属性指定了消费者从服务器获取记录的最小字节数。broker 在收到消费者的数据请求时,如果可用的数据量小于 fetch.min.bytes 指定的大小,那么它会等到有足够的可用数据时才把它返回给消费者。

fetch.max.wait.ms

用于指定 broker 的等待时间,默认是500s。

max.partition.fetch.bytes

该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值是1MB,也就是说,KafkaConsuer.poll() 方法从每个分区里返回的记录最多不超过 max.partition.fetch.bytes 指定的字节。

session.timeout.ms

该属性指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是3s。heartbeat.interval.ms 指定了 poll() 方法向协调器发送心跳的频率,一般是 session.timeout.ms 的三分之一。

auto.offset.reset

该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况该作何处理,它的默认值是 latest,意思是在偏移量无效的情况下,消费者从最新的记录开始读取数据。另一个值是 earliest,意思是,在偏移量无效的情况下,从起始位置读取分区的记录。

enable.auto.commit

该属性指定了消费者是否自动提交偏移量,默认值是 true。

partition.assignment.strategy

Kafka 有两个默认的分配策略:

  • Range:该策略会把主题的若干个连续的分区分配给消费者。
  • RoundRobin:该策略把主题的所有分区逐个分配给消费者。

默认使用 Range 策略。

max.poll.records

该属性用于控制单次调用 call() 方法能够返回的记录数量。

提交和偏移量

我们把更新分区当前位置的操作叫做提交。

消费者往一个叫做 _consumer_offset 的特殊主体发送消息,消息里包含每个分区的偏移量。

如果提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。

如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会丢失。

最简单的提交方式是让消费者自动提交偏移量。如果 enable.auto.commit 被设为 true,那么每过5s,消费者会自动把从 poll() 方法接收到的最大偏移量提交上去。提交时间间隔由 auto.commit.interval.ms 控制,默认值是5s。

把 auto.commit.offset 设为 false,让应用程序决定何时提交偏移量。使用 comitSync() 将会提交由 poll() 返回的最新偏移量。

while(true) {ConsumerRecords<String, String> records = consuer.poll(100);for (ConsumerRecord<String, String> record : records) {try {consumer.commitSync();} catch (CommitFailedException e) {log.error("commit failed", e);}}
}

在成功提交或碰到无法恢复的错误之前,comitSync() 会一直重试。

获取以上Java高级架构最新视频,欢迎

加入Java进阶架构交流群:142019080。直接点击链接加群。https://jq.qq.com/?_wv=1027&k=5lXBNZ7

Kafka权威指南,Kafka消费者相关推荐

  1. kafka权威指南_Kafka-分区、片段、偏移量

    [分区.片段.偏移量] 1. 每个分区是由多个Segment组成,当Kafka要写数据到一个partition时,它会写入到状态为active的segment中.如果该segment被写满,则一个新的 ...

  2. 送5本《Kafka权威指南》第二版

    文末送书 科学家们每一次发生分歧都是因为掌握的数据不够充分.所以,我们可以先就获取哪一类数据达成一致,只要获取了数据,问题也就迎刃而解了.要么我是对的,要么你是对的,要么我们都是错的,然后继续. -- ...

  3. 《kafka权威指南》之可靠的数据传输

    文章目录 可靠性保证 Kafka做出的四个保证 kafka可靠性保证的核心 kafka的复制机制 不恰当的垃圾回收配置(**) broker配置 复制系数1 不完全的首领选举2 最少同步副本3 可靠的 ...

  4. kafka权威指南中文翻译之一

    kafka 初见 (Meet Kafka) 在讨论Kafka 细节之前,有必要先来了解下消息发布/ 订阅的概念,这个概念非常重要. kafka 中的数据单位是message .对比数据库来说,可以把消 ...

  5. 初识Kafka-概念速览|安装与配置—《Kafka权威指南》笔记

    文章目录 初识Kafka 消息 批次 模式 主题与分区 生产者和消费者 broker和集群 保留消息 多集群 Kafka数据生态 安装与配置 安装 Java 安装 Zookeeper Zookeepe ...

  6. 《kafka权威指南》学习记录1

    本博客只作为自己学习的一个记录. 一.kafka生产者 1.kafka生产者组件 main线程 send线程 producerrecord对象 序列化器 分区器. Producerrecord对象格式 ...

  7. 【Kafka】《Kafka权威指南》入门

    发布与订阅消息系统 在正式讨论Apache Kafka (以下简称Kafka)之前,先来了解发布与订阅消息系统的概念, 并认识这个系统的重要性.数据(消息)的发送者(发布者)不会直接把消息发送给接收 ...

  8. 《Kafka权威指南》记录

    生产者 生产流程 32页 生产者创建 Kafka生产者需要三个必须参数:broker地址清单,key和value的序列化方式 (如StringSerializer) 生产者发送 ACKS acks 参 ...

  9. Kafka 实战指南——Kafka 消费者配置

    文章目录 1. 消费位点提交 2. 消费位点重置 3. session 超时和心跳监测 4. 拉取大消息 5. 拉取公网 6. 消息重复和消费幂等 7. 消费失败 8. 消费延迟 9. 消费阻塞以及堆 ...

  10. 《Kafka权威指南》——问题1——onParitionsAssigned

    四.Kafka消费者--从Kafka读取数据 4.8 从特定偏移量处开始处理数据 4.7节中说到,在调用subcribe()方法时传进去一个ConsumerRebalanceListener实例,可以 ...

最新文章

  1. Python中时间戳与时间字符串相互转换
  2. 【安全牛学习笔记】思路、身份认证方法、密码破解方法、字典
  3. mysql数据库的目录_了解MySQl数据库目录
  4. [Web Chart系列之五] 1. 实战draw2d 之总体介绍
  5. 现在的孩子太厉害了阿(老朱语:长江后浪推前浪)
  6. mysqldump实现数据备份及灾难恢复
  7. XE5 搭建DataSnap服务
  8. 关于希捷维修日志中 FAIL Servo Op=0100 Resp=0003 错误信息的解读
  9. Linux环境下命令行下载DM8
  10. 软件开发中如何评估工作量
  11. C语言把十进制转换为二进制数的方法和示例
  12. 自动化运维之k8s——Helm、普罗米修斯、EFK日志管理、k8s高可用集群(未完待续)
  13. 易语言选单选框分组框API全选取消
  14. 使用samba服务在Linux与Windows直接共享文件夹,海康威视网络摄像头录像视频存储到ubuntu服务器
  15. 矩阵篇(四)-- 实随机向量的相关矩阵、协方差矩阵、相关系数
  16. Git教学资源,安装,关联账号,创建/关联/克隆库,版本回退,管理修改基本指令
  17. 不死的LYM NOIP模拟 二分+状压DP
  18. 分布式任务调度平台之 xxl-job配置部署
  19. Python---陈氏绘制雷达图
  20. 华为1+X网络系统建设与运维(中级)——链路聚合

热门文章

  1. AD元件库和封装库转换成KiCAD库方法
  2. (笔记)《游戏脚本高级编程》——第1章 脚本编程概论
  3. 基于web的电影院售票系统的设计与实现
  4. [Android]使用ActivityGroup来切换Activity和Layout
  5. 利用Excel出库明细表批量生成送货单
  6. 计算机应用excel题,计算机应用操作练习题-Excel
  7. 操作系统实验报告(四)文件系统
  8. Jenkins 定时构建和Poll SCM的区别
  9. CSS综合案例——淘宝焦点图(轮播图)布局及网页布局总结
  10. linux ps流程,Linux下PS命令详解 (转)