文章目录

  • 1.概述
  • 2. Group 状态机
  • 3.offset 那些事
  • 4.Topic __consumer_offsets
  • 5.GroupCoordinator
  • 6.状态转移图
  • 7.Consumer 初始化
  • 8.Consumer poll 过程解析
  • 9.Consumer 初始化时 group 状态变化
  • 10.Consumer Rebalance

1.概述

转载:Kafka 之 Group 状态变化分析及 Rebalance 过程

前段时间看一下 Kafka 的部分源码(0.10.1.0 版),对一些地方做了一些相应的总结。本文主要就 Kafka Group 方面的内容做一下详细的讲述,重点讲述 Consumer Client 如何进行初始化、Server 端对应的 Consumer Group 状态如何进行变化以及对一些 Kafka 的新设计(与旧版不同之处)简单介绍一下。

2. Group 状态机

在 0.9.0.0 之后的 Kafka,出现了几个新变动,一个是在 Server 端增加了 GroupCoordinator 这个角色,另一个较大的变动是将 topic 的 offset 信息由之前存储在 zookeeper 上改为存储到一个特殊的 topic 中(__consumer_offsets)。

3.offset 那些事

在 Kafka 中,无论是写入 topic,还是从 topic 读取数据,都免不了与 offset 打交道,关于 Kafka 的 offset 主要有以下几个概念,如下图。

其中,Last Committed Offset 和 Current Position 是与 Consumer Client 有关,High Watermark 和 Log End Offset 与 Producer Client 数据写入和 replica 之间的数据同步有关。

  • Last Committed Offset:这是 group 最新一次 commit 的 offset,表示这个 group 已经把 Last Committed Offset 之前的数据都消费成功了;

  • Current Position:group 当前消费数据的 offset,也就是说,Last Committed Offset 到 Current Position 之间的数据已经拉取成功,可能正在处理,但是还未 commit;

  • Log End Offset:Producer 写入到 Kafka 中的最新一条数据的 offset;

  • High Watermark:已经成功备份到其他 replicas 中的最新一条数据的 offset,也就是说 Log End Offset 与 High Watermark 之间的数据已经写入到该 partition 的 leader 中,但是还未成功备份到其他的 replicas 中,这部分数据被认为是不安全的,是不允许 Consumer 消费的(这里说得不是很准确,可以参考:Kafka水位(high watermark)与leader epoch的讨论 这篇文章)。

4.Topic __consumer_offsets

__consumer_offsets 是 Kafka 内部使用的一个 topic,专门用来存储 group 消费的情况,默认情况下有50个 partition,每个 partition 三副本,而具体 group 的消费情况要存储到哪一个 partition 上,是根据 abs(GroupId.hashCode()) % NumPartitions 来计算(其中,NumPartitions 是__consumer_offsets 的 partition 数,默认是50个)的。

5.GroupCoordinator

根据上面所述,一个具体的 group,是根据其 group 名进行 hash 并计算得到其具对应的 partition 值,该 partition leader 所在 Broker 即为该 Group 所对应的 GroupCoordinator,GroupCoordinator 会存储与该 group 相关的所有的 Meta 信息。

在 Broker 启动时,每个 Broker 都会启动一个 GroupCoordinator 服务,但只有 __consumer_offsets 的 partition 的 leader 才会直接与 Consumer Client 进行交互,也就是其 group 的 GroupCoordinator,其他的 GroupCoordinator 只是作为备份,一旦作为 leader 的 Broker 挂掉之后及时进行替代。

6.状态转移图

Server 端,Consumer 的 Group 共定义了五个状态

  • Empty:Group 没有任何成员,如果所有的 offsets 都过期的话就会变成 Dead,一般当 Group 新创建时是这个状态,也有可能这个 Group 仅仅用于 offset commits 并没有任何成员(Group has no more members, but lingers until all offsets have - - expired. This state also represents groups which use Kafka only for offset commits and have no members.);
  • PreparingRebalance:Group 正在准备进行 Rebalance(Group is preparing to rebalance);
  • AwaitingSync:Group 正在等待来 group leader 的 assignment(Group is awaiting state assignment from the leader);
  • Stable:稳定的状态(Group is stable);
  • Dead:Group 内已经没有成员,并且它的 Meta 已经被移除(Group has no more members and its metadata is being removed)。

其各个状态的定义及转换都在 GroupMetadata 中定义,根据状态转移的条件和转移的结果做一个状态转移图如下所示


各个状态转化的情况,只有有对应箭头才能进行转移,比如 Empty 到 PreparingRebalance 是可以转移的,而 Dead 到 PreparingRebalance 是不可以的。后面会根据一个 Consumer Client 启动的过程,讲述一下其 Group 状态变化情况。

7.Consumer 初始化

Server 端 Group 状态的变化,其实更多的时候是由 Client 端触发的,一个 group 在最初初始化的过程总其实就是该 Group 第一个 Consumer Client 初始化的过程。

8.Consumer poll 过程解析

对 Consumer 的初始化,正如 Apache Kafka 0.9 Consumer Client 介绍 这篇文章所述,Consumer 的核心逻辑部分主要在其 poll 模型。而其源码的实现上,主要的逻辑实现也是在 pollOnce 方法,如下所示。

//NOTE: 一次 poll 过程
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {coordinator.poll(time.milliseconds());//NOTE: 获取 GroupCoordinator 并连接、加入 Group、Group 进行 rebalance 并获取 assignment// fetch positions if we have partitions we're subscribed to that we// don't know the offset forif (!subscriptions.hasAllFetchPositions())//NOTE: 更新 offsetupdateFetchPositions(this.subscriptions.missingFetchPositions());// if data is available already, return it immediatelyMap<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();//NOTE: 根据最大限制拉取数据(按 partition 拉取,这个 partition 数据拉取完之后,拉取下一个 partition)if (!records.isEmpty())return records;//NOTE: 说明上次 fetch 到是的数据已经全部拉取了,需要再次发送 fetch 请求,从 broker 拉取数据// send any new fetches (won't resend pending fetches)fetcher.sendFetches();//NOTE: 向订阅的所有 partition 发送 fetch 请求,会从多个 partition 拉取数据long now = time.milliseconds();long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);client.poll(pollTimeout, now, new PollCondition() {@Overridepublic boolean shouldBlock() {// since a fetch might be completed by the background thread, we need this poll condition// to ensure that we do not block unnecessarily in poll()return !fetcher.hasCompletedFetches();}});// after the long poll, we should check whether the group needs to rebalance// prior to returning data so that the group can stabilize fasterif (coordinator.needRejoin())return Collections.emptyMap();return fetcher.fetchedRecords();
}

与 Server 进行交互,尤其初始化 Group 这一部分,主要是在 coordinator.poll() 方法,源码如下

public void poll(long now) {invokeCompletedOffsetCommitCallbacks();//NOTE: 触发回调函数if (subscriptions.partitionsAutoAssigned() && coordinatorUnknown()) {//NOTE: 通过 subscribe() 方法订阅 topic,并且 coordinator 未知ensureCoordinatorReady();//NOTE: 获取 GroupCoordinator 地址,并且建立连接now = time.milliseconds();}if (needRejoin()) {//NOTE: 判断是否需要重新加入 group,如果订阅的 partition 变化或则分配的 partition 变化时,需要 rejoin// due to a race condition between the initial metadata fetch and the initial rebalance,// we need to ensure that the metadata is fresh before joining initially. This ensures// that we have matched the pattern against the cluster's topics at least once before joining.if (subscriptions.hasPatternSubscription())client.ensureFreshMetadata();ensureActiveGroup();//NOTE: 确保 group 是 active;加入 group;分配订阅的 partitionnow = time.milliseconds();}pollHeartbeat(now);//NOTE: 检查心跳线程运行是否正常,如果心跳线程失败,则抛出异常,反之更新 poll 调用的时间maybeAutoCommitOffsetsAsync(now);//NOTE: 自动 commit 时,当定时达到时,进行自动 commit
}

ensureCoordinatorReady() 方法是获取该 group 对应的 GroupCoordinator 地址,并建立连接,然后再进行判断,如果当前的这个 Consumer Client 需要加入一个 group,将进行以下操作(向 Server 端发送 join-group 请求以加入 group,然后再发送 sync-group 请求,获取 client 的 assignment)

//NOTE: 确保 Group 是 active,并且加入该 group
public void ensureActiveGroup() {// always ensure that the coordinator is ready because we may have been disconnected// when sending heartbeats and does not necessarily require us to rejoin the group.ensureCoordinatorReady();//NOTE: 确保 GroupCoordinator 已经连接startHeartbeatThreadIfNeeded();//NOTE: 启动心跳发送线程(并不一定发送心跳,满足条件后才会发送心跳)joinGroupIfNeeded();//NOTE: 发送 JoinGroup 请求,并对返回的信息进行处理,还包括了发送 sync-group 请求并进行相应处理
}

9.Consumer 初始化时 group 状态变化

这里详述一下 Client 进行以上操作时,Server 端 Group 状态的变化情况。当 Consumer Client 首次进行拉取数据,如果该其所属 Group 并不存在时,Group 的状态变化过程如下:

  1. Consumer Client 发送 join-group 请求,如果 Group 不存在,创建该 Group,Group 的状态为 Empty;

  2. 由于 Group 的 member 为空,将该 member 加入到 Group 中,并将当前 member (client)设置为 Group 的 leader,进行 rebalance 操作,Group 的状态变为 preparingRebalance,等待 rebalance.timeout.ms 之后(为了等待其他 member 重新发送 join-group,如果 Group 的状态变为 preparingRebalance,Consumer Client 在进行 poll 操作时,needRejoin() 方法结果就会返回 true,也就意味着当前 Consumer Client 需要重新加入 Group),Group 的 member 更新已经完成,此时 Group 的状态变为 AwaitingSync,并向 Group 的所有 member 返回 join-group 响应;

  3. client 在收到 join-group 结果之后,如果发现自己的角色是 Group 的 leader,就进行 assignment,该 leader 将 assignment 的结果通过 sync-group 请求发送给 GroupCoordinator,而 follower 也会向 GroupCoordinator 发送一个 sync-group 请求(只不过对应的字段为空);

  4. 当 GroupCoordinator 收到这个 Group leader 的请求之后,获取 assignment 的结果,将各个 member 对应的 assignment 发送给各个 member,而如果该 Client 是 follower 的话就不做任何处理,此时 group 的状态变为 Stable(也就是说,只有当收到的 Leader 的请求之后,才会向所有 member 返回 sync-group 的结果,这个是只发送一次的,由 leader 请求来触发)。

10.Consumer Rebalance

根据上图,当 group 在 Empty、AwaitSync 或 Stable 状态时,group 可能会进行 rebalance;
rebalance 的过程就是:等待所有 member 发送 join-group(上述过程的第2步),然后设置 Group 的 leader,进行 reassignment,各个 client 发送 sync-group 来同步 server 的 assignment 结果。

【kafka】Kafka 之 Group 状态变化分析及 Rebalance 过程相关推荐

  1. kafka原理以及源码分析

    1.什么是kafka以及kafka的基础架构 Kafka是一个高吞吐的分布式的消息系统,是基于发布/订阅模式的消息队列. 2. 术语 Producer:消息生产者,就是向 Kafka broker 发 ...

  2. 聊聊 Kafka:如何避免消费组的 Rebalance

    一.前言 我们上一篇聊了 Rebalance 机制,相信你对消费组的重平衡有个整体的认识.这里再简单回顾一下,Rebalance 就是让一个 Consumer Group 下所有的 Consumer ...

  3. ELK+Filebeat+Kafka+ZooKeeper 构建海量日志分析平台(elk5.2+filebeat2.11)

    ELK+Filebeat+Kafka+ZooKeeper 构建海量日志分析平台 参考:http://www.tuicool.com/articles/R77fieA 我在做ELK日志平台开始之初选择为 ...

  4. Kafka与RocketMQ的对比分析

    本文来说下Kafka与RocketMQ的对比分析 文章目录 概述 概述

  5. Kafka分区与group

    Kafka 分区与group 1.原理图 2.原理描述 一个topic 可以配置几个partition,produce发送的消息分发到不同的partition中,consumer接受数据的时候是按照g ...

  6. [Big Data - Kafka] Kafka设计解析(四):Kafka Consumer解析

    High Level Consumer 很多时候,客户程序只是希望从Kafka读取数据,不太关心消息offset的处理.同时也希望提供一些语义,例如同一条消息只被某一个Consumer消费(单播)或被 ...

  7. 消费者组 Consumer Group 和 重平衡 Rebalance

    kafka设计了consumer group: 具有可扩展性和容错性的consumer机制,consumer group有3个特性: 1. Consumer Group 下可以有一个或多个 Consu ...

  8. [kafka]kafka术语白话

    本文致力于写一篇非技术人员也能看懂的kafka术语介绍. kafka是一个消息引擎系统,它可以传递消息,也具有转换消息的能力,对于今天的主题来说,我们关注传递消息的能力就够了,因为我们一般接触到的术语 ...

  9. SAP WM 2-Step Picking流程里创建的Group的分析

    SAP WM 2-Step Picking流程里创建的Group的分析 SAP WM模块的2-Step Picking流程里,需要根据实际业务情况,首先为外向交货单(Outbound Delivery ...

最新文章

  1. cookie的简单学习
  2. wxWidgets:wxFloatingPointValidator<T> 类模板用法
  3. 一个罐子统治一切:Apache TomEE + Shrinkwrap == JavaEE引导
  4. 非酋用计算机弹唱,非酋简谱 薛明媛/朱贺 听说爱情就是这样子
  5. 阿里P8大神十年珍藏,Java技术电子书绝佳推荐,每一本都要吃透
  6. 基于熵权法评估某高校各班级整体情况(公式详解+简单工具介绍)
  7. IBM Cloud 2015 - CDN
  8. docker MySQL 双主_DockerMysql数据库实现双主同步配置详细·TesterHome
  9. iOS H5原生WKWebView调起支付宝客户端支付方案
  10. ERROR 1366 (HY000): Incorrect string value: '\xCA\xD6\xBB\xFA\xCA\xFD...' for column 'cname' at row
  11. 数据库中数据的独立性解释
  12. C#1309. 解码字母到整数映射
  13. SPSS Modeler与Google地图的完美结合
  14. 国家计算机与软件资格考试因试卷丢失延考
  15. qrect在图片上显示矩形框_2019年6月百度大脑产品上新技术升级盘点内容
  16. mysql主从配置详细教程
  17. Nginx之TCP端口转发
  18. Mysql逻辑模块组成
  19. JAVA ik es_安装elasticsearch及中文IK和近义词配置
  20. Linux上搭建java环境(Tomact mysql jdk)

热门文章

  1. 转转集团Q4手机行情:二手市场iPhone交易量今年以来首次“反弹”
  2. 一加9RT外观和部分参数揭晓:搭载骁龙888+E4直屏
  3. 《原神》月入16亿,米哈游为何仍然被嫌弃?
  4. 诺基亚赢得运营商Orange比利时5G合同,华为回应...
  5. 中兴通讯:将在全球范围内发布近10款5G手机
  6. 发黄图再截图举报!这个社交软件运营合伙人被逮捕:“设局”恶意举报同行...
  7. 让携号转网不再难!但你得了解这几大限制
  8. 在游戏设备上砸钱 其实小姐姐们更疯狂!
  9. 7月26日见!华为Mate 20 X 5G正式官宣:国内首款5G双模手机
  10. 任达华遇袭是效仿“宏颜获水”事件?百度回应:严惩肇事者 以儆效尤