RabbitMQ简介

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

RabbitMQ提供了可靠的消息机制、跟踪机制和灵活的消息路由,支持消息集群和分布式部署。适用于排队算法、秒杀活动、消息分发、异步处理、数据同步、处理耗时任务、CQRS等应用场景。

RabbitMQ安装

RabbitMQ安装,网上已经有许多教程了,这里简单介绍一下在CentOS下安装RabbitMQ。使用的版本为3.6.12最新版。

1.首先安装erlang

rpm -Uvh https://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm

2.然后安装socat

yun install socat

3.最后安装RabbitMQ

rpm -Uvh https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.12/rabbitmq-server-3.6.12-1.el7.noarch.rpm

RabbitMQ常用命令

启用Web控制台

rabbitmq-plugins enable rabbitmq_management

开启服务

systemctl start rabbitmq-server.service

停止服务

systemctl stop rabbitmq-server.service

查看服务状态

systemctl status rabbitmq-server.service

查看RabbitMQ状态

rabbitmqctl status

添加用户赋予管理员权限

rabbitmqctl  add_user  username  password
rabbitmqctl  set_user_tags  username  administrator

查看用户列表

rabbitmqctl list_users

删除用户

rabbitmqctl delete_user username

修改用户密码

rabbitmqctl oldPassword Username newPassword

访问Web控制台

http://服务器ip:15672/ 注意配置防火墙,默认用户名密码都是guest,若新建用户一定要记得配置权限。

.NET Core 使用RabbitMQ

通过nuget安装:https://www.nuget.org/packages/RabbitMQ.Client/

定义生产者
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{UserName = "admin",//用户名Password = "admin",//密码HostName = "192.168.157.130"//rabbitmq ip
};//创建连接
var connection = factory.CreateConnection();
//创建通道
var channel = connection.CreateModel();
//声明一个队列
channel.QueueDeclare("hello", false, false, false, null);Console.WriteLine("\nRabbitMQ连接成功,请输入消息,输入exit退出!");string input;
do
{input = Console.ReadLine();var sendBytes = Encoding.UTF8.GetBytes(input);//发布消息channel.BasicPublish("", "hello", null, sendBytes);} while (input.Trim().ToLower()!="exit");
channel.Close();
connection.Close();
定义消费者
            //创建连接工厂ConnectionFactory factory = new ConnectionFactory{UserName = "admin",//用户名Password = "admin",//密码HostName = "192.168.157.130"//rabbitmq ip};//创建连接var connection = factory.CreateConnection();//创建通道var channel = connection.CreateModel();//事件基本消费者EventingBasicConsumer consumer = new EventingBasicConsumer(channel);//接收到消息事件consumer.Received += (ch, ea) =>{var message = Encoding.UTF8.GetString(ea.Body);Console.WriteLine($"收到消息: {message}");//确认该消息已被消费channel.BasicAck(ea.DeliveryTag, false);};//启动消费者 设置为手动应答消息channel.BasicConsume("hello", false, consumer);Console.WriteLine("消费者已启动");Console.ReadKey();channel.Dispose();connection.Close();
运行

启动了一个生产者,两个消费者,可以看见两个消费者都能收到消息,消息投递到哪个消费者是由RabbitMQ决定的。

RabbitMQ消费失败的处理

RabbitMQ采用消息应答机制,即消费者收到一个消息之后,需要发送一个应答,然后RabbitMQ才会将这个消息从队列中删除,如果消费者在消费过程中出现异常,断开连接切没有发送应答,那么RabbitMQ会将这个消息重新投递。

修改一下消费者的代码:

//接收到消息事件
consumer.Received += (ch, ea) =>
{var message = Encoding.UTF8.GetString(ea.Body);Console.WriteLine($"收到消息: {message}");Console.WriteLine($"收到该消息[{ea.DeliveryTag}] 延迟10s发送回执");Thread.Sleep(10000);//确认该消息已被消费channel.BasicAck(ea.DeliveryTag, false);Console.WriteLine($"已发送回执[{ea.DeliveryTag}]");
};

演示:

从图中可以看出,设置了消息应答延迟10s,如果在这10s中,该消费者断开了连接,那么消息会被RabbitMQ重新投递。

使用RabbitMQ的Exchange

前面我们可以看到生产者将消息投递到Queue中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的情况是,生产者将消息发送到Exchange(交换器),由Exchange将消息路由到一个或多个Queue中(或者丢弃)

AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储。同理,消费者也是如此。Exchange 就类似于一个交换机,转发各个消息分发到相应的队列中。

RabbitMQ提供了四种Exchange模式:direct,fanout,topic,header 。但是 header模式在实际使用中较少,所以这里只介绍前三种模式。

Exchange不是消费者关心的,所以消费者的代码完全不用变,用上面的消费者就行了。
由于避免文章过长,影响阅读,所以只贴了部分代码,但是demo里面是完整可运行的,详细代码请查看demo。

Direct Exchange

所有发送到Direct Exchange的消息被转发到具有指定RouteKey的Queue。

Direct模式,可以使用rabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。

//创建连接
var connection = factory.CreateConnection();
//创建通道
var channel = connection.CreateModel();//定义一个Direct类型交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null);//定义一个队列
channel.QueueDeclare(queueName, false, false, false, null);//将队列绑定到交换机
channel.QueueBind(queueName, exchangeName, routeKey, null);

运行:

Fanout Exchange

所有发送到Fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。

Fanout Exchange 不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。

所以,Fanout Exchange 转发消息是最快的。

为了演示效果,定义了两个队列,分别为hello1,hello2,每个队列都拥有一个消费者。

static void Main(string[] args)
{string exchangeName = "TestFanoutChange";string queueName1 = "hello1";string queueName2 = "hello2";string routeKey = "";//创建连接工厂ConnectionFactory factory = new ConnectionFactory{UserName = "admin",//用户名Password = "admin",//密码HostName = "192.168.157.130"//rabbitmq ip};//创建连接var connection = factory.CreateConnection();//创建通道var channel = connection.CreateModel();//定义一个Direct类型交换机channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false, null);//定义队列1channel.QueueDeclare(queueName1, false, false, false, null);//定义队列2channel.QueueDeclare(queueName2, false, false, false, null);//将队列绑定到交换机channel.QueueBind(queueName1, exchangeName, routeKey, null);channel.QueueBind(queueName2, exchangeName, routeKey, null);//生成两个队列的消费者ConsumerGenerator(queueName1);ConsumerGenerator(queueName2);Console.WriteLine($"\nRabbitMQ连接成功,\n\n请输入消息,输入exit退出!");string input;do{input = Console.ReadLine();var sendBytes = Encoding.UTF8.GetBytes(input);//发布消息channel.BasicPublish(exchangeName, routeKey, null, sendBytes);} while (input.Trim().ToLower() != "exit");channel.Close();connection.Close();
}/// <summary>
/// 根据队列名称生成消费者
/// </summary>
/// <param name="queueName"></param>
static void ConsumerGenerator(string queueName)
{//创建连接工厂ConnectionFactory factory = new ConnectionFactory{UserName = "admin",//用户名Password = "admin",//密码HostName = "192.168.157.130"//rabbitmq ip};//创建连接var connection = factory.CreateConnection();//创建通道var channel = connection.CreateModel();//事件基本消费者EventingBasicConsumer consumer = new EventingBasicConsumer(channel);//接收到消息事件consumer.Received += (ch, ea) =>{var message = Encoding.UTF8.GetString(ea.Body);Console.WriteLine($"Queue:{queueName}收到消息: {message}");//确认该消息已被消费channel.BasicAck(ea.DeliveryTag, false);};//启动消费者 设置为手动应答消息channel.BasicConsume(queueName, false, consumer);Console.WriteLine($"Queue:{queueName},消费者已启动");
}

运行:

Topic Exchange

所有发送到Topic Exchange的消息被转发到能和Topic匹配的Queue上,

Exchange 将路由进行模糊匹配。可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“XiaoChen.#”能够匹配到“XiaoChen.pets.cat”,但是“XiaoChen.*” 只会匹配到“XiaoChen.money”。

所以,Topic Exchange 使用非常灵活。

string exchangeName = "TestTopicChange";
string queueName = "hello";
string routeKey = "TestRouteKey.*";//创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{UserName = "admin",//用户名Password = "admin",//密码HostName = "192.168.157.130"//rabbitmq ip
};//创建连接
var connection = factory.CreateConnection();
//创建通道
var channel = connection.CreateModel();//定义一个Direct类型交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, false, false, null);//定义队列1
channel.QueueDeclare(queueName, false, false, false, null);//将队列绑定到交换机
channel.QueueBind(queueName, exchangeName, routeKey, null);Console.WriteLine($"\nRabbitMQ连接成功,\n\n请输入消息,输入exit退出!");string input;
do
{input = Console.ReadLine();var sendBytes = Encoding.UTF8.GetBytes(input);//发布消息channel.BasicPublish(exchangeName, "TestRouteKey.one", null, sendBytes);} while (input.Trim().ToLower() != "exit");
channel.Close();
connection.Close();

运行

Demo下载:DotNetCore.RabbitMQ

最后:欢迎加入 .net core 交流群一起学习,群号:4656606

.NET Core 使用RabbitMQ相关推荐

  1. net core 使用 rabbitmq

    windows环境安装: https://www.cnblogs.com/ericli-ericli/p/5902270.html .NET Core 使用RabbitMQ https://www.c ...

  2. .Net Core 集成 RabbitMQ 订阅与发送

    什么是RabbitMQ? 专业理解: MQ全称为Message Queue,即消息队列, RabbitMQ是由erlang语言开发,基于AMQP(Advanced MessageQueue 高级消息队 ...

  3. .NET Core微服务之基于EasyNetQ使用RabbitMQ消息队列

    Tip: 此篇已加入.NET Core微服务基础系列文章索引 一.消息队列与RabbitMQ 1.1 消息队列 "消息"是在两台计算机间传送的数据单位.消息可以非常简单,例如只包含 ...

  4. 消息队列,我只选RabbitMQ!

    高并发架构是架构师的必修课,而消息队列,则是王冠上最闪亮的那颗明珠!能否驾驭消息队列这款高并发神器,亦成为架构师的试金石.作为专注.NET领域十多年的老架构师,下面从队列本质.技术选型.实战应用三个方 ...

  5. RabbitMQ教程C#版 - 工作队列

    先决条件 本教程假定RabbitMQ已经安装,并运行在localhost标准端口(5672).如果你使用不同的主机.端口或证书,则需要调整连接设置. 从哪里获得帮助 如果您在阅读本教程时遇到困难,可以 ...

  6. RabbitMQ教程C#版 “Hello World”

    先决条件 本教程假定RabbitMQ已经安装,并运行在localhost标准端口(5672).如果你使用不同的主机.端口或证书,则需要调整连接设置. 从哪里获得帮助 如果您在阅读本教程时遇到困难,可以 ...

  7. .NET CoreWebApi基于EasyNetQ使用RabbitMQ消息队列

    一.消息队列与RabbitMQ 1.1 消息队列 "消息"是在两台计算机间传送的数据单位.消息可以非常简单,例如只包含文本字符串:也可以更复杂,可能包含嵌入对象.消息被发送到队列中 ...

  8. 机票预定系统类图_电商系统延时任务机制源码分享

    需求分析: 在javashop电商系统中,各种促销活动都有开始时间和结束时间,想要让一个活动在预定的时间开始或结束,使用定时任务轮询,存在耗性能并且不能在准确的时间点开始或结束的缺点,为了可以在指定的 ...

  9. 北京Dotnet分享会 || 精英论坛第三期

    编者按: 没有一成不变的定律,没有长久不衰的流行,更没有一劳永逸的侥幸, 只有自己刻苦努力.脚踏实地.兢兢业业的学习和工作, 才会成为这个社会永远不会被淘汰的中流砥柱. 一.昨夜西风凋碧树 昨夜西风凋 ...

最新文章

  1. linux系统格式化磁盘
  2. java no cache_java – 在我的部署中设置Cache-Control no-cache,no-store是什么?
  3. linux-Centos 7下bond与vlan技术的结合
  4. VS 2005 2008 项目模板丢失问题
  5. 只懂黑盒测试也能学会代码覆盖率分析和精准测试
  6. 设计模式的C++实现 2.工厂模式
  7. javascript笔记——图片大小检测
  8. 文字处理技术:表格与形状的布局差异
  9. 人工智能:一种现代方法 第四版 翻译序言
  10. python日期计算,Python 日期的转换及计算的具体使用详解
  11. 台式计算机没有声音图标,台式电脑没声音,小扬声器图标也没有。
  12. 图片处理-填充图片-numpy.pad
  13. 微信公众平台网页服务器,微信公众号——网页端
  14. 49 把字符串转换成整数
  15. google浏览器字体模糊问题(类似分辨率问题)解决办法
  16. 自由编写一个创意进度展示条
  17. puppy linux 默认密码,puppylinux使用手册.doc
  18. 自然数与有理数的双射函数
  19. 数字信号处理实验一(离散时间信号的MATLAB实现)
  20. 21天学通Java学习笔记-Day01

热门文章

  1. boost::fusion::traits用法的测试程序
  2. boost::fusion::push_back用法的测试程序
  3. Boost:获取随机数的实例
  4. ITK:将图像投射为另一种类型
  5. VTK:PolyData之TubeFilter
  6. VTK:Filtering之SurfaceFromUnorganizedPoints
  7. OpenCV用方形棋盘进行相机校准
  8. Qt Creator优化移动设备的应用程序
  9. C语言将正整数转换为字符串(附完整源码)
  10. QT的QStateMachine类的使用