通过字面意思我们不难理解这是kafka的自动提交功能。

配置消费者(配置ENABLE_AUTO_COMMIT_CONFIG为 true 配置自动提交)

enable.auto.commit 的默认值是 true;就是默认采用自动提交的机制。

auto.commit.interval.ms 的默认值是 5000,单位是毫秒。

此时我们配置消息消费后自动提交offset 位置

    @Beanpublic KafkaConsumer<String, String> kafkaConsumer() {Map<String, Object> config = new HashMap<>();config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");config.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);return consumer;}

配置消息监听

@Slf4j
@Component
public class PackageMessageConsumer {@AutowiredKafkaConsumer<String, String> kafkaConsumer;@AutowiredEventProcessMaster eventProcessMaster;@PostConstructpublic void listenRealTimeGroup() {new Thread(() -> consumer()).start();}private void consumer() {List<String> topics = new ArrayList<>();topics.add("test-kafka-auto.commit");kafkaConsumer.subscribe(topics);while(true) {try {// 拉取消息时间间隔 msConsumerRecords<String, String> records = kafkaConsumer.poll(10);for (ConsumerRecord<String, String> record : records) {String key = record.key();Object content =  record.value();eventProcessMaster.processRequest(new Event(record.topic(), key, content));}} catch (Exception e){log.error(e.getMessage());}}}
}

配置自动提交offset 位置之后,我们不必关心消息消费到了什么位置,当程序重启后,消息也不会重复消费;

配置消费者(配置ENABLE_AUTO_COMMIT_CONFIG为 false 配置手动提交)

手动提交顾名思义就是每次我们消费后,kafka不会手动更新offset 位置,同时 auto.commit.interval.ms 也就不被再考虑了

    @Beanpublic KafkaConsumer<String, String> kafkaConsumer() {Map<String, Object> config = new HashMap<>();config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");config.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");// 与自动提交的区别config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);return consumer;}

当我们设置成手动提交后,不修改其他代码会发现每次重启程序,kafka都会将我们没清理的所有数据都重新消费一遍,与我们需求的幂等性不符,将代码进行完善

    @Beanpublic KafkaConsumer<String, String> kafkaConsumer() {Map<String, Object> config = new HashMap<>();config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");config.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");// 与自动提交的区别config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");// 自动提交时间间隔config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);return consumer;}
@Slf4j
@Component
public class DependPackageMessageConsumer {@AutowiredKafkaConsumer<String, String> kafkaConsumer;@AutowiredEventProcessMaster eventProcessMaster;@PostConstructpublic void listenRealTimeGroup() {new Thread(() -> consumer()).start();}private void consumer() {List<String> topics = new ArrayList<>();topics.add("test-kafka-auto.commit");kafkaConsumer.subscribe(topics);while(true) {try {ConsumerRecords<String, String> records = kafkaConsumer.poll(10);for (ConsumerRecord<String, String> record : records) {String key = record.key();Object content =  record.value();eventProcessMaster.processRequest(new Event(record.topic(), key, content));}// 手动提交 offset 位置kafkaConsumer.commitSync();} catch (Exception e){log.error(e.getMessage());}}}
}

加上手动确认后服务重启,每次都会从上次offset 确认的位置开始消费

Kafka之enable.auto.commit使用解析相关推荐

  1. 容易被误会的 Kafka 消费者属性 enable.auto.commit

    前言 理解一下Kafka的读的自动提交功能. 找到了一篇专门介绍这个功能的文章,选择主要的内容进行一下翻译和做笔记. 自动提交参数auto.commit的设置 Understanding the 'e ...

  2. springboot和kafka结合其中enable.auto.commit等于false失效

    事件描述 公司使用的是Spring Cloud工作的微服务框架.其中做了SpringBoot和kafka的结合.但是意外的是enable.auto.commit参数设置成了false,kafka的of ...

  3. 理解 Kafka 消费者属性的 enable.auto.commit

    前言 理解一下Kafka的读的自动提交功能. 找到了一篇专门介绍这个功能的文章,选择主要的内容进行一下翻译和做笔记. 正文 Understanding the 'enable.auto.commit' ...

  4. Apache Kafka-Spring Kafka生产消费@KafkaListener源码解析

    文章目录 概述 Spring-kafka生产者源码流程 Spring-kafka消费者源码流程(`@EnableKafka`和`@KafkaListener` ) Flow KafkaListener ...

  5. Consumer cannot be configured for auto commit for ackMode MANUAL_IMMEDIATE

    Kafka不能既开启消费端的自动应答又开启监听模式的自动应答

  6. Pulsar和Kafka基准测试:Pulsar性能精准解析(完整版)

    关于 Apache Pulsar Apache Pulsar 是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息.存储.轻量化函数式计算为一体,采用计算与存储分离架构设计,支 ...

  7. kafka channle的应用案例

      kafka channle的应用案例 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 最近在新公司负责大数据平台的建设,平台搭建完毕后,需要将云平台(我们公司使用的Ucloud的 ...

  8. 正确处理kafka多线程消费的姿势

    最近项目开发过程使用kafka作为项目模块间负载转发器,实现实时接收不同产品线消息,分布式准实时消费产品线消息.通过kafka作为模块间的转换器,不仅有MQ的几大好处:异步. 解耦. 削峰等几大好处, ...

  9. springboot手动提交kafka offset

    转载自 springboot手动提交kafka offset enable.auto.commit参数设置成了false 但是测试发现enable.auto.commit参数设置成了false,kaf ...

最新文章

  1. 成功解决ValueError: row index was 65536, not allowed by .xls format
  2. Linux网络编程——tcp并发服务器(epoll实现)
  3. java 线程停止在那个为止_java停止线程
  4. 使用Qt Creator 2.60编写C/C++程序
  5. vscode-textlive-paper学习记录
  6. 判断存储,是栈?还是队列?
  7. static Member Function
  8. 系统学习深度学习(二十三)--SqueezeNet
  9. 用户,用户组,文件和目录权限详解
  10. 机器学习岗面试准备提纲笔记
  11. 远程桌面命令是什么 如何使用命令连接远程桌面
  12. Linux网络编程必学的TCP/IP协议——图解分层(通俗易懂)【建议新手收藏】
  13. C# winform 快速导入excel 到datagridview
  14. 如何计算机闲置虚拟机算法_利用闲置计算机的最佳方法
  15. 29-SpringBoot 安全与SpringSecurity
  16. Android 5.0状态栏通知图标的实现
  17. C语言深度解剖读书笔记
  18. 清华计算机类专业介绍,清华大学研究生专业介绍:计算机技术
  19. 布局福建市场,维也纳酒店欧暇·地中海酒店能否为投资人带来信心与底气?
  20. Niagara基于javascript的控件开发

热门文章

  1. 怎么修改服务器ip密码忘了怎么办,改服务器的ip地址如何修改密码
  2. 2022 愿自律者的人生,自由而优秀
  3. html打印当前页面的函数,js调用iframe实现打印页面内容的方法
  4. python打包上传至pypi —— 具有多个目录的项目工程快速打包上传
  5. 秋分护脚正当时 双驰识足鸟个性化定制呵护您的足部健康
  6. exchange partition中including indexes命令失效
  7. mysql数据库如何创建uuid
  8. 硕迪填报如何自动生成UUID并存入数据库
  9. 「Python爬虫系列讲解」十一、基于登录分析的 Selenium 微博爬虫
  10. 快一起来看看如何把音频转化为文字吧