Kafka3.0 提交offset方式
文章目录
- offset 的默认维护位置
- 自动提交 offset
- 手动提交 offset
offset 的默认维护位置
Kafka0.9版本之前,consumer默认将offset保存在Zookeeper中
从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为__consumer_offsets
之所以保存在Kafka内置的topic中,是因为如果offset保存在zookeeper中,会产生大量的网络通信,从而效率低下。
__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行compact,也就是每个 group.id+topic+分区号就保留最新数据。
1)消费 offset 案例
(0)思想:__consumer_offsets 为 Kafka 中的 topic,那就可以通过消费者进行消费。
(1)在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false,默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。
(2)采用命令行方式,创建一个新的 topic为huanhuan。
[root@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic huanhuan--partitions 2 --replication-factor 2
(3)启动生产者往 huanhuan生产数据。
[root@hadoop102 kafka]$ bin/kafka-console-producer.sh --topic huanhuan --bootstrap-server hadoop102:9092
(4)启动消费者消费 huanhuan数据。
[root@hadoop104 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic huanhuan --group test
注意:指定消费者组名称,更好观察数据存储位置(key 是 group.id+topic+分区号)。
(5)查看消费者消费主题__consumer_offsets。
[root@hadoop102 kafka]$ bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server hadoop102:9092 --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
[offset,huanhuan,1]::OffsetAndMetadata(offset=7, leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203, expireTimestamp=None)
[offset,huanhuan,0]::OffsetAndMetadata(offset=8, leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203, expireTimestamp=None)
自动提交 offset
为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。
自动提交offset的相关参数:
- enable.auto.commit:是否开启自动提交offset功能,默认是true
- auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s
参数名称 | 描述 |
---|---|
enable.auto.commit | 默认值为 true,消费者会自动周期性地向服务器提交偏移量。 |
auto.commit.interval.ms | 如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。 |
消费者自动提交 offset代码
package com.apache.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumerAutoOffset {public static void main(String[] args) {// 配置Properties properties = new Properties();// 连接kafka服务器properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");//反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//自动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);//提交时间间隔(默认5秒,设置为1秒,以毫秒为单位)properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);//定义消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");// 创建一个消费者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);//定义主题ArrayList<String> topic = new ArrayList<>();topic.add("first");kafkaConsumer.subscribe(topic);//消费数据while (true) {// 设置 1s 中消费一批数据ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}
手动提交 offset
虽然自动提交offset十分简单便利,但由于其是基于时间提交的,难以把握offset提交的时机。因 此Kafka还提供了手动提交offset的API。
手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。
两者的相同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是,同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。
- commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
- commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。
1)同步提交 offset
由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低。
以下为同步提交 offset 的示例。
package com.apache.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumerByHandSync {public static void main(String[] args) {// 配置Properties properties = new Properties();// 连接kafka服务器properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");//反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//自动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);//提交时间间隔(默认5秒,设置为1秒,以毫秒为单位)//properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);//定义消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");// 创建一个消费者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);//定义主题ArrayList<String> topic = new ArrayList<>();topic.add("first");kafkaConsumer.subscribe(topic);//消费数据while (true) {// 设置 1s 中消费一批数据ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}// 同步提交 offsetkafkaConsumer.commitSync();}}
}
2)异步提交 offset
虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。
以下为异步提交 offset 的示例:
package com.apache.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumerByHandAsync {public static void main(String[] args) {// 配置Properties properties = new Properties();// 连接kafka服务器properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");//反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//自动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);//提交时间间隔(默认5秒,设置为1秒,以毫秒为单位)//properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);//定义消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");// 创建一个消费者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);//定义主题ArrayList<String> topic = new ArrayList<>();topic.add("first");kafkaConsumer.subscribe(topic);//消费数据while (true) {// 设置 1s 中消费一批数据ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}// 异步提交 offsetkafkaConsumer.commitAsync();}}
}
Kafka3.0 提交offset方式相关推荐
- 四种常见的 POST 提交数据方式
HTTP/1.1 协议规定的 HTTP 请求方法有 OPTIONS.GET.HEAD.POST.PUT.DELETE.TRACE.CONNECT 这几种.其中 POST 一般用来向服务端提交数据,本文 ...
- (转载)四种常见的 POST 提交数据方式
转载地址:https://imququ.com/post/four-ways-to-post-data-in-http.html 四种常见的 POST 提交数据方式 HTTP/1.1 协议规定的 HT ...
- python kafka offset自动提交_Spring-Kafka —— 实现批量消费和手动提交offset
spring-kafka的官方文档介绍,可以知道自1.1版本之后, @KafkaListener开始支持批量消费,只需要设置batchListener参数为true 把application.yml中 ...
- C# HttpWebRequest提交数据方式
HttpWebRequest和HttpWebResponse类是用于发送和接收HTTP数据的最好选择.它们支持一系列有用的属性.这两个类位 于System.Net命名空间,默认情况下这个类对于控制台程 ...
- Web 四种常见的POST提交数据方式
HTTP/1.1 协议规定的 HTTP 请求方法有 OPTIONS.GET.HEAD.POST.PUT.DELETE.TRACE.CONNECT 这几种.其中 POST 一般用来向服务端提交数据,本文 ...
- Spark _05Standalone模式两种提交任务方式
Standalone模式两种提交任务方式 Standalone-client提交任务方式 提交命令 ./spark-submit --master spark://node1:7077 --class ...
- kafka自动提交offset失败:Auto offset commit failed
今天在服务日志中观察数据的消费情况时,发现了一个如下的警告,而且每隔几秒就会出现一次,虽然只是个警告, Auto offset commit failed for group order_group: ...
- application/json 四种常见的 POST 提交数据方式
四种常见的 POST 提交数据方式 HTTP/1.1 协议规定的 HTTP 请求方法有 OPTIONS.GET.HEAD.POST.PUT.DELETE.TRACE.CONNECT 这几种.其中 ...
- 四种常见的 POST 提交数据方式对应的content-type取值
做前后端分离一般都有第3中 , 第一种 基本上jquery那年代用的了 第2种在需要传文件时用的 https://www.cnblogs.com/wushifeng/p/6707248.html 四种 ...
最新文章
- 利用nginx+tomcat+memcached组建web服务器负载均衡
- 数字图像缩放之最近邻插值与双线性插值处理效果对比
- darknet: ./src/cuda.c:36: check_error: Assertion `0' failed.
- 647. Palindromic Substrings 回文子串
- classcastexception异常_内部类、异常以及 LeetCode 每日一题
- python调用c++_python高性能编程之Cython篇 第一章
- mybatis 显示 sql日志
- java 时区处理_如何使用Java处理日历时区?
- java图的邻接表实现两种方式及实例应用分析
- u2 接口 服务器硬盘,M.2、U.2谁更好?主流硬盘接口都有哪些?
- mysql规格单位转化_存储单位的换算(KB, MB, GB)
- 经营三类医疗器械不使用计算机,第三十条经营第三类医疗器械的企业,应当具有符合医疗器械经营质量管理要求的计算机信息管理系统,保证经营的产品可追溯。计算机信息管理系统应当具有以下功能:...
- Python 实现哥德巴赫猜想
- Synplify 综合Gtech 网表
- NOIP2016 “西湖边超萌小松鼠” 模拟赛
- 设随机变量用计算机模拟,概率论实验报告1.docx
- 2021第六届天梯赛cccc总决赛题解
- PAT 乙级 1047 团体编程赛 python
- ToList()方法
- jQuery实现电影海报特效
热门文章
- STM32F4XX的DFU功能
- android sim卡工具,手机sim卡工具包老是弹出来怎么办?sim卡工具包不断弹出删除方法...
- 宝尚网上股票缩量商场拉大盘股高潮
- 学妹问我没有实际项目经验,简历要怎么写?
- 本人亲测,可以使用,万网虚拟主机绑定多个子域名方法(转载)
- 二手书籍交易网站毕业设计,二手书籍买卖平台毕设,二手书籍交易市场平台设计与实现毕业设计论文分析
- 《网站分析实战--如何以数据驱动决策,提升网站价值》学习笔记
- 1236288-25-7 DSPE-PEG-FA Folic acid PEG DSPE 磷脂-聚乙二醇-叶酸
- 2021-11-04 历年提高组真题刷题统计表zyz
- 笔记《基于无人驾驶方程式赛车的传感器融合目标检测算法研究及实现》