ConsumerCoordinator实现了AbstractCoordinator,主要负责和服务器端GroupCoordinator进行交互

一 核心字段

// 心跳任务辅助类

Heartbeat heartbeat;

// 当前消费者所属的消费者组的group id

String groupId;

// 负责网络通信

ConsumerNetworkClient client;

Time time;

long retryBackoffMs;

// 进行心跳检测的线程类

HeartbeatThread heartbeatThread = null;

// 是否需要重新发送JoinGroupRequest

boolean rejoinNeeded = true;

// 是否需要执行发送JoinGroupRequest请求前的准备工作

boolean needsJoinPrepare = true;

// 消费者状态,默认是没有加入消费者组的

MemberState state = MemberState.UNJOINED;

// ConsumerNetworkClient发送JoinGroupRequest请求的结果

RequestFuture<ByteBuffer> joinFuture = null;

// GroopCoordinator所在的Node

Node coordinator = null;

// 服务器端GroopCoordinator返回的信息,用于区分两次rebalance操作,由于网络延迟等问题,在执行rebalance时可能收到上次rebalnace

//过程的请求,为了避免着各种干扰,每一次rebalance操作都会递增generation的值

Generation generation = Generation.NO_GENERATION;

// 分配分区策略的列表,消费者在发送JoinGroupRequest的时候包含自身的PartitionAssignor信息,GroupCoordinator

// 从所有消费者都支持的PartitionAssignor选择一个通知leader使用此分配策略进行分区分配,

// 我们可以通过partition.assignment.strategy设置

List<PartitionAssignor> assignors;

// 记录kafka集群的元数据

Metadata metadata;

ConsumerCoordinatorMetrics sensors;

// 一个跟踪消费者的主题列表,分区列表和offsets的类

SubscriptionState subscriptions;

// 提交offset 的回调类

OffsetCommitCallback defaultOffsetCommitCallback;

// 是否开启了自动提交offset的功能

boolean autoCommitEnabled;

// 自动提交的间隔时间

int autoCommitIntervalMs;

ConsumerInterceptors<?, ?> interceptors;

// 是否排除内部的topic列表

boolean excludeInternalTopics;

// 这个集合必须是线程安全的,因为他会被offset提交请求的response handler修改,他会被心跳线程调用

ConcurrentLinkedQueue<OffsetCommitCompletion>completedOffsetCommits;

// 是否是leader

boolean isLeader = false;

Set<String> joinedSubscription;

// 用来记录元数据变化的新快照

MetadataSnapshot metadataSnapshot;

// 用来存储Metadata的快照信息,不过是用来检测Partition分配过程中有没有发生分区数量的变化

MetadataSnapshot assignmentSnapshot;

// 下一次自动提交的最后期限

long nextAutoCommitDeadline;

二 核心方法

# 判断对于这个组的coordinatior是否已经准备好接受请求,否则一直阻塞

public synchronized void ensureCoordinatorReady() {
    while (coordinatorUnknown()) {
        RequestFuture<Void> future = lookupCoordinator();
        client.poll(future);
        // 异常处理
       
if (future.failed()) {
            if (future.isRetriable())
                // 阻塞更新metadata中的集群元数据
               
client.awaitMetadataUpdate();
            else
                throw future.exception();
        } else if (coordinator != null && client.connectionFailed(coordinator)) {
            // 如果连接不上GroupCoordinator,则退避一段时间,然后重试
           
coordinatorDead();
            time.sleep(retryBackoffMs);
        }
    }
}

# 查找GroupCoordinator,并返回一个请求结果

// 查找GroupCoordinator,并返回一个请求结果
protected synchronized RequestFuture<Void> lookupCoordinator() {if (findCoordinatorFuture == null) {// 查找集群负载最低的Node节点Node node = this.client.leastLoadedNode();// 如果找到了,则调用sendGroupCoordinatorRequestif (node == null) {return RequestFuture.noBrokersAvailable();} elsefindCoordinatorFuture = sendGroupCoordinatorRequest(node);}return findCoordinatorFuture;
}

# 检测心跳线程的状态,他必须加入组后定期调用以确保组成员还在

protected synchronized void pollHeartbeat(long now) {if (heartbeatThread != null) {// 如果心跳线程失败,将心跳线程设为空,然后抛出异常,下一次调用ensureActiveGroup在创建一个心跳线程if (heartbeatThread.hasFailed()) {// set the heartbeat thread to null and raise an exception. If the user catches it,// the next call to ensureActiveGroup() will spawn a new heartbeat thread.RuntimeException cause = heartbeatThread.failureCause();heartbeatThread = null;throw cause;}// 更新一下heartbeat的lastpoll 时间戳heartbeat.poll(now);}
}

# 确保组可用

public void ensureActiveGroup() {// 确保coordinator已经准备好接收请求ensureCoordinatorReady();// 开启心跳线程startHeartbeatThreadIfNeeded();// 加入组joinGroupIfNeeded();
}

# 进行Join Group操作会初始化JoinGroupRequest,并且发送请求到服务器端

void joinGroupIfNeeded() {// 是否允许重新加入while (needRejoin() || rejoinIncomplete()) {// 再次检测coordinator是否已经准备好ensureCoordinatorReady();if (needsJoinPrepare) {// 进行发送JoinGroupRequest之前的准备onJoinPrepare(generation.generationId, generation.memberId);//  needsJoinPrepare置为false,表示已经准备好了needsJoinPrepare = false;}// 初始化RequestFuture<ByteBuffer> future = initiateJoinGroup();// 阻塞等待join group的完成client.poll(future);// 重置JoinGroup的RequestFutureresetJoinGroupFuture();// 如果future成功,则重置needsJoinPrepare状态if (future.succeeded()) {needsJoinPrepare = true;onJoinComplete(generation.generationId, generation.memberId, generation.protocol, future.value());} else {RuntimeException exception = future.exception();if (exception instanceof UnknownMemberIdException ||exception instanceof RebalanceInProgressException ||exception instanceof IllegalGenerationException)continue;else if (!future.isRetriable())throw exception;time.sleep(retryBackoffMs);}}
}

# 初始化JoinGroupRequest请求,然后发送该请求

private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {// 我们存储join future,以防止我们在开始rebalance之后,就被用户唤醒// 这确保了我们不会错误地尝试在尚未完成的再平衡完成之前重新加入。if (joinFuture == null) {// 对心跳线程进行明确的隔离,这样它就不会干扰到连接组。// 注意,这个后必须调用onJoinPrepare因为我们必须能够继续发送心跳,如果回调需要一些时间。disableHeartbeatThread();// 更改状态state = MemberState.REBALANCING;// 发送JoinGroupRequest,返回RequestFuture对象joinFuture = sendJoinGroupRequest();// 针对RequestFuture添加监听器joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {@Overridepublic void onSuccess(ByteBuffer value) {// 如果成功,则更新状态为消费者客户端已成功加入synchronized (AbstractCoordinator.this) {log.info("Successfully joined group {} with generation {}", groupId, generation.generationId);state = MemberState.STABLE;// 然后开始心跳检测if (heartbeatThread != null)heartbeatThread.enable();}}@Overridepublic void onFailure(RuntimeException e) {synchronized (AbstractCoordinator.this) {state = MemberState.UNJOINED;}}});}return joinFuture;
}

# 发送JoinGroupRequest

private RequestFuture<ByteBuffer> sendJoinGroupRequest() {// 检测coordinator是否可用if (coordinatorUnknown())return RequestFuture.coordinatorNotAvailable();// 创建JoinGroupRequestlog.info("(Re-)joining group {}", groupId);JoinGroupRequest request = new JoinGroupRequest(groupId,this.sessionTimeoutMs,this.rebalanceTimeoutMs,this.generation.memberId,protocolType(),metadata());log.debug("Sending JoinGroup ({}) to coordinator {}", request, this.coordinator);// 将这个请求放入unsent集合,等待被发送,并返回一个RequestFuture对象return client.send(coordinator, ApiKeys.JOIN_GROUP, request).compose(new JoinGroupResponseHandler());
}

# 发送GroupCoordinator请求

private RequestFuture<Void> sendGroupCoordinatorRequest(Node node) {// 创建GroupCoordinatorRequest请求log.debug("Sending coordinator request for group {} to broker {}", groupId, node);GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId);// ConsumerNetworkClient将创建的GroupCoordinatorRequest请求放入一个unsent列表,等待发送// 并返回RequestFuture对象,返回的RequestFuture对象经过compose的适配return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest).compose(new GroupCoordinatorResponseHandler());
}

# 发送离开组的请求

public synchronized void maybeLeaveGroup() {if (!coordinatorUnknown() && state != MemberState.UNJOINED && generation != Generation.NO_GENERATION) {// this is a minimal effort attempt to leave the group. we do not// attempt any resending if the request fails or times out.LeaveGroupRequest request = new LeaveGroupRequest(groupId, generation.memberId);client.send(coordinator, ApiKeys.LEAVE_GROUP, request).compose(new LeaveGroupResponseHandler());client.pollNoWakeup();}resetGeneration();
}

# 给Metadata添加监听器

private void addMetadataListener() {this.metadata.addListener(new Metadata.Listener() {@Overridepublic void onMetadataUpdate(Cluster cluster) {// if we encounter any unauthorized topics, raise an exception to the userif (!cluster.unauthorizedTopics().isEmpty())throw new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics()));if (subscriptions.hasPatternSubscription())updatePatternSubscription(cluster);// check if there are any changes to the metadata which should trigger a rebalanceif (subscriptions.partitionsAutoAssigned()) {MetadataSnapshot snapshot = new MetadataSnapshot(subscriptions, cluster);if (!snapshot.equals(metadataSnapshot))metadataSnapshot = snapshot;}}});
}

# 查找指定的分区分配策略

// 查找指定的分区分配策略
private PartitionAssignor lookupAssignor(String name) {for (PartitionAssignor assignor : this.assignors) {if (assignor.name().equals(name))return assignor;}return null;
}

# JoinGroupRequest请求完成之后,需要执行的动作

protected void onJoinComplete(int generation, String memberId,String assignmentStrategy, ByteBuffer assignmentBuffer) {// 只有leader才会监控元数据的改变if (!isLeader)assignmentSnapshot = null;// 获取partition分配策略PartitionAssignor assignor = lookupAssignor(assignmentStrategy);if (assignor == null)throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);// 获取分配结果Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);// 设置标记刷新最近一次提交的offsetsubscriptions.needRefreshCommits();// 更新分区的分配subscriptions.assignFromSubscribed(assignment.partitions());// 给一个分区分配策略一个机会更新分配结果assignor.onAssignment(assignment);// reschedule the auto commit starting from nowthis.nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs;// 执行rebalance之后的回调ConsumerRebalanceListener listener = subscriptions.listener();log.info("Setting newly assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId);try {Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());listener.onPartitionsAssigned(assigned);} catch (WakeupException e) {throw e;} catch (Exception e) {log.error("User provided listener {} for group {} failed on partition assignment",listener.getClass().getName(), groupId, e);}
}

# 对协调器事件的轮询。这确保了协调器是已知的,并且是消费者加入了这个群组(如果它使用的是组管理)。这也可以处理周期性的offset提交,如果启用了它们。

public void poll(long now) {// 调用offset 提交请求的回调函数,如果有invokeCompletedOffsetCommitCallbacks();// 判断订阅状态是否自动分配分区且coordinator不为空if (subscriptions.partitionsAutoAssigned() && coordinatorUnknown()) {// 确保coordinator已经准备好接收请求了ensureCoordinatorReady();now = time.milliseconds();}// 判断是否需要重新joinif (needRejoin()) {// 由于初始元数据获取和初始重新平衡之间的竞争条件,我们需要确保元数据在开始加入之前是新鲜的。// 确保了我们在join之前至少有一次与集群的主题的模式匹配if (subscriptions.hasPatternSubscription())// 刷新元数据client.ensureFreshMetadata();// 确保协调器可用,心跳线程开启和组可用ensureActiveGroup();now = time.milliseconds();}// 检测心跳线程的状态pollHeartbeat(now);// 异步的自动提交offsetmaybeAutoCommitOffsetsAsync(now);
}

# 为消费者组进行分配,主要用于leader向组内所有consumer推送状态

protected Map<String, ByteBuffer> performAssignment(String leaderId,String assignmentStrategy, Map<String, ByteBuffer> allSubscriptions) {// 根据partition分配策略PartitionAssignor assignor = lookupAssignor(assignmentStrategy);if (assignor == null)throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);Set<String> allSubscribedTopics = new HashSet<>();Map<String, Subscription> subscriptions = new HashMap<>();for (Map.Entry<String, ByteBuffer> subscriptionEntry : allSubscriptions.entrySet()) {Subscription subscription = ConsumerProtocol.deserializeSubscription(subscriptionEntry.getValue());subscriptions.put(subscriptionEntry.getKey(), subscription);allSubscribedTopics.addAll(subscription.topics());}// the leader will begin watching for changes to any of the topics the group is interested in,// which ensures that all metadata changes will eventually be seenthis.subscriptions.groupSubscribe(allSubscribedTopics);metadata.setTopics(this.subscriptions.groupSubscription());// update metadata (if needed) and keep track of the metadata used for assignment so that// we can check after rebalance completion whether anything has changedclient.ensureFreshMetadata();isLeader = true;assignmentSnapshot = metadataSnapshot;log.debug("Performing assignment for group {} using strategy {} with subscriptions {}",groupId, assignor.name(), subscriptions);Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions);log.debug("Finished assignment for group {}: {}", groupId, assignment);Map<String, ByteBuffer> groupAssignment = new HashMap<>();for (Map.Entry<String, Assignment> assignmentEntry : assignment.entrySet()) {ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue());groupAssignment.put(assignmentEntry.getKey(), buffer);}return groupAssignment;
}

# 判断是否需要进行重新join

public boolean needRejoin() {// 如果不是自动分配partition,不如用户自己指定,则不需要rejoinif (!subscriptions.partitionsAutoAssigned())return false;// 如果我们执行了任务并且元数据改变了,我们需要重新加入if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot))return true;// 如果我们的订阅自上一次join之后改变了,那么我们需要加入if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription()))return true;// 默认需要重新joinreturn super.needRejoin();
}

# 对于一套partition,从协调器中获取当前已提交的偏移量

public void refreshCommittedOffsetsIfNeeded() {if (subscriptions.refreshCommitsNeeded()) {// 对于分配的partition,从协调器中获取当前已提交的偏移量Map<TopicPartition, OffsetAndMetadata> offsets = fetchCommittedOffsets(subscriptions.assignedPartitions());// 遍历TopicPartitionfor (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {TopicPartition tp = entry.getKey();// 验证分配是否是有效的,如果有效,提交offsetif (subscriptions.isAssigned(tp))this.subscriptions.committed(tp, entry.getValue());}// 刷新提交this.subscriptions.commitsRefreshed();}
}

# 进行发送JoinGroupRequest之前的准备

protected void onJoinPrepare(int generation, String memberId) {// 如果开启了自动提交,则在rebalance之前进行自动提交offsetmaybeAutoCommitOffsetsSync();// 执行注册在SubscriptionState上的ConsumerRebalanceListener的回调方法ConsumerRebalanceListener listener = subscriptions.listener();log.info("Revoking previously assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId);try {Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions());listener.onPartitionsRevoked(revoked);} catch (WakeupException e) {throw e;} catch (Exception e) {log.error("User provided listener {} for group {} failed on partition revocation",listener.getClass().getName(), groupId, e);}isLeader = false;// 重置该组的订阅,只包含该用户订阅的主题。subscriptions.resetGroupSubscription();
}

ConsumerCoordinator分析相关推荐

  1. kafka源码分析之一server启动分析

    0. 关键概念 关键概念 Concepts Function Topic 用于划分Message的逻辑概念,一个Topic可以分布在多个Broker上. Partition 是Kafka中横向扩展和一 ...

  2. 消费者rebalance机制分析

    一 触发rebalance的时机 # 有新的消费者加入 # 有消费者宕机或者下线 # 消费者主动退出消费者组 # 消费者组订阅的topic出现分区数量变化 # 消费者调用unsubscrible取消对 ...

  3. Kafka 3.0 源码笔记(3)-Kafka 消费者的核心流程源码分析

    文章目录 前言 1. 消费者负载均衡的实现 2. 源码分析 2.1 KafkaConsumer 的初始化 2.2 KafkaConsumer 的消息拉取 2.2.1 消息拉取的准备及入口 2.2.2 ...

  4. kafka原理和性能分析测试

    1.Kafka写数据流程: producer先从zookeeper的broker-list的节点找到partition(分区)的leader: producer将消息发送给该leader的partit ...

  5. 你绝对能看懂的Kafka源代码分析-KafkaConsumer类代码分析

    目录: <Kafka Producer设计分析> <KafkaProducer类代码分析> <RecordAccumulator类代码分析> <Sender类 ...

  6. 【Golang源码分析】Go Web常用程序包gorilla/mux的使用与源码简析

    目录[阅读时间:约10分钟] 一.概述 二.对比: gorilla/mux与net/http DefaultServeMux 三.简单使用 四.源码简析 1.NewRouter函数 2.HandleF ...

  7. 2022-2028年中国自动驾驶系统行业现状调研分析报告

    [报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了中国自动驾驶系统行业市场行业相关概述.中国自 ...

  8. 2022-2028年中国阻尼涂料市场研究及前瞻分析报告

    [报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了中国阻尼涂料行业市场行业相关概述.中国阻尼涂 ...

  9. 2021-2028年中国阻燃装饰行业市场需求与投资规划分析报告

    [报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了中国阻燃装饰行业市场行业相关概述.中国阻燃装 ...

最新文章

  1. android wear升级方法,LG G Watch官方工具包刷Android wear5.1.1教程(附刷机包)
  2. SQLCLR(五)聚合
  3. 基于DSP技术的多路语音实时采集与压缩处理系统
  4. cxGrid, 和AfterScroll
  5. composer install 时遇到 Composer\Downloader\TransportException ...
  6. long long c语言_带你打开C语言的大门之C语言的变量
  7. C 11实现的100行线程池
  8. RBSP、SODB、EBSP三者的区别和联系 SPS: sequence parameter sets
  9. 抖音上热门的小技巧,不看后悔
  10. MATLAB绘制图中图
  11. 服务器 显示w3wp.exe,w3wp.exe占用cpu过高的解决方法
  12. vivo平台sdk php说明书,vivo
  13. 【C++】加油站加油
  14. Uncaught SyntaxError: The requested module ‘/node_modules/.vite/vue.js?v=bd1817bb‘ does not provide
  15. word无法验证服务器,Win8系统打开office文件提示“无法验证此产品的许可证”如何解决...
  16. MATLAB进行不定积分和定积分的求解
  17. Navicat Premium 使用技巧
  18. 补偿matlab,Type-III 补偿参数计算的 matlab 脚本
  19. python代码练习,微信登入并生成头像大图
  20. 【修真院pm小课堂】登录注册的触发场景

热门文章

  1. 如何查看Windows 10的具体版本号?
  2. autocomplete触发事件_修改jQuery.autocomplete中遇到的键盘事件
  3. python安装matplotlib需要c编译_在Python 3.9上安装matplotlib提示需要FreeType更高版本的解决...
  4. c++ linux 获取毫秒_Linux下gettimeofday()函数和clock()函数:精确到毫秒级的时间
  5. dcp9030cdn定影_兄弟Brother DCP-9030CDN打印机驱动(修复DCP-9030CDN打印机连接故障)V1.0 正式版...
  6. 跨系统服务器data,oracle 新增pdb环境,并通过database link实现跨服务器在线克隆
  7. 奇瑞a3中控按键图解_实拍奇瑞全新瑞虎e 十万元级纯电SUV新选择
  8. mysql 实施索引_MySQL 索引实现
  9. 源服务器未能找到目标资源的表示或者不愿,java - 源服务器没有找到目标资源的当前表示,或者不愿意透露一个存在。 关于部署到 tomcat - 堆栈内存溢出...
  10. Java 算法 素数对猜想