KafkaConsumer实现精确的latest(结尾)开始消费
KafkaConsumer实现精确的latest开始消费
- 代码
- 需求
- 分析
- 优化
- 总结
代码
代码:https://gitee.com/ydfind/java-total/tree/master/debug-example
需求
调用某个接口,返回结果包含traceId和商品集合,然后根据该traceId去kafka查找debug日志,包含该traceId的日志,数量是商品集合的数量(每个商品都有对应的一条kafka日志,该日志即为商品相关的信息),对日志进行处理后,返回前端展示。
分析
因为拿到traceId的时候,对方其实已经把debug信息发送到kafka了,所以我在拿到traceId,再去kafka查找就要设置auto.offset.reset=earliest, 这就会导致我多查询很多的无效数据。
假如我在调用接口前,先订阅该topic,使用默认auto_offset_reset为latest,在拿到traceId再poll(),结果怎么样呢?不行,因为在poll时发现position为空才会根据auto_offset_reset策略设置,而poll前对方接口早就把信息发到kafka了,即消息被错过了
后面发现可以利用seek函数实现这个功能:
consumer.subscribe(Collections.singletonList("topic"));Set<TopicPartition> topicPartitions = null;while (CollectionUtils.isEmpty(topicPartitions)) {consumer.poll(Duration.ofMillis(100));topicPartitions = consumer.assignment();}Map<TopicPartition, Long> topicPartitionLongMap = consumer.endOffsets(topicPartitions);for (TopicPartition partition: topicPartitions) {consumer.seek(partition, topicPartitionLongMap.get(partition));}
我们发现有seekToEnd函数,但其实并不能实现我们的需求,具体看源码:
@Overridepublic void seekToEnd(Collection<TopicPartition> partitions) {if (partitions == null)throw new IllegalArgumentException("Partitions collection cannot be null");acquireAndEnsureOpen();try {Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;for (TopicPartition tp : parts) {log.debug("Seeking to end of partition {}", tp);// 这里只是把对应分区的offset重置策略设为LATEST,但其实默认就是这个subscriptions.requestOffsetReset(tp, OffsetResetStrategy.LATEST);}} finally {release();}}
我们可以看到 只是把对应分区的offset重置策略设为LATEST,但其实默认就是这个。
优化
因为我是用户组里只有一个用户消费,即该KafkaConsumer默认分配所有的分区,可以改为下面实现:
List<PartitionInfo> topic = consumer.partitionsFor("topic");List<TopicPartition> topicPartitions = topic.stream().map(item -> new TopicPartition(item.topic(), item.partition())).collect(Collectors.toList());// 订阅所有分区consumer.assign(topicPartitions);Map<TopicPartition, Long> topicPartitionLongMap = consumer.endOffsets(topicPartitions);for (TopicPartition partition: topicPartitions) {consumer.seek(partition, topicPartitionLongMap.get(partition));}
发现上面需要4s多的时间,而优化后1s不到!
总结
实现上面的需求的方法:
1.auto.offset.reset设置为earliest,但可能会多消费很多消息; // 具体多消费的消息不清楚
2.auto.offset.reset默认,利用seek函数来定位到最近的,若手动订阅所有的分区,会快很多!
其它:
1.其实还有offsetForTime函数,但具体没有尝试过
KafkaConsumer实现精确的latest(结尾)开始消费相关推荐
- 【Kafka】(四)Kafka使用 Consumer 接收消息消费
Consumer概要 consumer中的关键术语: 消费者(consumer):从kafka中拉取数据并进行处理 消费者组(consumer group):一个消费者组由一个或者多个consumer ...
- kafka消息的分发与消费(一)
关于 Topic 和 Partition: Topic: 在 kafka 中,topic 是一个存储消息的逻辑概念,可以认为是一个消息集合.每条消息发送到 kafka 集群的消息都有一个类别.物理上来 ...
- rdkafka线程过多_Kafka/RocketMQ 多线程消费时如何保证消费顺序?
上两篇文章都在讨论顺序消息的一些知识,看到有个读者的留言如下: 这个问题问得非常棒,由于在之前的文章中并没有提及到,因此我在这篇文章中单独讲解,本文将从消费顺序性这个问题出发,深度剖析 Kafka/R ...
- kafka消费组查看和删除
生产环境监控发现kafka存在大量消费组 查看消费组信息: ./kafka-consumer-groups.sh --bootstrap-server ip:port --list 查看特定消费组信息 ...
- 聊聊Kafka(三)Kafka消费者与消费组
Kafka消费者与消费组 简介 消费者 概念入门 消费者.消费组 心跳机制 消息接收 必要参数配置 订阅 反序列化 位移提交 消费者位移管理 再均衡 避免重平衡 消费者拦截器 消费组管理 什么是消费者 ...
- 怎么理解 Kafka 消费者与消费组之间的关系?
与生产者对应的是消费者,应用程序可以通过 KafkaConsumer 来订阅主题,并从订阅的主题中拉取消息.不过在使用 KafkaConsumer 消费消息之前需要先了解消费者和消费组的概念,否则无法 ...
- 怎么理解Kafka消费者与消费组之间的关系?
与生产者对应的是消费者,应用程序可以通过 KafkaConsumer 来订阅主题,并从订阅的主题中拉取消息.不过在使用 KafkaConsumer 消费消息之前需要先了解消费者和消费组的概念,否则无法 ...
- 【MQ】Kafka笔记
笔记来源:尚硅谷视频笔记2.0版+2.1版 黑马视频:Kafka深入探秘者来了 kafka笔记地址:https://blog.csdn.net/hancoder/article/details/107 ...
- Kafka学习之消费者
Kafka学习之消费者 前言 本博客主要介绍up在学习kafka中间件时候觉得需要记录的知识点. 内容 1.消费者与消费组 消费者(Consumer)负责订阅Kafka中的主题(Topic),并且从订 ...
- kafka的auto.offset.reset详解与测试
取值及定义# auto.offset.reset有以下三个可选值: latest (默认) earliest none 三者均有共同定义: 对于同一个消费者组,若已有提交的offset,则从提交的of ...
最新文章
- windows 本地安全设置 灰色_安全检查与加固
- ONES 万事联合创始人 amp; CTO 冯斌:企业服务产品的探索实践
- python实现名片管理系统在哪里_python实现名片管理系统项目
- 如何突破你的“内在阻力”,让你渴望多年的梦想都能达成,并创造超越想像极限的“全方位成功”?突破内在阻力全方位成功...
- .NET开发者常会忽略的几个错误
- CH - 0805 防线(二分+思维)
- java config 类_Spring ----JavaConfig类代替XML配置Bean
- 如何修改Vs2008环境变量
- CVE-2020-1472 域内提权利用(域提权)
- 设计灵感|展览海报如何排版?好的作品给你灵感
- html下拉表覆盖透明,css透明元素如何遮挡住fixed元素
- mysql 库存预警_仓库管理中如何实现库存预警
- Kotlin的基本数据类型
- 线性代数知识荟萃(3)——行列式
- 系统重温Pandas笔记:(六)连接
- 2020 mit6.s081 os Lab: xv6 traps
- MFC CImageList序列图的用法
- dskinlite(uieasy mfc界面库)使用记录4:绘制动态元素(listbox)
- centos7安装bbr_Centos7下,BBR修正版 一键安装与使用
- 7 centos 配置sudo权限_CentOS7 配置sudo并使用