应用场景

目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如:

  • 淘宝七天自动确认收货。在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家,这个过程持续七天,就是使用了消息中间件的延迟推送功能。
  • 12306 购票支付确认页面。我们在选好票点击确定跳转的页面中往往都会有倒计时,代表着 30 分钟内订单不确认的话将会自动取消订单。其实在下订单那一刻开始购票业务系统就会发送一个延时消息给订单系统,延时30分钟,告诉订单系统订单未完成,如果我们在30分钟内完成了订单,则可以通过逻辑代码判断来忽略掉收到的消息。

在上面两种场景中,如果我们使用下面两种传统解决方案无疑大大降低了系统的整体性能和吞吐量:

  • 使用 redis 给订单设置过期时间,最后通过判断 redis 中是否还有该订单来决定订单是否已经完成。这种解决方案相较于消息的延迟推送性能较低,因为我们知道 redis 都是存储于内存中,我们遇到恶意下单或者刷单的将会给内存带来巨大压力。
  • 使用传统的数据库轮询来判断数据库表中订单的状态,这无疑增加了IO次数,性能极低。
  • 使用 jvm 原生的 DelayQueue ,也是大量占用内存,而且没有持久化策略,系统宕机或者重启都会丢失订单信息。

消息延迟推送的实现

首先我们创建交换机和消息队列

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class MQConfig {public static final String LAZY_EXCHANGE = "Ex.LazyExchange";public static final String LAZY_QUEUE = "MQ.LazyQueue";public static final String LAZY_KEY = "lazy.#";@Beanpublic TopicExchange lazyExchange(){//Map<String, Object> pros = new HashMap<>();//设置交换机支持延迟消息推送//pros.put("x-delayed-message", "topic");TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);exchange.setDelayed(true);return exchange;}@Beanpublic Queue lazyQueue(){return new Queue(LAZY_QUEUE, true);}@Beanpublic Binding lazyBinding(){return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY);}
}复制代码

我们在 Exchange 的声明中可以设置exchange.setDelayed(true)来开启延迟队列,也可以设置为以下内容传入交换机声明的方法中,因为第一种方式的底层就是通过这种方式来实现的。

 //Map<String, Object> pros = new HashMap<>();//设置交换机支持延迟消息推送//pros.put("x-delayed-message", "topic");TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);复制代码

发送消息时我们需要指定延迟推送的时间,我们这里在发送消息的方法中传入参数 new MessagePostProcessor() 是为了获得 Message对象,因为需要借助 Message对象的api 来设置延迟时间。

import com.anqi.mq.config.MQConfig;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.Date;@Component
public class MQSender {@Autowiredprivate RabbitTemplate rabbitTemplate;//confirmCallback returnCallback 代码省略,请参照上一篇public void sendLazy(Object message){rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback(confirmCallback);rabbitTemplate.setReturnCallback(returnCallback);//id + 时间戳 全局唯一CorrelationData correlationData = new CorrelationData("12345678909"+new Date());//发送消息时指定 header 延迟时间rabbitTemplate.convertAndSend(MQConfig.LAZY_EXCHANGE, "lazy.boot", message,new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//设置消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//message.getMessageProperties().setHeader("x-delay", "6000");message.getMessageProperties().setDelay(6000);return message;}}, correlationData);}
}复制代码

我们可以观察 setDelay(Integer i)底层代码,也是在 header 中设置 x-delay。等同于我们手动设置 header

message.getMessageProperties().setHeader("x-delay", "6000");

/*** Set the x-delay header.* @param delay the delay.* @since 1.6*/
public void setDelay(Integer delay) {if (delay == null || delay < 0) {this.headers.remove(X_DELAY);}else {this.headers.put(X_DELAY, delay);}
}复制代码

消费端进行消费

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Map;@Component
public class MQReceiver {@RabbitListener(queues = "MQ.LazyQueue")@RabbitHandlerpublic void onLazyMessage(Message msg, Channel channel) throws IOException{long deliveryTag = msg.getMessageProperties().getDeliveryTag();channel.basicAck(deliveryTag, true);System.out.println("lazy receive " + new String(msg.getBody()));}复制代码

测试结果

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest
@RunWith(SpringRunner.class)
public class MQSenderTest {@Autowiredprivate MQSender mqSender;@Testpublic void sendLazy() throws Exception {String msg = "hello spring boot";mqSender.sendLazy(msg + ":");}
}复制代码

果然在 6 秒后收到了消息 lazy receive hello spring boot:

Java学习、面试;文档、视频资源免费获取

转载于:https://juejin.im/post/5cf7c2cd51882537465f29bf

RabbitMQ 延迟队列,消息延迟推送相关推荐

  1. ASP.NET Core2基于RabbitMQ对Web前端实现推送功能

    在我们很多的Web应用中会遇到需要从后端将指定的数据或消息实时推送到前端,通常的做法是前端写个脚本定时到后端获取,或者借助WebSocket技术实现前后端实时通讯.因定时刷新的方法弊端很多(已不再采用 ...

  2. php消息实时推送技术,基于HTTP协议之WEB消息实时推送技术原理及实现

    很早就想写一些关于网页消息实时推送技术方面的文章,但是由于最近实在忙,没有时间去写文章.本文主要讲解基于 HTTP1.1 协议的 WEB 推送的技术原理及实现.本人曾经在工作的时候也有做过一些用到网页 ...

  3. 消息推送服务器推pc,PC浏览器消息实时推送的解决方案 ——EPush推送平台

    原标题:PC浏览器消息实时推送的解决方案 --EPush推送平台 陈华 研发工程师,2014入职去哪儿网.参与研发的EPush推送平台,增强了订单推送的时效性,提高了酒店自助订单处理率.最近负责CEQ ...

  4. 快递企业如何完成运单订阅消息的推送

    经常网购的朋友,会实时收到运单状态的提醒信息,这些提醒信息包括微信推送,短信推送,邮件推送,支付宝生活窗推送,QQ推送等,信息内容主要包括快件到哪里,签收等信息的提醒,这些友好的提醒信息会极大的增强购 ...

  5. 突破微信小程序模板消息的推送限制

    "模版消息"是小程序非常重要且可主动触达用户的一种能力.爱鲜蜂小程序通过"模版消息",建立一套用户唤醒机制,达到提升用户复购率的目的.小打卡小程序的近30天访问 ...

  6. netty服务器定时发送消息,netty+websocket+quartz实现消息定时推送

    netty+websocket+quartz实现消息定时推送&&IM聊天室 在讲功能实现之前,我们先来捋一下底层的原理,后面附上工程结构及代码 1.NIO NIO主要包含三大核心部分: ...

  7. Swift - 本地消息的推送通知(附样例)

    使用UILocalNotification可以很方便的实现消息的推送功能.我们可以设置这个消息的推送时间,推送内容等. 当推送时间一到,不管用户在桌面还是其他应用中,屏幕上方会都显示出推送消息. 1, ...

  8. java消息推送怎么实现_PHP实现的消息实时推送功能

    本文实例讲述了PHP实现的消息实时推送功能.分享给大家供大家参考,具体如下: 入口文件index.html <!DOCTYPE HTML> <html> <head> ...

  9. 微信公众号(测试号)消息模板推送

    微信公众号(测试号)消息模板推送 源码地址 https://github.com/panjianlong13/Weixin-PushMessage 微信测试号配置 登录到微信公众平台接口测试账号申请U ...

  10. spring boot 集成 websocket 实现消息主动推送

    前言 http协议是无状态协议,每次请求都不知道前面发生了什么,而且只可以由浏览器端请求服务器端,而不能由服务器去主动通知浏览器端,是单向的,在很多场景就不适合,比如实时的推送,消息通知或者股票等信息 ...

最新文章

  1. 4.ClassLink - 一种新型的VPC 经典网络的连接方式
  2. Django Models一对多操作
  3. 面试经历—广州YY(欢聚时代)
  4. js组装知识(待续……)
  5. python是干嘛的-python语言是干什么的
  6. WIBU-KEY加密狗驱动软件使用说明
  7. Jenkins高级篇之Pipeline方法篇-Pipeline Basic Steps-6-写文件writeFile和git SCM
  8. 使用photoshop批量处理大量照片(1000张以上)方法介绍
  9. node2vec简单总结
  10. Fedora 9 Samba 配置
  11. 基于VGG16主干模型的segnet语义分割详解及实例
  12. Zabbix监控MySQL工具
  13. Storm启动报错Internal Server Error
  14. 认识QA, 游戏测试工程师究竟是做什么的?
  15. Windows7安装教程
  16. 我是一个几乎没有计算机知识的人,怎样入门计算机?
  17. 可恢复保险丝特性测试
  18. 5步教你成功求职进入BAT
  19. idea配置maven并用maven打包
  20. 【预训练视觉-语言模型文献阅读】VL-BERT: PRE-TRAINING OF GENERIC VISUAL- LINGUISTIC REPRESENTATIONS(ICLR 2020)

热门文章

  1. python做abaqus后处理_python进行abaqus后处理的二次开发,我想提取odb文件中的nodeSet,结果总提示Keyerror,请各位大神指教。...
  2. 螺钉装弹垫平垫机器人_【经验总结】什么时候用平垫,什么时候用弹垫?
  3. java基础之java内存模型
  4. Deep Learning Toolkits 的比较(转)
  5. JavaScript异步
  6. Fortran执行语句中的“双冒号” ::
  7. iOS开发UI篇—字典转模型
  8. mysql left join join right
  9. highcharts图表高级入门之polar:极地图的基本配置以及一些关键配置说明
  10. vim下更省心地用中文