Kafka生产者和消费者分区策略部分源码解析
之前我在看其他的博客时,发现对于kafka consumer的RoundRobin的缺点分析中,有两种观点,一种认为缺点在于如果消费者组中消费者消费的主题不同,或者消费者线程数不同,那么会造成消费者消费分区数目的倾斜;另一种观点认为缺点在于消费者会消费到不属于自己主题的内容,所以这篇文章就是在这种背景下写的,如果有写错,还请指正。
PS:我认为第一种观点是对的,具体看后续的源码解析,不知道第二种观点是写错了,还是老版本源码如此。
文章目录
- 生产者分区策略分析
- 消费者分区策略解析
生产者分区策略分析
消息发送时都被发送到一个topic,其本质就是一个目录,而topic是由一些Partition Logs(分区日志)组成
1)分区的原因
(1)方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了;
(2)可以提高并发,因为可以以Partition为单位读写了。
2)分区的原则
我们需要将producer发送的数据封装成一个ProducerRecord对象。
ProducerRecord类有如下的构造函数
ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)
ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers)
ProducerRecord(String topic, Integer partition, K key, V value)
ProducerRecord(String topic, K key, V value)
ProducerRecord(String topic, V value)
1、指明partition的情况下,直接将指明的值作为partition值;
2、没有指明partition但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值;
3、既没有partition值又没有key值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与topic可用的partition总数取余得到partition值,也就是常说的round-robin(轮询)算法;(默认)
以下是DefaultPartitioner类源,我们也可以模仿他实现Partition接口实现我们自己的分区器:
package org.apache.kafka.clients.producer.internals;/*** The default partitioning strategy:默认的分区策略:如果给定了分区,使用他如果没有分区但是有个key,就是就根据key的hash值取分区如果分区和key值都没有,就采样轮询* <ul>* <li>If a partition is specified in the record, use it* <li>If no partition is specified but a key is present choose a partition based on a hash of the key* <li>If no partition or key is present choose a partition in a round-robin fashion*/
public class DefaultPartitioner implements Partitioner {private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();public void configure(Map<String, ?> configs) {}public int partition(String topic, // 主题Object key, // 给定的keybyte[] keyBytes, // key序列化后的值Object value, // 要放入的值byte[] valueBytes, // 序列化后的值Cluster cluster) { // 当前集群List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);// 对应主题的分区数int numPartitions = partitions.size();// 如果key为nullif (keyBytes == null) {// 获取主题轮询的下一个partition值,但还没取模int nextValue = nextValue(topic);List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);if (availablePartitions.size() > 0) {// 把上面的partition值取模得到真正的分区值int part = Utils.toPositive(nextValue) % availablePartitions.size();// 得到对应的分区return availablePartitions.get(part).partition();} else {// 没有分区// no partitions are available, give a non-available partitionreturn Utils.toPositive(nextValue) % numPartitions;}} else {// 输入了key值,直接对key的hash值取模就可以得到分区号了return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}}private int nextValue(String topic) {AtomicInteger counter = topicCounterMap.get(topic);if (null == counter) {counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);if (currentCounter != null) {counter = currentCounter;}}// 自增return counter.getAndIncrement();}public void close() {}
}
消费者分区策略解析
一个 consumer group 中有多个 consumer,一个 topic 有多个 partition,所以必然会涉及到 partition 的分配问题,即确定那个 partition 由哪个 consumer 来消费。
Kafka 有三种分配策略,一是 RoundRobin,一是 Range , 最后一个是Sticky(新版本才有)。
触发分区策略条件:
- 同一个 Consumer Group 内新增消费者;
- 订阅的主题新增分区;(没有减少分区)
- 消费者离开当前所属的Consumer Group,包括shuts down 或 crashes。
触发时机:消费者组里个数发生变化时。(包括消费者启动时和发生改变时)
1) RoundRobin
把所有的 partition 和所有的 consumer 都列出来,然后按照 hashcode 进行排序,最后通过轮询算法来分配 partition 给到各个消费者。
具体案例:
假如有3个Topic :T0(三个分区P0-0,P0-1,P0-2),T1(两个分区P1-0,P1-1),T2(四个分区P2-0,P2-1,P2-2,P2-3)
有三个消费者:C0(订阅了T0,T1),C1(订阅了T1,T2),C2(订阅了T0,T2)
那么分区过程如下所示:
轮询关注的是组
分区将会按照一定的顺序(hashcode排序)排列起来,消费者将会组成一个环状的结构,然后开始轮询。
结果可能是这样的:
C0: P0-0,P0-2,P1-1
C1:P1-0,P2-0,P2-2
C2:P0-1,P2-1,P2-3
优点:
多个消费者之间消息条数差距在1以内;(前提是消费者组中消费者消费主题相同,且不同消费者的消费线程数要相同)
缺点:
如果消费者组中,消费者订阅的主题不同,可能会出现一个消费者消费多个分区,而其他消费者消费分区很少的情况。
//举例:如果消费者组中消费者消费主题不同:
比如有3个消费者
C0, C1, C2
3个主题,3个主题分别有1个、2个和3个分区
t0, t1, t2,
那么得到的主题分区关系如下
t0p0, t1p0, t1p1, t2p0, t2p1, t2p2.假设:
C0 订阅 t0;
C1 订阅 t0, t1;
C2 订阅 t0, t1, t2.最终分配结果如下:
C0: [t0p0]
C1: [t1p0]
C2: [t1p1, t2p0, t2p1, t2p2]所以这种分配方式的问题在于,如果消费者之间订阅的主题不相同时,则会造成资源分配不均衡。//RoundRobin分区部分源码如下:public class RoundRobinAssignor extends AbstractPartitionAssignor {public RoundRobinAssignor() {}public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, Subscription> subscriptions) {Map<String, List<TopicPartition>> assignment = new HashMap();//订阅的主题;Iterator var4 = subscriptions.keySet().iterator();while(var4.hasNext()) {String memberId = (String)var4.next();assignment.put(memberId, new ArrayList());}//将消费者集合进行排序,构建一个消费者环, 内部通过索引位置+1对总数取余的方式实现的环;CircularIterator<String> assigner = new CircularIterator(Utils.sorted(subscriptions.keySet()));Iterator var9 = this.allPartitionsSorted(partitionsPerTopic, subscriptions).iterator();while(var9.hasNext()) {TopicPartition partition = (TopicPartition)var9.next();//当前主题;String topic = partition.topic();//这里循环遍历看看消费者有没有订阅该topic,否则一直next到下一个消费者,主要的作用是跳过;//没有订阅该主题的消费者;while(!((Subscription)subscriptions.get(assigner.peek())).topics().contains(topic)) {assigner.next();}//为当前消费者添加分区信息;((List)assignment.get(assigner.next())).add(partition);}return assignment;}......
场景:所以应该在当前消费者组订阅的topic相同的情况下时使用;
2)Range(默认策略)
范围分区策略是对每个 topic 而言的,只关注单个的消费者
首先对同一个 topic 里面的分区按照序号进行排序,并对消费者(不是消费者组)按照字母顺序进行排序。通过 partitions数/consumer数,来决定每个消费者应该消费几个分区。如果除不尽,那么前面几个消费者将会多消费 1 个分区。
range跟组没什么关系,只给订阅了的消费者发,而不是给订阅了的消费者组发
缺点:随着主题数的增多,不同消费者消费分区的数量差距可能会越来越大;(一个主题差1个,多个主题就会差很多了)
场景:不同消费者订阅的topic不同;
注意:在这种分区策略下:同一消费者组中消费者个数是可以大于分区数的,但是这样会产生闲置的consumer;
3)Sticky,这种分配策略是在kafka的0.11.X版本才开始引入的,是目前最复杂也是最优秀的分配策略。
它的设计主要实现了两个目的:
- 分区的分配要尽可能的均匀;(分配给这些消费者的主题分区的数目尽可能的小)
- 分区的分配尽可能的与上次分配的保持相同。(当分区再均衡发生时,它保留了尽可能多的现有赋值。)
如果这两个目的发生了冲突,优先实现第一个目的。
添加内容:
//这里就是Sticky策略中,判断是否为最佳分区策略的那部分核心源码,我还没看完整体,后续复习完补上,可以先看看;
private boolean canParticipateInReassignment(TopicPartition partition,Map<TopicPartition, List<String>> partition2AllPotentialConsumers) {// if a partition has two or more potential consumers it is subject to reassignment.return partition2AllPotentialConsumers.get(partition).size() >= 2;}private boolean canParticipateInReassignment(String consumer,Map<String, List<TopicPartition>> currentAssignment,Map<String, List<TopicPartition>> consumer2AllPotentialPartitions,Map<TopicPartition, List<String>> partition2AllPotentialConsumers) {List<TopicPartition> currentPartitions = currentAssignment.get(consumer);int currentAssignmentSize = currentPartitions.size();int maxAssignmentSize = consumer2AllPotentialPartitions.get(consumer).size();if (currentAssignmentSize > maxAssignmentSize)log.error("The consumer " + consumer + " is assigned more partitions than the maximum possible.");if (currentAssignmentSize < maxAssignmentSize)// if a consumer is not assigned all its potential partitions it is subject to reassignmentreturn true;for (TopicPartition partition: currentPartitions)// if any of the partitions assigned to a consumer is subject to reassignment the consumer itself// is subject to reassignmentif (canParticipateInReassignment(partition, partition2AllPotentialConsumers))return true;return false;}
我们举例进行分析:
目的1:
3个消费者
C0
, C1
, C2
3个主题
t0
, t1
, t2
3个主题分别有1个、2个和3个分区,得到结果如下
t0p0
, t1p0
, t1p1
, t2p0
,t2p1
, t2p2
假设:
C0
订阅 t0
C1
订阅 t0
, t1
C2
订阅 t0
, t1
, t2
按照RoundRobin的分配结果如下:
C0: [t0p0]
C1: [t1p0]
C2: [t1p1, t2p0, t2p1, t2p2]
Sticky的分配结果则如下:
C0 [t0p0]
C1 [t1p0, t1p1]
C2 [t2p0, t2p1, t2p2]
目的2:
比如我们有3个消费者(C0,C1,C2),都订阅了2个主题(T0 和 T1)并且每个主题都有 3 个分区(p0、p1、p2),那么所订阅的所有分区可以标识为T0p0、T0p1、T0p2、T1p0、T1p1、T1p2。此时使用Sticky分配策略后,得到的分区分配结果如下:
消费者线程 | 消费分区序号 |
---|---|
C0 | T0p0、T1p0 |
C1 | T0p1、T1p1 |
C2 | T0p2、T1p2 |
此时会发现,这里的分区结果和RoundRobin分区策略很类似,但其实底层并不相同,如果是RoundRobin分区策略,那么结果为:
消费者线程 | 消费分区序号 |
---|---|
C0 | T0p0、T0p2、T1p1 |
C1 | T0p1、T1p0、T1p2 |
而如果是Sticky分区策略,那么结果为:
消费者线程 | 消费分区序号 |
---|---|
C0 | T0p0、T1p0、T0p2 |
C1 | T0p1、T1p1、T1p2 |
仔细观察重分区后的消费分区序号会发现,C0中还是包含T0p0、T1p0分区,而C1中还是包含T0p1、T1p1分区;然后原本属于C2的T0p2、T1p2分区,分配给C0和C1消费者,此时是不会把两个分区都分给C0或者C1的,因为这违反了第一条件;
为什么要这么设计呢?这是因为发生分区重分配后,对于同一个分区而言有可能之前的消费者和新指派的消费者不是同一个,对于之前消费者进行到一半的处理还要在新指派的消费者中再
次处理一遍,这时就会浪费系统资源。
Kafka生产者和消费者分区策略部分源码解析相关推荐
- Kafka 生产者及消费者详解
一.Kafka 生产者 1.1 分区策略 1)分区的原因 (1)方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群 ...
- Glide 4.9源码解析-缓存策略
本文Glide源码基于4.9,版本下载地址如下:Glide 4.9 前言 在分析了Glide的图片加载流程后,更加发觉到Glide的强大,于是这篇文章将继续深入分析Glide的缓存策略.不过今天的文章 ...
- Redis源码解析(15) 哨兵机制[2] 信息同步与TILT模式
Redis源码解析(1) 动态字符串与链表 Redis源码解析(2) 字典与迭代器 Redis源码解析(3) 跳跃表 Redis源码解析(4) 整数集合 Redis源码解析(5) 压缩列表 Redis ...
- 深入分析Kafka生产者和消费者
深入Kafka生产者和消费者 Kafka生产者 消息发送的流程 发送方式 发送并忘记 同步发送 异步发送 生产者属性配置 序列化器 分区器 自定义分区器 Kafka消费者 消费者属性配置 消费者基础概 ...
- Kafka生产者与消费者详解
什么是 Kafka Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区.多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系 ...
- pykafka连接重要使用pykafka,kafka-python的api开发kafka生产者和消费者
https://pykafka.readthedocs.io/en/latest/api/producer.html 说明文档 </div><h2 class="heade ...
- Kafka 生产者、消费者命令行操作
Kafka 生产者.消费者命令行操作 1.查看操作生产者命令参数 bin/kafka-console-producer.sh 参数 --bootstrap-server <String: ser ...
- 【kafka】Kafka 源码解析:Group 协调管理机制
1.概述 转载:Kafka 源码解析:Group 协调管理机制 在 Kafka 的设计中,消费者一般都有一个 group 的概念(当然,也存在不属于任何 group 的消费者),将多个消费者组织成一个 ...
- 爆火的Java面试题-kafka源码解析与实战豆瓣
1 基础 为什么 Java 中只有值传递? int 范围?float 范围? hashCode 与 equals,什么关系? String StringBuffer 和 StringBuilder 的 ...
最新文章
- 百度发的208亿春晚红包,靠这样的技术送到了你手上 | 解读
- UVA116 单向TSP Unidirectional TSP(多阶段决策问题、输出字典序最小的方案、DAG上DP)
- Spring Cloud Alibaba 学无止境:下一代微服务架构的规划与展望
- 如何提高一个研发团队的“代码速度”?
- python源码编译安装 gb18030_源代码编译安装Python3.5.2
- .NET 6 使用 string.Create 提升字符串创建和拼接性能
- 螺旋桨设计软件_欧洲斥巨资研发的A400M螺旋桨运输机,为啥就没人买啊?| 图说...
- pillow python 划线_Python-PIL(pillow)图片处理入门(一)
- 线程池状态以及转换java_JAVA线程池总结一下
- SpringCloud Sentinel 结合OpenFeign的使用介绍
- QT解析嵌套JSON表达式
- [翻译]XNA 3.0 Game Programming Recipes之forty-seven
- java获取前台值_SpringMVC接收前台传递过来的值的实例
- Android实战——Activity超详细学习笔记
- oracle左连接没用_Oracle左连接,右连接
- webpack 无法加载文件 C:\Users\User\AppData\Roaming\npm\webpack.ps1,因为在此系统上禁止运行脚本。
- KVM/QEMU(virt-manager)使用iso镜像安装macOS bigsur 11.4
- 如何申请美国、加拿大、英国的电话号码
- JavaWeb-16 (E家园项目案例1)
- STM32/APM32 用DMA采集ADC1多通道--标准库