一 触发rebalance的时机

# 有新的消费者加入

# 有消费者宕机或者下线

# 消费者主动退出消费者组

# 消费者组订阅的topic出现分区数量变化

# 消费者调用unsubscrible取消对某topic的订阅

二 rebalance过程分析

2.1 查找GroupCoordinator

在poll数据的时候,首先会调用ConsumerCoordinator#poll查找GroupCoordinator

# 调器事件的轮询。这确保了协调器是已知的,并且是消费者加入了这个群组(如果它使用的是组管理)。这也可以处理周期性的offset提交,如果启用了它们。

public void poll(long now) {
    // 调用offset 提交请求的回调函数,如果有
   
invokeCompletedOffsetCommitCallbacks();

// 判断订阅状态是否自动分配分区且coordinator不为空
   
if (subscriptions.partitionsAutoAssigned() && coordinatorUnknown()) {
        // 确保coordinator已经准备好接收请求了
       
ensureCoordinatorReady();
        now = time.milliseconds();
    }
    // 判断是否需要重新join
   
if (needRejoin()) {
        // 由于初始元数据获取和初始重新平衡之间的竞争条件,我们需要确保元数据在开始加入之前是新鲜的。
        // 确保了我们在join之前至少有一次与集群的主题的模式匹配
       
if (subscriptions.hasPatternSubscription())
            // 刷新元数据
           
client.ensureFreshMetadata();
        // 确保协调器可用,心跳线程开启和组可用
       
ensureActiveGroup();
        now = time.milliseconds();
    }
    // 检测心跳线程的状态
   
pollHeartbeat(now);
    // 异步的自动提交offset
   
maybeAutoCommitOffsetsAsync(now);
}

# 判断GroupCoordinator是否存在且可连接

public boolean coordinatorUnknown() {return coordinator() == null;
}
protected synchronized Node coordinator() {if (coordinator != null && client.connectionFailed(coordinator)) {coordinatorDead();return null;}return this.coordinator;
}

# 如果存在则确认GroupCoordinator是否已经准备好了

// 判断对于这个组的coordinatior是否已经准备好接受请求,否则一直阻塞
public synchronized void ensureCoordinatorReady() {while (coordinatorUnknown()) {// 找GroupCoordinator,并返回一个请求结果RequestFuture<Void> future = lookupCoordinator();client.poll(future);// 异常处理if (future.failed()) {if (future.isRetriable())// 阻塞更新metadata中的集群元数据client.awaitMetadataUpdate();elsethrow future.exception();} else if (coordinator != null && client.connectionFailed(coordinator)) {// 如果连接不上GroupCoordinator,则退避一段时间,然后重试coordinatorDead();time.sleep(retryBackoffMs);}}
}

# 它会找GroupCoordinator,发送GroupCoordinatorRequest请求,并返回一个FutureRequest

protected synchronized RequestFuture<Void> lookupCoordinator() {if (findCoordinatorFuture == null) {// 查找集群负载最低的Node节点Node= this.client.leastLoadedNode();// 如果找到了,则调用sendGroupCoordinatorRequestif (node == null) {return RequestFuture.noBrokersAvailable();} elsefindCoordinatorFuture = sendGroupCoordinatorRequest(node);}return findCoordinatorFuture;
}
private RequestFuture<Void> sendGroupCoordinatorRequest(Node node) {// 创建GroupCoordinatorRequest请求log.debug("Sending coordinator request for group {} to broker {}", groupId, node);GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId);// ConsumerNetworkClient将创建的GroupCoordinatorRequest请求放入一个unsent列表,等待发送// 并返回RequestFuture对象,返回的RequestFuture对象经过compose的适配return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest).compose(new GroupCoordinatorResponseHandler());
}

这里对返回的RequestFuture做了一些拦截处理:

private class GroupCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> {@Overridepublic void onSuccess(ClientResponse resp, RequestFuture<Void> future) {log.debug("Received group coordinator response {}", resp);// 创建GroupCoordinatorResponse对象GroupCoordinatorResponse= new GroupCoordinatorResponse(resp.responseBody());Errors error = Errors.forCode(groupCoordinatorResponse.errorCode());clearFindCoordinatorFuture();if (error == Errors.NONE) {// 如果没有错误synchronized (AbstractCoordinator.this) {AbstractCoordinator.this.coordinator = new Node(// 构建一个Node对象赋给coordinatorInteger.MAX_VALUE - groupCoordinatorResponse.node().id(),groupCoordinatorResponse.node().host(),groupCoordinatorResponse.node().port());log.info("Discovered coordinator {} for group {}.", coordinator, groupId);client.tryConnect(coordinator);// 开始尝试和coordinator建立连接heartbeat.resetTimeouts(time.milliseconds());}future.complete(null);// 将正常收到的GroupCoordinatorResponse的事件传播出去} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {// 否则传播异常出去future.raise(new GroupAuthorizationException(groupId));} else {log.debug("Group coordinator lookup for group {} failed: {}", groupId, error.message());future.raise(error);}}@Overridepublic void onFailure(RuntimeException e, RequestFuture<Void> future) {clearFindCoordinatorFuture();super.onFailure(e, future);}
}

这个阶段的大致流程如下:

2.2 进入Join Group 阶段

当成功找到GroupCoordinator之后,则进入Join Group阶段,此阶段消费者会向GroupCoordinator发送JoinGroupRequest请求,并处理响应。

我们先看一下JoinGroupRequest和 JoinGroupResponse的消息体格式:

JoinGroupRequest:

group_id: 消费者组的id

session_timeout: GroupCoordinator超过session_time指定时间没有收到心跳,认为消费者下线

member_id: GroupCoordinator分配给消费者的id

protocol_type: 消费者组实现的协议

group_protocols: 包含此消费者全部支持的PartitionAssignor类型

protocol_name:PartitionAssignor名字

protocol_metadata: 针对不同的PartitionAssignor,序列化后的消费者的订阅信息,包含用户自定义数据userData

JoinGroupResponse:

error_code: 错误码

generation_id: GroupCoordinator分配给Generation的id

group_protocol: GroupCoordinator选择的PartitionAssignor

leader_id: Leader的member_id

member_id: GroupCoordinator分配给消费者的id

members: 消费者组中所有消费者的订阅信息

member_metadata: 对应消费者的订阅信息

JoinGroupRequest大致的请求流程如下图所示:

详细的流程和源码如下:

1 消费者订阅模式是否是自动订阅(AUTO_TOPICS || AUTO_PATTERN),因为USER_ASSIGNED用户指定主题订阅是不需要进行rebalance操作的, 如果满足条件,确保coordinator已经准备好接收请求了

if (subscriptions.partitionsAutoAssigned() && coordinatorUnknown()) {// 确保coordinator已经准备好接收请求了ensureCoordinatorReady();now = time.milliseconds();
}

2 判断是否需要重新join

public boolean needRejoin() {// 如果不是自动分配partition,不如用户自己指定,则不需要rejoinif (!subscriptions.partitionsAutoAssigned())return false;// 如果我们执行了任务并且元数据改变了,我们需要重新加入if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot))return true;// 如果我们的订阅自上一次join之后改变了,那么我们需要加入if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription()))return true;// 默认需要重新joinreturn super.needRejoin();
}

3 如果需要重新join,且属于AUTO_PATTERN订阅模式,则需要刷新Metadata; 然后确保协调器可用,心跳线程开启

if (needRejoin()) {// 由于初始元数据获取和初始重新平衡之间的竞争条件,我们需要确保元数据在开始加入之前是新鲜的。// 确保了我们在join之前至少有一次与集群的主题的模式匹配if (subscriptions.hasPatternSubscription())// 刷新元数据client.ensureFreshMetadata();// 确保协调器可用,心跳线程开启和组可用ensureActiveGroup();now = time.milliseconds();
}
public void ensureActiveGroup() {// 确保coordinator已经准备好接收请求ensureCoordinatorReady();// 开启心跳线程startHeartbeatThreadIfNeeded();// 加入组joinGroupIfNeeded();
}

4 是否需要做一些Join前的准备工作,如果需要则调用OnJoinPrepare方法

# 如果开启了自动提交,则在rebalance之前进行自动提交offset

# 执行注册在SubscriptionState上的ConsumerRebalanceListener的回调方法

# 准备工作完成之后,把needsJoinPrepare置为fasle

if (needsJoinPrepare) {// 进行发送JoinGroupRequest之前的准备onJoinPrepare(generation.generationId, generation.memberId);//  needsJoinPrepare置为false,表示已经准备好了needsJoinPrepare = false;
}
protected void onJoinPrepare(int generation, String memberId) {// 如果开启了自动提交,则在rebalance之前进行自动提交offsetmaybeAutoCommitOffsetsSync();// 执行注册在SubscriptionState上的ConsumerRebalanceListener的回调方法ConsumerRebalanceListener listener = subscriptions.listener();log.info("Revoking previously assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId);try {Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions());listener.onPartitionsRevoked(revoked);} catch (WakeupException e) {throw e;} catch (Exception e) {log.error("User provided listener {} for group {} failed on partition revocation",listener.getClass().getName(), groupId, e);}isLeader = false;// 重置该组的订阅,只包含该用户订阅的主题。subscriptions.resetGroupSubscription();
}

5 停止心跳线程,构造JoinGroupRequest,更改客户端状态为rebalance,

并且发送JoinGroupRequest,针对返回的RequestFuture,添加监听器,如果成功,则更改客户端状态为stable,并且开启心跳线程

private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {// 我们存储join future,以防止我们在开始rebalance之后,就被用户唤醒// 这确保了我们不会错误地尝试在尚未完成的再平衡完成之前重新加入。if (joinFuture == null) {// 对心跳线程进行明确的隔离,这样它就不会干扰到连接组。// 注意,这个后必须调用onJoinPrepare因为我们必须能够继续发送心跳,如果回调需要一些时间。disableHeartbeatThread();// 更改状态state = MemberState.REBALANCING;// 发送JoinGroupRequest,返回RequestFuture对象joinFuture = sendJoinGroupRequest();// 针对RequestFuture添加监听器joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {@Overridepublic void onSuccess(ByteBuffer value) {// 如果成功,则更新状态为消费者客户端已成功加入synchronized (AbstractCoordinator.this) {log.info("Successfully joined group {} with generation {}", groupId, generation.generationId);state = MemberState.STABLE;// 然后开始心跳检测if (heartbeatThread != null)heartbeatThread.enable();}}@Overridepublic void onFailure(RuntimeException e) {synchronized (AbstractCoordinator.this) {state = MemberState.UNJOINED;}}});}return joinFuture;
}
private RequestFuture<ByteBuffer> sendJoinGroupRequest() {// 检测coordinator是否可用if (coordinatorUnknown())return RequestFuture.coordinatorNotAvailable();// 创建JoinGroupRequestlog.info("(Re-)joining group {}", groupId);JoinGroupRequest request = new JoinGroupRequest(groupId,this.sessionTimeoutMs,this.rebalanceTimeoutMs,this.generation.memberId,protocolType(),metadata());log.debug("Sending JoinGroup ({}) to coordinator {}", request, this.coordinator);// 将这个请求放入unsent集合,等待被发送,并返回一个RequestFuture对象return client.send(coordinator, ApiKeys.JOIN_GROUP, request).compose(new JoinGroupResponseHandler());
}

6 阻塞等待Join Group的完成

client.poll(future);

7 重置JoinGroup的RequestFuture

resetJoinGroupFuture();

8 如果请求成功,则重置needsJoinPrepare状态,并且调用onJoinComplete做一些扫尾的工作
if (future.succeeded()){
    needsJoinPrepare = true;
    onJoinComplete(generation.generationId, generation.memberId, generation.protocol, future.value());
}

protected void onJoinComplete(int generation, String memberId,String assignmentStrategy, ByteBuffer assignmentBuffer) {// 只有leader才会监控元数据的改变if (!isLeader)assignmentSnapshot = null;// 获取partition分配策略PartitionAssignor assignor = lookupAssignor(assignmentStrategy);if (assignor == null)throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);// 获取分配结果Assignment= ConsumerProtocol.deserializeAssignment(assignmentBuffer);// 设置标记刷新最近一次提交的offsetsubscriptions.needRefreshCommits();// 更新分区的分配subscriptions.assignFromSubscribed(assignment.partitions());// 给一个分区分配策略一个机会更新分配结果assignor.onAssignment(assignment);// reschedule the auto commit starting from nowthis.nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs;// 执行rebalance之后的回调ConsumerRebalanceListener listener = subscriptions.listener();log.info("Setting newly assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId);try {Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());listener.onPartitionsAssigned(assigned);} catch (WakeupException e) {throw e;} catch (Exception e) {log.error("User provided listener {} for group {} failed on partition assignment",listener.getClass().getName(), groupId, e);}
}

JoinGroupResponse返回的时候,我们会调用JoinGroupResponseHand

Ler#handle()方法

public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {// 获取响应是否有错误Errors error = Errors.forCode(joinResponse.errorCode());// 如果没有错误if (error == Errors.NONE) {log.debug("Received successful join group response for group {}: {}", groupId, joinResponse.toStruct());sensors.joinLatency.record(response.requestLatencyMs());synchronized (AbstractCoordinator.this) {// 如果客户端状态不是REBALANCINGif (state != MemberState.REBALANCING) {// 把这个异常广播出去future.raise(new UnjoinedGroupException());} else {// 解析JoinGroupResponseAbstractCoordinator.this.generation = new Generation(joinResponse.generationId(),joinResponse.memberId(), joinResponse.groupProtocol());// 标记rejoinNeeded为falseAbstractCoordinator.this.rejoinNeeded = false;// 判断响应的节点是不是leader,如果是leader则进入onJoinLeader方法// 否则进入onJoinFollower方法if (joinResponse.isLeader()) {onJoinLeader(joinResponse).chain(future);} else {onJoinFollower().chain(future);}}}} else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {log.debug("Attempt to join group {} rejected since coordinator {} is loading the group.", groupId,coordinator());// backoff and retryfuture.raise(error);} else if (error == Errors.UNKNOWN_MEMBER_ID) {// reset the member id and retry immediatelyresetGeneration();log.debug("Attempt to join group {} failed due to unknown member id.", groupId);future.raise(Errors.UNKNOWN_MEMBER_ID);} else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE|| error == Errors.NOT_COORDINATOR_FOR_GROUP) {// re-discover the coordinator and retry with backoffcoordinatorDead();log.debug("Attempt to join group {} failed due to obsolete coordinator information: {}", groupId, error.message());future.raise(error);} else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL|| error == Errors.INVALID_SESSION_TIMEOUT|| error == Errors.INVALID_GROUP_ID) {// log the error and re-throw the exceptionlog.error("Attempt to join group {} failed due to fatal error: {}", groupId, error.message());future.raise(error);} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {future.raise(new GroupAuthorizationException(groupId));} else {// unexpected error, throw the exceptionfuture.raise(new KafkaException("Unexpected error in join group response: " + error.message()));}}
}

1 解析JoinGroupResponse,获取memberId,generationId,和protocol等信息

2 判断响应的节点是不是leader,如果是leader则进入onJoinLeader方法,否则进入onJoinFollower方法

onLeaderJoin: 根据JoinGroupResponse的分组信息,然后leader开始进行分区的分配;构造SyncGroupRequest并发送

onFollowerJoin: 不负责分配分区,只是发送同步请求

构造SyncGroupRequest并发送

private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {try {// 执行leader同步,并且根据JoinGroupResponse的分组信息,然后leader开始进行分区的分配Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(),joinResponse.members());// 构造SyncGroupRequest并发送SyncGroupRequest request = new SyncGroupRequest(groupId, generation.generationId, generation.memberId, groupAssignment);log.debug("Sending leader SyncGroup for group {} to coordinator {}: {}", groupId, this.coordinator, request);return sendSyncGroupRequest(request);} catch (RuntimeException e) {return RequestFuture.failure(e);}
}
protected Map<String, ByteBuffer> performAssignment(String leaderId,String assignmentStrategy, Map<String, ByteBuffer> allSubscriptions) {// 查找分区分配使用的PartitionAssignorPartitionAssignor assignor = lookupAssignor(assignmentStrategy);if (assignor == null)throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);Set<String> allSubscribedTopics = new HashSet<>();Map<String, Subscription> subscriptions = new HashMap<>();// 根据JoinGroupResponse的分组信息(group_protocols)对Subscription和topic分类汇总for (Map.Entry<String, ByteBuffer> subscriptionEntry : allSubscriptions.entrySet()) {Subscription= ConsumerProtocol.deserializeSubscription(subscriptionEntry.getValue());subscriptions.put(subscriptionEntry.getKey(), subscription);allSubscribedTopics.addAll(subscription.topics());}// the leader will begin watching for changes to any of the topics the group is interested in,// which ensures that all metadata changes will eventually be seen// leader会获取所有组内所有消费者的订阅的topicthis.subscriptions.groupSubscribe(allSubscribedTopics);metadata.setTopics(this.subscriptions.groupSubscription());// update metadata (if needed) and keep track of the metadata used for assignment so that// we can check after rebalance completion whether anything has changed// 更新metadataclient.ensureFreshMetadata();isLeader = true;// 记录快照assignmentSnapshot = metadataSnapshot;log.debug("Performing assignment for group {} using strategy {} with subscriptions {}",groupId, assignor.name(), subscriptions);// 进行分区分配Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions);log.debug("Finished assignment for group {}: {}", groupId, assignment);// 分区分配结果序列化并保存到到groupAssignment中,返回分区结果Map<String, ByteBuffer> groupAssignment = new HashMap<>();for (Map.Entry<String, Assignment> assignmentEntry : assignment.entrySet()) {ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue());groupAssignment.put(assignmentEntry.getKey(), buffer);}return groupAssignment;
}
private RequestFuture<ByteBuffer> onJoinFollower() {// send follower's sync group with an empty assignmentSyncGroupRequest request = new SyncGroupRequest(groupId, generation.generationId,generation.memberId, Collections.<String, ByteBuffer>emptyMap());log.debug("Sending follower SyncGroup for group {} to coordinator {}: {}", groupId, this.coordinator, request);return sendSyncGroupRequest(request);
}

2.3 开始Synchronizing Group State阶段

完成分区分配后,就进入SynchronizingGroup State阶段,主要就是向GroupCoordinator发送SyncGroupRequest请求并处理SyncGroup

Response响应

我们先看一下SyncGroupRequest和SyncGroupResponse的消息体格式:

SyncGroupRequest:

group_id: 消费者组的id

generation_id: 消费者保存到generation信息

member_id: GroupCoordinator分配给消费者的id

member_assignment: 分区分配结果

SyncGroupResponse:

error_code: 错误码

member_assignment:分配给当前消费者的分区

在前面onLeaderJoin方法,我们知道在获取分区分配结果之后,Leader将其封装成SyncGroupRequest, 但是我们看到了onFollowerJoin方法里也封装了SyncGroupRequest,但是这里边的分区分配结果是空的

然后调用ConsumerNetworkClient#send方法将请求放入unsent请求等待集合,等待被发送

对于SyncGroupResponse:解析SyncGroupResponse,并且传播分区分配结果

public void handle(SyncGroupResponse syncResponse,RequestFuture<ByteBuffer> future) {Errors error = Errors.forCode(syncResponse.errorCode());if (error == Errors.NONE) { // 如果没有错误sensors.syncLatency.record(response.requestLatencyMs());// 传播分区分配结果future.complete(syncResponse.memberAssignment());} else {requestRejoin();//将rejoinNeeded置为trueif (error == Errors.GROUP_AUTHORIZATION_FAILED) {future.raise(new GroupAuthorizationException(groupId));} else if (error == Errors.REBALANCE_IN_PROGRESS) {log.debug("SyncGroup for group {} failed due to coordinator rebalance", groupId);future.raise(error);} else if (error == Errors.UNKNOWN_MEMBER_ID|| error == Errors.ILLEGAL_GENERATION) {log.debug("SyncGroup for group {} failed due to {}", groupId, error);resetGeneration();future.raise(error);} else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE|| error == Errors.NOT_COORDINATOR_FOR_GROUP) {log.debug("SyncGroup for group {} failed due to {}", groupId, error);coordinatorDead();future.raise(error);} else {future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.message()));}}
}

从SyncGroupResponse得到的分区分配结果最终由ConsumerCoordina

tor#onJoinComplete()方法处理

protected void onJoinComplete(int generation, String memberId,String assignmentStrategy, ByteBuffer assignmentBuffer) {// 只有leader才会监控元数据的改变if (!isLeader)assignmentSnapshot = null;// 获取partition分配策略PartitionAssignor assignor = lookupAssignor(assignmentStrategy);if (assignor == null)throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);// 通过反序列化,获取分配结果Assignment= ConsumerProtocol.deserializeAssignment(assignmentBuffer);// 设置标记刷新最近一次提交的offsetsubscriptions.needRefreshCommits();// 根据分配的分区,更新分区的分配subscriptions.assignFromSubscribed(assignment.partitions());// 当消费者组的成员接收leader的分区分配的一个回调函数assignor.onAssignment(assignment);// reschedule the auto commit starting from nowthis.nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs;// 执行rebalance之后的回调ConsumerRebalanceListener listener = subscriptions.listener();log.info("Setting newly assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId);try {Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());listener.onPartitionsAssigned(assigned);} catch (WakeupException e) {throw e;} catch (Exception e) {log.error("User provided listener {} for group {} failed on partition assignment",listener.getClass().getName(), groupId, e);}
}

我们在这里分析的是加入组的情况,其实当消费者离开组的时候,也会触发rebalance操作。

消费者rebalance机制分析相关推荐

  1. java每隔 消费队列数据_消费者Rebalance机制

    本文深入的分析了RocketMQ的Rebalance机制,主要包括以下内容:Rebalance必要的元数据信息的维护 Broker协调通知机制: 消费者/启动/运行时/停止时Rebalance触发时机 ...

  2. kafka消费者Rebalance机制

    目录 1.Rebalance机制 2.消费者Rebalance分区分配策略 3.Rebalance过程 1.Rebalance机制 rebalance就是说如果消费组里的消费者数量有变化或消费的分区数 ...

  3. 深入理解RocketMQ Rebalance机制

    本文深入的分析了RocketMQ的Rebalance机制,主要包括以下内容: Rebalance必要的元数据信息的维护 Broker协调通知机制: 消费者/启动/运行时/停止时Rebalance触发时 ...

  4. RocketMQ(五)-消费者启动机制、Rebalance机制

    消费者启动机制 DefaultMQPullConsumer 核心属性 核心方法 DefaultMQPushConsumer 消费者启动流程 DefaultMQPullConsumerImpl启动流程 ...

  5. 聊聊 Kafka: Consumer 源码解析之 Rebalance 机制

    一.前言 我们上一篇分析了 Consumer 如何加入 Consumer Group,其实上一篇是一个很宏观的东西,主要讲 ConsumerCoordinator 怎么与 GroupCoordinat ...

  6. java消费者模式_基于Java 生产者消费者模式(详细分析)

    生产者消费者模式是多线程中最为常见的模式:生产者线程(一个或多个)生成面包放进篮子里(集合或数组),同时,消费者线程(一个或多个)从篮子里(集合或数组)取出面包消耗.虽然它们任务不同,但处理的资源是相 ...

  7. Kafka的rebalance机制

    这是针对Kafka的消费者的机制,以下场景将发生rebalance: 消费者组新增消费实例或者有消费实例退出group: group消费超时: group订阅的topic个数发生变化: group订阅 ...

  8. RocketMQ(八)——Rebalance机制介绍

    Rebalance机制 前提:集群消费模式 介绍: Rebalance指的是:将下一个Topic的多个Queue在同一个Consumer Group中的多个Consumer间进行重新分配的过程 该机制 ...

  9. 【Kafka】Kafka的Rebalance机制可能造成的影响及解决方案

    一.kafka的rebalance机制 在Kafka中,当有新消费者加入或者订阅的Topic数发生变化时,会触发Rebalance(再均衡:在同一个消费者组当中,分区的所有权从一个消费者转移到另外一个 ...

最新文章

  1. 潘建伟团队又创世界纪录!实现500公里量级现场光纤量子通信
  2. JMM内存模型如何为并发保驾护航
  3. docker 容器安装conposer_docker学习笔记(二)docker-composer
  4. 拉杰尔安卓服务器注册上限,拉结尔多开养小号刷副本 用多多云手机离线能升级...
  5. C# winform 上传文件 (多种方案)
  6. 虚拟资源拳王公社:什么都不会做什么副业赚钱?最容易上手的兼职副业是什么
  7. 原来在首席架构眼里MySQL果然如此不一样!
  8. The name `AssetDatabase' does not exist in the current context
  9. 百度云盘在线解析不限速下载网盘网站源码
  10. 企业微信群:机器人实现定时提醒功能
  11. 这十大推广引流渠道,你做了几个?(二)
  12. 用二维数组进行学生的成绩排序和计算
  13. Kafka源码-发送器Sender类型的的sendProducerData 模版方法
  14. 喜报|Authing 入选 CNCF Landscape 云原生技术图谱
  15. 常见的浏览器有哪些?其核心分别是什么 ?
  16. 肖申克的救赎 经典语录
  17. ImageConverter引起的 invalid address or address of corrupt block 0xb7feab58 passed to dlfree
  18. java编译器:必须对其进行捕获或声明以便抛出
  19. sanf()、kbhit()、getch()获取键盘信息与peekMessage()获取鼠标信息
  20. Linux详细安装教程(Centos)

热门文章

  1. java方法中与参数怎么调用_与Java方法调用中的类型参数有关的问题
  2. Zookeeper集群详解
  3. OpenCV计算机视觉实战(Python版)_002图像基本操作
  4. 抽象类java启动线程_java 线程复习笔记
  5. ifconfig没有ip地址_没有宽带也可以实现全家上网
  6. java识别音调_你如何创建一个音调发生器,其音调可以在java中“实时”或动态地操作?...
  7. Java 蓝桥杯 芯片测试
  8. 主从切换_Mysql 复制如何进行主从库切换(计划内)
  9. pandas dataframe绘制并保存图像
  10. 带有哨兵的双向循环链表