一. 延时消息

延时消息是指消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。

使用延时消息的典型场景,例如:

在电商系统中,用户下完订单30分钟内没支付,则订单可能会被取消。

在电商系统中,用户七天内没有评价商品,则默认好评。

这些场景对应的解决方案,包括:

轮询遍历数据库记录

JDK 的 DelayQueue

ScheduledExecutorService

基于 Quartz 的定时任务

基于 Redis 的 zset 实现延时队列。

除此之外,还可以使用消息队列来实现延时消息,例如 RocketMQ。

二. RocketMQ

RocketMQ 是一个分布式消息和流数据平台,具有低延迟、高性能、高可靠性、万亿级容量和灵活的可扩展性。RocketMQ 是2012年阿里巴巴开源的第三代分布式消息中间件。

三. RocketMQ 实现延时消息

3.1 业务背景

我们的系统完成某项操作之后,会推送事件消息到业务方的接口。当我们调用业务方的通知接口返回值为成功时,表示本次推送消息成功;当返回值为失败时,则会多次推送消息,直到返回成功为止(保证至少成功一次)。

当我们推送失败后,虽然会进行多次推送消息,但并不是立即进行。会有一定的延迟,并按照一定的规则进行推送消息。

例如:1小时后尝试推送、3小时后尝试推送、1天后尝试推送、3天后尝试推送等等。因此,考虑使用延时消息实现该功能。

3.2 生产者(Producer)

生产者负责产生消息,生产者向消息服务器发送由业务应用程序系统生成的消息。

首先,定义一个支持延时发送的 AbstractProducer。

abstract class AbstractProducer :ProducerBean() {

var producerId: String? = null

var topic: String? = null

var tag: String?=null

var timeoutMillis: Int? = null

var delaySendTimeMills: Long? = null

val log = LogFactory.getLog(this.javaClass)

open fun sendMessage(messageBody: Any, tag: String) {

val msgBody = JSON.toJSONString(messageBody)

val message = Message(topic, tag, msgBody.toByteArray())

if (delaySendTimeMills != null) {

val startDeliverTime = System.currentTimeMillis() + delaySendTimeMills!!

message.startDeliverTime = startDeliverTime

log.info( "send delay message producer startDeliverTime:${startDeliverTime}currentTime :${System.currentTimeMillis()}")

}

val logMessageId = buildLogMessageId(message)

try {

val sendResult = send(message)

log.info(logMessageId + "producer messageId: " + sendResult.getMessageId() + "\n" + "messageBody: " + msgBody)

} catch (e: Exception) {

log.error(logMessageId + "messageBody: " + msgBody + "\n" + " error: " + e.message, e)

}

}

fun buildLogMessageId(message: Message): String {

return "topic: " + message.topic + "\n" +

"producer: " + producerId + "\n" +

"tag: " + message.tag + "\n" +

"key: " + message.key + "\n"

}

}

根据业务需要,增加一个支持重试机制的 Producer

@Component

@ConfigurationProperties("mqs.ons.producers.xxx-producer")

@Configuration

@Data

class CleanReportPushEventProducer :AbstractProducer() {

lateinit var delaySecondList:List

fun sendMessage(messageBody: CleanReportPushEventMessage){

//重试超过次数之后不再发事件

if (delaySecondList!=null) {

if(messageBody.times>=delaySecondList.size){

return

}

val msgBody = JSON.toJSONString(messageBody)

val message = Message(topic, tag, msgBody.toByteArray())

val delayTimeMills = delaySecondList[messageBody.times]*1000L

message.startDeliverTime = System.currentTimeMillis() + delayTimeMills

log.info( "messageBody: " + msgBody+ "startDeliverTime: "+message.startDeliverTime )

val logMessageId = buildLogMessageId(message)

try {

val sendResult = send(message)

log.info(logMessageId + "producer messageId: " + sendResult.getMessageId() + "\n" + "messageBody: " + msgBody)

} catch (e: Exception) {

log.error(logMessageId + "messageBody: " + msgBody + "\n" + " error: " + e.message, e)

}

}

}

}

在 CleanReportPushEventProducer 中,超过了重试的次数就不会再发送消息了。

每一次延时消息的时间也会不同,因此需要根据重试的次数来获取这个delayTimeMills 。

通过 System.currentTimeMillis() + delayTimeMills 可以设置 message 的 startDeliverTime。然后调用 send(message) 即可发送延时消息。

我们使用商用版的 RocketMQ,因此支持精度为秒级别的延迟消息。在开源版本中,RocketMQ 只支持18个特定级别的延迟消息。:(

3.3 消费者(Consumer)

消费者负责消费消息,消费者从消息服务器拉取信息并将其输入用户应用程序。

定义 Push 类型的 AbstractConsumer:

@Data

abstract class AbstractConsumer ():MessageListener{

var consumerId: String? = null

lateinit var subscribeOptions: List

var threadNums: Int? = null

val log = LogFactory.getLog(this.javaClass)

override fun consume(message: Message, context: ConsumeContext): Action {

val logMessageId = buildLogMessageId(message)

val body = String(message.body)

try {

log.info(logMessageId + " body: " + body)

val result = consumeInternal(message, context, JSON.parseObject(body, getMessageBodyType(message.tag)))

log.info(logMessageId + " result: " + result.name)

return result

} catch (e: Exception) {

if (message.reconsumeTimes >= 3) {

log.error(logMessageId + " error: " + e.message, e)

}

return Action.ReconsumeLater

}

}

abstract fun getMessageBodyType(tag: String): Type?

abstract fun consumeInternal(message: Message, context: ConsumeContext, obj: Any): Action

protected fun buildLogMessageId(message: Message): String {

return "topic: " + message.topic + "\n" +

"consumer: " + consumerId + "\n" +

"tag: " + message.tag + "\n" +

"key: " + message.key + "\n" +

"MsgId:" + message.msgID + "\n" +

"BornTimestamp" + message.bornTimestamp + "\n" +

"StartDeliverTime:" + message.startDeliverTime + "\n" +

"ReconsumeTimes:" + message.reconsumeTimes + "\n"

}

}

再定义具体的消费者,并且在消费失败之后能够再发送一次消息。

@Configuration

@ConfigurationProperties("mqs.ons.consumers.clean-report-push-event-consumer")

@Data

class CleanReportPushEventConsumer(val cleanReportService: CleanReportService,val eventProducer:CleanReportPushEventProducer):AbstractConsumer() {

val logger: Logger = LoggerFactory.getLogger(this.javaClass)

override fun consumeInternal(message: Message, context: ConsumeContext, obj: Any): Action {

if(obj is CleanReportPushEventMessage){

//清除事件

logger.info("consumer clean-report event report_id:${obj.id} ")

//消费失败之后再发送一次消息

if(!cleanReportService.sendCleanReportEvent(obj.id)){

val times = obj.times+1

eventProducer.sendMessage(CleanReportPushEventMessage(obj.id,times))

}

}

return Action.CommitMessage

}

override fun getMessageBodyType(tag: String): Type? {

return CleanReportPushEventMessage::class.java

}

}

其中,cleanReportService 的 sendCleanReportEvent() 会通过 http 的方式调用业务方提供的接口,进行事件消息的推送。如果推送失败了,则会进行下一次的推送。(这里使用了 eventProducer 的 sendMessage() 方法再次投递消息,是因为要根据调用的http接口返回的内容来判断消息是否发送成功。)

最后,定义 ConsumerFactory

@Component

class ConsumerFactory(val consumers: List,val aliyunOnsOptions: AliyunOnsOptions) {

val logger: Logger = LoggerFactory.getLogger(this.javaClass)

@PostConstruct

fun start() {

CompletableFuture.runAsync{

consumers.stream().forEach {

val properties = buildProperties(it.consumerId!!, it.threadNums)

val consumer = ONSFactory.createConsumer(properties)

if (it.subscribeOptions != null && !it.subscribeOptions!!.isEmpty()) {

for (options in it.subscribeOptions!!) {

consumer.subscribe(options.topic, options.tag, it)

}

consumer.start()

val message = "\n".plus(

it.subscribeOptions!!.stream().map{ a -> String.format("topic: %s, tag: %s has been started", a.topic, a.tag)}

.collect(Collectors.toList()))

logger.info(String.format("consumer: %s\n", message))

}

}

}

}

private fun buildProperties(consumerId: String,threadNums: Int?): Properties {

val properties = Properties()

properties.put(PropertyKeyConst.ConsumerId, consumerId)

properties.put(PropertyKeyConst.AccessKey, aliyunOnsOptions.accessKey)

properties.put(PropertyKeyConst.SecretKey, aliyunOnsOptions.secretKey)

if (StringUtils.isNotEmpty(aliyunOnsOptions.onsAddr)) {

properties.put(PropertyKeyConst.ONSAddr, aliyunOnsOptions.onsAddr)

} else {

// 测试环境接入RocketMQ

properties.put(PropertyKeyConst.NAMESRV_ADDR, aliyunOnsOptions.nameServerAddress)

}

properties.put(PropertyKeyConst.ConsumeThreadNums, threadNums!!)

return properties

}

}

四. 总结

正如本文开头曾介绍过,可以使用多种方式来实现延时消息。然而,我们的系统本身就大量使用了 RocketMQ,借助成熟的 RocketMQ 实现延时消息不失为一种可靠而又方便的方式。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

rocktmq 消息延时清空_使用Kotlin+RocketMQ实现延时消息的示例代码相关推荐

  1. ROS系列——mavros功能包中常用话题和服务介绍,包括消息名称、类型、头文件、成员变量、示例代码

    ROS系列--mavros功能包中常用话题和服务介绍,包括消息名称.类型.头文件.成员变量.示例代码 官方链接 常用话题 订阅 1.1 系统状态 1.2 GPS数据 1.3 本地位置 1.4 三轴速度 ...

  2. socket未读消息 如何设计_如何设计IM系统的消息架构?

    导读:IM全称是『Instant Messaging』,中文名是即时通讯.在高度信息化的移动互联网时代,生活中IM类产品已经成为必备品,像"钉钉"以IM为核心功能的产品.还有一些非 ...

  3. python发送消息到微信_通过python登录微信发送消息

    通过扫描二维码登录后发送信息,主要应用到了wxpy库,具体的看代码 from threading import Timer from wxpy import * import requests # # ...

  4. 人听到坏消息的反应_形容突然间听到坏消息的成语有哪些?

    晴天霹雳.惊闻噩耗.当头棒喝.天塌地陷.兵连祸结.祸从天降.大难临头.飞来横祸.意外之灾 1.[ dāng tóu bàng hè ]佛教禅宗和尚接待初学的人常常用棒一击或大喝一声,促他醒悟.比喻严厉 ...

  5. java贪心算法 区间调度_贪心算法-区间调度问题解之证明(示例代码)

    一.贪心算法 定义:一个算法是贪心算法,如果它是通过一些小的步骤来一个求解,并且在每一步根据局部情况选择一个决定,使得某些主要的指标得到优化. 二.区间调度问题 1. 问题:我们有一组需求{1,2,3 ...

  6. 支持向量机python代码_用TensorFlow实现多类支持向量机的示例代码

    这篇文章主要介绍了用TensorFlow实现多类支持向量机的示例代码,现在分享给大家,也给大家做个参考.一起过来看看吧 本文将详细展示一个多类支持向量机分类器训练iris数据集来分类三种花. SVM算 ...

  7. 快递信息css3手风琴代码_用纯CSS实现手风琴效果的示例代码

    昨天在做一个旅游页面的项目,前端页面实现的过程中遇到这样一个需求.需要把一组图片形成手风琴的展示效果.认真的思考一遍后,决定就用普通的HTML+CSS就可以实现这个需求.今天趁着空闲时间稍微梳理了一下 ...

  8. python编辑代码的页面_使用CodeMirror实现Python3在线编辑器的示例代码

    一.编写页面 主要是引入相关的css文件和js文件,这里采用简单插入link和script标签的形式. Document click 二.配置CodeMirror 在index.js中配置CodeMi ...

  9. java实现登陆面试题_【Javaweb】笔面试题 ---(1)(示例代码)

    Javaweb 面试题:理解才是最重要的,而不是原封不动的背下来 一.请简述doget和dopost它们的区别 1) get是从服务器上获取数据,post是向服务器传送数据. 2) 在客户端,Get方 ...

最新文章

  1. LeetCode简单题之构造矩形
  2. 网络流最大流 Dinic算法模板
  3. 推荐一个Android Studio很实用的插件android-butterknife-zelezny
  4. Unknown SSL protocol error in connection to xxx:443
  5. web前端细解cookie那些事
  6. Cache related website
  7. preact源码学习(3)
  8. Python入门--多态
  9. Klevgrand Tines for Mac(电钢琴模拟插件)
  10. 隐式类型转换 与 隐式类型转换操作符
  11. 驱动开发遇到version magic不匹配
  12. 安全检查计算机,计算机可以通过安全检查机吗?
  13. Project Sumatra
  14. 少室山论道——天下武功
  15. Mac电脑访问不了正常URL
  16. 00截断上传绕过_上传绕过总结
  17. 传统防火墙与Web应用程序防火墙(WAF)的区别
  18. 什么叫「人的格局」?是否有必要培…
  19. 易语言超级列表框表项关键字搜索代码示例_易语言
  20. K8S之kubelet介绍

热门文章

  1. 精益数据分析(学习笔记)——长期更新
  2. nanopore测序技术专题(六):测序错误率太高无法使用?
  3. 数字图像处理笔记一 - 图像采集(空间分辨率和幅度分辨率)
  4. 延时函数介绍和呼吸灯的实现
  5. 通讯接口:I2C和USART,SPI,CAN,USB2.0
  6. java slfj教程_SLF4J入门程序
  7. c++ char4个字节_西门子PLC的TCP通讯(不同项目下)①--TSEND_C指令
  8. 安卓设置菊花动画_Android Progressbar自定义菊花效果
  9. CLion使用WSL的cmake报错解决: configure_file Problem configuring file Call Stack (most recent call first):
  10. LeetCode 1027. Longest Arithmetic Sequence--笔试题--C++解法