转载地址:https://www.linuxidc.com/Linux/2018-11/155192.htm

1.  前言

我们知道,生产者发送消息到主题,消费者订阅主题(以消费者组的名义订阅),而主题下是分区,消息是存储在分区中的,所以事实上生产者发送消息到分区,消费者则从分区读取消息,那么,这里问题来了,生产者将消息投递到哪个分区?消费者组中的消费者实例之间是怎么分配分区的呢?接下来,就围绕着这两个问题一探究竟。

2.  主题的分区数设置

在server.properties配置文件中可以指定一个全局的分区数设置,这是对每个主题下的分区数的默认设置,默认是1。

当然每个主题也可以自己设置分区数量,如果创建主题的时候没有指定分区数量,则会使用server.properties中的设置。

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 2 --replication-factor 1
在创建主题的时候,可以使用--partitions选项指定主题的分区数量

[root@localhost kafka_2.11-2.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic abc
Topic:abc PartitionCount:2 ReplicationFactor:1 Configs:
Topic: abc Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: abc Partition: 1 Leader: 0 Replicas: 0 Isr: 0
3.  生产者与分区

首先提出一个问题:生产者将消息投递到分区有没有规律?如果有,那么它是如何决定一条消息该投递到哪个分区的呢?

3.1.  默认的分区策略

The default partitioning strategy:

If a partition is specified in the record, use it
If no partition is specified but a key is present choose a partition based on a hash of the key
If no partition or key is present choose a partition in a round-robin fashion
org.apache.kafka.clients.producer.internals.DefaultPartitioner

默认的分区策略是:

如果在发消息的时候指定了分区,则消息投递到指定的分区
如果没有指定分区,但是消息的key不为空,则基于key的哈希值来选择一个分区
如果既没有指定分区,且消息的key也是空,则用轮询的方式选择一个分区

/**
 * Compute the partition for the given record.
 *
 * @param topic The topic name
 * @param key The key to partition on (or null if no key)
 * @param keyBytes serialized key to partition on (or null if no key)
 * @param value The value to partition on or null
 * @param valueBytes serialized value to partition on or null
 * @param cluster The current cluster metadata
 */
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

通过源代码可以更加作证这一点

4.  分区与消费者
消费者以组的名义订阅主题,主题有多个分区,消费者组中有多个消费者实例,那么消费者实例和分区之前的对应关系是怎样的呢?

换句话说,就是组中的每一个消费者负责那些分区,这个分配关系是如何确定的?

同一时刻,一条消息只能被组中的一个消费者实例消费

消费者组订阅这个主题,意味着主题下的所有分区都会被组中的消费者消费到,如果按照从属关系来说的话就是,主题下的每个分区只从属于组中的一个消费者,不可能出现组中的两个消费者负责同一个分区。

那么,问题来了。如果分区数大于或者等于组中的消费者实例数,那自然没有什么问题,无非一个消费者会负责多个分区,(PS:当然,最理想的情况是二者数量相等,这样就相当于一个消费者负责一个分区);但是,如果消费者实例的数量大于分区数,那么按照默认的策略(PS:之所以强调默认策略是因为你也可以自定义策略),有一些消费者是多余的,一直接不到消息而处于空闲状态。

话又说回来,假设多个消费者负责同一个分区,那么会有什么问题呢?

我们知道,Kafka它在设计的时候就是要保证分区下消息的顺序,也就是说消息在一个分区中的顺序是怎样的,那么消费者在消费的时候看到的就是什么样的顺序,那么要做到这一点就首先要保证消息是由消费者主动拉取的(pull),其次还要保证一个分区只能由一个消费者负责。倘若,两个消费者负责同一个分区,那么就意味着两个消费者同时读取分区的消息,由于消费者自己可以控制读取消息的offset,就有可能C1才读到2,而C1读到1,C1还没处理完,C2已经读到3了,则会造成很多浪费,因为这就相当于多线程读取同一个消息,会造成消息处理的重复,且不能保证消息的顺序,这就跟主动推送(push)无异。

4.1.  消费者分区分配策略

org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor

如果是自定义分配策略的话可以继承AbstractPartitionAssignor这个类,它默认有3个实现

4.1.1.  range

range策略对应的实现类是org.apache.kafka.clients.consumer.RangeAssignor

这是默认的分配策略

可以通过消费者配置中partition.assignment.strategy参数来指定分配策略,它的值是类的全路径,是一个数组

/**
 * The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order
 * and the consumers in lexicographic order. We then divide the number of partitions by the total number of
 * consumers to determine the number of partitions to assign to each consumer. If it does not evenly
 * divide, then the first few consumers will have one extra partition.
 *
 * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
 * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
 *
 * The assignment will be:
 * C0: [t0p0, t0p1, t1p0, t1p1]
 * C1: [t0p2, t1p2]
 */

range策略是基于每个主题的

对于每个主题,我们以数字顺序排列可用分区,以字典顺序排列消费者。然后,将分区数量除以消费者总数,以确定分配给每个消费者的分区数量。如果没有平均划分(PS:除不尽),那么最初的几个消费者将有一个额外的分区。

简而言之,就是,

1、range分配策略针对的是主题(PS:也就是说,这里所说的分区指的某个主题的分区,消费者值的是订阅这个主题的消费者组中的消费者实例)

2、首先,将分区按数字顺序排行序,消费者按消费者名称的字典序排好序

3、然后,用分区总数除以消费者总数。如果能够除尽,则皆大欢喜,平均分配;若除不尽,则位于排序前面的消费者将多负责一个分区

例如,假设有两个消费者C0和C1,两个主题t0和t1,并且每个主题有3个分区,分区的情况是这样的:t0p0,t0p1,t0p2,t1p0,t1p1,t1p2

那么,基于以上信息,最终消费者分配分区的情况是这样的:

C0: [t0p0, t0p1, t1p0, t1p1]

C1: [t0p2, t1p2]

为什么是这样的结果呢?

因为,对于主题t0,分配的结果是C0负责P0和P1,C1负责P2;对于主题t2,也是如此,综合起来就是这个结果

上面的过程用图形表示的话大概是这样的:

阅读代码,更有助于理解:

public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
    //    主题与消费者的映射                                                            
    Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
    Map<String, List<TopicPartition>> assignment = new HashMap<>();
    for (String memberId : subscriptions.keySet())
        assignment.put(memberId, new ArrayList<TopicPartition>());

for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
        String topic = topicEntry.getKey();    //    主题
        List<String> consumersForTopic = topicEntry.getValue();    //    消费者列表

//    partitionsPerTopic表示主题和分区数的映射
        //    获取主题下有多少个分区
        Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
        if (numPartitionsForTopic == null)
            continue;

//    消费者按字典序排序
        Collections.sort(consumersForTopic);

//    分区数量除以消费者数量
        int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
        //    取模,余数就是额外的分区
        int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();

List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
        for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
            int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
            int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
            //    分配分区
            assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
        }
    }
    return assignment;
}

4.1.2. roundrobin(轮询)

roundronbin分配策略的具体实现是org.apache.kafka.clients.consumer.RoundRobinAssignor

/**
 * The round robin assignor lays out all the available partitions and all the available consumers. It
 * then proceeds to do a round robin assignment from partition to consumer. If the subscriptions of all consumer
 * instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts
 * will be within a delta of exactly one across all consumers.)
 *
 * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
 * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
 *
 * The assignment will be:
 * C0: [t0p0, t0p2, t1p1]
 * C1: [t0p1, t1p0, t1p2]
 *
 * When subscriptions differ across consumer instances, the assignment process still considers each
 * consumer instance in round robin fashion but skips over an instance if it is not subscribed to
 * the topic. Unlike the case when subscriptions are identical, this can result in imbalanced
 * assignments. For example, we have three consumers C0, C1, C2, and three topics t0, t1, t2,
 * with 1, 2, and 3 partitions, respectively. Therefore, the partitions are t0p0, t1p0, t1p1, t2p0,
 * t2p1, t2p2. C0 is subscribed to t0; C1 is subscribed to t0, t1; and C2 is subscribed to t0, t1, t2.
 *
 * Tha assignment will be:
 * C0: [t0p0]
 * C1: [t1p0]
 * C2: [t1p1, t2p0, t2p1, t2p2]
 */

轮询分配策略是基于所有可用的消费者和所有可用的分区的

与前面的range策略最大的不同就是它不再局限于某个主题

如果所有的消费者实例的订阅都是相同的,那么这样最好了,可用统一分配,均衡分配

例如,假设有两个消费者C0和C1,两个主题t0和t1,每个主题有3个分区,分别是t0p0,t0p1,t0p2,t1p0,t1p1,t1p2

那么,最终分配的结果是这样的:

C0: [t0p0, t0p2, t1p1]

C1: [t0p1, t1p0, t1p2]

用图形表示大概是这样的:

假设,组中每个消费者订阅的主题不一样,分配过程仍然以轮询的方式考虑每个消费者实例,但是如果没有订阅主题,则跳过实例。当然,这样的话分配肯定不均衡。

什么意思呢?也就是说,消费者组是一个逻辑概念,同组意味着同一时刻分区只能被一个消费者实例消费,换句话说,同组意味着一个分区只能分配给组中的一个消费者。事实上,同组也可以不同订阅,这就是说虽然属于同一个组,但是它们订阅的主题可以是不一样的。

例如,假设有3个主题t0,t1,t2;其中,t0有1个分区p0,t1有2个分区p0和p1,t2有3个分区p0,p1和p2;有3个消费者C0,C1和C2;C0订阅t0,C1订阅t0和t1,C2订阅t0,t1和t2。那么,按照轮询分配的话,C0应该负责

首先,肯定是轮询的方式,其次,比如说有主题t0,t1,t2,它们分别有1,2,3个分区,也就是t0有1个分区,t1有2个分区,t2有3个分区;有3个消费者分别从属于3个组,C0订阅t0,C1订阅t0和t1,C2订阅t0,t1,t2;那么,按照轮询分配的话,C0应该负责t0p0,C1应该负责t1p0,其余均由C2负责。

上述过程用图形表示大概是这样的:

为什么最后的结果是

C0: [t0p0]

C1: [t1p0]

C2: [t1p1, t2p0, t2p1, t2p2]

这是因为,按照轮询t0p1由C0负责,t1p0由C1负责,由于同组,C2只能负责t1p1,由于只有C2订阅了t2,所以t2所有分区由C2负责,综合起来就是这个结果

细想一下可以发现,这种情况下跟range分配的结果是一样的

5.  测试代码

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

<groupId>com.linuxidc.example</groupId>
    <artifactId>kafka-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

<name>kafka-demo</name>
    <description></description>

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.5.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

<build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

package com.linuxidc.kafka.producer;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class HelloProducer {

public static void main(String[] args) {

Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.133:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("abc", Integer.toString(i), Integer.toString(i)), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (null != e) {
                        e.printStackTrace();
                    }else {
                        System.out.println("callback: " + recordMetadata.topic() + " " + recordMetadata.partition() + " " + recordMetadata.offset());
                    }
                }
            });
        }
        producer.close();

}
}

package com.linuxidc.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class HelloConsumer {

public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.133:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
//        props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("foo", "bar", "abc"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition = %s, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}

Kafka分区与消费者的关系相关推荐

  1. Kafka生产者和消费者分区策略部分源码解析

    之前我在看其他的博客时,发现对于kafka consumer的RoundRobin的缺点分析中,有两种观点,一种认为缺点在于如果消费者组中消费者消费的主题不同,或者消费者线程数不同,那么会造成消费者消 ...

  2. kafka的副本以及分区与副本的关系

    一 副本的作用 1.Kafka 副本作用:提高数据可靠性. 2.Kafka 中副本分为:Leader 和 Follower.Kafka 生产者只会把数据发往 Leader, 然后 Follower 找 ...

  3. 深入分析Kafka生产者和消费者

    深入Kafka生产者和消费者 Kafka生产者 消息发送的流程 发送方式 发送并忘记 同步发送 异步发送 生产者属性配置 序列化器 分区器 自定义分区器 Kafka消费者 消费者属性配置 消费者基础概 ...

  4. Kafka生产者与消费者详解

    什么是 Kafka Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区.多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系 ...

  5. Kafka学习之消费者

    Kafka学习之消费者 前言 本博客主要介绍up在学习kafka中间件时候觉得需要记录的知识点. 内容 1.消费者与消费组 消费者(Consumer)负责订阅Kafka中的主题(Topic),并且从订 ...

  6. Kafka-partition和消费者的关系

    背景:我们在kafka经常会听到分区(partition)和消费者,消费者组,那么到底有什么关系呢,下面我们抛开kafka的其他问题,单纯的聊一聊这二者的关系,方便大家理解 一.kafka为什么要分区 ...

  7. 关于Kafka 的 consumer 消费者手动提交详解

    前言 在上一篇 Kafka使用Java实现数据的生产和消费demo 中介绍如何简单的使用kafka进行数据传输.本篇则重点介绍kafka中的 consumer 消费者的讲解. 应用场景 在上一篇kaf ...

  8. Kafka分区分配策略(4)——分配的实施

    接上文: 1.[Kafka分区分配策略(1)--RangeAssignor] 2.[Kafka分区分配策略(2)--RoundRobinAssignor和StickyAssignor] 3.[Kafk ...

  9. Kafka分区分配策略(3)——自定义分区分配策略

    接上文: 1.[Kafka分区分配策略(1)--RangeAssignor] 2.[Kafka分区分配策略(2)--RoundRobinAssignor和StickyAssignor] 欢迎支持笔者新 ...

  10. Kafka分区分配策略(2)——RoundRobinAssignor和StickyAssignor

    接上文[Kafka分区分配策略(1)--RangeAssignor] 欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔 ...

最新文章

  1. Head First设计模式之目录
  2. 个人博客满血复活,求测试~~~
  3. python网络编程—Socket
  4. jquery 选择器、筛选器、事件绑定与事件委派
  5. python高通滤波_图像处理之高通滤波及低通滤波
  6. Oracle--sqlplus如何设置SQLPlus结果显示的宽度,ORACLE sqlplus提示符设置
  7. 下载并安装mercurial/hg
  8. 华为机试 第二题-速战速决
  9. vscode怎么设置动态背景
  10. html修改文字颜色代码
  11. 成功的礼品公司的产品经营模式
  12. 为什么浙江初中数学用计算机,计算器对初中数学学习几点看法
  13. php 如何启动ica文件,什么是ICA文件?Win10专业版如何打开ICA文件?
  14. C++中闭包的简单实现
  15. 富士康承认使用未成年实习生称将解聘失职员工-富士康-未成年-实习生
  16. 飞桨paddle遇到bug调试修正【迁移工具、版本兼容性】
  17. leetcode792:匹配子序列的单词数
  18. sklearn 混淆矩阵分析pima 印第安人糖尿病数据
  19. 用友政务表格技术应用开发实践:预算一体化产品核心功能搭建
  20. 我编程我快乐——读后感(前奏)

热门文章

  1. 使用 Travis 自动部署 Hexo 到 Github 与 自己的服务器
  2. centos6 系统安装 system-config-kickstart 工具
  3. DS实验题 Searchname
  4. CSS圆角兼容IE6
  5. 基于semisync实现MySQL的主从半同步复制
  6. 小记linux如何挂载window下的共享文件
  7. [转]Authority-check
  8. html++留言板增加删除,实现留言板删除留言的具体思路跟操作
  9. java 判断是合法语言_使用Java 怎么实现一个判断IP地址是否合法的功能
  10. mysql 最小值对应的其他属性_查询最小值对应的非group by字段