Coordinator与消费者

在数据库设计过程中,我们经常会有这样的情况下

  1. 某个基础表会被多个视图或者存储过程引用
  2. 修改基础表的时候,我们必须小心翼翼地,因为不会有任何提示告诉我们,如果继续修改,会不会造成视图或者存储过程有问题
  3. 即便我们知道有问题,我们也没有办法去让视图和存储过程刷新得到表最新的信息
  • 如果你在创建视图中使用了select *,就会导致各种各样的bug

  • "协调者"有些陌生,所谓协调者,在 Kafka 中对应的术语是 Coordinator,它专门为 Consumer Group 服务,负责Group Rebalance 以及提供位移管理组成员管理等。

  • Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移。同样地,当 Consumer 应用启动时,也是向 Coordinator 所在的 Broker 发送各种请求,然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作。

Coordinator

  • 所有 Broker 在启动时,都会创建和开启相应的 Coordinator 组件。也就是说,所有 Broker 都有各自的 Coordinator 组件。那么,Consumer Group 如何确定为它服务的 Coordinator 在哪台 Broker 上呢
  • 不同的group id会被哈希到不同的分区上,从而不同的broker能充当不同group的Coordinator

Coordinator的确定与分区分配

前面我们说到一个问题,那就是一个group内部,1个parition只能被1个consumer消费,其实看到这里我们就知道应该有这样一个组件来负责partition的分配,而且前面学习消费者组机制的时候还提到过分区的三种分配策略。

对于每一个consumer group,Kafka集群为其从broker集群中选择一个broker作为其coordinator。因此,第1步就是找到这个coordinator。也就是说1个consumer group对应一个coordinattor

下面我们有一个group有3个consumer: c0, c1, c2

GroupCoordinatorRequest 请求
  • 第 1 步:确定由位移主题的哪个分区来保存该 Group 数据:partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)。
  • 第 2 步:找出该分区 Leader 副本所在的 Broker,该 Broker 即为对应的 Coordinator。

其实这个过程是发送了一个GroupCoordinatorRequest定位请求去寻找coordinator

首先,Kafka 会计算该 Group 的 group.id 参数的哈希值。比如你有个 Group 的 group.id 设置成了“test-group”,那么它的 hashCode 值就应该是 627841412。其次,Kafka 会计算 __consumer_offsets 的分区数,通常是 50 个分区,之后将刚才那个哈希值对分区数进行取模加求绝对值计算,即 abs(627841412 % 50) = 12。
此时,我们就知道了位移主题的分区 12 负责保存这个 Group 的数据。有了分区号,算法的第 2 步就变得很简单了,我们只需要找出位移主题分区 12 的 Leader 副本在哪个 Broker 上就可以了。
这个 Broker,就是我们要找的 Coordinator
JoinGroup

所有consumer都往coordinator发送JoinGroup消息之后,coordinator会指定其中一个consumer作为leader,并把组成员信息以及订阅信息发给leader

其他consumer作为follower,然后由这个leader进行partition分配

SyncGroup

leader开始分配消费方案,即哪个consumer负责消费哪些topic的哪些partition

一旦完成分配,leader会将这个方案封装进SyncGroup请求中发给Coordinator,Coordinator给它返回null

follower发送 null的 SyncGroupRequest 给Coordinator,Coordinator回给它partition分配的结果。

确定的意义

  • Consumer 应用程序,特别是 Java Consumer API,能够自动发现并连接正确的 Coordinator,我们不用操心这个问题。知晓这个算法的最大意义在于,它能够帮助我们解决定位问题。当 Consumer Group 出现问题,需要快速排查 Broker 端日志时,我们能够根据这个算法准确定位 Coordinator 对应的 Broker,不必一台 Broker 一台 Broker 地盲查。

  • 例如提交offset失败,可能是由于Coordinator 所咋 broker 所在节点出了问题,这个时候我们就可以根据这个算法快速找到 Coordinator所在,然后查看日志。

  • 或者是broker rebalance 太频繁,去找Coordinator所在的broker日志,会有类似于"(Re)join group" 之类的日志

源码解析Coordinator

下面这张图就是整个kafka 的源码结构,前面我们已经分析过clients

clients 目录:保存 Kafka 客户端代码,比如生产者和消费者的代码都在该目录下。

config 目录:保存 Kafka 的配置文件,其中比较重要的配置文件是 server.properties。

connect 目录:保存 Connect 组件的源代码。我在开篇词里提到过,Kafka Connect 组件是用来实现 Kafka 与外部系统之间的实时数据传输的。

core 目录:保存 Broker 端代码。Kafka 服务器端代码全部保存在该目录下。

streams 目录:保存 Streams 组件的源代码。Kafka Streams 是实现 Kafka 实时流处理的组件。

下面我们看一下core目录下的代码,这里我们几乎看到kafka 服务端的几乎全部代码,都在这里了,我们比较熟悉的的有controller,我们主要看一下今天的主角coordinator

整个coordinator下面按照功能分为了两大块,第一块就是group 也就是为我们的消费者组服务的,第二块就是transaction,主要是为分布式事务服务的。

我们看到这个group包下面总共也没几个类。

下面我们看一下整个请求的流程,我们前面学习幂等性生产的时候说过了,客户端发起的请求,在服务端进行处理的入口是在KafkaApis这个类里面的,客户端发起不同的请求,KafkaApis里面就有不同的方法来处理对应的请求。

我们今天要了解的就是上面这个几个方法。

handleFindCoordinatorRequest

代码这里有一点需要注意的是,这里判断了这个请求是处理事务的还是消费者组的

后面就是和我们前面介绍的就一样了(groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME 方法里面计算出了partition信息,其实就是那个partition,这个方法的实现的话,大致如下

Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount

找到了partition之后就开始找这个partition的lead 分区了,也就是我们下面图上的第二段代码。这样我们的Coordinator节点就找到了

handleJoinGroupRequest

下面这个代码其实就比较简单了,其实就是一个consumer 注册的过程,调用了groupCoordinator.handleJoinGroup() 的方法,将我们的consumer加入到我们的消费组里面去了

同时将确定了consumer group 的leader consumer,然后返回给了客户端,也就是我们的sendResponseCallback 方法。

handleSyncGroupRequest

下面就是comsumer 同步分区的分配信息,最后返回给客户端的assignmentMap,就是分配的结果。

handleOffsetCommitRequest

这个就是处理consumer 客户端提交offset 的方法了,这个代码有点长,我这里就帖几处比较重要的地方了

这里就是获取我们提交的offset 信息了

根据我们的请求,来判断offset 存在那里,老版本是存储在ZK里

最后是调用GroupCoordinator的handleCommitOffsets方法进行offset 的提交

总结

  1. Coordinator 在整个kafka 的消费者体系中的作用,负责Group Rebalance 以及提供位移管理组成员管理
  2. Coordinator 是如何进行和客户端进行交互完成相应的职能

kafka系列之Coordinator(14)相关推荐

  1. Kafka系列之-Kafka Protocol实例分析

    本文基于A Guide To The Kafka Protocol文档,以及Spark Streaming中实现的org.apache.spark.streaming.kafka.KafkaClust ...

  2. Kafka系列之:不重启kafka集群设置kafka topic数据保留时间

    Kafka系列之:不重启kafka集群设置kafka topic数据保留时间 一.kafka topic数据保留3天的bash命令 二.查看kafka删除数据日志 三.批量设置上千个topic保留3天 ...

  3. Kafka系列一之架构介绍和安装

    Kafka架构介绍和安装 写在前面 还是那句话,当你学习一个新的东西之前,你总得知道这个东西是什么?这个东西可以用来做什么?然后你才会去学习它,使用它.简单来说,kafka既是一个消息队列,如今,它也 ...

  4. Kafka系列之-Kafka监控工具KafkaOffsetMonitor配置及使用

    KafkaOffsetMonitor是一个可以用于监控Kafka的Topic及Consumer消费状况的工具,其配置和使用特别的方便.源项目Github地址为:https://github.com/q ...

  5. Kafka系列 —— 生产实践分享

    Kafka系列文章: Kafka系列 -- 入门及应用场景 & 部署 & 简单测试 Kafka系列 -- Kafka核心概念 Kafka系列 -- Kafka常用命令 Kafka系列 ...

  6. Kafka系列之:增加Kafka节点扩展Kafka集群

    Kafka系列之:增加Kafka节点扩展Kafka集群 一.增加Kafka节点 二.分区重新分配工具三种工作模式 三.自动将数据迁移到新机器 四.自定义分区分配和迁移 五.增加复制因子 六.在数据迁移 ...

  7. Kafka系列之:深入理解Kafka 主题、分区、副本、LEO、ISR、HW、Kafka的主写主读和分区leader选举

    Kafka系列之:深入理解Kafka 主题.分区.副本.LEO.ISR.HW.Kafka的主写主读和分区leader选举 一.Kafka重要知识点提炼 二.详细介绍Kafka 主题.分区.副本.LEO ...

  8. Kafka系列之:kafka命令详细总结

    Kafka系列之:kafka命令详细总结 一.添加和删​​除topic 二.修改topic 三.平衡领导者 四.检查消费者位置 五.管理消费者群体 一.添加和删​​除topic bin/kafka-t ...

  9. Kafka系列 —— Kafka监控

    Kafka系列文章: Kafka系列 -- 入门及应用场景 & 部署 & 简单测试 Kafka系列 -- Kafka核心概念 Kafka系列 -- Kafka常用命令 常见Kafka监 ...

最新文章

  1. 快速搞懂监控、链路追踪、日志三者的区别
  2. 分布式文件系统研究-fastDSF文件上传和下载流程
  3. 使用支持向量机训练mnist数据
  4. mysql5.6热升级_Mysql5.6主从热备配置
  5. 寒冬已至?四面楚歌的 Android 工程师该何去何从?
  6. 让计算机桌面更加美丽课件,让计算机桌面更加美丽_1.doc
  7. asp文件上传原理及分析
  8. python编程器手机版ios_手机最强Python编程神器,在手机上运行Python
  9. 朋友圈集赞神器 | 1秒集齐300个赞,从此点赞不求人
  10. pbs 写matlab作业,pbs提交作业.pdf
  11. 基于4G路由器的救护车联网方案:生命,刻不容缓
  12. 教你一招让你高效搞定高品质的H5交互动画
  13. python小游戏课程设计报告_贪吃蛇游戏课程设计报告
  14. IOT(4)---手机中的传感器
  15. matplotlib高级教程之形状与路径——patches和path
  16. 声音/声学成像2021-4-13
  17. 2020微信最新版可以修改ID号了,你的号码还那么尬么?
  18. 百度API的基本介绍和使用场景
  19. MySql查询——Select
  20. [oracle]7788

热门文章

  1. 我们在使用领英时有必要用领英精灵吗?
  2. R语言高比例送转策略
  3. 推荐3本Python高分书籍,居家旅行必备神器!
  4. 冷冻电镜聚类中心(2D Class)粒子图像的解析
  5. 繁體與簡體之間的轉化
  6. python爬取微博热搜
  7. 三维计算机动画,三维计算机动画的设计
  8. 2014年网研上机题目
  9. 知识图谱(二):图数据库neo4j的Linux安装与基本使用
  10. DBUS入门与C编程