背景

当前业务存在以下场景:在一个事务内的最后一步是发送kafka消息,消费端收到通知后读取数据并做处理。但是由于kafka几乎是即时收到消息,导致偶尔出现“在发完kafka和提交事务的间隙,消费端收到了消息并读取到了事务提交前的数据”。

这个问题可以通过延迟消息来解决。

发送端 vs 消费端

要做延迟,那么首先要考虑的是:延迟放在发送端,还是放在消费端?最终选择放在消费端:让数据先被kafka存储起来,数据更安全。

想把延迟消息做成一个服务,不只是支持某一个场景/业务,在这种设计前提下,让延迟逻辑放在消费端,可以统一调整逻辑,也方便排查问题。

思路

是在整体外面包一层代理:另外创建一个延迟Topic,延迟消息都发到延迟Topic里。

有专门的服务来消费延迟Topic的消息,取到消息之后存储起来,定期检查消息是否已经延迟时间。

已到延迟时间的消息,重新发送到原先Topic。

这样做的好处是,不需要对kafka做任何改造。

存储

延迟队列消费者拉取到消息之后,要怎么存储?第三方存储,其需要满足以下几个条件:高性能:写入延迟要低,MQ的一个重要作用是削峰填谷,在选择临时存储时,写入性能必须要高,关系型数据库(如Mysql)通常不满足需求。

高可靠:延迟消息写入后,不能丢失,需要进行持久化,并进行备份

存储成本低:可以支持大量消息存储,(Redis存储成本太高)。

支持排序: 支持按照某个字段对消息进行排序,对于延迟消息需要按照时间进行排序。普通消息通常先发送的会被先消费,延迟消息与普通消息不同,需要进行排序。例如先发一条延迟10s的消息,再发一条延迟5s的消息,那么后发送的消息需要被先消费。

支持长时间保存:一些业务的延迟消息,需要延迟几个月,甚至更长,所以延迟消息必须能长时间保留。不过通常不建议延迟太长时间,存储成本比较大,且业务逻辑可能已经发生变化,已经不需要消费这些消息。

基于以上条件,选择了RocksDB来存储数据:高性能嵌入式KV存储引擎。

数据持久化到磁盘。

基于LMS存储,key自然排序,迭代器(Iterator)根据key顺序遍历。

代码

发送端

消息基类public class DelayMessage {

/**

* 事件唯一ID,用于去重检查

*/

private String eventId = UUIDGenerator.generateString();

/**

* 事件时间

*/

@JSONField(format = KafkaConstants.DATETIME_FORMAT)

private Date eventTime = new Date();

/**

* 真实事件时间

*/

@JSONField(format = KafkaConstants.DATETIME_FORMAT)

private Date actualTime;

/**

* 真实Topic

*/

private String actualTopic;

public Date getActualTime() {

return actualTime;

}

public T setActualTime(Date actualTime) {

this.actualTime = actualTime;

return (T) this;

}

public String getActualTopic() {

return actualTopic;

}

public T setActualTopic(String actualTopic) {

this.actualTopic = actualTopic;

return (T) this;

}

public Date getEventTime() {

return eventTime;

}

public T setEventTime(Date eventTime) {

this.eventTime = eventTime;

return (T) this;

}

}

消息对象继承DelayMessage,将消息发送到延迟Topic。

延迟服务消费端

接收延迟消息@KafkaListener(topics = {KafkaConstants.KAFKA_TOPIC_MESSAGE_DELAY}, containerFactory = "kafkaContainerFactory")

public boolean onMessage(String json) throws Throwable {

try {

DelayMessage delayMessage = deserialize(json, DelayMessage.class);

if (!isDelay(delayMessage)) {

// 如果接收到消息时,消息已经可以发送了,直接发送到实际的队列

sendActualTopic(delayMessage, json);

} else {

// 存储

localStorage(delayMessage, json);

}

} catch (Throwable e) {

log.error("consumer kafka delay message[{}] error!", json, e);

throw e;

}

return true;

}

private void sendActualTopic(DelayMessage delayMessage, String message) {

kafkaSender.send(message, delayMessage.getActualTopic());

}

@SneakyThrows

private void localStorage(DelayMessage delayMessage, String message) {

String key = generateRdbKey(delayMessage);

if (rocksDb.keyMayExist(RocksDbUtils.toByte(key), null)) {

return;

}

rocksDb.put(RocksDbUtils.toByte(key), RocksDbUtils.toByte(message));

}

private String generateRdbKey(DelayMessage delayMessage) {

return delayMessage.getActualTime().getTime() + RDB_KEY_SPLITTER + delayMessage.getEventId();

}

这里要注意生成key的方法:RocksDB是按key自然排序,迭代器遍历时是按key顺序遍历。

按时间来生成key,遍历时遇到第一个不符合的key,即可结束遍历。

key里加上消息ID,用以去重。

处理存储的延迟消息

启动定时任务(ScheduledExecutorService)定时检查消息。private void handleRdbMessage() {

try {

try (RocksIterator rocksIterator = rocksDb.newIterator()) {

for (rocksIterator.seekToFirst(); rocksIterator.isValid(); rocksIterator.next()) {

String key = "";

String value = "";

try {

byte[] keyByte = rocksIterator.key();

key = RocksDbUtils.toString(keyByte);

if (!isMessageExpired(key)) {

break;

}

value = RocksDbUtils.toString(rocksIterator.value());

DelayMessage delayMessage = JSON.parseObject(value, DelayMessage.class);

sendActualTopic(delayMessage, value);

rocksDb.delete(keyByte);

} catch (NumberFormatException e) {

// 异常key

log.error("handler kafka rocksdb delay message[{}:{}] NumberFormatException error!", key, value, e);

if (StringUtils.isNotBlank(key)) {

rocksDb.delete(RocksDbUtils.toByte(key));

}

} catch (Exception e) {

log.error("handler kafka rocksdb delay message[{}:{}] error!", key, value, e);

}

}

}

} catch (Exception e) {

// 捕获异常,否则ScheduledExecutorService会停止定时任务

log.error("handler kafka rocksdb delay message error!", e);

}

}

private boolean isMessageExpired(String rdbKey) {

long actualTime = Long.valueOf(rdbKey.split(RDB_KEY_SPLITTER)[0]);

return actualTime <= System.currentTimeMillis();

}

这里sendActualTopic和rocksDb.delete两个操作并不是原子性,但一般kafka消费端都会做防重复,所以也不会有问题。

其他

当前仅仅简易实现了延迟队列,还有很多需要完成完善的地方,比如:当前数据分散到不同的消费节点上,如果某一个节点服务器异常导致数据丢失,就只能人工介入,从kafka文件里获取数据;可通过部署不同的kafka group来达到数据备份,通过选主方式来决定哪一个group执行业务。

一条消息被存储三份:实际队列,延迟队列,RocksDB,可以通过操作kafka CommitLog的方式,让RocksDB里仅存储CommitLog offset 相关信息,减小RocksDB占用空间。

参考:

kafka消息消费有延迟_简易实现kafka延迟消息相关推荐

  1. kafka 重复消费和数据丢失_刨根问底,Kafka消息中间件到底会不会丢消息

    大型互联网公司一般都会要求消息传递最大限度的不丢失,比如用户服务给代金券服务发送一个消息,如果消息丢失会造成用户未收到应得的代金券,最终用户会投诉. 为避免上面类似情况的发生,除了做好补偿措施,更应该 ...

  2. 企业微信推送消息延迟_企业微信发送应用消息的实现

    企业号升级到企业微信后,发送应用消息的接口也变化了不少,除了原来的文本.图片.文件.语音.视频.图文消息等消息外,增加了文本卡片.markdown消息.小程序通知消息等内容,不过它们都可以共用一个接口 ...

  3. kafka中topic默认属性_分享:Kafka 的 Lag 计算误区及正确实现

    前言 消息堆积是消息中间件的一大特色,消息中间件的流量削峰.冗余存储等功能正是得益于消息中间件的消息堆积能力.然而消息堆积其实是一把亦正亦邪的双刃剑,如果应用场合不恰当反而会对上下游的业务造成不必要的 ...

  4. 企业微信推送消息延迟_企业微信发送应用消息,员工无法接收到推送消息。

    请求消息体:[touser=18666211235,toparty=,totag=,agentid=1000040,msgtype=text,content=,media_id=,title=,des ...

  5. java 如何判定消息已在队列_【05期】消息队列中,如何保证消息的顺序性?

    本文选自:advanced-java 作者:yanglbme 问:如何保证消息的顺序性? 面试官心理分析 其实这个也是用 MQ 的时候必问的话题,第一看看你了不了解顺序这个事儿?第二看看你有没有办法保 ...

  6. kafka处理流式数据_通过Apache Kafka集成流式传输大数据

    kafka处理流式数据 从实时过滤和处理大量数据,到将日志数据和度量数据记录到不同来源的集中处理程序中,Apache Kafka越来越多地集成到各种系统和解决方案中. 使用CData Sync ,可以 ...

  7. outlook收邮件延迟_如何计划或延迟在Outlook中发送电子邮件

    outlook收邮件延迟 When you click Send on an email, it is typically sent immediately. But what if you want ...

  8. 租用美国服务器 解决java延迟_如何降低美国服务器延迟?美国服务器延迟多少算正常?...

    美国服务器延迟多少算正常?很多人都说160ms到180ms.还有人说200ms到300ms.这些其实都没错,因为实际物理距离的问题,中国和美国都是东西跨度极大的国家.不同地区延迟上相差100ms都不为 ...

  9. mysql replication延迟_深入mysql主从复制延迟问题的详解

    面试mysqldba的时候遇到一个题: 描述msyql replication 机制的实现原理,如何在不停掉mysql主库的情况下,恢复数据不一致的slave的数据库节点? MySQL的复制(repl ...

最新文章

  1. (C++)寻找1-100以内所有素数,复杂度为O(nsqrt(n))与O(nloglogn)的两种方法
  2. 基于卷积神经网络(CNN)的仙人掌图像分类
  3. 霍山职业学校16届计算机学生,霍山职高(安徽霍山职业学校)
  4. 周志华:“数据、算法、算力”人工智能三要素,在未来要加上“知识”| CCF-GAIR 2020...
  5. 【重大更新】DevExpress v17.1新版亮点(DevExtreme HTML5/JS篇)
  6. 从程序员到架构师的最佳技术成长之路
  7. Oracle分析函数FIRST_VALUE、LAST_VALUE
  8. 算法--组合数学:杨辉三角数学分析以及Java实现
  9. latex 波浪线_湖熟镇月牙刀波浪刀带哪家好厂家
  10. zsh 隐藏用户名和主机
  11. linux 配置path
  12. 2020华为软件精英挑战赛-有向图找环
  13. 怎样通过java用web3j查询以太坊交易信息?
  14. Linux——常用文件管理命令(必会)
  15. jeesit 可以用俩种导出
  16. 数据外推算法 c语言,[原创]如何进行内插法和外推法的计算
  17. MySQL: GTID简介,gtid_executed和gtid_purged概念
  18. AliCloud Duplicity
  19. 第2节Socket介绍
  20. 紫米10000mAh智能移动电源APP功能分析报告

热门文章

  1. 输入法(IME)实现原理
  2. lol服务器维护8.21,lol8.21版本更新了什么 lol8.21版本更新内容一览
  3. itextpdf 怎么下划线_java – 带有粗体和下划线的Itext新字体
  4. ae怎么设置gpu渲染_AE/PR mac版如何开启GPU加速渲染?
  5. 外国人眼中最好的五个第三方 DNS 服务器
  6. 读过的laravel文章
  7. 【观察】 2016年度中国企业级市场十大新闻
  8. dhcp服务器显示未连接,提示本地连接未启用dhcp怎么办?本地连接未启用dhcp的解决方法...
  9. 关于日期天数计算的函数(转)
  10. 电子商务系统怎么开发,有哪些流程_OctShop