tips:本文源码对应的kafka版本是2.3.1,源码分支2.3

分区重分配及相关源码分析

  • 简单示例
  • kafka消费者分区重分配
    • 消费者分区重分配简介
      • 分区重分配的策略
        • RangeAssignor
        • RoundRobinAssignor
        • StickyAssignor
        • 自定义Assignor
    • 消费者心跳线程
    • kafka消费者状态
    • 分区重分配触过程

简单示例

public class KafkaConsumerTest {public static void main(String[] args) {PropertyConfigurator.configure("/***/***/****/***/****/****/****/***/***/***/log4j.properties");Thread thread = new kafkaConsumerThread();thread.start();}public static class kafkaConsumerThread extends Thread {@Overridepublic void run() {Properties prop = new Properties();prop.put("bootstrap.servers", "127.0.0.1:9092");prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");prop.put("group.id","test");KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(prop);kafkaConsumer.subscribe(Collections.singleton("test"));while (true){ConsumerRecords<String, String> records =  kafkaConsumer.poll(Duration.ofMillis(100));records.forEach(one -> {System.out.println(one.offset()+"----"+one.value());//TODO  业务逻辑处理});}}}
}

kafka消费者分区重分配

消费者分区重分配简介

kafka不同于rabbitmq只能对master节点进行读写,为了支持更高的并发,其将读写权限分配到了集群中所有的broker上,内部则是采用了topic partition来实现。设置好主题的分区后,所有的分区会均匀的分配到不同的broker。每个分区还会有对应的副本分配到不同的broker上,这里的副本就和rabbitmq的队列镜像一样是为了备份、故障恢复。主备的同步逻辑也基本一致。
那么kafka引入主题分区后的消费情况如何呢?如下图所示kafka消费者启动后会分配partition并进行消息消费,group1此时有两个消费者实例,每个实例分配两个分区进行消费,group2则有三个消费者实例,consumer1分配了两个分区,其他两个消费者各分配了一个分区。我们在真实的业务中场景肯定有应用的发布与重启、水平扩容,如果此时group2中一个消费者实例下线,会导致原有分配到改消费者的partition停止消费从而产生堆积么?或者group1消费过慢导致消息堆积,水平扩容consumer3会由于没有分区分配而不能消费消息么?kafka作为业界最受欢迎的消息中间件之一,当然不会有这样明显的缺陷。

相信大家看了上面的示例也对分区重分配有了一定的了解。分区重分配就是当kafka的消费者或者是主题数发生变化时,按照当前分区数与消费者数量进行主题分区重新分配的过程。

分区重分配的策略

当分区重分配发生时,kafka需要一定的策略来对分区进行分配。我们可以在kafkaconsumer初始化时进行分配策略的配置。默认配置是RangeAssignor

//默认配置
org.apache.kafka.clients.consumer.ConsumerConfig#CONFIGstatic { CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,Type.LIST,Collections.emptyList(),new ConfigDef.NonNullValidator(),Importance.HIGH,CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)..//我是省略号..define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,Type.LIST,Collections.singletonList(RangeAssignor.class),new ConfigDef.NonNullValidator(),Importance.MEDIUM,PARTITION_ASSIGNMENT_STRATEGY_DOC)..//我是省略号.}

kafka客户端提供了三种分配策略的实现,分别为RangeAssignor、RoundRobinAssignor以及StickyAssignor。相关类图如下所示:

RangeAssignor
   org.apache.kafka.clients.consumer.RangeAssignor#assign@Overridepublic Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,Map<String, Subscription> subscriptions) {Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);Map<String, List<TopicPartition>> assignment = new HashMap<>();for (String memberId : subscriptions.keySet())assignment.put(memberId, new ArrayList<>());for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {String topic = topicEntry.getKey();List<String> consumersForTopic = topicEntry.getValue();Integer numPartitionsForTopic = partitionsPerTopic.get(topic);if (numPartitionsForTopic == null)continue;Collections.sort(consumersForTopic);//每个消费者能得到的分区数int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();// 需要额外分配一个分区的消费者数int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);for (int i = 0, n = consumersForTopic.size(); i < n; i++) {//一次性分配消费者的所有分区。int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));}}return assignment;}

range分区分配器对每个主题的可用分区数与消费者(字典序)进行排序。然后将分区数除以消费者数计算得到每个消费者分配的分区数,如果不可均分则前几个消费者将分配到一个额外的分区。例如Topic A有5个分区,消费组1有三个消费者,则产生的分配方案是

consumer1:0,1
consumer2:2,3
consumer3:4

这样分配的缺点是如果一个消费者消费了好几个topic都是这样的分区数,那么字典序靠前的消费者都会分配一些topic,导致部分消费者负载过高。例如三个consumer订阅了主题t0、t1,他们各有4个分区,根据range分配器的分配结果为

consumer1:t0p0,t0p1,t1p0,t1p1
consumer2:t0p2,t1p2
consumer3:t0p3,t1p3
RoundRobinAssignor
//org.apache.kafka.clients.consumer.RoundRobinAssignor#assign @Overridepublic Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,Map<String, Subscription> subscriptions) {Map<String, List<TopicPartition>> assignment = new HashMap<>();for (String memberId : subscriptions.keySet())assignment.put(memberId, new ArrayList<>());// consumer的遍历器CircularIterator<String> assigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet()));for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {final String topic = partition.topic();//如果该消费者没有订阅该主题while (!subscriptions.get(assigner.peek()).topics().contains(topic))assigner.next();//循环分配主题分区assignment.get(assigner.next()).add(partition);}return assignment;}

循环分区分配器获取所有的主题分区数和消费者数后将所有主题分区进行排序,然后循环分配主题分区和消费者。这样就能够保证订阅多个topic的一组消费者之间分配的分区数差值不超过1。解决了RangeAssignor的不足。但是当消费者订阅的主题不一样时,同样存在着部分消费者负载过高的情况。例如当前有三个消费者consumer1-3与三个主题t0-2,主题的分区数分别为1,2,3,订阅关系则是consumer1订阅主题0,consumer2订阅主题t0,t1。consumer3订阅主题t0,t1,t2。从上述代码中可以看到分配时如果当前消费者没有订阅该主题则会跳过。那么产生的分配方案将会如下所示,也导致了部分消费者的高负载。这时我们可能会思考是不是在分配时去遍历消费者的分配情况,然后平衡一下,将t1p1分配到consumer2就OK了?想法是好的,那么最的StickyAssignor分配器能解决这个问题么?他又是怎么做的呢?

consumer1:t0p0
consumer2:t1p0,
consumer3:t1p1,t2p0,t2p1, t2p2
StickyAssignor
// org.apache.kafka.clients.consumer.StickyAssignor#assignpublic Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,Map<String, Subscription> subscriptions) {Map<String, List<TopicPartition>> currentAssignment = new HashMap<>();Map<TopicPartition, ConsumerGenerationPair> prevAssignment = new HashMap<>();partitionMovements = new PartitionMovements();//当前分配方案与重分配之前的分配方案,黏性重分配的关键信息prepopulateCurrentAssignments(subscriptions, currentAssignment, prevAssignment);boolean isFreshAssignment = currentAssignment.isEmpty();// 主题分区对应的所有能消费改分区的consumer集合final Map<TopicPartition, List<String>> partition2AllPotentialConsumers = new HashMap<>();// consumer对应的所有能消费的主题分区数集合final Map<String, List<TopicPartition>> consumer2AllPotentialPartitions = new HashMap<>();// 初始化partition2AllPotentialConsumers所有的主题分区keyfor (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {for (int i = 0; i < entry.getValue(); ++i)partition2AllPotentialConsumers.put(new TopicPartition(entry.getKey(), i), new ArrayList<>());}//根据服务端组服务器返回的订阅信息来装填mapfor (Entry<String, Subscription> entry: subscriptions.entrySet()) {String consumer = entry.getKey();consumer2AllPotentialPartitions.put(consumer, new ArrayList<>());entry.getValue().topics().stream().filter(topic -> partitionsPerTopic.get(topic) != null).forEach(topic -> {for (int i = 0; i < partitionsPerTopic.get(topic); ++i) {TopicPartition topicPartition = new TopicPartition(topic, i);consumer2AllPotentialPartitions.get(consumer).add(topicPartition);partition2AllPotentialConsumers.get(topicPartition).add(consumer);}});if (!currentAssignment.containsKey(consumer))currentAssignment.put(consumer, new ArrayList<>());}// 当前分区分配Map<TopicPartition, String> currentPartitionConsumer = new HashMap<>();for (Map.Entry<String, List<TopicPartition>> entry: currentAssignment.entrySet())for (TopicPartition topicPartition: entry.getValue())currentPartitionConsumer.put(topicPartition, entry.getKey());List<TopicPartition> sortedPartitions = sortPartitions(currentAssignment, prevAssignment.keySet(), isFreshAssignment, partition2AllPotentialConsumers, consumer2AllPotentialPartitions);// 未分配的主题分区,如果存在之前的分区则尽量保留原有分配方案,视为已分配,并从该map移除List<TopicPartition> unassignedPartitions = new ArrayList<>(sortedPartitions);for (Iterator<Map.Entry<String, List<TopicPartition>>> it = currentAssignment.entrySet().iterator(); it.hasNext();) {Map.Entry<String, List<TopicPartition>> entry = it.next();if (!subscriptions.containsKey(entry.getKey())) {// if a consumer that existed before (and had some partition assignments) is now removed, remove it from currentAssignmentfor (TopicPartition topicPartition: entry.getValue())currentPartitionConsumer.remove(topicPartition);it.remove();} else {// otherwise (the consumer still exists)for (Iterator<TopicPartition> partitionIter = entry.getValue().iterator(); partitionIter.hasNext();) {TopicPartition partition = partitionIter.next();if (!partition2AllPotentialConsumers.containsKey(partition)) {// if this topic partition of this consumer no longer exists remove it from currentAssignment of the consumerpartitionIter.remove();currentPartitionConsumer.remove(partition);} else if (!subscriptions.get(entry.getKey()).topics().contains(partition.topic())) {partitionIter.remove();} elseunassignedPartitions.remove(partition);}}}TreeSet<String> sortedCurrentSubscriptions = new TreeSet<>(new SubscriptionComparator(currentAssignment));sortedCurrentSubscriptions.addAll(currentAssignment.keySet());//平衡分配balance(currentAssignment, prevAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions,consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer);return currentAssignment;}

StickyAssignor的设计主要是为了达成两个目标。其中目标一是主要目标。

  1. 保证分区的分配达到是最大可能的平衡。
  2. 分区重分配发生后尽可能的保留上一次的分配结果。
    对于第一点StickyAssignor是如何克服艰难险阻完成了RoundRobinAssignor的使命的呢?在上述StickyAssignor中附上了部分中文注解,不是很细致。由于这不是这篇文章重点,所以就不做详细解读了,欢迎评论区讨论。
    对于第二点,StickyAssignor在rebalance分配时,利用消费者组的generation(组协调器生成,随着重分配的次数递增)机制来区分历史分配结果,然后对比现在的订阅主题分区情况和消费者情况来做一个黏性分配。由于可以最大限度地保留上一次的分配结果,所以在重分配后可以通过自定义listener来大大减少缓存的清理、offset的提交与初始化等工作。伪代码如下

class TheNewRebalanceListener implements ConsumerRebalanceListener {Collection<TopicPartition> lastAssignment = Collections.emptyList();void onPartitionsRevoked(Collection<TopicPartition> partitions) {for (TopicPartition partition : partitions) {commitOffsets(partition);}}//注意difference只清理新分配或移除的分区,原有逻辑是移除所有void onPartitionsAssigned(Collection<TopicPartition> assignment) {for (TopicPartition partition : difference(lastAssignment, assignment))cleanupState(partition);for (TopicPartition partition : difference(assignment, lastAssignment))initializeState(partition);for (TopicPartition partition : assignment)initializeOffset(partition);this.lastAssignment = assignment;}
}

然后配置消费者的重分配监听器即可。

consumer.subscribe(topics, new TheNewRebalanceListener());
自定义Assignor

kafka作为一个优秀的开源消息中间件,自然也留有拓展能满足大家各种各样的“无理”要求。我们只需要实现PartitionAssignor接口,然后在consumer配置实现类的全限定名即可。实际操作中继承抽象类AbstractPartitionAssignor是更好的选择。

消费者心跳线程

既然消费者在重启或者扩容时会引发消费者分区重分配,那broker端是如何知道有新的消费者加入或者老的消费者下线的呢?加入组协调器后kafka通过心跳来获悉消费者状态,放弃了之前zk的临时节点方案。而且后续的版本也将移除broker对于zk的依赖。kafka逐步去掉zk的依赖主要原因有2个:

  1. 运维方面:kafka作为消息中间件强依赖zk,增加运维复杂性,合理性也值得商榷。
  2. 性能方面:zk是强一致性的,kafka原数据同步慢,自身选举时间长且选举期间不可用。consumer位移数据存储在zk中,位移提交需要写zk,当消费者数量上去后,zk恐成为瓶颈。而且在分区重分配过程中,consumer下的node数据可能出现变化,导致同一时刻各消费者读到的数据不一致从而出现脑裂问题。

组协调器是如何通过心跳来完成消费者状态检查的呢?消费者初始化之后会开始拉取消息,拉取消息的时候进行原数据的检查时会通过startHeartbeatThreadIfNeeded方法启动心跳线程。

org.apache.kafka.clients.consumer.internals.AbstractCoordinator#startHeartbeatThreadIfNeededprivate synchronized void startHeartbeatThreadIfNeeded() {if (heartbeatThread == null) {heartbeatThread = new HeartbeatThread();heartbeatThread.start();}}

心跳线程源码如下,其有enabled与colsed两个重要参数,colsed用于退出消费线程,一般在消费者退出时会进行调用,包括启动失败时的退出。enabled用于线程是否继续发送心跳,当消费者处于非stable状态时,会设置enabled会false,并调用wait方法进入线程等待状态。直到消费者进组完成更改消费者状态。

//消费者入组成功private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {if (joinFuture == null) {..//我是省略号.joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {@Overridepublic void onSuccess(ByteBuffer value) {synchronized (AbstractCoordinator.this) {log.info("Successfully joined group with generation {}", generation.generationId);//重置消费者状态,并启用心跳线程。state = MemberState.STABLE;rejoinNeeded = false;if (heartbeatThread != null)heartbeatThread.enable();}}..//我是省略号.});}return joinFuture;}

接下来是一系列判断来确定消费者是否还存活,主要是通过heartbeat对象内的各种时间以及配置来对比。heartbeat则是初始化kafka consumer时创建的,主要参数有

  1. ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG
  2. ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG
  3. ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG

首先判断session是否超时,检查当前时间是否大于session的超时时间,每次收到heart beat后更新session超时时间为当前时间加上配置参数SESSION_TIMEOUT_MS_CONFIG。消息拉取时间以及心跳线程的超时时间同理。这里需要注意的是如果心跳间隔时间大于session超时时间则每下一次心跳将会直接触发session超时。具体代码及注释如下。

//消费者线程
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.HeartbeatThreadprivate class HeartbeatThread extends KafkaThread implements AutoCloseable {private boolean enabled = false;private boolean closed = false;private AtomicReference<RuntimeException> failed = new AtomicReference<>(null);private HeartbeatThread() {super(HEARTBEAT_THREAD_PREFIX + (groupId.isEmpty() ? "" : " | " + groupId), true);}//重新启用心跳线程,并唤起线程public void enable() {synchronized (AbstractCoordinator.this) {log.debug("Enabling heartbeat thread");this.enabled = true;heartbeat.resetTimeouts();AbstractCoordinator.this.notify();}}public void disable() {synchronized (AbstractCoordinator.this) {log.debug("Disabling heartbeat thread");this.enabled = false;}}..//我是省略号.@Overridepublic void run() {try {log.debug("Heartbeat thread started");while (true) {synchronized (AbstractCoordinator.this) {if (closed)return;//enable为false时,调用wait方法,暂停心跳请求if (!enabled) {AbstractCoordinator.this.wait();continue;}//判断消费者状态if (state != MemberState.STABLE) {disable();continue;}client.pollNoWakeup();long now = time.milliseconds();if (coordinatorUnknown()) {//消费者不可用if (findCoordinatorFuture != null || lookupCoordinator().failed())AbstractCoordinator.this.wait(retryBackoffMs);} else if (heartbeat.sessionTimeoutExpired(now)) {//session超时,每次收到heartbeat返回后会重置。markCoordinatorUnknown();} else if (heartbeat.pollTimeoutExpired(now)) {//拉取消息超时,两次间隔未收到返回,可能导致消费者下线log.warn("This member will leave the group because consumer poll timeout has expired. This " +"means the time between subsequent calls to poll() was longer than the configured " +"max.poll.interval.ms, which typically implies that the poll loop is spending too " +"much time processing messages. You can address this either by increasing " +"max.poll.interval.ms or by reducing the maximum size of batches returned in poll() " +"with max.poll.records.");maybeLeaveGroup();} else if (!heartbeat.shouldHeartbeat(now)) {// 判断是否到了心跳时间,时间间隔是否达到配置AbstractCoordinator.this.wait(retryBackoffMs);} else {heartbeat.sentHeartbeat(now);sendHeartbeatRequest().addListener(new RequestFutureListener<Void>() {@Overridepublic void onSuccess(Void value) {synchronized (AbstractCoordinator.this) {heartbeat.receiveHeartbeat();}}@Overridepublic void onFailure(RuntimeException e) {synchronized (AbstractCoordinator.this) {if (e instanceof RebalanceInProgressException) {heartbeat.receiveHeartbeat();} else if (e instanceof FencedInstanceIdException) {log.error("Caught fenced group.instance.id {} error in heartbeat thread", groupInstanceId);heartbeatThread.failed.set(e);heartbeatThread.disable();} else {heartbeat.failHeartbeat();// wake up the thread if it's sleeping to reschedule the heartbeatAbstractCoordinator.this.notify();}}}});}}}} catch (AuthenticationException e) {..//我是省略号.} finally {log.debug("Heartbeat thread has closed");}}}

HeartbeatThread经过一系列校验后便可发送心跳请求了,在sendHeartbeatRequest()方法中组装了一个HeartbeatResponseHandler用来处理心跳请求的返回。

    // 发送心跳请求并添加futuresynchronized RequestFuture<Void> sendHeartbeatRequest() {log.debug("Sending Heartbeat request to coordinator {}", coordinator);HeartbeatRequest.Builder requestBuilder =new HeartbeatRequest.Builder(new HeartbeatRequestData().setGroupId(groupId).setMemberId(this.generation.memberId).setGroupInstanceId(this.groupInstanceId.orElse(null)).setGenerationId(this.generation.generationId));return client.send(coordinator, requestBuilder).compose(new HeartbeatResponseHandler());}

在HeartbeatResponseHandler中,主要是对于心跳请求返回状态码的一些判断。主要的情况有消费者不可用、重平衡、generation等。其中generation代数,可以直接理解为消费者分区重分配的次数,主要是为了区分offset的提交。从一定程度上防止消息丢失或者重复消费的情况。

   private class HeartbeatResponseHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {@Overridepublic void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {sensors.heartbeatLatency.record(response.requestLatencyMs());Errors error = heartbeatResponse.error();if (error == Errors.NONE) {log.debug("Received successful Heartbeat response");future.complete(null);} else if (error == Errors.COORDINATOR_NOT_AVAILABLE|| error == Errors.NOT_COORDINATOR) {log.info("Attempt to heartbeat failed since coordinator {} is either not started or not valid.",coordinator());markCoordinatorUnknown();future.raise(error);} else if (error == Errors.REBALANCE_IN_PROGRESS) {log.info("Attempt to heartbeat failed since group is rebalancing");requestRejoin();future.raise(Errors.REBALANCE_IN_PROGRESS);} else if (error == Errors.ILLEGAL_GENERATION) {log.info("Attempt to heartbeat failed since generation {} is not current", generation.generationId);resetGeneration();future.raise(Errors.ILLEGAL_GENERATION);} else if (error == Errors.FENCED_INSTANCE_ID) {log.error("Received fatal exception: group.instance.id gets fenced");future.raise(error);} else if (error == Errors.UNKNOWN_MEMBER_ID) {log.info("Attempt to heartbeat failed for since member id {} is not valid.", generation.memberId);resetGeneration();future.raise(Errors.UNKNOWN_MEMBER_ID);} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {future.raise(new GroupAuthorizationException(groupId));} else {future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));}}}

kafka消费者状态

上一节心跳线程中有提到过消费者的状态若不是stable则会停用心跳线程,那么消费正的状态共有哪些,又是如何转换的呢?
在协调器中的内部类MemberState中我们可以看到协调器的三种状态,分别是未注册、重分配、稳定状态。

org.apache.kafka.clients.consumer.internals.AbstractCoordinator.MemberStateprivate enum MemberState {UNJOINED,    // the client is not part of a groupREBALANCING, // the client has begun rebalancingSTABLE,      // the client has joined and is sending heartbeats}

上述消费端的三种状态的转换如下图所示

对于kafka服务端的组则有五种状态Empth、PreparingRebalance、CompletingRebalance、Stable、Dead。他们的状态转换如下图所示。

分区重分配触过程

对于正常的业务使用场景中常见的情况就是重启或者扩容导致的分区重分配。这里以新增消费者为例,假设当前存在consumer1与consumer2订阅了主题t0,t0有三个分区,分区器是RangeAssignor,则当前分配情况是

consumer1:t0p0,t0p1
consumer2:t0p2

新起消费者consumer3后,服务端的组协调器接收到到新的memberId后对所有的消费者心跳线程返回重分配的错误码。并将消费者协调器中的rejoinNeeded置为true。消费者则会在下一次拉取消息时触发入组请求。具体代码及注释如下

  boolean joinGroupIfNeeded(final Timer timer) {//读取心跳线程设置的标志位while (rejoinNeededOrPending()) {if (!ensureCoordinatorReady(timer)) {return false;}if (needsJoinPrepare) {onJoinPrepare(generation.generationId, generation.memberId);needsJoinPrepare = false;}//发送入组请求final RequestFuture<ByteBuffer> future = initiateJoinGroup();..//我是省略号.}return true;}

sendJoinGroupRequest方法中,添加了JoinGroupResponseHandler来处理请求的返回

org.apache.kafka.clients.consumer.internals.AbstractCoordinator#sendJoinGroupRequest

JoinGroupResponseHandler的核心代码和注释如下,有两个重要点是保存消费组的版本信息generation,如果成为消费着组分配的leader消费者,则还需要负责生成分区分配规则。

org.apache.kafka.clients.consumer.internals.AbstractCoordinator.JoinGroupResponseHandlerprivate class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {@Overridepublic void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {Errors error = joinResponse.error();if (error == Errors.NONE) {log.debug("Received successful JoinGroup response: {}", joinResponse);sensors.joinLatency.record(response.requestLatencyMs());synchronized (AbstractCoordinator.this) {if (state != MemberState.REBALANCING) {future.raise(new UnjoinedGroupException());} else {//保存消费端分配的generationAbstractCoordinator.this.generation = new Generation(joinResponse.data().generationId(),joinResponse.data().memberId(), joinResponse.data().protocolName());if (joinResponse.isLeader()) {//leader消费者onJoinLeader(joinResponse).chain(future);} else {onJoinFollower().chain(future);}}}} ..//我是省略号.}}

leader consumer源码及注释如下,主要负责调用配置的分区分配器生成分配规则,以及将分配规则同步到服务端。

private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {try {// 生成分配规则Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.data().leader(), joinResponse.data().protocolName(),joinResponse.data().members());..//我是省略号.//将分配规则同步到服务端return sendSyncGroupRequest(requestBuilder);} catch (RuntimeException e) {return RequestFuture.failure(e);}}

Follower则从服务端拉取leader上传的分配规则。

   private RequestFuture<ByteBuffer> onJoinFollower() {..//我是省略号.//拉取分配规则return sendSyncGroupRequest(requestBuilder);}

leader与follower都同步完成后joinGroupIfNeeded方法则会将消费者状态置为stable并开启心跳线程,源码见上述initiateJoinGroup代码片。

-------接下来我们如何运用当前知识来解决实际问题呢,未完待续。。。

kafka-分区重分配及相关源码分析相关推荐

  1. 【kafka】Kafka中的动态配置源码分析

    1.概述 2.源码分析 Broker启动加载动态配置 KafkaServer.startup 启动加载动态配置总流程 2.1 动态配置初始化 config.dynamicConfig.initiali ...

  2. 【Android 电量优化】JobScheduler 相关源码分析 ( JobSchedulerService 源码分析 | 任务检查 | 任务执行 )

    文章目录 一.回调 StateChangedListener 接口 二.JobHandler 处理 ( 任务检查 ) 三.maybeRunPendingJobsH 方法 四.assignJobsToC ...

  3. Ansroid系统(262)---MTK安卓sim卡相关源码分析

    MTK安卓sim卡相关源码分析 原文地址:http://m.blog.csdn.net/article/details?id=50039589 最近由于一个sim卡相关的需求,就去了解了一下Andro ...

  4. freeRTOS滴答时钟相关源码分析

    最近学习白问网韦东山老师在B站开源的freeRTOS课程,网址:韦东山直播公开课:RTOS实战项目之实现多任务系统 第1节:裸机程序框架和缺陷_哔哩哔哩_bilibili和7天物联网训练营[第2期]7 ...

  5. 【kafka】kafka Producer Metadata概述及源码分析

    1.概述 转载:Kafka源码阅读(二):Producer Metadata概述及源码分析 2.Metadata 什么是metadata? metadata指Kafka集群的元数据,包含了Kafka集 ...

  6. 【Android 电量优化】JobScheduler 相关源码分析 ( ConnectivityController 底层源码分析 | 构造函数 | 追踪任务更新 | 注册接收者监听连接变化 )

    文章目录 一.ConnectivityController 连接控制器引入 二.ConnectivityController 构造方法解析 ( 注册接收者 ) 三.mConnectivityRecei ...

  7. 【Android 电量优化】JobScheduler 相关源码分析 ( JobSchedulerService 源码分析 | Android 源码在线网址推荐 )

    文章目录 一.JobScheduler 提交任务 schedule 方法源码分析 二.schedule(JobInfo job, int uId) 方法 三.scheduleAsPackage 方法 ...

  8. 详述 Spring MVC 启动流程及相关源码分析

    文章目录 Web 应用部署初始化过程(Web Application Deployement) Spring MVC 启动过程 Listener 的初始化过程 Filter 的初始化 Servlet ...

  9. Vue中的methods配置项中的箭头函数this指向及相关源码分析

    之前在使用Vue时遇到一个问题,我们知道在Vue的methods中定义函数时,要想使用到Vue实例或者组件实例的this时,我们就不能使用箭头函数定义方法,因为箭头函数中的this是在函数定义时生成的 ...

最新文章

  1. 一个技术总监的忠告:精通那么多技术有毛用啊,你还不是不被重用?
  2. 引擎讲解2--主要是MyISAM和InnoDB的区别
  3. 基于TableStore的海量气象格点数据解决方案实战
  4. android dip转px
  5. 1081. Rational Sum (20) -最大公约数
  6. 计算机网络 --- 数据交换方式
  7. 和整数相乘_小学数学基础概念归纳总结:整数篇
  8. 0514JS操作document对象、事件、(this)
  9. 循环增加li id_循环老化对于锂离子电池中锂和电解液分布的影响
  10. iphone:解析html的第三库hpple初试
  11. 无法定位程序输入点 InitializeCriticalSectionEx 于动态链接库 Kernel32.dll 上 问题解决方法
  12. a标签下载pdf文档
  13. apple pay扫银联二维码原理
  14. EasyRecovery2022真正不收费的数据恢复软件
  15. html 图片查看 ie7,兼容ie6跟ie7 的16进制码流在html中显示为图片代码(base64)
  16. JavaScript入门学习指南
  17. 我学习python的体会
  18. 《C#高级编程第6版》 读书笔记 (张迅雷闪击C#系列)
  19. 自然语言处理(NLP)在医疗领域的应用
  20. Linux学习路线浅谈

热门文章

  1. python如何画3个相切的圆_使用python绘制4个相切的圆形
  2. word-spacing无效
  3. iQOONeo6SE和iQOONeo5SE区别 哪个好 iQOONeo6SE和iQOONeo5SE哪个值得买 两者配置对比
  4. 医学统计学中RR、OR和HR三个关于比值的概念
  5. Scratch3.0----数据类型
  6. 嵌入式面试准备一---USART、IIC、SPI、CAN
  7. 《信息物理融合系统(CPS)设计、建模与仿真——基于 Ptolemy II 平台》——1.9 案例研究...
  8. Windows CMD 检擦电脑是否被入侵[简单办法]
  9. 第三方支付频频被罚款,市场驱动下或是故意为之?
  10. Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-reso