如何在ASP.NET Core中使用Azure Service Bus Queue
原文:USING AZURE SERVICE BUS QUEUES WITH ASP.NET CORE SERVICES
作者:damienbod[1]
译文:如何在ASP.NET Core中使用Azure Service Bus Queue
地址:https://www.cnblogs.com/lwqlun/p/10760227.html
作者:Lamond Lu
源代码:https://github.com/lamondlu/AzureServiceBusMessaging
本文展示了如何使用Azure Service Bus Queue, 实现2个ASP.NET Core Api应用之间的消息传输。
配置Azure Service Bus Queue
你可以从官网文档中了解到如何配置一个Azure Service Bus Queue.
https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-create-namespace-portal
这里我们使用Queue或者Topic来实现消息传输。Queue是一种消息传输类型,一旦一个消息被一个消费者接收了,该消息就会从Queue中被移除。
与Queue不同,Topic提供的是一对多的通讯方式。
架构图
整个应用的实现如下:
•Api 1负责发送消息•Api 2负责监听Azure Service Bus,并处理接收到的消息
实现一个Service Bus Queue
这里我们首先需要引入Microsoft.Azure.ServiceBus[2] 程序集。Microsoft.Azure.ServiceBus[3]是Azure Service Bus的客户端库。针对Service Bus的连接字符串我们保存在项目的User Secret中。当部署项目的时候,我们可以使用Azure Key Valut来设置这个Secret值。
在Visual Studio中,右键点击API1, API2项目属性,选择Manage User Secrets就可以管理当前项目使用的所有私密信息。
为了发送向Azure Service Bus Queue发送消息,我们需要创建一个SendMessage
方法,并接收一个消息参数。这里我们创建了一个我们自己的消息内容类型MyPayload
, 将当前该MyPayload
对象序列化成Json字符串, 添加到一个Message
对象中。
using Microsoft.Azure.ServiceBus;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;
using System.Text;
using System.Threading.Tasks;
namespace ServiceBusMessaging
{
public class ServiceBusSender
{
private readonly QueueClient _queueClient;
private readonly IConfiguration _configuration;
private const string QUEUE_NAME = "simplequeue";
public ServiceBusSender(IConfiguration configuration)
{
_configuration = configuration;
_queueClient = new QueueClient(
_configuration
.GetConnectionString("ServiceBusConnectionString"),
QUEUE_NAME);
}
public async Task SendMessage(MyPayload payload)
{
string data = JsonConvert.SerializeObject(payload);
Message message = new Message(Encoding.UTF8.GetBytes(data));
await _queueClient.SendAsync(message);
}
}
}
在API 1和API 2中,我们需要将ServiceBusSender
注册到应用程序的IOC容器中。这里为了测试方便,我们同时注册Swagger
服务。
public void ConfigureServices(IServiceCollection services)
{
services.AddMvc();
services.AddScoped<ServiceBusSender>();
services.AddSwaggerGen(c =>
{
c.SwaggerDoc("v1", new Info
{
Version = "v1",
Title = "Payload View API",
});
});
}
接下来,我们就可以在控制器中通过构造函数注入的方式使用这个服务了。
在API1中,我们创建一个POST方法,这个方法会将API接收到Payload
对象发送到Azure Service Bus Queue中。
[HttpPost]
[ProducesResponseType(typeof(Payload), StatusCodes.Status200OK)]
[ProducesResponseType(typeof(Payload), StatusCodes.Status409Conflict)]
public async Task<IActionResult> Create([FromBody][Required]Payload request)
{
if (data.Any(d => d.Id == request.Id))
{
return Conflict($"data with id {request.Id} already exists");
}
data.Add(request);
// Send this to the bus for the other services
await _serviceBusSender.SendMessage(new MyPayload
{
Goals = request.Goals,
Name = request.Name,
Delete = false
});
return Ok(request);
}
从Queue中获取消息
为了监听Azure Service Bus Queue, 并处理接收到的消息,我们创建了一个新类ServiceBusConsumer
,ServiceBusConsumer
实现了IServiceBusConsumer
接口。
Queue的连接字符串是使用IConfiguration
读取的。RegisterOnMessageHandlerAndReceiveMessages
方法负责注册消息处理程序ProcessMessagesAsync
处理消息。ProcessMessagesAsync
方法会将得到的消息转换成对象,并调用IProcessData
接口完成最终的消息处理。
using Microsoft.Azure.ServiceBus;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ServiceBusMessaging
{
public interface IServiceBusConsumer
{
void RegisterOnMessageHandlerAndReceiveMessages();
Task CloseQueueAsync();
}
public class ServiceBusConsumer : IServiceBusConsumer
{
private readonly IProcessData _processData;
private readonly IConfiguration _configuration;
private readonly QueueClient _queueClient;
private const string QUEUE_NAME = "simplequeue";
private readonly ILogger _logger;
public ServiceBusConsumer(IProcessData processData,
IConfiguration configuration,
ILogger<ServiceBusConsumer> logger)
{
_processData = processData;
_configuration = configuration;
_logger = logger;
_queueClient = new QueueClient(
_configuration.GetConnectionString("ServiceBusConnectionString"), QUEUE_NAME);
}
public void RegisterOnMessageHandlerAndReceiveMessages()
{
var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
{
MaxConcurrentCalls = 1,
AutoComplete = false
};
_queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
}
private async Task ProcessMessagesAsync(Message message, CancellationToken token)
{
var myPayload = JsonConvert.DeserializeObject<MyPayload>(Encoding.UTF8.GetString(message.Body));
_processData.Process(myPayload);
await _queueClient.CompleteAsync(message.SystemProperties.LockToken);
}
private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
_logger.LogError(exceptionReceivedEventArgs.Exception, "Message handler encountered an exception");
var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
_logger.LogDebug($"- Endpoint: {context.Endpoint}");
_logger.LogDebug($"- Entity Path: {context.EntityPath}");
_logger.LogDebug($"- Executing Action: {context.Action}");
return Task.CompletedTask;
}
public async Task CloseQueueAsync()
{
await _queueClient.CloseAsync();
}
}
}
其中IProcessData
接口存在于类库项目ServiceBusMessaging
中,它是用来处理消息的。
public interface IProcessData
{
void Process(MyPayload myPayload);
}
在Api 2中,我们创建一个ProcessData
类,它实现了IProcessData
接口。
public class ProcessData : IProcessData
{
public void Process(MyPayload myPayload)
{
DataServiceSimi.Data.Add(new Payload
{
Name = myPayload.Name,
Goals = myPayload.Goals
});
}
}
这里为了简单测试,我们创建了一个静态类
DataServiceSimi
,其中存放了API2中所有保存Payload
对象。同时,我们还创建了一个新的控制器ViewPayloadMessagesController
,在其中添加了一个GET Action,并返回了静态类DataServiceSimi
中的所有数据。
[Route("api/[controller]")]
[ApiController]
public class ViewPayloadMessagesController : ControllerBase
{
[HttpGet]
[ProducesResponseType(StatusCodes.Status200OK)]
public ActionResult<List<Payload>> Get()
{
return Ok(DataServiceSimi.Data);
}
}
最后我们还需要将ProcessData
注册到API2的IOC容器中。
public void ConfigureServices(IServiceCollection services)
{
services.AddMvc();
services.AddSingleton<IServiceBusConsumer, ServiceBusConsumer>();
services.AddTransient<IProcessData, ProcessData>();
}
最终效果
现在我们分别启用2个Api项目,并在Api 1的Swagger文档界面,调用POST请求,添加一个Payload
操作完成之后,我们访问Api 2的/api/ViewPayloadMessages, 获得结果如下,Api 1发出的消息出现在了Api 2的结果集中,这说明Api 2从Azure Service Bus Queue中获取了消息,并保存在了自己的静态类DataServiceSimi
中。
References
[1]
damienbod: https://damienbod.com/author/damienbod/
[2]
Microsoft.Azure.ServiceBus: https://www.nuget.org/packages/Microsoft.Azure.ServiceBus
[3]
Microsoft.Azure.ServiceBus: https://www.nuget.org/packages/Microsoft.Azure.ServiceBus
.NET社区新闻,深度好文,欢迎访问公众号文章汇总 http://www.csharpkit.com
如何在ASP.NET Core中使用Azure Service Bus Queue相关推荐
- 如何在ASP.NET Core中自定义Azure Storage File Provider
主题:如何在ASP.NET Core中自定义Azure Storage File Provider 作者: Lamond Lu 地址: https://www.cnblogs.com/lwqlun/ ...
- asp绑定gridview属性_如何在ASP.NET Core中自定义Azure Storage File Provider
主题:如何在ASP.NET Core中自定义Azure Storage File Provider 作者: Lamond Lu 地址: https://www.cnblogs.com/lwqlun/ ...
- 如何在 ASP.Net Core 中对接 WCF
在 REST API 出现之前,SOAP (Simple Object Access Protocol) 一直都是基于 web 的标准协议,虽然现在 REST 大行其道,但在平时开发中总会遇到对接第三 ...
- 如何在 ASP.NET Core 中使用 HttpClientFactory ?
ASP.Net Core 是一个开源的,跨平台的,轻量级模块化框架,可用它来构建高性能的Web程序,这篇文章我们将会讨论如何在 ASP.Net Core 中使用 HttpClientFactory. ...
- 如何在 ASP.Net Core 中使用 Autofac
依赖注入可以有效的实现对象之间的 松耦合 并能够实现代码的可测试和可维护性,ASP.Net Core 提供了一个极简版的容器实现对 依赖注入 的原生支持,然而内置的依赖注入容器相比成熟的 依赖注入容器 ...
- 如何在 ASP.Net Core 中使用 Lamar
ASP.Net Core 自带了一个极简的 开箱即用 的依赖注入容器,实际上,你还可以使用第三方的 依赖注入容器 来替代它,依赖注入是一种设计模式,它能够有效的实现对象之间的解耦并有利于提高单元测试和 ...
- 如何在 ASP.Net Core 中使用 MediatR
MediatR 是一个 中介者模式 的.NET开源实现, 中介者模式 管控了一组对象之间的相互通讯并有效的减少了对象之间错综复杂的相互依赖,在 中介者模式 中,一个对象不需要直接和另一个对象进行通讯, ...
- 如何在 ASP.Net Core 中使用 NCache
虽然 ASP.Net Core 中缺少 Cache 对象,但它引入了三种不同的cache方式. 内存缓存 分布式缓存 Response缓存 Alachisoft 公司提供了一个开源项目 NCache, ...
- 如何在 ASP.Net Core 中使用 Configuration Provider
ASP.NET Core 是一个开源的,跨平台的,精简的模块化框架,可用于构建高性能,可扩展的web应用程序, ASP.NET Core 中的数据配置常用 k-v 的形式存储,值得注意的是,新的数据配 ...
最新文章
- Iptables防火墙配置详解
- 自定义配置webpack打包文件
- 萤石云 服务器错误 10017
- 深度学习解决多视图非线性数据特征融合问题
- 浅谈虚拟化技术下的云安全如何处置
- 键盘keydown值表
- bootstrap方法_中介效应中的bootstrap方法
- 仿射变换(Affine transformation)与python实践
- 2个线程共同处理冒泡排序 Linux 双线程处理
- mac sqlite可视化工具_Navicat for SQLite 12 for mac(强大数据库管理及开发工具)
- 数据管理与数据治理的区别
- android studio for android learning (二十一 )异步任务AsyncTask加载美女图片攻略及AsyncTask源码详解
- 导数公式、导数运算法则、复合函数求导、幂指函数求导
- java内存模型JMM理解整理
- ios开发之简单的TableView
- 02.虚拟功能介绍虚拟机网络配置xshell远程连接
- linux 配置java环境
- IDEA 设置4个空格代替tab
- 微信开放平台和公总平台关系图
- 毛星云opencv之8.4.4查找和绘制图像轮廓矩
热门文章
- 2018-2019-1 20165211 实验四 外设驱动程序设计
- [SDOI2009]Bill的挑战——全网唯一 一篇容斥题解
- 【MAC】Ncnn 编译so文件方案
- weblogic清除缓存
- Maven发布工程到私服
- gitlab永久设置密码
- Linux常用C函数-接口处理篇(网络通信函数)
- java.lang.NoClassDefFoundError: org.ksoap2.serialization.SoapObject
- 最好的FLV视频下载器 维棠 (支持优酷视频下载、土豆视频下载等)
- 使用Pitcher简化卫语句