KafkaConsumer实现精确的latest开始消费

  • 代码
  • 需求
  • 分析
  • 优化
  • 总结

代码

代码:https://gitee.com/ydfind/java-total/tree/master/debug-example

需求

调用某个接口,返回结果包含traceId和商品集合,然后根据该traceId去kafka查找debug日志,包含该traceId的日志,数量是商品集合的数量(每个商品都有对应的一条kafka日志,该日志即为商品相关的信息),对日志进行处理后,返回前端展示。

分析

因为拿到traceId的时候,对方其实已经把debug信息发送到kafka了,所以我在拿到traceId,再去kafka查找就要设置auto.offset.reset=earliest, 这就会导致我多查询很多的无效数据。

假如我在调用接口前,先订阅该topic,使用默认auto_offset_reset为latest,在拿到traceId再poll(),结果怎么样呢?不行,因为在poll时发现position为空才会根据auto_offset_reset策略设置,而poll前对方接口早就把信息发到kafka了,即消息被错过了

后面发现可以利用seek函数实现这个功能:

consumer.subscribe(Collections.singletonList("topic"));Set<TopicPartition> topicPartitions = null;while (CollectionUtils.isEmpty(topicPartitions)) {consumer.poll(Duration.ofMillis(100));topicPartitions = consumer.assignment();}Map<TopicPartition, Long> topicPartitionLongMap = consumer.endOffsets(topicPartitions);for (TopicPartition partition: topicPartitions) {consumer.seek(partition, topicPartitionLongMap.get(partition));}

我们发现有seekToEnd函数,但其实并不能实现我们的需求,具体看源码:

@Overridepublic void seekToEnd(Collection<TopicPartition> partitions) {if (partitions == null)throw new IllegalArgumentException("Partitions collection cannot be null");acquireAndEnsureOpen();try {Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;for (TopicPartition tp : parts) {log.debug("Seeking to end of partition {}", tp);// 这里只是把对应分区的offset重置策略设为LATEST,但其实默认就是这个subscriptions.requestOffsetReset(tp, OffsetResetStrategy.LATEST);}} finally {release();}}

我们可以看到 只是把对应分区的offset重置策略设为LATEST,但其实默认就是这个。

优化

因为我是用户组里只有一个用户消费,即该KafkaConsumer默认分配所有的分区,可以改为下面实现:

 List<PartitionInfo> topic = consumer.partitionsFor("topic");List<TopicPartition> topicPartitions = topic.stream().map(item -> new TopicPartition(item.topic(), item.partition())).collect(Collectors.toList());// 订阅所有分区consumer.assign(topicPartitions);Map<TopicPartition, Long> topicPartitionLongMap = consumer.endOffsets(topicPartitions);for (TopicPartition partition: topicPartitions) {consumer.seek(partition, topicPartitionLongMap.get(partition));}

发现上面需要4s多的时间,而优化后1s不到!

总结

实现上面的需求的方法:
1.auto.offset.reset设置为earliest,但可能会多消费很多消息; // 具体多消费的消息不清楚
2.auto.offset.reset默认,利用seek函数来定位到最近的,若手动订阅所有的分区,会快很多!

其它:
1.其实还有offsetForTime函数,但具体没有尝试过

KafkaConsumer实现精确的latest(结尾)开始消费相关推荐

  1. 【Kafka】(四)Kafka使用 Consumer 接收消息消费

    Consumer概要 consumer中的关键术语: 消费者(consumer):从kafka中拉取数据并进行处理 消费者组(consumer group):一个消费者组由一个或者多个consumer ...

  2. kafka消息的分发与消费(一)

    关于 Topic 和 Partition: Topic: 在 kafka 中,topic 是一个存储消息的逻辑概念,可以认为是一个消息集合.每条消息发送到 kafka 集群的消息都有一个类别.物理上来 ...

  3. rdkafka线程过多_Kafka/RocketMQ 多线程消费时如何保证消费顺序?

    上两篇文章都在讨论顺序消息的一些知识,看到有个读者的留言如下: 这个问题问得非常棒,由于在之前的文章中并没有提及到,因此我在这篇文章中单独讲解,本文将从消费顺序性这个问题出发,深度剖析 Kafka/R ...

  4. kafka消费组查看和删除

    生产环境监控发现kafka存在大量消费组 查看消费组信息: ./kafka-consumer-groups.sh --bootstrap-server ip:port --list 查看特定消费组信息 ...

  5. 聊聊Kafka(三)Kafka消费者与消费组

    Kafka消费者与消费组 简介 消费者 概念入门 消费者.消费组 心跳机制 消息接收 必要参数配置 订阅 反序列化 位移提交 消费者位移管理 再均衡 避免重平衡 消费者拦截器 消费组管理 什么是消费者 ...

  6. 怎么理解 Kafka 消费者与消费组之间的关系?

    与生产者对应的是消费者,应用程序可以通过 KafkaConsumer 来订阅主题,并从订阅的主题中拉取消息.不过在使用 KafkaConsumer 消费消息之前需要先了解消费者和消费组的概念,否则无法 ...

  7. 怎么理解Kafka消费者与消费组之间的关系?

    与生产者对应的是消费者,应用程序可以通过 KafkaConsumer 来订阅主题,并从订阅的主题中拉取消息.不过在使用 KafkaConsumer 消费消息之前需要先了解消费者和消费组的概念,否则无法 ...

  8. 【MQ】Kafka笔记

    笔记来源:尚硅谷视频笔记2.0版+2.1版 黑马视频:Kafka深入探秘者来了 kafka笔记地址:https://blog.csdn.net/hancoder/article/details/107 ...

  9. Kafka学习之消费者

    Kafka学习之消费者 前言 本博客主要介绍up在学习kafka中间件时候觉得需要记录的知识点. 内容 1.消费者与消费组 消费者(Consumer)负责订阅Kafka中的主题(Topic),并且从订 ...

  10. kafka的auto.offset.reset详解与测试

    取值及定义# auto.offset.reset有以下三个可选值: latest (默认) earliest none 三者均有共同定义: 对于同一个消费者组,若已有提交的offset,则从提交的of ...

最新文章

  1. windows 本地安全设置 灰色_安全检查与加固
  2. ONES 万事联合创始人 amp; CTO 冯斌:企业服务产品的探索实践
  3. python实现名片管理系统在哪里_python实现名片管理系统项目
  4. 如何突破你的“内在阻力”,让你渴望多年的梦想都能达成,并创造超越想像极限的“全方位成功”?突破内在阻力全方位成功...
  5. .NET开发者常会忽略的几个错误
  6. CH - 0805 防线(二分+思维)
  7. java config 类_Spring ----JavaConfig类代替XML配置Bean
  8. 如何修改Vs2008环境变量
  9. CVE-2020-1472 域内提权利用(域提权)
  10. 设计灵感|展览海报如何排版?好的作品给你灵感
  11. html下拉表覆盖透明,css透明元素如何遮挡住fixed元素
  12. mysql 库存预警_仓库管理中如何实现库存预警
  13. Kotlin的基本数据类型
  14. 线性代数知识荟萃(3)——行列式
  15. 系统重温Pandas笔记:(六)连接
  16. 2020 mit6.s081 os Lab: xv6 traps
  17. MFC CImageList序列图的用法
  18. dskinlite(uieasy mfc界面库)使用记录4:绘制动态元素(listbox)
  19. centos7安装bbr_Centos7下,BBR修正版 一键安装与使用
  20. 7 centos 配置sudo权限_CentOS7 配置sudo并使用

热门文章

  1. 【模式匹配】之 —— Z-BOX算法
  2. Openerp部分学习资料
  3. 2019百日打卡DAY12
  4. 闲聊一下android 3D 网络游戏
  5. 在yandex投放广告的话,需要注册俄罗斯常用的域名吗?
  6. 组网胖模式_华三无线apEWP-WA4320i-acn-fit 如何由瘦模式改为胖模式
  7. mysql 库存超卖_mysql处理高并发,防止库存超卖
  8. Jupyter关联规则挖掘-莫名其妙的问题
  9. 2018杭州·云栖大会:一文直击地表最强黑科技
  10. 麻省理工学院公开课:信号与系统:模拟与数字信号处理 调幅演示