springboot配置手动提交_kafka教程-springboot消费者-手动提交offset
介绍
自动提交 offset 十分简介便利,但由于其是基于时间提交的,开发人员难以把握offset 提交的时机。
因此 Kafka 还提供了 手动提交 offset 的 API。
手动提交 offset 的方法有两种:
commitSync(同步提交)
commitAsync(异步提交)(吞吐量较大,推荐)
ack方式。spring kafka对 commitSync、commitAsync 进行封装,默认用 commitSync 同步提交
相同点是:都会将本次 poll 的一批数据最高的偏移量提交;
不同点是:
commitSync 阻塞当前线程,一直到提交成功,吞吐量会受到很大的影响。失败会自动重试(可能会提交失败)
commitAsync 没有失败重试机制,故有可能提交失败。不会阻塞当前线程,吞吐量较大
注意:依赖2.4.7以上(含)可能报错
依赖2.4.7以上(含)可能报错,所以此处 尽量用 2.4.6 以下
org.springframework.kafka
spring-kafka
2.4.1.RELEASE
修改application.prpertise# 不自动提交
spring.kafka.consumer.enable-auto-commit=false
# ack`manual` 或 `manual_immediate`
spring.kafka.listener.ack-mode=manual
完整配置server.port=8083
###########【Kafka集群】###########
spring.kafka.bootstrap-servers=localhost:9092
###########【初始化消费者配置】###########
# 默认的消费组ID
spring.kafka.consumer.properties.group.id=user-group
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=false
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
spring.kafka.listener.ack-mode=manual
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=earliest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
# 消费请求超时时间
spring.kafka.consumer.properties.request.timeout.ms=180000
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消费端监听的topic不存在时,项目启动会报错(关掉)
spring.kafka.listener.missing-topics-fatal=false
# 设置批量消费
#spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
#spring.kafka.consumer.max-poll-records=50
java
ack方式
spring kafka提供的方式,使用简单,默认使用 commitSync 同步提交
@KafkaListener(id = "consumer1",topics = {"userlog"})
public void onMessage(ConsumerRecord, ?> record, Acknowledgment ack) {
// 消费的哪个topic、partition的消息,打印出消息内容
System.out.println(String.format("消费:topic:%s-partition:%s-offset:%s-value:%s", record.topic(),record.partition(),record.offset(),record.value()));
ack.acknowledge();
}
commitAsync 异步方式@KafkaListener(id = "consumer1",topics = {"userlog"})
public void onMessage(@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.OFFSET) String offset,
Consumer consumer) {
// 消费的哪个topic、partition的消息,打印出消息内容
System.out.println(String.format("消费:topic:%s-partition:%s-offset:%s-value:%s", topic,partition,offset,message));
consumer.commitAsync();
}
commitSync同步方式
形参与上面方法一样,只是调用方法不同
@KafkaListener(id = "consumer1",topics = {"userlog"})
public void onMessage(@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.OFFSET) String offset,
Consumer consumer) {
// 消费的哪个topic、partition的消息,打印出消息内容
System.out.println(String.format("消费:topic:%s-partition:%s-offset:%s-value:%s", topic,partition,offset,message));
consumer.commitSync();
}
springboot配置手动提交_kafka教程-springboot消费者-手动提交offset相关推荐
- springboot 与rabbitmq集成+生产者投递确认+消费者手动确认+TTL+死信队列+延时队列
1.生产者的消息可靠性投递机制 1)springboot yml文件配置 spring:rabbitmq:host: 10.0.23.83username: lifwepassword: 123456 ...
- springboot整合rabbitmq的发布确认,消费者手动返回ack,设置备用队列,以及面试题:rabbitmq确保消息不丢失
目录 1.生产者发消息到交换机时候的消息确认 2.交换机给队列发消息时候的消息确认 3.备用队列 3.消费者手动ack rabbitmq的发布确认方式,可以有效的保证我们的数据不丢失. 消息正常发送的 ...
- springboot配置Druid数据源
springboot配置druid数据源 Author:SimpleWu springboot整合篇 前言 对于数据访问层,无论是Sql还是NoSql,SpringBoot默认采用整合SpringDa ...
- SpringBoot基础学习之SpringBoot配置(上篇)
前言: 小伙伴们,大家好,我是狂奔の蜗牛rz,当然你们可以叫我蜗牛君,我是一个学习Java半年多时间的小菜鸟,同时还有一个伟大的梦想,那就是有朝一日,成为一个优秀的Java架构师. 这个SpringB ...
- springboot配置进行https请求访问demo
SpringBoot配置https 代码地址:springboot配置https 1.配置https相关要求 需要通过命令生成相关的证书,springboot通过证书才能够进行端口转发到https请求 ...
- springboot配置文件定义全局变量
springboot配置文件定义全局变量 问题在线 springboot项目中redis,mysql数据源使用的同一个,中间件服务器都是放在同一台服务器上的,所以ip地址是统一的,如果服务器需要替换, ...
- springboot banner在线生成_SpringBoot系列教程10--小花样之SpringBoot配置自定义Banner
SpringBoot系列教程10--小花样之SpringBoot配置自定义Banner 作者:一一哥 一. Spring Boot 常用配置 本章节主要介绍一下 Spring Boot 中的一些常用配 ...
- SpringBoot2.x系列教程10--小花样之SpringBoot配置自定义Banner
SpringBoot系列教程10--小花样之SpringBoot配置自定义Banner 作者:一一哥 一. Spring Boot 常用配置 本章节主要介绍一下 Spring Boot 中的一些常用配 ...
- 2021 申请SSL证书、Nginx和SpringBoot配置阿里云SSL证书解决HTTP HTTPS及不生效、打包报错详细教程
2021 申请SSL证书.Nginx和SpringBoot配置阿里云SSL证书解决HTTP HTTPS及不生效.打包报错详细教程 目录 1. 获取SSL证书文件 1.1 购买SSL证书 1.2创建证书 ...
最新文章
- mysql命令的分类_MySQL常用命令分类汇总
- Kotlin 系列(二) 基本语法(1)
- Robot Framework-Ride界面介绍及库的添加
- 分析不同类型页面渲染过程
- 【Qt开发】QTableWidget设置根据内容调整列宽和行高
- api商品分享源码_SSM框架高并发和商品秒杀项目高并发秒杀API源码免费分享
- android sim卡分析,Android 判断SIM卡属于哪个移动运营商详解及实例
- 求n的阶乘java_Java 求n的阶乘
- Hibernate实体对象的生命周期(三种状态详解)
- 怎么清理计算机磁盘空间,怎样清理电脑磁盘空间
- 黑天鹅mobi_破坏我们系统的因素:黑天鹅分类法
- 关于bitlocker加密后的格式化
- python程序 爱意_情人节到了,隔离在家的你还不快用Python给你的她表达下爱意?...
- 计算机论文课题来源,浅析论文题目的来源和意义
- 无人值守安装系统完整(CentOS7)
- 关于COM中变体类型VARIANT
- 研华PCI1716L的C#编程
- 深度学习:生成式对抗网络,让机器在博弈中实现“自我成长”
- PDPS 服务器型Imtools的安装与最新一期许可证分享及替换方法
- 20_TodoList案例
热门文章
- rsync替换cp命令显示速度和进度
- Android dex、odex、oat、vdex、art区别
- AS3.1.2配置gradle阿里云镜像(可用)
- 启动Activity不显示界面
- Clojure 学习入门(2)- eclipse 插件
- (Android)java虚拟机和Dalvik虚拟机的区别
- iOS平台一套完善的Crash Report解决方案
- 联想服务器(RD540)安装双系统
- java getchild_Java IFileStore.getChild方法代碼示例
- python闭包和函数调用区别_python – 函数闭包与可调用类