Kafka学习记录(四)——消费者

目录

  • Kafka学习记录(四)——消费者
    • 对应课程
    • Kafka消费者工作流程
      • 消费方式和流程
      • 消费者组原理
      • 消费者组初始化流程
      • 消费者组详细消费流程
      • 重要参数
    • kafka消费者JavaAPI
      • 独立消费者
      • 消费者组
    • 分区策略
      • range分区策略原理
      • RoundRobin分区策略
      • Sticky分区策略
    • offset位移
      • 自动提交offset
      • 手动提交offset
      • 指定offset消费
    • 消费者事务
      • 重复消费和漏消费
      • 消费者事务
    • 如何提高吞吐量

对应课程

【尚硅谷】2022版Kafka3.x教程(从入门到调优,深入全面)

Kafka消费者工作流程

消费方式和流程

Kafka没有采用push的消费方式,因为由broker决定消息发送速率,很难适应所有消费者的消费速率。例如,推送的速度是50m/s,消费速度小于50m/s的消费者就来不及处理消息。因此,consumer采用从broker中主动拉取数据的pull方式。pull模式不足之处是,如果Kafka没有数据,消费者可能会陷入循环中,一直返回空数据。

消费者组原理

Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。

• 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。

• 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者

• 如果向消费组中添加更多的消费者,超过主题分区数量,则有一部分消费者就会闲置,不会接收任何消息。

• 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

消费者组初始化流程

触发再平衡的两种情况:消费者和coordinator之间超过session.timeout.ms没有保持心跳,则该消费者会被移除;消费者处理消息的时长超过max.poll.interval.ms。

消费者组详细消费流程

重要参数

bootstrap.servers:向Kafka 集群建立初始连接用到的host/port列表。

key.deserializer和value.deserializer:指定接收消息的key 和value 的反序列化类型。一定要写全类名。

group.id:标记消费者所属的消费者组。

enable.auto.commit:默认值为true,消费者会自动周期性地向服务器提交偏移量。

auto.commit.interval.ms:如果enable.auto.commit设置为true, 则该值定义了消费者偏移量向Kafka 提交的频率,默认5s。

auto.offset.reset:当Kafka 中没有初始偏移量或当前偏移量在服务器中不存在时,earliest:自动重置偏移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。

offsets.topic.num.partitions:__consumer_offsets 的分区数,默认是50 个分区。

heartbeat.interval.ms:Kafka 消费者和coordinator 之间的心跳时间,默认3s。

session.timeout.ms:Kafka 消费者和coordinator 之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者组执行再平衡。

max.poll.interval.ms:消费者处理消息的最大时长,默认是 5 分钟 。超过该值,该消费者被移除,消费者组执行再平衡。

fetch.min.bytes:默认1 个字节。消费者获取服务器端一批消息最小的字节数。

fetch.max.wait.ms:默认500ms 。如果没有从服务器端获取到一批数据的最小字节数 。该时间到,仍然会返回数据。

fetch.max.bytes:默认50m ,消费者获取服务器端一批消息最大的字节数 。

max.poll.records:一次poll拉取数据返回消息的最大条数, 默认是500条 。

kafka消费者JavaAPI

独立消费者

创建一个独立的消费者,消费first主题下的0号分区的数据。

CustomerConsumerPartitioner.java

package com.jd.springboot_kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;public class CustomerConsumerPartitioner {public static void main(String[] args) {// 0 配置Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop101:9092,hadoop102:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 指明消费者组的idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"group1");// 1 创建kafka消费者对象KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String,String>(properties);//2 添加主题和分区List<TopicPartition> partitions = new ArrayList<TopicPartition>();TopicPartition topicPartition = new TopicPartition("first",1);partitions.add(topicPartition);kafkaConsumer.assign(partitions);// 3 消费数据while (true) {ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String,String> consumerRecord: consumerRecords) {System.out.println(consumerRecord);}}}
}

消费者组

需求:测试同一个主题的分区数据只能由一个消费者组中的一个消费。

复制3份groupid一致的Consumer:

package com.jd.springboot_kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.*;public class CustomConsumer {public static void main(String[] args) {// 0 配置Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop101:9092,hadoop102:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 指明消费者组的idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"group1");// 1 创建kafka消费者对象KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String,String>(properties);//2 添加订阅主题List<String> topics = new ArrayList<String>();topics.add("first");kafkaConsumer.subscribe(topics);// 3 消费数据while (true) {ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String,String> consumerRecord: consumerRecords) {System.out.println(consumerRecord);}}}}

分区策略

range分区策略原理

Range分区再平衡策略:

如果一个消费者被踢出消费者组,那么,将重新按照range方式分配。

RoundRobin分区策略

RoundRobin分区再平衡策略:

如果一个消费者被踢出消费者组,那么,将重新按照RoundRobin方式分配。

Sticky分区策略

粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。
粘性分区是Kafka 从 0.11.x 版本开始引入这种分配策略 首先会尽量均衡的放置分区到消费者上面 ,在出现同一消费者组内消费者出现问题的时候,会 尽量保持原有分配的分区不变化。

offset位移

Kafka0.9版本之前,consumer默认将offset保存在Zookeeper中。而从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为__consumer_offsets。

__consumer_offsets主题里面采用key和value的方式存储数据。key是group.id+topic+分区号, value就是当前offset的值。每隔一段时间, kafka内部会对这个topic进行compact,也就是每个group.id+topic+分区号就保留最新数据。

自动提交offset

手动提交offset

每拉取完一次数据,可以调用以下JavaAPI完成手动提交:

consumer.commitSync();
consumer.commitAsync();

在此之前,需要把自动提交配置成false:

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

指定offset消费

消费各分区offset为300之后的消息,注意:需要在消费前获得全部的topic的分区情况。

CustomConsumerSeekOffset.java

package com.jd.springboot_kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.*;public class CustomConsumerSeekOffset {public static void main(String[] args) {// 0 配置Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop101:9092,hadoop102:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 指明消费者组的idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"group2");// 1 创建kafka消费者对象KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String,String>(properties);//2 添加订阅主题List<String> topics = new ArrayList<String>();topics.add("first");kafkaConsumer.subscribe(topics);Set<TopicPartition> assignment = new HashSet<TopicPartition>();//获取主题分区集合,assignment需在拉取完一次数据while (assignment.size() == 0) {kafkaConsumer.poll(Duration.ofSeconds(5));assignment = kafkaConsumer.assignment();}for (TopicPartition tp : assignment) {System.out.println("主题:"+tp.topic()+"中的分区:"+tp.partition());kafkaConsumer.seek(tp,300);}// 3 消费数据(offset在300以上)while (true) {ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String,String> consumerRecord: consumerRecords) {System.out.println(consumerRecord);}}}
}

指定时间戳offset消费

思想:将指定的时间戳转换为对应的offset。

消费各分区昨日之后的消息CustomConsumerSeekTime.java:

package com.jd.springboot_kafka.consumer;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.*;public class CustomConsumerSeekTime {public static void main(String[] args) {// 0 配置Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop101:9092,hadoop102:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 指明消费者组的idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"group2");// 1 创建kafka消费者对象KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String,String>(properties);//2 添加订阅主题List<String> topics = new ArrayList<String>();topics.add("first");kafkaConsumer.subscribe(topics);Set<TopicPartition> assignment = new HashSet<TopicPartition>();//获取主题分区集合,assignment需在拉取完一次数据while (assignment.size() == 0) {kafkaConsumer.poll(Duration.ofSeconds(5));assignment = kafkaConsumer.assignment();}//把时间转换为对应的offsetMap<TopicPartition, Long> timestampsToSearch = new HashMap<>();for (TopicPartition tp : assignment) {//指定每个主题分区的时间偏移timestampsToSearch.put(tp,System.currentTimeMillis() - 1*24*3600*1000);}Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(timestampsToSearch);for (TopicPartition tp : assignment) {kafkaConsumer.seek(tp,topicPartitionOffsetAndTimestampMap.get(tp).offset());}// 3 消费数据(offset在300以上)while (true) {ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String,String> consumerRecord: consumerRecords) {System.out.println(consumerRecord);}}}
}

即使消息已被提交,但我们依然可以使用 seek() 方法来消费符合一些条件的消息,这样为消息的消费提供了很大的灵活性。

消费者事务

重复消费和漏消费

消费者事务

如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交offset过程做原子绑定。此时我们需要将Kafka的offset保存到支持事务的自定义介质(比如:MySQL)。

如何提高吞吐量

1)如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数= 分区数。(两者缺一不可);

2)如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间< 生产速度),使处理的数据小于生产的数据,也会造成数据积压。

相关参数:

fetch.max.bytes:默认50m。消费者获取服务器端一批消息最大的字节数;

max.poll.records:一次poll 拉取数据返回消息的最大条数,默认是500 条。

Kafka学习记录(四)——消费者相关推荐

  1. Kafka学习记录(三)——Broker

    Kafka学习记录(三)--Broker 目录 Kafka学习记录(三)--Broker 对应课程 Zookeeper存储的Kafka信息 Broker总体工作流程 Broker的服役和退役 Kafk ...

  2. leveldb 学习记录(四)Log文件

    前文记录 leveldb 学习记录(一) skiplist leveldb 学习记录(二) Slice leveldb 学习记录(三) MemTable 与 Immutable Memtable le ...

  3. MySQL学习记录 (四) ----- SQL数据管理语句(DML)

    相关文章: <MySQL学习记录 (一) ----- 有关数据库的基本概念和MySQL常用命令> <MySQL学习记录 (二) ----- SQL数据查询语句(DQL)> &l ...

  4. kafka学习笔记(四) --- 压缩算法面面观

    kafka中,压缩,说白了就是,以较少的CPU开销去换更少的磁盘占用或更少的网络I/O传输. 怎么压缩 kafka的消息格式分为两种,社区分别成为V1版本和V2版本,V2版本是在0.11.0.0中正式 ...

  5. 【故障诊断发展学习记录四——数字孪生与控制系统健康管理(DT PHM)】

    数字数字 目录 1. 数字孪生的起源 1.1 数字工程 1.2  模型贯穿决策 1.3 数字工程路线图 1.4 数字工程战略目标 2. 美军数字工程 2.1 生态系统全视图 2.2 支持采办的的完整视 ...

  6. gRPC学习记录(四)--官方Demo

    了解proto3后,接下来看官方Demo作为训练,这里建议看一遍之后自己动手搭建出来,一方面巩固之前的知识,一方面是对整个流程更加熟悉. 官方Demo地址: https://github.com/gr ...

  7. grpc简单使用 java_gRPC学习记录(四)-官方Demo - Java 技术驿站-Java 技术驿站

    了解proto3后,接下来看官方Demo作为训练,这里建议看一遍之后自己动手搭建出来,一方面巩固之前的知识,一方面是对整个流程更加熟悉. 官方Demo地址: https://github.com/gr ...

  8. 《你好,放大器》----学习记录(四)

    4 使用放大器的共性问题 4.1 放大器的封装 选择运放的封装,对整体电路板尺寸.焊接工艺和散热有影响,对电路性能也有影响 4.1.1 关于封装的一些基本概念 关于封装,主要关心两个参数: 管脚间距 ...

  9. python3.10官方文档学习记录四__赋值、比较运算

    1 先来个例子: Python 还可以完成比二加二更复杂的任务. 例如,可以编写 斐波那契数列 的初始子序列,如下所示: >>> # 斐波那契级数: ... # 两个元素的和定义了下 ...

最新文章

  1. base64是哪个jar包的_如何通过一个类名找到它属于哪个jar包?
  2. 为什么泪水充满了我的眼眶,那是一种从未有过的感伤,
  3. java todo error_java基础-异常
  4. 237. Delete Node in a Linked List
  5. 将Glassfish 3连接到外部ActiveMQ 5代理
  6. js设置全局变量ajax中赋值
  7. 人脸识别最新进展——几篇相关论文总结
  8. Guns 旗舰版2.1发布,更新树形表格
  9. Java简单记事本设计实验报告_java记事本实验报告
  10. Python 变量 字符串 运算
  11. 算法竞赛入门经典 电子书(附习题解析)网盘下载
  12. CondaHTTPError: HTTP 000 CONNECTION FAILED for url <https://mirrors.tuna.tsi解决办法 亲测有效
  13. Mysql出现Table 'performance_schema.session_status' doesn't exist
  14. C++代码实现图片调色
  15. 仿B站的视频评论列表
  16. 嵌入式学习项目实战 --- 在线词典
  17. IDEA添加快捷注释功能
  18. Hybrid App开发实战
  19. 美联致美-脂嵌魔鬼身材,脂肪搬家搬出好身材
  20. 3D人脸成像技术整理

热门文章

  1. python爬取网易云热评
  2. 聚首银川 探索互联网远行之路
  3. 穿越雷区--蓝桥杯笔记
  4. H5即时通讯聊天系统源码lM聊天
  5. 16年资深测试大牛教你三部成为测试架构师
  6. html 超级链接微课,HTML入门基础微课(01)
  7. 细胞实验中的对照给药
  8. 【对讲机的那点事】对讲机耳机的使用及注意事项
  9. SAP各模块及主要模块所对应的常用表
  10. Python解压zip文件出现TypeError: pwd: expected bytes, got str的解决方案