一 核心字段

brokerId: Int 该GroupCoordinator所在的节点

groupConfig: GroupConfig 记录了消费者组中consumer session过期的最小时长和最大时长

offsetConfig: OffsetConfig 记录哦OffsetMetadata相关配置,比如metatdata字段允许的最大长度,Offset Topic每一个分区的副本数

groupManager: GroupMetadataManager 管理消费者组元数据和对应的offset信息的组件

heartbeatPurgatory:DelayedOperationPurgatory[DelayedHeartbeat] 用于管理DelayedHeartbeat延迟操作

joinPurgatory:DelayedOperationPurgatory[DelayedJoin] 用于管理DelayedJoin延迟操作

二 GroupState分析

GroupState接口用于表示消费者组的状态,四个子类分别表示这不同的状态,这个状态只是在服务器端使用,并不是客户端消费者的状态

各个状态之间的转换图:

各个状态的转化时机:

# PreparingRebalance状态

当消费者处于PreparingRebalance状态,GroupCoordinator可以正常处理OffsetFetchRequest,ListGroupRequest,OffsetCommitRequest请求;

但是对于HeartbeatRequest和SyncGroupRequest,则会在其响应里携带REBALANCE_IN_PROGRESS错误码进行标识;当收到JoinGroup

Request的时候会先创建对应的DelayedJoin,等待满足条件后对其响应。

PreparingRebalance -> AwaitingSync: 当有DelayedJoin超时或是消费者组之前的成员(消费者)都已经重新申请加入时进行切换

PreparingRebalance -> Empty: 当所有消费者都离开消费者组时候切换

PreparingRebalance -> Dead:分区迁移的时候删除消费者组

# AwaitingSync状态

表示消费者组正在等待Group Leader的SyncGroupRequest请求时,当GroupCoordinator收到OffsetCommitRequest和HeartbeatRequest时候,会在响应中添加REBALANCE_IN_PROGRESS错误码进行标识,对于来自follower的SyncGroupRequest则直接抛弃,直到收到Group Leader的SyncGroupRequest

AwaitingSync -> Stable:  当GroupCoordinator收到Group Leader发来的SyncGroupRequest时进行切换

AwaitingSync -> PreparingRebalance: 有消费者加入或者退出消费者组;消费者组中有消费者心跳超时;已知成员更新元数据

AwaitingSync -> Dead: 分区迁移的时候删除消费者组

# Stable状态

该状态下,GroupCoordinator可以处理所有的请求,例如:Offset

FetchRequest,HeartbeatRequest,OffsetCommitRequest,来自follower的JoinGroupRequestd等等

Stable -> PreparingRebalance:消费者组有消费者心跳检测超时;有消费者主动退出;当前Group Leader发送JoinGroupRequest;有新的消费者请求加入消费者组

Stable -> Dead: 分区迁移的时候删除消费者组

Dead状态:

处于此状态的消费者组中没有消费者,其对应的GroupMetadata也将被删除,除了OffsetCommitRequest其他请求响应会携带UNKNOWN_MEMBER_ID.

Empty状态:

消费者组中没有消费者了,但是不会被删除,直到所有offset都已经到期;这个状态还表示消费者组只用于offset提交

Empty -> Dead: 最后的offset被删除;组因到期被删除;组因分区迁移被删除

Empty -> PreparingRebalance:新的成员加入,发送JoinGroupRequest

三 JoinGroupRequest分析

首先:会调用KafkaApis的handleJoinGroupRequest方法

def handleJoinGroupRequest(request: RequestChannel.Request) {
  import JavaConversions._
  // 请求转换成JoinGroupRequest
 
val joinGroupRequest = request.body.asInstanceOf[JoinGroupRequest]
  val responseHeader = new ResponseHeader(request.header.correlationId)

// 回调函数的定义
 
def sendResponseCallback(joinResult: JoinGroupResult) {
    val members = joinResult.members map { case (memberId, metadataArray) => (memberId, ByteBuffer.wrap(metadataArray)) }
    val responseBody = new JoinGroupResponse(request.header.apiVersion, joinResult.errorCode, joinResult.generationId,
      joinResult.subProtocol, joinResult.memberId, joinResult.leaderId, members)

trace("Sending join group response %s for correlation id %d to client%s."
      .format(responseBody, request.header.correlationId, request.header.clientId))
    requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
  }

if (!authorize(request.session, Read, new Resource(Group, joinGroupRequest.groupId()))) {
    val responseBody = new JoinGroupResponse(
      request.header.apiVersion,
      Errors.GROUP_AUTHORIZATION_FAILED.code,
      JoinGroupResponse.UNKNOWN_GENERATION_ID,
      JoinGroupResponse.UNKNOWN_PROTOCOL,
      JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
     
JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
     
Map.empty[String, ByteBuffer])
    requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
  } else {
    // 调用GroupCoordinator#handleJoinGroup方法处理
   
val protocols = joinGroupRequest.groupProtocols().map(protocol =>
      (protocol.name, Utils.toArray(protocol.metadata))).toList
    coordinator.handleJoinGroup(
      joinGroupRequest.groupId,
      joinGroupRequest.memberId,
      request.header.clientId,
      request.session.clientAddress.toString,
      joinGroupRequest.rebalanceTimeout,
      joinGroupRequest.sessionTimeout,
      joinGroupRequest.protocolType,
      protocols,
      sendResponseCallback)
  }
}

def handleJoinGroup(groupId: String, memberId: String, clientId: String, clientHost: String, rebalanceTimeoutMs: Int,sessionTimeoutMs: Int, protocolType: String, protocols: List[(String, Array[Byte])], responseCallback: JoinCallback) {// 首先进行一系列的检测if (!isActive.get) {responseCallback(joinError(memberId, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code))} else if (!validGroupId(groupId)) { // 检测group idresponseCallback(joinError(memberId, Errors.INVALID_GROUP_ID.code))} else if (!isCoordinatorForGroup(groupId)) {// 检测GroupCoordinator是否管理此responseCallback(joinError(memberId, Errors.NOT_COORDINATOR_FOR_GROUP.code))} else if (isCoordinatorLoadingInProgress(groupId)) {// GroupCoordinator是否已经加载消费者组对应的Offsets Topic分区responseCallback(joinError(memberId, Errors.GROUP_LOAD_IN_PROGRESS.code))} else if (sessionTimeoutMs < groupConfig.groupMinSessionTimeoutMs ||sessionTimeoutMs > groupConfig.groupMaxSessionTimeoutMs) {// 检测消费者的超时时长是否合法responseCallback(joinError(memberId, Errors.INVALID_SESSION_TIMEOUT.code))} else {// 根据groupId获取GroupMetadata数据groupManager.getGroup(groupId) match {case None =>// 判断memberId是否为空,如果不为空,返回错误if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) {responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))} else {// 根据groupId闯将GroupMetadata,并缓存起来val group = groupManager.addGroup(new GroupMetadata(groupId))doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)}case Some(group) =>doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)}}
}

doJoinGroup方法主要会做以下2方面的检测:第一检测memberId,

JoinGroupRequest可能是来自消费者组中的已知的member,此时请求就会携带之前分配过的memberId,这里就需要检测memberId是否能够GroupMetadata识别;第二检测Member支持的PartitionAssignor,这里需要检测每一个消费者的支持的PartitionAssignor集合与GroupMetadata中候选PartitionAssignor集合(candidateProtocal字段)是否有交集,只有这样才能够选择出所有消费者都支持的PartitionAssignor。

之后就会按照消费者组的状态分类处理:

# Dead

直接返回UNKNOWN_MEMBER_ID错误码

# PreparingRebalance

如果是已知的member重新申请加入,则更新GroupMetadata中记录的member信息;如果是未知的新member申请加入,则创建member并分配memberId,并加入GroupMetadata中

# AawaitingSync

如果是未知的新的member申请加入,则创建member并分配memberId,并加入到GroupMetadata。然后调用maybePrepareRebalance操作将状态切换为PreparingRebalance

如果是已知的member重新申请加入,则要区分member支持的PartitionAssignor是否发生了变化:若未发生变化,则将当前member集合信息返回给Group Leader;若是发生了变化则更新member信息,并且调用maybePrepareRebalance将状态切换为PreparingRebalance

# Stable

如果是未知新的member加入,创建member,并且分配memberId,加入到GroupMetatdata中,然后调用maybePrepareRebalance方法将状态切换为PreparingRebalance

如果是已知member重新申请加入,则要区分member所支持的PartitionAssignor是否发生了变化:若未发生变化则将当前的GroupMetadata的当前状态返回,然后消费者会发送SyncGroupRequest继续后面的操作;如果发生了变化或者Group Leader发送JoinGroupRequest,则更新member信息,并调用方法maybePrepareRebalance将状态切换为PreparingRebalance.

private def doJoinGroup(group: GroupMetadata, memberId: String, clientId: String, clientHost: String, rebalanceTimeoutMs: Int,sessionTimeoutMs: Int, protocolType: String, protocols: List[(String, Array[Byte])], responseCallback: JoinCallback) {group synchronized {// 检测消费者支持的partition assignorif (!group.is(Empty) && (group.protocolType != Some(protocolType) || !group.supportsProtocols(protocols.map(_._1).toSet))) {// 如果不支持,则返回错误responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL.code))} else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {// 检测memberId是否能够被识别responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))} else {group.currentState match {case Dead =>// 直接返回UNKNOWN_MEMBER_ID错误码responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))case PreparingRebalance =>// 如果是未知的新member申请加入,则创建member并分配memberId,并加入GroupMetadata中if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)} else {// 如果是已知的member重新申请加入,则更新GroupMetadata中记录的member信息val member = group.get(memberId)updateMemberAndRebalance(group, member, protocols, responseCallback)}case AwaitingSync =>// 如果是未知的新的member申请加入,则创建member并分配memberId,并加入到GroupMetadata。// 然后调用maybePrepareRebalance操作将状态切换为PreparingRebalanceif (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)} else {// 如果是已知的member重新申请加入,则要区分member支持的PartitionAssignor是否发生了变化val member = group.get(memberId)// 若未发生变化,则将当前member集合信息返回给Group Leader,if (member.matches(protocols)) {responseCallback(JoinGroupResult(members = if (memberId == group.leaderId) {group.currentMemberMetadata} else {Map.empty},memberId = memberId,generationId = group.generationId,subProtocol = group.protocol,leaderId = group.leaderId,errorCode = Errors.NONE.code))} else {// 若是发生了变化则更新member信息,并且调用maybePrepareRebalance将状态切换为PreparingRebalanceupdateMemberAndRebalance(group, member, protocols, responseCallback)}}case Empty | Stable =>if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {// 如果是未知新的member加入,创建member,并且分配memberId,加入到GroupMetatdata中,// 然后调用maybePrepareRebalance方法将状态切换为PreparingRebalanceaddMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)} else {// 如果是已知member重新申请加入,则要区分member所支持的PartitionAssignor是否发生了变化val member = group.get(memberId)//如果发生了变化或者Group Leader发送JoinGroupRequest,则更新member信息,并调用方法maybePrepareRebalance// 将状态切换为PreparingRebalance.if (memberId == group.leaderId || !member.matches(protocols)) {updateMemberAndRebalance(group, member, protocols, responseCallback)} else {// 若未发生变化则将当前的GroupMetadata的当前状态返回,然后消费者会发送SyncGroupRequest继续后面的操作responseCallback(JoinGroupResult(members = Map.empty,memberId = memberId,generationId = group.generationId,subProtocol = group.protocol,leaderId = group.leaderId,errorCode = Errors.NONE.code))}}}// 尝试完成相关的DelayedJoinif (group.is(PreparingRebalance))joinPurgatory.checkAndComplete(GroupKey(group.groupId))}}
}
private def addMemberAndRebalance(rebalanceTimeoutMs: Int, sessionTimeoutMs: Int, clientId: String,clientHost: String, protocolType: String, protocols: List[(String, Array[Byte])],group: GroupMetadata, callback: JoinCallback) = {// 构走一个memberIdval memberId = clientId + "-" + group.generateMemberIdSuffix// 创建MemberMetadata对象val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, rebalanceTimeoutMs,sessionTimeoutMs, protocolType, protocols)// 设置awaitingJoinCallback回调函数,该函数是KafkaApis#handleJoinGroupRequest里的回调函数sendResponseCallbackmember.awaitingJoinCallback = callback// 添加到GroupMetadata保存group.add(member.memberId, member)// 尝试进行状态切换到PrepareRebalancemaybePrepareRebalance(group)member
}
private def prepareRebalance(group: GroupMetadata) {// 如果处于AwaitingSync状态,则先要重置MemberMetadata#assignment字段if (group.is(AwaitingSync))resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)// 将消费者组的状态切换为PreparingRebalance,表示准备执行rebalance操作group.transitionTo(PreparingRebalance)info("Preparing to restabilize group %s with old generation %s".format(group.groupId, group.generationId))// DelayedJoin超时时长是GroupMetadata中所有Member设置的超时时长的最大值val rebalanceTimeout = group.rebalanceTimeoutMs// 创建DelayedJoin对象val delayedRebalance = new DelayedJoin(this, group, rebalanceTimeout)// 创建DelayedJoin的keyval groupKey = GroupKey(group.groupId)// 尝试立即完成DelayedJoin,否则将DelayedFetch添加到joinPurgatory中joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
}
private def updateMemberAndRebalance(group: GroupMetadata, member: MemberMetadata,protocols: List[(String, Array[Byte])], callback: JoinCallback) {// 更新MemberMetadata支持的协议和awaitingJoinCallback回调函数member.supportedProtocols = protocols// 设置awaitingJoinCallback回调函数member.awaitingJoinCallback = callback// 尝试进行状态转换maybePrepareRebalance(group)
}

四 DelayedJoin分析

我们知道消费者组转台切换为PreparingRebalance时会创建一个DelayedJoin对象并添加到GroupCoordinator的joinPurgatory中管理。DelayedJoin也是一种延迟操作,主要功能就是等待消费者组中所有消费者发送JoinGroupRequest申请加入。每当处理完新收到的JoinGroupRequest时候,都会检测相关的DelayedJoin 是否能够完成,经过一段时间的等待,DelayedJoin也会到期执行

coordinator: GroupCoordinator 对应的GroupCoordinator

group: GroupMetadata 对应的GroupMetadata

rebalanceTimeout: Long 指定DelayedJoin到期时长

def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {group synchronized {// 判断已知的member是否已经申请加入if (group.notYetRejoinedMembers.isEmpty)forceComplete()else false}
}
def onCompleteJoin(group: GroupMetadata) {var delayedStore: Option[DelayedStore] = Nonegroup synchronized {// 获取未重新申请加入加入的已知member,从GroupMetadtat删除之group.notYetRejoinedMembers.foreach { failedMember =>group.remove(failedMember.memberId)}// 如果组内还有member或者没有memeber但是元数据还没有被删除if (!group.is(Dead)) {// 递增generationId,并且还会选择消费者组最终使用的PartitionAssignorgroup.initNextGeneration()// 如果组内已经没有成员,但是元数据还没有被删除if (group.is(Empty)) {info(s"Group ${group.groupId} with generation ${group.generationId} is now empty")// 创建DelayedStore对象delayedStore = groupManager.prepareStoreGroup(group, Map.empty, error => {if (error != Errors.NONE) {warn(s"Failed to write empty metadata for group ${group.groupId}: ${error.message}")}})} else {// 就是组内还有成员info(s"Stabilized group ${group.groupId} generation ${group.generationId}")// 向GroupMetadata中所有的Member发送JoinGroupResponsefor (member <- group.allMemberMetadata) {assert(member.awaitingJoinCallback != null)// 发给Group Leader和follower的JoinGroupResponse不一样val joinResult = JoinGroupResult(members=if (member.memberId == group.leaderId) { group.currentMemberMetadata } else { Map.empty },memberId=member.memberId,generationId=group.generationId,subProtocol=group.protocol,leaderId=group.leaderId,errorCode=Errors.NONE.code)member.awaitingJoinCallback(joinResult)member.awaitingJoinCallback = nullcompleteAndScheduleNextHeartbeatExpiration(group, member)}}}}// 调用GroupMetadataMangaer的store方法delayedStore.foreach(groupManager.store)
}

五 HearbeatRequest分析

每一个消费者都会定期向GroupCoordinator发送HeartbeatRequest请求,告诉GroupCoordinator自己还活着。

def handleHeartbeat(groupId: String, memberId: String, generationId: Int, responseCallback: Short => Unit) {// 首先进行一系列检测,当前GroupCoordinator是否管理该消费者组;是否已经加载对应的offset topic分区if (!isActive.get) {responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)} else if (!isCoordinatorForGroup(groupId)) {responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP.code)} else if (isCoordinatorLoadingInProgress(groupId)) {responseCallback(Errors.NONE.code)} else {// 根据groupId 得到groupgroupManager.getGroup(groupId) match {case None =>responseCallback(Errors.UNKNOWN_MEMBER_ID.code)case Some(group) =>group synchronized {// 检测group 状态group.currentState match {case Dead =>// 如果Dead状态,表示其他线程已经从元数据中删除了该组,这可能组已经迁移到其他GroupCoordinator或者// 组正处于非stable状态的转换responseCallback(Errors.UNKNOWN_MEMBER_ID.code)case Empty =>responseCallback(Errors.UNKNOWN_MEMBER_ID.code)// 如果处于AwaitingSynccase AwaitingSync =>if (!group.has(memberId))responseCallback(Errors.UNKNOWN_MEMBER_ID.code)elseresponseCallback(Errors.REBALANCE_IN_PROGRESS.code)// 如果处于PreparingRebalancecase PreparingRebalance =>if (!group.has(memberId)) {responseCallback(Errors.UNKNOWN_MEMBER_ID.code)} else if (generationId != group.generationId) {responseCallback(Errors.ILLEGAL_GENERATION.code)} else {val member = group.get(memberId)completeAndScheduleNextHeartbeatExpiration(group, member)responseCallback(Errors.REBALANCE_IN_PROGRESS.code)}// 如果处于Stable状态case Stable =>if (!group.has(memberId)) {responseCallback(Errors.UNKNOWN_MEMBER_ID.code)} else if (generationId != group.generationId) {responseCallback(Errors.ILLEGAL_GENERATION.code)} else {val member = group.get(memberId)completeAndScheduleNextHeartbeatExpiration(group, member)responseCallback(Errors.NONE.code)}}}}}
}
private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) {// complete current heartbeat expectationmember.latestHeartbeat = time.milliseconds()// 获取DelayedHeartbeat的keyval memberKey = MemberKey(member.groupId, member.memberId)// 尝试完成之前添加的DelayedHeartbeatheartbeatPurgatory.checkAndComplete(memberKey)// 计算下一次的Heartbeat的超时时间val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs// 创建新的DelayedHeartbeat,并添加到heartbeatPurgatory中val delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs)heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
}

六 SyncGroupRequest分析

def handleSyncGroup(groupId: String, generation: Int, memberId: String,groupAssignment: Map[String, Array[Byte]], responseCallback: SyncCallback) {if (!isActive.get) {responseCallback(Array.empty, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)} else if (!isCoordinatorForGroup(groupId)) { // 检测是否该GroupCoordinator管理这个消费者组responseCallback(Array.empty, Errors.NOT_COORDINATOR_FOR_GROUP.code)} else {groupManager.getGroup(groupId) match {case None => responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code)// 调用doSyncGroupcase Some(group) => doSyncGroup(group, generation, memberId, groupAssignment, responseCallback)}}
}
private def doSyncGroup(group: GroupMetadata, generationId: Int, memberId: String,groupAssignment: Map[String, Array[Byte]], responseCallback: SyncCallback) {var delayedGroupStore: Option[DelayedStore] = Nonegroup synchronized {// 检测memeber是不是group成员if (!group.has(memberId)) {responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code)} else if (generationId != group.generationId) { //检测generationId是否合法responseCallback(Array.empty, Errors.ILLEGAL_GENERATION.code)} else {group.currentState match {case Empty | Dead =>// 直接返回UNKNOWN_MEMBER_ID错误码responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code)case PreparingRebalance =>// 直接返回REBALANCE_IN_PROGRESS错误码responseCallback(Array.empty, Errors.REBALANCE_IN_PROGRESS.code)case AwaitingSync =>// 设置awaitingSyncCallback函数也就是kafkaApis里的handleSyncGroupRequest里的sendResponseCallback回调group.get(memberId).awaitingSyncCallback = responseCallback// 如果当前发送该请求的memeber是leaderif (memberId == group.leaderId) {info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}")// 将没有分配分区的memeber对应的分配结果填充为空的byte数组val missing = group.allMembers -- groupAssignment.keySetval assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap// 通过GroupMetadataManager将GroupMetadata相关信息形成消息,并且写入到对应的offset topic中delayedGroupStore = groupManager.prepareStoreGroup(group, assignment, (error: Errors) => {group synchronized {if (group.is(AwaitingSync) && generationId == group.generationId) {if (error != Errors.NONE) {// 清空分区分配结果,发送异常响应resetAndPropagateAssignmentError(group, error)// 状态转换为PrepareRebalancemaybePrepareRebalance(group)} else {// 设置分区的分配结果,发送正常的SyncGroupResponsesetAndPropagateAssignment(group, assignment)// 状态转换为Stablegroup.transitionTo(Stable)}}}})}case Stable =>// 将分配给这个member的负责处理的分区信息返回val memberMetadata = group.get(memberId)responseCallback(memberMetadata.assignment, Errors.NONE.code)completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))}}}// 调用GroupMetadataManager的store方法delayedGroupStore.foreach(groupManager.store)
}

七 OffsetCommitRequest分析

def handleCommitOffsets(groupId: String, memberId: String, generationId: Int,offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],responseCallback: immutable.Map[TopicPartition, Short] => Unit) {// 首先进行一系列的检查if (!isActive.get) {responseCallback(offsetMetadata.mapValues(_ => Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code))} else if (!isCoordinatorForGroup(groupId)) {responseCallback(offsetMetadata.mapValues(_ => Errors.NOT_COORDINATOR_FOR_GROUP.code))} else if (isCoordinatorLoadingInProgress(groupId)) {responseCallback(offsetMetadata.mapValues(_ => Errors.GROUP_LOAD_IN_PROGRESS.code))} else {groupManager.getGroup(groupId) match {// 如果对象消费者组不存在且generationId < 0,表示GroupCoordinator不维护消费组的分区分配结果,只记录提交的offsetcase None =>if (generationId < 0) {// the group is not relying on Kafka for group management, so allow the commitval group = groupManager.addGroup(new GroupMetadata(groupId))doCommitOffsets(group, memberId, generationId, offsetMetadata, responseCallback)} else {// or this is a request coming from an older generation. either way, reject the commitresponseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code))}case Some(group) =>doCommitOffsets(group, memberId, generationId, offsetMetadata, responseCallback)}}
}
def doCommitOffsets(group: GroupMetadata, memberId: String, generationId: Int,offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],responseCallback: immutable.Map[TopicPartition, Short] => Unit) {var delayedOffsetStore: Option[DelayedStore] = Nonegroup synchronized {// 如果group状态时Deadif (group.is(Dead)) {// 直接返回UNKNOWN_MEMBER_ID错误码responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID.code))} else if (generationId < 0 && group.is(Empty)) {// 如果group没有member且generationId < 0// 这个组仅仅用于存储offsetsdelayedOffsetStore = groupManager.prepareStoreOffsets(group, memberId, generationId,offsetMetadata, responseCallback)} else if (group.is(AwaitingSync)) {// 如果组状态AwaitingSync// 直接返回响应REBALANCE_IN_PROGRESSresponseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS.code))} else if (!group.has(memberId)) { // 如果发送请求meneber不是该组的成员,返回UNKNOWN_MEMBER_ID错误码responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID.code))} else if (generationId != group.generationId) {// 如果genrationId不合法,返回ILLEGAL_GENERATION错误码responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code))} else {// 获取memberval member = group.get(memberId)// 更新心跳completeAndScheduleNextHeartbeatExpiration(group, member)// 创建DelayedOffsetStoredelayedOffsetStore = groupManager.prepareStoreOffsets(group, memberId, generationId,offsetMetadata, responseCallback)}}// 调用GroupMetadataManger的store方法存储offsetdelayedOffsetStore.foreach(groupManager.store)
}

八 LeaveGroupRequest分析

当消费者离开消费者组,例如调用unsubscribe方法取消对topic的订阅,会向GroupCoordiantor发送LeaveGroupRequest.

def handleLeaveGroup(groupId: String, memberId: String, responseCallback: Short => Unit) {if (!isActive.get) {responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)} else if (!isCoordinatorForGroup(groupId)) {responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP.code)} else if (isCoordinatorLoadingInProgress(groupId)) {responseCallback(Errors.GROUP_LOAD_IN_PROGRESS.code)} else {groupManager.getGroup(groupId) match {// 如果没找到该组,返回UNKNOWN_MEMBER_ID错误码case None =>responseCallback(Errors.UNKNOWN_MEMBER_ID.code)case Some(group) =>group synchronized {// 如果group已经没有组了,且元数据也被删除了或者该组没有这个memberif (group.is(Dead) || !group.has(memberId)) {// 返回UNKNOWN_MEMBER_ID错误码responseCallback(Errors.UNKNOWN_MEMBER_ID.code)} else {// 获取该memberval member = group.get(memberId)// 标记该成员为isLeaving为true;尝试执行对应的DelayedHeartbeat操作removeHeartbeatForLeavingMember(group, member)// 从消费者组中删除该成员,并且进行组的状态转换onMemberFailure(group, member)responseCallback(Errors.NONE.code)}}}}
}
private def removeHeartbeatForLeavingMember(group: GroupMetadata, member: MemberMetadata) {// 标记该成员为isLeaving为truemember.isLeaving = true// 构建 member keyval memberKey = MemberKey(member.groupId, member.memberId)// 尝试执行对应的DelayedHeartbeat操作heartbeatPurgatory.checkAndComplete(memberKey)
}
private def onMemberFailure(group: GroupMetadata, member: MemberMetadata) {trace("Member %s in group %s has failed".format(member.memberId, group.groupId))// 从消费者组中删除该成员group.remove(member.memberId)group.currentState match {// group当前状态是Dead | Empty,什么也不做case Dead | Empty =>// group当前状态是Stable | AwaitingSync ,则转换成PrepareRebalancecase Stable | AwaitingSync => maybePrepareRebalance(group)// group当前状态是PrepareRebalance,则试图完成DelayedJoin操作case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))}
}

GroupCoordinator分析相关推荐

  1. Java面试题库,腾讯面试需要Java转go

    第1章快速入门 1.1 Kafka简介 1.2 以Kafka为中心的解决方案 1.3 Kafka核心概念 1.4 Kafka源码环境 第2章生产者 2.1 KafkaProducer 使用示例 2.2 ...

  2. Kafka GroupCoordinator机制(十六):GroupCoordinator之LeaveGroupRequest分析

    消费者离开消费者组是向GroupCoordinator发送LeaveGroupRequest. def handleLeaveGroup(groupId: String, consumerId: St ...

  3. kafka源码分析之一server启动分析

    0. 关键概念 关键概念 Concepts Function Topic 用于划分Message的逻辑概念,一个Topic可以分布在多个Broker上. Partition 是Kafka中横向扩展和一 ...

  4. 消费者rebalance机制分析

    一 触发rebalance的时机 # 有新的消费者加入 # 有消费者宕机或者下线 # 消费者主动退出消费者组 # 消费者组订阅的topic出现分区数量变化 # 消费者调用unsubscrible取消对 ...

  5. 消费者Heartbeat分析

    消费者会定期向GroupCoordinator发送HeartbeatRequest来确定 彼此在线,也就是说告诉GroupCoordinator我还活着,或者也判断GrooupCoordinator是 ...

  6. 【kafka】Kafka 之 Group 状态变化分析及 Rebalance 过程

    文章目录 1.概述 2. Group 状态机 3.offset 那些事 4.Topic __consumer_offsets 5.GroupCoordinator 6.状态转移图 7.Consumer ...

  7. Kafka系列之-Kafka Protocol实例分析

    本文基于A Guide To The Kafka Protocol文档,以及Spark Streaming中实现的org.apache.spark.streaming.kafka.KafkaClust ...

  8. kafka源码愫读(5)、ReplicaManager模块源码分析

    1.ReplicaManager模块简介 replicaManager主要用来管理topic在本broker上的副本信息.并且读写日志的请求都是通过replicaManager进行处理的. 每个rep ...

  9. 聊聊 Kafka:协调者 GroupCoordinator 源码剖析之 FIND_COORDINATOR

    四.ApiKeys.FIND_COORDINATOR 我们前面的文章说过,与消费组相关的两个组件,一个是消费者客户端的 ConsumerCoordinator,一个是 Kafka Broker 服务端 ...

最新文章

  1. 机械爪角度与距离之间的关系
  2. 在win8.1 64位系统+cocos2d-x2.2.3下搭建android交叉编译环境
  3. Dijkstra模板(java)
  4. 多进程Socket_Server
  5. 以IP段作为监听地址
  6. 计算机视觉python入门_计算机视觉应该怎样入门?
  7. 添加组合索引时,做相等运算字段应该放在最前面
  8. 关于更改当前公司(一)--ChangeCompany
  9. Halcon 找圆测量工具
  10. 爱情九十一课,留下好的你
  11. Educoder Matplotlib和Seaborn 三维图 第一关绘制三维图
  12. Redis高并发1-redis环境搭建
  13. NXP恩智浦智能车四轮组-- 2.电磁检波电路、运放模块原理图
  14. SC 防火墙防DOS工具机制
  15. matlab方波函数,matlab方波
  16. top和margin-top等的区别
  17. python 实现京东滑块验证码登录
  18. agd插值算法_多目标自适应和声搜索算法
  19. HDU4847-Wow! Such Doge!
  20. SciTE 常见问题及解决方法集锦

热门文章

  1. 基于贝叶斯推断的分类模型 机器学习你会遇到的“坑”
  2. 不属于jsp构成元素_JSP构成元素-JSP基础
  3. 算法提高 理财计划(java)
  4. mysql first value_开窗函数 First_Value 和 Last_Value
  5. (三)Netty之Channel通道
  6. 解决新版DBUtils使用连接池from DBUtils.PooledDB import PooledDB报错
  7. 解决报错OMP: Error #15: Initializing libiomp5.dylib, but found libomp.dylib already initialized
  8. python使用英汉大字典离线获取单词释义
  9. Mac 安装配置 chromedriver
  10. Scrapy爬虫抓取ZOL手机详情