前言

上篇讨论过消息投递和消息消费过程中如何确保可靠传输,也提及到消息到达RabbitMQ中到被消费前也需要可靠的留存,可因许多的不确定因素会影响着消息的存在与否。

消息中转点

生产者发送消息到RabbitMQ中,如果交换机根据自身类型和RoutingKey能够匹配到队列,则存入相关队列,但当匹配不到队列时,遇到两种情况而使得消息走向不同的方向,消息可能会丢失或是发回给生产者,这取决于生产者对消息的配置。

  • 生产者设置了Mandatory且为true,则消息回退给生产者。

  • 当生产者为设置Mandatory或是设置为false时,为了避免消息丢失,可以由交换机路由给备份交换机负责去搞定存储。

Mandatory

生产者发送消息时,可以设置一个参数mandatory,来决定消息到达RabbitMQ后,如果出现交换机根据自身类型及RoutingKey找不到合适的队列情况下,消息的一个走向。

  • 当mandatory为true时,消息则返回给生产者。

  • 当mandatory为false时,消息则被丢弃。

生产者代码

当在BasicPublish方法参数中设置mandatory为true且队列暂不声明时,仅有一个交换机,消息将会被返回。

var connFactory = new ConnectionFactory
{HostName = "xxx.xxx.xxx.xxx",Port = 5672,UserName = "rabbitmqdemo",Password = "rabbitmqdemo@test",VirtualHost = "rabbitmqdemo"};
using (var conn = connFactory.CreateConnection())
{using (var channel = conn.CreateModel()){var exchangeName = "mandatory_publishsubscribe_exchange";channel.ExchangeDeclare(exchange: exchangeName, type: "fanout");while (true){Console.WriteLine("消息内容(exit退出):");var message = Console.ReadLine();if (message.Trim().ToLower() == "exit"){break;}var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: exchangeName, routingKey: "", mandatory: true, basicProperties: null, body: body);Console.WriteLine("消息内容发送完毕:" + message);}}
}

生产者发送消息,交换机收到消息但无对应队列,消息被返回。

为了直观的知道消息返回到了生产者,我们可以增加一个监听器,来监听返回的消息。

监听回退消息

当mandatory设置为true,消息回退时可以监听消息

channel.BasicReturn += new EventHandler<RabbitMQ.Client.Events.BasicReturnEventArgs>((sender, e) =>
{var message = Encoding.UTF8.GetString(e.Body.ToArray());Console.WriteLine($"收到回退消息:{message}");
});

生产者发送消息,因无匹配队列,消息被返回,可以直观的看到返回的消息。

备份交换机

当mandatory设置为false时,消息被丢失了,这种情况可不太好。可以使用备份交换机来存储原要被丢弃的消息,当需要这些消息的时候,还能拿到这些消息。实际上备份交换机没有什么特殊,和主交换机是一样的只是充当备份的角色。

生产者代码

  1. 在创建主交换机的时候,给定参数argument,设置该主交换机的备份交换机,指定备份交换机名称。

  2. 然后声明备份交换机并绑定一个队列,用于存储被丢弃的消息。

  3. 发送消息时mandatory参数设置为false。

var connFactory = new ConnectionFactory
{HostName = "xxx.xxx.xxx.xxx",Port = 5672,UserName = "rabbitmqdemo",Password = "rabbitmqdemo@test",VirtualHost = "rabbitmqdemo"};
using (var conn = connFactory.CreateConnection())
{using (var channel = conn.CreateModel()){var exchangeName = "aedemo_publishsubscribe_exchange";var alternateExchangeName = "aedemo_ae_publishsubscribe_exchange";var arguments = new Dictionary<string, object>{{ "alternate-exchange", alternateExchangeName }};channel.ExchangeDeclare(exchange: exchangeName, type: "fanout", arguments: arguments);channel.ExchangeDeclare(exchange: alternateExchangeName, type: "fanout");var alternateExchangeQueueName = alternateExchangeName + "_worker";channel.QueueDeclare(queue: alternateExchangeQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);channel.QueueBind(queue: alternateExchangeQueueName, exchange: alternateExchangeName, routingKey: "");while (true){Console.WriteLine("消息内容(exit退出):");var message = Console.ReadLine();if (message.Trim().ToLower() == "exit"){break;}var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: exchangeName, routingKey: "", mandatory: false, basicProperties: null, body: body);Console.WriteLine("消息内容发送完毕:" + message);}}
}

启动程序,可以从Web面板中看到主交换机和备份交换机都创建完毕,并且主交换机打上了有AE的标记。

生产者发送消息,经主交换机匹配但无合适队列后,转发给备份交换机,路由到其队列存储。

注:推荐使用Fanout类型的交换机,如果其他比如Direct,当主交换机转发到备份交换机,在进行匹配时候,如果消息给定的RoutingKey没有匹配到相应的队列,消息则会被丢失,这样一来,最初的预想就出现偏差了。

持久化

当RabbitMQ在异常情况下,比如系统宕机、重启、关闭等,可能会导致数据丢失,可靠性降低。针对这种情况,RabbitMQ提供了持久化机制,将消息本身和元数据(队列、交换机、绑定信息)都保存到磁盘中。具体分为三类持久化

  • 交换机持久化

  • 队列持久化

  • 消息持久化

交换机持久化

当RabbitMQ遇到异常情况(如服务重启)后,如果没有设置交换机持久化,那么交换机相关数据则会被丢失,生产者再发送消息到指定交换机时就失败了。

服务重启异常

1、在Web中新建一个交换机,指定非持久化模式。

2、新建一个队列,指定非持久化模式。

3、设置交换机和队列的绑定关系。

4、生产者前部分正常发送消息,中间经服务重启后,交换机、队列及绑定关系都被清除,生产继续发送消息,出现异常。

持久化设置

在声明交换机时可以指定durable参数设置为true(Web面板中也可设置)。

channel.ExchangeDeclare(exchange: exchangeName, type: "fanout", durable: true, arguments: null);

RabbitMQ服务重启,生产者继续发送消息给交换机。

队列持久化

队列的持久化是队列声明时设置durable参数为true,如果队列不持久化,异常情况(如服务重启)后,队列元数据丢失,存储在内的消息也就丢失了。

服务重启异常

1、Web中创建一个交换机并设置为持久化模式。

2、创建一个队列并设置为非持久化模式

3、设置交换机和队列的绑定关系。

4、生产者前部分正常发送消息,中间经服务重启后,队列及绑定关系被清除,生产继续发送消息,匹配队列失败,消息被回退给生产者。

持久化设置

在声明队列时可以指定durable参数设置为true(Web面板中也可设置)。

channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

RabbitMQ服务重启,生产者继续发送消息给交换机。

消息持久化

队列的持久化仅能保证其自身的数据不丢失,而其存储的消息却不能保证不会丢失。

持久化设置

需要对消息消息设置持久化,以确保消息本身不会因异常情况(如服务重启)而丢失。在发送消息时,可以设置消息的基础属性,来支持消息的持久化。

var basicProperties = channel.CreateBasicProperties();
basicProperties.DeliveryMode = 2;// 1非持久化 2持久化channel.BasicPublish(exchange: exchangeName, routingKey: "", mandatory: true, basicProperties: basicProperties, body: body);

如此一来,当异常情况(如服务重启后),消息还是存在的。

注:消息持久化会影响性能,仅确保有价值的消息持久化,来权衡可靠与吞吐量。

2022-08-25,望技术有成后能回来看见自己的脚步

.Net CoreRabbitMQ消息存储可靠机制(下)相关推荐

  1. 网络:TCP维护安全可靠机制提供的定时器

    一.TCP为维护安全可靠机制提供了七大定时器 1.连接建立(connectionestablishment)"定时器: 在发送SYN报文段建立一条新连接时启动.如果在75秒内没有收到响应,连 ...

  2. 碳交易机制下考虑需求响应的综合能源系统优化运行论文复现——附代码

    目录 摘要: 一.区域综合能源系统(IEHS)的基本结构: 二.IEHS 优化运行模型: 三.求解方法: 四.求解结果: 五.Matlab复现代码 摘要: 综合能源系统是实现"双碳" ...

  3. .Net Discovery 系列之六--深入浅出.Net实时编译机制(下)

    接上文 在初始化时,HashTable中各个方法指向的并不是对应的内存入口地址,而是一个JIT预编译代理,这个函数负责将方法编译为本地代码.注意,这里JIT还没有进行编译,只是建立了方法表! 下表(表 ...

  4. storm消息可靠机制(ack)的原理和使用

    关于storm的基础,参照我这篇文章:流式计算storm 关于并发和并行,参照我这篇文章:并发和并行 关于storm的并行度解释,参照我这篇文章:storm的并行度解释 关于storm的流分组策略,参 ...

  5. 深度残差收缩网络:(四)注意力机制下的阈值设置

    对于基于深度学习的分类算法,其关键不仅在于提取与标签相关的目标信息,剔除无关的信息也是非常重要的,所以要在深度神经网络中引入软阈值化.阈值的自动设置,是深度残差收缩网络的核心贡献.需要注意的是,软阈值 ...

  6. 1.综合能源系统优化运行(碳交易机制下考虑需求响应的综合能源系统优化运行)

    目录 复现文章: 主程序: 结果图 结论 私聊即可! 主要内容: 碳交易机制下考虑需求响应的综合能源系统优化运行--魏震波 摘要: 综合能源系统是实现"双碳"目标的有效途径,为进一 ...

  7. 碳交易机制下考虑需求响应的综合能源系统优化运行

    碳交易机制下考虑需求响应的综合能源系统优化运行 首先,根据负荷响应特性将需求响应分为价格型和替代型 2 类,分别建立了基于价格弹性矩阵的价格型需求响应模型,及考虑用能侧电能和热能相互转换的替代型需求响 ...

  8. 碳交易机制下考虑需求响应的综合能源系统优化运行综合能源系统是实现“双碳”目标的有效途径

    碳交易机制下考虑需求响应的综合能源系统优化运行综合能源系统是实现"双碳"目标的有效途径,为进一步挖掘其需求侧可调节潜力对碳减排的作用,提出了一种碳交易机制下考虑需求响应的综合能源系 ...

  9. 22.碳交易机制下考虑需求响应的综合能源系统优化运行

    说明书 MATLAB代码:碳交易机制下考虑需求响应的综合能源系统优化运行 注意:另外还有含义柔性负荷.蓄冷式空调.共享储能以及碳捕集的综合能源系统优化运行代码,欢迎咨询 关键词:需求响应 碳交易机制 ...

最新文章

  1. No module named 'xxx’
  2. slam中特征点归一化原因以及方法
  3. 修复错误ModuleNotFoundError: No module named ‘pip‘
  4. 已知旋转矩阵求角度_如何推导旋转矩阵
  5. 【新星计划】MATLAB-冒号:符号详解
  6. html页面加文字横向滚动,js实现文字横向滚动
  7. golang防止MySQL注入_mysql – 如何最大限度地降低golang服务中下游服务中SQL注入的风险?...
  8. uva 437——The Tower of Babylon
  9. C 结构体嵌套一级指针 二级指针 动态分配内存
  10. js 值太大自动转换bignumber
  11. [golang]如何看懂调用堆栈
  12. 【Spring MVC】学习笔记汇总
  13. CSRF(跨站请求伪造)攻击 --
  14. 7-8 mmh学长的Excel表格 (20分)
  15. POJ 3378 树状数组+DP+离散化+高精度
  16. gx works2产品id_gx works2中文版下载|
  17. chrome浏览器提示“adobe flash player不是最新版本!”
  18. 泛型和容器--2--容器
  19. 前端开发框架:Ajax的基本入门和使用。
  20. 【2019阅读书单2020阅读计划】

热门文章

  1. 大数据与云计算学习(2)
  2. Android逆向之破解某僵尸游戏
  3. python画一朵“玫瑰”
  4. React 测试教程
  5. 第一周-PDCA学习模型
  6. 【设计模式】设计模式总结 ( 七大设计原则 | 创建型模式 | 结构型模式 | 行为型模式 ) ★★★
  7. IT知识图谱(只是从CSDN中把图片,一个个下载了)
  8. 《基于区块链技术的虚假新闻检测方法》文献阅读笔记+总结
  9. 慎入!超详细240页PPT!史上最强大的计算机网络导论!
  10. 美团动态线程池实践思路,开源了