如何优雅的使用RabbitMQ?
RabbitMQ无疑是目前最流行的消息队列之一,对各种语言环境的支持也很丰富,作为一个.NET developer有必要学习和了解这一工具。消息队列的使用场景
大概有3种:
1、系统集成,分布式系统的设计。各种子系统通过消息来对接,这种解决方案也逐步发展成一种架构风格,即“通过消息传递的架构”。
2、当系统中的同步处理方式严重影响了吞吐量,比如日志记录。假如需要记录系统中所有的用户行为日志,如果通过同步的方式记录日志势必会影响系统的响应速度,当我们将日志消息发送到消息队列,记录日志的子系统就会通过异步的方式去消费日志消息。
3、系统的高可用性,比如电商的秒杀场景。当某一时刻应用服务器或数据库服务器收到大量请求,将会出现系统宕机。如果能够将请求转发到消息队列,再由服务器去消费这些消息将会使得请求变得平稳,提高系统的可用性。
![](https://imgsa.baidu.com/exp/w=500/sign=05361613fadeb48ffb69a1dec01e3aef/241f95cad1c8a78631dbc67c6e09c93d71cf50cb.jpg)
一、开始使用RabbitMQ
RabbitMQ官网提供了详细的安装步骤,另外官网还提供了RabbitMQ在六种场景的使用教程。其中教程1、3、6将覆盖99%的使用场景,所以正常来说只需要搞清楚这3个教程即可快速上手。
二、简单分析
我们以官方提供的教程1做个简单梳理:该教程展示了Producer如何向一个消息队列(message queue)发送一个消息(message),消息消费者(Consumer)收到该消息后消费该消息。
1、producer端:
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
while (Console.ReadLine() != null)
{
using (var channel = connection.CreateModel())
{
//创建一个名叫"hello"的消息队列
channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var message = "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
//向该消息队列发送消息message
channel.BasicPublish(exchange: "",
routingKey: "hello",
basicProperties: null,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
}
}
该段代码非常简单,几乎到了无法精简的地步:创建了一个信道(channel)->创建一个队列->向该队列发送消息。
![](https://imgsa.baidu.com/exp/w=500/sign=15dd29ad7fc6a7efb926a826cdfbafe9/a71ea8d3fd1f413465c6890c2c1f95cad0c85ecb.jpg)
2、Consumer端
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
//创建一个名为"hello"的队列,防止producer端没有创建该队列
channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
//回调,当consumer收到消息后会执行该函数
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
};
//消费队列"hello"中的消息
channel.BasicConsume(queue: "hello",
noAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
该段代码可以理解为:创建信道->创建队列->定义回调函数->消费消息。
该实例描述了Send/Receive模式,可以简单理解为1(producer) VS 1(consumer)的场景;
实例3则描述了Publish/Subscriber模式,即1(producer) VS 多个(consumer);
在以上两个示例中,producer只需要发送消息即可,并不关心consumer的返回结果。实例6则描述了一个RPC调用场景,producer发送消息后还要接收consumer的返回结果,这一场景看起来跟使用消息队列的目的有点相悖。因为使用消息队列的目的之一就是要异步,但是这一场景似乎又将异步变成了同步,不过这一场景也很有用,比如一个用户操作产生了一个消息,应用服务收到该消息后执行了一些逻辑并使得数据库发生了变化,UI会一直等待应用服务的返回结果才刷新页面。
![](https://imgsa.baidu.com/exp/w=500/sign=7ff173c26c380cd7e61ea2ed9145ad14/9c16fdfaaf51f3de7b0035169deef01f3a297916.jpg)
三、 发现抽象
我桌子上放着一本RabbitMQ in Action,另外官网提供的文档也很详细,我感觉在一个月内我就能精通RabbitMQ,到时候简历上又可以写上“精通…”,感觉有点小得意呢... ,但是我知道这并不是使用RabbitMQ的最佳方式。
我们知道合理的抽象可以帮我们隐藏掉一些技术细节,让我们将重心放在核心业务上,比如一个人问你:“大雁塔如何走?”你的回答可能是“小寨往东,一直走两站,右手边”,如果你回答:“右转45度,向前走100米,再转90度…”,对方就会迷失在这些细节中。
![](https://imgsa.baidu.com/exp/w=500/sign=03ee64165f4e9258a63486eeac83d1d1/c9fcc3cec3fdfc0398886fcadd3f8794a5c226cb.jpg)
消息队列的使用过程中实际隐藏着一种抽象——服务总线(Service Bus)。
我们在回头看第一个例子,这个例子隐含的业务是:ClientA发送一个指令,ClientB收到该指令后做出反应。如果是这样,我们为什么要关心如何创建channel,如何创建一个queue? 我仅仅是要发送一个消息而已。另外这个例子写的其实不够健壮:
没有重试机制:如果ClientB第一次没有执行成功如何对该消息处理?
没有错误处理机制:如果ClientB在重试了N次之后还是异常如何处理该消息?
没有熔断机制;
如何对ClientA做一个schedule(计划安排),比如定时发送等;
没有消息审计机制;
无法对消息的各个状态做追踪;
事物处理等。
服务总线正是这种场景的抽象,并且为我们提供了这些机制,让我们赶快来看个究竟吧。
四、初识MassTransit
MassTransit是.NET平台下的一款开源免费的ESB产品,官网:http://masstransit-project.com/,GitHub 700 star,500 Fork,类似的产品还有NServiceBus,之所以要选用MassTransit是因为他要比NServiceBus轻量级,另外在MassTransit开发之初就选用了RabbitMQ作为消息传输组建;同时我想拿他跟NServiceBus做个比较,看看他们到底有哪些侧重点。
1、新建控制台应用程序:Masstransit.RabbitMQ.GreetingClient
使用MassTransit可以从Nuget中安装:
1
Install-Package MassTransit.RabbitMQ
2、创建服务总线,发送一个命令
static void Main(string[] args)
{
Console.WriteLine("Press 'Enter' to send a message.To exit, Ctrl + C");
var bus = BusCreator.CreateBus();
var sendToUri = new Uri($"{RabbitMqConstants.RabbitMqUri}{RabbitMqConstants.GreetingQueue}");
while (Console.ReadLine()!=null)
{
Task.Run(() => SendCommand(bus, sendToUri)).Wait();
}
Console.ReadLine();
}
private static async void SendCommand(IBusControl bus,Uri sendToUri)
{
var endPoint =await bus.GetSendEndpoint(sendToUri);
var command = new GreetingCommand()
{
Id = Guid.NewGuid(),
DateTime = DateTime.Now
};
await endPoint.Send(command);
Console.WriteLine($"send command:id={command.Id},{command.DateTime}");
}
这一段代码隐藏了众多关于消息队列的细节,将我们的注意力集中在发送消息上,同时ServiceBus提供的API也更接近业务,我们虽然发送的是一个消息,但是在这种场景下体现出来是一个命令,Send(command)这一API描述了我们的意图。
3、服务端接收这一命令
新建一个命令台控制程序:Masstransit.RabbitMQ.GreetingServer
var bus = BusCreator.CreateBus((cfg, host) =>
{
cfg.ReceiveEndpoint(host, RabbitMqConstants.GreetingQueue, e =>
{
e.Consumer<GreetingConsumer>();
});
});
这一代码可以理解为服务端在监听消息,我们在服务端注册了一个名为“GreetingConsumer”的消费者,GreetingConsumer的定义:
public class GreetingConsumer :IConsumer<GreetingCommand>
{
public async Task Consume(ConsumeContext<GreetingCommand> context)
{
await Console.Out.WriteLineAsync($"receive greeting commmand: {context.Message.Id},{context.Message.DateTime}");
}
}
该consumer可以消费类型为GreetingCommand的消息。这一实例几乎隐藏了有关RabbitMQ的技术细节,将代码中心放在了业务中,将这两个控制台应用跑起来试试:
![](https://imgsa.baidu.com/exp/w=500/sign=9f95684bbade9c82a665f98f5c8080d2/fd039245d688d43f8a591d80741ed21b0ff43bcb.jpg)
五、实现Publish/Subscribe模式
发布/订阅模式使得基于消息传递的软件架构成为可能,这一能力表现为ClientA发送消息X,ClientB和ClientC都可以订阅消息X。
1、我们在上面的例子中改造一下,当GreetingConsumer收到GreetingCommand后发送一个GreetingEvent:
var greetingEvent = new GreetingEvent()
{
Id = context.Message.Id,
DateTime = DateTime.Now
};
await context.Publish(greetingEvent);
2、新建控制台程序Masstransit.RabbitMQ.GreetingEvent.SubscriberA用来订阅GreetingEvent消息:
var bus = BusCreator.CreateBus((cfg, host) =>
{
cfg.ReceiveEndpoint(host, RabbitMqConstants.GreetingEventSubscriberAQueue, e =>
{
e.Consumer<GreetingEventConsumer>();
});
});
bus.Start();
定义GreetingEventConsumer:
public class GreetingEventConsumer:IConsumer<Greeting.Message.GreetingEvent>
{
public async Task Consume(ConsumeContext<Greeting.Message.GreetingEvent> context)
{
await Console.Out.WriteLineAsync($"receive greeting event: id {context.Message.Id}");
}
}
这一代码跟Masstransit.RabbitMQ.GreetingServer接受一个命令几乎一模一样,唯一的区别在于:
在Send/Receive模式中Client首先要获得对方(Server)的终结点(endpoint),直接向该终结点发送命令。Server方监听自己的终结点并消费命令。
而Publish/Subscribe模式中Client publish一个事件,SubscriberA在自己的终结点(endpointA)监听事件,SubscriberB在自己的终结点(endpointB)监听事件。
3、根据上面的分析再定义一个Masstransit.RabbitMQ.GreetingEvent.SubscriberB
4、将4个控制台应用程序跑起来看看
![](https://imgsa.baidu.com/exp/w=500/sign=88f171becacec3fd8b3ea775e689d4b6/5bafa40f4bfbfbed9a87ba0d71f0f736aec31fe1.jpg)
六、实现RPC模式
这一模式在Masstransit中被称作Request/Response模式,通过IRequestClient<IRequest, IResponse> 接口来实现相关操作。一个相关的例子在官方的github。
结束语:本篇文章分析了如何使用Masstransit来抽象业务,避免直接使用具体的消息队列,当然本文提到的众多服务总线机制,如“重试、熔断等”并没有在该文中出现,需要大家进一步去了解该项目。
通过对Masstransit的一些试用和NServiceBus的对比,Masstransit在实际项目中很容易上手并且免费,各种API定义的也非常清晰,但是官方的文档有点过于简单,实际使用中还需要去做深入的研究。作为.NET平台下为数不多的ESB开源产品,其关注程度还是不够,期待大家为开源项目做出贡献。
来源:https://jingyan.baidu.com/article/e75057f2eaa4c6ebc91a893b.html
如何优雅的使用RabbitMQ?相关推荐
- 如何优雅的使用RabbitMQ
RabbitMQ无疑是目前最流行的消息队列之一,对各种语言环境的支持也很丰富,作为一个.NET developer有必要学习和了解这一工具.消息队列的使用场景大概有3种: 1.系统集成,分布式系统的设 ...
- ASP.NET Core 2.0利用MassTransit集成RabbitMQ
在ASP.NET Core上利用MassTransit来集成使用RabbitMQ真的很简单,代码也很简洁.近期因为项目需要,我便在这基础上再次进行了封装,抽成了公共方法,使得使用RabbitMQ的调用 ...
- RabbitMQ教程C#版 - 工作队列
先决条件 本教程假定RabbitMQ已经安装,并运行在localhost标准端口(5672).如果你使用不同的主机.端口或证书,则需要调整连接设置. 从哪里获得帮助 如果您在阅读本教程时遇到困难,可以 ...
- RabbitMQ教程C#版 “Hello World”
先决条件 本教程假定RabbitMQ已经安装,并运行在localhost标准端口(5672).如果你使用不同的主机.端口或证书,则需要调整连接设置. 从哪里获得帮助 如果您在阅读本教程时遇到困难,可以 ...
- RabbitMQ知多少
1.引言 RabbitMQ--Rabbit Message Queue的简写,但不能仅仅理解其为消息队列,消息代理更合适.RabbitMQ 是一个由 Erlang 语言开发的AMQP(高级消息队列协议 ...
- RabbitMQ系列教程之四:路由(Routing)
在上一个教程中,我们构建了一个简单的日志系统,我们能够向许多消息接受者广播发送日志消息. 在本教程中,我们将为其添加一项功能 ,这个功能是我们将只订阅消息的一个子集成为可能. 例如,我们可以只将关键的 ...
- RabbitMQ系列教程之三:发布\/订阅(Publish\/Subscribe)
在前一个教程中,我们创建了一个工作队列.工作队列背后的假设是每个任务会被交付给一个[工人].在这一部分我们将做一些完全不同的事情--我们将向多个[消费者]传递信息.这种模式被称为"发布/订阅 ...
- RabbitMQ系列教程之二:工作队列(Work Queues)
今天开始RabbitMQ教程的第二讲,废话不多说,直接进入话题. (使用.NET 客户端 进行事例演示) 在第一个教程中,我们编写了一个从命名队列中发送和接收消息的程序.在本教程中,我们将创建一个 ...
- .NET Core微服务之基于MassTransit实现数据最终一致性(Part 1)
Tip: 此篇已加入.NET Core微服务基础系列文章索引 一.预备知识:数据一致性 关于数据一致性的文章,园子里已经有很多了,如果你还不了解,那么可以通过以下的几篇文章去快速地了解了解,有个感性认 ...
最新文章
- jenkins集群测试环境原理
- SSL 1760——商店选址问题(最短路)
- java der pem_JAVA解析各种编码密钥对(DER、PEM、openssh公钥)
- [蓝桥杯][基础练习VIP]完美的代价-贪心
- 【渝粤题库】陕西师范大学202231财务管理Ⅰ 作业(高起专)
- Linux Shell脚本入门教程系列之(六)Shell数组
- 修改mysql限制ip_MySQL 修改账号的IP限制条件
- 转载《Python与开源GIS教程》随书源码网址
- python笔试题 github_在GitHub上收获6519颗星星的Python面试题资源,到底有多牛?
- 浅谈(Java)AIO-异步IO
- react小书没读完的记录
- java jxls_java操作excel及jxls(Z)
- 什么是SWFObject?
- linux的tar命令详情;linux多个文件压缩打包到一个压缩文件
- 数据库服务器文件路径,服务器数据库的文件路径
- unity找到指定名称的一个物体的子物体,多个子物体有相同的名称
- 电脑硬件:谈谈cpu后边数字及字母的意思
- 分布式计算的详细笔记
- 崇启大桥崇明段发生重大交通事故致5人死亡-重大交通事故-崇启大桥
- 你是将才还是帅才?--将才与帅才的十二个差异
热门文章
- java recv failed,jmeter压测报错Unrecognized Windows Sockets error: 0: recv failed
- python 傅立叶函数_python 图像的离散傅立叶变换实例
- 张一春教授计算机辅助教学,我校特邀评审专家张一春教授来校做专题讲座
- 使用opencv读取图片错误([ WARN:0@13.701] global D:\a\opencv-python\opencv-python\opencv\modules\imgcodecs\..)
- 31. 如何计算对象已死(引用计数器算法、可达性分析算法)32.对象是否可 GC?33. Minor GC 和 Full GC
- 13_线性回归分析、线性模型、损失函数、最小二乘法之梯度下降、回归性能评估、sklearn回归评估API、线性回归正规方程,梯度下降API、梯度下降 和 正规方程对比
- nginx反向代理和rewrite进行解决跨域问题、去掉url中的一部分字符串,通过nginx正则生成新的url
- 1、打印二进制机器码,程序内存分析,大端序小端序,指针数组,数组指针,数组的三种访问方式,typedef,#if-0-#endif,求数组大小,括号表达式
- Opencms中要注意的地方
- pythorch基本信息查询