文章目录

  • offset 的默认维护位置
  • 自动提交 offset
  • 手动提交 offset

offset 的默认维护位置

Kafka0.9版本之前,consumer默认将offset保存在Zookeeper中

从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为__consumer_offsets

之所以保存在Kafka内置的topic中,是因为如果offset保存在zookeeper中,会产生大量的网络通信,从而效率低下。

__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行compact,也就是每个 group.id+topic+分区号就保留最新数据。

1)消费 offset 案例
(0)思想:__consumer_offsets 为 Kafka 中的 topic,那就可以通过消费者进行消费。
(1)在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false,默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。

(2)采用命令行方式,创建一个新的 topic为huanhuan。

[root@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic huanhuan--partitions 2 --replication-factor 2

(3)启动生产者往 huanhuan生产数据。

[root@hadoop102 kafka]$ bin/kafka-console-producer.sh --topic huanhuan --bootstrap-server hadoop102:9092

(4)启动消费者消费 huanhuan数据。

[root@hadoop104 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic huanhuan --group test

注意:指定消费者组名称,更好观察数据存储位置(key 是 group.id+topic+分区号)。

(5)查看消费者消费主题__consumer_offsets。

[root@hadoop102 kafka]$ bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server hadoop102:9092 --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
[offset,huanhuan,1]::OffsetAndMetadata(offset=7, leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203, expireTimestamp=None)
[offset,huanhuan,0]::OffsetAndMetadata(offset=8, leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203, expireTimestamp=None)

自动提交 offset

为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。

自动提交offset的相关参数:

  • enable.auto.commit:是否开启自动提交offset功能,默认是true
  • auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s

参数名称 描述
enable.auto.commit 默认值为 true,消费者会自动周期性地向服务器提交偏移量。
auto.commit.interval.ms 如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。

消费者自动提交 offset代码

package com.apache.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumerAutoOffset {public static void main(String[] args) {// 配置Properties properties = new Properties();// 连接kafka服务器properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");//反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//自动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);//提交时间间隔(默认5秒,设置为1秒,以毫秒为单位)properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);//定义消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");// 创建一个消费者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);//定义主题ArrayList<String> topic = new ArrayList<>();topic.add("first");kafkaConsumer.subscribe(topic);//消费数据while (true) {// 设置 1s 中消费一批数据ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}

手动提交 offset

虽然自动提交offset十分简单便利,但由于其是基于时间提交的,难以把握offset提交的时机。因 此Kafka还提供了手动提交offset的API。

手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。

两者的相同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是,同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。

  • commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
  • commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。

1)同步提交 offset
由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低。

以下为同步提交 offset 的示例。

package com.apache.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumerByHandSync {public static void main(String[] args) {// 配置Properties properties = new Properties();// 连接kafka服务器properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");//反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//自动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);//提交时间间隔(默认5秒,设置为1秒,以毫秒为单位)//properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);//定义消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");// 创建一个消费者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);//定义主题ArrayList<String> topic = new ArrayList<>();topic.add("first");kafkaConsumer.subscribe(topic);//消费数据while (true) {// 设置 1s 中消费一批数据ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}// 同步提交 offsetkafkaConsumer.commitSync();}}
}

2)异步提交 offset
虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。

以下为异步提交 offset 的示例:

package com.apache.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumerByHandAsync {public static void main(String[] args) {// 配置Properties properties = new Properties();// 连接kafka服务器properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");//反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//自动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);//提交时间间隔(默认5秒,设置为1秒,以毫秒为单位)//properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);//定义消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");// 创建一个消费者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);//定义主题ArrayList<String> topic = new ArrayList<>();topic.add("first");kafkaConsumer.subscribe(topic);//消费数据while (true) {// 设置 1s 中消费一批数据ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}// 异步提交 offsetkafkaConsumer.commitAsync();}}
}

Kafka3.0 提交offset方式相关推荐

  1. 四种常见的 POST 提交数据方式

    HTTP/1.1 协议规定的 HTTP 请求方法有 OPTIONS.GET.HEAD.POST.PUT.DELETE.TRACE.CONNECT 这几种.其中 POST 一般用来向服务端提交数据,本文 ...

  2. (转载)四种常见的 POST 提交数据方式

    转载地址:https://imququ.com/post/four-ways-to-post-data-in-http.html 四种常见的 POST 提交数据方式 HTTP/1.1 协议规定的 HT ...

  3. python kafka offset自动提交_Spring-Kafka —— 实现批量消费和手动提交offset

    spring-kafka的官方文档介绍,可以知道自1.1版本之后, @KafkaListener开始支持批量消费,只需要设置batchListener参数为true 把application.yml中 ...

  4. C# HttpWebRequest提交数据方式

    HttpWebRequest和HttpWebResponse类是用于发送和接收HTTP数据的最好选择.它们支持一系列有用的属性.这两个类位 于System.Net命名空间,默认情况下这个类对于控制台程 ...

  5. Web 四种常见的POST提交数据方式

    HTTP/1.1 协议规定的 HTTP 请求方法有 OPTIONS.GET.HEAD.POST.PUT.DELETE.TRACE.CONNECT 这几种.其中 POST 一般用来向服务端提交数据,本文 ...

  6. Spark _05Standalone模式两种提交任务方式

    Standalone模式两种提交任务方式 Standalone-client提交任务方式 提交命令 ./spark-submit --master spark://node1:7077 --class ...

  7. kafka自动提交offset失败:Auto offset commit failed

    今天在服务日志中观察数据的消费情况时,发现了一个如下的警告,而且每隔几秒就会出现一次,虽然只是个警告, Auto offset commit failed for group order_group: ...

  8. application/json 四种常见的 POST 提交数据方式

    四种常见的 POST 提交数据方式   HTTP/1.1 协议规定的 HTTP 请求方法有 OPTIONS.GET.HEAD.POST.PUT.DELETE.TRACE.CONNECT 这几种.其中 ...

  9. 四种常见的 POST 提交数据方式对应的content-type取值

    做前后端分离一般都有第3中 , 第一种 基本上jquery那年代用的了 第2种在需要传文件时用的 https://www.cnblogs.com/wushifeng/p/6707248.html 四种 ...

最新文章

  1. 利用nginx+tomcat+memcached组建web服务器负载均衡
  2. 数字图像缩放之最近邻插值与双线性插值处理效果对比
  3. darknet: ./src/cuda.c:36: check_error: Assertion `0' failed.
  4. 647. Palindromic Substrings 回文子串
  5. classcastexception异常_内部类、异常以及 LeetCode 每日一题
  6. python调用c++_python高性能编程之Cython篇 第一章
  7. mybatis 显示 sql日志
  8. java 时区处理_如何使用Java处理日历时区?
  9. java图的邻接表实现两种方式及实例应用分析
  10. u2 接口 服务器硬盘,M.2、U.2谁更好?主流硬盘接口都有哪些?
  11. mysql规格单位转化_存储单位的换算(KB, MB, GB)
  12. 经营三类医疗器械不使用计算机,第三十条经营第三类医疗器械的企业,应当具有符合医疗器械经营质量管理要求的计算机信息管理系统,保证经营的产品可追溯。计算机信息管理系统应当具有以下功能:...
  13. Python 实现哥德巴赫猜想
  14. Synplify 综合Gtech 网表
  15. NOIP2016 “西湖边超萌小松鼠” 模拟赛
  16. 设随机变量用计算机模拟,概率论实验报告1.docx
  17. 2021第六届天梯赛cccc总决赛题解
  18. PAT 乙级 1047 团体编程赛 python
  19. ToList()方法
  20. jQuery实现电影海报特效

热门文章

  1. STM32F4XX的DFU功能
  2. android sim卡工具,手机sim卡工具包老是弹出来怎么办?sim卡工具包不断弹出删除方法...
  3. 宝尚网上股票缩量商场拉大盘股高潮
  4. 学妹问我没有实际项目经验,简历要怎么写?
  5. 本人亲测,可以使用,万网虚拟主机绑定多个子域名方法(转载)
  6. 二手书籍交易网站毕业设计,二手书籍买卖平台毕设,二手书籍交易市场平台设计与实现毕业设计论文分析
  7. 《网站分析实战--如何以数据驱动决策,提升网站价值》学习笔记
  8. 1236288-25-7 DSPE-PEG-FA Folic acid PEG DSPE 磷脂-聚乙二醇-叶酸
  9. 2021-11-04 历年提高组真题刷题统计表zyz
  10. 笔记《基于无人驾驶方程式赛车的传感器融合目标检测算法研究及实现》