kafka-offset手动提交和自动提交
目录
首先回顾之前的知识点
自动提交offset
手动提交
消费者poll消息的细节
完整代码:
按照新方法进行消费消息
1.指定时间进行消息的消费
2.指定分区开始从头消费+指定分区的偏移量开始消费
新消费组的消费offset规则
首先回顾之前的知识点
消费者消费消息,每消费offset+1,然后提交offset给到我们kafka中topic中的cousumer_offsets,该消费者宕机后,另外的消费者就会读取consumer_offsets读取我们的offset消费后面的消息
我们kafka消费者是自动拉取消息的,mq是队列push给消费者
自动提交:消息poll下来后(还没有消费)直接提交offset,速度很快,可能出现消费失败
手动提交:在消息消费时/消费后再提交offset
自动提交offset
缺点:可能会丢消息,比如消费者poll了topic中partition的消息后,然后提交offset,可能消费者没有消费成功
提交的内容offset——>消费组+topic+offset
自动提交的配置
/*** 1.1设置是否自动提交offset并设置offset的间隔时间*/properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
一poll就提交offset了
手动提交
分为手动同步提交+手动异步提交
手动同步提交:在消息消费完后调用同步提交的方法,当集群返回ack前一直阻塞,返回ack后表示成功
consumer.commitAsync();
手动异步提交:不需要等集群返回ack,直接执行后序的逻辑即可,我们可以设置一个回调方法
消费者poll消息的细节
定义:消费者会根据设置的消费时间来决定消费多少消息
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,500)//拉取0.5s消息
默认消费者一次性poll500条信息(长轮询时间为1s),如果时间内poll了500条就结束for循环
//长轮询拉取时间,1s:消费者拉取1s时间不管拉了多少条消息(除非时间内拉取完了zk维护的topic分区中所有消息)ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
完整代码:
while(true){//长轮询拉取时间,1s:消费者拉取1s时间不管拉了多少条消息(除非时间内拉取完了zk维护的topic分区中所有消息)ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.printf("收到的消息:partition= %d,offset= %d,key= %s,value=%s %n",record.partition(),record.offset(),record.key(),record.value());}/*** 4.1手动提交:所有消息消费完再提交offset给broker中_consumer_offsets*/if(records.count()>0){//同步:阻塞,提交成功,等待broker的返回ackconsumer.commitAsync();//异步:提交完后不需要等待broker返回ack,直接往下走}}
如果两次poll的间隔>30s,集群会认为该消费者消费能力弱将其踢出,触发rebalance机制,消息交给消费组中的其他消费者
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
按照新方法进行消费消息
1.指定时间进行消息的消费
1.根据时间将topic中partition分区信息全部放入map中——>2.然后指定时间,封装topic和分区与时间到map中——>3.最后再将map添加到更高级的map,key为分区,如果有两个分区就是2个map——>4.最后遍历,然后提取出value并得到offset打印
2.指定分区开始从头消费+指定分区的偏移量开始消费
//指定分区消费consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));/*** 4.回溯消费消息(指定某分区从头开始消费)*/consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));/*** 4.1指定offset开始消费*/consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));consumer.seek(new TopicPartition(TOPIC_NAME,0),10);
新消费组的消费offset规则
新消费组在启动后,默认是从当前分区最后一条消息的offset+1开始消费,可以通过配置进行重新消费
/*** 2.13设置下次换了消费组还是按照offset记录继续消费*/properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"earliest");
kafka-offset手动提交和自动提交相关推荐
- AJAX设置光标离开自动提交,Ajax自动提交和刷新页面
所以,我是新来的ajax,我试图使用ajax和jquery提交表单,我想我有服务器端逻辑都想通了,因为当我加载页面它会自动提交并且页面刷新速度非常快.空白表单将进入数据库,但其中有很多内容,因为页面会 ...
- java mysql 自动提交_Mybatis的JDBC提交设置/关闭mysql自动提交------关于mysql自动提交引发的惨剧...
学习Mybatis时提到了JDBC方式需要自己手动提交事务,如果不加session.commit会导致数据库的数据无法正常插入(程序本身又不给你报错,还装出一副我已经插入成功的样子) SqlSessi ...
- Oracle数据库的显示提交与隐式提交,针对oracle工具的自动提交机制
显示提交与隐式提交: oracle 在进行插入(insert).修改(update)和删除(delete)需要 commit 才会生效.其余的操作不需要 commit 就会生效. 需要 commit ...
- oracle事务处理 自动提交
提交数据有三种类型: 显式提交.隐式提交及自动提交.下面分别说明这三种类型. 1.显式提交:用COMMIT命令直接完成的提交为显式提交.其格式为:SQL>COMMIT: 2.隐式提交: 用SQL ...
- 火车票订票插件,可自动登陆,自动刷票,自动提交
一年一度的抢票大战已经上演,因为现在铁道部改成分节点的放票策略, 打算在网上购票的朋友们要掌握好放票的时间,避免错过放票时间. 前几天在往上看到的别人介绍的一个Chrome的插件 ,可以自动登录火车 ...
- python kafka offset自动提交_Spring-Kafka —— 实现批量消费和手动提交offset
spring-kafka的官方文档介绍,可以知道自1.1版本之后, @KafkaListener开始支持批量消费,只需要设置batchListener参数为true 把application.yml中 ...
- Kafka的消息自动提交和手动提交
只说结论! 如果我们使用原始apache-kafka 依赖的API来消费数据: 如果enable.auto.commit为true,则表示自动提交,但不会在拉取数据之后立即提交.在一次poll的数据处 ...
- Kafka消费消息自动提交与手动提交
消费者poll消息得过程(poll的意思是从broker拿消息,并不代表拿到就消费成功了) 消费者建立了与broker之间的⻓连接,开始poll消息. 默认一次poll 500条消息 props.pu ...
- kafka自动提交offset失败:Auto offset commit failed
今天在服务日志中观察数据的消费情况时,发现了一个如下的警告,而且每隔几秒就会出现一次,虽然只是个警告, Auto offset commit failed for group order_group: ...
最新文章
- babylonjs 分部加载模型_使用 Babylon.js 在 HTML 页面加载 3D 对象
- python爬虫写入数据库_Python爬虫数据写入操作
- BSP和JSP里的UI元素ID生成逻辑
- Oracle10g数据库的树立
- Codeforces Round 258(Div. 2)
- 安卓磁链搜索下载播放和原理探索
- 程序猿软件开发保护眼睛,win7设置窗口护眼模式?
- 关于《2012年我的十大工程》双季进展情况报告总结
- unity获取obs虚拟摄像头
- lol大区服务器维护,LOL官宣“扩容升级”服务器,排队时间将大大减少,电一玩家喜大普奔!...
- android仿QQ优雅的修改App字体大小
- 如何实现Builder模式
- Win10 蓝屏 SFC /SCANNOW发现损坏文件
- C语言中return的作用
- 几种社会化分享工具总结
- 操作系统笔记(王道)(持续更新中)
- 马达震动测试软件,电机震动如何测试
- 聚丙烯酸负载小鼠血清白蛋白(MSA)/大鼠血清白蛋白(RSA)/小麦麦清白蛋白;PAA-MSA/RSA
- 【正点原子sys、delay、usart文件夹介绍】
- 写给地方网站的创业新手(转载)