5.1.3 主消费者执行分配任务

消费者发送的“加入组请求”(Jo1nGroupRequest)的内容包括:消费组编号、消费者成员编号、协议类型、协议内容和元数据(protocolMetadata)。其中,协议内容是分区分配算法的名称,元数据是消费者订阅的主题列表。“加入组响应”对象的内容包括:消费者成员编号、统一的消费组协议、主消费者编号、协调者执行分区分配工作的次数、消费者成员列表。

  • 客户端发送的协议与服务端返回的“消费组协议”(groupProtocol)。虽然“加入组请求”中的“协议名称”包括了系统支持的所有协议类型(范围分配和循环分配),但且正执行具体的分区分配时只允许一种协议。协调者会负责统一所有消费者的协议,选择一个大家都支持认可的协议作为“消费组协议”。协调者发送“加入组响应”给每个消费者的“消费组”协议都是一样的,虽然只有主消费者会使用这个协议来做实际的分配t作。
  • 消费者成员编号(mel’\berId)。消费者发送的“加入组请求”需要指定消费者成员编号,当消费者初次加入消费组时这个编号是UNKNOWN_MEMBER。协调者处理每个消费者发送的“加入组请求”,会为每个消费者指定唯一的消费者成员编号,并包含在“加入组响应”中运回给消费者。后续消费者需要重新加入消费组时,发送“加入组请求”巾的消费者成员编号,就是协调者之前分配给它的编号。
  • 主消费者编号(leaderId)。协调者选择的主消费者编号,如果消费者的成员编号和主消费者编号相等,那么这个消费者就是主消费者。
  • 所有消费者成员信息C111e111bers)。协调者会将它收集到的所有消费者信息,都发送给主消费者。注意:111e111ber5不仅包括所有的消费者成员编号,还包括每个消费者订阅的主题。必须要包含订阅信息,否则只有消费者成员编号,不知道订阅了哪些主题,主消费者还是无法执行分区分配工作。
  • 纪元编号(generat-Lon)。只在每个消费者每次需要重新加入组时,才会在协调者端进行更新,它表示协调者从启动至今一共发生了多少次分区分配的工作。每次消费组发生再平衡操作时,协调者都会发起一次分区分配的工作。虽然分区分配工作是由主消费者执行的,但主消费者有可能变化,所以要由服务端的协调者来记录这个编号。

普通消费者收到“加入组响应”会调用onJoinFollower()方法,立即发送“同步组请求”给协调者,并给返回的“同步组”异步请求链接上“加入组”的异步请求。当消费者收到“同步组响应”后,会完成“同步组”的异步请求,再完成“加入组”的异步请求,这样普通消费者就可以从“加入组”
的异步请求结果中获取分配给它的分区。主消费者在收到“加入组响应”时会调用。『1Joinleader()方法,也会发送“同步组请求”给协调者。它也会给返回的“同步组”异步请求链接上“加入组”异步请求,后续流程和普通也费者类似,分别是:收到“同步组响应”、完成“同步组”异步请求、完成“加入组”异步请求、获取“加入组”异步请求结果。

消费者发送“同步组请求”(SyncGroupRequest)的内容包括:消费组编号、纪元编号、消费者成员编号、消费组的分配结果。其中前3个信息都在协调者返回给消费者的“加入组响应”结果中,“消费组的分配结果”只有主消费者会传递。主消费者在收到“加入组响应”后,并不会立即发送“同步组请求”给协调者,而是要等到执行分区分配的工作完成后才发送“同步组请求”。主消费者发送的“同步组请求”带有“消费组的分配结果”(groupAssign111ent),普通消费者发送的“同步组请求”没有分配结果,因为它并没有执行分区分配工作。

  1. 执行任务获取消费组的分配结果

主消费者调用perfor111Assign111ent()方法执行分区分配工作。其中,members是所有消费者的订阅信息,它的键是每个消费者的成员编号,值是消费者的订阅信息。这个数据是由协调者在收集完所有消费者发送的订阅信息后,作为“加入组响应”传给主消费者的。相关代码如下:

主消费者在执行分区分配工作时,根据每个消费者发送的订阅信息,会通过分区分配器的assign()方法计算出每个消费者分配的分区结果。如表5-1所示,这个过程产生了一些字典数据结构,它们的键都是字符串类型的消费者成员编号,值的类型不同,表示的含义也不同。后两个的值都表示每个消费者的分区分配结果,不同点是前者是一个对象,后者是序列化的字节数组。

  1. 抽象分区分配器类

“抽象分区分配器类”(AbstractPartitionAssignor)实现了“分区分配器接口”(PartitionAssignor)自"Jassign()分区分配方法,但它又定义了一个参数类型不同的assign()抽象方法。这样具体的“分区分配实现类”不需要实现“分区分配器接口”,只需继承并实现抽象类的assign()方法。这个方法的两个参数的含义如下,它和“分区分配器接口”的两个参数类似,但做了一点处理。比如,不使用“集群元数据”,而是从集群元数据获取主题对应的分区数;不使用“订阅信息对象”,而是使用订阅信息的订阅主题列表。

  • partitionsPerTopic。每个主题有多少个分区。从传入分区分配器的集群元数据获取数据。
  • subscriptions。每个消费者的订阅主题。和分区分配器的subscriptions类似,但把订阅信息对象(Subscription)转为字符串列表,实际上订阅信息就是由主题列表组成的。

相关代码如下:

注意:具体分区分配器的实现类和1日API的算法类似(详见322节),这里不再分析。图5-4总结了消费者发送“加入组请求”给协调者,到获取到分区列表的过程,具体步骤如下。

(1)消费者发送“加入组请求”,得到一个“加入组”的异步请求。
(2)消费者获得“加入组响应”结果,表示协调者已经收集到所有发送了“加入组请求”的消费者。
(3)主消费者会执行分区分配任务,返回结果是消费组中所有消费者及其对应的分区列表。
(4)每个消费者都会发送“同步组请求”,得到一个“同步组”的异步请求。
(5)每个消费者获得“同步组响应”结果,表示分配给当前消费者的分区列表。
(6)完成“同步组”的异步请求,并通过模式完成“加入组”的异步请求。
(7)消费者获取“加入组”异步请求的结果,这个数据表示的就是分配给消费者的分区。

5.1.3 主消费者执行分配任务相关推荐

  1. C语言main()主函数执行完毕后是否会再执行一段代码

    C语言main()主函数执行完毕后是否会再执行一段代码 分享到: QQ空间 新浪微博 腾讯微博 豆瓣 人人网 main() 主函数执行完毕后,是否可能会再执行一段代码?给出说明. main主函数是所有 ...

  2. 【kafka】Kafka消费者分区分配策略详解

    文章目录 1.概述 2.RoundRobinAssignor详解 3.RangeAssignor详解 4.StickyAssignor详解 5.CooperativeStickyAssignor详解 ...

  3. 10. kafka消费者如何分配分区

    消费者如何分配分区就是指某个topic,其N个分区和消费该topic的若干消费者群组下M个消费者的关系.如下图所示,C0和C1两个消费者如何分配N个分区: 核心接口:org.apache.kafka. ...

  4. 关于调用子函数给主函数指针分配内存

    典型的错误例子如下 在这个主函数的指针给子函数传递一个指针,而在子函数中形参有开辟了一块内存,此子函数的指针的内存里存储的地址与主函数是同一地址, 即主函数的指 针和子函数形参的指针都指向同一块内存的 ...

  5. RabbitMQ多消费者消息分配

    一. 轮询分配   当有多个消费者同时监听一个队列时,RabbitMQ默认将消息逐一顺序分配给各消费者,该消息分配机制称为轮询(Round-Robin).   为验证该机制,建立两个消费者,同时监听同 ...

  6. Kafka消费者组内各消费者分区分配

    1.两个基本概念介绍 1.1.GroupCoordinator 1.1.ConsumerCoordinator 2.分区分配的几个阶段 2.1.FIND_COORDINATOR(阶段1) 2.2.JO ...

  7. disruptor架构四 多生产者多消费者执行

    1.首先介绍下那个时候使用RingBuffer,那个时候使用disruptor ringBuffer比较适合场景比较简单的业务,disruptor比较适合场景较为复杂的业务,很多复杂的结果必须使用di ...

  8. 5.1.2 消费者的加入组和同步组

    5.1.2 消费者的加入组和同步组 消费者向协调者发送"加入组请求"获取分区和现实生活中的任务分配很相似.为了帮助理解分区分配的过程,我们以软件开发常见的任务分士为例如图5-1所示 ...

  9. 5.4.7 延迟的心跳

    5.4.7 延迟的心跳 延迟操作有3个主要的方法:尝试完成方法(返回布尔值,表示是有可以完成).超时的回调方法.完成的回调方法.对于"延迟加入",尝试完成是判断消费组成员中是否还有 ...

最新文章

  1. joomla3.6.5 nginx下 前台页面404错误
  2. Android 开发学习随笔
  3. SignalR集成Autofac
  4. 描述一下JAVA的加载过程_JVM源码分析之Java类的加载过程
  5. 燕山大学计算机学院官网,燕山大学信息科学与工程学院(专业学位)计算机技术保研夏令营...
  6. python多个判断条件体重_python基础之如何用if语句判断多个条件?
  7. python生成安装程序_python生成安装文件 msi
  8. python爬虫案例——根据网址爬取中文网站,获取标题、子连接、子连接数目、连接描述、中文分词列表
  9. 如何读取H264文件获得每一帧的数据(VsParserPro)
  10. 步进电机驱动C语言代码,单片机控制步进电机系统(C语言源代码)
  11. 上粱正,下粱不歪——网吧母盘制作流程(转)
  12. 2022年中国工业机器人市场现状研究分析与发展前景预测报告
  13. PS CC 2014破解版
  14. ARM架构——转自维基百科
  15. 遗传算法--函数最值问题
  16. 7-6 王牌特工3 (15 分)
  17. unity 多个物体围绕一个点生成圆形状
  18. javascript里将函数名字符串转为函数并执行
  19. InChat一版,仅仅两个接口实现自己的IM系统(可兼容)
  20. 诸葛io的技术架构图_大数据浪潮下,诸葛io平台的技术演化之路

热门文章

  1. 血战上海滩寻找英雄血量地址 实现无敌效果
  2. 灰度值 与 RGB值 及其转换
  3. 产品经理三大领域的技术
  4. 西安地图 百度西安高清卫星地图 最高19级 可商用地图
  5. 我的世界1.12.2java下载_我的世界java版整合包
  6. odoo12企业版修改邮箱配置
  7. 目标检测—RCNN系列
  8. docker部署codereview/gerrit
  9. 解析 Java 类和对象的初始化过程
  10. Word的常用操作和快捷键