5.2.2 消费者和消费组元数据

消费者加入组过程发送的“加入组请求”和“同步组请求”,都会指定消费组编号(groupid)和消费者成员编号(l’lel’lberId),同一个消费组编号只对应一个“消费组元数据”(GroupMetadata,下文简称“组元数据”)。服务端使用“消费者成员元数据”(Mel’lberMetadata,下文简称“成员元数据”)表示每个泊费者发送的元数据信息,并添加到对应的“组元数据”中。

注意:协调者处理“加入组请求”和“同步组请求”,不需妥为每种请求都定义一个“成员元数据”。协调者只用了一个统一的“成员元数据”表示“这个消费者在加入组过程中,在服务端保存的相关状态数据”。

  1. 消费者成员元数据

“成员元数据”类的构造函数参数和消费者发送的“加入组请求”数据是一样的,比如都有成员编号、消费组编号、协议元数据集。会话超时时间也是由客户端发送“加入组请求”时指定的,latestHeartbeat变量记录了该消费者最近一次发送心跳的时间。另外,“成员元数据”最重要的一个信息是:当前这个消费者到底分配到了哪些分区(assignl’lent变量),因为消费者加入组的最终目的就是从协调者获取到分区。相关代码如下:

“成员元数据”还定义了两个值对象,它们分别对应服务端在处理请求时定义的两个发送响应回调方法:

这两个值对象使用var定义为“成员元数据”的一个变量,而不是用def定义为一个方法。因为处理“加入组请求”的回调方法参数是Joi叫roupResult,没有返回值,所以awaitingJoinCallback值对象的类型是:JoinGroupResult=>Unit。值对象除了赋值操作和普通的成员变量一样,我们还可以把值对象当作一个方法,传递正确的参数即可。比如下面的代码中要把值对象awaitingJoinCallback当作方法使用,传递的参数必须是JoinGroupResult:

为了方便将“发送响应回调方法”作为消费者“成员元数据”值对象的一个变量,可以把回调方法定义成一个高级的类型。这样看起来,值对象的使用方式其实和普通的类型类似。相关代码如下:

KafkaApis虽然在一开始处理请求时就定义了sendResponseCallback()回调方法,但是因调方法只能通过上面那种“传递参数给调用值对象,把值对象当作方法使用”的方式调用。一旦调用回调方法,就表示服务端会发送响应结果给客户端,说明服务端已经处理完客户端发送的请求。

注意:在调用完值对象的回调方法(倒数第二行)后,妥重直值对象为空(最后一行)。不过,虽然“成员元数据”的值对象为空,但“成员元数据”仍然在“组元数据”中。消费者一旦发送一次“加入组请求”,就会一直被协调者对应的“组元数据”管理(除非这个消费者没有及时发送心跳给协调者而被移除掉)。

  1. 消费组元数据

一个“组元数据”管理了所有消费者的“成员元数据”。消费组协调者对象的addM阴阳rAndRebalance()方法表示“组元数据”中没有该消费者,需要添加消费者的“成员元数据”。updateMel’lberAndRebalance()方法表示“组元数据”中已经有该消费者的元数据,只需做更新。

这两个方法都传递了“组元数据”对象(GroupMetadata),说明必须先有“组元数据”。如果添加“成员元数据”时都还没有“组元数据”(更新时一定存在“组元数据”),就会先创建“组元数据”。创建“组元数据”是必须的,如果没有“组元数据”,即使有“成员元数据”,也是没有意义的。相关代码如下:

注意:添加或更新消费者的成员元数据,都会执行消费纽状态相关的maybePrepareRebalance()方法。

“组元数据”在消费者需要加入或更新时,除了更新对应消费者的“成员元数据”,还会记录一些其他数据。比如,协调者会为消费组选择一个主消费者,来代替它执行分区分配工作。另外,每个消费者发送“加入组请求“时,都会指定一个会话超时时间。协调者会从消费组的所有消费者中,选择一个最大的会话超时时间,作为”再平衡操作的超时时间“。相关代码如下:

注意:notYetRejoinedMef!lberS方法会获取消费纽元数据中awaitingJoinCallback值对象为空的消费者成员。正常来说,协调者刚开始处理消费者的“加入组请求”时,会设直“成员元数据”的awaitingJoinCallback为事先定义的“发送响应回调方法”,这时候该方法不会收集到满足条件的成员。但如果消费者成员已经存在于消费组的元数据中,而且值对象被更新为空,就会被选择出来。

消费组元数据中还有一个很重要的数据:“消费组的当前状态”。因为每个消费者加入消费组都分成“加入组”和“同步组”两个步骤,所以协调者在处理不同消费者的这两种请求时,都需要改变消费组的状态。消费组元数据的状态机有4种状态:“稳定状态”(Stable)、“准备再平衡状态”(preparingRebalance)、“等待同步状态”(AwaitingSync)、“离开状态”(Dead)。协调者新创建一个消费组元数据,这个消费组元数据的初始状态为“稳定状态”。

注意:“稳定状态”分成两种,一种是刚刚创建消费纽元数据,并且消费组元数据中还没有管理任何一个消费者成员。另一种是完成一次分区分配工作后,消费组管理了所有的消费者,这些消费者都分配到了分区,也进入了稳定状态。前者的“稳定”表示没有消费者,也就不会为消费者分配分区。后者的“稳定”表示已经有消费者,并且也都分配到了分区。注意:消费组元数据的状态代表整个消费组级别。


5.2.2 消费者和消费组元数据相关推荐

  1. idea做一个日志自动生成的jar包,并用flume做生产者,采集日志数据,用kafka做消费者来消费日志数据

    前提:先将四台机器的zookeeper和kafka服务开启. 小任务:先准备好日志自动生成的jar包.并将其放入虚拟机master01中. 1.新建一个maven项目,命名为logmaker. pom ...

  2. java每隔 消费队列数据_消费者Rebalance机制

    本文深入的分析了RocketMQ的Rebalance机制,主要包括以下内容:Rebalance必要的元数据信息的维护 Broker协调通知机制: 消费者/启动/运行时/停止时Rebalance触发时机 ...

  3. 消费者洞察:数据影响消费,消费营造数据

    本文根据Stratifyd资深解决方案经理段鑫龙(Bruce Duan)在9月24日的直播演讲内容整理,演讲围绕"如何洞察消费者"从四个层面展开:首先是(疫情期间以及后疫情时代)消 ...

  4. Kafka消费者不消费数据

    背景: 工作往往是千篇一律,真正能学到点知识都是在上线后.使用Skywalking+Kafka+ES进行应用监控. 现象: 公司使用Skywalking在开发测试环境中Kafka顺利消费数据,到了UA ...

  5. kafka消费者组消费数据问题

    前言 在上一篇的消费者代码中,里面提到了一个很重要的点,那就是在代码中必须要指明消费者组,为什么要这样呢? 这个跟kafka自身在架构设计时是有一定的关系的,通过之前的学习我们知道,kafka天生就是 ...

  6. kafka监听topic消费_Kafka消费者-从Kafka读取数据

    (1)Customer和Customer Group (1)两种常用的消息模型 队列模型(queuing)和发布-订阅模型(publish-subscribe). 队列的处理方式是一组消费者从服务器读 ...

  7. 多线程顺序消费MySQL数据_关于MQ的几件小事(五)如何保证消息按顺序执行

    1.为什么要保证顺序 消息队列中的若干消息如果是对同一个数据进行操作,这些操作具有前后的关系,必须要按前后的顺序执行,否则就会造成数据异常.举例: 比如通过mysql binlog进行两个数据库的数据 ...

  8. 【kafka】一次磁盘故障后消费者无法消费

    1.概述 转载:Kafka 运维纪实 – 一次磁盘故障后消费者无法消费 Kafka 自从某个版本加入 isr 等机制后真是越来越复杂了,再也不是原来那个单纯的 The log 了,碰上网络有个什么风吹 ...

  9. flume消费kafka数据太慢_kafka补充01

    为什么高吞吐? •写数据 –1.页缓存技术 •kafka写出数据时先将数据写到操作系统的pageCache上,由操作系统自己决定什么时候将数据写到磁盘上 –2.磁盘顺序写 •磁盘顺序写的性能会比随机写 ...

最新文章

  1. 《大话设计模式》--代理模式
  2. 容器删除元素后迭代器失效_STL 4: STL之容器:选择时机,删除元素,迭代器失效...
  3. 算法的优缺点_机器学习算法优缺点 amp; 如何选择
  4. Android深度探索(卷一)第四章读书笔记
  5. 我是新来的,希望大家以后能多指教.
  6. kafka传递文件_是否可以使用Kafka传输文件?
  7. Stp文件在线浏览工具包
  8. 安防监控、智慧交通 视频结构化(车辆+行人)实现方案
  9. 关于JavaScript中万恶的this
  10. es where_阿水出现在eStar二队,ES.JackeyLove正在连接?
  11. 瑞典皇家理工学院计算机学什么,瑞典留学 皇家理工学院的学科设置
  12. 如何修改图片的dpi?图片怎么调dpi?
  13. iOS微信发布8.0.29版本,苹果14用户快来
  14. 25、Java面向对象——抽象类和抽象方法、接口
  15. 《Android源码设计模式》之迭代器模式
  16. 2019春第九周作业
  17. TeleGram都有哪些限制?
  18. js算某天是今年的第几天
  19. 微信删除的聊天记录怎么恢复你还不知道?快快收藏起
  20. 灵活自定义 PDF转换成Word转换器下载

热门文章

  1. 奈飞win10安装包_Windows10系统修复Netflix应用程序错误
  2. 23.Flink-高级特性-新特性-Streaming Flie Sink\介绍\代码演示\Flink-高级特性-新特性-FlinkSQL整合Hive\添加依赖和jar包和配置
  3. 测评:借助6款强大的工具阻止内部攻击
  4. 可能是最详细的字符编码详解
  5. iOS应用中检测第三方app是否安装及跳转解决方案
  6. 单点登录(一)-----理论-----单点登录SSO的介绍和CAS+选型
  7. python 魔法阵
  8. 中国诗歌艺术之人文之魂1,2(四川大学文学与新闻学院王红)
  9. html中colGroup,col
  10. 4.1.3 消费者轮询的流程