本文来书说下kafka中生产者和消费者的分区问题

文章目录

  • 概述
  • 主题的分区数设置
  • 分区与生产者
  • 分区与消费者
    • range
    • roundrobin(轮询)
  • 本文参考
  • 本文小结

概述

我们知道,生产者发送消息到主题,消费者订阅主题(以消费者组的名义订阅),而主题下是分区,消息是存储在分区中的,所以事实上生产者发送消息到分区,消费者则从分区读取消息,那么,这里问题来了,生产者将消息投递到哪个分区?消费者组中的消费者实例之间是怎么分配分区的呢?接下来,就围绕着这两个问题一探究竟。


主题的分区数设置

在server.properties配置文件中可以指定一个全局的分区数设置,这是对每个主题下的分区数的默认设置,默认是1。


当然每个主题也可以自己设置分区数量,如果创建主题的时候没有指定分区数量,则会使用server.properties中的设置。

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 2 --replication-factor 1

在创建主题的时候,可以使用**–partitions**选项指定主题的分区数量

[root@localhost kafka_2.11-2.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic abc
Topic:abc       PartitionCount:2        ReplicationFactor:1     Configs:Topic: abc      Partition: 0    Leader: 0       Replicas: 0     Isr: 0Topic: abc      Partition: 1    Leader: 0       Replicas: 0     Isr: 0

分区与生产者

首先提出一个问题:生产者将消息投递到分区有没有规律?如果有,那么它是如何决定一条消息该投递到哪个分区的呢

The default partitioning strategy:

  • If a partition is specified in the record, use it
  • If no partition is specified but a key is present choose a partition based on a hash of the key
  • If no partition or key is present choose a partition in a round-robin fashion

默认的分区策略是

  • 如果在发消息的时候指定了分区,则消息投递到指定的分区
  • 如果没有指定分区,但是消息的key不为空,则基于key的哈希值来选择一个分区
  • 如果既没有指定分区,且消息的key也是空,则用轮询的方式选择一个分区
/*** Compute the partition for the given record.** @param topic The topic name* @param key The key to partition on (or null if no key)* @param keyBytes serialized key to partition on (or null if no key)* @param value The value to partition on or null* @param valueBytes serialized value to partition on or null* @param cluster The current cluster metadata*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();if (keyBytes == null) {int nextValue = nextValue(topic);List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);if (availablePartitions.size() > 0) {int part = Utils.toPositive(nextValue) % availablePartitions.size();return availablePartitions.get(part).partition();} else {// no partitions are available, give a non-available partitionreturn Utils.toPositive(nextValue) % numPartitions;}} else {// hash the keyBytes to choose a partitionreturn Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}
}

通过源代码可以更加作证这一点


分区与消费者

消费者以组的名义订阅主题,主题有多个分区,消费者组中有多个消费者实例,那么消费者实例和分区之前的对应关系是怎样的呢?换句话说,就是组中的每一个消费者负责那些分区,这个分配关系是如何确定的呢?


同一时刻,一条消息只能被组中的一个消费者实例消费

消费者组订阅这个主题,意味着主题下的所有分区都会被组中的消费者消费到,如果按照从属关系来说的话就是,主题下的每个分区只从属于组中的一个消费者,不可能出现组中的两个消费者负责同一个分区。

那么,问题来了。如果分区数大于或者等于组中的消费者实例数,那自然没有什么问题,无非一个消费者会负责多个分区,(PS:当然,最理想的情况是二者数量相等,这样就相当于一个消费者负责一个分区);但是,如果消费者实例的数量大于分区数,那么按照默认的策略(PS:之所以强调默认策略是因为你也可以自定义策略),有一些消费者是多余的,一直接不到消息而处于空闲状态。

话又说回来,假设多个消费者负责同一个分区,那么会有什么问题呢?

我们知道,Kafka它在设计的时候就是要保证分区下消息的顺序,也就是说消息在一个分区中的顺序是怎样的,那么消费者在消费的时候看到的就是什么样的顺序,那么要做到这一点就首先要保证消息是由消费者主动拉取的(pull),其次还要保证一个分区只能由一个消费者负责。倘若,两个消费者负责同一个分区,那么就意味着两个消费者同时读取分区的消息,由于消费者自己可以控制读取消息的offset,就有可能C1才读到2,而C1读到1,C1还没处理完,C2已经读到3了,则会造成很多浪费,因为这就相当于多线程读取同一个消息,会造成消息处理的重复,且不能保证消息的顺序,这就跟主动推送(push)无异。


消费者分区分配策略。org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor。如果是自定义分配策略的话可以继承AbstractPartitionAssignor这个类,它默认有3个实现。

range

range策略对应的实现类是org.apache.kafka.clients.consumer.RangeAssignor。这是默认的分配策略,可以通过消费者配置中partition.assignment.strategy参数来指定分配策略,它的值是类的全路径,是一个数组

/*** The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order* and the consumers in lexicographic order. We then divide the number of partitions by the total number of* consumers to determine the number of partitions to assign to each consumer. If it does not evenly* divide, then the first few consumers will have one extra partition.** For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,* resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.** The assignment will be:* C0: [t0p0, t0p1, t1p0, t1p1]* C1: [t0p2, t1p2]*/

range策略是基于每个主题的。对于每个主题,我们以数字顺序排列可用分区,以字典顺序排列消费者。然后,将分区数量除以消费者总数,以确定分配给每个消费者的分区数量。如果没有平均划分(PS:除不尽),那么最初的几个消费者将有一个额外的分区。

简而言之,就是,

  • range分配策略针对的是主题(PS:也就是说,这里所说的分区指的某个主题的分区,消费者值的是订阅这个主题的消费者组中的消费者实例)
  • 首先,将分区按数字顺序排行序,消费者按消费者名称的字典序排好序
  • 然后,用分区总数除以消费者总数。如果能够除尽,则皆大欢喜,平均分配;若除不尽,则位于排序前面的消费者将多负责一个分区

例如,假设有两个消费者C0和C1,两个主题t0和t1,并且每个主题有3个分区,分区的情况是这样的:t0p0,t0p1,t0p2,t1p0,t1p1,t1p2

那么,基于以上信息,最终消费者分配分区的情况是这样的:

C0: [t0p0, t0p1, t1p0, t1p1]

C1: [t0p2, t1p2]

为什么是这样的结果呢?

因为,对于主题t0,分配的结果是C0负责P0和P1,C1负责P2;对于主题t2,也是如此,综合起来就是这个结果

上面的过程用图形表示的话大概是这样的:

阅读代码,更有助于理解:

public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,Map<String, Subscription> subscriptions) {//    主题与消费者的映射                                                            Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);Map<String, List<TopicPartition>> assignment = new HashMap<>();for (String memberId : subscriptions.keySet())assignment.put(memberId, new ArrayList<TopicPartition>());for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {String topic = topicEntry.getKey();    //    主题List<String> consumersForTopic = topicEntry.getValue();    //    消费者列表//    partitionsPerTopic表示主题和分区数的映射//    获取主题下有多少个分区Integer numPartitionsForTopic = partitionsPerTopic.get(topic);if (numPartitionsForTopic == null)continue;//    消费者按字典序排序Collections.sort(consumersForTopic);//    分区数量除以消费者数量int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();//    取模,余数就是额外的分区int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);for (int i = 0, n = consumersForTopic.size(); i < n; i++) {int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);//    分配分区assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));}}return assignment;
}

roundrobin(轮询)

roundronbin分配策略的具体实现是org.apache.kafka.clients.consumer.RoundRobinAssignor

/*** The round robin assignor lays out all the available partitions and all the available consumers. It* then proceeds to do a round robin assignment from partition to consumer. If the subscriptions of all consumer* instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts* will be within a delta of exactly one across all consumers.)** For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,* resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.** The assignment will be:* C0: [t0p0, t0p2, t1p1]* C1: [t0p1, t1p0, t1p2]** When subscriptions differ across consumer instances, the assignment process still considers each* consumer instance in round robin fashion but skips over an instance if it is not subscribed to* the topic. Unlike the case when subscriptions are identical, this can result in imbalanced* assignments. For example, we have three consumers C0, C1, C2, and three topics t0, t1, t2,* with 1, 2, and 3 partitions, respectively. Therefore, the partitions are t0p0, t1p0, t1p1, t2p0,* t2p1, t2p2. C0 is subscribed to t0; C1 is subscribed to t0, t1; and C2 is subscribed to t0, t1, t2.** Tha assignment will be:* C0: [t0p0]* C1: [t1p0]* C2: [t1p1, t2p0, t2p1, t2p2]*/

轮询分配策略是基于所有可用的消费者和所有可用的分区的

与前面的range策略最大的不同就是它不再局限于某个主题

如果所有的消费者实例的订阅都是相同的,那么这样最好了,可用统一分配,均衡分配

例如,假设有两个消费者C0和C1,两个主题t0和t1,每个主题有3个分区,分别是t0p0,t0p1,t0p2,t1p0,t1p1,t1p2

那么,最终分配的结果是这样的:

C0: [t0p0, t0p2, t1p1]

C1: [t0p1, t1p0, t1p2]

用图形表示大概是这样的:


假设,组中每个消费者订阅的主题不一样,分配过程仍然以轮询的方式考虑每个消费者实例,但是如果没有订阅主题,则跳过实例。当然,这样的话分配肯定不均衡。

什么意思呢?也就是说,消费者组是一个逻辑概念,同组意味着同一时刻分区只能被一个消费者实例消费,换句话说,同组意味着一个分区只能分配给组中的一个消费者。事实上,同组也可以不同订阅,这就是说虽然属于同一个组,但是它们订阅的主题可以是不一样的。

例如,假设有3个主题t0,t1,t2;其中,t0有1个分区p0,t1有2个分区p0和p1,t2有3个分区p0,p1和p2;有3个消费者C0,C1和C2;C0订阅t0,C1订阅t0和t1,C2订阅t0,t1和t2。那么,按照轮询分配的话,C0应该负责

首先,肯定是轮询的方式,其次,比如说有主题t0,t1,t2,它们分别有1,2,3个分区,也就是t0有1个分区,t1有2个分区,t2有3个分区;有3个消费者分别从属于3个组,C0订阅t0,C1订阅t0和t1,C2订阅t0,t1,t2;那么,按照轮询分配的话,C0应该负责t0p0,C1应该负责t1p0,其余均由C2负责。

上述过程用图形表示大概是这样的:


为什么最后的结果是

C0: [t0p0]

C1: [t1p0]

C2: [t1p1, t2p0, t2p1, t2p2]

这是因为,按照轮询t0p1由C0负责,t1p0由C1负责,由于同组,C2只能负责t1p1,由于只有C2订阅了t2,所以t2所有分区由C2负责,综合起来就是这个结果

细想一下可以发现,这种情况下跟range分配的结果是一样的


本文参考

  • http://kafka.apache.org/documentation/#consumerconfigs
  • https://blog.csdn.net/feelwing1314/article/details/81097167
  • https://blog.csdn.net/OiteBody/article/details/80595971
  • https://blog.csdn.net/YChenFeng/article/details/74980531

本文小结

本文详细介绍了kafka中分区,生产者,消费者这3者之间的关系和区别。

kafka中生产者和消费者的分区问题相关推荐

  1. 从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例

    从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例 前言 加依赖 生产者 加配置 生产者代码示例 消费者 加配置 消费者监听器示例 调用 关于 Serializer ...

  2. 详细讲解如何使用Java连接Kafka构建生产者和消费者(带测试样例)

    1 缘起 学习消息队列的过程中,先补习了RabbitMQ相关知识, 接着又重温了Kafka相关的知识, 发现,我并没有积累Java原生操作Kafka的文章, 只使用SpringBoot集成过Kafka ...

  3. kafka中生产者是如何把消息投递到哪个分区的?消费者又是怎么选择分区的?...

    作者 | 废物大师兄 来源 | https://www.cnblogs.com/cjsblog/p/9664536.html 1. 前言 我们知道,生产者发送消息到主题,消费者订阅主题(以消费者组的名 ...

  4. Java中生产者与消费者问题的演变

    想要了解更多关于Java生产者消费者问题的演变吗?那就看看这篇文章吧,我们分别用旧方法和新方法来处理这个问题. 生产者消费者问题是一个典型的多进程同步问题. 对于大多数人来说,这个问题可能是我们在学校 ...

  5. 彻底理解kafka中partition和消费者对应关系

    1个partition只能被同组的一个consumer消费,同组的consumer则起到均衡效果 消费者多于partition topic: test 只有一个partition 创建一个topic- ...

  6. Java中生产者和消费者总结

    生产者和消费者问题是线程模型中的经典问题,生产者和消费者在同一时间段共用同一个存储空间,这个存储空间是一个缓冲区的仓库,生产者可以将产品放入仓库,消费者可以从仓库中取出产品. 生产者/消费者模型是基于 ...

  7. Disruptor框架中生产者、消费者的各种复杂依赖场景下的使用总结-我见过最好的Disruptor

    更多高并发知识请访问 www.itkc8.com 非常感谢 https://www.cnblogs.com/pku-liuqiang/p/8544700.html Disruptor是一个优秀的并发框 ...

  8. day20Java-Thread-多线程中生产者和消费者

    博客 Java-(高级) 文章目录 多线程生产者和消费者 多线程生产者消费者代码版本1 多线程生产者消费者代码版本2-同步解决问题 多线程生产者消费者代码版本3-等待唤醒机制解决问题 多线程生产者消费 ...

  9. kafka(四)生产者和消费者配置优化

    1.生产者producer 1.1 producer 参数acks设置 在消息被认为是"已提交"之前,producer需要leader确认的produce请求的应答数.该参数用于控 ...

最新文章

  1. 技术图文:如何实现汉诺塔问题?
  2. 系统架构师笔记(2)
  3. 以Settings.APPLICATION_DEVELOPMENT_SETTINGS打开开发者面板出错总结
  4. 前端学习(2525):实现过滤功能
  5. 20178.27 万径人踪灭 思考记录
  6. C++中栈区 堆区 常量区
  7. [转载] java中对象作为参数传递给一个方法,到底是值传递,还是引用传递
  8. 里皮正式告别国足:我尽了最大努力让球队成长
  9. 对shell的简单认识
  10. semaphore的几种用法
  11. 【算法篇】汉诺塔问题
  12. 计算机sci多少页,sci论文一般多少页
  13. postgresql注入笔记
  14. Markdown编辑器使用-yellowcong
  15. Python | 人脸识别系统 — 活体检测
  16. MySQL数据库与数据表的创建
  17. 基于Netty的Android局域网IP电话
  18. 《简明Python教程》读书笔记
  19. Linux字体美化实战(Fontconfig配置)(转)
  20. ngnix有版本要求吗_魔兽世界:暴雪疯了?新版本老玩家遭重大削弱,这是逼人AFK?...

热门文章

  1. iOS:Xcode8以下真机测试iOS10.0和iOS10.1配置包
  2. 当你已经23~男生女生都该看
  3. 安装运行symfony框架编写的edusoho开源程序
  4. python面试必备-基础篇
  5. 1636: Pascal山脉
  6. 林洋能源:布局能源互联网 分布式光伏龙头再扬帆
  7. Linux MTD子系统 _从模型分析到Flash驱动模板
  8. 重载跟重写--笔记2
  9. Lync和Exchange 2013集成PART4:配置统一存档
  10. 与小弟子交谈:引申的思考笔记[第一次编辑]