使用RabbitMQ实现延迟任务

场景一:物联网系统经常会遇到向终端下发命令,如果命令一段时间没有应答,就需要设置成超时。

场景二:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单。

上述类似的需求是我们经常会遇见的问题。最常用的方法是定期轮训数据库,设置状态。在数据量小的时候并没有什么大的问题,但是数据量一大轮训数据库的方式就会变得特别耗资源。当面对千万级、上亿级数据量时,本身写入的IO就比较高,导致长时间查询或者根本就查不出来,更别说分库分表以后了。除此之外,还有优先级队列,基于优先级队列的JDK延迟队列,时间轮等方式。但如果系统的架构中本身就有RabbitMQ的话,那么选择RabbitMQ来实现类似的功能也是一种选择。

使用RabbitMQ来实现延迟任务必须先了解RabbitMQ的两个概念:消息的TTL和死信Exchange,通过这两者的组合来实现上述需求。

消息的TTL(Time To Live)

消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。

可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者是一样的效果。只是expiration字段是字符串参数,所以要写个int类型的字符串:

byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");
channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);

当上面的消息扔到队列中后,过了60秒,如果没有被消费,它就死了。不会被消费者消费到。这个消息后面的,没有“死掉”的消息对顶上来,被消费者消费。死信在队列中并不会被删除和释放,它会被统计到队列的消息数中去。单靠死信还不能实现延迟任务,还要靠Dead Letter Exchange。

Dead Letter Exchanges

Exchage的概念在这里就不在赘述,可以从这里进行了解。一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。

1. 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。

2. 上面的消息的TTL到了,消息过期了。

3. 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。

Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。

实现延迟队列

延迟任务通过消息的TTL和Dead Letter Exchange来实现。我们需要建立2个队列,一个用于发送消息,一个用于消息过期后的转发目标队列。

生产者输出消息到Queue1,并且这个消息是设置有有效时间的,比如60s。消息会在Queue1中等待60s,如果没有消费者收掉的话,它就是被转发到Queue2,Queue2有消费者,收到,处理延迟任务。

具体实现步骤如下:

第一步, 首先需要创建2个队列。Queue1和Queue2。Queue1是一个消息缓冲队列,在这个队列里面实现消息的过期转发。如下图,设置Dead letter exchange和Dead letter routing key。设置这两个属性就是当消息在这个队列中expire后,采用哪个路由发送。这个dlx的exchange需要事先创建好,就是一个普通的exchange。由于我们还需要向Queue1发送消息,那么还需要创建一个exchange,并且和Queue1绑定。例子中,exchange同样取名:queue1。

我们还需要建一个Queue2,这个队列用于消息在Queue1中过期后转发的目标队列。所以这个Queue2队列建好以后,需要绑定Queue1设置的死信路由:dlx。完成Queue2的绑定以后,环境就搭建完成了。

第二步,实现消息的Producer。由于我们的目的是让进入Queue1的消息过期,然后自动转送到Queue2中,所以发送的时候,需要设置过期时间。

ConnectionFactory factory = new ConnectionFactory();factory.setUsername("bsp");factory.setPassword("123456");factory.setVirtualHost("/");factory.setHost("10.23.22.42");factory.setPort(5672);conn = factory.newConnection();channel = conn.createChannel();byte[] messageBodyBytes = "Hello, world!".getBytes();byte i = 10;while (i-- > 0) {                channel.basicPublish("queue1", "queue1", new AMQP.BasicProperties.Builder().expiration(String.valueOf(i * 1000)).build(),new byte[] { i });}

上面的代码我模拟了1-10号消息,消息的内容里面是1-10。过期的时间是10-1秒。这里要注意,虽然10是第一个发送,但是它过期的时间最长。

第三步,实现消息的Consumer。Consumer就是延迟任务的具体实施者。由于具体的任务往往是一个比较耗时的任务,所以一般来说,任务一般在异步线程中执行。

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("bsp");
factory.setPassword("123456");
factory.setVirtualHost("/");
factory.setHost("10.23.22.42");
factory.setPort(5672);conn = factory.newConnection();
channel = conn.createChannel();channel.basicConsume("queue2", true, "consumer", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {long deliveryTag = envelope.getDeliveryTag();//do some work async
System.out.println(body[0]);
}
});

运行后如上面的程序,过了10s以后,消费者开始收到数据,但是它是一次性收到如下结果:

10、9 、8 、7 、6、5 、4 、3 、2 、1

Consumer第一个收到的还是10。虽然10是第一个放进队列,但是它的过期时间最长。所以由此可见,即使一个消息比在同一队列中的其他消息提前过期,提前过期的也不会优先进入死信队列,它们还是按照入库的顺序让消费者消费。如果第一进去的消息过期时间是1小时,那么死信队列的消费者也许等1小时才能收到第一个消息。参考官方文档发现“Only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered).”只有当过期的消息到了队列的顶端(队首),才会被真正的丢弃或者进入死信队列。

所以在考虑使用RabbitMQ来实现延迟任务队列的时候,需要确保业务上每个任务的延迟时间是一致的。如果遇到不同的任务类型需要不同的延时的话,需要为每一种不同延迟时间的消息建立单独的消息队列。

使用RabbitMQ实现延迟任务相关推荐

  1. RabbitMQ延迟消费和重复消费

    转载自 https://blog.csdn.net/quliuwuyiz/article/details/79301054 使用RabbitMQ实现延迟任务 场景一:物联网系统经常会遇到向终端下发命令 ...

  2. rabbitmq延迟队列相关

    https://blog.csdn.net/qq_26656329/article/details/77891793        --------------rabbitmq queue_decla ...

  3. SpringCloud微服务项目实战 - 6.延迟任务

    我没有失约,我与春风共至,我与夏蝉共鸣,我与秋叶共舞,我与凛冬共至,唯独你背道而行! 系列文章目录 项目搭建 App登录及网关 App文章 自媒体平台(博主后台) 内容审核(自动) 延迟任务 - 精准 ...

  4. 黑*头条_第5章_延迟任务精准发布文章(新版)

    黑*头条_第5章_延迟任务精准发布文章(新版) 文章目录 黑*头条_第5章_延迟任务精准发布文章(新版) 1)文章定时发布 2)延迟任务概述 2.1)什么是延迟任务 2.2)技术对比 2.2.1)De ...

  5. 基于rabbitmq延迟插件实现分布式延迟任务

    一.延迟任务的使用场景 1.下单成功,30分钟未支付.支付超时,自动取消订单 2.订单签收,签收后7天未进行评价.订单超时未评价,系统默认好评 3.下单成功,商家5分钟未接单,订单取消 4.配送超时, ...

  6. RabbitMQ 入门系列(2)— 生产者、消费者、信道、代理、队列、交换器、路由键、绑定、交换器

    本系列是「RabbitMQ实战:高效部署分布式消息队列」和 「RabbitMQ实战指南」书籍的读书笔记. RabbitMQ 中重要概念 1. 生产者 生产者(producer)创建消息,然后发送到代理 ...

  7. RabbitMQ 延迟队列实现定时任务的正确姿势,你学会了么?

    以下文章来源方志朋的博客,回复"666"获面试宝典 场景 开发中经常需要用到定时任务,对于商城来说,定时任务尤其多,比如优惠券定时过期.订单定时关闭.微信支付2小时未支付关闭订单等 ...

  8. C#实现rabbitmq 延迟队列功能

    最近在研究rabbitmq,项目中有这样一个场景:在用户要支付订单的时候,如果超过30分钟未支付,会把订单关掉.当然我们可以做一个定时任务,每个一段时间来扫描未支付的订单,如果该订单超过支付时间就关闭 ...

  9. Java如何解决mysql读写延迟_java中延迟任务的处理方式

    1.利用延迟队列 延时队列,第一他是个队列,所以具有对列功能第二就是延时,这就是延时对列,功能也就是将任务放在该延时对列中,只有到了延时时刻才能从该延时对列中获取任务否则获取不到-- 应用场景比较多, ...

最新文章

  1. 【数据结构】邻接表的储存结构 建立图的邻接表算法
  2. 【转】AVAudioPlayer播放音乐,最清晰明了
  3. 六:Cocos2d-x的CCLayer
  4. hdu 1568 Fibonacci 对数。。
  5. 配置编译win7+VS2017+opencv4.0.1+contrib4.0.1
  6. 灵动标签调用友情链接
  7. tcp的发送端一个小包就能打破对端的delay_ack么?
  8. Sql Server发布订阅如何添加新表如何不初始化整个快照
  9. ios html 全选文本框,【前端】IOS input输入框按删除键删除字符,删除最后一个字符时,概率性出现光标前面多余一个字符...
  10. 学python要多久-目前Python学习需要多长时间?老男孩Python入门培训
  11. 如何直观的长时间统计Android应用的动态内存消耗
  12. 一个简单的数字幸运抽奖小程序
  13. 系统地编译Hi3519过程及其处理问题思路
  14. svn server 搭建
  15. 联想yoga13装win7步骤介绍
  16. 三大通信协议(1)UART
  17. cisco设备的mib库
  18. 《麻省理工学院公开课:人工智能》笔记二
  19. 出现了一些错误,请尝试重启Geforce Experience
  20. 企业邮箱怎么写加密邮件,企业邮箱支持吗?

热门文章

  1. Java 编码最容易疏忽的 10 大问题!
  2. 如何在支付宝成为增加个人服务器,支付宝支付,服务器如何生成支付订单
  3. php如何模拟网页点击按钮,python模拟点击网页按钮如何实现 python模拟点击网页按钮实现方法...
  4. 适合0基础的web开发系列教程-canvas
  5. 微信小程序黑客马拉松即将开始,来做最酷的 Mini Program Creators! 1
  6. HDU 5752 Sqrt Bo【枚举,大水题】
  7. 炼数成金hadoop视频干货03
  8. 不允许更改采购订单币种
  9. protel四层板及内电层分割入门
  10. WPF 可触摸移动的ScrollViewer控件