1. 说明

  在企业应用系统领域,会面对不同系统之间的通信、集成与整合,尤其当面临异构系统时,这种分布式的调用与通信变得越发重要。其次,系统中一般会有很多对实时性要求不高的但是执行起来比较较耗时的地方,比如发送短信,邮件提醒,更新文章阅读计数,记录用户操作日志等等,如果实时处理的话,在用户访问量比较大的情况下,对系统压力比较大。

面对这些问题,我们一般会将这些请求,放在消息队列MQ中处理;异构系统之间使用消息进行通讯。

    MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

  MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。

   RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。

  消息传递相较文件传递与远程过程调用(RPC)而言,似乎更胜一筹,因为它具有更好的平台无关性,并能够很好地支持并发与异步调用。所以如果系统中出现了如下情况:

  • 对操作的实时性要求不高,而需要执行的任务极为耗时;
  • 存在异构系统间的整合;

  一般的可以考虑引入消息队列。对于第一种情况,常常会选择消息队列来处理执行时间较长的任务。引入的消息队列就成了消息处理的缓冲区。消息队列引入的异步通信机制,使得发送方和接收方都不用等待对方返回成功消息,就可以继续执行下面的代码,从而提高了数据处理的能力。尤其是当访问量和数据流量较大的情况下,就可以结合消息队列与后台任务,通过避开高峰期对大数据进行处理,就可以有效降低数据库处理数据的负荷。

  本文简单介绍在RabbitMQ这一消息代理工具,以及在.NET中如何使用RabbitMQ.

2. 搭建环境

  2.1 安装Erlang语言运行环境

  由于RabbitMQ使用Erlang语言编写,所以先安装Erlang语言运行环境。具体移步博客:windows配置Erlang环境

  2.2 安装RabbitMQ服务端

  地址 http://www.rabbitmq.com/

  下载安装。

  使RabbitMQ以Windows Service的方式在后台运行:打开cmd切换到sbin目录下执行

rabbitmq-service install
rabbitmq-service enable
rabbitmq-service start

  现在RabbitMQ的服务端已经启动起来了。

  要查看和控制RabbitMQ服务端的状态,可以用rabbitmqctl这个脚本。

  比如查看状态:

rabbitmqctl status

  

  假如显示node没有连接上,需要到C:\Windows目录下,将.erlang.cookie文件,拷贝到用户目录下 C:\Users\{用户名},这是Erlang的Cookie文件,允许与Erlang进行交互。

  使用命令查看用户:

rabbitmqctl list_users

  RabbitMQ会为我们创建默认的用户名guest和密码guest,guest默认拥有RabbitMQ的所有权限。

  一般的,我们需要新建一个我们自己的用户,设置密码,并授予权限,并将其设置为管理员,可以使用下面的命令来执行这一操作:

rabbitmqctl  add_user  JC JayChou   //创建用户JC密码为JayChou
rabbitmqctl  set_permissions  JC ".*"  ".*"  ".*"    //赋予JC读写所有消息队列的权限
rabbitmqctl  set_user_tags JC administrator    //分配用户组

  修改JC密码为123:

rabbitmqctl change_password JC  123

  删除用户JC:

rabbitmqctl delete_user  JC

  也可以开启rabbitmq_management插件,在web界面查看和管理RabbitMQ服务

rabbitmq-plugins enable rabbitmq_management  

  2.3下载RabbitMQ的Client端dll

  下载地址:http://www.rabbitmq.com/releases/rabbitmq-dotnet-client/

  本人下载了这个 rabbitmq-dotnet-client-3.6.6-dotnet-4.5.zip

  解压,我们需要的是这个文件,以后会引用到vs的项目中:

3.使用

  3.1在使用RabitMQ之前,先对几个概念做一下说明

  

  RabbitMQ是一个消息代理。他从消息生产者(producers)那里接收消息,然后把消息送给消息消费者(consumer)在发送和接受之间,他能够根据设置的规则进行路由,缓存和持久化。

  一般提到RabbitMQ和消息,都用到一些专有名词。

  • 生产(Producing)意思就是发送。发送消息的程序就是一个生产者(producer)。我们一般用"P"来表示:

    

  • 队列(queue)就是邮箱的名称。消息通过你的应用程序和RabbitMQ进行传输,它们只能存储在队列(queue)中。 队列(queue)容量没有限制,你要存储多少消息都可以——基本上是一个无限的缓冲区。多个生产者(producers)能够把消息发送给同一个队列,同样,多个消费者(consumers)也能从同一个队列(queue)中获取数据。队列可以画成这样(图上是队列的名称):

    

  • 消费(Consuming)和获取消息是一样的意思。一个消费者(consumer)就是一个等待获取消息的程序。我们把它画作"C":

    

  通常,消息生产者,消息消费者和消息代理不在同一台机器上。

3.2 Hello Word

  下面来展示简单的RabbitMQ的使用:

    

3.2.1 首先创建名为ProjectSend的控制台项目,需要引用RabbitMQ.Client.dll。这个程序作为Producer生产者,用来发送数据:

static void Main(string[] args){var factory = new ConnectionFactory();factory.HostName = "localhost";//RabbitMQ服务在本地运行factory.UserName = "guest";//用户名factory.Password = "guest";//密码
    using (var connection = factory.CreateConnection()){using (var channel = connection.CreateModel()){channel.QueueDeclare("hello", false, false, false, null);//创建一个名称为hello的消息队列string message = "Hello World"; //传递的消息内容var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish("", "hello", null, body); //开始传递Console.WriteLine("已发送: {0}", message);<br>          Console.ReadLine();}}
}</pre>

  

  首先,需要创建一个ConnectionFactory,设置目标,由于是在本机,所以设置为localhost,如果RabbitMQ不在本机,只需要设置目标机器的IP地址或者机器名称即可,然后设置前面创建的用户名和密码。

  紧接着要创建一个Channel,如果要发送消息,需要创建一个队列,然后将消息发布到这个队列中。在创建队列的时候,只有RabbitMQ上该队列不存在,才会去创建。消息是以二进制数组的形式传输的,所以如果消息是实体对象的话,需要序列化和然后转化为二进制数组。

  现在客户端发送代码已经写好了,运行之后,消息会发布到RabbitMQ的消息队列中,现在需要编写服务端的代码连接到RabbitMQ上去获取这些消息。

3.2.2创建名为ProjectReceive的控制台项目,引用RabbitMQ.Client.dll。作为Consumer消费者,用来接收数据:

static void Main(string[] args){var factory = new ConnectionFactory();factory.HostName = "localhost";factory.UserName = "guest";factory.Password = "guest";
        using (var connection = factory.CreateConnection()){using (var channel = connection.CreateModel()){channel.QueueDeclare("hello", false, false, false, null);var consumer = new EventingBasicConsumer(channel);channel.BasicConsume("hello", false, consumer);consumer.Received += (model, ea) =&gt;{var body = ea.Body;var message = Encoding.UTF8.GetString(body); Console.WriteLine("已接收: {0}", message);   };Console.ReadLine(); }}}</pre>

  和发送一样,首先需要定义连接,然后声明消息队列。要接收消息,需要定义一个Consume,然后在接收消息的事件中处理数据。

3.2.3 现在发送和接收的客户端都写好了,让我们编译执行起来

  发送消息:

  现在,名为hello的消息队列中,发送了一条消息。这条消息存储到了RabbitMQ的服务器上了。使用rabbitmqctl 的list_queues可以查看所有的消息队列,以及里面的消息个数,可以看到,目前Rabbitmq上只有一个消息队列,里面只有一条消息:

  也可以在web管理界面查看此queue的相关信息:

  接收消息:

  既然消息已经被接收了,那我们再来看queue的内容:

  可见,消息中的内容在接收之后已被删除了。

3.3 工作队列

  前面的例子展示了如何在指定的消息队列发送和接收消息。

  现在我们创建一个工作队列(work queue)来将一些耗时的任务分发给多个工作者(workers):

  

  工作队列(work queues, 又称任务队列Task Queues)的主要思想是为了避免立即执行并等待一些占用大量资源、时间的操作完成。而是把任务(Task)当作消息发送到队列中,稍后处理。一个运行在后台的工作者(worker)进程就会取出任务然后处理。当运行多个工作者(workers)时,任务会在它们之间共享。

  这个在网络应用中非常有用,它可以在短暂的HTTP请求中处理一些复杂的任务。在一些实时性要求不太高的地方,我们可以处理完主要操作之后,以消息的方式来处理其他的不紧要的操作,比如写日志等等。

准备

  在第一部分,发送了一个包含“Hello World!”的字符串消息。现在发送一些字符串,把这些字符串当作复杂的任务。这里使用time.sleep()函数来模拟耗时的任务。在字符串中加上点号(.)来表示任务的复杂程度,一个点(.)将会耗时1秒钟。比如"Hello..."就会耗时3秒钟。

对之前示例的send.cs做些简单的调整,以便可以发送随意的消息。这个程序会按照计划发送任务到我们的工作队列中。

static void Main(string[] args)
{var factory = new ConnectionFactory();factory.HostName = "localhost";factory.UserName = "yy";factory.Password = "hello!";
using (var connection = factory.CreateConnection())
{using (var channel = connection.CreateModel()){channel.QueueDeclare("hello", false, false, false, null);string message = GetMessage(args);var properties = channel.CreateBasicProperties();properties.DeliveryMode = 2;var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish("", "hello", properties, body);Console.WriteLine(" set {0}", message);}
}Console.ReadKey();

}

private static string GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(" ", args) : “Hello World!”);
}

接着我们修改接收端,让他根据消息中的逗点的个数来Sleep对应的秒数:

static void Main(string[] args)
{var factory = new ConnectionFactory();factory.HostName = "localhost";factory.UserName = "yy";factory.Password = "hello!";
using (var connection = factory.CreateConnection())
{using (var channel = connection.CreateModel()){channel.QueueDeclare("hello", false, false, false, null);var consumer = new QueueingBasicConsumer(channel);channel.BasicConsume("hello", true, consumer);while (true){var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();var body = ea.Body;var message = Encoding.UTF8.GetString(body);int dots = message.Split('.').Length - 1;Thread.Sleep(dots * 1000);Console.WriteLine("Received {0}", message);Console.WriteLine("Done");}}
}

}

轮询分发

  使用工作队列的一个好处就是它能够并行的处理队列。如果堆积了很多任务,我们只需要添加更多的工作者(workers)就可以了,扩展很简单。

现在,我们先启动两个接收端,等待接受消息,然后启动一个发送端开始发送消息。

  在cmd条件下,发送了5条消息,每条消息后面的逗点表示该消息需要执行的时长,来模拟耗时的操作。

  然后可以看到,两个接收端依次接收到了发出的消息:

默认,RabbitMQ会将每个消息按照顺序依次分发给下一个消费者。所以每个消费者接收到的消息个数大致是平均的。 这种消息分发的方式称之为轮询(round-robin)。

3.4 消息响应

当处理一个比较耗时得任务的时候,也许想知道消费者(consumers)是否运行到一半就挂掉。在当前的代码中,当RabbitMQ将消息发送给消费者(consumers)之后,马上就会将该消息从队列中移除。此时,如果把处理这个消息的工作者(worker)停掉,正在处理的这条消息就会丢失。同时,所有发送到这个工作者的还没有处理的消息都会丢失。

我们不想丢失任何任务消息。如果一个工作者(worker)挂掉了,我们希望该消息会重新发送给其他的工作者(worker)。

为了防止消息丢失,RabbitMQ提供了消息响应(acknowledgments)机制。消费者会通过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ才会释放并删除这条消息。

如果消费者(consumer)挂掉了,没有发送响应,RabbitMQ就会认为消息没有被完全处理,然后重新发送给其他消费者(consumer)。这样,即使工作者(workers)偶尔的挂掉,也不会丢失消息。

消息是没有超时这个概念的;当工作者与它断开连的时候,RabbitMQ会重新发送消息。这样在处理一个耗时非常长的消息任务的时候就不会出问题了。

消息响应默认是开启的。在之前的例子中使用了no_ack=True标识把它关闭。是时候移除这个标识了,当工作者(worker)完成了任务,就发送一个响应。

channel.BasicConsume("hello", false, consumer);

while (true)
{
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

var body = ea.Body;
var message = Encoding.UTF8.GetString(body);int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);Console.WriteLine("Received {0}", message);
Console.WriteLine("Done");channel.BasicAck(ea.DeliveryTag, false);

}

现在,可以保证,即使正在处理消息的工作者被停掉,这些消息也不会丢失,所有没有被应答的消息会被重新发送给其他工作者.

一个很常见的错误就是忘掉了BasicAck这个方法,这个错误很常见,但是后果很严重. 当客户端退出时,待处理的消息就会被重新分发,但是RabitMQ会消耗越来越多的内存,因为这些没有被应答的消息不能够被释放。调试这种case,可以使用rabbitmqct打印messages_unacknoledged字段。

rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello    0       0
...done.

3.5 消息持久化

前面已经搞定了即使消费者down掉,任务也不会丢失,但是,如果RabbitMQ Server停掉了,那么这些消息还是会丢失。

当RabbitMQ Server 关闭或者崩溃,那么里面存储的队列和消息默认是不会保存下来的。如果要让RabbitMQ保存住消息,需要在两个地方同时设置:需要保证队列和消息都是持久化的。

首先,要保证RabbitMQ不会丢失队列,所以要做如下设置:

bool durable = true;
channel.QueueDeclare("hello", durable, false, false, null);

虽然在语法上是正确的,但是在目前阶段是不正确的,因为我们之前已经定义了一个非持久化的hello队列。RabbitMQ不允许我们使用不同的参数重新定义一个已经存在的同名队列,如果这样做就会报错。现在,定义另外一个不同名称的队列:

bool durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

queueDeclare 这个改动需要在发送端和接收端同时设置。

现在保证了task_queue这个消息队列即使在RabbitMQ Server重启之后,队列也不会丢失。 然后需要保证消息也是持久化的, 这可以通过设置IBasicProperties.SetPersistent 为true来实现:

var properties = channel.CreateBasicProperties();
properties.SetPersistent(true);

需要注意的是,将消息设置为持久化并不能完全保证消息不丢失。虽然他告诉RabbitMQ将消息保存到磁盘上,但是在RabbitMQ接收到消息和将其保存到磁盘上这之间仍然有一个小的时间窗口。 RabbitMQ 可能只是将消息保存到了缓存中,并没有将其写入到磁盘上。持久化是不能够一定保证的,但是对于一个简单任务队列来说已经足够。如果需要消息队列持久化的强保证,可以使用publisher confirms

3.6 公平分发

你可能会注意到,消息的分发可能并没有如我们想要的那样公平分配。比如,对于两个工作者。当奇数个消息的任务比较重,但是偶数个消息任务比较轻时,奇数个工作者始终处理忙碌状态,而偶数个工作者始终处理空闲状态。但是RabbitMQ并不知道这些,他仍然会平均依次的分发消息。

为了改变这一状态,我们可以使用basicQos方法,设置perfetchCount=1 。这样就告诉RabbitMQ 不要在同一时间给一个工作者发送多于1个的消息,或者换句话说。在一个工作者还在处理消息,并且没有响应消息之前,不要给他分发新的消息。相反,将这条新的消息发送给下一个不那么忙碌的工作者。

channel.BasicQos(0, 1, false); 

3.7 完整实例

现在将所有这些放在一起:

发送端代码如下:

static void Main(string[] args)
{var factory = new ConnectionFactory();factory.HostName = "localhost";factory.UserName = "yy";factory.Password = "hello!";
using (var connection = factory.CreateConnection())
{using (var channel = connection.CreateModel()){bool durable = true;channel.QueueDeclare("task_queue", durable, false, false, null);string message = GetMessage(args);var properties = channel.CreateBasicProperties();properties.SetPersistent(true);var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish("", "task_queue", properties, body);Console.WriteLine(" set {0}", message);}
}Console.ReadKey();

}

private static string GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(" ", args) : “Hello World!”);
}

接收端代码如下:

static void Main(string[] args)
{var factory = new ConnectionFactory();factory.HostName = "localhost";factory.UserName = "yy";factory.Password = "hello!";
using (var connection = factory.CreateConnection())
{using (var channel = connection.CreateModel()){bool durable = true;channel.QueueDeclare("task_queue", durable, false, false, null);channel.BasicQos(0, 1, false);var consumer = new QueueingBasicConsumer(channel);channel.BasicConsume("task_queue", false, consumer);while (true){var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();var body = ea.Body;var message = Encoding.UTF8.GetString(body);int dots = message.Split('.').Length - 1;Thread.Sleep(dots * 1000);Console.WriteLine("Received {0}", message);Console.WriteLine("Done");channel.BasicAck(ea.DeliveryTag, false);}}
}

}

4 管理界面

RabbitMQ管理界面,通过该界面可以查看RabbitMQ Server 当前的状态,该界面是以插件形式提供的,并且在安装RabbitMQ的时候已经自带了该插件。需要做的是在RabbitMQ控制台界面中启用该插件,命令如下:

rabbitmq-plugins enable rabbitmq_management

现在,在浏览器中输入 http://server-name:15672/ server-name换成机器地址或者域名,如果是本地的,直接用localhost(RabbitMQ 3.0之前版本端口号为55672)在输入之后,弹出登录界面,使用我们之前创建的用户登录。

 .

在该界面上可以看到当前RabbitMQServer的所有状态。

5 总结

本文简单介绍了消息队列的相关概念,并介绍了RabbitMQ消息代理的基本原理以及在Windows 上如何安装RabbitMQ和在.NET中如何使用RabbitMQ。消息队列在构建分布式系统和提高系统的可扩展性和响应性方面有着很重要的作用,希望本文对您了解消息队列以及如何使用RabbitMQ有所帮助。


其他参考:
RabbitMQ教程C#版 - “Hello World”
C# 操作rabbitmq(一)

C#使用RabbitMQ(转)相关推荐

  1. RabbitMQ 入门系列(11)— RabbitMQ 常用的工作模式(simple模式、work模式、publish/subscribe模式、routing模式、topic模式)

    1. simple 模式 simple 模式是最简单最常用的模式 2. work 模式 work 模式有多个消费者 消息产生者将消息放入队列.生产者系统不需知道哪一个任务执行系统在空闲,直接将任务扔到 ...

  2. Go 学习笔记(57)— Go 第三方库之 amqp (RabbitMQ 生产者、消费者整个流程)

    1. 安装 rabbitmq 的 golang 包 golang 可使用库 github.com/streadway/amqp 操作 rabbitmq .使用下面命令安装 RabbitMQ . go ...

  3. RabbitMQ 入门系列(4)— RabbitMQ 启动、停止节点和应用程序、用户管理、权限配置

    1. 服务器管理 我们使用 "节点" 来指代 RabbitMQ 实例,当我们谈到 RabbitMQ 节点时指的是 RabbitMQ 应用程序和其所在的 Erlang 节点. 1.1 ...

  4. RabbitMQ 入门系列(3)— 生产者消费者 Python 代码实现

    生产者消费者代码示例 上一章节中对消息通信概念做了详细的说明,本章节我们对 RabbitMQ 生产者和消费者代码分别做一示例说明. 1. 生产者代码 #!/usr/bin/env python # c ...

  5. RabbitMQ 入门系列(2)— 生产者、消费者、信道、代理、队列、交换器、路由键、绑定、交换器

    本系列是「RabbitMQ实战:高效部署分布式消息队列」和 「RabbitMQ实战指南」书籍的读书笔记. RabbitMQ 中重要概念 1. 生产者 生产者(producer)创建消息,然后发送到代理 ...

  6. RabbitMQ 入门系列(1)— Ubuntu 安装 RabbitMQ 及配置

    1. RabbitMQ 简介 消息 (Message) 是指在应用间传送的数据.消息可以非常简单,比如只包含文本字符串.JSON等,也可以很复杂,比如内嵌对象. 消息队列中间件(Message Que ...

  7. RabbitMQ超详细安装教程(Linux)

    目录 1.简介 2.下载安装启动RabbitMQ 2.1.下载RabbitMQ 2.2.下载Erlang 2.3.安装Erlang 2.4.安装RabbitMQ 2.5.启动RabbitMQ服务 3. ...

  8. 第五节 RabbitMQ在C#端的应用-消息收发

    原文:第五节 RabbitMQ在C#端的应用-消息收发 版权声明:未经本人同意,不得转载该文章,谢谢 https://blog.csdn.net/phocus1/article/details/873 ...

  9. RabbitMQ学习笔记一:本地Windows环境安装RabbitMQ Server

    一:安装RabbitMQ需要先安装Erlang语言开发包,百度网盘地址:http://pan.baidu.com/s/1jH8S2u6.直接下载地址:http://erlang.org/downloa ...

  10. RabbitMQ使用及与spring boot整合

    1.MQ 消息队列(Message Queue,简称MQ)--应用程序和应用程序之间的通信方法 应用:不同进程Process/线程Thread之间通信 比较流行的中间件: ActiveMQ Rabbi ...

最新文章

  1. SQL GROUP BY 语句
  2. 理解CNN卷积层与池化层计算
  3. 苹果宣布3月7日召开发布会 预期推iPad 3
  4. mysql 优化器关联查询_MySQL 查询优化器(二)
  5. 虚拟化基础架构Windows 2008篇之2-域用户与域用户组管理
  6. 2019一级计算机等级考试试题,2019年全国计算机等级考试一级练习试题及答案(一)...
  7. SD从零开始29-30
  8. 天梯赛L2-10:排座位
  9. 五大优秀的数据库设计工具
  10. 百度世界2021:百度大脑升级、昆仑芯2量产、智能云加速AI落地爆发
  11. 滑动窗口平均值c语言,数据流滑动窗口平均值 · sliding window average from data stream...
  12. 安卓QQ聊天记录导出、备份完全攻略
  13. 人机交互大作业文档预览
  14. php 动态倒计时计数器跳转至另一个页面,JavaScript_基于JavaScript实现网页倒计时自动跳转代码,用JS实现网页上的自动跳转功 - phpStudy...
  15. Android集成谷歌定位sdk,Android 地图sdk,集成了百度、高德、谷歌三种地图
  16. 瀚高数据库命令备份还原
  17. Linux下构建一个deb软件安装包
  18. ICASPP2022论文阅读记录2 - Transformer-S2A
  19. ssm与springboot常见注解
  20. R语言 | 编写自己的函数

热门文章

  1. Flash动作补间动画
  2. GridView的DataKeyNames属性(转)
  3. Linux入门-安装篇(Debian 服务器版)
  4. 天朝四大不正经社交软件 第一名“亮了”
  5. WSUS 3.0 SP2 部署安装
  6. 3.啊哈!算法 --- 一大波数正在靠近——枚举!很暴力
  7. 11.RabbitMQ实战 --- 提升性能,保障安全
  8. 17.企业应用架构模式 --- 会话状态模式
  9. 5.Shell 编程从入门到精通 --- 基本文本处理
  10. 20. 远程端口查看