MassTransit中RequestResponse基本使用
MassTransit 是一个自由、开源、轻量级的消息总线基于.Net框架, 用于创建分布式应用程序。方便搭建基于消息的松耦合异步通信的应用程序和服务。MassTransit 在现有消息传输上提供了一组广泛的功能, 从而使开发人员能够友好地使用基于消息的会话模式异步连接服务。基于消息的通信是实现面向服务的体系结构的可靠和可扩展的方式。
官网地址:http://masstransit-project.com/
发布订阅模式
这种场景十分常见,发送一个消息(或事件)到消息队列中,有一个或是多个订阅方对预期的消息接收处理。
基于需要搭建了两个WebApi程序,用于模拟发送方和订阅方,其中的RabbitMQ已预先搭建好了,只在程序中引用包配置下即可。
<PackageReference Include="MassTransit" Version="7.2.0" />
<PackageReference Include="MassTransit.AspNetCore" Version="7.2.0" />
<PackageReference Include="MassTransit.RabbitMQ" Version="7.2.0" />
发布端配置
在Startup中增加MassTransit需要的服务及初始化配置。
对RabbitMQ的连接地址端口、虚拟主机、访问账号密码等系列配置。
对发送方需要发送的消息初始化一个请求客户端,配置请求信息及推送到MQ的地址。
services.AddMassTransit(x =>
{x.UsingRabbitMq((context, cfg) =>{cfg.Host(Configuration["RabbitmqConfig:HostIP"], ushort.Parse(Configuration["RabbitmqConfig:HostPort"]), Configuration["RabbitmqConfig:VirtualHost"], h =>{h.Username(Configuration["RabbitmqConfig:Username"]);h.Password(Configuration["RabbitmqConfig:Password"]);});});x.AddRequestClient<ValueEntered>(new Uri(GetServiceAddress("events-valueentered")));
});
services.AddMassTransitHostedService();
为了快速了解,使用Controller在Action中发起对MQ的消息推送
[ApiController]
[Route("[controller]")]
public class ValueController : ControllerBase
{readonly IPublishEndpoint _publishEndpoint;public ValueController(IPublishEndpoint publishEndpoint){_publishEndpoint = publishEndpoint;}[HttpPost]public async Task<ActionResult> Post(string value){await _publishEndpoint.Publish<ValueEntered>(new{Value = value});return Ok();}
}
订阅端配置
订阅端也创建一个WebApi应用,在Startup中增加MassTransit的服务,使用到的Nuget包和发布端一样。
对RabbitMQ的连接地址端口、虚拟主机、访问账号密码等系列配置。
为订阅端增加一个订阅处理的Handler,即如下的ValueEnteredEventConsumer
增加一个接受点,指定队列名称,即发送端发送的队列名称,设置该队列消费处理的Consumer,即ValueEnteredEventConsumer
services.AddMassTransit(x =>
{x.AddConsumer<ValueEnteredEventConsumer>();x.UsingRabbitMq((context, cfg) =>{cfg.Host(Configuration["RabbitmqConfig:HostIP"], ushort.Parse(Configuration["RabbitmqConfig:HostPort"]), Configuration["RabbitmqConfig:VirtualHost"], h =>{h.Username(Configuration["RabbitmqConfig:Username"]);h.Password(Configuration["RabbitmqConfig:Password"]);});cfg.ReceiveEndpoint("events-valueentered", e =>{e.ConfigureConsumer<ValueEnteredEventConsumer>(context);});});
});
services.AddMassTransitHostedService();
如此一来,通过Postman发送一个请求,经发布端发布一个消息到RabbitMQ,订阅端侦听消息,处理消息,一切都很熟悉。
请求响应模式
在发布订阅的基础上,改变以往的习惯,当发布一个消息后,等待订阅方的处理,并将消息推送回RabbitMQ,发送方接受到处理后的消息继续执行。
请求端
在Startup中新加上一个用于发送消息(CheckOrderStatus)的请求客户端及指定消息队列名称(为每一个消息创建一个单独的队列)。
x.AddRequestClient<CheckOrderStatus>(new Uri(GetServiceAddress("events-checkorderstatus")));
增加一个Controller及Action,来请求及获取处理结果(OrderStatusResult)。
[ApiController]
[Route("[controller]")]
public class OrderController : ControllerBase
{private readonly IRequestClient<CheckOrderStatus> _client;public OrderController(IRequestClient<CheckOrderStatus> client){_client = client;}public async Task<OrderStatusResult> Get(string id){var response = await _client.GetResponse<OrderStatusResult>(new { OrderId = id });return response.Message;}
}
响应端
同样在响应端Startup中对新的消息设置下消息侦听队列以及相应的Handler如下的ValueEnteredEventConsumer去消费消息并返回处理结果。
x.AddConsumer<CheckOrderStatusConsumer >();
x.UsingRabbitMq((context, cfg) =>
{// ...cfg.ReceiveEndpoint("events-checkorderstatus", e =>{e.ConfigureConsumer<CheckOrderStatusConsumer >(context);});
});
Consumer中获取请求参数,执行请求,返回执行结果。
public class CheckOrderStatusConsumer : IConsumer<CheckOrderStatus>
{public async Task Consume(ConsumeContext<CheckOrderStatus> context){if (context.Message.OrderId == "9527"){throw new InvalidOperationException("Order not found");}Console.WriteLine($"OrderId:{context.Message.OrderId}");await context.RespondAsync<OrderStatusResult>(new{OrderId = context.Message.OrderId,Timestamp = Guid.NewGuid().ToString(),StatusCode = "1",StatusText = "Close"});}
}
这样一来,当请求端发起一个消息(事件)到RabbitMQ,响应端侦听并处理完毕返回处理结果到RabbitMQ,请求端依照响应结果继续执行后续请求。
HTTP方式差异
与以往的Http请求方式有所不同,通过httpClient.PostAsync发送请求,接收端处理并返回结果,而走requestClient发送请求到RabbitMQ,再由RabbitMQ推送到侦听节点消费并返回结果,如下第一二部分结构。
MassTransit中RequestResponse基本使用相关推荐
- .NET Core微服务之基于MassTransit实现数据最终一致性(Part 1)
Tip: 此篇已加入.NET Core微服务基础系列文章索引 一.预备知识:数据一致性 关于数据一致性的文章,园子里已经有很多了,如果你还不了解,那么可以通过以下的几篇文章去快速地了解了解,有个感性认 ...
- 基于Abp VNext框架设计 - Masstransit分布式消息
abp 通过IDistributedEventBus接口集成自IEventBus实现分布式事件消息的发布订阅. IEventBus在什么时机触发PublishAsync? 当前UnitOfWork完成 ...
- GNU Make 使用手册(于凤昌中译版)
GNU Make 使用手册(中译版) 翻译:于凤昌 GNU make Version 3.79 April 2000 Richard M. Stallman and Roland McGrath 1 ...
- 如何优雅的使用RabbitMQ
RabbitMQ无疑是目前最流行的消息队列之一,对各种语言环境的支持也很丰富,作为一个.NET developer有必要学习和了解这一工具.消息队列的使用场景大概有3种: 1.系统集成,分布式系统的设 ...
- 如何优雅的使用RabbitMQ?
RabbitMQ无疑是目前最流行的消息队列之一,对各种语言环境的支持也很丰富,作为一个.NET developer有必要学习和了解这一工具.消息队列的使用场景 大概有3种: 1.系统集成,分布式系统的 ...
- AspNetCoreMassTransit Courier实现分布式事务
在之前的一篇博文中,CAP框架可以方便我们实现非实时.异步场景下的最终一致性,而有些用例总是无法避免的需要在实时.同步场景下进行,可以借助Saga事务来解决这一困扰.在一些博文和仓库中也搜寻到了.Ne ...
- MassTransitamp;amp;Sagas分布式服务开发ppt分享
saga,与分布式相关,最早被定义在Hector Garcia-Molina和Kenneth Salem的论文"Sagas"中.这篇论文提出了一个saga机制来作为分布式事务的替代 ...
- 面试:第十二章:所有总结
Java基础 java基本类型哪些,所占字节 byte :1个字节 short :2个字节 char :2个字节 int :4个字节 long :8个字节 float :4个字节 double :8个 ...
- linux内核分析(转自某位大哥网上的笔记)
启动 当PC启动时,Intel系列的CPU首先进入的是实模式,并开始执行位于地址0xFFFF0处的代码,也就是ROM-BIOS起始位置的代码.BIOS先进行一系列的系统自检,然后初始化位于地址0的中断 ...
最新文章
- “积水上报”广获好评 畅移信息接棒 “互联网+政务”落地
- R语言:绘制知识图谱
- 华为服务器MLC硬盘ID号,RH2288H RH5885H V3 3.5寸 SAS SATA华为服务器硬盘架子 支架
- arcgis select by attributes一次选多个_地理工具学习--arcgis篇:单工具学习(2)
- 【cocos2d-js官方文档】二十五、Cocos2d-JS v3.0中的单例对象
- 可以储存照片的字段类型是_sql server 中 哪个字段类型可以储存图象?
- android 编译 oserror,编译android kernel时,关闭error, forbidden warning
- python之类的封装、多态、继承
- linux下无权限安装anaconda和tensorflow-gpu
- 【转】PCDATA和CDATA的区别究竟是什么呢?
- js获取html标签中的数据
- Activiti7入门,Activiti7 数据库表结构详细解析
- Python安装word2vec环境依赖
- java模拟京东登陆_模拟登陆京东并访问我的订单
- 三星集团继承人李在镕将接受韩国检方质询
- 一文读懂SpringBoot定时任务
- Introducing the Universal CRT
- web网站访问计数器
- 基于Android平台的汽车租赁系统:项目测试心得
- 卸载 UniAccessAgent 软件
热门文章
- 安卓相机 高帧率_Android MediaCodec和摄像头:如何实现更高的帧速率从相机获取帧原始数据?...
- 第三课 Makefile文件的制作(上)
- WordCount--统计输入文件的字符数、行数、单词数(java)--初级功能
- jQuery遍历not的用法
- eclipse启动tomcat无法访问
- BZOJ1001 狼抓兔子 终于过了!
- 动态的管理ASP.NET DataGrid数据列【转载】
- 计算机网络硬件的作用是什么,网络技术在计算机软硬件的作用
- 南京铁道学院计算机应用,南京铁道职业技术学院交通运营管理专业怎么样
- APP测试流程和测试点