对于消息中间件RabbitMQ,想必各位小伙伴并不陌生,其广泛应用程度不言而喻,此前我们也在许多课程以及诸多专栏文章中介绍了它的应用,其应用场景也是相当广泛的,像什么消息异步通信、服务模块解耦、高并发流量削峰、订单超时未支付自动失效等等都是实际项目中最为常见的场景。本文我们将重点介绍并实现RabbitMQ的死信与延时队列,并将两者做一个简单的对比!

内容

对于RabbitMQ的死信队列,此前我们在"Java秒杀系统"这一技术专栏中已经有重点介绍过了,在那里我们是将其应用于 "订单超时未支付自动失效"这一业务场景中,简而言之,"死信队列"是一种特殊的"队列",跟普通的队列相比,具有"延迟处理任务"的特性。

而在消息中间件RabbitMQ的架构组件中,也存在着跟"死信队列"在功能特性方面几乎相同的组件,那就是"延迟队列/延时队列",同样也具有"延迟、延时处理任务"的功效!

当然啦,这两者还是有一丢丢区别的,最直观的当然是名字上啦,从名字上你就可以看出来两者的"处事风格"是不一样的,具体体现在:

一、创建上的差异:

(1)RabbitMQ的死信队列DeadQueue是由"死信交换机DLX"+"死信路由DLK"组成的,当然,可能还会有"TTL",而DLX和DLK又可以绑定指向真正的队列RealQueue,这个队列RealQueue便是"消费者"真正监听的对象.

(2)而RabbitMQ的延迟/延时队列DelayedQueue 则是由普通的队列来创建即可,唯一不同的地方在于其绑定的交换机为自定义的交换机,即"CustomExchange",在创建该交换机时只需要指定其消息的类型为 "x-delayed-message"即可."消费者"真正监听的队列也是它本人,即DelayedQueue

画外音:从这一点上看,延迟/延时队列的创建相对而言简单一些!

二、功能特性上的差异:

(1)死信队列在实际应用时虽然可以实现"延时、延迟处理任务"的功效,但进入死信中的消息却依然保留了队列的特性,即"FIFO" ~ 先进先出,而不管先后进入队列中消息的TTL的值. 即假设先后进入死信的消息为A、B、C,各自的TTL分别为:10s、3s、5s,理论上TTL先后到达的顺序是:B、C、A,然后从死信出来,最终被路由到真正的队列中,即消息被消费的先后顺序应该为:B、C、A,然而现实却是残酷的,其最终消费的消息的顺序为:A、B、C,即"消息是怎么进去的,就怎么出来",保留了所谓的FIFO特性.

(2)或许是因为死信有这种缺陷,所以RabbitMQ提供了另一种组件,即"延迟队列",它可以很完美的解决上面死信出现的问题,即最终消费的消息的顺序为:B、C、A,我们将在下面用实际的代码进行实战实现与演练.

三、插件安装上的差异:

(1)死信不需要额外的插件

(2)但是延迟队列在实际项目使用时却需要在Mq Server中安装一个插件,它的名字叫做:"rabbitmq_delayed_message_exchange",其安装过程可以参考链接: 里面就提供了Windows环境和Linux环境下的插件的安装过程(很简单,只需要不到3步的步骤.)

四、代码的实战实现~RabbitMQ的死信队列

说了这么多,想必有些小伙伴有点不耐烦了,下面我将采用实际的代码对上面所介绍的几点区别进行实现与演练(代码都是基于Spring Boot2.0搭建的项目环境实现与测试的)

(1)首先,我们需要创建死信队列以及真正的队列,并实现相关的绑定:

//构建订单超时未支付的死信队列消息模型    @Bean    public Queue successKillDeadQueue(){        Map argsMap= Maps.newHashMap();        argsMap.put("x-dead-letter-exchange",env.getProperty("mq.kill.item.success.kill.dead.exchange"));        argsMap.put("x-dead-letter-routing-key",env.getProperty("mq.kill.item.success.kill.dead.routing.key"));        return new Queue(env.getProperty("mq.kill.item.success.kill.dead.queue"),true,false,false,argsMap);    }    //基本交换机    @Bean    public TopicExchange successKillDeadProdExchange(){        return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"),true,false);    }    //创建基本交换机+基本路由 -> 死信队列 的绑定    @Bean    public Binding successKillDeadProdBinding(){        return BindingBuilder.bind(successKillDeadQueue()).to(successKillDeadProdExchange()).with(env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key"));    }    //真正的队列    @Bean    public Queue successKillRealQueue(){        return new Queue(env.getProperty("mq.kill.item.success.kill.dead.real.queue"),true);    }    //死信交换机    @Bean    public TopicExchange successKillDeadExchange(){        return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.exchange"),true,false);    }    //死信交换机+死信路由->真正队列 的绑定    @Bean    public Binding successKillDeadBinding(){        return BindingBuilder.bind(successKillRealQueue()).to(successKillDeadExchange()).with(env.getProperty("mq.kill.item.success.kill.dead.routing.key"));    }

(2)将项目运行起来,登录RabbitMQ的后端控制台,可以看到成功创建了相应的死信队列和真正的队列等组件,如下图所示:

(3)紧接着,我们在Controller中建立一个请求方法,用于接收前端请求过来的消息,并将该消息附以TTL值,塞入死信队列中,如下所示:

//死信队列-生产者    @RequestMapping(value = "dead/msg/send",method = RequestMethod.GET)    @ResponseBody    public BaseResponse sendDQMsg(@RequestParam String msg,@RequestParam Long ttl){        BaseResponse response=new BaseResponse(StatusCode.Success);        try {            Message realMsg=MessageBuilder.withBody(msg.getBytes("UTF-8")).build();            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());            rabbitTemplate.convertAndSend(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"), env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key"), realMsg, message -> {                MessageProperties mp=message.getMessageProperties();                mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT);                //TODO:动态设置TTL                mp.setExpiration(String.valueOf(ttl));                log.info("死信队列生产者-发出消息:{} TTL:{}",msg,ttl);                return message;            });        }catch (Exception e){            response=new BaseResponse(StatusCode.Fail.getCode(),e.getMessage());        }        return response;    }

(4)最后是写一个Spring Bean类充当消费者,在其中监听"实际队列"的消息:

@RabbitListener(queues = {"${mq.kill.item.success.kill.dead.real.queue}"},containerFactory = "singleListenerContainer")    public void consumeExpireOrder(@Payload byte[] msg){        try {            log.info("死信队列-监听者-接收消息:{}",new String(msg,"UTF-8"));        }catch (Exception e){            log.error("死信队列-监听者-发生异常:",e.fillInStackTrace());        }    }

最后,我们进入测试环节,打开Postman,前后输入3次不同的请求信息,其中各自的TTL也不尽相同,即消息A的TTL=50s,消息B的TTL=20s,消息C的TTL=10s,最终在Console控制台等待,你会发现消费者监听的消息的顺序为:A、B、C,而不是C、B、A,如下图所示:

五、代码的实战实现~RabbitMQ的延迟/延时队列

很明显,由于死信存在的这个缺陷,故而其在上面的应用场景中是不太适用的!即死信队列在 消息的TTL不一致,且后入死信的消息TTL小于前入的消息TTL的应用场景中是不适用的,而像"订单超时未支付"的应用场景,因为大家都一样,都是固定的30min或者 1h,故而这种场景,死信是相当适合的。

因此,为了解决实际项目中"TTL不一致且不固定"的应用场景,我们需要搬上"延迟/延时队列"(当然啦,Redisson的延迟/延迟队列也是可以实现的!),下面我们用代码加以实现!

(1)首先是创建"延迟/延时队列"等相关的组件,如下所示;

//TODO:RabbitMQ延迟队列    @Bean    public Queue delayQueue(){        return QueueBuilder.durable(env.getProperty("mq.kill.delay.queue")).build();    }    @Bean    public CustomExchange delayExchange(){        Map map=Maps.newHashMap();        map.put("x-delayed-type","direct");        return new CustomExchange(env.getProperty("mq.kill.delay.exchange"),"x-delayed-message",true,false,map);    }    @Bean    public Binding delayBinding(){        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(env.getProperty("mq.kill.delay.routingKey")).noargs();    }

(2)其生产者发送消息的代码我们仍然是放在一个Controller的请求方法中,如下所示:

//延迟队列-生产者    @RequestMapping(value = "delay/msg/send",method = RequestMethod.GET)    @ResponseBody    public BaseResponse sendDelayMsg(@RequestParam String msg,@RequestParam Long ttl){        BaseResponse response=new BaseResponse(StatusCode.Success);        try {            String info=msg;            Message realMsg=MessageBuilder.withBody(info.getBytes("UTF-8")).build();            rabbitTemplate.convertAndSend(env.getProperty("mq.kill.delay.exchange"),env.getProperty("mq.kill.delay.routingKey"),                    realMsg, new MessagePostProcessor() {                @Override                public Message postProcessMessage(Message message) throws AmqpException {                    MessageProperties mp=message.getMessageProperties();                    mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT);                    mp.setHeader("x-delay",ttl);                    log.info("延迟队列生产者-发出消息:{} TTL:{}",msg,ttl);                    return message;                }            });        }catch (Exception e){            response=new BaseResponse(StatusCode.Fail.getCode(),e.getMessage());        }        return response;    }

(3)最后是用于监听延迟队列中消息的消费者的代码,如下所示:

/** * 延时队列-消息监听器-消费者 * @Author:debug (SteadyJack) * @Link: weixin-> debug0868 qq-> 1948831260**/@Componentpublic class DelayQueueMqListener {    private static final Logger log= LoggerFactory.getLogger(DelayQueueMqListener.class);    //消息监听    @RabbitListener(queues = {"${mq.kill.delay.queue}"})    public void consumeMsg(@Payload byte[] msg){        try {            String info=new String(msg,"UTF-8");            log.info("延时队列监听到消息:{}  ",info);        }catch (Exception e){            log.error("延时队列-消息监听器-消费者-消息监听-发生异常:",e.fillInStackTrace());        }    }}

(4)将项目跑起来,可以看到RabbitMQ的后端控制台已经建立了该队列,如下图所示:

(5)最后,我们打开postman,前后输入3次不同的请求信息,其中各自的TTL也不尽相同,即消息A的TTL=50s,消息B的TTL=20s,消息C的TTL=10s,最终在Console控制台等待,你会发现消费者监听的消息的顺序为:A、B、C,而不是C、B、A,如下图所示:

从该运行结果上看,会发现这才是我们真正想要的结果,即按照时间TTL的大小来决定消息被消费的先后顺序,而且,你可以看出消费时的时间跟发出的时间刚好差 TTL !

在文章的最后的,我们简单总结一下本文所讲的内容,即主要介绍、对比并实战了RabbitMQ中两款具有"延时、延迟处理任务"功效的组件,即"死信队列"和"延迟队列",其差异性主要体现在:创建上的不同、功能特性的不同、插件安装上的不同等方面。

总体来说,如果是想追求消息传输的稳定性、可靠性且TTL是固定的话,那么建议选择"死信队列",因为消息从一开始就在队列中待着,等到TTL一到才被路由到真正的队列!而"延迟队列"则不同,即发送出去的消息需要等待 TTL 的时间才进入"延迟队列",如果在等待的期间,Mq Server 宕机了,那很可能消息就丢失了…..

课程观看: https://www.ixigua.com/i6806173584910713356/

c++ 队列_RabbitMQ的死信与延迟队列,你真的会用吗?相关推荐

  1. win server 缓冲区队列不足_有赞延迟队列设计

    延迟队列,顾名思义它是一种带有延迟功能的消息队列. 那么,是在什么场景下我才需要这样的队列呢? 背景 我们先看看以下业务场景: 当订单一直处于未支付状态时,如何及时的关闭订单,并退还库存? 如何定期检 ...

  2. Rabbitmq超级详细的笔记,包括安装,基本命令,rabbitmq的七种消息模式,以及死信队列,延迟队列,优先级队列和惰性队列的介绍

    RabbitMQ 文章目录 RabbitMQ 1 RabbitMQ介绍 1.1 基本介绍 1.2 RabbitMQ的安装 1.2.1 ubuntu20.04 安装rabbitmq 1.2.2 cent ...

  3. RabbitMQ 延迟队列实现定时任务的正确姿势,你学会了么?

    以下文章来源方志朋的博客,回复"666"获面试宝典 场景 开发中经常需要用到定时任务,对于商城来说,定时任务尤其多,比如优惠券定时过期.订单定时关闭.微信支付2小时未支付关闭订单等 ...

  4. 你真的知道怎么实现一个延迟队列吗 ?

    作者:xiewang,腾讯 IEG 运营开发工程师 前言 延迟队列是我们日常开发过程中,经常接触并需要使用到的一种技术方案.前些时间在开发业务需求时,我也遇到了一个需要使用到延迟消息队列的需求场景,因 ...

  5. python延时队列_如何通过Python实现RabbitMQ延迟队列

    最近在做一任务时,遇到需要延迟处理的数据,最开始的做法是现将数据存储在数据库,然后写个脚本,隔五分钟扫描数据表再处理数据,实际效果并不好.因为系统本身一直在用rabbitmq做异步处理任务的中间件,所 ...

  6. 理论 | 六种延迟队列的实现方案

    这是小小本周的第五篇,本篇将会说出六种延迟队列的基本实现方案 延迟队列的基本应用 延迟队列,顾名思义,就是具有队列的特性,再增加一个延迟消费队列的功能,可以指定队列中的消息在哪个时间点被消费.延迟队列 ...

  7. RabbitMQ 延迟队列-对于入门来说可以快速上手

    RabbitMQ 延迟队列-非常非常实用 RabbitMQ 延迟队列-非常非常实用 一.使用场景 二.消息延迟推送的实现 三.项目具体实现 RabbitMQ 延迟队列-非常非常实用 一.使用场景 ​ ...

  8. 消息队列之延迟队列超详细入门教程速看

    一. 延迟队列的应用场景 1.具体应用 关于消息队列我们已经很熟悉了,我们知道在消息队列中可以实现延迟队列效果,那你知道延迟队列有哪些使用场景吗?这里我给大家总结了延迟队列的几个经典使用场景,看看你的 ...

  9. SylixOS中断延迟队列

    概念 内核在处理中断请求时,要求在单位时间内可以处理尽可能多的中断,也就是系统要求处理中断的吞吐率要尽可能地大.这就要求中断处理程序要尽可能地短小精悍,并且不能有耗时操作.但是大多数的中断处理程序是很 ...

  10. Springboot----项目整合微信支付(引入延迟队列实现订单过期取消以及商户主动查单)

    前言 目前更新的是Springboot项目整合微信支付系列的文章,可以在我的主页中找到该系列其他文章,这一系列的文章将会系统介绍如何在项目中引入微信支付的下单.关单.处理回调通知等功能.由于前面创作经 ...

最新文章

  1. 学python有哪些书推荐-Python 有哪些入门学习方法和值得推荐的经典教材?
  2. Unsupported Hardware Detected
  3. 心动的本质是什么_《心动的信号3》:在“烟火气”里嗑糖,素人恋爱究竟有多上头?...
  4. zap安装提示java_使用API调用进行ZAP身份验证
  5. 别再用if-else了,用注解去代替他吧
  6. C语言程序设计double,C语言中double类型数据占字节数为
  7. java 内存接口_java中多态机制的内存解析、抽象类、接口
  8. linux gettimeofday 头文件,linux-时间编程-time、gmtime、localtime、asctime、ctime、gettimeofday、sleep、usleep...
  9. hdu 4405 Aeroplane chess 概率dp
  10. 爬虫_4、requests的post方法以及json字符串处理
  11. position四个属性值的关系
  12. [Unity3D]推荐几个不错的网站
  13. 性能工具之ab压力测试工具及ab命令详解
  14. 7天从代码入门到开发应用,怎样快速提高代码能力?
  15. 什么叫无差别伤害_无差别伤害背后的差别
  16. 2020年中国各省GDP简析
  17. 什么是Linux 的xxd
  18. 电脑回收站的东西删了怎么恢复?60%的人都用过这2个方法
  19. 常规保养配件信息查询api接口
  20. 国内云服务器怎么选配置?如何低价购买国内云主机?

热门文章

  1. MFC中如何在CMainFrame类中访问CxxxView视图类中的成员
  2. python-获取满足条件的索引值np.where
  3. 在QGIS中使用GEE插件
  4. 使用envi对图像进行对比度拉伸并保存
  5. day6--pandas
  6. Doc命令行执行php中文乱码问题
  7. 山西汾阳中学2021高考成绩查询,2021山西高考成绩查询时间
  8. hadoop ubantu环境搭建_创帆云大数据教程系列1-搭建基于docker的hadoop环境安装规划、容器通信及zookeeper...
  9. 实习成长之路:面试官说的MySQL高可用-------主备一致到底是什么?
  10. 题目:学习成绩 = 90分的同学用A表示,60 - 89分之间的用B表示,60分以下的用C表示