kafka的Consumer 消费者(六)
Kafka 的消费方式
消息队列主要有两个消费方式:pull(拉)模式 和 push(推)模式
pull(拉)模式 consumer采用从broker中主动拉取数据,push(推)模式 broker决定消息发送速率。而每个消费者受限于所部署的服务器性能限制,如果使用push(推)模式很可能会出现有的consumer还来不及消费;因此kafka是采用的pull(拉)模式,可以根据消费者服务器的性能去调节消费者消费的消息速率。当然pull(拉)模式也有一定的缺陷,如果kafka中没有数据,消费者可能会陷入循环中,一直返回空数据。
消费者总体工作流程
生产者向每个分区的leader发送数据,follower主动与leader进行数据的同步保证数据的可靠性,消费者消费分区的数据,一个消费者可以消费一个或者多个分区的数据,而每个消费者是独立,并不存在相互关联的; 但是每个分区的数据只能有消费者组中的一个消费者消费。
消费者组原理
消费者组
Consumer Grep (CG): 消费者组,有多个consumer组成,形成一个消费者组的条件是所有的消费者的groupid相同
- 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费
- 消费者组之间互不影响,所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者
- 如果向消费者组中添加更多的消费者,超过主题分区数量,则有一部分消费者就会闲置不会接收任何消息
消费者组初始化流程
首先了解一个组件:coordinator,它是辅助实现消费者组的初始化和分区的分配。每个broker节点中都有一个对应的coordinator。coordinator的节点选择 = groupid 的hashcode值 % 50 (_consumer_offset的分区数量),而消费者组中的消费者就是通过这个公式来选择coordinator节点
例如:groupid的hashcode值是1,1%50=1,那么_consumer_offset主题的1号分区,在哪个broker上,就选择这个节点的coordinator作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset。
消费者组详细消费流程
- 消费者组在进行工作之前,会先创建一个消费者网络连接客户端,主要用于与kafka集群进行交互
- 消费者组发送消费请求用于抓取数据的初始化
- 调用send方法发送请求
- 发送完请求后通过回调方法拉取对应的结果,放入消息队列中
- 消费者从队列中拉取指定数量的消息进行处理
- 将消息进行反序列化,经过拦截器在进行数据的处理
消费者API
示例1:消费一个主题
注意:在消费者API代码中必须配置消费者组ID。命令行启动消费者不填组ID会被自动填写随机的消费者组ID
public static void main(String[] args) {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.120:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// ack配置,默认-1(all)properties.put(ProducerConfig.ACKS_CONFIG, "1");// 重试次数properties.put(ProducerConfig.RETRIES_CONFIG, 10);KafkaProducer<String, String> producer = new KafkaProducer<>(properties);for (int i = 0; i < 10; i++) {producer.send(new ProducerRecord<String, String>("first", "ACK测试" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata data, Exception e) {if (e == null) {System.out.println("分区:" + data.partition());} else {System.err.println("发送失败");System.err.println(e);}}});}// 关闭资源producer.close();
}
示例2:消费一个主题下的指定分区
public static void main(String[] args) {// 配置Properties properties = new Properties();// 连接kafkaproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.120:9092");// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者groupid, 注意:必须的配置此参数properties.put(ConsumerConfig.GROUP_ID_CONFIG, "first_01");// 创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);// 订阅主题对应的分区List<TopicPartition> topics = new ArrayList<>();topics.add(new TopicPartition("first", 0));consumer.assign(topics);// 消费数据while (true) {ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(5)); // 间隔5秒拉取数据poll.forEach(System.out::println);}
}
分区的分配以及再平衡
kafka有四种主流的分区非配策略:Range、RoundRobin、Sticky、CooperativeSticky,可以通过 partition.assignment.strategy 参数修改分区的分配策略。默认策略是Range+CooperativeSticky。kafka可以同时使用多个分区分配策略。
Range 策略
Range是对每个topic而言的。首先对同一个topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序;通过 partitions数 / consumers数来决定每个消费者应该消费几个分区。如果除不尽,那么前面几个消费者将会多消费几个分区
注意: 如果只是针对1个topic而言,消费者多消费一个分区影响不是很大,但是如果有N多个topic,那么针对每个topic前面的消费者都将多消费一个分区,容易产生数据倾斜
RoundRobin 策略
RoundRobin 是针对集群所有topic而言的,RoundRobin 轮询分区策略,是把所有的partition和所有的consumer都列出来,然后按照hashcode进行排序,最后通过轮询算法来分配partition给到各个消费者
Sticky 策略
粘性分区可以理解为分配的结果带有"粘性的",即在执行一次新的分配之前,考虑上一次分配的结果,尽可能少的调整分配的变动,可以节省大量的开支。粘性分区是Kafka从0.11x版本开始引入的分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变
示例:
public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.120:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG, "first_01");// 配置分区分配策略,可配置参数: RangeAssignor RoundRobinAssignor StickyAssignor CooperativeStickyAssignorproperties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);List<String> topics = new ArrayList<>();topics.add("first");consumer.subscribe(topics);while (true) {ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(5)); // 间隔5秒拉取数据for (ConsumerRecord<String, String> record : poll) {System.out.println(record);}}
}
offset位移
offset默认的维护位置
kafka0.9版本之前,consumer默认将offset保存在zookeeper中,从0.9版本开始,consumer默认将offset保存在kafka一个内置的topic中:__consumer_offsets。原因:如果每个消费者都和zookeeper进行交互储存对应的offset,将会占用大量的网络资源,效率比较低下
__consumer_offsets主题里面采用key-value的方式保存数据。key是 group.id + topic + 分区号,value是当前offset的值。每隔一段时间,kafka内部会对这个topic进行compact,也就是 group.id + topic + 分区号就保留最新的数据。
默认情况下,kafka是不允许查看系统主题的数据的,可以通过 exclude.internal.topic 修改,默认true表示不能消费系统主题
示例:
# 修改kafka consumer的配置文件(vi config/consumer.properties),新增以下参数
exclude.internal.topic=false# 为了方便,创建一个新的topic
./bin/kafka-topics.sh --bootstrap-server 192.168.0.120:9092 --create --topic testOffset --partitions 2 --replication-factor 1# 启动生产者并向 testOffset 发送数据
./bin/kafka-console-producer.sh --bootstrap-server 192.168.0.120:9092 --topic testOffset# 启动消费者消费 testOffset 的数据,指定组是为了方便观察储存位置
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.120:9092 --topic testOffset --group test# 查看消费者消费注意__consumer_offsets
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.120:9092 --topic __consumer_offsets --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
自动提交offset
为了使我们能够专注于自己的业务逻辑,kafka提供了自动提交offset的功能。自动提交offset的相关参数如下:
- enable.auto.commit:是否开始自动提交offset,默认true
- auto.commit.interval.ms:自动提交offset的时间间隔,默认5s
示例:
public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.120:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG, "first_01");properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());// 自动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);// 提交时间间隔properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);List<String> topics = new ArrayList<>();topics.add("first");consumer.subscribe(topics);while (true) {ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(5)); // 间隔5秒拉取数据for (ConsumerRecord<String, String> record : poll) {System.out.println(record);}}
}
手动提交offset
虽然自动提交offset简单便利,但是由于其是基于时间提交的,开发员难以把握offset提交的时机。因此kafka还提供了手动提交offset的API
手动提交offset的方法有两种:commitSync 同步提交、commitAsync 异步提交。两者的相同点都会将本次提交的一批数据最高的偏移量提交,不同的是:同步提交阻塞当前线程, 一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败),而异步提交没有重试机制,故有可能提交失败
- commitSync 同步提交:必须等待offset提交完毕再去消费下一批数据
- commitAsync 异步提交:发送完提交offset请求后,就开始消费下一批数据
示例1:同步提交
public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.120:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG, "first_01");properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());// 手动提交配置properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);List<String> topics = new ArrayList<>();topics.add("first");consumer.subscribe(topics);while (true) {ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(5)); // 间隔5秒拉取数据for (ConsumerRecord<String, String> record : poll) {System.out.println(record);}// 同步提交offsetconsumer.commitSync();}
}
示例2:异步提交
public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.120:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG, "first_01");properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());// 手动提交配置properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);List<String> topics = new ArrayList<>();topics.add("first");consumer.subscribe(topics);while (true) {ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(5)); // 间隔5秒拉取数据for (ConsumerRecord<String, String> record : poll) {System.out.println(record);}// 异步提交offsetconsumer.commitAsync();}
}
指定offset消费
目前有earliest、latest、none消费方式,默认latest,通过参数 auto.offset.reset 可以进行修改
- earliest: 自动将偏移量重置为最早的偏移量 --from-beginning
- latest: 自动将偏移量重置为最新偏移量
- none: 如果未找到消费者组的先前偏移量,则向消费者抛出异常
示例:
public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.120:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG, "first_01");// 修改消费方式 earliest、latest、noneproperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);List<String> topics = new ArrayList<>();topics.add("first");consumer.subscribe(topics);// 指定位置进行消费Set<TopicPartition> partitions = consumer.assignment();// 保证分区分配方案已经制定完成while (partitions.size() == 0) {consumer.poll(Duration.ofSeconds(1));// 拉取一次数据partitions = consumer.assignment();// 更新}for (TopicPartition partition : partitions) {consumer.seek(partition, 100);}while (true) {ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(5)); // 间隔5秒拉取数据for (ConsumerRecord<String, String> record : poll) {System.out.println(record);}}
}
指定时间消费
public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.120:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG, "first_01");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);List<String> topics = new ArrayList<>();topics.add("first");consumer.subscribe(topics);// 指定位置进行消费Set<TopicPartition> partitions = consumer.assignment();// 分区集合// 保证分区分配方案已经制定完成while (partitions.size() == 0) {consumer.poll(Duration.ofSeconds(1));// 拉取一次数据partitions = consumer.assignment();// 更新}// 把时间转换为对应的offsetMap<TopicPartition, Long> map = new HashMap<>();for (TopicPartition partition : partitions) {map.put(partition, System.currentTimeMillis() - 24 * 3600 * 1000); // 消费分区一天前到现在的数据}Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(map);// 通过时间获取offset的集合for (TopicPartition partition : partitions) {OffsetAndTimestamp offsetAndTimestamp = offsets.get(partition);consumer.seek(partition, offsetAndTimestamp.offset()); // 指定消费的offset}while (true) {ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(5)); // 间隔5秒拉取数据for (ConsumerRecord<String, String> record : poll) {System.out.println(record);}}
}
漏消费和重复消费
**重复消费:**已经消费的数据但是offset没有提交
**漏消费:**先提交offset后消费,有可能造成数据的漏消费
出现这两种情况我们可以通过kafka的 事务 来解决
消费者事务
如果想完成consumer端的精准一次性消费,那么需要 kafka消费端将消费过程和提交offset过程做原子绑定。 此时我们需要将kafka的offset保存到支持事务的自定义介质如MySQL中。
数据积压,消费者如何提高吞吐量
在kafka中,默认的日志储存时间为7天,超过了7天数据就会被删除。如果你消费一个主题的数据比如消费的了4天,但是只消费了数据量的20%,而到了7天时这个主题里面的数据还没有被消费完而将会删除时,这种情况可以加快消费速度。
1. 如果是kafka的消费能力不足,则可以考虑增加topic的分区数,并且同时提升消费组的消费者数量,消费者数=分区数,两者缺一不可
2. 如果是下游的数据处理不及时,提高每批次拉取的数量。 批次拉取数量过少(拉取数据/处理时间 < 生产速度),使处理的数据小于生产的数据,也会造成数据积压
源码地址:https://gitee.com/peachtec/hxz-study
kafka的Consumer 消费者(六)相关推荐
- 关于Kafka 的 consumer 消费者手动提交详解
前言 在上一篇 Kafka使用Java实现数据的生产和消费demo 中介绍如何简单的使用kafka进行数据传输.本篇则重点介绍kafka中的 consumer 消费者的讲解. 应用场景 在上一篇kaf ...
- kafka之Consumer消费者基本概念
概念 消费者 消费者(Consumer)即读取Kafka集群中某些topic消息的程序,kafka中消费者分为两种类型: 消费者组 (consumer group) 独立消费者 (standalong ...
- kafka 主动消费_Kafka核心API——Consumer消费者
Consumer之自动提交 在上文中介绍了Producer API的使用,现在我们已经知道如何将消息通过API发送到Kafka中了,那么现在的生产者/消费者模型就还差一位扮演消费者的角色了.因此,本文 ...
- Apache Kafka Consumer 消费者集
1.目标 在我们的上一篇文章中,我们讨论了Kafka Producer.今天,我们将讨论Kafka Consumer.首先,我们将看到什么是Kafka Consumer和Kafka Consumer的 ...
- 4.2.10 Kafka源码剖析, 阅读环境搭建, broker启动流程, topic创建流程, Producer生产者流程, Consumer消费者流程,
目录 4.1 Kafka源码剖析之源码阅读环境搭建 4.1.1 安装配置Gradle 4.1.2 Scala的安装和配置 4.1.3 Idea配置 4.1.4 源码操作 4.2 Kafka源码剖析之B ...
- kafka生产者、消费者java示例
1. 生产者 import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.Ke ...
- Jmeter之创建Kafka生产者和消费者进行性能测试
目录 1. A Brief Overview of Apache Kafka 2. Pepper-Box Serialized Config 3. Pepper Box Kafka Sampler 4 ...
- Kafka设计解析(六)- Kafka高性能架构之道
原创文章,转载请务必将下面这段话置于文章开头处. 本文转发自技术世界,原文链接 http://www.jasongj.com/kafka/high_throughput/ 摘要 上一篇文章<Ka ...
- 【kafka】Consumer is not subscribed to any topics
1.概述 一个网友的问题,然后我帮他解决,后来没告诉我后面结果如何了,先转载记录一下 转载: https://blog.csdn.net/github_32521685/article/details ...
最新文章
- MyBatis复习笔记2:配置文件详解
- Target runtime Apache Tomcat 6.0 is not defined
- java class类文件结构
- window server 2012 IE10 增强的安全设置 如何关闭
- Linux网络编程 | IO模型 :阻塞IO、非阻塞IO、信号驱动IO、异步IO、多路复用IO
- P4317-花神的数论题【组合数学】
- centos 生产 ssh-key
- java .z文件_java 压缩文件
- git commit 规范校验配置和版本发布配置
- 时间戳和字符串互相转换
- Yum介绍与常见用法
- Spring与Struts2的整合
- Android WorkManager 实战讲解
- 2021年上半年数据库系统工程师下午真题及答案解析
- Android指南针应用编写
- Innovator Admin 一个aras的管理器,又一个package安装方法
- 算法竞赛进阶指南——二分:防线
- SQLyog 报错2058 :连接 mysql 8.0.11 解决方法
- html横向滚动效果,html 中 鼠标滑轮实现横向滚动
- 无需下载 网页版 Matlab