Kafka Source Code Analysis: Consumer Partition Assignment Strategies

  • 1. AbstractPartitionAssignor
  • 2. RangeAssignor
  • 3. RoundRobinAssignor
  • 4. StickyAssignor

The source code discussed in this article is from Kafka 2.0.1.

1. AbstractPartitionAssignor

The consumer has three partition assignment strategies: RangeAssignor, RoundRobinAssignor and StickyAssignor. All three extend AbstractPartitionAssignor and implement its assign method, which takes two parameters:

  • partitionsPerTopic: the number of partitions of each topic
  • subscriptions: maps each consumerId to the topics it subscribes to, i.e. the topics whose partitions that consumer may be assigned

/**
 * Perform the group assignment given the partition counts and member subscriptions
 * @param partitionsPerTopic The number of partitions for each subscribed topic. Topics not in metadata will be excluded
 *                           from this map.
 * @param subscriptions Map from the memberId to their respective topic subscription
 * @return Map from each member to the list of partitions assigned to them.
 */
public abstract Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                         Map<String, Subscription> subscriptions);

2. RangeAssignor

First, the implementation:

public class RangeAssignor extends AbstractPartitionAssignor {

    @Override
    public String name() {
        return "range";
    }

    private Map<String, List<String>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
        Map<String, List<String>> res = new HashMap<>();
        for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
            String consumerId = subscriptionEntry.getKey();
            for (String topic : subscriptionEntry.getValue().topics())
                put(res, topic, consumerId);
        }
        return res;
    }

    // partitionsPerTopic: the number of partitions of each topic
    // subscriptions: each consumerId and the topics it subscribes to
    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
        // Invert subscriptions into a map from topic to the consumerIds subscribed to it
        Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);

        // Holds the resulting assignment
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        for (String memberId : subscriptions.keySet())
            assignment.put(memberId, new ArrayList<TopicPartition>());

        // Iterate over consumersPerTopic
        for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
            String topic = topicEntry.getKey();
            // Consumers subscribed to this topic
            List<String> consumersForTopic = topicEntry.getValue();

            // Number of partitions of this topic
            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
            if (numPartitionsForTopic == null)
                continue;

            Collections.sort(consumersForTopic);

            // Quotient
            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
            // Remainder
            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();

            // [Key assignment step]
            // Build the list of TopicPartitions for this topic
            List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
            for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
                // Start index of this consumer's range
                int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
                // Length of the range: when consumersWithExtraPartition is not 0, the leading consumers
                // each get one extra partition; the evenly divisible part is shared equally
                int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
                assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
            }
        }
        return assignment;
    }
}

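Focus on the [key assignment step] in the code. For each topic, it divides the topic's partitions evenly among the consumers subscribed to that topic. When the partitions cannot be divided evenly, the remainder goes to the first consumersWithExtraPartition consumers, so each of them receives one more partition than the consumers after them.
[Example 1]
Suppose a topic has 7 partitions and three consumers all subscribe to it. Then numPartitionsPerConsumer = 2 and consumersWithExtraPartition = 1, and RangeAssignor produces:

  • consumer0: start=0, length=3, assigned partitions p0, p1, p2
  • consumer1: start=3, length=2, assigned partitions p3, p4
  • consumer2: start=5, length=2, assigned partitions p5, p6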
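
The start/length arithmetic of Example 1 can be checked with a small standalone sketch. It is not Kafka code: RangeSketch and its assign method are hypothetical names, and partitions are represented by their plain index within a single topic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka: reproduces only the range arithmetic for one topic.
class RangeSketch {

    // Returns, for each consumer index, the partition numbers it receives.
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        int perConsumer = numPartitions / numConsumers;   // quotient
        int withExtra = numPartitions % numConsumers;     // remainder
        List<List<Integer>> result = new ArrayList<>();
        for (int i = 0; i < numConsumers; i++) {
            int start = perConsumer * i + Math.min(i, withExtra);
            int length = perConsumer + (i + 1 > withExtra ? 0 : 1);
            List<Integer> owned = new ArrayList<>();
            for (int p = start; p < start + length; p++)
                owned.add(p);
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // 7 partitions and 3 consumers, as in Example 1
        System.out.println(assign(7, 3)); // [[0, 1, 2], [3, 4], [5, 6]]
    }
}
```

With fewer partitions than consumers, for instance assign(1, 2), the trailing consumer gets an empty range, which is the idle-consumer effect that shows up in multi-topic setups.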

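[Example 2]
Now a multi-topic example. Suppose consumer0 subscribes to topic0, topic1 and topic2, consumer1 subscribes to topic0 and topic1, and consumer2 subscribes to topic2. topic0, topic1 and topic2 have 3, 2 and 1 partitions respectively. The result is:

consumer    assigned partitions
consumer0   t0p0, t0p1, t1p0, t2p0
consumer1   t0p2, t1p1
consumer2   (none)

The assignment is uneven, and consumer2 is left idle.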
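
The multi-topic case can be reproduced with a simplified sketch that applies the same per-topic arithmetic. This is an illustration under stated assumptions, not the Kafka implementation: MultiTopicRangeSketch and example2 are hypothetical names, partitions are plain strings such as t0p0, and sorted maps are used only to make the printed output deterministic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not Kafka code: range assignment applied topic by topic.
class MultiTopicRangeSketch {

    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                            Map<String, List<String>> subscriptions) {
        // Invert subscriptions: topic -> consumers subscribed to it
        Map<String, List<String>> consumersPerTopic = new TreeMap<>();
        for (Map.Entry<String, List<String>> e : subscriptions.entrySet())
            for (String topic : e.getValue())
                consumersPerTopic.computeIfAbsent(topic, k -> new ArrayList<>()).add(e.getKey());

        Map<String, List<String>> assignment = new TreeMap<>();
        for (String consumer : subscriptions.keySet())
            assignment.put(consumer, new ArrayList<String>());

        for (Map.Entry<String, List<String>> e : consumersPerTopic.entrySet()) {
            String topic = e.getKey();
            Integer numPartitions = partitionsPerTopic.get(topic);
            if (numPartitions == null)
                continue;
            List<String> consumers = e.getValue();
            Collections.sort(consumers);
            int perConsumer = numPartitions / consumers.size();
            int withExtra = numPartitions % consumers.size();
            for (int i = 0; i < consumers.size(); i++) {
                int start = perConsumer * i + Math.min(i, withExtra);
                int length = perConsumer + (i + 1 > withExtra ? 0 : 1);
                for (int p = start; p < start + length; p++)
                    assignment.get(consumers.get(i)).add(topic + "p" + p);
            }
        }
        return assignment;
    }

    // Inputs of Example 2: topics t0, t1, t2 with 3, 2 and 1 partitions
    static Map<String, List<String>> example2() {
        Map<String, Integer> partitionsPerTopic = new HashMap<>();
        partitionsPerTopic.put("t0", 3);
        partitionsPerTopic.put("t1", 2);
        partitionsPerTopic.put("t2", 1);
        Map<String, List<String>> subscriptions = new HashMap<>();
        subscriptions.put("consumer0", Arrays.asList("t0", "t1", "t2"));
        subscriptions.put("consumer1", Arrays.asList("t0", "t1"));
        subscriptions.put("consumer2", Arrays.asList("t2"));
        return assign(partitionsPerTopic, subscriptions);
    }

    public static void main(String[] args) {
        System.out.println(example2());
    }
}
```

Because each topic is divided independently and consumers are sorted by name, the consumer that sorts first collects the extra partition of every topic it subscribes to.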

3. RoundRobinAssignor

First, the source:

public class RoundRobinAssignor extends AbstractPartitionAssignor {

    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
        // Holds the resulting assignment
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        for (String memberId : subscriptions.keySet())
            assignment.put(memberId, new ArrayList<TopicPartition>());

        // [Key assignment step]
        // Sort the consumers and wrap them in a circular iterator
        CircularIterator<String> assigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet()));
        // Iterate over all partitions
        for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
            final String topic = partition.topic();
            // Find a consumer that subscribes to this partition's topic
            while (!subscriptions.get(assigner.peek()).topics().contains(topic))
                // next() returns the element at the current position and advances the iterator by one
                assigner.next();
            // Assign the partition to that consumer
            assignment.get(assigner.next()).add(partition);
        }
        return assignment;
    }

    // Collect all partitions; partitions of the same topic are adjacent
    public List<TopicPartition> allPartitionsSorted(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
        SortedSet<String> topics = new TreeSet<>();
        for (Subscription subscription : subscriptions.values())
            topics.addAll(subscription.topics());

        List<TopicPartition> allPartitions = new ArrayList<>();
        for (String topic : topics) {
            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
            if (numPartitionsForTopic != null)
                allPartitions.addAll(AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic));
        }
        return allPartitions;
    }

    @Override
    public String name() {
        return "roundrobin";
    }
}

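Focus on the [key assignment step]. The assignor iterates over all topic-partitions and walks the consumers in a ring: starting from the current position, it moves forward to the first consumer that subscribes to the partition's topic, assigns the partition to that consumer, advances the position by one, and continues with the next partition.
Here is [Example 2] from section 2 under RoundRobinAssignor:

round   consumer0   consumer1   consumer2
1       t0p0        t0p1        -
2       t0p2        t1p0        -
3       t1p1        -           t2p0

The assignment is more even than with RangeAssignor, but round robin does not fully solve the balance problem either, as the next example shows.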
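[Example 3]
Suppose consumer0 subscribes to topic0, topic1 and topic2, consumer1 subscribes to topic0 and topic1, and consumer2 subscribes to topic2. topic0, topic1 and topic2 have 4, 3 and 2 partitions respectively. The result is:

consumer    assigned partitions
consumer0   t0p0, t0p2, t1p0, t1p2, t2p1
consumer1   t0p1, t0p3, t1p1
consumer2   t2p0

The consumers hold 5, 3 and 1 partitions. Assigning t2p1 to consumer2 instead would give a more even 4, 3 and 2.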
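
The ring walk can be simulated with a short sketch. It is a simplified stand-in rather than the Kafka class: RoundRobinSketch and articleExample are hypothetical names, partitions are plain strings, and an integer position replaces CircularIterator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch, not Kafka code: round-robin assignment over plain strings.
class RoundRobinSketch {

    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                            Map<String, List<String>> subscriptions) {
        // Sorted consumers; 'position' plays the role of the circular iterator
        List<String> consumers = new ArrayList<>(new TreeSet<>(subscriptions.keySet()));
        int position = 0;

        Map<String, List<String>> assignment = new TreeMap<>();
        for (String consumer : consumers)
            assignment.put(consumer, new ArrayList<String>());

        // Only topics that some consumer subscribes to, in sorted order
        SortedSet<String> topics = new TreeSet<>();
        for (List<String> subscribed : subscriptions.values())
            topics.addAll(subscribed);

        for (String topic : topics) {
            Integer numPartitions = partitionsPerTopic.get(topic);
            if (numPartitions == null)
                continue;
            for (int p = 0; p < numPartitions; p++) {
                // Skip consumers that do not subscribe to this topic
                while (!subscriptions.get(consumers.get(position)).contains(topic))
                    position = (position + 1) % consumers.size();
                assignment.get(consumers.get(position)).add(topic + "p" + p);
                position = (position + 1) % consumers.size();
            }
        }
        return assignment;
    }

    // Subscriptions of Examples 2 and 3 with configurable partition counts
    static Map<String, List<String>> articleExample(int t0, int t1, int t2) {
        Map<String, Integer> partitionsPerTopic = new HashMap<>();
        partitionsPerTopic.put("t0", t0);
        partitionsPerTopic.put("t1", t1);
        partitionsPerTopic.put("t2", t2);
        Map<String, List<String>> subscriptions = new HashMap<>();
        subscriptions.put("consumer0", Arrays.asList("t0", "t1", "t2"));
        subscriptions.put("consumer1", Arrays.asList("t0", "t1"));
        subscriptions.put("consumer2", Arrays.asList("t2"));
        return assign(partitionsPerTopic, subscriptions);
    }

    public static void main(String[] args) {
        System.out.println(articleExample(4, 3, 2)); // Example 3
    }
}
```

Calling articleExample(3, 2, 1) reproduces Example 2, and articleExample(4, 3, 2) reproduces the 5/3/1 split of Example 3.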

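4. StickyAssignor

The name "sticky" is apt. StickyAssignor was introduced in version 0.11 and has two goals:

  • Goal 1: distribute partitions as evenly as possible
  • Goal 2: keep the assignment as close as possible to the previous one

When the two goals conflict, goal 1 takes precedence over goal 2.
StickyAssignor has a lot of code, so start with the flowchart.

[Figure: flowchart of the StickyAssignor assignment process]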

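Taking [Example 3] from section 3, StickyAssignor produces:

consumer    assigned partitions
consumer0   t0p0, t0p2, t1p0, t1p2
consumer1   t0p1, t0p3, t1p1
consumer2   t2p0, t2p1

Unlike the RoundRobinAssignor result, this assignment is balanced. Now suppose a new consumer3 joins and subscribes to topic0. The rebalance proceeds as follows: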
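(1) Partitions in sorted order, fewest potential consumers first (topic1 and topic2 partitions have two potential consumers each, topic0 partitions now have three): t1p0, t1p1, t1p2, t2p0, t2p1, t0p0, t0p1, t0p2, t0p3
(2) Initial consumer order, fewest assigned partitions first: consumer3, consumer2, consumer1, consumer0
(3) Rebalancing starts. The topic1 and topic2 partitions are examined first, but none can move, because no other potential consumer holds at least two fewer partitions than the current owner. t0p0 can be reassigned, since consumer0 holds 4 partitions and consumer3 holds none. After the move, the assignment and consumer order are: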

consumer (sorted)   assigned partitions
consumer3           t0p0
consumer2           t2p0, t2p1
consumer0           t0p2, t1p0, t1p2
consumer1           t0p1, t0p3, t1p1

Next comes t0p1, which can also be reassigned: it moves from consumer1 to consumer3.

consumer (sorted)   assigned partitions
consumer1           t0p3, t1p1
consumer2           t2p0, t2p1
consumer3           t0p0, t0p1
consumer0           t0p2, t1p0, t1p2

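(4) The consumers now hold 2, 2, 2 and 3 partitions, which is balanced, so reassignment ends and the result is returned.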
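
The walkthrough above can be replayed with a heavily simplified sketch of the reassignment loop. It keeps only the rule that a partition moves when its owner holds more than one partition above the least-loaded potential consumer. It omits isBalanced, the stickiness bookkeeping in PartitionMovements and the revert step, and it assumes every partition already has an owner. StickyMoveSketch, rebalance and articleExample are hypothetical names, not Kafka APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not Kafka code: partitions and consumers are plain strings.
class StickyMoveSketch {

    static Map<String, List<String>> rebalance(List<String> orderedPartitions,
                                               Map<String, List<String>> potentialConsumers,
                                               Map<String, List<String>> current) {
        // Work on a copy and track the owner of every partition
        Map<String, List<String>> assignment = new TreeMap<>();
        Map<String, String> ownerOf = new HashMap<>();
        for (Map.Entry<String, List<String>> e : current.entrySet()) {
            assignment.put(e.getKey(), new ArrayList<>(e.getValue()));
            for (String partition : e.getValue())
                ownerOf.put(partition, e.getKey());
        }
        boolean modified;
        do {
            modified = false;
            for (String partition : orderedPartitions) {
                String owner = ownerOf.get(partition);
                // Least-loaded potential consumer, ties broken by name
                String target = owner;
                for (String candidate : potentialConsumers.get(partition)) {
                    int diff = assignment.get(candidate).size() - assignment.get(target).size();
                    if (diff < 0 || (diff == 0 && candidate.compareTo(target) < 0))
                        target = candidate;
                }
                // Move only if the owner holds at least two more partitions than the target
                if (assignment.get(owner).size() > assignment.get(target).size() + 1) {
                    assignment.get(owner).remove(partition);
                    assignment.get(target).add(partition);
                    ownerOf.put(partition, target);
                    modified = true;
                }
            }
        } while (modified);
        return assignment;
    }

    // The state after consumer3 (subscribed to topic0 only) joins the group of Example 3
    static Map<String, List<String>> articleExample() {
        Map<String, List<String>> current = new HashMap<>();
        current.put("consumer0", Arrays.asList("t0p0", "t0p2", "t1p0", "t1p2"));
        current.put("consumer1", Arrays.asList("t0p1", "t0p3", "t1p1"));
        current.put("consumer2", Arrays.asList("t2p0", "t2p1"));
        current.put("consumer3", new ArrayList<String>());

        Map<String, List<String>> potential = new HashMap<>();
        for (String p : Arrays.asList("t0p0", "t0p1", "t0p2", "t0p3"))
            potential.put(p, Arrays.asList("consumer0", "consumer1", "consumer3"));
        for (String p : Arrays.asList("t1p0", "t1p1", "t1p2"))
            potential.put(p, Arrays.asList("consumer0", "consumer1"));
        for (String p : Arrays.asList("t2p0", "t2p1"))
            potential.put(p, Arrays.asList("consumer0", "consumer2"));

        // Partitions with fewer potential consumers come first
        List<String> ordered = Arrays.asList("t1p0", "t1p1", "t1p2", "t2p0", "t2p1",
                                             "t0p0", "t0p1", "t0p2", "t0p3");
        return rebalance(ordered, potential, current);
    }

    public static void main(String[] args) {
        System.out.println(articleExample());
    }
}
```

Only t0p0 and t0p1 move in this run; the other seven partitions stay with their previous owners, which is the sticky behaviour the example is meant to show.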
The flowchart omits many details; the source analysis below covers them:

public class StickyAssignor extends AbstractPartitionAssignor {

    private static final Logger log = LoggerFactory.getLogger(StickyAssignor.class);

    // these schemas are used for preserving consumer's previously assigned partitions
    // list and sending it as user data to the leader during a rebalance
    private static final String TOPIC_PARTITIONS_KEY_NAME = "previous_assignment";
    private static final String TOPIC_KEY_NAME = "topic";
    private static final String PARTITIONS_KEY_NAME = "partitions";
    private static final Schema TOPIC_ASSIGNMENT = new Schema(
        new Field(TOPIC_KEY_NAME, Type.STRING),
        new Field(PARTITIONS_KEY_NAME, new ArrayOf(Type.INT32)));
    private static final Schema STICKY_ASSIGNOR_USER_DATA = new Schema(
        new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT)));

    private List<TopicPartition> memberAssignment = null;
    private PartitionMovements partitionMovements;

    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
        Map<String, List<TopicPartition>> currentAssignment = new HashMap<>();
        partitionMovements = new PartitionMovements();

        // First rebuild the current assignment state: currentAssignment.
        // This method reads the userData of each subscription. Per the schema comment above, userData
        // carries the member's previously assigned partitions, sent to the leader during a rebalance.
        prepopulateCurrentAssignments(subscriptions, currentAssignment);
        // Whether this is a brand-new assignment (true = yes)
        boolean isFreshAssignment = currentAssignment.isEmpty();

        // Which consumers each partition can be assigned to
        final Map<TopicPartition, List<String>> partition2AllPotentialConsumers = new HashMap<>();
        // Which partitions each consumer can be assigned
        final Map<String, List<TopicPartition>> consumer2AllPotentialPartitions = new HashMap<>();

        // Initialise partition2AllPotentialConsumers with empty lists
        for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
            for (int i = 0; i < entry.getValue(); ++i)
                partition2AllPotentialConsumers.put(new TopicPartition(entry.getKey(), i), new ArrayList<String>());
        }

        // Iterate over subscriptions
        for (Entry<String, Subscription> entry: subscriptions.entrySet()) {
            String consumer = entry.getKey();
            // Initialise this consumer's entry in consumer2AllPotentialPartitions
            consumer2AllPotentialPartitions.put(consumer, new ArrayList<TopicPartition>());
            for (String topic: entry.getValue().topics()) {
                for (int i = 0; i < partitionsPerTopic.get(topic); ++i) {
                    TopicPartition topicPartition = new TopicPartition(topic, i);
                    // Record every partition of the subscribed topic for this consumer
                    consumer2AllPotentialPartitions.get(consumer).add(topicPartition);
                    // Record this consumer for every partition of the topic
                    partition2AllPotentialConsumers.get(topicPartition).add(consumer);
                }
            }

            // No current assignment exists for this consumer
            if (!currentAssignment.containsKey(consumer))
                // Create an empty assignment list for it
                currentAssignment.put(consumer, new ArrayList<TopicPartition>());
        }
        // partition2AllPotentialConsumers and consumer2AllPotentialPartitions are now initialised

        // Map each partition in the current assignment to its consumer
        Map<TopicPartition, String> currentPartitionConsumer = new HashMap<>();
        for (Map.Entry<String, List<TopicPartition>> entry: currentAssignment.entrySet())
            for (TopicPartition topicPartition: entry.getValue())
                currentPartitionConsumer.put(topicPartition, entry.getKey());

        // Sort the valid partitions so that the potential reassignment phase processes them in an order
        // that minimises partition movement among consumers (hence honouring maximal stickiness).
        // See the sortPartitions source below for details.
        List<TopicPartition> sortedPartitions = sortPartitions(
            currentAssignment, isFreshAssignment, partition2AllPotentialConsumers, consumer2AllPotentialPartitions);

        // Start with all sorted partitions in unassignedPartitions
        List<TopicPartition> unassignedPartitions = new ArrayList<>(sortedPartitions);
        // Walk the current assignment and remove what is no longer valid
        for (Iterator<Map.Entry<String, List<TopicPartition>>> it = currentAssignment.entrySet().iterator(); it.hasNext();) {
            Map.Entry<String, List<TopicPartition>> entry = it.next();
            if (!subscriptions.containsKey(entry.getKey())) {
                // The consumer no longer exists: remove all of its partitions from currentPartitionConsumer
                // and remove the consumer from the current assignment
                for (TopicPartition topicPartition: entry.getValue())
                    currentPartitionConsumer.remove(topicPartition);
                it.remove();
            } else {
                // otherwise (the consumer still exists)
                for (Iterator<TopicPartition> partitionIter = entry.getValue().iterator(); partitionIter.hasNext();) {
                    TopicPartition partition = partitionIter.next();
                    if (!partition2AllPotentialConsumers.containsKey(partition)) {
                        // The partition no longer exists: remove it from the current assignment
                        // and from currentPartitionConsumer
                        partitionIter.remove();
                        currentPartitionConsumer.remove(partition);
                    } else if (!subscriptions.get(entry.getKey()).topics().contains(partition.topic())) {
                        // The consumer no longer subscribes to the partition's topic:
                        // remove the partition from the current assignment
                        partitionIter.remove();
                    } else
                        // The partition is already assigned: remove it from the unassigned partitions
                        unassignedPartitions.remove(partition);
                }
            }
        }
        // at this point we have preserved all valid topic partition to consumer assignments and removed
        // all invalid topic partitions and invalid consumers. Now we need to assign unassignedPartitions
        // to consumers so that the topic partition assignments are as balanced as possible.

        // Sort consumers in ascending order of the number of topic partitions already assigned to them
        TreeSet<String> sortedCurrentSubscriptions = new TreeSet<>(new SubscriptionComparator(currentAssignment));
        sortedCurrentSubscriptions.addAll(currentAssignment.keySet());

        balance(currentAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions,
            consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer);
        return currentAssignment;
    }

    // some methods omitted
}
/**
 * Sort valid partitions so they are processed in the potential reassignment phase in the proper order
 * that causes minimal partition movement among consumers (hence honoring maximal stickiness)
 *
 * @param currentAssignment the calculated assignment so far
 * @param isFreshAssignment whether this is a new assignment, or a reassignment of an existing one
 * @param partition2AllPotentialConsumers a mapping of partitions to their potential consumers
 * @param consumer2AllPotentialPartitions a mapping of consumers to potential partitions they can consume from
 * @return sorted list of valid partitions
 */
private List<TopicPartition> sortPartitions(Map<String, List<TopicPartition>> currentAssignment,
                                            boolean isFreshAssignment,
                                            Map<TopicPartition, List<String>> partition2AllPotentialConsumers,
                                            Map<String, List<TopicPartition>> consumer2AllPotentialPartitions) {
    List<TopicPartition> sortedPartitions = new ArrayList<>();

    if (!isFreshAssignment && areSubscriptionsIdentical(partition2AllPotentialConsumers, consumer2AllPotentialPartitions)) {
        // Not a fresh assignment, and every consumer can potentially be assigned the same partitions
        // Copy the current assignment
        Map<String, List<TopicPartition>> assignments = deepCopy(currentAssignment);
        // Remove partitions that are no longer valid from the copied assignment
        for (Entry<String, List<TopicPartition>> entry: assignments.entrySet()) {
            List<TopicPartition> toRemove = new ArrayList<>();
            for (TopicPartition partition: entry.getValue())
                if (!partition2AllPotentialConsumers.keySet().contains(partition))
                    toRemove.add(partition);
            for (TopicPartition partition: toRemove)
                entry.getValue().remove(partition);
        }
        // SubscriptionComparator compares two keys by the length of their values;
        // equal lengths are ordered by comparing the keys as strings
        TreeSet<String> sortedConsumers = new TreeSet<>(new SubscriptionComparator(assignments));
        sortedConsumers.addAll(assignments.keySet());

        // Order the partitions by taking consumers from the end of the set,
        // i.e. consumers with more assigned partitions first
        while (!sortedConsumers.isEmpty()) {
            String consumer = sortedConsumers.pollLast();
            List<TopicPartition> remainingPartitions = assignments.get(consumer);
            if (!remainingPartitions.isEmpty()) {
                sortedPartitions.add(remainingPartitions.remove(0));
                sortedConsumers.add(consumer);
            }
        }

        // Append the partitions that are not assigned to any consumer
        for (TopicPartition partition: partition2AllPotentialConsumers.keySet()) {
            if (!sortedPartitions.contains(partition))
                sortedPartitions.add(partition);
        }
    } else {
        // PartitionComparator compares two keys by the length of their values (the number of potential
        // consumers); equal lengths are ordered by topic name, and then by partition number
        TreeSet<TopicPartition> sortedAllPartitions = new TreeSet<>(new PartitionComparator(partition2AllPotentialConsumers));
        sortedAllPartitions.addAll(partition2AllPotentialConsumers.keySet());

        // Take partitions in order: the fewer consumers a partition can be assigned to, the earlier it comes
        while (!sortedAllPartitions.isEmpty())
            sortedPartitions.add(sortedAllPartitions.pollFirst());
    }
    return sortedPartitions;
}
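
The ordering produced by the else branch can be illustrated with a small sketch. It is an approximation under stated assumptions, not the Kafka comparator: PartitionOrderSketch is a hypothetical name, partitions are plain strings, and ties are broken by comparing those strings, which only matches the topic-then-partition order while partition numbers stay below 10.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Kafka code: orders partitions by how many consumers could take them.
class PartitionOrderSketch {

    static List<String> sortByPotentialConsumers(Map<String, List<String>> potentialConsumers) {
        List<String> sorted = new ArrayList<>(potentialConsumers.keySet());
        sorted.sort((a, b) -> {
            // Fewer potential consumers first; ties broken by name
            int diff = potentialConsumers.get(a).size() - potentialConsumers.get(b).size();
            return diff != 0 ? diff : a.compareTo(b);
        });
        return sorted;
    }

    // Potential consumers after consumer3 (subscribed to topic0 only) joins the group of Example 3
    static List<String> articleExample() {
        Map<String, List<String>> potential = new HashMap<>();
        for (String p : Arrays.asList("t0p0", "t0p1", "t0p2", "t0p3"))
            potential.put(p, Arrays.asList("consumer0", "consumer1", "consumer3"));
        for (String p : Arrays.asList("t1p0", "t1p1", "t1p2"))
            potential.put(p, Arrays.asList("consumer0", "consumer1"));
        for (String p : Arrays.asList("t2p0", "t2p1"))
            potential.put(p, Arrays.asList("consumer0", "consumer2"));
        return sortByPotentialConsumers(potential);
    }

    public static void main(String[] args) {
        System.out.println(articleExample());
    }
}
```

Processing the most constrained partitions first leaves the flexible ones, here the topic0 partitions, available for evening out the load later.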
/**
 * Balance the current assignment using the data structures created in the assign(...) method above.
 */
private void balance(Map<String, List<TopicPartition>> currentAssignment,
                     List<TopicPartition> sortedPartitions,
                     List<TopicPartition> unassignedPartitions,
                     TreeSet<String> sortedCurrentSubscriptions,
                     Map<String, List<TopicPartition>> consumer2AllPotentialPartitions,
                     Map<TopicPartition, List<String>> partition2AllPotentialConsumers,
                     Map<TopicPartition, String> currentPartitionConsumer) {
    // Whether this is an initial assignment: if even the consumer with the most partitions
    // has none assigned, the assignment is new
    boolean initializing = currentAssignment.get(sortedCurrentSubscriptions.last()).isEmpty();
    boolean reassignmentPerformed = false;

    // Assign the partitions that are still unassigned.
    // Each goes to the consumer with the fewest partitions, and that consumer is then re-inserted
    // into sortedCurrentSubscriptions so the consumers are sorted again
    for (TopicPartition partition: unassignedPartitions) {
        // skip if there is no potential consumer for the partition
        if (partition2AllPotentialConsumers.get(partition).isEmpty())
            continue;

        assignPartition(partition, sortedCurrentSubscriptions, currentAssignment,
            consumer2AllPotentialPartitions, currentPartitionConsumer);
    }

    // fixedPartitions collects partitions that can only go to a single consumer; they are removed from sortedPartitions.
    // By now every partition has been assigned, so partitions that can never be reassigned are dropped.
    // fixedPartitions is not used afterwards; it is just an intermediate variable
    Set<TopicPartition> fixedPartitions = new HashSet<>();
    for (TopicPartition partition: partition2AllPotentialConsumers.keySet())
        if (!canParticipateInReassignment(partition, partition2AllPotentialConsumers))
            fixedPartitions.add(partition);
    sortedPartitions.removeAll(fixedPartitions);

    // Remove consumers whose assignment is final from sortedCurrentSubscriptions
    // and store their partitions in fixedAssignments
    Map<String, List<TopicPartition>> fixedAssignments = new HashMap<>();
    for (String consumer: consumer2AllPotentialPartitions.keySet())
        // canParticipateInReassignment: whether the consumer can take part in reassignment (true = yes)
        // 1. The consumer is not full, returns true. "Full" means every partition that could be
        //    assigned to this consumer is already assigned to it
        // 2. The consumer is full, but at least one of its partitions can be assigned to more than
        //    one consumer, returns true
        if (!canParticipateInReassignment(consumer, currentAssignment,
                                          consumer2AllPotentialPartitions, partition2AllPotentialConsumers)) {
            sortedCurrentSubscriptions.remove(consumer);
            fixedAssignments.put(consumer, currentAssignment.remove(consumer));
        }

    // create a deep copy of the current assignment so we can revert to it if we do not get a more balanced assignment later
    // (backup)
    Map<String, List<TopicPartition>> preBalanceAssignment = deepCopy(currentAssignment);
    Map<TopicPartition, String> preBalancePartitionConsumers = new HashMap<>(currentPartitionConsumer);

    // reassignmentPerformed == true means at least one partition was reassigned
    reassignmentPerformed = performReassignments(sortedPartitions, currentAssignment, sortedCurrentSubscriptions,
        consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer);

    // getBalanceScore computes a balance score: non-negative, and the lower the better.
    // For every pair of consumers it adds the absolute difference of their partition counts; each
    // consumer is removed once processed, so every pair is counted exactly once
    if (!initializing && reassignmentPerformed && getBalanceScore(currentAssignment) >= getBalanceScore(preBalanceAssignment)) {
        // The new assignment is no better: restore the previous one
        deepCopy(preBalanceAssignment, currentAssignment);
        currentPartitionConsumer.clear();
        currentPartitionConsumer.putAll(preBalancePartitionConsumers);
    }

    // Put the consumers that were excluded from reassignment back into the current assignment and the sorted set
    for (Entry<String, List<TopicPartition>> entry: fixedAssignments.entrySet()) {
        String consumer = entry.getKey();
        currentAssignment.put(consumer, entry.getValue());
        sortedCurrentSubscriptions.add(consumer);
    }

    fixedAssignments.clear();
}
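
The balance score described in the comments above can be sketched as a sum of pairwise differences. This is a minimal illustration that takes partition counts directly rather than an assignment map; BalanceScoreSketch and balanceScore are hypothetical names.

```java
// Hypothetical sketch, not Kafka code: the score is the sum of |size_i - size_j| over all consumer pairs.
class BalanceScoreSketch {

    static int balanceScore(int... partitionCounts) {
        int score = 0;
        for (int i = 0; i < partitionCounts.length; i++)
            for (int j = i + 1; j < partitionCounts.length; j++)
                score += Math.abs(partitionCounts[i] - partitionCounts[j]);
        return score;
    }

    public static void main(String[] args) {
        // Example 3 under round robin: 5, 3 and 1 partitions
        System.out.println(balanceScore(5, 3, 1)); // 8
        // Example 3 under the sticky strategy: 4, 3 and 2 partitions
        System.out.println(balanceScore(4, 3, 2)); // 4
    }
}
```

A lower score means a more even spread, which is why balance() reverts to the backup when the score after reassignment is not strictly lower.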
private boolean performReassignments(List<TopicPartition> reassignablePartitions,
                                     Map<String, List<TopicPartition>> currentAssignment,
                                     TreeSet<String> sortedCurrentSubscriptions,
                                     Map<String, List<TopicPartition>> consumer2AllPotentialPartitions,
                                     Map<TopicPartition, List<String>> partition2AllPotentialConsumers,
                                     Map<TopicPartition, String> currentPartitionConsumer) {
    boolean reassignmentPerformed = false;
    boolean modified;

    // repeat reassignment until no partition can be moved to improve the balance
    do {
        modified = false;
        // reassign all reassignable partitions (starting from the partition with least potential consumers and if needed)
        // until the full list is processed or a balance is achieved
        Iterator<TopicPartition> partitionIterator = reassignablePartitions.iterator();
        // isBalanced decides whether the current assignment is already balanced:
        // 1. the smallest per-consumer partition count is at least the largest count minus 1, returns true
        // 2. no consumer with fewer partitions could receive a partition from another consumer, returns true
        while (partitionIterator.hasNext() && !isBalanced(currentAssignment, sortedCurrentSubscriptions, consumer2AllPotentialPartitions)) {
            TopicPartition partition = partitionIterator.next();

            // the partition must have at least two consumers
            if (partition2AllPotentialConsumers.get(partition).size() <= 1)
                log.error("Expected more than one potential consumer for partition '" + partition + "'");

            // the partition must have a current consumer
            String consumer = currentPartitionConsumer.get(partition);
            if (consumer == null)
                log.error("Expected partition '" + partition + "' to be assigned to a consumer");

            // check if a better-suited consumer exist for the partition; if so, reassign it
            for (String otherConsumer: partition2AllPotentialConsumers.get(partition)) {
                if (currentAssignment.get(consumer).size() > currentAssignment.get(otherConsumer).size() + 1) {
                    // See the reassignPartition source below
                    reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, consumer2AllPotentialPartitions);
                    reassignmentPerformed = true;
                    modified = true;
                    break;
                }
            }
        }
    } while (modified);

    return reassignmentPerformed;
}
private void reassignPartition(TopicPartition partition,
                               Map<String, List<TopicPartition>> currentAssignment,
                               TreeSet<String> sortedCurrentSubscriptions,
                               Map<TopicPartition, String> currentPartitionConsumer,
                               Map<String, List<TopicPartition>> consumer2AllPotentialPartitions) {
    String consumer = currentPartitionConsumer.get(partition);

    // sortedCurrentSubscriptions is in ascending order of assigned partition count, so the first
    // consumer that can take this partition becomes the new consumer
    String newConsumer = null;
    for (String anotherConsumer: sortedCurrentSubscriptions) {
        if (consumer2AllPotentialPartitions.get(anotherConsumer).contains(partition)) {
            newConsumer = anotherConsumer;
            break;
        }
    }

    assert newConsumer != null;

    // Determine the partition that should actually be moved, for the sake of stickiness.
    // See the getTheActualPartitionToBeMoved source below
    TopicPartition partitionToBeMoved = partitionMovements.getTheActualPartitionToBeMoved(partition, consumer, newConsumer);
    // Reassign the partition to be moved:
    // 1. remove the old and new consumers from the sorted set so they can be re-inserted
    // 2. update partitionMovements and partitionMovementsByTopic
    // 3. update the assignments of the old and new consumers and currentPartitionConsumer
    // 4. re-insert the old and new consumers into the sorted set
    processPartitionMovement(partitionToBeMoved, newConsumer, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer);

    return;
}
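// partitionMovements: the most recent move of each partition (from srcMemberId to dstMemberId)
// partitionMovementsByTopic: per topic, the partitions moved for each ConsumerPair (old consumer, new consumer);
// partitionMovementsForThisTopic below is the entry for a single topic
private TopicPartition getTheActualPartitionToBeMoved(TopicPartition partition, String oldConsumer, String newConsumer) {
    String topic = partition.topic();

    // No partition of this topic has been moved so far: return the partition itself
    if (!partitionMovementsByTopic.containsKey(topic))
        return partition;

    // This partition has been moved before
    if (partitionMovements.containsKey(partition)) {
        // this partition has previously moved
        // The old consumer of this move must be the destination of the previous move
        assert oldConsumer.equals(partitionMovements.get(partition).dstMemberId);
        // Replace oldConsumer with the source of the previous move.
        // This maximises stickiness in the lookup below. Say partition0 previously moved A->C and is now
        // about to move C->B, while some partition1 previously moved B->A. After this substitution,
        // partition1 is moved instead of partition0: B gains a partition and A loses one, so a later
        // step may move the partition that went from A to C back to A.
        // The design is rather convoluted
        oldConsumer = partitionMovements.get(partition).srcMemberId;
    }

    Map<ConsumerPair, Set<TopicPartition>> partitionMovementsForThisTopic = partitionMovementsByTopic.get(topic);
    // The reverse pair of (old consumer, new consumer)
    ConsumerPair reversePair = new ConsumerPair(newConsumer, oldConsumer);
    // No reverse pair recorded: return the partition itself
    if (!partitionMovementsForThisTopic.containsKey(reversePair))
        return partition;

    // Return one of the partitions previously moved along the reverse pair. This serves goal 2 of
    // StickyAssignor: keep the assignment as close as possible to the previous one.
    // For example, partition0 is about to move from consumerA to consumerB, but partition1 was earlier
    // moved from consumerB to consumerA, so partition1 replaces partition0.
    // At this point partition0 and partition1 both belong to consumerA. Moving either one brings the
    // assignment closer to balance, but moving partition1 is the stickier choice
    return partitionMovementsForThisTopic.get(reversePair).iterator().next();
}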
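
The reverse-pair lookup at the end of the method can be shown in isolation. The sketch below is a deliberately reduced illustration: it ignores the per-topic grouping and the substitution of oldConsumer for previously moved partitions, and it encodes a consumer pair as a "src->dst" string. ReversePairSketch and actualPartitionToMove are hypothetical names, not Kafka APIs.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not Kafka code: prefer undoing an earlier move over making a new one.
class ReversePairSketch {

    // movementsByPair: key "src->dst", value the partitions previously moved in that direction
    static String actualPartitionToMove(String partition, String oldConsumer, String newConsumer,
                                        Map<String, Set<String>> movementsByPair) {
        Set<String> reversed = movementsByPair.get(newConsumer + "->" + oldConsumer);
        // No earlier move in the opposite direction: move the requested partition
        if (reversed == null || reversed.isEmpty())
            return partition;
        // Otherwise move back a partition that previously travelled the other way
        return reversed.iterator().next();
    }

    public static void main(String[] args) {
        Map<String, Set<String>> movements = new HashMap<>();
        // partition1 was earlier moved from consumerB to consumerA
        movements.put("consumerB->consumerA", new LinkedHashSet<>(Arrays.asList("partition1")));
        // A request to move partition0 from consumerA to consumerB is redirected to partition1
        System.out.println(actualPartitionToMove("partition0", "consumerA", "consumerB", movements));
    }
}
```

Moving partition1 back restores part of the earlier assignment while changing the partition counts exactly as moving partition0 would.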

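This article analysed the source code of Kafka's three partition assignment strategies. Neither RangeAssignor nor RoundRobinAssignor can guarantee a fully balanced assignment. StickyAssignor is more complex to implement, but it balances better than the other two and also avoids unnecessary partition movement.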
