介绍

自动提交 offset 十分简介便利,但由于其是基于时间提交的,开发人员难以把握offset 提交的时机。

因此 Kafka 还提供了 手动提交 offset 的 API。

手动提交 offset 的方法有两种:

commitSync(同步提交)

commitAsync(异步提交)(吞吐量较大,推荐)

ack方式。spring kafka对 commitSync、commitAsync 进行封装,默认用 commitSync 同步提交

相同点是:都会将本次 poll 的一批数据最高的偏移量提交;

不同点是:

commitSync 阻塞当前线程,一直到提交成功,吞吐量会受到很大的影响。失败会自动重试(可能会提交失败)

commitAsync 没有失败重试机制,故有可能提交失败。不会阻塞当前线程,吞吐量较大

注意:依赖2.4.7以上(含)可能报错

依赖2.4.7以上(含)可能报错,所以此处 尽量用 2.4.6 以下

org.springframework.kafka

spring-kafka

2.4.1.RELEASE

修改application.prpertise# 不自动提交

spring.kafka.consumer.enable-auto-commit=false

# ack`manual` 或 `manual_immediate`

spring.kafka.listener.ack-mode=manual

完整配置server.port=8083

###########【Kafka集群】###########

spring.kafka.bootstrap-servers=localhost:9092

###########【初始化消费者配置】###########

# 默认的消费组ID

spring.kafka.consumer.properties.group.id=user-group

# 是否自动提交offset

spring.kafka.consumer.enable-auto-commit=false

# 提交offset延时(接收到消息后多久提交offset)

spring.kafka.consumer.auto.commit.interval.ms=1000

spring.kafka.listener.ack-mode=manual

# 当kafka中没有初始offset或offset超出范围时将自动重置offset

# earliest:重置为分区中最小的offset;

# latest:重置为分区中最新的offset(消费分区中新产生的数据);

# none:只要有一个分区不存在已提交的offset,就抛出异常;

spring.kafka.consumer.auto-offset-reset=earliest

# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)

spring.kafka.consumer.properties.session.timeout.ms=120000

# 消费请求超时时间

spring.kafka.consumer.properties.request.timeout.ms=180000

# Kafka提供的序列化和反序列化类

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# 消费端监听的topic不存在时,项目启动会报错(关掉)

spring.kafka.listener.missing-topics-fatal=false

# 设置批量消费

#spring.kafka.listener.type=batch

# 批量消费每次最多消费多少条消息

#spring.kafka.consumer.max-poll-records=50

java

ack方式

spring kafka提供的方式,使用简单,默认使用 commitSync 同步提交

@KafkaListener(id = "consumer1",topics = {"userlog"})

public void onMessage(ConsumerRecord, ?> record, Acknowledgment ack) {

// 消费的哪个topic、partition的消息,打印出消息内容

System.out.println(String.format("消费:topic:%s-partition:%s-offset:%s-value:%s", record.topic(),record.partition(),record.offset(),record.value()));

ack.acknowledge();

}

commitAsync 异步方式@KafkaListener(id = "consumer1",topics = {"userlog"})

public void onMessage(@Payload String message,

@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,

@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,

@Header(KafkaHeaders.OFFSET) String offset,

Consumer consumer) {

// 消费的哪个topic、partition的消息,打印出消息内容

System.out.println(String.format("消费:topic:%s-partition:%s-offset:%s-value:%s", topic,partition,offset,message));

consumer.commitAsync();

}

commitSync同步方式

形参与上面方法一样,只是调用方法不同

@KafkaListener(id = "consumer1",topics = {"userlog"})

public void onMessage(@Payload String message,

@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,

@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,

@Header(KafkaHeaders.OFFSET) String offset,

Consumer consumer) {

// 消费的哪个topic、partition的消息,打印出消息内容

System.out.println(String.format("消费:topic:%s-partition:%s-offset:%s-value:%s", topic,partition,offset,message));

consumer.commitSync();

}

springboot配置手动提交_kafka教程-springboot消费者-手动提交offset相关推荐

  1. springboot 与rabbitmq集成+生产者投递确认+消费者手动确认+TTL+死信队列+延时队列

    1.生产者的消息可靠性投递机制 1)springboot yml文件配置 spring:rabbitmq:host: 10.0.23.83username: lifwepassword: 123456 ...

  2. springboot整合rabbitmq的发布确认,消费者手动返回ack,设置备用队列,以及面试题:rabbitmq确保消息不丢失

    目录 1.生产者发消息到交换机时候的消息确认 2.交换机给队列发消息时候的消息确认 3.备用队列 3.消费者手动ack rabbitmq的发布确认方式,可以有效的保证我们的数据不丢失. 消息正常发送的 ...

  3. springboot配置Druid数据源

    springboot配置druid数据源 Author:SimpleWu springboot整合篇 前言 对于数据访问层,无论是Sql还是NoSql,SpringBoot默认采用整合SpringDa ...

  4. SpringBoot基础学习之SpringBoot配置(上篇)

    前言: 小伙伴们,大家好,我是狂奔の蜗牛rz,当然你们可以叫我蜗牛君,我是一个学习Java半年多时间的小菜鸟,同时还有一个伟大的梦想,那就是有朝一日,成为一个优秀的Java架构师. 这个SpringB ...

  5. springboot配置进行https请求访问demo

    SpringBoot配置https 代码地址:springboot配置https 1.配置https相关要求 需要通过命令生成相关的证书,springboot通过证书才能够进行端口转发到https请求 ...

  6. springboot配置文件定义全局变量

    springboot配置文件定义全局变量 问题在线 springboot项目中redis,mysql数据源使用的同一个,中间件服务器都是放在同一台服务器上的,所以ip地址是统一的,如果服务器需要替换, ...

  7. springboot banner在线生成_SpringBoot系列教程10--小花样之SpringBoot配置自定义Banner

    SpringBoot系列教程10--小花样之SpringBoot配置自定义Banner 作者:一一哥 一. Spring Boot 常用配置 本章节主要介绍一下 Spring Boot 中的一些常用配 ...

  8. SpringBoot2.x系列教程10--小花样之SpringBoot配置自定义Banner

    SpringBoot系列教程10--小花样之SpringBoot配置自定义Banner 作者:一一哥 一. Spring Boot 常用配置 本章节主要介绍一下 Spring Boot 中的一些常用配 ...

  9. 2021 申请SSL证书、Nginx和SpringBoot配置阿里云SSL证书解决HTTP HTTPS及不生效、打包报错详细教程

    2021 申请SSL证书.Nginx和SpringBoot配置阿里云SSL证书解决HTTP HTTPS及不生效.打包报错详细教程 目录 1. 获取SSL证书文件 1.1 购买SSL证书 1.2创建证书 ...

最新文章

  1. mysql命令的分类_MySQL常用命令分类汇总
  2. Kotlin 系列(二) 基本语法(1)
  3. Robot Framework-Ride界面介绍及库的添加
  4. 分析不同类型页面渲染过程
  5. 【Qt开发】QTableWidget设置根据内容调整列宽和行高
  6. api商品分享源码_SSM框架高并发和商品秒杀项目高并发秒杀API源码免费分享
  7. android sim卡分析,Android 判断SIM卡属于哪个移动运营商详解及实例
  8. 求n的阶乘java_Java 求n的阶乘
  9. Hibernate实体对象的生命周期(三种状态详解)
  10. 怎么清理计算机磁盘空间,怎样清理电脑磁盘空间
  11. 黑天鹅mobi_破坏我们系统的因素:黑天鹅分类法
  12. 关于bitlocker加密后的格式化
  13. python程序 爱意_情人节到了,隔离在家的你还不快用Python给你的她表达下爱意?...
  14. 计算机论文课题来源,浅析论文题目的来源和意义
  15. 无人值守安装系统完整(CentOS7)
  16. 关于COM中变体类型VARIANT
  17. 研华PCI1716L的C#编程
  18. 深度学习:生成式对抗网络,让机器在博弈中实现“自我成长”
  19. PDPS 服务器型Imtools的安装与最新一期许可证分享及替换方法
  20. 20_TodoList案例

热门文章

  1. rsync替换cp命令显示速度和进度
  2. Android dex、odex、oat、vdex、art区别
  3. AS3.1.2配置gradle阿里云镜像(可用)
  4. 启动Activity不显示界面
  5. Clojure 学习入门(2)- eclipse 插件
  6. (Android)java虚拟机和Dalvik虚拟机的区别
  7. iOS平台一套完善的Crash Report解决方案
  8. 联想服务器(RD540)安装双系统
  9. java getchild_Java IFileStore.getChild方法代碼示例
  10. python闭包和函数调用区别_python – 函数闭包与可调用类