要想知道如何从 Kafka 读取消息,需要先了解消费者和消费者群组的概念。

假设我们有一个应用程序需要从 Kafka 主题读取消息井验证这些消息,然后再把它们保存起来。应用程序需要创建一个消费者对象,订阅主题并开始接收消息,然后验证消息井保存结果。

过了一阵子,生产者往主题写入消息的速度超过了应用程序验证数据的速度,这个时候该怎么办?如果只使用单个消费者处理消息,应用程序会永远跟不上消息生成的速度。显然,此时很有必要对消费者进行横向伸缩。就像多个生产者可以向相同的主题写入消息一样,我们也可以使用多个消费者从同一个主题读取消息,对消息进行分流

Kafka 消费者从属于消费者群组。一个群组里的消费者订阅的是同一个主题,每个消费者只接收主题的部分分区的消息。

  • 假设主题 T1 有4 个分区,我们创建了消费者 C1 ,它是群组 G1 里唯一的消费者,我们用它订阅主题 T1 。那么消费者 C1 将收到主题 T1 全部 4 个分区的消息。
  • 如果在群组 G1 里新增一个消费者 C2 ,那么每个消费者将分别从两个分区接收消息。消费者 C1 接收分区 1 和分区 2 的消息,消费者 C2 接收分区 3 和分区 4 的消息。
  • 如果群组有 4 个消费者,那么每个消费者可以分配到一个分区。
  • 如果我们往群组里添加更多的消费者,甚至超过主题的分区数量,那么多出来的消费者就会被 闲置,不会接收到任何消息。

往群组里增加消费者是横向伸缩消费能力的主要方式。

Kafka 消费者经常会做一些高延迟的操作,比如把数据写到数据库或 HDFS ,或者使用数据进行比较耗时的计算。在这些情况下,单个消费者无法跟上数据生成的速度,所以可以增加更多的消费者,让它们分担 载,每个消费者只处理部分分区的消息,这就是横向伸缩的主要手段。

我们有必要为主题创建大量的分区,这样在负载增长时可以加入更多的消费者。不过不要让消费者的数量超过主题分区的数量,多余的消费者只会被闲置。

除了通过增加消费者来横向伸缩单个应用程序外,还经常出现多个应用程序从同主题读取数据的情况。

实际上, Kafka 设计的主要目标之一 ,就是要让 Kafka 主题里的数据能够满足企业各种应用场景的需求。在这些场景里,每个应用程序可以获取到所有的消息, 而不只是其中的一部分。那么只要保证每个应用程序有自己的消费者群组,就可以让它们获取到主题所有的消息。不同于传统的消息系统,横向伸缩 Kafka 消费者和消费者群组并不对性能造成负面影响。

在上面的例子里,如果新增一个只包含一个消费者的群组 G2 ,那么这个消费者将从主题 T1上接收所有的消息,与群组 G1之间互不影响。群组 G2 可以增加更多的消费者,每个消费者可以消费若干个分区,就像群组 G1 那样。

简而言之,为每一个需要获取一个或多个主题全部消息的应用程序创建一个消费者群组, 然后往群组里添加消费者来伸缩读取能力和处理能力,群组里的每个消费者只处理部分消息。


然而,分区并非如此简单。当一个群组的消费者增加和减少时,当这个主题的分区数量变化时,有很多问题要处理。

分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡

再均衡非常重要, 它为群组带来了高可用性和伸缩性(可以放心地添加或移除梢费者)。

不过在正常情况下,我们并不希望发生这样的行为。在再均衡期间,消费者无法读取消息,造成整个群组小段时间的不可用。

另外,当分区被重新分配给一个消费者时,消费者当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢程序。


分配分区是怎样进行的?

当消费者要加入群组时,它会向群组协调器发送 Join Group 请求。第一个加入群组的消费者将成为“群主”。群主从协调器那里获得群组的成员列表(列表中包含了所有最近发送过心跳的消费者,它们被认为是活跃的), 并负责给每一个消费者分配分区。

消费者通过向被指派为群组协调器的 broker (不同的群组可以有不同的协调器)发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询消息 (为了获取消息)或提交偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡。


创建消费者对象与创建生产者对象非常相似。

唯一不同的属性是 group.i.d ,而且甚至不是必填的,不过我们现在姑且认为它是必需 。它指定了属于哪 个消费者群组。创建不属于任何一个群组的消费者也是可以的,只是这样不太常见。订阅主题非常简单,甚至支持正则。

consumer .subscribe( "test.*" );


轮询

轮训是消费者 API 的核心,通过一个简单的轮询向服务器请求数据。一旦消费者订阅 了主题,轮询就会处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据,使用者只需要使用一组简单的 API 来处理从分区返回的数据即可。消费者代码的主要部分如下所示:

try{while (true) {ConsumerRecords<String String> records = consumer.poll(100);for(ConsumerRecord<String String> record :records){int updatedCount = 1;if(custCountryMap.countainsValue(record.value()) {updatedCount = custCountryMap.get (record.value() + 1);}custCountryMap.put(record.value(), updatedCount)System.out.println(custCountryMap);} finally {consumer.close(); }

  • 一个无线循环。
  • poll 方法非常重要。就像鲨鱼停止移动就会死掉一样,消费者必须持续对 Kafka 进行轮询,否则会被认为己经死亡 ,其负责的分区会被移交给群组里的其他消费者。传给 poll 方法 参数是一个超时时间,用于控制 poll 方法的阻塞时间。如果该参数被设为 0, poll 会立即返回。
  • poll 返回的是记录列表。每条记录都包含了记录所属主题的信息、分区的信息、分区偏移量 ,以及记录的键值对。一般会遍历这个列表 ,逐条处理这些记录。
  • 一般落地的处理结果就是结果保存起来或者对已有的记录进行更新,处理过程也随之结束。
  • 在退出应用程序之前会触发一次再均衡,而不是等待群组协调器发现它不再发送心跳井认定它已死亡, 因为那样需要更长的时间,导致整个群组在一段时间内无法读取消息。

在第一次调用新消费者的 poll 方法时,它会负责查找 GroupCoordinator 然后加入群组,接受分配分区。 如果发生了再均衡,整个过程也会在轮询期间进行 。当然 ,心跳也会在轮询里发迭出去的。

注意,一个消费者活在一个独立的线程里。


提交和偏移量

Kafka 不会像其它队列那样需要得到消费者的确认,这是 Kafka 的独特之处。

我们把更新分区当前位置的操作叫作提交

那么消费者是如何提交偏移量的呢?

消费者往一个叫作 _consumer_offset d 特殊主题发送消息,消息里包含每个分区的偏移量。

如果消费者一直处于运行状态,那么偏移量就没有什么用处。不过,如果发生崩坏或者有新的消费者加入群组,就会触发再均衡,完成再均衡之后,每个消费者可能分配到新的分区,而不是之前处理的那个。

为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。

但是,

如果提交的偏移量小于客户端处理的最后一个消息的偏移量 ,那么处于两个偏移量之间的消息就会被重复处理

如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失

处理偏移量的方式对客户端会有很大的影响。


自动提交

最简单的提交方式是让悄费者自动提交偏移量。

如果 enable.auto.commit 被设为 true ,那么每过5s,消费者会自动把从 poll 方法接收到的最大偏移量提交上去。自动提交也是在轮询里进行的。

可是,假设使用默认的 5s 提交时间间隔,在最近一次提交之后的 3s 发生了再均衡。再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后 3s ,所以在这 3s 内到达的消息会被重复处理。

自动提交不能避免重复消息。


开发者同步提交当前偏移量

把 enable.auto.commit 设为 false ,让应用程序决定时提交偏移量。使用 commit.Sync() 提交偏移量。

commit.Sync() 将会提交由 poll 返回的最新偏移量。

如果发生了再均衡,从最近一批消息到发生再均衡之间的所有消息都将被重复处理。


异步提交

手动提交有一个不足之处在对提交请求作出回应之前,应用程序会阻塞,这会限制应用程序的吞吐量。

可以通过降低提交频率来提升吞吐,但如果发生了再均衡, 会增加重复消息的数量。

可以使用异步提交。只管发送提交请求,无需等待 broker 的响应。

commit.Async();

在成功提交或碰到无怯恢复的错误之前, 同步方法会一直重试,但是异步方法不会。


同步和异步组合提交

般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题。

但如果这是发生在关闭消费者或均衡前的最后一次提交,就要确保能够提交成功。 在消费者关闭前一般会组合使用两种方式。

kafka 同步提交 异步_极限MQ (5) Kafka 消费者相关推荐

  1. kafka 同步提交 异步_腾讯游戏工程师分享:简单理解 Kafka 的消息可靠性策略

    作者:hymanzhang,腾讯 IEG 运营开发工程师 背景 部门的开发同学最近在开发一个活动的过程中,需要关注大量的应用后台逻辑,捕捉各种事件的触发.在设计时打算采用 kafka 消息队列进行业务 ...

  2. kafka 同步提交 异步_详解Kafka设计架构核心——Kafka副本机制详解

    所谓的副本机制(Replication),也可以称之为备份机制,通常是指分布式系统在多台网络互联的机器上保存有相同的数据拷贝.副本机制有什么好处呢? 1. 提供数据冗余.即使系统部分组件失效,系统依然 ...

  3. kafka 同步提交 异步_Kafka 位移提交那些事儿

    最近,在维护公司的一个 Kafka 消息转发器的项目,这个项目主要是为了转发不同部门不同系统的消息队列之间的消息,包括从 RocketMQ 转入到 Kafka, 从 Kafka 转出到 RocketM ...

  4. KAFKA 同步和异步消息的发送(开发实战)

    文章目录 一.消费者监听 1. 启动zk 2. 启动kafka 3. 创建主题 4. 消费者监听消息 二.生产者工程 2.1. 依赖 2.2. 生产者代码(同步) 2.3. 生产者代码(异步) 2.4 ...

  5. setstate是同步还是异步_【vert.x准备篇1】同步和异步,阻塞和非阻塞概念澄清

    为了能更好的理解vert.x的线程模型,我们必须要先明确几个概念:同步(Synchronous)和异步(Asynchronous),阻塞(Blocking)和非阻塞(Non-Blocking).关于这 ...

  6. setstate是同步还是异步_谈谈 IO模型:同步、异步、阻塞、非阻塞

    同步/异步.阻塞/非阻塞 说的是一回事儿吗? 同步/异步.阻塞/非阻塞 你能通俗易懂的讲清楚吗? Java 中的 BIO.NIO.AIO 你了解吗? Socket 编程你还会吗? Linux 操作系统 ...

  7. java 同步和异步_知道什么叫同步和异步吗?

    评论 # re: 知道什么叫同步和异步吗? 2006-11-06 15:34 chicken 你翻译的很垃圾阿 看了英文才懂...  回复  更多评论 # re: 知道什么叫同步和异步吗? 2006- ...

  8. kafka计算机专业读法_面试官:Kafka 为什么快?

    无论 kafka 作为 MQ 也好,作为存储层也罢,无非就是两个功能(好简单的样子),一是 Producer 生产的数据存到 broker,二是 Consumer 从 broker 读取数据.那 Ka ...

  9. kafka安装完整步骤_还在寻找Kafka最新的安装教程吗?精细的安装步骤分享给大家...

    Kafka集群部署 概述 之前的大数据集群主要是离线处理的方式对集群的数据进行开发处理.当前的集群数据量已经达到了PB级别了,离线数据获取主要是从数仓侧进行全量或者增量的方式导入大数据平台,部分是通过 ...

最新文章

  1. 水痘痊愈就能终生免疫?其实病毒仍潜伏在神经,随时以更可怕的形式爆发......
  2. 数据结构笔记 递推与迭代
  3. 8.STM32中对ADC1_Config()函数(ADC1_GPIO_Config()和ADC1_Mode_Config())的理解(自定义)测试ADC转换电压值输出到终端上。
  4. project5 大数据
  5. synchronized底层是如何实现的?
  6. [uboot 移植]uboot 移植过程
  7. OpenCV 3 image shape - size - dtype
  8. 向量索引算法HNSW和NSG的比较
  9. RX 6600XT vs RTX 2060Super 显卡对比
  10. 深入OpenJDK源码-偏向锁的延时生效如何实现的
  11. 我们选择登月(肯尼迪总统在赖斯大学的演讲)
  12. Servlet规范之安全
  13. Ubuntu调用USB摄像头
  14. 如何预期计算cuda kernel代码的性能水平
  15. 观看《创新的力量》观后感
  16. 转:S3C2440上LCD驱动(FrameBuffer)实例开发详解
  17. pdf如何去除水印文字?这个办法值得一试
  18. 架构设计第五讲:数据巡检系统的设计与应用
  19. 发布四大战略举措,亚马逊云科技看准了中国云市场的哪些新机会?
  20. OC Apple IAP 自动续费订阅 重复订阅问题

热门文章

  1. 在bash脚本中进行浮点运算
  2. 人的价值不在于能力,而在于位置 » 社区 | Ruby China
  3. HDU_2795 Billboard(线段树)
  4. WinCE6.0的极速启动
  5. ASP.NET状态管理
  6. cefSharp通过js操控页面,含跨域操控
  7. jQuery 遍历 - slice() 方法
  8. 在64位windows下使用instsrv.exe和srvany.exe创建windows服务
  9. 网站安全狗”响应内容保护“网页错误返回页面优化功能介绍
  10. html 图片点击查看大图_【神游千年,大美敦煌】北魏-260窟【高清大图】