目录

首先回顾之前的知识点

自动提交offset

手动提交

消费者poll消息的细节

完整代码:

按照新方法进行消费消息

1.指定时间进行消息的消费

2.指定分区开始从头消费+指定分区的偏移量开始消费

新消费组的消费offset规则


首先回顾之前的知识点

消费者消费消息,每消费offset+1,然后提交offset给到我们kafka中topic中的cousumer_offsets,该消费者宕机后,另外的消费者就会读取consumer_offsets读取我们的offset消费后面的消息

我们kafka消费者是自动拉取消息的,mq是队列push给消费者

自动提交:消息poll下来后(还没有消费)直接提交offset,速度很快,可能出现消费失败

手动提交:在消息消费时/消费后再提交offset

自动提交offset

缺点:可能会丢消息,比如消费者poll了topic中partition的消息后,然后提交offset,可能消费者没有消费成功

提交的内容offset——>消费组+topic+offset

自动提交的配置

   /*** 1.1设置是否自动提交offset并设置offset的间隔时间*/properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");

一poll就提交offset了

手动提交

分为手动同步提交+手动异步提交

手动同步提交:消息消费完后调用同步提交的方法,当集群返回ack前一直阻塞,返回ack后表示成功

  consumer.commitAsync();

手动异步提交:不需要等集群返回ack,直接执行后序的逻辑即可,我们可以设置一个回调方法

消费者poll消息的细节

定义:消费者会根据设置的消费时间来决定消费多少消息

properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,500)//拉取0.5s消息

默认消费者一次性poll500条信息(长轮询时间为1s),如果时间内poll了500条就结束for循环

 //长轮询拉取时间,1s:消费者拉取1s时间不管拉了多少条消息(除非时间内拉取完了zk维护的topic分区中所有消息)ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

完整代码:

 while(true){//长轮询拉取时间,1s:消费者拉取1s时间不管拉了多少条消息(除非时间内拉取完了zk维护的topic分区中所有消息)ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.printf("收到的消息:partition= %d,offset= %d,key= %s,value=%s %n",record.partition(),record.offset(),record.key(),record.value());}/*** 4.1手动提交:所有消息消费完再提交offset给broker中_consumer_offsets*/if(records.count()>0){//同步:阻塞,提交成功,等待broker的返回ackconsumer.commitAsync();//异步:提交完后不需要等待broker返回ack,直接往下走}}

如果两次poll的间隔>30s,集群会认为该消费者消费能力弱将其踢出,触发rebalance机制,消息交给消费组中的其他消费者

  properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);

按照新方法进行消费消息

1.指定时间进行消息的消费

1.根据时间将topic中partition分区信息全部放入map中——>2.然后指定时间,封装topic和分区与时间到map中——>3.最后再将map添加到更高级的map,key为分区,如果有两个分区就是2个map——>4.最后遍历,然后提取出value并得到offset打印

2.指定分区开始从头消费+指定分区的偏移量开始消费

        //指定分区消费consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));/*** 4.回溯消费消息(指定某分区从头开始消费)*/consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));/*** 4.1指定offset开始消费*/consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));consumer.seek(new TopicPartition(TOPIC_NAME,0),10);

新消费组的消费offset规则

新消费组在启动后,默认是从当前分区最后一条消息的offset+1开始消费,可以通过配置进行重新消费

  /*** 2.13设置下次换了消费组还是按照offset记录继续消费*/properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"earliest");

kafka-offset手动提交和自动提交相关推荐

  1. AJAX设置光标离开自动提交,Ajax自动提交和刷新页面

    所以,我是新来的ajax,我试图使用ajax和jquery提交表单,我想我有服务器端逻辑都想通了,因为当我加载页面它会自动提交并且页面刷新速度非常快.空白表单将进入数据库,但其中有很多内容,因为页面会 ...

  2. java mysql 自动提交_Mybatis的JDBC提交设置/关闭mysql自动提交------关于mysql自动提交引发的惨剧...

    学习Mybatis时提到了JDBC方式需要自己手动提交事务,如果不加session.commit会导致数据库的数据无法正常插入(程序本身又不给你报错,还装出一副我已经插入成功的样子) SqlSessi ...

  3. Oracle数据库的显示提交与隐式提交,针对oracle工具的自动提交机制

    显示提交与隐式提交: oracle 在进行插入(insert).修改(update)和删除(delete)需要 commit 才会生效.其余的操作不需要 commit 就会生效. 需要 commit ...

  4. oracle事务处理 自动提交

    提交数据有三种类型: 显式提交.隐式提交及自动提交.下面分别说明这三种类型. 1.显式提交:用COMMIT命令直接完成的提交为显式提交.其格式为:SQL>COMMIT: 2.隐式提交: 用SQL ...

  5. 火车票订票插件,可自动登陆,自动刷票,自动提交

    一年一度的抢票大战已经上演,因为现在铁道部改成分节点的放票策略, 打算在网上购票的朋友们要掌握好放票的时间,避免错过放票时间.  前几天在往上看到的别人介绍的一个Chrome的插件 ,可以自动登录火车 ...

  6. python kafka offset自动提交_Spring-Kafka —— 实现批量消费和手动提交offset

    spring-kafka的官方文档介绍,可以知道自1.1版本之后, @KafkaListener开始支持批量消费,只需要设置batchListener参数为true 把application.yml中 ...

  7. Kafka的消息自动提交和手动提交

    只说结论! 如果我们使用原始apache-kafka 依赖的API来消费数据: 如果enable.auto.commit为true,则表示自动提交,但不会在拉取数据之后立即提交.在一次poll的数据处 ...

  8. Kafka消费消息自动提交与手动提交

    消费者poll消息得过程(poll的意思是从broker拿消息,并不代表拿到就消费成功了) 消费者建立了与broker之间的⻓连接,开始poll消息. 默认一次poll 500条消息 props.pu ...

  9. kafka自动提交offset失败:Auto offset commit failed

    今天在服务日志中观察数据的消费情况时,发现了一个如下的警告,而且每隔几秒就会出现一次,虽然只是个警告, Auto offset commit failed for group order_group: ...

最新文章

  1. babylonjs 分部加载模型_使用 Babylon.js 在 HTML 页面加载 3D 对象
  2. python爬虫写入数据库_Python爬虫数据写入操作
  3. BSP和JSP里的UI元素ID生成逻辑
  4. Oracle10g数据库的树立
  5. Codeforces Round 258(Div. 2)
  6. 安卓磁链搜索下载播放和原理探索
  7. 程序猿软件开发保护眼睛,win7设置窗口护眼模式?
  8. 关于《2012年我的十大工程》双季进展情况报告总结
  9. unity获取obs虚拟摄像头
  10. lol大区服务器维护,LOL官宣“扩容升级”服务器,排队时间将大大减少,电一玩家喜大普奔!...
  11. android仿QQ优雅的修改App字体大小
  12. 如何实现Builder模式
  13. Win10 蓝屏 SFC /SCANNOW发现损坏文件
  14. C语言中return的作用
  15. 几种社会化分享工具总结
  16. 操作系统笔记(王道)(持续更新中)
  17. 马达震动测试软件,电机震动如何测试
  18. 聚丙烯酸负载小鼠血清白蛋白(MSA)/大鼠血清白蛋白(RSA)/小麦麦清白蛋白;PAA-MSA/RSA
  19. 【正点原子sys、delay、usart文件夹介绍】
  20. 写给地方网站的创业新手(转载)

热门文章

  1. 解决跨域的8种最常用方法(附终极通用大招)
  2. OSPF协议之链路数据库同步
  3. Vue数据代理的原理
  4. pwnable 笔记 Toddler's Bottle -passcode
  5. Deepin20搜狗输入法安装的解决办法
  6. 面试复盘:2020-05-30
  7. 10月第4周业务风控关注|多部门联合调查教育类App:重点排查游戏、打赏等内容
  8. logback.xml中additivity的使用
  9. 网友今生国际米兰制作的张版倚天片花
  10. 2017/11/28车辆限号日期查询