1. Exchange(交换机)的作用

在RabbitMQ中,生产者发送消息不会直接将消息投递到队列中,而是先将消息投递到交换机中,在由交换机转发到具体的队列,

队列再将消息以推送或者拉取方式给消费者进行消费

创建消息                          路由键              pull/push

生产者------------>交换机------------>队列------------>消费者

交换机原理图见:images/01

2. Exchange(交换机)的类型

1.直连交换机:Direct Exchange  (接收一个信息发送给对应的队列)

直连交换机是一种带路由功能的交换机,一个队列会和一个交换机绑定,除此之外再绑定一个routing_key,当消息被发送的时候,需要指定一个binding_key,这个消息被送达交换机的时候,就会被这个交换机送到指定的队列里面去。同样的一个binding_key也是支持应用到多个队列中的。

这样当一个交换机绑定多个队列,就会被送到对应的队列去处理。

注1:什么是路由键

每个消息都有一个称为路由键(routing key)的属性,它其实就是一个简单的字符串

注2:直连交换机适用场景

有优先级的任务,根据任务的优先级把消息发送到对应的队列,这样可以指派更多的资源去处理高优先级的队列。

2.主题交换机:Topic Exchange (一个交换机可以把信息发送给多个队列)

直连交换机的缺点!

直连交换机的routing_key方案非常简单,如果我们希望一条消息发送给多个队列,那么这个交换机需要绑定上非常多的routing_key,

假设每个交换机上都绑定一堆的routing_key连接到各个队列上。那么消息的管理就会异常地困难。

所以RabbitMQ提供了一种主题交换机,发送到主题交换机上的消息需要携带指定规则的routing_key,

主题交换机会根据这个规则将数据发送到对应的(多个)队列上。

主题交换机的routing_key需要有一定的规则,交换机和队列的binding_key需要采用*.#.*.....的格式,每个部分用.分开,其中

*表示一个单词

#表示任意数量(零个或多个)单词。

示例:

队列Q1绑定键为 *.TT.*

队列Q2绑定键为TT.#

如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到

如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到

3.扇形交换机:Fanout Exchange (接收信息之后发送给所有队列)

扇形交换机是最基本的交换机类型,它所能做的事情非常简单———广播消息。

扇形交换机会把能接收到的消息全部发送给绑定在自己身上的队列。因为广播不需要“思考”,

所以扇形交换机处理消息的速度也是所有的交换机类型里面最快的。

这个交换机没有路由键概念,就算你绑了路由键也是无视的。

4.首部交换机:Headers exchange

5.默认交换机

实际上是一个由RabbitMQ预先声明好的名字为空字符串的直连交换机(direct exchange)。它有一个特殊的属性使得它对于

简单应用特别有用处:那就是每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。

如:当你声明了一个名为”hello”的队列,RabbitMQ会自动将其绑定到默认交换机上,绑定(binding)的路由键名称也是为”hello”。

因此,当携带着名为”hello”的路由键的消息被发送到默认交换机的时候,此消息会被默认交换机路由至名为”hello”的队列中

类似amq.*的名称的交换机:

这些是RabbitMQ默认创建的交换机。这些队列名称被预留做RabbitMQ内部使用,不能被应用使用,否则抛出403 (ACCESS_REFUSED)错误

6.Dead Letter Exchange(死信交换机)

在默认情况,如果消息在投递到交换机时,交换机发现此消息没有匹配的队列,则这个消息将被悄悄丢弃。

为了解决这个问题,RabbitMQ中有一种交换机叫死信交换机。当消费者不能处理接收到的消息时,将这个消息重新发布到另外一个队列中,

等待重试或者人工干预。这个过程中的exchange和queue就是所谓的”Dead Letter Exchange 和 Queue

3. 交换机的属性

除交换机类型外,在声明交换机时还可以附带许多其他的属性,其中最重要的几个分别是:

Name:交换机名称

Durability:是否持久化。如果持久性,则RabbitMQ重启后,交换机还存在

Auto-delete:当所有与之绑定的消息队列都完成了对此交换机的使用后,删掉它

Arguments:扩展参数

4. 综合案例:交换机的使用

rabbitmq02                     #主模块

rabbitmq-provider            #生产者

rabbitmq-consumer            #消费者

0.给子模块添加依赖

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-amqp</artifactId>

</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-web</artifactId>

</dependency>

<dependency>

<groupId>org.projectlombok</groupId>

<artifactId>lombok</artifactId>

<optional>true</optional>

</dependency>

1.直连交换机(Direct Exchange)

1.RabbitmqDirectConfig

关键代码:

new Queue("directQueue", true);//true 是否持久

new DirectExchange("directExchange");

BindingBuilder.bind(directQueue()).to(directExchange()).with("directRouting");

2.SendMessageController

关键代码:rabbitTemplate.convertAndSend("directExchange", "directRouting", map);

sendDirectMessage:进行消息推送(也可以改为定时任务,具体看需求)

3.查看rabbitmq管理界面

我们目前还创建消费者rabbitmq-consumer,消息没有被消费的,我们去rabbitMq管理页面看看,是否推送成功

Overview选项卡,可以查看到刚刚创建的消息

4.创建消息接收监听类DirectReceiver

注1:新版jdk日期及格式化

LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

注2:rabbitTemplate和amqpTemplate有什么关系

查看源码中会发现rabbitTemplate实现自amqpTemplate接口,两者使用起来并无区别,功能一致

注3:不要@Configuration写成了@Configurable,这两个长得很像

@Configuration该注解是可以用来替代XML文件。

手动new出来的对象,正常情况下,Spring是无法依赖注入的,这个时候可以使用@Configurable注解

2.主题交换机(Topic Exchange)

RabbitTopicConfig

//绑定键

public final static String people = "people.*";

public final static String man = "people.man";

public final static String woman = "people.woman";

3.扇形交换机(Fanout Exchange)

//因为是扇型交换机, 路由键无需配置,配置也不起作用,两处地方均未配置路由键

BindingBuilder.bind(queueA()).to(fanoutExchange());

rabbitTemplate.convertAndSend(RabbitFanoutConfig.EXCHANGE_NAME,null, map);

附录一:创建消息对应的模拟数据,它是一个Map集合

/**

* 创建消息对应的模拟数据,它是一个Map集合

*/

private Map<String, Object> getMap() {

String messageId = UUID.randomUUID().toString().replace("-", "");

String messageData = "test message, Hello!";

String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

Map<String, Object> map = new HashMap<String, Object>();

map.put("messageId", messageId);

map.put("messageData", messageData);

map.put("createTime", createTime);

return map;

}

1. 场景:“订单下单成功后,15分钟未支付自动取消”

1.传统处理超时订单

采取定时任务轮训数据库订单,并且批量处理。其弊端也是显而易见的;对服务器、数据库性能会有很大的要求,

并且当处理大量订单起来会很力不从心,而且实时性也不是特别好。当然传统的手法还可以再优化一下,

即存入订单的时候就算出订单的过期时间插入数据库,设置定时任务查询数据库的时候就只需要查询过期了的订单,

然后再做其他的业务操作

2.rabbitMQ延时队列方案

一台普通的rabbitmq服务器单队列容纳千万级别的消息还是没什么压力的,而且rabbitmq集群扩展支持的也是非常好的,

并且队列中的消息是可以进行持久化,即使我们重启或者宕机也能保证数据不丢失

2. TTL和DLX

rabbitMQ中是没有延时队列的,也没有属性可以设置,只能通过死信交换器(DLX)和设置过期时间(TTL)结合起来实现延迟队列

1.TTL

TTL是Time To Live的缩写, 也就是生存时间。

RabbitMq支持对消息和队列设置TTL,对消息这设置是在发送的时候指定,对队列设置是从消息入队列开始计算, 只要超过了队列的超时时间配置, 那么消息会自动清除。

如果两种方式一起使用消息对TTL和队列的TTL之间较小的为准,也就是消息5s过期,队列是10s,那么5s的生效。

默认是没有过期时间的,表示消息没有过期时间;如果设置为0,表示消息在投递到消费者的时候直接被消息,否则丢弃。

设置消息的过期时间用 x-message-ttl 参数实现,单位毫秒。

设置队列的过期时间用 x-expires 参数,单位毫秒,注意,不能设置为0。

2.DLX和死信队列

DLX即Dead-Letter-Exchange(死信交换机),它其实就是一个正常的交换机,能够与任何队列绑定。

死信队列是指队列(正常)上的消息(过期)变成死信后,能够后发送到另外一个交换机(DLX),然后被路由到一个队列上,

这个队列,就是死信队列

成为死信一般有以下几种情况:

消息被拒绝(basic.reject or basic.nack)且带requeue=false参数

消息的TTL-存活时间已经过期

队列长度限制被超越(队列满)

注1:如果队列上存在死信, RabbitMq会将死信消息投递到设置的DLX上去 ,

注2:通过在队列里设置x-dead-letter-exchange参数来声明DLX,如果当前DLX是direct类型还要声明

x-dead-letter-routing-key参数来指定路由键,如果没有指定,则使用原队列的路由键

3. 延迟队列

通过DLX和TTL模拟出延迟队列的功能,即,消息发送以后,不让消费者拿到,而是等待过期时间,变成死信后,发送给死信交换机再路由到死信队列进行消费

注1:延迟队列(即死信队列)产生流程见“images/01 死信队列产生流程.png”

4. 开发步骤

1.生产者创建一个正常消息,并添加消息过期时间/死信交换机/死信路由键这3个参数

关键代码1

new Queue(name, durable, exclusive, autoDelete, arguments);

new Queue(NORMAL_QUEUE, true, false, false, map)

参数说明:

name:队列名字

durable:true则持久队列

exclusive:如果我们声明一个排他队列(该队列将仅由声明者的连接使用),则为true

autoDelete:服务器不再使用时应删除队列,则为true

arguments:用于声明队列的参数

map.put("x-message-ttl", 10000);//message在该队列queue的存活时间最大为10秒

map.put("x-dead-letter-exchange", DELAY_EXCHANGE); //x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)

map.put("x-dead-letter-routing-key", DELAY_ROUTING_KEY);//x-dead-letter-routing-key参数是给这个DLX指定路由键

关键代码2

new DirectExchange(NORMAL_EXCHANGE, true, false);

2.消费者A

正常情况下,由消费者A去消费队列“normal-queue”中的消息,但实际上没有,而是等消息过期

3.消费者B

消息过期后,变成死信,根据配置会被投递到DLX,然后根据死信路由键投到死信队列(即延时队列)中

4. json转换

1.生产者

见资料:ProviderRabbitTemplateConfig.java

2.消费者

见资料:ConsumerRabbitTemplateConfig

注1:巨坑

使用最新版本的springboot2.3.4.RELEASE由于版本冲突会出现对象转换错误的异常,

换回到2.2.0.RELEASE即可解决问题

附录一:英文

delay:延迟的

normal:正常

exchange、route、queue

Receiver

附录二:关键代码

//durable:true则持久队列

//exclusive:如果我们声明一个排他队列(该队列将仅由声明者的连接使用),则为true

//autoDelete:服务器不再使用时应删除队列,则为true

//arguments:用于声明队列的参数

String name = "normalQueue";

boolean durable = true;

boolean exclusive = false;

boolean autoDelete = false;

Map<String, Object> arguments = new HashMap<String, Object>();

arguments.put("x-message-ttl", 10000);//message在该队列queue的存活时间最大为10秒

arguments.put("x-dead-letter-exchange", "deadExchange"); //x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)

arguments.put("x-dead-letter-routing-key", "dead_queue_route");//x-dead-letter-routing-key参数是给这个DLX指定路由键

return new Queue(name, durable, exclusive, autoDelete, arguments);

RabbitMQ队列,直连队列,主题队列,扇形队列,死信队列,延迟相关推荐

  1. springboot 与rabbitmq集成+生产者投递确认+消费者手动确认+TTL+死信队列+延时队列

    1.生产者的消息可靠性投递机制 1)springboot yml文件配置 spring:rabbitmq:host: 10.0.23.83username: lifwepassword: 123456 ...

  2. RabbitMQ(六)死信队列

    6.1概念 ​ 先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer将消息投递到broker或者直接到queue里了,consumer 从 ...

  3. RabbitMq(五) -- 死信队列和延迟队列

    1. 死信 1.1 死信的概念 先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue ...

  4. Rabbitmq死信队列

    目录 1.什么是死信队列 2.产生死信队列的原因 3.代码实现---直连交换机 3.1.导入依赖 3.2.配置rabbitmq连接信息 3.3.编写配置类 3.4.编写生产者 3.5.编写消费者 3. ...

  5. rabbitMQ学习-死信队列

    死信队列 死信:顾名思义就是无法被消费的消息,一般情况下,product将消息投递到broker或者直接到queue里,consumer从queue取出消息,进行消费,但某些时候由于特定的原因导致qu ...

  6. RabbitMQ死信队列,延时队列

    死信队列 消息被消费方否定确认,使用channel.basicNack或channel.basicReject, 并且此时requeue属性被设置为false. 消息在队列的存活时间超过设置的TTL时 ...

  7. RabbitMQ,Springboot整合RabbitMQ实现 消息可靠性投递,Consumer ACK,TTL,死信队列,使用TTL+死信队列=延迟队列

    搭建SpringBoot项目,用于演示 springboot版本 <!-- spring boot --><dependency><groupId>org.spri ...

  8. Java秒杀系统实战系列~RabbitMQ死信队列处理超时未支付的订单(转)

    转自: https://juejin.cn/post/6844903903130042376 文末有源代码,非常棒 摘要: 本篇博文是"Java秒杀系统实战系列文章"的第十篇,本篇 ...

  9. SpringBoot整合RabbitMQ 消息可靠投递、手动ack、延迟队列、死信队列、消息幂等性保障、消息积压

    1.消息可靠投递 在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景.RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式. confirm 确认模式 ...

  10. RabbitMQ高级特性(五):RabbitMQ之死信队列DLX

    一.死信队列简介 (1)死信队列 死信队列,英文缩写:DLX .Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就 ...

最新文章

  1. Exchange 2010向外网发邮件的配置
  2. 遥感图像解译与单相机摄影测量
  3. 纯css用图片代替checkbox和radio,无js实现方法
  4. (转)java redis使用之利用jedis实现redis消息队列
  5. 2017.9.24 森林 失败总结
  6. android wine教程_技术|如何在 Android 上借助 Wine 来运行 Windows Apps
  7. shell 脚本检测端口状态
  8. 文件管理器android实现,基于Android的文件管理器的设计与实现
  9. Ubuntu20.04安装网易云音乐播放器
  10. ubuntu 禁用触摸板
  11. Mac使用终端命令合并分区
  12. 5.2 网络数据Excel存储
  13. echarts的渐变色配置 LinearGradient, 饼图默认渐变颜色设置不同的角度
  14. WCFService服务配置
  15. gcc、 binutils、gdb
  16. 计算机进行注销后会进入什么状态,请问电脑里注销是什么意思呀
  17. 计算机毕业设计Java宠物医院管理系统(源码+系统+mysql数据库+lw文档
  18. 如何在word(wps)文档中插入一条水平直线(横线)
  19. Kubernetes基于canel的网络策略
  20. 双十字星K线图解中双十字星的出现该如何解读

热门文章

  1. [Android] 百度地图API Android相关配置教程(包含获取包名、发布版SHA1和开发版SHA1)
  2. 在线解析短视频去水印工具
  3. Notepad ++ 汉化版
  4. 动手学深度学习-windows下环境配置
  5. Element-UI省市区(县)三级联动---基于VUX移动框架的x-address组件
  6. htlm5实习报告_Wa zhu ti网站html5搭建设计毕业论文+html源码+实习报告+答辩问题
  7. python金融数据分析与挖掘实战_[套装书]Python数据分析与挖掘实战(第2版)+Python金融大数据挖掘与分析全流程详解+Python金融数据分析(3册)...
  8. rtklib-RINEX文件读取-rinex.c解析(一)
  9. 专访全面智能CTO陈章:想法只有在技术落地的那一刻才有撬动地球的能力
  10. 41局域网交换机及其基本原理