完成了前期准备工作之后,消费者将正式开始执行分区再分配,这是一个客户端与服务端交互配合的过程,消费者需要构造并发送 JoinGroupResult 请求到对应的 GroupCoordinator 实例所在节点申请加入目标 group。

这一过程位于 AbstractCoordinator#initiateJoinGroup 方法中,该方法的主要工作就是切换当前消费者的状态为 REBALANCING,创建并缓存 JoinGroupRequest 请求,并处理申请加入的结果。如果申请加入成功,则会切换当前消费者的状态为 STABLE,并重启心跳机制(为了避免心跳机制干扰分区再分配,在开始执行分区再分配之前会临时关闭心跳机制);如果申请加入失败,则会切换当前消费者的状态为 UNJOINED。

private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {// we store the join future in case we are woken up by the user after beginning the// rebalance in the call to poll below. This ensures that we do not mistakenly attempt// to rejoin before the pending rebalance has completed.if (joinFuture == null) {// fence off the heartbeat thread explicitly so that it cannot interfere with the join group.// Note that this must come after the call to onJoinPrepare since we must be able to continue// sending heartbeats if that callback takes some time.disableHeartbeatThread();// 切换当前消费者的状态为 REBALANCINGstate = MemberState.REBALANCING;// a rebalance can be triggered consecutively if the previous one failed,// in this case we would not update the start time.if (lastRebalanceStartMs == -1L)lastRebalanceStartMs = time.milliseconds();// 创建并缓存 JoinGroupRequest 请求,并处理申请加入的结果joinFuture = sendJoinGroupRequest();joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {@Overridepublic void onSuccess(ByteBuffer value) {// handle join completion in the callback so that the callback will be invoked// even if the consumer is woken up before finishing the rebalancesynchronized (AbstractCoordinator.this) {if (generation != Generation.NO_GENERATION) {log.info("Successfully joined group with generation {}", generation.generationId);// 果申请加入成功,则会切换当前消费者的状态为 STABLE,并重启心跳机制state = MemberState.STABLE;rejoinNeeded = false;// record rebalance latencylastRebalanceEndMs = time.milliseconds();sensors.successfulRebalanceSensor.record(lastRebalanceEndMs - lastRebalanceStartMs);lastRebalanceStartMs = -1L;if (heartbeatThread != null)heartbeatThread.enable();} else {log.info("Generation data was cleared by heartbeat thread. Rejoin failed.");recordRebalanceFailure();}}}@Overridepublic void onFailure(RuntimeException e) {// we handle failures below after the request finishes. if the join completes// after having been woken up, the exception is ignored and we will rejoinsynchronized (AbstractCoordinator.this) {recordRebalanceFailure();}}private void recordRebalanceFailure() {state = MemberState.UNJOINED;sensors.failedRebalanceSensor.record();}});}return joinFuture;
}

sendJoinGroupRequest

JoinGroupRequest 请求中包含了当前消费者的 ID,消费者所属 group 的 ID、消费者支持的分区策略、协议类型、以及会话超时时间等信息。构造、发送,以及处理 JoinGroupRequest 请求及其响应的过程位于 AbstractCoordinator#sendJoinGroupRequest 方法中,实现如下:

RequestFuture<ByteBuffer> sendJoinGroupRequest() {if (coordinatorUnknown())// 如果目标 GroupCoordinator 节点不可达,则返回异常return RequestFuture.coordinatorNotAvailable();// send a join group request to the coordinatorlog.info("(Re-)joining group");// 构建 JoinGroupRequest 请求JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(new JoinGroupRequestData().setGroupId(rebalanceConfig.groupId).setSessionTimeoutMs(this.rebalanceConfig.sessionTimeoutMs).setMemberId(this.generation.memberId).setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null)).setProtocolType(protocolType()).setProtocols(metadata()).setRebalanceTimeoutMs(this.rebalanceConfig.rebalanceTimeoutMs));log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator);// Note that we override the request timeout using the rebalance timeout since that is the// maximum time that it may block on the coordinator. We add an extra 5 seconds for small delays.int joinGroupTimeoutMs = Math.max(rebalanceConfig.rebalanceTimeoutMs, rebalanceConfig.rebalanceTimeoutMs + 5000);// 发送 JoinGroupRequest 请求,并注册结果处理器 JoinGroupResponseHandlerreturn client.send(coordinator, requestBuilder, joinGroupTimeoutMs).compose(new JoinGroupResponseHandler());
}

JoinGroupResponseHandler

消费者通过注册结果处理器 JoinGroupResponseHandler 对请求的响应结果进行处理,如果是正常响应则会执行分区分配操作,核心逻辑实现如下:

private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {@Overridepublic void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {Errors error = joinResponse.error();if (error == Errors.NONE) {log.debug("Received successful JoinGroup response: {}", joinResponse);sensors.joinSensor.record(response.requestLatencyMs());synchronized (AbstractCoordinator.this) {if (state != MemberState.REBALANCING) {// 确认当前状态是不是 REBALANCING// 例如消费者因某种原理离开了所属的 group,这种情况下不应该再继续执行下去。// if the consumer was woken up before a rebalance completes, we may have already left// the group. In this case, we do not want to continue with the sync group.future.raise(new UnjoinedGroupException());} else {// 基于响应,更新 group 的年代信息AbstractCoordinator.this.generation = new Generation(joinResponse.data().generationId(),joinResponse.data().memberId(), joinResponse.data().protocolName());// 如果当前消费者是 group 中的 leader 角色if (joinResponse.isLeader()) {/** 基于分区分配策略执行分区分配,leader 需要关注当前 group 中所有消费者订阅的 topic,* 依据 GroupCoordinator 最终确定的分区分配策略为当前 group 名下所有的消费者分配分区,* 并发送 SyncGroupRequest 请求向对应的 GroupCoordinator 节点反馈最终的分区分配结果。*/onJoinLeader(joinResponse).chain(future);} else {// 如果是 follower 消费者,则只关注自己订阅的 topic,这一步仅发送 SyncGroupRequest 请求// 响应 JoinGroupRequest 请求的逻辑只是构造一个包含空的分区分配结果的 SyncGroupRequest 请求,并附带上所属的 group 和自身 ID,以及 group 年代信息,发送给对应的 GroupCoordinator 节点,// 如果此时所属的 group 已经处于正常运行的状态,则该消费者会拿到分配给自己的分区信息。onJoinFollower().chain(future);}}}} else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {log.debug("Attempt to join group rejected since coordinator {} is loading the group.", coordinator());// backoff and retry// 在接收到响应之前,消费者的状态发生变更(可能已经从所属 group 离开),抛出异常future.raise(error);} else if (error == Errors.UNKNOWN_MEMBER_ID) {// reset the member id and retry immediatelyresetGenerationOnResponseError(ApiKeys.JOIN_GROUP, error);log.debug("Attempt to join group failed due to unknown member id.");future.raise(error);} else if (error == Errors.COORDINATOR_NOT_AVAILABLE|| error == Errors.NOT_COORDINATOR) {// re-discover the coordinator and retry with backoffmarkCoordinatorUnknown();log.debug("Attempt to join group failed due to obsolete coordinator information: {}", error.message());future.raise(error);} else if (error == Errors.FENCED_INSTANCE_ID) {log.error("Received fatal exception: group.instance.id gets fenced");future.raise(error);} else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL|| error == Errors.INVALID_SESSION_TIMEOUT|| error == Errors.INVALID_GROUP_ID|| error == Errors.GROUP_AUTHORIZATION_FAILED|| error == Errors.GROUP_MAX_SIZE_REACHED) {// log the error and re-throw the exceptionlog.error("Attempt to join group failed due to fatal error: {}", error.message());if (error == Errors.GROUP_MAX_SIZE_REACHED) {future.raise(new GroupMaxSizeReachedException("Consumer group " + rebalanceConfig.groupId +" already has the configured maximum number of members."));} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));} else {future.raise(error);}} else if (error == Errors.UNSUPPORTED_VERSION) {log.error("Attempt to join group failed due to unsupported version error. Please unset field group.instance.id and retry" +"to see if the problem resolves");future.raise(error);} else if (error == Errors.MEMBER_ID_REQUIRED) {// Broker requires a concrete member id to be allowed to join the group. Update member id// and send another join group request in next cycle.synchronized (AbstractCoordinator.this) {AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID,joinResponse.data().memberId(), null);AbstractCoordinator.this.resetStateAndRejoin();}future.raise(error);} else {// unexpected error, throw the exceptionlog.error("Attempt to join group failed due to unexpected error: {}", error.message());future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));}}
}

AbstractCoordinator#onJoinLeader 
        onJoinLeader基于分区分配策略分配分区并返回结果给服务端。

private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {try {// perform the leader synchronization and send back the assignment for the group// 基于分区分配策略分配分区Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.data().leader(), joinResponse.data().protocolName(),joinResponse.data().members());List<SyncGroupRequestData.SyncGroupRequestAssignment> groupAssignmentList = new ArrayList<>();for (Map.Entry<String, ByteBuffer> assignment : groupAssignment.entrySet()) {groupAssignmentList.add(new SyncGroupRequestData.SyncGroupRequestAssignment().setMemberId(assignment.getKey()).setAssignment(Utils.toArray(assignment.getValue())));}// 创建 SyncGroupRequest 请求,反馈分区分配结果给 GroupCoordinator 节点SyncGroupRequest.Builder requestBuilder =new SyncGroupRequest.Builder(new SyncGroupRequestData().setGroupId(rebalanceConfig.groupId).setMemberId(generation.memberId).setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null)).setGenerationId(generation.generationId).setAssignments(groupAssignmentList));log.debug("Sending leader SyncGroup to coordinator {} at generation {}: {}", this.coordinator, this.generation, requestBuilder);// 发送 SyncGroupRequest 请求return sendSyncGroupRequest(requestBuilder);} catch (RuntimeException e) {return RequestFuture.failure(e);}
}

分配分区的具体过程位于 ConsumerCoordinator#performAssignment 方法中

protected Map<String, ByteBuffer> performAssignment(String leaderId,String assignmentStrategy,List<JoinGroupResponseData.JoinGroupResponseMember> allSubscriptions) {// 从消费者支持的分区分配策略集合中选择指定策略对应的分区分配器ConsumerPartitionAssignor assignor = lookupAssignor(assignmentStrategy);if (assignor == null)throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);// 解析封装 topic 订阅信息Set<String> allSubscribedTopics = new HashSet<>();// 记录 group 名下所有消费者订阅的 topic 集合Map<String, Subscription> subscriptions = new HashMap<>(); // Map<String, ByteBuffer> -> Map<String, Subscription>// collect all the owned partitionsMap<String, List<TopicPartition>> ownedPartitions = new HashMap<>();for (JoinGroupResponseData.JoinGroupResponseMember memberSubscription : allSubscriptions) {Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberSubscription.metadata()));subscription.setGroupInstanceId(Optional.ofNullable(memberSubscription.groupInstanceId()));subscriptions.put(memberSubscription.memberId(), subscription);allSubscribedTopics.addAll(subscription.topics());ownedPartitions.put(memberSubscription.memberId(), subscription.ownedPartitions());}// the leader will begin watching for changes to any of the topics the group is interested in,// which ensures that all metadata changes will eventually be seen// 分区再分配之后,检测是否需要更新集群元数据信息,如果需要则立即更新updateGroupSubscription(allSubscribedTopics);// 标记当前消费者为 leader 角色isLeader = true;log.debug("Performing assignment using strategy {} with subscriptions {}", assignor.name(), subscriptions);/** 基于分区分配器(range/round-robin)执行分区分配,* 返回结果:key 是消费者 ID,value 是对应的分区分配结果*/Map<String, Assignment> assignments = assignor.assign(metadata.fetch(), new GroupSubscription(subscriptions)).groupAssignment();if (protocol == RebalanceProtocol.COOPERATIVE) {validateCooperativeAssignment(ownedPartitions, assignments);}// user-customized assignor may have created some topics that are not in the subscription list// and assign their partitions to the members; in this case we would like to update the leader's// own metadata with the newly added topics so that it will not trigger a subsequent rebalance// when these topics gets updated from metadata refresh.//// TODO: this is a hack and not something we want to support long-term unless we push regex into the protocol//       we may need to modify the ConsumerPartitionAssignor API to better support this case.// 记录所有完成分配的 topic 集合Set<String> assignedTopics = new HashSet<>();for (Assignment assigned : assignments.values()) {for (TopicPartition tp : assigned.partitions())assignedTopics.add(tp.topic());}// 如果 group 中存在一些已经订阅的 topic 并未分配,则日志记录if (!assignedTopics.containsAll(allSubscribedTopics)) {Set<String> notAssignedTopics = new HashSet<>(allSubscribedTopics);notAssignedTopics.removeAll(assignedTopics);log.warn("The following subscribed topics are not assigned to any members: {} ", notAssignedTopics);}// 如果分配的 topic 集合包含一些未订阅的 topic 集合if (!allSubscribedTopics.containsAll(assignedTopics)) {// 日志记录这些未订阅的 topicSet<String> newlyAddedTopics = new HashSet<>(assignedTopics);newlyAddedTopics.removeAll(allSubscribedTopics);log.info("The following not-subscribed topics are assigned, and their metadata will be " +"fetched from the brokers: {}", newlyAddedTopics);// 将这些已分配但是未订阅的 topic 添加到 group 集合中allSubscribedTopics.addAll(assignedTopics);// 更新元数据信息updateGroupSubscription(allSubscribedTopics);}// 更新本地缓存的元数据信息快照assignmentSnapshot = metadataSnapshot;log.info("Finished assignment for group at generation {}: {}", generation().generationId, assignments);// 对分区分配结果进行序列化,后续需要反馈给集群Map<String, ByteBuffer> groupAssignment = new HashMap<>();for (Map.Entry<String, Assignment> assignmentEntry : assignments.entrySet()) {ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue());groupAssignment.put(assignmentEntry.getKey(), buffer);}return groupAssignment;
}

Kafka 消费者模块(三):rebalance的发送JoinGroupResult请求相关推荐

  1. 【Kafka】Kafka消费者组三种分区分配策略roundrobin,range,StickyAssignor

    文章目录 1. 分配策略 1.1 Range(默认策略) 1.2 RoundRobin RoundRobin的两种情况 1.3 StickyAssignor 2. Range策略演示 参考 相关文章 ...

  2. 深入浅出系列之 -- kafka消费者的三种语义模型

    本文主要详解kafka client的使用,包括kafka消费者的三种消费语义at-most-once,at-least-once,和exact-once message,生产者的使用等. 创建主题 ...

  3. python接口自动化测试三:代码发送HTTP请求

    get请求: 1.get请求(无参数): 2.get请求(带参数): 接口地址:http://japi.juhe.cn/qqevaluate/qq 返回格式:json 请求方式:get post 请求 ...

  4. Kafka消费者组三种分区分配策略roundrobin,range,StickyAssignor

    一个consumer group中有多个consumer,一个 topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由哪个consumer来消 ...

  5. kafka消费者Rebalance机制

    目录 1.Rebalance机制 2.消费者Rebalance分区分配策略 3.Rebalance过程 1.Rebalance机制 rebalance就是说如果消费组里的消费者数量有变化或消费的分区数 ...

  6. 聊聊Kafka(三)Kafka消费者与消费组

    Kafka消费者与消费组 简介 消费者 概念入门 消费者.消费组 心跳机制 消息接收 必要参数配置 订阅 反序列化 位移提交 消费者位移管理 再均衡 避免重平衡 消费者拦截器 消费组管理 什么是消费者 ...

  7. Kafka 消费者组 Rebalance 详解

    Rebalance作用 Rebalance 本质上是一种协议,主要作用是为了保证消费者组(Consumer Group)下的所有消费者(Consumer)消费的主体分区达成均衡. 比如:我们有10个分 ...

  8. 深入分析Kafka架构(三):消费者消费方式、三种分区分配策略、offset维护

    本文目录 一.前言 二.消费者消费方式 三.分区分配策略 3.1.分配分区的前提条件 3.2.Range分配策略 3.3.RoundRobin分配策略 3.4.Sticky分配策略 四.offset维 ...

  9. Kafka 消费者组管理模块(六):GroupCoordinator 处理成员入组

    Rebalance 的流程大致分为两大步:加入组(JoinGroup)和组同步(SyncGroup).         加入组,是指消费者组下的各个成员向 Coordinator 发送 JoinGro ...

  10. Kafka(三):kafka消费者

    文章目录 1. 消费方式 2. 消费者总体工作流程 2.1 消费者组 2.2 消费者组初始化流程 2.3 消费者组详细消费流程 3 消费者重要参数 4. 分区的分配以及再平衡 4.1 Range以及再 ...

最新文章

  1. tensorflow随笔-检测浮点数类型check_numerics
  2. 剑三服务器文件在哪里,剑三服务器同步设置在哪
  3. 第二章作业-第3题-万世想
  4. using语句之-释放资源和异常处理
  5. robot1,Mechanical structure
  6. python pcm 分贝_语音文件 pcm 静默(静音)判断
  7. django mysql5.7_GitHub - qiubiteme/DjangoBloger: 一个Django2.0+mysql57,实现的响应式博客
  8. Openlayer 3 的画图测量面积
  9. 如何利用计算机系统原理做文件保护,计算机系统设计原理(影印版).docx
  10. C++笔记---函数声明(prototype)
  11. Python下opencv(图像的阈值处理)
  12. (一)vmware中Linux共享文件夹设置
  13. javascript原生代码实现轮播图片
  14. 电子烟行业的十大进销存软件app,强推第一个
  15. 在线二进制转文本字符工具
  16. 2 数据可视化大屏 - 布局
  17. 键盘怎么按出计算机,怎么在电脑键盘上打出艾特@键? 原来是这样的
  18. 数据驱动的瑞幸咖啡未来会能赚!
  19. win10更新后,浏览器打开网页一直加载 甚至打不开,错误代码:ERR_TIMED_OUT
  20. JMP数据分析峰会2021圆满落幕,重温大会高光时刻

热门文章

  1. 吴恩达深度学习工程师系列课程笔记(Deep Learning Specialization - deeplearning.ai)
  2. GIS应用技巧之景观格局分析(四)
  3. Chromium Portable实时更新github下载 最新版本Chromium Portable(谷歌浏览器) 下载
  4. java pointer_Java Pointer.pointerToCString方法代码示例
  5. linux tpp模式,tpp 'exec'命令任意代码执行漏洞
  6. 价值几百元的EMlog仿大表哥资源网模版
  7. __gxx_personality_v0详解
  8. 电路实习报告:简易收音机的焊接
  9. PHP基础: CLI模式开发不需要任何一种Web服务器
  10. 合唱队形(线性DP)