kafka消息消费有延迟_简易实现kafka延迟消息
背景
当前业务存在以下场景:在一个事务内的最后一步是发送kafka消息,消费端收到通知后读取数据并做处理。但是由于kafka几乎是即时收到消息,导致偶尔出现“在发完kafka和提交事务的间隙,消费端收到了消息并读取到了事务提交前的数据”。
这个问题可以通过延迟消息来解决。
发送端 vs 消费端
要做延迟,那么首先要考虑的是:延迟放在发送端,还是放在消费端?最终选择放在消费端:让数据先被kafka存储起来,数据更安全。
想把延迟消息做成一个服务,不只是支持某一个场景/业务,在这种设计前提下,让延迟逻辑放在消费端,可以统一调整逻辑,也方便排查问题。
思路
是在整体外面包一层代理:另外创建一个延迟Topic,延迟消息都发到延迟Topic里。
有专门的服务来消费延迟Topic的消息,取到消息之后存储起来,定期检查消息是否已经延迟时间。
已到延迟时间的消息,重新发送到原先Topic。
这样做的好处是,不需要对kafka做任何改造。
存储
延迟队列消费者拉取到消息之后,要怎么存储?第三方存储,其需要满足以下几个条件:高性能:写入延迟要低,MQ的一个重要作用是削峰填谷,在选择临时存储时,写入性能必须要高,关系型数据库(如Mysql)通常不满足需求。
高可靠:延迟消息写入后,不能丢失,需要进行持久化,并进行备份
存储成本低:可以支持大量消息存储,(Redis存储成本太高)。
支持排序: 支持按照某个字段对消息进行排序,对于延迟消息需要按照时间进行排序。普通消息通常先发送的会被先消费,延迟消息与普通消息不同,需要进行排序。例如先发一条延迟10s的消息,再发一条延迟5s的消息,那么后发送的消息需要被先消费。
支持长时间保存:一些业务的延迟消息,需要延迟几个月,甚至更长,所以延迟消息必须能长时间保留。不过通常不建议延迟太长时间,存储成本比较大,且业务逻辑可能已经发生变化,已经不需要消费这些消息。
基于以上条件,选择了RocksDB来存储数据:高性能嵌入式KV存储引擎。
数据持久化到磁盘。
基于LMS存储,key自然排序,迭代器(Iterator)根据key顺序遍历。
代码
发送端
消息基类public class DelayMessage {
/**
* 事件唯一ID,用于去重检查
*/
private String eventId = UUIDGenerator.generateString();
/**
* 事件时间
*/
@JSONField(format = KafkaConstants.DATETIME_FORMAT)
private Date eventTime = new Date();
/**
* 真实事件时间
*/
@JSONField(format = KafkaConstants.DATETIME_FORMAT)
private Date actualTime;
/**
* 真实Topic
*/
private String actualTopic;
public Date getActualTime() {
return actualTime;
}
public T setActualTime(Date actualTime) {
this.actualTime = actualTime;
return (T) this;
}
public String getActualTopic() {
return actualTopic;
}
public T setActualTopic(String actualTopic) {
this.actualTopic = actualTopic;
return (T) this;
}
public Date getEventTime() {
return eventTime;
}
public T setEventTime(Date eventTime) {
this.eventTime = eventTime;
return (T) this;
}
}
消息对象继承DelayMessage,将消息发送到延迟Topic。
延迟服务消费端
接收延迟消息@KafkaListener(topics = {KafkaConstants.KAFKA_TOPIC_MESSAGE_DELAY}, containerFactory = "kafkaContainerFactory")
public boolean onMessage(String json) throws Throwable {
try {
DelayMessage delayMessage = deserialize(json, DelayMessage.class);
if (!isDelay(delayMessage)) {
// 如果接收到消息时,消息已经可以发送了,直接发送到实际的队列
sendActualTopic(delayMessage, json);
} else {
// 存储
localStorage(delayMessage, json);
}
} catch (Throwable e) {
log.error("consumer kafka delay message[{}] error!", json, e);
throw e;
}
return true;
}
private void sendActualTopic(DelayMessage delayMessage, String message) {
kafkaSender.send(message, delayMessage.getActualTopic());
}
@SneakyThrows
private void localStorage(DelayMessage delayMessage, String message) {
String key = generateRdbKey(delayMessage);
if (rocksDb.keyMayExist(RocksDbUtils.toByte(key), null)) {
return;
}
rocksDb.put(RocksDbUtils.toByte(key), RocksDbUtils.toByte(message));
}
private String generateRdbKey(DelayMessage delayMessage) {
return delayMessage.getActualTime().getTime() + RDB_KEY_SPLITTER + delayMessage.getEventId();
}
这里要注意生成key的方法:RocksDB是按key自然排序,迭代器遍历时是按key顺序遍历。
按时间来生成key,遍历时遇到第一个不符合的key,即可结束遍历。
key里加上消息ID,用以去重。
处理存储的延迟消息
启动定时任务(ScheduledExecutorService)定时检查消息。private void handleRdbMessage() {
try {
try (RocksIterator rocksIterator = rocksDb.newIterator()) {
for (rocksIterator.seekToFirst(); rocksIterator.isValid(); rocksIterator.next()) {
String key = "";
String value = "";
try {
byte[] keyByte = rocksIterator.key();
key = RocksDbUtils.toString(keyByte);
if (!isMessageExpired(key)) {
break;
}
value = RocksDbUtils.toString(rocksIterator.value());
DelayMessage delayMessage = JSON.parseObject(value, DelayMessage.class);
sendActualTopic(delayMessage, value);
rocksDb.delete(keyByte);
} catch (NumberFormatException e) {
// 异常key
log.error("handler kafka rocksdb delay message[{}:{}] NumberFormatException error!", key, value, e);
if (StringUtils.isNotBlank(key)) {
rocksDb.delete(RocksDbUtils.toByte(key));
}
} catch (Exception e) {
log.error("handler kafka rocksdb delay message[{}:{}] error!", key, value, e);
}
}
}
} catch (Exception e) {
// 捕获异常,否则ScheduledExecutorService会停止定时任务
log.error("handler kafka rocksdb delay message error!", e);
}
}
private boolean isMessageExpired(String rdbKey) {
long actualTime = Long.valueOf(rdbKey.split(RDB_KEY_SPLITTER)[0]);
return actualTime <= System.currentTimeMillis();
}
这里sendActualTopic和rocksDb.delete两个操作并不是原子性,但一般kafka消费端都会做防重复,所以也不会有问题。
其他
当前仅仅简易实现了延迟队列,还有很多需要完成完善的地方,比如:当前数据分散到不同的消费节点上,如果某一个节点服务器异常导致数据丢失,就只能人工介入,从kafka文件里获取数据;可通过部署不同的kafka group来达到数据备份,通过选主方式来决定哪一个group执行业务。
一条消息被存储三份:实际队列,延迟队列,RocksDB,可以通过操作kafka CommitLog的方式,让RocksDB里仅存储CommitLog offset 相关信息,减小RocksDB占用空间。
参考:
kafka消息消费有延迟_简易实现kafka延迟消息相关推荐
- kafka 重复消费和数据丢失_刨根问底,Kafka消息中间件到底会不会丢消息
大型互联网公司一般都会要求消息传递最大限度的不丢失,比如用户服务给代金券服务发送一个消息,如果消息丢失会造成用户未收到应得的代金券,最终用户会投诉. 为避免上面类似情况的发生,除了做好补偿措施,更应该 ...
- 企业微信推送消息延迟_企业微信发送应用消息的实现
企业号升级到企业微信后,发送应用消息的接口也变化了不少,除了原来的文本.图片.文件.语音.视频.图文消息等消息外,增加了文本卡片.markdown消息.小程序通知消息等内容,不过它们都可以共用一个接口 ...
- kafka中topic默认属性_分享:Kafka 的 Lag 计算误区及正确实现
前言 消息堆积是消息中间件的一大特色,消息中间件的流量削峰.冗余存储等功能正是得益于消息中间件的消息堆积能力.然而消息堆积其实是一把亦正亦邪的双刃剑,如果应用场合不恰当反而会对上下游的业务造成不必要的 ...
- 企业微信推送消息延迟_企业微信发送应用消息,员工无法接收到推送消息。
请求消息体:[touser=18666211235,toparty=,totag=,agentid=1000040,msgtype=text,content=,media_id=,title=,des ...
- java 如何判定消息已在队列_【05期】消息队列中,如何保证消息的顺序性?
本文选自:advanced-java 作者:yanglbme 问:如何保证消息的顺序性? 面试官心理分析 其实这个也是用 MQ 的时候必问的话题,第一看看你了不了解顺序这个事儿?第二看看你有没有办法保 ...
- kafka处理流式数据_通过Apache Kafka集成流式传输大数据
kafka处理流式数据 从实时过滤和处理大量数据,到将日志数据和度量数据记录到不同来源的集中处理程序中,Apache Kafka越来越多地集成到各种系统和解决方案中. 使用CData Sync ,可以 ...
- outlook收邮件延迟_如何计划或延迟在Outlook中发送电子邮件
outlook收邮件延迟 When you click Send on an email, it is typically sent immediately. But what if you want ...
- 租用美国服务器 解决java延迟_如何降低美国服务器延迟?美国服务器延迟多少算正常?...
美国服务器延迟多少算正常?很多人都说160ms到180ms.还有人说200ms到300ms.这些其实都没错,因为实际物理距离的问题,中国和美国都是东西跨度极大的国家.不同地区延迟上相差100ms都不为 ...
- mysql replication延迟_深入mysql主从复制延迟问题的详解
面试mysqldba的时候遇到一个题: 描述msyql replication 机制的实现原理,如何在不停掉mysql主库的情况下,恢复数据不一致的slave的数据库节点? MySQL的复制(repl ...
最新文章
- (C++)寻找1-100以内所有素数,复杂度为O(nsqrt(n))与O(nloglogn)的两种方法
- 基于卷积神经网络(CNN)的仙人掌图像分类
- 霍山职业学校16届计算机学生,霍山职高(安徽霍山职业学校)
- 周志华:“数据、算法、算力”人工智能三要素,在未来要加上“知识”| CCF-GAIR 2020...
- 【重大更新】DevExpress v17.1新版亮点(DevExtreme HTML5/JS篇)
- 从程序员到架构师的最佳技术成长之路
- Oracle分析函数FIRST_VALUE、LAST_VALUE
- 算法--组合数学:杨辉三角数学分析以及Java实现
- latex 波浪线_湖熟镇月牙刀波浪刀带哪家好厂家
- zsh 隐藏用户名和主机
- linux 配置path
- 2020华为软件精英挑战赛-有向图找环
- 怎样通过java用web3j查询以太坊交易信息?
- Linux——常用文件管理命令(必会)
- jeesit 可以用俩种导出
- 数据外推算法 c语言,[原创]如何进行内插法和外推法的计算
- MySQL: GTID简介,gtid_executed和gtid_purged概念
- AliCloud Duplicity
- 第2节Socket介绍
- 紫米10000mAh智能移动电源APP功能分析报告
热门文章
- 输入法(IME)实现原理
- lol服务器维护8.21,lol8.21版本更新了什么 lol8.21版本更新内容一览
- itextpdf 怎么下划线_java – 带有粗体和下划线的Itext新字体
- ae怎么设置gpu渲染_AE/PR mac版如何开启GPU加速渲染?
- 外国人眼中最好的五个第三方 DNS 服务器
- 读过的laravel文章
- 【观察】 2016年度中国企业级市场十大新闻
- dhcp服务器显示未连接,提示本地连接未启用dhcp怎么办?本地连接未启用dhcp的解决方法...
- 关于日期天数计算的函数(转)
- 电子商务系统怎么开发,有哪些流程_OctShop