Kafka之enable.auto.commit使用解析
通过字面意思我们不难理解这是kafka的自动提交功能。
配置消费者(配置ENABLE_AUTO_COMMIT_CONFIG为 true 配置自动提交)
enable.auto.commit 的默认值是 true;就是默认采用自动提交的机制。
auto.commit.interval.ms 的默认值是 5000,单位是毫秒。
此时我们配置消息消费后自动提交offset 位置
@Beanpublic KafkaConsumer<String, String> kafkaConsumer() {Map<String, Object> config = new HashMap<>();config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");config.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);return consumer;}
配置消息监听
@Slf4j
@Component
public class PackageMessageConsumer {@AutowiredKafkaConsumer<String, String> kafkaConsumer;@AutowiredEventProcessMaster eventProcessMaster;@PostConstructpublic void listenRealTimeGroup() {new Thread(() -> consumer()).start();}private void consumer() {List<String> topics = new ArrayList<>();topics.add("test-kafka-auto.commit");kafkaConsumer.subscribe(topics);while(true) {try {// 拉取消息时间间隔 msConsumerRecords<String, String> records = kafkaConsumer.poll(10);for (ConsumerRecord<String, String> record : records) {String key = record.key();Object content = record.value();eventProcessMaster.processRequest(new Event(record.topic(), key, content));}} catch (Exception e){log.error(e.getMessage());}}}
}
配置自动提交offset 位置之后,我们不必关心消息消费到了什么位置,当程序重启后,消息也不会重复消费;
配置消费者(配置ENABLE_AUTO_COMMIT_CONFIG为 false 配置手动提交)
手动提交顾名思义就是每次我们消费后,kafka不会手动更新offset 位置,同时 auto.commit.interval.ms 也就不被再考虑了。
@Beanpublic KafkaConsumer<String, String> kafkaConsumer() {Map<String, Object> config = new HashMap<>();config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");config.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");// 与自动提交的区别config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);return consumer;}
当我们设置成手动提交后,不修改其他代码会发现每次重启程序,kafka都会将我们没清理的所有数据都重新消费一遍,与我们需求的幂等性不符,将代码进行完善
@Beanpublic KafkaConsumer<String, String> kafkaConsumer() {Map<String, Object> config = new HashMap<>();config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");config.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");// 与自动提交的区别config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");// 自动提交时间间隔config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);return consumer;}
@Slf4j
@Component
public class DependPackageMessageConsumer {@AutowiredKafkaConsumer<String, String> kafkaConsumer;@AutowiredEventProcessMaster eventProcessMaster;@PostConstructpublic void listenRealTimeGroup() {new Thread(() -> consumer()).start();}private void consumer() {List<String> topics = new ArrayList<>();topics.add("test-kafka-auto.commit");kafkaConsumer.subscribe(topics);while(true) {try {ConsumerRecords<String, String> records = kafkaConsumer.poll(10);for (ConsumerRecord<String, String> record : records) {String key = record.key();Object content = record.value();eventProcessMaster.processRequest(new Event(record.topic(), key, content));}// 手动提交 offset 位置kafkaConsumer.commitSync();} catch (Exception e){log.error(e.getMessage());}}}
}
加上手动确认后服务重启,每次都会从上次offset 确认的位置开始消费
Kafka之enable.auto.commit使用解析相关推荐
- 容易被误会的 Kafka 消费者属性 enable.auto.commit
前言 理解一下Kafka的读的自动提交功能. 找到了一篇专门介绍这个功能的文章,选择主要的内容进行一下翻译和做笔记. 自动提交参数auto.commit的设置 Understanding the 'e ...
- springboot和kafka结合其中enable.auto.commit等于false失效
事件描述 公司使用的是Spring Cloud工作的微服务框架.其中做了SpringBoot和kafka的结合.但是意外的是enable.auto.commit参数设置成了false,kafka的of ...
- 理解 Kafka 消费者属性的 enable.auto.commit
前言 理解一下Kafka的读的自动提交功能. 找到了一篇专门介绍这个功能的文章,选择主要的内容进行一下翻译和做笔记. 正文 Understanding the 'enable.auto.commit' ...
- Apache Kafka-Spring Kafka生产消费@KafkaListener源码解析
文章目录 概述 Spring-kafka生产者源码流程 Spring-kafka消费者源码流程(`@EnableKafka`和`@KafkaListener` ) Flow KafkaListener ...
- Consumer cannot be configured for auto commit for ackMode MANUAL_IMMEDIATE
Kafka不能既开启消费端的自动应答又开启监听模式的自动应答
- Pulsar和Kafka基准测试:Pulsar性能精准解析(完整版)
关于 Apache Pulsar Apache Pulsar 是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息.存储.轻量化函数式计算为一体,采用计算与存储分离架构设计,支 ...
- kafka channle的应用案例
kafka channle的应用案例 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 最近在新公司负责大数据平台的建设,平台搭建完毕后,需要将云平台(我们公司使用的Ucloud的 ...
- 正确处理kafka多线程消费的姿势
最近项目开发过程使用kafka作为项目模块间负载转发器,实现实时接收不同产品线消息,分布式准实时消费产品线消息.通过kafka作为模块间的转换器,不仅有MQ的几大好处:异步. 解耦. 削峰等几大好处, ...
- springboot手动提交kafka offset
转载自 springboot手动提交kafka offset enable.auto.commit参数设置成了false 但是测试发现enable.auto.commit参数设置成了false,kaf ...
最新文章
- 成功解决ValueError: row index was 65536, not allowed by .xls format
- Linux网络编程——tcp并发服务器(epoll实现)
- java 线程停止在那个为止_java停止线程
- 使用Qt Creator 2.60编写C/C++程序
- vscode-textlive-paper学习记录
- 判断存储,是栈?还是队列?
- static Member Function
- 系统学习深度学习(二十三)--SqueezeNet
- 用户,用户组,文件和目录权限详解
- 机器学习岗面试准备提纲笔记
- 远程桌面命令是什么 如何使用命令连接远程桌面
- Linux网络编程必学的TCP/IP协议——图解分层(通俗易懂)【建议新手收藏】
- C# winform 快速导入excel 到datagridview
- 如何计算机闲置虚拟机算法_利用闲置计算机的最佳方法
- 29-SpringBoot 安全与SpringSecurity
- Android 5.0状态栏通知图标的实现
- C语言深度解剖读书笔记
- 清华计算机类专业介绍,清华大学研究生专业介绍:计算机技术
- 布局福建市场,维也纳酒店欧暇·地中海酒店能否为投资人带来信心与底气?
- Niagara基于javascript的控件开发