文章目录

  • 1. 分配策略
    • 1.1 Range(默认策略)
    • 1.2 RoundRobin
      • RoundRobin的两种情况
    • 1.3 StickyAssignor
  • 2. Range策略演示
  • 参考

相关文章
《Rebalance机制、分区分配策略》
《Kafka消费者组三种分区分配策略roundrobin,range,StickyAssignor》

1. 分配策略

一个consumer group中有多个consumer,一个 topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由哪个consumer来消费。

Kafka有两种分配策略,一是roundrobin,一是range。最新还有一个StickyAssignor策略

将分区的所有权从一个消费者移到另一个消费者称为重新平衡(rebalance)。当以下事件发生时,Kafka 将会进行一次分区分配:

  • 同一个 Consumer Group 内新增消费者

  • 消费者离开当前所属的Consumer Group,包括shuts down 或 crashes

  • 订阅的主题新增分区

目前我们还不能自定义分区分配策略,只能通过partition.assignment.strategy参数选择 range 或 roundrobin。 partition.assignment.strategy参数默认的值是range。

Kafka提供了消费者客户端参数partition.assignment.strategy用来设置消费者与订阅主题之间的分区分配策略。默认情况下,此参数的值为:org.apache.kafka.clients.consumer.RangeAssignor,即采用RangeAssignor分配策略。除此之外,Kafka中还提供了另外两种分配策略: RoundRobinAssignor和StickyAssignor。消费者客户端参数partition.asssignment.strategy可以配置多个分配策略,彼此之间以逗号分隔。

partition.assignment.strategy
A list of class names or class types, ordered by preference, of supported partition assignment strategies that the client will use to distribute partition ownership amongst consumer instances when group management is used. Available options are:org.apache.kafka.clients.consumer.'RangeAssignor' The default assignor, which works on a per-topic basis.
org.apache.kafka.clients.consumer.'RoundRobinAssignor': Assigns partitions to consumers in a round-robin fashion.
org.apache.kafka.clients.consumer.'StickyAssignor': Guarantees an assignment that is maximally balanced while preserving as many existing partition assignments as possible.
org.apache.kafka.clients.consumer.'CooperativeStickyAssignor': Follows the same StickyAssignor logic, but allows for cooperative rebalancing.
Implementing the org.apache.kafka.clients.consumer.ConsumerPartitionAssignor interface allows you to plug in a custom assignment strategy.Type: list
Default:    class org.apache.kafka.clients.consumer.RangeAssignor
Valid Values:   non-null string
Importance: medium

如上面红色字体所示,存在众多的策略类型

本文假设我们有个名为T1的主题,其包含了10个分区,然后我们有两个消费者(C1,C2)来消费这10个分区里面的数据,而且C1的num.streams = 1,C2的num.streams = 2。

1.1 Range(默认策略)

2.3.x版本API介绍:http://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html
0.10版本API介绍: http://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html

Range是对每个Topic而言的(即一个Topic一个Topic分),首先对同一个Topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。然后用Partitions分区的个数除以消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。

假设n=分区数/消费者数量,m=分区数%消费者数量,那么前m个消费者每个分配n+1个分区,后面的(消费者数量-m)个消费者每个分配n个分区。

假如有10个分区,3个消费者线程,把分区按照序号排列0,1,2,3,4,5,6,7,8,9;消费者线程为C1-0,C2-0,C2-1,那么用partition数除以消费者线程的总数来决定每个消费者线程消费几个partition,如果除不尽,前面几个消费者将会多消费一个分区。在我们的例子里面,我们有10个分区,3个消费者线程,10/3 = 3,而且除除不尽,那么消费者线程C1-0将会多消费一个分区,所以最后分区分配的结果看起来是这样的:

C1-0:0,1,2,3
C2-0:4,5,6
C2-1:7,8,9

如果有11个分区将会是:

C1-0:0,1,2,3
C2-0:4,5,6,7
C2-1:8,9,10

假如我们有两个主题T1,T2,分别有10个分区,最后的分配结果将会是这样:

C1-0:T1(0,1,2,3) T2(0,1,2,3)
C2-0:T1(4,5,6) T2(4,5,6)
C2-1:T1(7,8,9) T2(7,8,9)

可以看出, C1-0消费者线程比其他消费者线程多消费了2个分区

如上,只是针对 1 个 topic 而言,C1-0消费者多消费1个分区影响不是很大。如果有 N 多个 topic,那么针对每个 topic,消费者 C1-0 都将多消费 1 个分区,topic越多,C1-0 消费的分区会比其他消费者明显多消费 N 个分区。这就是 Range 范围分区的一个很明显的弊端了

1.2 RoundRobin

0.10版本API:http://kafka.apache.org/0102/javadoc/allclasses-noframe.html
2.3.x版本API:http://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html

RoundRobinAssignor策略的原理是将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序,然后通过轮询方式逐个将分区以此分配给每个消费者。RoundRobinAssignor策略对应的partition.assignment.strategy参数值为:org.apache.kafka.clients.consumer.RoundRobinAssignor。

使用RoundRobin策略有两个前提条件必须满足:

  • 同一个消费者组里面的所有消费者的num.streams(消费者消费线程数)必须相等;
  • 每个消费者订阅的主题必须相同。

所以这里假设前面提到的2个消费者的num.streams = 2。RoundRobin策略的工作原理:将所有主题的分区组成 TopicAndPartition 列表,然后对 TopicAndPartition 列表按照 hashCode 进行排序,这里文字可能说不清,看下面的代码应该会明白:

val allTopicPartitions = ctx.partitionsForTopic.flatMap { case(topic, partitions) =>info("Consumer %s rebalancing the following partitions for topic %s: %s".format(ctx.consumerId, topic, partitions))partitions.map(partition => {TopicAndPartition(topic, partition)})
}.toSeq.sortWith((topicPartition1, topicPartition2) => {/** Randomize the order by taking the hashcode to reduce the likelihood of all partitions of a given topic ending* up on one consumer (if it has a high enough stream count).*/topicPartition1.toString.hashCode < topicPartition2.toString.hashCode
})
最后按照round-robin风格将分区分别分配给不同的消费者线程。在我们的例子里面,加入按照 hashCode 排序完的topic-partitions组依次为T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,我们的消费者线程排序为C1-0, C1-1, C2-0, C2-1,最后分区分配的结果为:
C1-0 将消费 T1-5, T1-2, T1-6 分区;
C1-1 将消费 T1-3, T1-1, T1-9 分区;
C2-0 将消费 T1-0, T1-4 分区;
C2-1 将消费 T1-8, T1-7 分区;

RoundRobin的两种情况

如果同一个消费组内所有的消费者的订阅信息都是相同的,那么RoundRobinAssignor策略的分区分配会是均匀的。

举例,假设消费组中有2个消费者C0和C1,都订阅了主题t0和t1,并且每个主题都有3个分区,那么所订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最终的分配结果为:

消费者C0:t0p0、t0p2、t1p1
消费者C1:t0p1、t1p0、t1p2

如果同一个消费组内的消费者所订阅的信息是不相同的,那么在执行分区分配的时候就不是完全的轮询分配,有可能会导致分区分配的不均匀。 如果某个消费者没有订阅消费组内的某个topic,那么在分配分区的时候此消费者将分配不到这个topic的任何分区。

举例,假设消费组内有3个消费者C0、C1和C2,它们共订阅了3个主题:t0、t1、t2,这3个主题分别有1、2、3个分区,即整个消费组订阅了t0p0、t1p0、t1p1、t2p0、t2p1、t2p2这6个分区。具体而言,消费者C0订阅的是主题t0,消费者C1订阅的是主题t0和t1,消费者C2订阅的是主题t0、t1和t2,那么最终的分配结果为:

消费者C0:t0p0
消费者C1:t1p0
消费者C2:t1p1、t2p0、t2p1、t2p2

可以看到RoundRobinAssignor策略也不是十分完美,这样分配其实并不是最优解,因为完全可以将分区t1p1分配给消费者C1。

1.3 StickyAssignor

我们再来看一下StickyAssignor策略,“sticky”这个单词可以翻译为“粘性的”,Kafka从0.11.x版本开始引入这种分配策略,它主要有两个目的:

  • 分区的分配要尽可能的均匀,分配给消费者者的主题分区数最多相差一个;
  • 分区的分配尽可能的与上次分配的保持相同。

当两者发生冲突时,第一个目标优先于第二个目标。鉴于这两个目标,StickyAssignor策略的具体实现要比RangeAssignor和RoundRobinAssignor这两种分配策略要复杂很多。我们举例来看一下StickyAssignor策略的实际效果。

假设消费组内有3个消费者:C0、C1和C2,它们都订阅了4个主题:t0、t1、t2、t3,并且每个主题有2个分区,也就是说整个消费组订阅了t0p0、t0p1、t1p0、t1p1、t2p0、t2p1、t3p0、t3p1这8个分区。最终的分配结果如下:

消费者C0:t0p0、t1p1、t3p0
消费者C1:t0p1、t2p0、t3p1
消费者C2:t1p0、t2p1

这样初看上去似乎与采用RoundRobinAssignor策略所分配的结果相同,但事实是否真的如此呢?

此时假设消费者C1脱离了消费组,那么消费组就会执行再平衡操作,进而消费分区会重新分配。如果采用RoundRobinAssignor策略,那么此时的分配结果如下:

消费者C0:t0p0、t1p0、t2p0、t3p0
消费者C2:t0p1、t1p1、t2p1、t3p1

如分配结果所示,RoundRobinAssignor策略会按照消费者C0和C2进行重新轮询分配。而如果此时使用的是StickyAssignor策略,那么分配结果为:

消费者C0:t0p0、t1p1、t3p0、t2p0
消费者C2:t1p0、t2p1、t0p1、t3p1

可以看到分配结果中保留了上一次分配中对于消费者C0和C2的所有分配结果,并将原来消费者C1的“负担”分配给了剩余的两个消费者C0和C2,最终C0和C2的分配还保持了均衡。

如果发生分区重分配,那么对于同一个分区而言有可能之前的消费者和新指派的消费者不是同一个,对于之前消费者进行到一半的处理还要在新指派的消费者中再次复现一遍,这显然很浪费系统资源。StickyAssignor策略如同其名称中的“sticky”一样,让分配策略具备一定的“粘性”,尽可能地让前后两次分配相同,进而减少系统资源的损耗以及其它异常情况的发生。

到目前为止所分析的都是消费者的订阅信息都是相同的情况,我们来看一下订阅信息不同的情况下的处理。

举例,同样消费组内有3个消费者:C0、C1和C2,集群中有3个主题:t0、t1和t2,这3个主题分别有1、2、3个分区,也就是说集群中有t0p0、t1p0、t1p1、t2p0、t2p1、t2p2这6个分区。消费者C0订阅了主题t0,消费者C1订阅了主题t0和t1,消费者C2订阅了主题t0、t1和t2。

如果此时采用RoundRobinAssignor策略,那么最终的分配结果如下所示(和讲述RoundRobinAssignor策略时的一样,这样不妨赘述一下):

消费者C0:t0p0
消费者C1:t1p0
消费者C2:t1p1、t2p0、t2p1、t2p2

如果此时采用的是StickyAssignor策略,那么最终的分配结果为:

消费者C0:t0p0
消费者C1:t1p0、t1p1
消费者C2:t2p0、t2p1、t2p2

可以看到这是一个最优解(消费者C0没有订阅主题t1和t2,所以不能分配主题t1和t2中的任何分区给它,对于消费者C1也可同理推断)。

假如此时消费者C0脱离了消费组,那么RoundRobinAssignor策略的分配结果为:

消费者C1:t0p0、t1p1
消费者C2:t1p0、t2p0、t2p1、t2p2

可以看到RoundRobinAssignor策略保留了消费者C1和C2中原有的3个分区的分配:t2p0、t2p1和t2p2(针对结果集1)。而如果采用的是StickyAssignor策略,那么分配结果为:

消费者C1:t1p0、t1p1、t0p0
消费者C2:t2p0、t2p1、t2p2

可以看到StickyAssignor策略保留了消费者C1和C2中原有的5个分区的分配:t1p0、t1p1、t2p0、t2p1、t2p2。

从结果上看StickyAssignor策略比另外两者分配策略而言显得更加的优异,这个策略的代码实现也是异常复杂。

2. Range策略演示

package com.cw.kafka.consumer;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;/**** @author 陈小哥cw* @date 2020/6/19 17:07*/
public class CustomOffsetConsumer {public static void main(String[] args) {Properties properties = new Properties();// kafka集群,broker-listproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "cm1:9092,cm2:9092,cm3:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 消费者组,只要group.id相同,就属于同一个消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");// 关闭自动提交offsetproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");// 1.创建一个消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);// 消费者订阅topicconsumer.subscribe(Arrays.asList("first"), new ConsumerRebalanceListener() {// 重新分配完分区之前调用@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {System.out.println("==============回收的分区=============");for (TopicPartition partition : partitions) {System.out.println("partition = " + partition);}}// 重新分配完分区后调用@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {System.out.println("==============重新得到的分区==========");for (TopicPartition partition : partitions) {System.out.println("partition = " + partition);}}});while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());commitOffset(topicPartition, record.offset() + 1);}}}private static void commitOffset(TopicPartition topicPartition, long l) {}private static Long getPartitionOffset(TopicPartition partition) {return null;}
}

此时先启动一次程序,此时控制台为:

==============回收的分区=============
==============重新得到的分区==========
partition = first-2
partition = first-1
partition = first-0

此时在不关闭已开启的程序的情况下,再启动一次程序

第一次运行的程序结果(控制台新打印的):

==============回收的分区=============
partition = first-2
partition = first-1
partition = first-0
==============重新得到的分区==========
partition = first-2

第二次运行的程序结果:

==============回收的分区=============
==============重新得到的分区==========
partition = first-1
partition = first-0

这是因为两次运行的程序的消费者组id都是test,为同一个消费者组,当第二次运行程序时,对原来的分区进行回收,进行了分区的rebalance重新分配(默认range分配)。

参考

Kafka消费者组三种分区分配策略roundrobin,range,StickyAssignor

【Kafka】Kafka消费者组三种分区分配策略roundrobin,range,StickyAssignor相关推荐

  1. Kafka消费者组三种分区分配策略roundrobin,range,StickyAssignor

    一个consumer group中有多个consumer,一个 topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由哪个consumer来消 ...

  2. Kafka-消费者组三种分区分配策略Range Assignor、RoundRobin Assignor、Sticky Assignor详细解析

    Kafka消费者组三种分区分配策略roundrobin,range,StickyAssignor 文章目录 Kafka消费者组三种分区分配策略roundrobin,range,StickyAssign ...

  3. 深入分析Kafka架构(三):消费者消费方式、三种分区分配策略、offset维护

    本文目录 一.前言 二.消费者消费方式 三.分区分配策略 3.1.分配分区的前提条件 3.2.Range分配策略 3.3.RoundRobin分配策略 3.4.Sticky分配策略 四.offset维 ...

  4. Kafka Range、RoundRobin、Sticky 三种 分区分配策略区别

    Kafka Range RoundRobin 和Sticky 三种 分区分配策略 一.Kafka默认分区分配策略 1.1 consumer 订阅 1 topic ( 7 partition ) 按照K ...

  5. 9.Kafka 分区分配策略(Range分配策略 RoundRobin分配策略)

    前言 在 Kafka 实际生产过程中,每个 topic 都会有 多个 partitions.   1.多个Partitions有什么好处? ①多个 partition ,能够对 broker 上的数据 ...

  6. Kafka分区分配策略以及重平衡过程总结

    Kafka自身提供了三种分区分配策略,通过消费者端配置参数partition.assignment.strategy来控制. 1.RangeAssignor分配策略(kafka默认的分区策略) 通过配 ...

  7. 【kafka】Kafka消费者分区分配策略详解

    文章目录 1.概述 2.RoundRobinAssignor详解 3.RangeAssignor详解 4.StickyAssignor详解 5.CooperativeStickyAssignor详解 ...

  8. Kafka学习-----Kafka消费者Consumer:消费方式,分区分配策略,RangeRoundRobin

    目录 一.消费方式 二.消费者的分配模式 1.分配时机? 2.Range策略 2.RoundRobin 策略 三.代码解释 RangeAssignor: RoundRobinAssignor 一.消费 ...

  9. kafka partition分配_kafka的分区分配策略

    用过 Kafka 的同学应该都知道,每个 Topic 一般会有很多个 partitions.为了使得我们能够及时消费消息,我们也可能会启动多个 Consumer 去消费,而每个 Consumer 又会 ...

最新文章

  1. idea关闭页面显示的浏览器图标
  2. [cerc2012][Gym100624B]20181013
  3. GdiPlus[47]: IGPMatrix 矩阵(二)
  4. 2017-06-27
  5. post url 后面跟参数_都2019年了,还问GET和POST的区别
  6. Messes in Reading Source Coding of SSD
  7. 10 signs that you’re not cut out to be an IT manager
  8. HTML代码实现简易购物车-web前端教程
  9. 100套计算机毕设源码+论文 免费分享 【2020最新版】
  10. 杂题 P1640 [SCOI2010]连续攻击游戏
  11. JAVA微信小程序医院预约挂号小程序系统毕业设计 开题报告
  12. 可汗学院公开课:金融学笔记
  13. MATLAB指纹识别技术[完美运行,详细解释,GUI界面,万字文稿]
  14. mysql 怎么同时删除两张表的数据库,mysql怎样删除多个表格数据库数据_数据库
  15. 神经网络理论及应用答案,神经网络理论名词解释
  16. win10多合一原版系统_win10简体中文64位16299.15多合一版本
  17. php 将百分数处理成小数,php将百分数如何转小数
  18. (RabbitMQ 二)Springboot项目中使用RabbitMQ的相关依赖
  19. PCL入门系列 —— NormalEstimation、NormalEstimationOMP 基于邻域的点云法线估计
  20. 又是一年1024,去年的1025你是怎么过的?

热门文章

  1. docker + openface进行人脸识别(初探)
  2. Android手机利用KSWEB+端口转发搭建PHP服务器
  3. linux firefox插件开发教程,Linux下firefox插件开发
  4. 遇到百度网址安全中心提醒您该页面可能存在XXXXXX的处理解决办法
  5. js第2章基本语法 课后习题——求出1~100之间的素数、求红白黑球
  6. 从0开始写一个播放器系列-开篇
  7. 蓝光播放支持3D ArcSoft TotalMedia Theatre 5.0.1.87 Platinum Retail Multilanguage 破解版
  8. 腾讯员工平均年薪近百万,工程师一个月赚8万!网友:我和马云财产加起来过千亿,我骄傲了嘛?...
  9. 结巴分词 java 权重_结巴分词 (转载)
  10. git 生成ssh公钥