KSO - .net6项目中使用RabbitMQ实际项目代码和思路讲解,包括各种踩坑
序章
首先网上有很多的总结和demo ,但是都是比较简单和不符合生产情况的,也没有对数据库的操作,基本上得到消息后也就是直接打印出来,基本上没啥参考价值,废话不多说直接讲解思路。
docker 部署的 参考我另外一遍博客 docker部署rabbitmq
1.总体思路
- 首先我要保证的是项目在启动之初,就要同时启动队列,并进行消费
- 队列的各种配置与RabbitMQ连接对象必须保证单例且必须全局注入
- 支持扩展也就是多个消费者,共同消费一个队列。
- 把方法独立出来,利于以后的扩展和业务增加
- 写入数据库中,保存数据(由于是子线程运行rabbitmq,所以没办法直接得到主线程的容器进行注入因为这个我纠结了两个小时,具体方法看代码)
2.包
下载 nuget包 RabbitMQ.Client 我这里下载的6.4.0
配置信息在web端的appsettings
"RabbitMQOptions": {"UserName": "guest","Password": "guest","Host": "127.0.0.1","Port": 5673,"ExchangeName":"test"},
生产者
接口/// <summary>
/// 生产消息
/// </summary>
public interface IMyPublisher<T> where T : class
{Task PublishAsync(T data, Encoding encoding = null);
}
具体实现public class MyPublisher<T> : IMyPublisher<T> where T : class
{private readonly MyRabbitMQOptions _myOptions;private readonly IConnection _connection;/// <summary>/// 非注入时使用此构造方法/// </summary>public MyPublisher(IConnection connection){_connection = connection;}/// <summary>/// 依赖注入自动走这个构造方法/// </summary>/// <param name="optionsMonitor"></param>/// <param name="factory"></param>public MyPublisher(IOptionsMonitor<MyRabbitMQOptions> optionsMonitor, ConnectionFactory factory){_myOptions = optionsMonitor.CurrentValue;_connection = factory.CreateConnection();}/// <summary>/// 发布消息,什么时候用到就直接注入接口,然后调用传参就可以了/// </summary>public Task PublishAsync(T data, Encoding encoding = null){var type = typeof(T);var queueName = type.FullName ;//这是队列名字,也可以传参,也可以写在配置文件里面,都行// 创建通道var channel = _connection.CreateModel();// 声明一个Exchangechannel.ExchangeDeclare(_myOptions.ExchangeName, ExchangeType.Direct, false, false, null);// 声明一个队列 channel.QueueDeclare(queueName, false, false, false, null);//将队列绑定到交换机channel.QueueBind(queueName, _myOptions.ExchangeName, queueName, null);// 对象转 object[] 发送var msg = JsonConvert.SerializeObject(data);byte[] bytes = (encoding ?? Encoding.UTF8).GetBytes(msg);channel.BasicPublish(_myOptions.ExchangeName, queueName, null, bytes);// 结束channel.Close();_connection.Close();return Task.CompletedTask;}
}
这里生产者,已经做完了,下一步要做的消费者
消费者,
public class EventHandlerService
{private IModel _channel;private string _queueName = "kso1";//队名public static IAdminRepository<SysLog> repository;// repositoy层也就是仓储用于做增删改查,你们改成自己的仓储层public static IServiceProvider service;public async void InsertDb(SysLog sys){var v = service.CreateScope();repository = v.ServiceProvider.GetRequiredService<IAdminRepository<SysLog>>();var task = await repository.InsertAsync(sys);}/// <summary>/// 消息接收方法/// </summary>public static Action<SysLog> MessageReceivedEvent;public async Task Begin(IConnection connection,IServiceProvider _service)//进行得到IServiceProvider 方便下面代码进行注入{service = _service;//创建通道_channel = connection.CreateModel();//消费者var consumer = new EventingBasicConsumer(_channel);try{var sys = new SysLog();//我的日志model类//接收到消息事件consumer.Received += async (ch, ea) =>{var message = Encoding.UTF8.GetString(ea.Body.ToArray());var data = JsonConvert.DeserializeObject<xx>(message);sys = new SysLog() { Id = Guid.NewGuid(), Info = data.xx};if (sys is null){//否定:告诉Broker,这个消息我没有正常消费; requeue: true:重新写入到队列里去; false:你还是删除掉;_channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);}else{if (MessageReceivedEvent == null) MessageReceivedEvent += InsertDb;//如果还有业务逻辑,可以再加如MessageReceivedEvent += InsertDb1;。。一直加到你吐MessageReceivedEvent.Invoke(sys);//确认该消息已被消费_channel.BasicAck(ea.DeliveryTag, false);}};_channel.BasicConsume(_queueName, false, consumer);}catch (Exception ex){//确认该消息已被消费_channel.Dispose();//OnConsumerException(ex);}}/// <summary>/// 异常处理,日志/// </summary>/// <param name="ex">派生类不重写的话,异常被隐藏</param>public void OnConsumerException(Exception ex){//异常处理自己写,一般记个日志就可以了}
}
全局注入
写两个扩展方法 一个是 IServiceCollection 的扩展方法
#region 配置项public static IServiceCollection AddMyRabbitMQ(this IServiceCollection services, IConfiguration configuration){#region 配置项// 从Configuration读取"MyRabbbitMQOptions配置项var optionSection = configuration.GetSection("MyRabbitMQOptions");// 这个myOptions是当前方法使用MyRabbitMQOptions myOptions = new();optionSection.Bind(myOptions);// 加了这行,才可以注入IOptions<MyRabbitMQOptions>或者IOptionsMonitor<MyRabbitMQOptions>services.Configure<MyRabbitMQOptions>(optionSection);#endregion// 加了这行,才可以注入任意类型参数的 IMyPublisher<> 使用services.AddTransient(typeof(IMyPublisher<>), typeof(MyPublisher<>));//services.AddSingleton(typeof(EventHandlerService));// 创建一个工厂对象,并配置单例注入services.AddSingleton(new ConnectionFactory{UserName = myOptions.UserName,Password = myOptions.Password,HostName = myOptions.Host,Port = myOptions.Port});return services;}------------------------------这个扩展方法是 IApplicationBuilder 的----------------------------/// <summary>/// 给app拓展方法/// </summary>/// <remarks>/// 在IoC容器里获取到所有继承自IMyEvetnHandler的实现类,并开启消费者/// </remarks>public static IApplicationBuilder UseMyEventHandler(this IApplicationBuilder app){ var factory = app.ApplicationServices.GetService<ConnectionFactory>(); var ev = app.ApplicationServices.GetService<EventHandlerService>() ;var s = factory?.CreateConnection();Task.Run(() => { ev.Begin(factory?.CreateConnection(), app.ApplicationServices); }); }
在staup(.net6直接在Program里面调用这个扩展方法)
我的是net6的项目 直接在 program 里面 加上这两句 services.AddMyRabbitMQ(configuration);app.UseMyEventHandler();这两个方法大家都知道放在哪吧,我就不细说了。
生产者使用
首先 我们在上面已经全局注入且是单例,在哪个业务层使用,直接构造函数注入一下对象IMyPublisher ,然后使用 PublishAsync方法进行发布消息,交换机和队列都是在配置环境中使用的
总结
这就是整个过程,在其他的博客上面我还没找到类似的比较详细的使用方法
KSO - .net6项目中使用RabbitMQ实际项目代码和思路讲解,包括各种踩坑相关推荐
- (RabbitMQ 二)Springboot项目中使用RabbitMQ的相关依赖
(RabbitMQ 二)Springboot项目中使用RabbitMQ的相关依赖 RabbitMQ系列文章如下: (RabbitMQ 一[转载])windows10环境下的RabbitMQ安装步骤 h ...
- 最近实际项目中遇到的技术问题与解决思路
最近实际项目中遇到的技术问题与解决思路 参考文章: (1)最近实际项目中遇到的技术问题与解决思路 (2)https://www.cnblogs.com/lunlunshiwo/p/9222456.ht ...
- eclipse项目中关于导入的项目里提示HttpServletRequest 不能引用的解决办法
eclipse项目中关于导入的项目里提示HttpServletRequest 不能引用的解决办法 当使用eclipse导入外部的web工程时,有时会提示HttpServletRequest, Ser ...
- git项目中的子git项目_使用子模块和子树管理Git项目
git项目中的子git项目 如果您从事开源开发,则可能与Git一起管理源代码. 您可能遇到过具有大量依赖项和/或子项目的项目. 您如何管理它们? 对于开源组织,为社区和产品实现单源文档和依赖性管理可能 ...
- 设计银行项目中的注册银行用户基本信息的类,包括账户卡号、姓名、身份证号、联系电话、家庭住址。
什么是类和对象? 类是模子,确定对象会拥有的特征(属性)和行为(方法),类的特点:类是对象的类型,具有相同属性和方法的一组对象的集合.对象是类的实例,什么是对象的属性?属性是对象拥有的各种特征:每个对 ...
- antvg2 环图轮播_在vue项目中引用Antv G2,以饼图为例讲解
我就废话不多说了,大家还是直接看代码吧~ npm install @antv/g2 --save template内容: js部分: //引入G2组件 import G2 from "@a ...
- vue项目中将视频链接分享至推特的解决方法及踩坑记录
vue项目中将视频链接分享至推特的解决方法及踩坑记录 将动态改变的视频链接分享至推特,并希望能直接在推特上播放视频的需求实现方法及踩坑记录 如果只要将文本或链接分享到推特,不需要推特识别图片/视频等媒 ...
- springboot项目中的注解 启动项目的方式 解决spring的bean.xml配置不生效 spring的基础JDBC配置
依赖 创建一个 Spring Boot 工程时,可以继承自一个 spring-boot-starter-parent ,也可以不继承 先来看 parent 的基本功能有哪些? 定义了 Java 编译版 ...
- 如何移除项目中无用的 console.log 代码
大家好,我是若川.早些天时,我看到一个后端公众号发<辞退了一个前端>,当时还想着现在后端公众号都开始吊打前端了嘛.其中有个理由就是线上还一堆console.log...我猜很多人都会移除项 ...
最新文章
- Windows Phone 7 开“.NET研究”发之:工具栏
- 也谈大公司病1——正确是最大的错误
- 【代码块】代码块使用注意事项和细节讨论
- 无代码绘制基因表达箱线图
- notepad++以16进制查看文件
- 使用iBatis数据映射框架吧
- 为什么牛顿法下降的速度比梯度下降的快
- 机器人领域会议期刊特点
- 忆芯科技发布新一代国产主控芯片STAR1000P!4月完成量产版本
- Python 元组拆包
- Vue音乐项目笔记(三)
- T-Sql - 数据分租求最大指定字段最大的记录
- 实对称矩阵特征值按大小排序
- stm32毕业设计 单片机智能温控风扇
- 1553B为什么要加耦合器?
- python拆分参数列表_Python序列拆分操作符与映射拆分操作符实例
- Python-生成gif图片验证码
- deepin更新失败_deepin V20 20200826升级失败
- QGIS基于多期哨兵2影像遥感指数阈值法提取冬小麦分布(1)-数据预处理
- 字节跳动测试岗位面试题