转载自 springboot手动提交kafka offset

enable.auto.commit参数设置成了false

但是测试发现enable.auto.commit参数设置成了false,kafka的offset依然提交了(也没有进行人工提交offset)。

查看源码

如果我们enable.auto.commit设置为false,那么就会走标红的if语句。而且下面有个stopInvokerAndCommitManualAcks()方法,看名字就知道是人工提交的意思。那么我们进去stopInvokerAndCommitManualAcks()方法瞅瞅。

如上图所示有个processCommits()方法,那么继续追进去:

单单看标红的方法是不是就知道这方法里面是更新offset和提交offset的方法。那么我们继续追进去:


结论:如果我们把enable.auto.commit参数设置成true。那么offset交给kafka来管理,offset进行默认的提交模式。 
enable.auto.commit参数设置成false。那么就是Spring来替为我们做人工提交,从而简化了人工提交的方式。 
所以kafka和springboot结合中的enable.auto.commit为false为spring的人工提交模式。enable.auto.commit为true是采用kafka的默认提交模式。

手动提交

spring.kafka.consumer.enable-auto-commit设置为false,设置AckMode的值

 /*** The offset commit behavior enumeration.*/public enum AckMode {/*** Commit after each record is processed by the listener.*/RECORD,/*** Commit whatever has already been processed before the next poll.*/BATCH,/*** Commit pending updates after* {@link ContainerProperties#setAckTime(long) ackTime} has elapsed.*/TIME,/*** Commit pending updates after* {@link ContainerProperties#setAckCount(int) ackCount} has been* exceeded.*/COUNT,/*** Commit pending updates after* {@link ContainerProperties#setAckCount(int) ackCount} has been* exceeded or after {@link ContainerProperties#setAckTime(long)* ackTime} has elapsed.*/COUNT_TIME,/*** User takes responsibility for acks using an* {@link AcknowledgingMessageListener}.*/MANUAL,/*** User takes responsibility for acks using an* {@link AcknowledgingMessageListener}. The consumer is woken to
  • RECORD
    每处理一条commit一次
  • BATCH(默认)
    每次poll的时候批量提交一次,频率取决于每次poll的调用频率
  • TIME 
    每次间隔ackTime的时间去commit
  • COUNT 
    累积达到ackCount次的ack去commit
  • COUNT_TIME
    ackTime或ackCount哪个条件先满足,就commit
  • MANUAL
    listener负责ack,但是背后也是批量上去
  • MANUAL_IMMEDIATE
    listner负责ack,每调用一次,就立即commit

manual commit

@KafkaListener(topics = "k010")public void listen(ConsumerRecord<?, ?> cr,Acknowledgment ack) throws Exception {LOGGER.info(cr.toString());ack.acknowledge();}

方法参数里头传递Acknowledgment,然后手工ack

如果只添加上面语句会报错:

the listener container must have a MANUAL Ackmode to populate the Acknowledgment

我们要配置AckMode为MANUAL Ackmode

factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);

springboot手动提交kafka offset相关推荐

  1. sparkstreaming 读取mysql_第十篇|SparkStreaming手动维护Kafka Offset的几种方式

    Spark Streaming No Receivers 方式的createDirectStream 方法不使用接收器,而是创建输入流直接从Kafka 集群节点拉取消息.输入流保证每个消息从Kafka ...

  2. SparkStreaming手动维护Kafka Offset的几种方式

    Spark Streaming No Receivers 方式的createDirectStream 方法不使用接收器,而是创建输入流直接从Kafka 集群节点拉取消息.输入流保证每个消息从Kafka ...

  3. 关于Kafka 的 consumer 消费者手动提交详解

    前言 在上一篇 Kafka使用Java实现数据的生产和消费demo 中介绍如何简单的使用kafka进行数据传输.本篇则重点介绍kafka中的 consumer 消费者的讲解. 应用场景 在上一篇kaf ...

  4. python kafka offset自动提交_Spring-Kafka —— 实现批量消费和手动提交offset

    spring-kafka的官方文档介绍,可以知道自1.1版本之后, @KafkaListener开始支持批量消费,只需要设置batchListener参数为true 把application.yml中 ...

  5. springboot配置手动提交_kafka教程-springboot消费者-手动提交offset

    介绍 自动提交 offset 十分简介便利,但由于其是基于时间提交的,开发人员难以把握offset 提交的时机. 因此 Kafka 还提供了 手动提交 offset 的 API. 手动提交 offse ...

  6. Kafka:Consumer手动提交offset

    在上一篇博客中介绍了使用Consumer订阅多个Topic或者多个Partition: Kafka:Consumer订阅 在上一篇博客的测试样例中,Consumer都是自动提交offset,这是通过下 ...

  7. Kafka的消息自动提交和手动提交

    只说结论! 如果我们使用原始apache-kafka 依赖的API来消费数据: 如果enable.auto.commit为true,则表示自动提交,但不会在拉取数据之后立即提交.在一次poll的数据处 ...

  8. Kafka消费消息自动提交与手动提交

    消费者poll消息得过程(poll的意思是从broker拿消息,并不代表拿到就消费成功了) 消费者建立了与broker之间的⻓连接,开始poll消息. 默认一次poll 500条消息 props.pu ...

  9. Kafka手动提交偏移量的作用到底是什么???

    手动提交偏移量的原因 最近拜读了很多文章,都谈到为了保证消息的安全消费(避免消息丢失和消息重复读取),建议消费者客户端手动提交偏移量.具体如下: 1.当设置为自动提交时,当kafka消费者读取到消息后 ...

最新文章

  1. OpenGL中投影变换矩阵的反向推导
  2. 利用RC网络降低可调节LDO输出噪声
  3. HTML和CSS初级前端面试题汇总(持续补充)
  4. Android Studio动态调试Smali
  5. linux 启动程序-p,Linux应用程序开发笔记:配置linuxptp开机启动(ubuntu gPTP)
  6. 微信小程序 保存持久化cookie
  7. word文档打印 自动编码_办公室文件打印有哪些技巧 办公室文件打印技巧介绍【图文】...
  8. linux服务器下降,linux - 远程升级Ubuntu:如何最大程度地降低丢失服务器的风险? - Ubuntu问答...
  9. Android中英文单词录入背诵软件
  10. 单片机原理与c语言程序设计付先成版答案,单片机原理与C语言程序设计
  11. 尔雅 科学通史(吴国盛) 个人笔记及课后习题 2018 第一章 科学通史绪论
  12. 减速电机计算公式中功率(P),扭力(NM),转速(RPM),减速比(RATIO)四大因素互相转化的重要性
  13. 按计算机应用领域来分 电子邮件属于,计算机考试题库和答案.doc
  14. 源码解析-为什么引入了jackson-dataformat-xml 包我的接口全变成了xml格式?
  15. Win10蓝牙开关不显示,任务栏不显示蓝牙图标解决方案
  16. Spark独到见解--3控制算子
  17. linux 2.6.32文件系统的dentry父子关系
  18. Java笔记 - 黑马程序员_06(Stream,字节流,字符流,对象流(序列化流),属性集(Properties))
  19. 高通about.html 文件,高通case提交指南2015Oct(4)(1)
  20. python进行独立样本t检验

热门文章

  1. 2019山科计算机专业分数线,2019山东科技大学研究生分数线汇总(含2016-2019历年复试)...
  2. leetcode115. 不同的子序列
  3. C++的new、delete需要注意的一点:使用危险函数导致的越界CRT detected that the application wrote to memory after end of heap
  4. [Java基础]Stream流终结操作之forEachcount
  5. [蓝桥杯][2013年第四届真题]剪格子-dfs
  6. 算法-排序-k排序(算法导论第三版第八章思考题8-5)
  7. 数据结构与算法-- 八皇后问题(多种实现方案)
  8. 家居灯光控制系统设计 android,基于Android的室内照明控制系统设计与实现
  9. laradock双版本php,自己撸一个 LaraDock(使用 Docker LNMP 部署 PHP 开发环境)
  10. Linux 用户和组