前言

  最近需要使用到消息队列相关技术,于是重新接触RabbitMQ。其中遇到了不少可靠性方面的问题,归纳了一下,大概有以下几种:

  1. 临时异常,如数据库网络闪断、http请求临时失效等;

  2. 时序异常,如A任务依赖于B任务,但可能由于调度或消费者分配的原因,导致A任务先于B任务执行;

  3. 业务异常,由于系统测试不充分,上线后发现某几个或某几种消息无法正常处理;

  4. 系统异常,业务中间件无法正常操作,如网络中断、数据库宕机等;

  5. 非法异常,一些伪造、攻击类型的消息。

  针对这些异常,我采用了一种基于消息审计、消息重试、消息检索、消息重发的方案。

方案

  1. 消息均使用Exchange进行通讯,方式可以是direct或topic,不建议fanout。

  2. 根据业务在Exchange下分配一个或多个Queue,同时设置一个审计线程(Audit)监听所有Queue,用于记录消息到MongoDB,同时又不阻塞正常业务处理。

  3. 生产者(Publisher)在发布消息时,基于AMQP协议,生成消息标识MessageId和时间戳Timestamp,根据消息业务添加头信息Headers便于跟踪。

  

  4. 消费者(Comsumer)消息处理失败时,则把消息发送到重试交换机(Retry Exchange),并设置过期(重试)时间及更新重试次数;如果超过重试次数则删除消息。

  5. 重试交换机Exchange设置死信交换机(Dead Letter Exchange),消息过期后自动转发到业务交换机(Exchange)。

  6. WebApi可以根据消息标识MessageId、时间戳Timestamp以及头信息Headers在MongoDB中对消息进行检索或重试。

  

  注:选择MongoDB作为存储介质的主要原因是其对头信息(headers)的动态查询支持较好,同等的替代产品还可以是Elastic Search这些。

生产者(Publisher)

  1. 设置断线自动恢复

  var factory = new ConnectionFactory{Uri = new Uri("amqp://guest:guest@192.168.132.137:5672"),AutomaticRecoveryEnabled = true  };

  2. 定义Exchange,模式为direct

  channel.ExchangeDeclare("Exchange", "direct");

  3. 根据业务定义QueueA和QueueB

  channel.QueueDeclare("QueueA", true, false, false);channel.QueueBind("QueueA", "Exchange", "RouteA");channel.QueueDeclare("QueueB", true, false, false);channel.QueueBind("QueueB", "Exchange", "RouteB");

  4. 启动消息发送确认机制,即需要收到RabbitMQ服务端的确认消息

  channel.ConfirmSelect();

  5. 设置消息持久化

  var properties = channel.CreateBasicProperties();properties.Persistent = true;

  6. 生成消息标识MessageId、时间戳Timestamp以及头信息Headers

  properties.MessageId = Guid.NewGuid().ToString("N");properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());properties.Headers = new Dictionary<string, object>  {{ "key", "value" + i}};

  7. 发送消息,偶数序列发送到QueueA(RouteA),奇数序列发送到QueueB(RouteB)

  channel.BasicPublish("Exchange", i % 2 == 0 ? "RouteA" : "RouteB", properties, body);

  8. 确定收到RabbitMQ服务端的确认消息

  var isOk = channel.WaitForConfirms();if (!isOk){throw new Exception("The message is not reached to the server!");} 

  完整代码

效果:QueueA和QueueB各一条消息,QueueAudit两条消息

 

  注:Exchange下必须先声明Queue才能接收到消息,上述代码并没有QueueAudit的声明;需要手动声明,或者先执行下面的消费者程序进行声明。

正常消费者(ComsumerA)

  1. 设置预取消息,避免公平轮训问题,可以根据需要设置预取消息数,这里是1

  _channel.BasicQos(0, 1, false);

  

  2. 声明Exchange和Queue

  _channel.ExchangeDeclare("Exchange", "direct");_channel.QueueDeclare("QueueA", true, false, false);_channel.QueueBind("QueueA", "Exchange", "RouteA");

  3. 编写回调函数

 注:设置了RabbitMQ的断线恢复机制,当RabbitMQ连接不可用时,与MQ通讯的操作会抛出AlreadyClosedException的异常,导致主线程退出,哪怕连接恢复了,程序也无法恢复,因此,需要捕获处理该异常。

异常消费者(ComsumerB)

  1. 设置预取消息

  _channel.BasicQos(0, 1, false);

  2. 声明Exchange和Queue

  _channel.ExchangeDeclare("Exchange", "direct");_channel.QueueDeclare("QueueB", true, false, false);_channel.QueueBind("QueueB", "Exchange", "RouteB");

  3.  设置死信交换机(Dead Letter Exchange)

  var retryDic = new Dictionary<string, object>  {

      {"x-dead-letter-exchange", "Exchange"},{"x-dead-letter-routing-key", "RouteB"}};_channel.ExchangeDeclare("Exchange_Retry", "direct");_channel.QueueDeclare("QueueB_Retry", true, false, false, retryDic);_channel.QueueBind("QueueB_Retry", "Exchange_Retry", "RouteB_Retry");

  4. 重试设置,3次重试;第一次1秒,第二次10秒,第三次30秒

  _retryTime = new List<int>  {1 * 1000,10 * 1000,30 * 1000  };

  5. 获取当前重试次数

  var retryCount = 0;if (ea.BasicProperties.Headers != null && ea.BasicProperties.Headers.ContainsKey("retryCount")){retryCount = (int)ea.BasicProperties.Headers["retryCount"];_logger.LogWarning($"[{DateTime.Now:yyyy-MM-dd HH:mm:ss}]Message:{ea.BasicProperties.MessageId}, {++retryCount} retry started...");}

  6. 发生异常,判断是否可以重试

  private bool CanRetry(int retryCount){return retryCount <= _retryTime.Count - 1;} 

  7. 可以重试,则启动重试机制

审计消费者(Audit Comsumer)

  1. 声明Exchange和Queue

  _channel.ExchangeDeclare("Exchange", "direct");_channel.QueueDeclare("QueueAudit", true, false, false);_channel.QueueBind("QueueAudit", "Exchange", "RouteA");_channel.QueueBind("QueueAudit", "Exchange", "RouteB");

  2. 排除死信Exchange转发过来的重复消息

  if (ea.BasicProperties.Headers == null || !ea.BasicProperties.Headers.ContainsKey("x-death")){...}

  3. 生成消息实体

  var message = new Message{MessageId = ea.BasicProperties.MessageId,Body = ea.Body,Exchange = ea.Exchange,Route = ea.RoutingKey};

  4. RabbitMQ会用bytes来存储字符串,因此,要把头中bytes转回字符串

  if (ea.BasicProperties.Headers != null){var headers = new Dictionary<string, object>();foreach (var header in ea.BasicProperties.Headers){if (header.Value is byte[] bytes){headers[header.Key] = Encoding.UTF8.GetString(bytes);}else{headers[header.Key] = header.Value;}}message.Headers = headers;}

  5. 把Unix格式的Timestamp转成UTC时间

  if (ea.BasicProperties.Timestamp.UnixTime > 0){message.TimestampUnix = ea.BasicProperties.Timestamp.UnixTime;var offset = DateTimeOffset.FromUnixTimeMilliseconds(ea.BasicProperties.Timestamp.UnixTime);message.Timestamp = offset.UtcDateTime;}

  6. 消息存入MongoDB

  _mongoDbContext.Collection<Message>().InsertOne(message, cancellationToken: cancellationToken);

  MongoDB记录:

  

  重试记录:

  

消息检索及重发(WebApi)

  1. 通过消息Id检索消息

  

  2. 通过头消息检索消息

  

  

  3. 消息重发,会重新生成MessageId

  

  

Ack,Nack,Reject的关系

  1. 消息处理成功,执行Ack,RabbitMQ会把消息从队列中删除。

  2. 消息处理失败,执行Nack或者Reject:

  a) 当requeue=true时,消息会重新回到队列,然后当前消费者会马上再取回这条消息;

  b) 当requeue=false时,如果Exchange有设置Dead Letter Exchange,则消息会去到Dead Letter Exchange;

  c) 当requeue=false时,如果Exchange没设置Dead Letter Exchange,则消息从队列中删除,效果与Ack相同。

  3. Nack与Reject的区别在于:Nack可以批量操作,Reject只能单条操作。

  

RabbitMQ自动恢复

连接(Connection)恢复

  1. 重连(Reconnect)

  2. 恢复连接监听(Listeners)

  3. 重新打开通道(Channels)

  4. 恢复通道监听(Listeners)

  5. 恢复basic.qos,publisher confirms以及transaction设置

  

拓扑(Topology)恢复

  1. 重新声明交换机(Exchanges)

  2. 重新声明队列(Queues)

  3. 恢复所有绑定(Bindings)

  4. 恢复所有消费者(Consumers)

异常处理机制

  1. 临时异常,如数据库网络闪断、http请求临时失效等

  通过短时间重试(如1秒后)的方式处理,也可以考虑Nack/Reject来实现重试(时效性更高)。

  2. 时序异常,如A任务依赖于B任务,但可能由于调度或消费者分配的原因,导致A任务先于B任务执行

  通过长时间重试(如1分钟、30分钟、1小时、1天等),等待B任务先执行完的方式处理。

  

  3. 业务异常,由于系统测试不充分,上线后发现某几个或某几种消息无法正常处理

  等系统修正后,通过消息重发的方式处理。

  4. 系统异常,业务中间件无法正常操作,如网络中断、数据库宕机等

  等系统恢复后,通过消息重发的方式处理。

  5. 非法异常,一些伪造、攻击类型的消息

  多次重试失败后,消息从队列中被删除,也可以针对此业务做进一步处理。

源码地址

https://github.com/ErikXu/RabbitMesage

相关文章:

  • CAP带你轻松玩转ASP.NETCore消息队列

  • .NetCore Cap 结合 RabbitMQ 实现消息订阅

  • [译]RabbitMQ教程C#版 - 发布订阅

原文地址: https://www.cnblogs.com/Erik_Xu/p/9515208.html


.NET社区新闻,深度好文,欢迎访问公众号文章汇总 http://www.csharpkit.com

RabbitMQ一个简单可靠的方案(.Net Core实现)相关推荐

  1. 如何实现一个简单的网络帧同步方案

    在网络游戏中,网络同步方案大概有下面三种: 状态同步,即state synchronization 快照同步,即 snapshot synchronization 帧同步,即lock step syn ...

  2. 微信抢红包代码 python_用 Python 实现一个简单的微信红包算法

    今年过年各位一定在微信里抢了不少红包.那么当别人是手气王而你只抢到1分钱的时候,你有没有想过,如果你来实现红包的分配算法,会怎么写? 这里我给一个简单的实现方案. 基本思路就是,有多少个红包,就循环多 ...

  3. Net Core下使用RabbitMQ比较完备两种方案(虽然代码有点惨淡,不过我会完善)

    一.前言     上篇说给大家来写C#和Java的方案,最近工作也比较忙,迟到了一些,我先给大家补上C#的方案. 二.使用的插件     HangFire 一个开源的.NET任务调度框架,最大特点在于 ...

  4. ASP.NET Core Web API下事件驱动型架构的实现(一):一个简单的实现

    很长一段时间以来,我都在思考如何在ASP.NET Core的框架下,实现一套完整的事件驱动型架构.这个问题看上去有点大,其实主要目标是为了实现一个基于ASP.NET Core的微服务,它能够非常简单地 ...

  5. 第四章 .net core做一个简单的登录

    项目目标部署环境:CentOS 7+ 项目技术点:.netcore2.0 + Autofac +webAPI + NHibernate5.1 + mysql5.6 + nginx 开源地址:https ...

  6. 路由到另外一个页面_一个简单的Vue按钮级权限方案

    在年初开发一个中后台管理系统,功能涉及到了各个部门(产品.客服.市场等等),在开始的版本中,我和后端配合使用了花裤衩手摸手系列的权限方案,前期非常nice,但是慢慢的随着功能增多.业务越来越复杂,就变 ...

  7. 一个简单粗暴的前后端分离方案

    项目背景 刚刚参加完一个项目,背景:后端是用java,后端服务已经开发的差不多了,现在要通过web的方式对外提供服务,也就是B/S架构.后端专注做业务逻辑,不想在后端做页面渲染的事情,只向前端提供数据 ...

  8. 给 asp.net core 写一个简单的健康检查

    给 asp.net core 写一个简单的健康检查 Intro 健康检查可以帮助我们知道应用的当前状态是不是处于良好状态,现在无论是 docker 还是 k8s 还是现在大多数的服务注册发现大多都提供 ...

  9. 为什么函数lamda显示权限不足_一个简单的Vue按钮级权限方案

    在年初开发一个中后台管理系统,功能涉及到了各个部门(产品.客服.市场等等),在开始的版本中,我和后端配合使用了花裤衩手摸手系列的权限方案,前期非常nice,但是慢慢的随着功能增多.业务越来越复杂,就变 ...

最新文章

  1. Windows/Linux下引用jar包,并用javac/java编译运行
  2. emacs Linux Java编程环境_Linux下搭建用emacs查看代码的开发环境
  3. secureCRt中文乱码问题
  4. 聚集云原生,可观测性的实践与探索 | 线下技术沙龙
  5. js获取datagrid行,但是行改变了肿么办?
  6. Java死了还是无敌?
  7. ps 和 kill 结合使用
  8. Java笔记-使用CXF开发WebService服务器
  9. 二维正则表达式v0.1
  10. logging 模块 与 logging 固定模块
  11. Java获取接口所有实现类的方式
  12. mysql建表的字段类型和约束条件
  13. linux应用之Lamp(apache+mysql+php)的源码安装(centos)
  14. UIScrollView总结
  15. linux中删除用户显示已登录,linux下用户及用户组:查看,新增,删除
  16. mybitis SQL insert into 多条数据
  17. 微信支付超详细教程(附商城订单处理逻辑)
  18. Python使用pyserial实现串口收发
  19. word有空白段删不掉 解决办法
  20. IP网络摄像头实现远程监控、直播的思路

热门文章

  1. MySQL安装时出现的问题
  2. stl中Priority Queues(优先队列)的基本用法
  3. asp.net web常用控件FileUpload(文件上传控件)
  4. Linux运维实战之DNS的高级配置(转发器、视图等)
  5. MYSQL性能优化分享(分库分表)
  6. [1197]约瑟夫问题 (循环链表)SDUT
  7. Python 学习笔记(三)Function
  8. 指定特定的内容为首页
  9. MSSQL 2008 企业管理器打开命令
  10. .NET 为大型应用接入 ApplicationStartupManager 启动流程框架