一个consumer group中有多个consumer,一个 topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由哪个consumer来消费。

Kafka有两种分配策略,一是roundrobin,一是range。最新还有一个StickyAssignor策略

将分区的所有权从一个消费者移到另一个消费者称为重新平衡(rebalance)。当以下事件发生时,Kafka 将会进行一次分区分配:

  • 同一个 Consumer Group 内新增消费者

  • 消费者离开当前所属的Consumer Group,包括shuts down 或 crashes

  • 订阅的主题新增分区

目前我们还不能自定义分区分配策略,只能通过partition.assignment.strategy参数选择 range 或 roundrobin。partition.assignment.strategy参数默认的值是range。

Kafka提供了消费者客户端参数partition.assignment.strategy用来设置消费者与订阅主题之间的分区分配策略。默认情况下,此参数的值为:org.apache.kafka.clients.consumer.RangeAssignor,即采用RangeAssignor分配策略。除此之外,Kafka中还提供了另外两种分配策略: RoundRobinAssignor和StickyAssignor。消费者客户端参数partition.asssignment.strategy可以配置多个分配策略,彼此之间以逗号分隔。

本文假设我们有个名为T1的主题,其包含了10个分区,然后我们有两个消费者(C1,C2)来消费这10个分区里面的数据,而且C1的num.streams = 1,C2的num.streams = 2。

1.Range(默认策略)#

2.3.x版本API介绍:http://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html

0.10版本API介绍: http://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html

Range是对每个Topic而言的(即一个Topic一个Topic分),首先对同一个Topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。然后用Partitions分区的个数除以消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。

假设n=分区数/消费者数量,m=分区数%消费者数量,那么前m个消费者每个分配n+1个分区,后面的(消费者数量-m)个消费者每个分配n个分区。

假如有10个分区,3个消费者线程,把分区按照序号排列0,1,2,3,4,5,6,7,8,9;消费者线程为C1-0,C2-0,C2-1,那么用partition数除以消费者线程的总数来决定每个消费者线程消费几个partition,如果除不尽,前面几个消费者将会多消费一个分区。在我们的例子里面,我们有10个分区,3个消费者线程,10/3 = 3,而且除除不尽,那么消费者线程C1-0将会多消费一个分区,所以最后分区分配的结果看起来是这样的:

C1-0:0,1,2,3
C2-0:4,5,6
C2-1:7,8,9

如果有11个分区将会是:

C1-0:0,1,2,3
C2-0:4,5,6,7
C2-1:8,9,10

假如我们有两个主题T1,T2,分别有10个分区,最后的分配结果将会是这样:

C1-0:T1(0,1,2,3) T2(0,1,2,3)
C2-0:T1(4,5,6) T2(4,5,6)
C2-1:T1(7,8,9) T2(7,8,9)

可以看出, C1-0消费者线程比其他消费者线程多消费了2个分区

如上,只是针对 1 个 topic 而言,C1-0消费者多消费1个分区影响不是很大。如果有 N 多个 topic,那么针对每个 topic,消费者 C1-0 都将多消费 1 个分区,topic越多,C1-0 消费的分区会比其他消费者明显多消费 N 个分区。这就是 Range 范围分区的一个很明显的弊端了

2.RoundRobin#

0.10版本API:http://kafka.apache.org/0102/javadoc/allclasses-noframe.html

2.3.x版本API:http://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html

RoundRobin介绍

RoundRobinAssignor策略的原理是将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序,然后通过轮询方式逐个将分区以此分配给每个消费者。RoundRobinAssignor策略对应的partition.assignment.strategy参数值为:org.apache.kafka.clients.consumer.RoundRobinAssignor。

使用RoundRobin策略有两个前提条件必须满足:

  1. 同一个消费者组里面的所有消费者的num.streams(消费者消费线程数)必须相等;
  2. 每个消费者订阅的主题必须相同。

所以这里假设前面提到的2个消费者的num.streams = 2。RoundRobin策略的工作原理:将所有主题的分区组成 TopicAndPartition 列表,然后对 TopicAndPartition 列表按照 hashCode 进行排序,这里文字可能说不清,看下面的代码应该会明白:

val allTopicPartitions = ctx.partitionsForTopic.flatMap { case(topic, partitions) =>info("Consumer %s rebalancing the following partitions for topic %s: %s".format(ctx.consumerId, topic, partitions))partitions.map(partition => {TopicAndPartition(topic, partition)})
}.toSeq.sortWith((topicPartition1, topicPartition2) => {/** Randomize the order by taking the hashcode to reduce the likelihood of all partitions of a given topic ending* up on one consumer (if it has a high enough stream count).*/topicPartition1.toString.hashCode < topicPartition2.toString.hashCode
})

最后按照round-robin风格将分区分别分配给不同的消费者线程。

在我们的例子里面,加入按照 hashCode 排序完的topic-partitions组依次为T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,我们的消费者线程排序为C1-0, C1-1, C2-0, C2-1,最后分区分配的结果为:

C1-0 将消费 T1-5, T1-2, T1-6 分区;
C1-1 将消费 T1-3, T1-1, T1-9 分区;
C2-0 将消费 T1-0, T1-4 分区;
C2-1 将消费 T1-8, T1-7 分区;

RoundRobin的两种情况

  1. 如果同一个消费组内所有的消费者的订阅信息都是相同的,那么RoundRobinAssignor策略的分区分配会是均匀的。

    举例,假设消费组中有2个消费者C0和C1,都订阅了主题t0和t1,并且每个主题都有3个分区,那么所订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最终的分配结果为:

    消费者C0:t0p0、t0p2、t1p1
    消费者C1:t0p1、t1p0、t1p2
    
  2. 如果同一个消费组内的消费者所订阅的信息是不相同的,那么在执行分区分配的时候就不是完全的轮询分配,有可能会导致分区分配的不均匀。如果某个消费者没有订阅消费组内的某个topic,那么在分配分区的时候此消费者将分配不到这个topic的任何分区。

    举例,假设消费组内有3个消费者C0、C1和C2,它们共订阅了3个主题:t0、t1、t2,这3个主题分别有1、2、3个分区,即整个消费组订阅了t0p0、t1p0、t1p1、t2p0、t2p1、t2p2这6个分区。具体而言,消费者C0订阅的是主题t0,消费者C1订阅的是主题t0和t1,消费者C2订阅的是主题t0、t1和t2,那么最终的分配结果为:

    消费者C0:t0p0
    消费者C1:t1p0
    消费者C2:t1p1、t2p0、t2p1、t2p2
    

可以看到RoundRobinAssignor策略也不是十分完美,这样分配其实并不是最优解,因为完全可以将分区t1p1分配给消费者C1。

3.StickyAssignor#

我们再来看一下StickyAssignor策略,“sticky”这个单词可以翻译为“粘性的”,Kafka从0.11.x版本开始引入这种分配策略,它主要有两个目的:

  1. 分区的分配要尽可能的均匀,分配给消费者者的主题分区数最多相差一个;
  2. 分区的分配尽可能的与上次分配的保持相同。

当两者发生冲突时,第一个目标优先于第二个目标。鉴于这两个目标,StickyAssignor策略的具体实现要比RangeAssignor和RoundRobinAssignor这两种分配策略要复杂很多。我们举例来看一下StickyAssignor策略的实际效果。

假设消费组内有3个消费者:C0、C1和C2,它们都订阅了4个主题:t0、t1、t2、t3,并且每个主题有2个分区,也就是说整个消费组订阅了t0p0、t0p1、t1p0、t1p1、t2p0、t2p1、t3p0、t3p1这8个分区。最终的分配结果如下:

消费者C0:t0p0、t1p1、t3p0
消费者C1:t0p1、t2p0、t3p1
消费者C2:t1p0、t2p1

这样初看上去似乎与采用RoundRobinAssignor策略所分配的结果相同,但事实是否真的如此呢?

此时假设消费者C1脱离了消费组,那么消费组就会执行再平衡操作,进而消费分区会重新分配。如果采用RoundRobinAssignor策略,那么此时的分配结果如下:

消费者C0:t0p0、t1p0、t2p0、t3p0
消费者C2:t0p1、t1p1、t2p1、t3p1

如分配结果所示,RoundRobinAssignor策略会按照消费者C0和C2进行重新轮询分配。而如果此时使用的是StickyAssignor策略,那么分配结果为:

消费者C0:t0p0、t1p1、t3p0、t2p0
消费者C2:t1p0、t2p1、t0p1、t3p1

可以看到分配结果中保留了上一次分配中对于消费者C0和C2的所有分配结果,并将原来消费者C1的“负担”分配给了剩余的两个消费者C0和C2,最终C0和C2的分配还保持了均衡。

如果发生分区重分配,那么对于同一个分区而言有可能之前的消费者和新指派的消费者不是同一个,对于之前消费者进行到一半的处理还要在新指派的消费者中再次复现一遍,这显然很浪费系统资源。StickyAssignor策略如同其名称中的“sticky”一样,让分配策略具备一定的“粘性”,尽可能地让前后两次分配相同,进而减少系统资源的损耗以及其它异常情况的发生。

到目前为止所分析的都是消费者的订阅信息都是相同的情况,我们来看一下订阅信息不同的情况下的处理。

举例,同样消费组内有3个消费者:C0、C1和C2,集群中有3个主题:t0、t1和t2,这3个主题分别有1、2、3个分区,也就是说集群中有t0p0、t1p0、t1p1、t2p0、t2p1、t2p2这6个分区。消费者C0订阅了主题t0,消费者C1订阅了主题t0和t1,消费者C2订阅了主题t0、t1和t2。

如果此时采用RoundRobinAssignor策略,那么最终的分配结果如下所示(和讲述RoundRobinAssignor策略时的一样,这样不妨赘述一下):

消费者C0:t0p0
消费者C1:t1p0
消费者C2:t1p1、t2p0、t2p1、t2p2

如果此时采用的是StickyAssignor策略,那么最终的分配结果为:

消费者C0:t0p0
消费者C1:t1p0、t1p1
消费者C2:t2p0、t2p1、t2p2

可以看到这是一个最优解(消费者C0没有订阅主题t1和t2,所以不能分配主题t1和t2中的任何分区给它,对于消费者C1也可同理推断)。

假如此时消费者C0脱离了消费组,那么RoundRobinAssignor策略的分配结果为:

消费者C1:t0p0、t1p1
消费者C2:t1p0、t2p0、t2p1、t2p2

可以看到RoundRobinAssignor策略保留了消费者C1和C2中原有的3个分区的分配:t2p0、t2p1和t2p2(针对结果集1)。而如果采用的是StickyAssignor策略,那么分配结果为:

消费者C1:t1p0、t1p1、t0p0
消费者C2:t2p0、t2p1、t2p2

可以看到StickyAssignor策略保留了消费者C1和C2中原有的5个分区的分配:t1p0、t1p1、t2p0、t2p1、t2p2。

从结果上看StickyAssignor策略比另外两者分配策略而言显得更加的优异,这个策略的代码实现也是异常复杂。

4.Range策略演示#

package com.cw.kafka.consumer;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;/**** @author 陈小哥cw* @date 2020/6/19 17:07*/
public class CustomOffsetConsumer {public static void main(String[] args) {Properties properties = new Properties();// kafka集群,broker-listproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "cm1:9092,cm2:9092,cm3:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 消费者组,只要group.id相同,就属于同一个消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");// 关闭自动提交offsetproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");// 1.创建一个消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);// 消费者订阅topicconsumer.subscribe(Arrays.asList("first"), new ConsumerRebalanceListener() {// 重新分配完分区之前调用@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {System.out.println("==============回收的分区=============");for (TopicPartition partition : partitions) {System.out.println("partition = " + partition);}}// 重新分配完分区后调用@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {System.out.println("==============重新得到的分区==========");for (TopicPartition partition : partitions) {System.out.println("partition = " + partition);}}});while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());commitOffset(topicPartition, record.offset() + 1);}}}private static void commitOffset(TopicPartition topicPartition, long l) {}private static Long getPartitionOffset(TopicPartition partition) {return null;}
}

此时先启动一次程序,此时结果为

==============回收的分区=============
==============重新得到的分区==========
partition = first-2
partition = first-1
partition = first-0

此时在不关闭已开启的程序的情况下,再启动一次程序

第一次运行的程序结果

==============回收的分区=============
partition = first-2
partition = first-1
partition = first-0
==============重新得到的分区==========
partition = first-2

第二次运行的程序结果

==============回收的分区=============
==============重新得到的分区==========
partition = first-1
partition = first-0

这是因为两次运行的程序的消费者组id都是test,为同一个消费者组,当第二次运行程序时,对原来的分区进行回收,进行了分区的rebalance重新分配(默认range分配)。

Kafka消费者组三种分区分配策略roundrobin,range,StickyAssignor相关推荐

  1. 【Kafka】Kafka消费者组三种分区分配策略roundrobin,range,StickyAssignor

    文章目录 1. 分配策略 1.1 Range(默认策略) 1.2 RoundRobin RoundRobin的两种情况 1.3 StickyAssignor 2. Range策略演示 参考 相关文章 ...

  2. Kafka-消费者组三种分区分配策略Range Assignor、RoundRobin Assignor、Sticky Assignor详细解析

    Kafka消费者组三种分区分配策略roundrobin,range,StickyAssignor 文章目录 Kafka消费者组三种分区分配策略roundrobin,range,StickyAssign ...

  3. 深入分析Kafka架构(三):消费者消费方式、三种分区分配策略、offset维护

    本文目录 一.前言 二.消费者消费方式 三.分区分配策略 3.1.分配分区的前提条件 3.2.Range分配策略 3.3.RoundRobin分配策略 3.4.Sticky分配策略 四.offset维 ...

  4. Kafka Range、RoundRobin、Sticky 三种 分区分配策略区别

    Kafka Range RoundRobin 和Sticky 三种 分区分配策略 一.Kafka默认分区分配策略 1.1 consumer 订阅 1 topic ( 7 partition ) 按照K ...

  5. 深入浅出系列之 -- kafka消费者的三种语义模型

    本文主要详解kafka client的使用,包括kafka消费者的三种消费语义at-most-once,at-least-once,和exact-once message,生产者的使用等. 创建主题 ...

  6. 9.Kafka 分区分配策略(Range分配策略 RoundRobin分配策略)

    前言 在 Kafka 实际生产过程中,每个 topic 都会有 多个 partitions.   1.多个Partitions有什么好处? ①多个 partition ,能够对 broker 上的数据 ...

  7. 【kafka】Kafka消费者分区分配策略详解

    文章目录 1.概述 2.RoundRobinAssignor详解 3.RangeAssignor详解 4.StickyAssignor详解 5.CooperativeStickyAssignor详解 ...

  8. Kafka分区分配策略以及重平衡过程总结

    Kafka自身提供了三种分区分配策略,通过消费者端配置参数partition.assignment.strategy来控制. 1.RangeAssignor分配策略(kafka默认的分区策略) 通过配 ...

  9. Kafka学习-----Kafka消费者Consumer:消费方式,分区分配策略,RangeRoundRobin

    目录 一.消费方式 二.消费者的分配模式 1.分配时机? 2.Range策略 2.RoundRobin 策略 三.代码解释 RangeAssignor: RoundRobinAssignor 一.消费 ...

最新文章

  1. mysql 出现Cannot delete or update a parent row: a...
  2. php导出excel失败原因,PHPExcel导出Excel文件报找不到该文件错误
  3. 蜗轮蜗杆计算软件_齿轮传动计算软件
  4. linux环境下用TcpDump抓包分析总结
  5. dll放在unity哪个文件夹下_unity调用C#dll文件
  6. Windows API-GDI入门基础知识详解(1)
  7. 一种基于labview的类Office XP风格的菜单控件的用户开发界面
  8. 【转载】Linux下用dd命令扇区读写SD卡
  9. 【AGC035F】Two Histograms
  10. 谷歌浏览器(Chrome)最新v80版本下载
  11. 汉子字符转换成大写英文字母开头。。
  12. Eclypse-Z7 + Zmod ADC 1410 基础环境搭建(SDK部分)
  13. DSP视频教程第2期:系统介绍ARM DSP数字信号处理库以及超简单的移植方法分享(2022-01-27)
  14. execve 执行遇到的问题-已解决
  15. 不用找了,50个备课网站一网打尽
  16. Python 绘制热力图
  17. Mysql-8.0.26-winx64下载和安装
  18. 设计一个数字运算游戏
  19. 数据科学、管理科学系课程教学课件——FineReport实验指导书节选====明细表、分组表、交叉表
  20. 优秀开源项目之一:视频监控系统iSpy

热门文章

  1. Ext JS - renderer 函数
  2. form submit提交的几种方法
  3. 微型计算机原理daa,西安交通大学18年3月课程考试《微机原理与接口技术》作业考核试...
  4. 支付宝支付时出现:页面出错了,别着急。请看下面的帮助信息:您所访问的内容不存在或出现故障。
  5. 稚晖君刚拿了百度投资,估值被曝已超独角兽!
  6. 【有效】2016/2013/2010 软件的安装解决方法
  7. jdk8 新特性汇总
  8. linux wifi名称设置中文乱码,无线wifi名称怎样能设置中文而且不乱码
  9. jzoj1481. 偷懒的西西
  10. 关于B树的学习总结和B+树,B*树的简介