消费者poll消息得过程(poll的意思是从broker拿消息,并不代表拿到就消费成功了)

  • 消费者建立了与broker之间的⻓连接,开始poll消息。
  • 默认一次poll 500条消息
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500 );

可以根据消费速度的快慢来设置,因为如果两次poll的时间如果超出了30s的时间间隔,kafka会认为其消费能力过弱,将其踢出消费组。将分区分配给其他消费者。

可以通过这个值进行设置:

props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000 );

如果每隔1s内没有poll到任何消息,则继续去poll消息,循环往复,直到poll到消息。如果超出了1s,则此次⻓轮询结束。

ConsumerRecords<String, String> records =consumer.poll(Duration.ofMillis( 1000 ));

消费者发送心跳的时间间隔

props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000 );

kafka如果超过 10 秒没有收到消费者的心跳,则会把消费者踢出消费组,进行rebalance,把分区分配给其他消费者。

props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000 );

自动提交offset

消费者poll到消息后默认情况下,会自动向broker的_consumer_offsets主题提交当前主题-分区消费的偏移量。

自动提交会丢消息: 因为如果消费者还没消费完poll下来的消息就自动提交了偏移量,那么此 时消费者挂了,于是下一个消费者会从已提交的offset的下一个位置开始消费消息。之前未被消费的消息就丢失掉了。

手动提交offset

手动提交分为手动同步提交与手动异步提交

很多时候并不是说拉取到消息就算消费完成,而是需要将消息写入数据库、写入本地缓存,或者是更加复杂的业务处理。在这些场景下,所有的业务处理完成才能认为消息被成功消费,手动的提交方式可以让开发人员根据程序的逻辑在合适的地方进行位移提交。开启手动提交功能的前提是消费者客户端参数enable.auto.commit 配置为false ,示例如下:

properties.put("enable.auto.commit", "false");

手动同步提交

手动提交可以细分为同步提交和异步提交,对应于KafkaConsumer 中的commitSync()和commitAsync()两种类型的方法。我们这里先讲述同步提交的方式commitSync()方法
定义如下:

public void commitSync()

这个方法很简单,下面使用它演示同步提交的简单用法:

while(true) {
        ConsumerRecords<String , String> records= consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
        //do some logical processing .
        }
        consumer.commitSync() ;
    }

可以看到示例中先对拉取到的每一条消息做相应的逻辑处理,然后对整个消息集做同步提交。参考KafkaConsumer 源码中提供的示例,针对上面的示例还可以修改为批量处理+批量提交的方式, 关键代码如下:

final int minBatchSize=200;
List<ConsumerRecord> buffer=new ArrayList<>();
while(isRunning.get()){ConsumerRecord<String,String> records=consumer.poll(1000);for (ConsumerRecord<String,String> record:records){buffer.add(record);}if (buffer.size()>=minBatchSize){consumer.commitSync();buffer.clear();}
}

上面的示例中将拉取到的消息存入缓存buffer,等到积累到足够多的时候,也就是示例中大于等于200 个的时候,再做相应的批量处理,之后再做批量提交。这两个示例都有重复消费的问题,如果在业务逻辑处理完之后,并且在同步位移提交前,程序出现了崩渍, 那么待恢复之后又只能从上一次位移提交的地方拉取消息,由此在两次位移提交的窗口中出现了重复消费的现象。

commitSync ()方法会根据poll()方法拉取的最新位移来进行提交,只要没有发生不可恢复的错误( Unrecoverable Eηor ),它就会阻塞消费者线程直至位移提交完成。对于不可恢复的错误,比如CommitFailedException 、WakeupException 、InterruptException 、AuthenticationException 、AuthorizationException 等,我们可以将其捕获并做针对性的处理。对于采用commitSync()的无参方法而言,它提交消费位移的频率和拉取批次消息、处理批
次消息的频率是一样的,如果想寻求更细粒度的、更精准的提交,那么就需要使用commitSync()的另一个含参方法,具体定义如下:

public void commitSync(final Map<TopicPartition , OffsetAndMetadata> offsets)

该方法提供了一个offsets 参数, 用来提交指定分区的位移。无参的commitSync()方法只能提交当前批次对应的position 值。如果需要提交一个中间值,比如业务每消费一条消息就提交一次位移,那么就可以使用这种方式

while ((isRunning.get())){ConsumerRecords<String,String> records=consumer.poll(1000);for (ConsumerRecord<String,String> record:records){long offset=record.offset();TopicPartition partition=new TopicPartition(record.topic() ,record.partition());consumer.commitSync(Collections.singletonMap(partition,new offsetAndMetdata(offset+1)));}
}

在实际应用中,很少会有这种每消费一条消息就提交一次消费位移的必要场景。commitSync()方法本身是同步执行的,会耗费一定的性能,而示例中的这种提交方式会将性能拉到一个相当低的点。更多时候是按照分区的粒度划分提交位移的界限,这里我们就要用到了ConsumerRecords 类的partitions()方法和records(TopicPartition)方法,

异步提交的方式( commitAsync())
在执行的时候消费者线程不会被阻塞, 可能在提交消费位移的结果还未返回之前就开始了新一次的拉取操作。异步提交可以便消费者的性能得到一定的增强。commitAsync 方法有三个不同的重载方法,具体定义如下:

public void commitAsync()
public void commitAsync(OffsetCommitCallback callback)
public void commitAsync(final Map<TopicPartition , OffsetAndMetadata> offsets , OffsetCommitCallback callback)

第一个无参的方法和第三个方法中的offsets 都很好理解,对照commitSync()方法即可。关键的是这里的第二个方法和第三个方法中的callback 参数,它提供了一个异步提交的回调方法,当位移提交完成后会回调OffsetCommitCallback 中的onComplete()方法。

commitAsync()提交的时候同样会有失败的情况发生,那么我们应该怎么处理呢?读者有可能想到的是重试,问题的关键也就在这里了。如果某一次异步提交的消费位移为x , 但是提交失败了,然后下一次又异步提交了消费位移为x+y,这次成功了。如果这里引入了重试机制,前一次的异步提交的消费位移在重试的时候提交成功了,那么此时的消费位移又变为了x 。如果此时发生异常(或者再均衡) , 那么恢复之后的消费者(或者新的消费者)就会从x 处开始消费消息,这样就发生了重复消费的问题。
为此我们可以设置一个递增的序号来维护异步提交的顺序,每次位移提交之后就增加序号相对应的值。在遇到位移提交失败需要重试的时候,可以检查所提交的位移和序号的值的大小,如果前者小于后者,则说明有更大的位移己经提交了,不需要再进行本次重试:如果两者相同,则说明可以进行重试提交。除非程序编码错误,否则不会出现前者大于后者的情况。如果位移提交失败的情况经常发生,那么说明系统肯定出现了故障,在-般情况下,位移提交失败的情况很少发生,不重试也没有关系,后面的提交也会有成功的。重试会增加代码逻辑的复杂度,不重试会增加重复消费的概率。如果消费者异常退出,那么这个重复消费的问题就很难避免,因为这种情况下无法及时提交消费位移;如果消费者正常退出或发生再均衡的情况,那么可以在退出或再均衡执行之前使用同步提交的方式做最后的把关。

try{while(isRunning.get()){consumer.commitAsync();}
}finally {try{consumer.commitSync();}finally {{consumer.close();}}
}

指定分区消费

consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0 )));

消息回溯消费

consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0 )));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME,0 )));

指定offset消费

consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0 )));
consumer.seek(new TopicPartition(TOPIC_NAME, 0 ), 10 );

从指定时间点消费

List<PartitionInfo> topicPartitions =consumer.partitionsFor(TOPIC_NAME);
//从 1 小时前开始消费
long fetchDataTime = new Date().getTime() - 1000 * 60 * 60 ;
Map<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo par : topicPartitions) {map.put(new TopicPartition(TOPIC_NAME, par.partition()),fetchDataTime);
}
Map<TopicPartition, OffsetAndTimestamp> parMap =consumer.offsetsForTimes(map);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry :parMap.entrySet()) {TopicPartition key = entry.getKey();OffsetAndTimestamp value = entry.getValue();if (key == null || value == null) continue;Long offset = value.offset();System.out.println("partition-" + key.partition() +"|offset-" + offset);System.out.println();//根据消费里的timestamp确定offsetif (value != null) {consumer.assign(Arrays.asList(key));consumer.seek(key, offset);}
}

新消费组得消费偏移量

当消费主题的是一个新的消费组,或者指定offset的消费方式,offset不存在,那么应该如何消费?

  • latest(默认) :只消费自己启动之后发送到主题的消息
  • earliest:第一次从头开始消费,以后按照消费offset记录继续消费,这个需要区别于consumer.seekToBeginning(每次都从头开始消费)

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Kafka消费消息自动提交与手动提交相关推荐

  1. jdbc的事务自动提交和手动提交,以及mybatis开启自动提交后是否会复用一个连接的验证

    jdbc的sql执行默认是自动提交事务 默认时自动提交,当我手动提交时会报错,不过数据还是会插入到数据库中,因为执行完executeupdate后数据库会自动commit 开启手动提交 手动提交则需要 ...

  2. Kafka的消息自动提交和手动提交

    只说结论! 如果我们使用原始apache-kafka 依赖的API来消费数据: 如果enable.auto.commit为true,则表示自动提交,但不会在拉取数据之后立即提交.在一次poll的数据处 ...

  3. Kafka手动提交偏移量的作用到底是什么???

    手动提交偏移量的原因 最近拜读了很多文章,都谈到为了保证消息的安全消费(避免消息丢失和消息重复读取),建议消费者客户端手动提交偏移量.具体如下: 1.当设置为自动提交时,当kafka消费者读取到消息后 ...

  4. kafka-offset手动提交和自动提交

    目录 首先回顾之前的知识点 自动提交offset 手动提交 消费者poll消息的细节 完整代码: 按照新方法进行消费消息 1.指定时间进行消息的消费 2.指定分区开始从头消费+指定分区的偏移量开始消费 ...

  5. KAFKA SpringBoot2 Nacos 消息异步发送和消费消息(进阶篇)

    文章目录 一.基础集成 1. 技术选型 2. 导入依赖 3. kafka配置 4. auto-offset-reset 简述 5. 新增一个订单类 6. 生产者(异步) 7. 消费者 8. kafka ...

  6. kafka Java客户端之 consumer API 消费消息

    背景:我使用docker-compose 搭建的kafka服务 kafka的简单介绍以及docker-compose部署单主机Kafka集群 使用consumer API消费指定Topic里面的消息 ...

  7. kafka:消息发送以及消费的过程

    摘要 kafka的存储消息,生产者发送消息,消费者消费消息.这些看起来简单,但实际细想,会有很多问题需要解决:消息是单个单个发送还是批量发送?broker的主题里一有消息就立即推送给消费者吗?生产者的 ...

  8. KAFKA 集成 SpringBoot2 消息发送和消费消息(基础篇)

    文章目录 1. 技术选型 2. 导入依赖 3. kafka配置 4. 生产者(同步) 5. 生产者(异步) 6. 消费者 1. 技术选型 软件/框架 版本 jdk 1.8.0_202 springbo ...

  9. kafka的消息丢失和重复消费解决办法

    1.消息发送 Kafka消息发送有两种方式:同步(sync)和异步(async),默认是同步方式,可通过producer.type属性进行配置.Kafka通过配置request.required.ac ...

最新文章

  1. 条件随机场概率无向图模型的因子分解
  2. ApiCloud云端管理平台(v.20151022)
  3. 翻译 github上How to be a good programmer
  4. (七)boost库之单例类
  5. VARIANT变体类型数据
  6. IntelliJ Idea 常用12款插件(提高开发效率),附优秀主题插件
  7. python钓鱼网站_Python+MySQL获取PhishTank的钓鱼网站列表作业笔记
  8. redo log 和undo log_MySQL 持久化保障机制-redo 日志
  9. 神经网络控制器设计原理,神经网络控制系统设计
  10. python实战项目分析2—物流
  11. java flv转mp3_Java调用ffmpeg转换视频格式为flv的示例详解
  12. 计算机奥赛金牌排名,2019五大学科竞赛含金量排名
  13. MATLAB批量绘图
  14. 电脑控制android 源代码,[源代码]电脑通过adb控制安卓手机
  15. 巨准拓客CRM【工商财税】行业获客解决方案
  16. 学习笔记:UDP实现进程心跳检测
  17. css中col-lg-1是什么意思,css - 引导程序中“col-md-4”,“col-xs-1”,“col-lg-2”中数字的含义...
  18. #include int inc(int a) { return(++a); } int multi(int*a,int*b,int*c) { return(*c=*a**b); }
  19. 分布式系统理论基础 - CAP
  20. 计算机考试老师怎么评卷,你写的字VS电脑阅卷呈现的字,两者相差悬殊,网友:机器尽力了...

热门文章

  1. 『驻外』工作,不是只有非洲
  2. 用“真快乐”切入社交电商后,国美做“家装”能否多点开花?
  3. python语言中不属于组合数据类型的是_下列不属于组合数据类型的是()。
  4. 解决mingw-w64外网下载太慢问题,离线包安装配置过程讲解
  5. Linux下的motion detection(最简单的办公室监控系统) 邮件自动发送
  6. 克服“讨好型人格”,你需要《被讨厌的勇气》
  7. R包SPOTlight安装(新手向)
  8. 接连霸榜,这门国产编程语言是真的强!
  9. OpenStack基金会项目Airship的新成员要做什么?
  10. 电容触摸屏驱动---基于FT5406