关于 Topic 和 Partition:

  Topic:

在 kafka 中,topic 是一个存储消息的逻辑概念,可以认为是一个消息集合。每条消息发送到 kafka 集群的消息都有一个类别。物理上来说,不同的 topic 的消息是分开存储的,每个 topic 可以有多个生产者向它发送消息,也可以有多个消费者去消费其中的消息。

  Partition:

  每个 topic 可以划分多个分区(每个 Topic 至少有一个分区),同一 topic 下的不同分区包含的消息是不同的。每个消息在被添加到分区时,都会被分配一个 offset(称之为偏移量),它是消息在此分区中的唯一编号,kafka 通过 offset保证消息在分区内的顺序,offset 的顺序不跨分区,即 kafka只保证在同一个分区内的消息是有序的。下图中,对于名字为 test 的 topic,做了 3 个分区,分别是p0、p1、p2.

➢ 每一条消息发送到 broker 时,会根据 partition 的规则选择存储到哪一个 partition。如果 partition 规则设置合理,那么所有的消息会均匀的分布在不同的partition中,这样就有点类似数据库的分库分表的概念,把数据做了分片处理。

  Topic&Partition 的存储:

  Partition 是以文件的形式存储在文件系统中,比如创建一个名为 firstTopic 的 topic,其中有 3 个 partition,那么在kafka 的数据目录(/tmp/kafka-log)中就有 3 个目录,firstTopic-0~3,命名规则是<topic_name>-<partition_id>,创建3个分区的topic:

1

sh kafka-topics.sh --create --zookeeper 192.168.254.135:2181 --replication-factor 1 --partitions 3 --topic firstTopic

kafka 消息分发策略:

  消息是 kafka 中最基本的数据单元,在 kafka 中,一条消息由 key、value 两部分构成,在发送一条消息时,我们可以指定这个 key,那么 producer 会根据 key 和 partition 机制来判断当前这条消息应该发送并存储到哪个 partition 中。我们可以根据需要进行扩展 producer 的 partition 机制。

  我们可以通过如下代码来实现自己的分片策略:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

public class MyPartition implements Partitioner {//实现Partitioner接口

    private Random random=new Random();<br>

    @Override

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        //获得分区列表

        List<PartitionInfo> partitionInfos=cluster.partitionsForTopic(topic);

        int partitionNum=0;

        if(key==null){

            partitionNum=random.nextInt(partitionInfos.size()); //随机分区

        }else{

            partitionNum=Math.abs((key.hashCode())%partitionInfos.size());

        }

        System.out.println("key ->"+key+"->value->"+value+"->"+partitionNum);

        return partitionNum;  //指定发送的分区值

    }

    @Override

    public void close() {

    }

    @Override

    public void configure(Map<String, ?> configs) {

    }

}

  然后基于之前的代码在producer上需要在消息发送端增加配置:指定自己的partiton策略

1

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.gupaoedu.kafka.MyPartition");

消息默认的分发机制:

  默认情况下,kafka 采用的是 hash 取模的分区算法。如果Key 为 null,则会随机分配一个分区。这个随机是在这个参数”metadata.max.age.ms”的时间范围内随机选择一个。对于这个时间段内,如果 key 为 null,则只会发送到唯一的分区。这个值在默认情况下是 10 分钟更新一次。关 于 Metadata ,简单理解就是Topic/Partition 和 broker 的映射关系,每一个 topic 的每一个 partition,需要知道对应的 broker 列表是什么,leader是谁、follower 是谁。这些信息都是存储在 Metadata 这个类里面。

消费端如何消费指定的分区:

  通过下面的代码,就可以消费指定该 topic 下的 0 号分区。其他分区的数据就无法接收。

1

2

3

4

5

//消费指定分区的时候,不需要再订阅

//kafkaConsumer.subscribe(Collections.singletonList(topic));

//消费指定的分区

TopicPartition topicPartition=new TopicPartition(topic,0);

kafkaConsumer.assign(Arrays.asList(topicPartition));

消息的消费原理:

  在实际生产过程中,每个 topic 都会有多个 partitions,多个 partitions 的好处在于,一方面能够对 broker 上的数据进行分片有效减少了消息的容量从而提升 io 性能。另外一方面,为了提高消费端的消费能力,一般会通过多个consumer 去消费同一个 topic ,也就是消费端的负载均衡机制,也就是我们接下来要了解的,在多个 partition 以及多个 consumer 的情况下,消费者是如何消费消息的?kafka 存在 consumer group的 概 念 , 也 就是 group.id 一 样 的 consumer ,这些consumer 属于一个 consumer group,组内的所有消费者协调在一起来消费订阅主题的所有分区。当然每一个分区只能由同一个消费组内的 consumer 来消费,那么同一个consumer group 里面的 consumer 是怎么去分配该消费哪个分区里的数据的呢?举个简单的例子就是如果存在的分区输,即partiton的数量于comsumer数量一致的时候,每个comsumer对应一个分区,如果comsumer数量多于分区,那么多出来的数量的comsumer将不工作,相反则是其中将会有comsumer消费多个分区。

  分区分配策略:

  在 kafka 中,存在两种分区分配策略,一种是 Range(默认)、另 一 种 另 一 种 还 是 RoundRobin ( 轮 询 )。 通 过comsumer的配置partition.assignment.strategy 这个参数来设置。

  Range strategy(范围分区): 

  Range 策略是对每个主题而言的,首先对同一个主题里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。假设我们有 10 个分区,3 个消费者,排完序的分区将会是 0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消费者线程排完序将会是C1-0, C2-0, C3-0。然后将 partitions 的个数除于消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。比如我们有 10 个分区,3 个消费者线程, 10 / 3 = 3,而且除不尽,那么消费者线程 C1-0 将会多消费一个分区,所以最后分区分配的结果看起来是这样的:

  C1-0 将消费 0, 1, 2, 3 分区

  C2-0 将消费 4, 5, 6 分区

  C3-0 将消费 7, 8, 9 分区

假如我们有 11 个分区,那么最后分区分配的结果看起来是这样的:

  C1-0 将消费 0, 1, 2, 3 分区

  C2-0 将消费 4, 5, 6, 7 分区

  C3-0 将消费 8, 9, 10 分区

假如我们有 2 个主题(T1 和 T2),分别有 10 个分区,那么最后分区分配的结果看起来是这样的:

  C1-0 将消费 T1 主题的 0, 1, 2, 3 分区以及 T2 主题的 0, 1, 2, 3 分区

  C2-0 将消费 T1 主题的 4, 5, 6 分区以及 T2 主题的 4, 5, 6 分区

  C3-0 将消费 T1 主题的 7, 8, 9 分区以及 T2 主题的 7, 8, 9 分区

可以看出,C1-0 消费者线程比其他消费者线程多消费了 2 个分区,这就是 Range strategy 的一个很明显的弊端.

  RoundRobin strategy(轮询分区):

  轮询分区策略是把所有 partition 和所有 consumer 线程都列出来,然后按照 hashcode 进行排序。最后通过轮询算法分配 partition 给消费线程。如果所有 consumer 实例的订阅是相同的,那么 partition 会均匀分布。假如按照 hashCode 排序完的 topic&partitions 组依次为 T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,我们的消费者线程排序为 C1-0, C1-1, C2-0, C2-1,最后分区分配的结果为:

  C1-0 将消费 T1-5, T1-2, T1-6 分区;

  C1-1 将消费 T1-3, T1-1, T1-9 分区;

  C2-0 将消费 T1-0, T1-4 分区;

  C2-1 将消费 T1-8, T1-7 分区;

  使用轮询分区策略必须满足两个条件

    1. 每个主题的消费者实例具有相同数量的流

    2. 每个消费者订阅的主题必须是相同的

  什么时候会触发这个策略呢?当出现以下几种情况时,kafka 会进行一次分区分配操作,也就是 kafka consumer 的 rebalance。

  1. 同一个 consumer group 内新增了消费者。

  2. 消费者离开当前所属的 consumer group,比如主动停机或者宕机。

  3. topic 新增了分区(也就是分区数量发生了变化)。

  4.消费者主动取消订阅topic

  kafka consuemr 的 rebalance 机制规定了一个 consumer group 下的所有 consumer 如何达成一致来分配订阅 topic的每个分区。而具体如何执行分区策略,就是前面提到过的两种内置的分区策略。而 kafka 对于分配策略这块,提供了可插拔的实现方式, 也就是说,除了这两种之外,我们还可以创建自己的分配机制。可以通过继承 AbstractPartitionAssignor 抽象类实现 assign 来做到。

  谁来执行 Rebalance 以及管理 consumer 的 group 呢?

  Kafka 提供了一个角色:coordinator(协调员) 来执行对于 consumer group 的管理,当 consumer group 的第一个 consumer 启动的时候,它会去和 kafka server(broker) 确定谁是它们组的 coordinator。之后该 group 内的所有成员都会和该 coordinator 进行协调通信。consumer group 如何确定自己的 coordinator 是谁呢? 消费 者 向 kafka 集 群 中 的 任 意 一 个 broker 发 送 一 个GroupCoordinatorRequest 请求,服务端会返回一个负载最 小 的 broker 节 点 的 id , 并 将 该 broker 设 置 为coordinator。在 rebalance 之前,需要保证 coordinator 是已经确定好了的,整个 rebalance 的过程分为两个步骤 ,一个是JoinGroup 的过程,在这个过程之后会进入一个Synchronizing Group State 阶段。那么这两个阶段都做了什么呢?

  JoinGroup 的过程:

  表示加入到 consumer group 中,在这一步中,所有的成员都会向 coordinator 发送 joinGroup 的请求。一旦所有成员都发送了 joinGroup 请求,那么 coordinator 会选择一个 consumer 担任 leader 角色,并把组成员信息和订阅信息发送消费者。下图就是描述了这么一个过程,并且请求与响应中携带的一些重要的信息。

  protocol_metadata: 序列化后的消费者的订阅信息

  leader_id: 消费组中的消费者,coordinator 会选择一个座位 leader,对应的就是 member_id

  member_metadata 对应消费者的订阅信息

  members:consumer group 中全部的消费者的订阅信息

  generation_id:年代信息,类似于 zookeeper 的时候的 epoch 是一样的,对于每一轮 rebalance ,generation_id 都会递增。主要用来保护 consumer group。隔离无效的 offset 提交。也就是上一轮的      consumer 成员无法提交 offset 到新的 consumer group 中。

  Synchronizing Group State 阶段:

  进入了 Synchronizing Group State阶段,主要逻辑是向 GroupCoordinator 发 送SyncGroupRequest 请求,并且处理 SyncGroupResponse响应,简单来说,就是 leader 将消费者对应的 partition 分配方案同步给 consumer group 中的所有 consumer,每个消费者都会向 coordinator 发送 syncgroup 请求,不过只有 leader 节点会发送分配方案,其他消费者只是打打酱油而已。当 leader 把方案发给 coordinator 以后,coordinator 会把结果设置到 SyncGroupResponse 中。这样所有成员都知道自己应该消费哪个分区。

  member_assignment :在syncGroup发送请求的时候,只有leader角色的comsumer才会去发送这个信息,而其他消费端是空的。然后会通过coordinator去分发给各个comsumer。

  ➢ consumer group 的分区分配方案是在客户端执行的!Kafka 将这个权利下放给客户端主要是因为这样做可以有更好的灵活性

offset :

  每个 topic可以划分多个分区(每个 Topic 至少有一个分区),同一topic 下的不同分区包含的消息是不同的。每个消息在被添加到分区时,都会被分配一个 offset(称之为偏移量),它是消息在此分区中的唯一编号,kafka 通过 offset 保证消息在分区内的顺序,offset 的顺序不跨分区,即 kafka 只保证在同一个分区内的消息是有序的; 对于应用层的消费来说,每次消费一个消息并且提交以后,会保存当前消费到的最近的一个 offset。那么 offset 保存在哪里?

  这个重要的topic我们是不允许其出现单点故障的,所以我们需要在其生成都时候就创建副本,可是默认副本数是1 ,我们可以通过调整参数去修改:

offsets.topic.replication.factor=3

  offset 在哪里维护?

  在 kafka 中,提供了一个__consumer_offsets_* 的一个topic , 把 offset 信 息 写 入 到 这 个 topic 中 。__consumer_offsets——保存了每个 consumer group某一时刻提交的 offset 信息。__consumer_offsets 默认有50 个分区。可以在 /tmp/kafka-logs/ 下查看。那么如何查看对应的 consumer_group 保存在哪个分区中呢?

  通过Math.abs(“groupid”.hashCode())%groupMetadataTopicPartitionCount ; 由 于 默 认 情 况 下groupMetadataTopicPartitionCount 有 50 个分区,计算得到的结果为:4, 意味着当前的 consumer_group 的位移信息保存在__consumer_offsets 的第 4个分区,执行如下命令,可以查看当前 consumer_goup 中的offset 位移信息,消费端需保持连接状态。

1

sh kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 4 --broker-list 192.168.254.135:9092,192.168.254.136:9092,192.168.254.137:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"

  或者 2.11-2.3.0版本。需要确保listeners=PLAINTEXT://192.168.1.101:9092 。外部代理地址 advertised.listeners=PLAINTEXT://192.168.1.101:9092都已经修改,且消费者已经有所消费,否者会卡着。

1

sh /mysoft/kafka/bin/kafka-console-consumer.sh --topic __consumer_offsets --partition 35 --bootstrap-server 192.168.254.135:9092,192.168.254.136:9092,192.168.254.137:9092 --formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter'

执行语句可以看到如下结果:

  这个意思就是 KafkaConsumerDemo1 消费组在 testTopic 中现在的 offsets 现在是 111.

动态增加Topic的副本(Replication):

  对于 __consumer_offsets 这个topic,在我们没有修改配置的情况下其默认的副本数量是 1 ,这种情况会出现的问题是消费组所对应的机器挂了会导致某一些消费者无法继续消费,当服务重启后,我们可以进行动态扩容副本数量。

  首先我们需要运行以下命令查看指定 Topic的情况:

sh kafka-topics.sh --topic __consumer_offsets  --describe --zookeeper 192.168.1.101:2181

  执行后会出现以下信息:

  紧接着,我们需要准备一个扩容的Json 文件(replication.json):

{"version": 1, "partitions": [{"topic": "__consumer_offsets", //哪个topic"partition": 35, //指定哪个分区"replicas": [//这里是机器的Id1, 2, 3]},{"topic": "__consumer_offsets", //哪个topic"partition": 36, //指定哪个分区"replicas": [//这里是机器的Id1, 2, 3]}//........可以多个]
}

  接下去需要执行以下命令:

sh kafka-reassign-partitions.sh --zookeeper 192.168.1.101:2181 --reassignment-json-file replication.json --execute   

  执行完会出现:

  可以执行以下命令验证执行结果:sh kafka-reassign-partitions.sh --zookeeper 192.168.1.101:2181 --reassignment-json-file replication.json --verify

  接着可以去zookeeper上查看该分区的副本情况:

  或者直接到kafka Topic数据目录下查看即可。

消息的存储:

  首先我们需要了解的是,kafka 是使用日志文件的方式来保存生产者和发送者的消息,每条消息都有一个 offset 值来表示它在分区中的偏移量。Kafka 中存储的一般都是海量的消息数据,为了避免日志文件过大,Log 并不是直接对应在一个磁盘上的日志文件,而是对应磁盘上的一个目录,这个目录的命名规则是<topic_name>_<partition_id>比如创建一个名为 firstTopic 的 topic,其中有 3 个 partition,那么在 kafka 的数据目录(/tmp/kafka-log,这里可以通过server.properties中的log.dirs=/tmp/kafka-logs去修改)中就有 3 个目录,firstTopic-0~3多个分区在集群中的分配 如果我们对于一个 topic,在集群中创建多个 partition,那么 partition 是如何分布的呢?

1.将所有 N Broker 和待分配的 i 个 Partition 排序
2.将第 i 个 Partition 分配到第(i mod n)个 Broker 上

  结合前面讲的消息分发策略,就应该能明白消息发送到 broker 上,消息会保存到哪个分区中,并且消费端应该消费哪些分区的数据了。

幂等性:

  所谓的幂等,简单说就是对接口的多次调用所产生的结果和调用一次是一致的。在0.11.0.0版本引入了创建幂等性Producer的功能。仅需要设置props.put(“enable.idempotence”,true),或props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true)。enable.idempotence设置成true后,Producer自动升级成幂等性Producer。Kafka会自动去重。Broker会多保存一些字段。当Producer发送了相同字段值的消息后,Broker能够自动知晓这些消息已经重复了。作用范围:

  1. 只能保证单分区上的幂等性,即一个幂等性Producer能够保证某个主题的一个分区上不出现重复消息。
  2. 只能实现单回话上的幂等性,这里的会话指的是Producer进程的一次运行。当重启了Producer进程之后,幂等性不保证。

事务型消息:

  Kafka在0.11版本开始提供对事务的支持,提供是read committed隔离级别的事务。保证多条消息原子性地写入到目标分区,同时也能保证Consumer只能看到事务成功提交的消息。

事务性Producer:

  保证多条消息原子性地写入到多个分区中。这批消息要么全部成功,要不全部失败。事务性Producer也不惧进程重启。设置:

properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);//开启enable.idempotence = true
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-id");//设置Producer端参数 transactional.id

  除此之外,还要加上调用事务API,如initTransaction、beginTransaction、commitTransaction和abortTransaction,分别应对事务的初始化、事务开始、事务提交以及事务终止。如下:

// kafka  事务型消息
producer.initTransactions();
try {producer.beginTransaction();producer.send(record1);producer.send(record2);producer.commitTransaction();
} catch (KafkaException e) {producer.abortTransaction();
}

  这段代码能保证record1和record2被当做一个事务同一提交到Kafka,要么全部成功,要么全部写入失败。

Consumer端的设置:

  设置 isolation.level参数,目前有两个取值:

  1. read_uncommitted:默认值表明Consumer端无论事务型Producer提交事务还是终止事务,其写入的消息都可以读取。
  2. read_committed:表明Consumer只会读取事务型Producer成功提交事务写入的消息。注意,非事务型Producer写入的所有消息都能看到。

kafka消息的分发与消费(一)相关推荐

  1. kafka消息反复从头开始消费问题排查

    问题描述   最近线上的一个数据服务(服务B)出现了一个比较诡异的问题 ,该服务消费上游服务(服务A)产生的kafka消息数据,上线后一直运行平稳,最近一周在两次上线的时候出现了大量数据更新的情况,查 ...

  2. Kafka消息丢失、重复消费的解决方案

    文章目录 生产者问题 消费者问题 问题总结 解决方案 生产者问题 Producer发送消息到队列,分区Leader收到消息后返回ACK给Producer,表示接收成功,此时可以继续发送下一笔消息. K ...

  3. Kafka整体结构图、Consumer与topic关系、Kafka消息分发、Consumer的负载均衡、Kafka文件存储机制、Kafka partition segment等(来自学习资料)

    ##1. Kafka整体结构图 Kafka名词解释和工作方式  Producer : 消息生产者,就是向kafka broker发消息的客户端.  Consumer : 消息消费者,向kafka ...

  4. Kafka消息积压案例分析

    案例 一个微服务同一个分组消费同一个topic的kafka消息,不通业务通过key值区分,由于其中一个业务消息量大,偶尔会出现消费滞后的情况,导致当前微服务消费组出现大量消息积压情况,影响业务. 简单 ...

  5. kafka消息消费有延迟_RabbitMQ与Kafka的技术差异以及使用注意点

    导言 作为一个有丰富经验的微服务系统架构师,经常有人问我,"应该选择RabbitMQ还是Kafka?".基于某些原因, 许多开发者会把这两种技术当做等价的来看待.的确,在一些案例场 ...

  6. kafka消息消费有延迟_注意了!Kafka与RabbitMQ千万不要乱用…

    作为一个有丰富经验的微服务系统架构师,经常有人问我,应该选择 RabbitMQ 还是 Kafka? 图片来自 Pexels 基于某些原因, 许多开发者会把这两种技术当做等价的来看待.的确,在一些案例场 ...

  7. kafka 消息分发机制、分区和副本机制

    一.消息分发机制 1.1 kafka 消息分发策略 消息是 kafka 中最基本的数据单元,在 kafka 中,一条消息由key.value两部分构成,在发送一条消息 时,我们可以指定这个key,那么 ...

  8. Kafka压力测试(写入MQ消息压测和消费MQ消息压测)

    1.测试目的 本次性能测试在正式环境下单台服务器上Kafka处理MQ消息能力进行压力测试.测试包括对Kafka写入MQ消息和消费MQ消息进行压力测试,根据10w.100w和1000w级别的消息处理结果 ...

  9. 【消息队列】kafka是如何保证消息不被重复消费的

    一.kafka自带的消费机制 kafka有个offset的概念,当每个消息被写进去后,都有一个offset,代表他的序号,然后consumer消费该数据之后,隔一段时间,会把自己消费过的消息的offs ...

最新文章

  1. 中兴ZXR10交换机配置手册
  2. 理解Java的NIO
  3. 如何创建比C语言更快的编程语言?
  4. 安卓手机上微信无法打开Https网址的完美解决方案
  5. swift 笔记 (十四) —— 构造过程
  6. 解决Vmware虚拟机中没有网络连接Ubuntu无法上网
  7. 12.看板方法---度量和管理报告
  8. java hex2bin_hex2bin / bin2hex / pack / unpack 的理解及应用
  9. 计算机毕业设计springboot门诊管理系统
  10. mathtype过期,不用每次都去回顾教程
  11. 如何识别pdf文档中的文字(图像识别)python
  12. 突发!阿里巴巴大调整
  13. 电机或编码器相关的 CW 与 CCW
  14. 「SQL数据分析系列」3.查询入门
  15. C#:实现 Van Eck‘s sequence范·艾克序列算法(附完整源码)
  16. java 有五个学生 每个学生有3门课_1、有五个学生,每个学生有3门课(语文、数学、英语)的成绩, 写一...
  17. 通过指定cellid获取周围cellid信息,改变指定cellid的颜色
  18. unittest---unittest中verbosity参数设置
  19. rtf格式内容转html
  20. 908. Smallest Range I

热门文章

  1. 使用 IDEA 解决 Java8 的数据流问题,极大提升生产力!!
  2. 每日一皮:Bug 变 Feature !惊不惊喜,意不意外,刺不刺激!
  3. JVM调优实战:G1中的to-space exhausted问题
  4. 如何通过抓包实战来学习Web协议?
  5. 选项类 oracle ebs,Oracle EBS工具选项:关闭其他表单修改方法
  6. json中怎么去掉[]外的引号_SEO优化中怎么做站内和站外的锚文本
  7. 帝国cms7.5百度小程序针对搜索引擎自然搜索优化版生成静态版
  8. python下载mp4
  9. python softmax函数
  10. python 图片打印文章总结