由于最近公司做的项目,需要发短信/邮件/第三方接口异步回调信息等的及时处理,自己就简单的研究了以下RabbitMQ在NetCore中的实现。
RabbitMQ是什么具体就不再这里详细介绍了,自己去百度。本文主要把自己封装的RabbitMQ的使用介绍给大家,并提供相应的Nuget包供大家下载使用,谢谢各位大佬,本人能力有限。

MQ连接配置

在appsettings中加入如下内容:

//消息队列"RabbitMQ": {"HostName": "127.0.0.1", // 主机名称"Port": 5672, // 主机端口"UserName": "guest", // 连接账号"Password": "guest" // 连接密码}

自动读入到RabbitMQOption中

MQ交换机类型

这里通过枚举类ExchangeType我定义了四种交换机类型

生产者

生产者对应的IRabbitMQProducer接口以及其实现RabbitMQProducer已经自动通过单例模式注入,使用的时候你只需要注入即可,如果你想自己实现,可以覆盖我的实现类

生产者异常处理机制

生产者投递消息失败可能有以下几种情况:
1.MQ宕机
2.MQ发送数据到交换机失败
3.MQ发送到交换机成功路由到队列失败

对于第1点:
你可以实现IMQCrashHandle写宕机等问题造成的连接失败的公共处理方式
这里考虑的是采取短信或邮件或其他方式通知人员处理MQ.

对于第2,3点:
第1点的问题就会引发2,3点问题
这里采用了MQ消息投递的Confirm异步处理确认模式
这里你可以实现IMQProducerHandle接口对消息投递失败或成功进行相应的处理。

这里你可以在Success方法中对成功的消息做记录等操作,在Failure函数中可以对失败的消息进行自己存入db中等进行处理

消费者

消费者统一一接口IRabbitMQConsumer。

常规的消费者

用于普通的及时发送消息及时处理消息
主要是RabbitMQConsumer类,如果需要使用只需要继承RabbitMQConsumer抽象类即可

消峰消费者

用于对某一有限资源的访问控制等,目前未实现

利用死信队列实现延时处理的消费者

用于对消息进行延迟处理,分为整个管道所有消息同一延时,或每个消息延迟时间不同
延时处理队列可以通过实现RabbitMQDelayDeadLetterConsumer抽象类即可
如果延时消息消费失败,不会有重试机制,需要自己实现IMQConsumerHandle来处理

优先级消费者

目前暂未实现

消费者处理消息结果

在MQAction枚举中定义了消费者处理的枚举。主要有以下四种。

这里注意:对于使用JoinRetryQueue重试策略,次数达到之后消息的处理状态会由RETRY转为FAILURE,如果这时候需要外部接口处理需要实现IMQConsumerHandle中的Failure函数来自己处理消息

根据处理结果可以通过外部接口对消息进行处理

在IMQConsumerHandle接口分别定义了Success Retry Failure Reject分别对应上述的几种消息处理结果
可以针对这几种结果,自己实现该接口对其进行处理,注意一定要处理,因为在消费者内部已经是把该消息消费了

消费者异常处理机制

消费者出现异常,主要是消费失败,消息不能及时从管道中处理。
这里对于失败的消息,主要有以下几种策略进行处理。
在ConsumerRetryPolicy枚举类定义了重试策略

使用例子

初始化MQ

在Net5.0中在Startup.cs ConfigureSevice中加入以下代码

service.InitRabbitMQ(configuration);

在Net6.0直接在Program.cs中加入以下代码

//初始化RabbitMQ
builder.Services.InitRabbitMQ(builder.Configuration);

定义一个消费者

注入消费者发送消息到对应交换机和路由


这里我采用匿名Action来处理失败或成功的消息。

测试结果

RabbitMQ搭建好之后,启动项目即可在MQ界面中看到相应的交换机和队列已经建立成功

消息发送测试

注意消息发送之后都是序列化为字符串了的,需要消费消息需要反序列化才行

发送消息的时候把MQ停止了

可以从下面的截图中看到公共的MQ宕机连接不上的处理接口生效了,以及MQ消息投递失败的Action处理也成功了

正常投递消息并消费成功

正常投递消息模拟消费失败重试

这里我把消息内容从success改为retry表示需要重试

从运行结果可以看到重试了三次之后,状态转为处理失败,这个时候就需要你自己在外部处理消息了,如果外部不处理消息就会丢失

正常投递消息模拟消费直接转失败状态

正常投递消息模拟消费直接转拒绝状态

如果你需要自己实现相应的处理,你可以自己实现以上相应的接口,并配合我之前基于Autofac自动注入类通过Target来动态替换,具体地址为
https://blog.csdn.net/weixin_43872830/article/details/121157725?spm=1001.2014.3001.5502

为消息设置不同的重试时间间隔

这里主要采用延时队列加上死信队列,利用XMessageTTL实现不同时间间隔的重试,过期时间作用于消息上而不是队列上整体设置过期时间

消息的重试次数将由RetryIntervals的集合的长度决定,如果同时设置了RetryCount以及RetryInterval,那么起决定作用的也是RetryIntervals.
运行结果

从结果中我们可以看到10s和20s后分别重试了一次

延迟消息处理例子

消息过期时间相同的实现类

如果需要设置消息不同过期时间,这里的XMessageTTL不需要设置,然后在消息发送的时候设置消息的过期时间即可

启动项目后可以看到相应的交换机和队列已经创建

运行结果:

从结果可以看到10秒之前发送的消息被消费了

从结果可以看到消息进入队列后延时时间不一样,并且不会因为前面消息的过期时间比后面的过期时间长,而阻塞后面消息过期时间之后不能及时出队列

需要注意某个交换机或者队列一旦初始化完成之后,想要在改变交换机或者队列的一些属性就比较困难,比如修改某些属性重新启动程序,会提示mq的错误等,这些需要您自行了解,有一个叫做policy的东西。所以原则上交换机或队列一旦想好怎样设置之后,就不再改动它,特别是已经运行于生产环境中

如果需要使用我封装的RabbitMQ,可以在NuGet上搜索OpenDeepSpace.RabbitMQ进行下载使用与了解

基于NetCore的RabbitMQ使用相关推荐

  1. xms跨平台基础框架 - 基于.netcore

    背景 鄙人经过多年开发,数百个项目"打磨(折磨)",各种国内外框架平台都有涉及,没有一款称心顺手的,原因有三,一是设计反人类,二是不开源根本无法突破框架限制,三是即使开源也是阉割版 ...

  2. 基于.NetCore开发博客项目 StarBlog - (12) Razor页面动态编译

    系列文章 基于.NetCore开发博客项目 StarBlog - (1) 为什么需要自己写一个博客? 基于.NetCore开发博客项目 StarBlog - (2) 环境准备和创建项目 基于.NetC ...

  3. xms应用框架 - 基于.netcore

    xms应用框架 - 基于.netcore 背景 一.xms是什么 二.能干什么 三.目标 四.框架介绍 1.元数据管理 2.组织架构 3.授权体系 4.高度可视化配置 5.业务扩展 6.流程 7.二次 ...

  4. 一个基于NetCore可视化编辑CMS系统

    今天给大家推荐一个基于.NetCore开发的.开源的CMS系统. 项目简介 这是一个支持可视化设计的内容管理系统,可通过托拉拽来实现页面布局,所见即所得.系统采用插件式开发,支持插件扩展.模板扩展,可 ...

  5. 基于.NetCore开发博客项目 StarBlog - (13) 加入友情链接功能

    系列文章 基于.NetCore开发博客项目 StarBlog - (1) 为什么需要自己写一个博客? 基于.NetCore开发博客项目 StarBlog - (2) 环境准备和创建项目 基于.NetC ...

  6. 基于Docker搭建RabbitMQ(多图)

    1.一点废话(可直接跳转至标题2) 通常在拉取镜像之前,除了通过命令执行 docker search xxx 之外,我们还可以通过 Docker 镜像仓库查询指定的镜像. 如下是 rabbitmq 镜 ...

  7. 基于EasyNetQ的RabbitMQ封装类

    最近在捣鼓RabbitMQ,为了方便使用,自己基于EasyNetQ封装了一个类,现在贴出来还望各路大佬神明指点,共同学习. 1 /// <summary> 2 /// RabbitMQ客户 ...

  8. .NET Core微服务之基于EasyNetQ使用RabbitMQ消息队列

    Tip: 此篇已加入.NET Core微服务基础系列文章索引 一.消息队列与RabbitMQ 1.1 消息队列 "消息"是在两台计算机间传送的数据单位.消息可以非常简单,例如只包含 ...

  9. .NET CoreWebApi基于EasyNetQ使用RabbitMQ消息队列

    一.消息队列与RabbitMQ 1.1 消息队列 "消息"是在两台计算机间传送的数据单位.消息可以非常简单,例如只包含文本字符串:也可以更复杂,可能包含嵌入对象.消息被发送到队列中 ...

最新文章

  1. 【并发编程】Future模式及JDK中的实现
  2. UNION JOIN 连接表
  3. vector和list容器有哪些区别
  4. hadoop将消亡_数据科学家:适应还是消亡!
  5. 第三十期:BAT 为什么都看上了重庆?
  6. (笔记)Mysql命令drop database:删除数据库
  7. android保存字符到sd卡,android 保存TXT文件到SD卡方法
  8. vs如何显示arcgis 二次开发工具控件
  9. 【Ajax技术】解决XHR与中文乱码问题
  10. Django 配置访问静态文件
  11. 日期时间格式与时间戳互转
  12. ENSP直连路由和静态路由配置(含路由表结构分析)
  13. 服务器防火墙firewalld,指定端口开放
  14. 揭秘中国球员十大豪宅
  15. python 安装scapy_安装scapy时出错
  16. 如何卸载avast free antivirus软件?
  17. windows 系统netstat -ano查看机器端口占用情况
  18. 《黑镜》(Black Mirror)
  19. 如何把Mrakdown格式的博客文章原封不动的打印下来?
  20. 佳顺通用进销存系统去广告_母婴收银系统应该如何选择?

热门文章

  1. linux新建挂载目录命令,告诉你Ubuntu添加新分区并设置挂载点的方法及命令
  2. tiny6410刷机教程
  3. mac 命令行 解压7z文件_命令行压缩解压7z
  4. 利用傅里叶变换获取低频和高频部分图像
  5. python OpenCV:绘制一个圆形图片
  6. 【stm32f0】stm32 总中断的打开与关闭
  7. CodeForces 714C Sonya and Queries
  8. [MATLAB]b样条方程基函数方程的表达式, 及n阶基函数作图
  9. Python plot() 画图标记 marker
  10. Dbeaver报错:The server time zone value ‘etd‘ is unrecognized