MassTransit 是一个自由、开源、轻量级的消息总线基于.Net框架, 用于创建分布式应用程序。方便搭建基于消息的松耦合异步通信的应用程序和服务。MassTransit 在现有消息传输上提供了一组广泛的功能, 从而使开发人员能够友好地使用基于消息的会话模式异步连接服务。基于消息的通信是实现面向服务的体系结构的可靠和可扩展的方式。

官网地址:http://masstransit-project.com/

发布订阅模式

这种场景十分常见,发送一个消息(或事件)到消息队列中,有一个或是多个订阅方对预期的消息接收处理。

基于需要搭建了两个WebApi程序,用于模拟发送方和订阅方,其中的RabbitMQ已预先搭建好了,只在程序中引用包配置下即可。

<PackageReference Include="MassTransit" Version="7.2.0" />
<PackageReference Include="MassTransit.AspNetCore" Version="7.2.0" />
<PackageReference Include="MassTransit.RabbitMQ" Version="7.2.0" />

发布端配置

在Startup中增加MassTransit需要的服务及初始化配置。

  • 对RabbitMQ的连接地址端口、虚拟主机、访问账号密码等系列配置。

  • 对发送方需要发送的消息初始化一个请求客户端,配置请求信息及推送到MQ的地址。

services.AddMassTransit(x =>
{x.UsingRabbitMq((context, cfg) =>{cfg.Host(Configuration["RabbitmqConfig:HostIP"], ushort.Parse(Configuration["RabbitmqConfig:HostPort"]), Configuration["RabbitmqConfig:VirtualHost"], h =>{h.Username(Configuration["RabbitmqConfig:Username"]);h.Password(Configuration["RabbitmqConfig:Password"]);});});x.AddRequestClient<ValueEntered>(new Uri(GetServiceAddress("events-valueentered")));
});
services.AddMassTransitHostedService();

为了快速了解,使用Controller在Action中发起对MQ的消息推送

[ApiController]
[Route("[controller]")]
public class ValueController : ControllerBase
{readonly IPublishEndpoint _publishEndpoint;public ValueController(IPublishEndpoint publishEndpoint){_publishEndpoint = publishEndpoint;}[HttpPost]public async Task<ActionResult> Post(string value){await _publishEndpoint.Publish<ValueEntered>(new{Value = value});return Ok();}
}

订阅端配置

订阅端也创建一个WebApi应用,在Startup中增加MassTransit的服务,使用到的Nuget包和发布端一样。

  • 对RabbitMQ的连接地址端口、虚拟主机、访问账号密码等系列配置。

  • 为订阅端增加一个订阅处理的Handler,即如下的ValueEnteredEventConsumer

  • 增加一个接受点,指定队列名称,即发送端发送的队列名称,设置该队列消费处理的Consumer,即ValueEnteredEventConsumer

services.AddMassTransit(x =>
{x.AddConsumer<ValueEnteredEventConsumer>();x.UsingRabbitMq((context, cfg) =>{cfg.Host(Configuration["RabbitmqConfig:HostIP"], ushort.Parse(Configuration["RabbitmqConfig:HostPort"]), Configuration["RabbitmqConfig:VirtualHost"], h =>{h.Username(Configuration["RabbitmqConfig:Username"]);h.Password(Configuration["RabbitmqConfig:Password"]);});cfg.ReceiveEndpoint("events-valueentered", e =>{e.ConfigureConsumer<ValueEnteredEventConsumer>(context);});});
});
services.AddMassTransitHostedService();

如此一来,通过Postman发送一个请求,经发布端发布一个消息到RabbitMQ,订阅端侦听消息,处理消息,一切都很熟悉。

请求响应模式

在发布订阅的基础上,改变以往的习惯,当发布一个消息后,等待订阅方的处理,并将消息推送回RabbitMQ,发送方接受到处理后的消息继续执行。

请求端

在Startup中新加上一个用于发送消息(CheckOrderStatus)的请求客户端及指定消息队列名称(为每一个消息创建一个单独的队列)。

x.AddRequestClient<CheckOrderStatus>(new Uri(GetServiceAddress("events-checkorderstatus")));

增加一个Controller及Action,来请求及获取处理结果(OrderStatusResult)。

[ApiController]
[Route("[controller]")]
public class OrderController : ControllerBase
{private readonly IRequestClient<CheckOrderStatus> _client;public OrderController(IRequestClient<CheckOrderStatus> client){_client = client;}public async Task<OrderStatusResult> Get(string id){var response = await _client.GetResponse<OrderStatusResult>(new { OrderId = id });return response.Message;}
}

响应端

同样在响应端Startup中对新的消息设置下消息侦听队列以及相应的Handler如下的ValueEnteredEventConsumer去消费消息并返回处理结果。

x.AddConsumer<CheckOrderStatusConsumer >();
x.UsingRabbitMq((context, cfg) =>
{// ...cfg.ReceiveEndpoint("events-checkorderstatus", e =>{e.ConfigureConsumer<CheckOrderStatusConsumer >(context);});
});

Consumer中获取请求参数,执行请求,返回执行结果。

public class CheckOrderStatusConsumer : IConsumer<CheckOrderStatus>
{public async Task Consume(ConsumeContext<CheckOrderStatus> context){if (context.Message.OrderId == "9527"){throw new InvalidOperationException("Order not found");}Console.WriteLine($"OrderId:{context.Message.OrderId}");await context.RespondAsync<OrderStatusResult>(new{OrderId = context.Message.OrderId,Timestamp = Guid.NewGuid().ToString(),StatusCode = "1",StatusText = "Close"});}
}

这样一来,当请求端发起一个消息(事件)到RabbitMQ,响应端侦听并处理完毕返回处理结果到RabbitMQ,请求端依照响应结果继续执行后续请求。

HTTP方式差异

与以往的Http请求方式有所不同,通过httpClient.PostAsync发送请求,接收端处理并返回结果,而走requestClient发送请求到RabbitMQ,再由RabbitMQ推送到侦听节点消费并返回结果,如下第一二部分结构。

MassTransit中RequestResponse基本使用相关推荐

  1. .NET Core微服务之基于MassTransit实现数据最终一致性(Part 1)

    Tip: 此篇已加入.NET Core微服务基础系列文章索引 一.预备知识:数据一致性 关于数据一致性的文章,园子里已经有很多了,如果你还不了解,那么可以通过以下的几篇文章去快速地了解了解,有个感性认 ...

  2. 基于Abp VNext框架设计 - Masstransit分布式消息

    abp 通过IDistributedEventBus接口集成自IEventBus实现分布式事件消息的发布订阅. IEventBus在什么时机触发PublishAsync? 当前UnitOfWork完成 ...

  3. GNU Make 使用手册(于凤昌中译版)

    GNU Make 使用手册(中译版) 翻译:于凤昌 GNU make Version 3.79 April 2000 Richard M. Stallman and Roland McGrath 1 ...

  4. 如何优雅的使用RabbitMQ

    RabbitMQ无疑是目前最流行的消息队列之一,对各种语言环境的支持也很丰富,作为一个.NET developer有必要学习和了解这一工具.消息队列的使用场景大概有3种: 1.系统集成,分布式系统的设 ...

  5. 如何优雅的使用RabbitMQ?

    RabbitMQ无疑是目前最流行的消息队列之一,对各种语言环境的支持也很丰富,作为一个.NET developer有必要学习和了解这一工具.消息队列的使用场景 大概有3种: 1.系统集成,分布式系统的 ...

  6. AspNetCoreMassTransit Courier实现分布式事务

    在之前的一篇博文中,CAP框架可以方便我们实现非实时.异步场景下的最终一致性,而有些用例总是无法避免的需要在实时.同步场景下进行,可以借助Saga事务来解决这一困扰.在一些博文和仓库中也搜寻到了.Ne ...

  7. MassTransitamp;amp;Sagas分布式服务开发ppt分享

    saga,与分布式相关,最早被定义在Hector Garcia-Molina和Kenneth Salem的论文"Sagas"中.这篇论文提出了一个saga机制来作为分布式事务的替代 ...

  8. 面试:第十二章:所有总结

    Java基础 java基本类型哪些,所占字节 byte :1个字节 short :2个字节 char :2个字节 int :4个字节 long :8个字节 float :4个字节 double :8个 ...

  9. linux内核分析(转自某位大哥网上的笔记)

    启动 当PC启动时,Intel系列的CPU首先进入的是实模式,并开始执行位于地址0xFFFF0处的代码,也就是ROM-BIOS起始位置的代码.BIOS先进行一系列的系统自检,然后初始化位于地址0的中断 ...

最新文章

  1. “积水上报”广获好评 畅移信息接棒 “互联网+政务”落地
  2. R语言:绘制知识图谱
  3. 华为服务器MLC硬盘ID号,RH2288H RH5885H V3 3.5寸 SAS SATA华为服务器硬盘架子 支架
  4. arcgis select by attributes一次选多个_地理工具学习--arcgis篇:单工具学习(2)
  5. 【cocos2d-js官方文档】二十五、Cocos2d-JS v3.0中的单例对象
  6. 可以储存照片的字段类型是_sql server 中 哪个字段类型可以储存图象?
  7. android 编译 oserror,编译android kernel时,关闭error, forbidden warning
  8. python之类的封装、多态、继承
  9. linux下无权限安装anaconda和tensorflow-gpu
  10. 【转】PCDATA和CDATA的区别究竟是什么呢?
  11. js获取html标签中的数据
  12. Activiti7入门,Activiti7 数据库表结构详细解析
  13. Python安装word2vec环境依赖
  14. java模拟京东登陆_模拟登陆京东并访问我的订单
  15. 三星集团继承人李在镕将接受韩国检方质询
  16. 一文读懂SpringBoot定时任务
  17. Introducing the Universal CRT
  18. web网站访问计数器
  19. 基于Android平台的汽车租赁系统:项目测试心得
  20. 卸载 UniAccessAgent 软件

热门文章

  1. 安卓相机 高帧率_Android MediaCodec和摄像头:如何实现更高的帧速率从相机获取帧原始数据?...
  2. 第三课 Makefile文件的制作(上)
  3. WordCount--统计输入文件的字符数、行数、单词数(java)--初级功能
  4. jQuery遍历not的用法
  5. eclipse启动tomcat无法访问
  6. BZOJ1001 狼抓兔子 终于过了!
  7. 动态的管理ASP.NET DataGrid数据列【转载】
  8. 计算机网络硬件的作用是什么,网络技术在计算机软硬件的作用
  9. 南京铁道学院计算机应用,南京铁道职业技术学院交通运营管理专业怎么样
  10. APP测试流程和测试点