ASP.NET Core 2.0利用MassTransit集成RabbitMQ
在ASP.NET Core上利用MassTransit来集成使用RabbitMQ真的很简单,代码也很简洁。近期因为项目需要,我便在这基础上再次进行了封装,抽成了公共方法,使得使用RabbitMQ的调用变得更方便简洁。那么,就让咱们来瞧瞧其魅力所在吧。
MassTransit
先看看MassTransit是个什么宝贝(MassTransit官网的简介):
MassTransit是一个免费的开源轻量级消息总线,用于使用.NET框架创建分布式应用程序。MassTransit在现有的顶级消息传输上提供了一系列广泛的功能,从而以开发人员友好的方式使用基于消息的会话模式异步连接服务。基于消息的通信是实现面向服务的体系结构的可靠且可扩展的方式。
通俗描述:
MassTransit就是一套基于消息服务的高级封装类库,下游可联接RabbitMQ、Redis、MongoDb等服务。
github官网:https://github.com/MassTransit/MassTransit
RabbitMQ
RabbitMQ是成熟的MQ队列服务,是由 Erlang 语言开发的 AMQP 的开源实现。关于介绍RabbitMQ的中文资料也很多,有需要可以自行查找。我这里贴出其官网与下载安装的链接,如下:
官网:http://www.rabbitmq.com
下载与安装:http://www.rabbitmq.com/download.html
实现代码
通过上面的介绍,咱们已对MassTransit与RabbitMQ有了初步了解,那么现在来看看如何在ASP.NET Core上优雅的使用RabbitMQ吧。
1、创建一个名为“RabbitMQHelp.cs”公共类,用于封装操作RabbitMQ的公共方法,并通过Nuget来管理并引用“MassTransit”与“MassTransit.RabbitMQ”类库。
2、“RabbitMQHelp.cs”公共类主要对外封装两个静态方法,其代码如下:
using MassTransit;
using MassTransit.RabbitMqTransport;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Lezhima.Comm
{
/// <summary>
/// RabbitMQ公共操作类,基于MassTransit库
/// </summary>
public class RabbitMQHelp
{
#region 交换器
/// <summary>
/// 操作日志交换器
/// 同时需在RabbitMQ的管理后台创建同名交换器
/// </summary>
public static readonly string actionLogExchange = "Lezhima.ActionLogExchange";
#endregion
#region 声明变量
/// <summary>
/// MQ联接地址,建议放到配置文件
/// </summary>
private static readonly string mqUrl = "rabbitmq://192.168.6.181/";
/// <summary>
/// MQ联接账号,建议放到配置文件
/// </summary>
private static readonly string mqUser = "admin";
/// <summary>
/// MQ联接密码,建议放到配置文件
/// </summary>
private static readonly string mqPwd = "admin";
#endregion
/// <summary>
/// 创建连接对象
/// 不对外公开
/// </summary>
private static IBusControl CreateBus(Action<IRabbitMqBusFactoryConfigurator, IRabbitMqHost> registrationAction = null)
{
//通过MassTransit创建MQ联接工厂
return Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri(mqUrl), hst =>
{
hst.Username(mqUser);
hst.Password(mqPwd);
});
registrationAction?.Invoke(cfg, host);
});
}
/// <summary>
/// MQ生产者
/// 这里使用fanout的交换类型
/// </summary>
/// <param name="obj"></param>
public async static Task PushMessage(string exchange, object obj)
{
var bus = CreateBus();
var sendToUri = new Uri($"{mqUrl}{exchange}");
var endPoint = await bus.GetSendEndpoint(sendToUri);
await endPoint.Send(obj);
}
/// <summary>
/// MQ消费者
/// 这里使用fanout的交换类型
/// consumer必需是实现IConsumer接口的类实例
/// </summary>
/// <param name="obj"></param>
public static void ReceiveMessage(string exchange, object consumer)
{
var bus = CreateBus((cfg, host) =>
{
//从指定的消息队列获取消息 通过consumer来实现消息接收
cfg.ReceiveEndpoint(host, exchange, e =>
{
e.Instance(consumer);
});
});
bus.Start();
}
}
}
3、“RabbitMQHelp.cs”公共类已经有了MQ“生产者”与“消费者”两个对外的静态公共方法,其中“生产者”方法可以在业务代码中直接调用,可传递JSON、对象等类型的参数向指定的交换器发送数据。而“消费者”方法是从指定交换器中进行接收绑定,但接收到的数据处理功能则交给了“consumer”类(因为在实际项目中,不同的数据有不同的业务处理逻辑,所以这里我们直接就通过IConsumer接口交给具体的实现类去做了)。那么,下面我们再来看看消费者里传递进来的“consumer”类的代码吧:
using MassTransit;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Lezhima.Storage.Consumer
{
/// <summary>
/// 从MQ接收并处理数据
/// 实现MassTransit的IConsumer接口
/// </summary>
public class LogConsumer : IConsumer<ActionLog>
{
/// <summary>
/// 重写Consume方法
/// 接收并处理数据
/// </summary>
/// <param name="context"></param>
/// <returns></returns>
public Task Consume(ConsumeContext<ActionLog> context)
{
return Task.Run(async () =>
{
//获取接收到的对象
var amsg = context.Message;
Console.WriteLine($"Recevied By Consumer:{amsg}");
Console.WriteLine($"Recevied By Consumer:{amsg.ActionLogId}");
});
}
}
}
调用代码
1、生产者调用代码如下:
/// <summary>
/// 测试MQ生产者
/// </summary>
/// <returns></returns>
[HttpGet]
public async Task<MobiResult> AddMessageTest()
{
//声明一个实体对象
var model = new ActionLog();
model.ActionLogId = Guid.NewGuid();
model.CreateTime = DateTime.Now;
model.UpdateTime = DateTime.Now;
//调用MQ
await RabbitMQHelp.PushMessage(RabbitMQHelp.actionLogExchange, model);
return new MobiResult(1000, "操作成功");
}
2、消费者调用代码如下:
using Lezhima.Storage.Consumer;
using Microsoft.Extensions.Configuration;
using System;
using System.IO;
namespace Lezhima.Storage
{
class Program
{
static void Main(string[] args)
{
var conf = new ConfigurationBuilder()
.SetBasePath(Directory.GetCurrentDirectory())
.AddJsonFile("appsettings.json", true, true)
.Build();
//调用接收者
RabbitMQHelp.ReceiveMessage(RabbitMQHelp.actionLogExchange,
new LogConsumer()
);
Console.ReadLine();
}
}
}
总结
1、基于MassTransit库使得我们使用RabbitMQ变得更简洁、方便。而基于再次封装后,生产者与消费者将不需要关注具体的业务,也跟业务代码解耦了,更能适应项目的需要。
2、RabbitMQ的交换器需在其管理后台自行创建,而这里使用的fanout类型是因为其发送速度最快,且能满足我的项目需要,各位可视自身情况选用不同的类型。fanout类型不会存储消息,必需要消费者绑定交换器后才会发送给消费者。
原文链接:https://www.cnblogs.com/Andre/p/9579764.html
.NET社区新闻,深度好文,欢迎访问公众号文章汇总 http://www.csharpkit.com
ASP.NET Core 2.0利用MassTransit集成RabbitMQ相关推荐
- ASP.NET Core 5.0 Web API 自动集成Swashbuckle
ASP.NET Core 5.0 Web API与开放源代码项目 Swashbuckle.AspNetCore 的维护人员合作,ASP.NET Core API 模板包含对 Swashbuckle 的 ...
- Amazing ASP.NET Core 2.0
前言 ASP.NET Core 的变化和发展速度是飞快的,当你发现你还没有掌握 ASP.NET Core 1.0 的时候, 2.0 已经快要发布了,目前 2.0 处于 Preview 1 版本,意味着 ...
- diskgeniusv4.4.0_.NET Core 3.0及ASP.NET Core 3.0前瞻
(给DotNet加星标,提升.Net技能) 转自:LineZerocnblogs.com/linezero/p/netcore3 前几天微软发布了< .NET Core 3.0 Preview ...
- Centos7 Docker Jenkins ASP.NET Core 2.0 自动化发布和部署
写在前面 Docker一直很火热,一直想把原本的Jenkins自动部署工具搬到Docker上面,无奈今年一直忙于各种事情,迟迟未实施这个事情,正好迎来了dotnet core 2.0 的正式发布,升级 ...
- 从头编写 asp.net core 2.0 web api 基础框架 (4) EF配置
第一部分: https://www.cnblogs.com/frank0812/p/11165940.html 第二部分:https://www.cnblogs.com/frank0812/p/111 ...
- 避免在 ASP.NET Core 3.0 中为启动类注入服务
本篇是如何升级到ASP.NET Core 3.0系列文章的第二篇. Part 1 - 将.NET Standard 2.0 类库转换为.NET Core 3.0 类库 Part 2 - IHostin ...
- [翻译] ASP.NET Core 3.0 的新增功能
全文翻译自微软官方文档英文版 What's new in ASP.NET Core 3.0 本文重点介绍了 ASP.NET Core 3.0 中最重要的更改,并提供相关文档的连接. Blazor Bl ...
- .NET Core 3.0及ASP.NET Core 3.0 前瞻
前几天微软发布了 .NET Core 3.0 Preview 9 ,这是.NET Core 3.0 最后一个预览版. [翻译] .NET Core 3.0 Preview 9 发布 .NET Core ...
- ASP.NET Core 3.0 上的gRPC服务模板初体验(多图)
早就听说ASP.NET Core 3.0中引入了gRPC的服务模板,正好趁着家里电脑刚做了新系统,然后装了VS2019的功夫来体验一把.同时记录体验的过程.如果你也想按照本文的步骤体验的话,那你得先安 ...
最新文章
- php查询一对多,PHP并输出一对多结果
- Hyper-v副本容量规划器
- puts遇到空格无法输出_ACM输出超限|puts与printf
- Python 列表 min() 方法
- 自定义列_如何对Pandas DataFrame进行自定义排序
- 数据挖掘中的KNN算法实现论文
- windows系统安装wget指令
- 勒索病毒防护形式_病毒的完整形式是什么?
- 复盘 2019 ,展望 2020
- Win10怎样关闭自动维护
- spring集成kafka运行时报错:Failed to construct kafka producer] with root cause
- NAS私有云存储 - 搭建Nextcloud私有云盘并公网远程访问
- 使用云效 修改 layui 环境变量
- 限流算法-常见的4种限流算法
- 2021-2025年中国智能眼镜行业市场供需与战略研究报告
- TSDB学习 ---- Facebook TSDB 论文翻译
- 2018最新破解pycharm安装过程(含注册码)
- ios12怎么滑屏解锁_iOS13屏蔽更新描述文件失效了怎么办?iOS13屏蔽系统更新教程...
- [JavaWeb]_[初级]_[对Html特殊符号进行转义防止XSS攻击和反转义]
- Linux Mint 19.1 配置开发环境记录【含:输入法安装、字体模糊解决等问题】