目录

  • 一.消费方式
  • 二.消费者的分配模式
    • 1.分配时机?
    • 2.Range策略
    • 2.RoundRobin 策略
  • 三.代码解释
    • RangeAssignor:
    • RoundRobinAssignor

一.消费方式

consumer 采用 pull(拉)模式从 broker 中读取数据。

push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。 它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,

二.消费者的分配模式

一个 consumer group 中有多个 consumer,一个 topic 有多个 partition,所以必然会涉及 到 partition 的分配问题,即确定那个 partition 由哪个 consumer 来消费。
Kafka 有两种分配策略,一是 RoundRobin,一是 Range。

1.分配时机?

在 Kafka 内部存在两种默认的分区分配策略:Range 和 RoundRobin。当以下事件发生时,Kafka 将会进行一次分区分配:

  1. 同一个 Consumer Group 内新增消费者
  2. 消费者离开当前所属的Consumer Group,包括shuts down 或 crashes
  3. 订阅的主题新增分区

将分区的所有权从一个消费者移到另一个消费者称为重新平衡(rebalance),如何rebalance就涉及到下面提到的分区分配策略。下面我们将详细介绍 Kafka 内置的两种分区分配策略。本文假设我们有个名为 T1 的主题,其包含了10个分区,然后我们有两个消费者(C1,C2)来消费这10个分区里面的数据。

2.Range策略

Range策略是对每个主题而言的,首先对同一个主题里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。在我们的例子里面,排完序的分区将会是0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消费者排完序将会是C1, C2。然后将partitions的个数除于消费者的总数来决定每个消费者消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。

几个例子:

  • 情况1:有10个分区,2个消费者, 10 / 2 = 5,那么消费者 C1和消费者C2 将会消费同样多的分区,所以最后分区分配的结果看起来是这样的:

C1 将消费 0, 1, 2, 3, 4 分区

C2 将消费 5, 6, 7, 8, 9 分区

  • 情况2:有11个分区,那么最后分区分配的结果看起来是这样的:

C1 将消费 0, 1, 2, 3, 4, 5 分区

C2 将消费 6, 7, 8, 9, 10分区

  • 情况3:有2个主题(T1和T2),分别有11个分区(0,1,2,…10),那么最后分区分配的结果看起来是这样的:

C1 将消费 T1主题的 0, 1, 2, 3, 4, 5 分区以及 T2主题的 0, 1, 2, 3, 4, 5分区,加起来一共

C2 将消费 T1主题的 6, 7, 8, 9, 10 分区以及 T2主题的 5, 6, 7, 8, 9, 10分区

可以看出,C1 消费者比C2消费者多消费了2个分区,这就是Range strategy的一个很明显的弊端。

2.RoundRobin 策略

使用RoundRobin策略有两个前提条件必须满足:

  1. 同一个Consumer Group里面的所有消费者的num.streams必须相等;
  2. 每个消费者订阅的主题必须相同。

例子:

假如按照 hashCode 排序完的topic-partitions组依次为T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,我们的消费者线程排序为C1-0, C1-1, C2-0, C2-1,最后分区分配的结果为:

C1-0 将消费 T1-5, T1-2, T1-6 分区;

C1-1 将消费 T1-3, T1-1, T1-9 分区;

C2-0 将消费 T1-0, T1-4 分区;

C2-1 将消费 T1-8, T1-7 分区;

三.代码解释

RangeAssignor:

  • 该策略会把主题的若干个连续的分区分配给消费者(kafka默认用该策略)。

假设消费者 C1 和消费者 C2 同时 订阅了主题 T1 和主题 T2,并且每个主题有 3 个分区。那么消费者 C1 有可能分配到这 两个主题的分区 0 和分区 1,而消费者 C2 分配到这两个主题的分区 2。因为每个主题 拥有奇数个分区,而分配是在主题内独立完成的,第一个消费者最后分配到比第二个消 费者更多的分区。只要使用了 RangeAssignor 策略,而且分区数量无法被消费者数量整除,就会 出现这种情况。

        public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,Map<String, List<String>> subscriptions) {Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);Map<String, List<TopicPartition>> assignment = new HashMap<>();//key为consumerID,value为分配给该consumer的TopicPartitionfor (String memberId : subscriptions.keySet())//对于每一个consumerassignment.put(memberId, new ArrayList<TopicPartition>());//初始化//对于每一个topic,进行分配for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {String topic = topicEntry.getKey();List<String> consumersForTopic = topicEntry.getValue();//这个topic的partition数量Integer numPartitionsForTopic = partitionsPerTopic.get(topic);if (numPartitionsForTopic == null)//partition数量为null,直接跳过,忽略continue;Collections.sort(consumersForTopic);//对consumer进行排序//计算每个consumer分到的partition数量int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();//计算平均以后剩余partition数量int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();//从0开始作为Partition Index, 构造TopicPartition对象List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);for (int i = 0, n = consumersForTopic.size(); i < n; i++) {//对于当前这个topic的每一个consumer//一定是前面几个consumer会被分配一个额外的TopicPartitiionint start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));}}return assignment;}

RoundRobinAssignor

该策略把主题的所有分区逐个分配给消费者。如果使用 RoundRobinAssignor 策略来给消费者 C1 和消费者 C2 分配分区,那么消费者 C1 将分到主题 T1 的分区 0 和分区 2 以及主题 T2 的分区 1,消费者 C2 将分配到主题 T1 的分区 1 以及主题 T2 的分区 0 和分区 2。一般来说,如果所有消费者都订阅相同的主题(这种情况很常见),RoundRobin 策略会给所有消费者分配相同数量的分区(或最多就差一个分区) 。

     @Overridepublic Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,Map<String, List<String>> subscriptions) {Map<String, List<TopicPartition>> assignment = new HashMap<>();for (String memberId : subscriptions.keySet())assignment.put(memberId, new ArrayList<TopicPartition>());//初始化分配规则CircularIterator<String> assigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet()));//assigner存放了所有的consumerfor (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {//所有的consumer订阅的所有的TopicPartition的Listfinal String topic = partition.topic();while (!subscriptions.get(assigner.peek()).contains(topic))// 如果当前这个assigner(consumer)没有订阅这个topic,直接跳过assigner.next();//跳出循环,表示终于找到了订阅过这个TopicPartition对应的topic的assigner//将这个partition分派给对应的assignerassignment.get(assigner.next()).add(partition);}return assignment;}

RoundRobinAssignor与RangeAssignor最大的区别,是进行分区分配的时候不再逐个topic进行,即不是为某个topic完成了分区分派以后,再进行下一个topic的分区分派, 而是首先将这个group中的所有consumer订阅的所有的topic-partition按顺序展开,然后,依次对于每一个topic-partition,在consumer进行round robin,为这个topic-partition选择一个consumer。

Kafka学习-----Kafka消费者Consumer:消费方式,分区分配策略,RangeRoundRobin相关推荐

  1. 【Kafka】Kafka消费者组三种分区分配策略roundrobin,range,StickyAssignor

    文章目录 1. 分配策略 1.1 Range(默认策略) 1.2 RoundRobin RoundRobin的两种情况 1.3 StickyAssignor 2. Range策略演示 参考 相关文章 ...

  2. Kafka消费者组三种分区分配策略roundrobin,range,StickyAssignor

    一个consumer group中有多个consumer,一个 topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由哪个consumer来消 ...

  3. Kafka Range、RoundRobin、Sticky 三种 分区分配策略区别

    Kafka Range RoundRobin 和Sticky 三种 分区分配策略 一.Kafka默认分区分配策略 1.1 consumer 订阅 1 topic ( 7 partition ) 按照K ...

  4. 9.Kafka 分区分配策略(Range分配策略 RoundRobin分配策略)

    前言 在 Kafka 实际生产过程中,每个 topic 都会有 多个 partitions.   1.多个Partitions有什么好处? ①多个 partition ,能够对 broker 上的数据 ...

  5. kafka partition分配_kafka的分区分配策略

    用过 Kafka 的同学应该都知道,每个 Topic 一般会有很多个 partitions.为了使得我们能够及时消费消息,我们也可能会启动多个 Consumer 去消费,而每个 Consumer 又会 ...

  6. Kafka分区分配策略(Partition Assignment Strategy)

    问题 用过 Kafka 的同学用过都知道,每个 Topic 一般会有很多个 partitions.为了使得我们能够及时消费消息,我们也可能会启动多个 Consumer 去消费,而每个 Consumer ...

  7. kafka分区分配策略

    kafka的分区分配策略大概可以分为一下几步: 前置条件: a.假设消费者组到对应的server的GroupCordinator是已知的,这个groupCordinator和消费组对应的_offset ...

  8. Kafka-消费者组三种分区分配策略Range Assignor、RoundRobin Assignor、Sticky Assignor详细解析

    Kafka消费者组三种分区分配策略roundrobin,range,StickyAssignor 文章目录 Kafka消费者组三种分区分配策略roundrobin,range,StickyAssign ...

  9. 深入分析Kafka架构(三):消费者消费方式、三种分区分配策略、offset维护

    本文目录 一.前言 二.消费者消费方式 三.分区分配策略 3.1.分配分区的前提条件 3.2.Range分配策略 3.3.RoundRobin分配策略 3.4.Sticky分配策略 四.offset维 ...

最新文章

  1. cc.AudioSource
  2. 联想无线网卡 linux驱动,ubuntu14.04手动安装博通官方无线网卡驱动时报错,
  3. MySQL设置表的字段值自动增加
  4. [wxWidgets]_[0基础]_[不常见但有用的类wxStandardPaths]
  5. 原子微型结构信息应用到局部图形信息存储的猜想
  6. python2.7卸载出问题原因分析_怎么卸载python2.7
  7. Charles的iOS应用程序
  8. Flutter进阶—质感设计之弹出菜单
  9. html圆圈复选框的代码,单选、复选框Demo
  10. 声网3D空间音频技术解析:3D空间音效+空气衰减模拟+人声模糊
  11. 怎么快速在计算机植入病毒,怎样给别人的电脑植入病毒
  12. Spring AOP动态代理的两种实现方式
  13. 数学----两个或者多个函数相乘求它们的导数
  14. vue-cli安装了,却说vue不是命令解决方法
  15. 小小白的Android入门之计算器学习
  16. XDF赵海英老师C语言课程——考研考级专用(推荐)
  17. 【学习笔记】行人异常行为检测的综述
  18. linux掩码,linux文件权限掩码umask
  19. 分区表类型:MBR和GUID区别
  20. oracle 中fuser,关于fuser的使用方法

热门文章

  1. 有一个人说网址前面的www是主机名?主机?我懵了
  2. MWC大会来临,YunOS会有哪些表现?
  3. scp 报错: Permission denied, please try again(publickey,password)
  4. JAVA Pattern.matches的使用
  5. 三维实景地图,挺酷的
  6. MQTT——EMQX学习笔记07——topic、topic filter和主题通配符
  7. 用HTML+CSS+JS做一个漂亮简单的游戏网页——全屏游戏美术大赛作品(4个滚动页面)
  8. vue 时间戳转日期
  9. 中央空调,大金、日立、海信们的年轻人争夺战
  10. php解决红白黑三种球,2020版iPhone SE或周五推出 有红白黑三种颜色