先决条件
本教程假定RabbitMQ已经安装,并运行在localhost标准端口(5672)。如果你使用不同的主机、端口或证书,则需要调整连接设置。

从哪里获得帮助
如果您在阅读本教程时遇到困难,可以通过邮件列表联系我们。

1.工作队列

(使用.NET客户端)

在第一篇教程RabbitMQ教程C#版 “Hello World”中,我们编写了两个程序,用于从一个指定的队列发送和接收消息。在本文中,我们将创建一个工作队列,用于在多个工作线程间分发耗时的任务。

工作队列(又名:任务队列)背后的主要想法是避免立即执行资源密集型、且必须等待其完成的任务。相反的,我们把这些任务安排在稍后完成。我们可以将任务封装为消息并把它发送到队列中,在后台运行的工作进程将从队列中取出任务并最终执行。当您运行多个工作线程,这些任务将在这些工作线程之间共享。

这个概念在Web应用程序中特别有用,因为在一个HTTP请求窗口中无法处理复杂的任务。

2.准备

我们将略微修改上一个示例中的Send程序,以其可以在命令行发送任意消息。
这个程序将调度任务到我们的工作队列中,所以让我们把它命名为NewTask

像教程[1],我们需要生成两个项目:

dotnet new console --name NewTask

mv NewTask/Program.cs NewTask/NewTask.cs

dotnet new console --name Worker

mv Worker/Program.cs Worker/Worker.cs

cd NewTask

dotnet add package RabbitMQ.Client

dotnet restore

cd ../Worker

dotnet add package RabbitMQ.Client

dotnet restore

var message = GetMessage(args);

var body = Encoding.UTF8.GetBytes(message);

var properties = channel.CreateBasicProperties();

properties.Persistent = true;

channel.BasicPublish(exchange: "",

routingKey: "task_queue",

basicProperties: properties,

body: body);

从命令行参数获取消息的帮助方法:

private static string GetMessage(string[] args)

{

return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");

}

我们旧的Receive.cs脚本也需要进行一些更改:它需要为消息体中的每个点模拟一秒种的时间消耗。它将处理由RabbitMQ发布的消息,并执行任务,因此我们把它复制到Worker项目并修改:

// 构建消费者实例。

var consumer = new EventingBasicConsumer(channel);

// 绑定消息接收事件。

consumer.Received += (model, ea) =>

{

var body = ea.Body;

var message = Encoding.UTF8.GetString(body);

Console.WriteLine(" [x] Received {0}", message);

// 模拟耗时操作。

int dots = message.Split('.').Length - 1;

Thread.Sleep(dots * 1000);

Console.WriteLine(" [x] Done");

};

channel.BasicConsume(queue: "task_queue", autoAck: true, consumer: consumer);

模拟虚拟任务的执行时间:

int dots = message.Split('.').Length - 1;

Thread.Sleep(dots * 1000);

3.循环调度

使用任务队列的优点之一是能够轻松地并行工作。如果我们正在积累积压的工作,我们仅要增加更多的工作者,并以此方式可以轻松扩展。

首先,我们尝试同时运行两个Worker实例。他们都会从队列中获取消息,但究竟如何?让我们来看看。

您需要打开三个控制台,两个运行Worker程序,这些控制台作为我们的两个消费者 - C1和C2。

# shell 1

cd Worker

dotnet run

# => [*] Waiting for messages. To exit press CTRL+C

# shell 2

cd Worker

dotnet run

# => [*] Waiting for messages. To exit press CTRL+C

在第三个控制台中,我们将发布一些新的任务。一旦你已经运行了消费者,你可以尝试发布几条消息:

# shell 3

cd NewTask

dotnet run "First message."

dotnet run "Second message.."

dotnet run "Third message..."

dotnet run "Fourth message...."

dotnet run "Fifth message....."

让我们看看有什么发送到了我们的Worker程序:

# shell 1

# => [*] Waiting for messages. To exit press CTRL+C

# => [x] Received 'First message.'

# => [x] Received 'Third message...'

# => [x] Received 'Fifth message.....'

# shell 2

# => [*] Waiting for messages. To exit press CTRL+C

# => [x] Received 'Second message..'

# => [x] Received 'Fourth message....'

默认情况下,RabbitMQ会按顺序将每条消息发送给下一个消费者。消费者数量平均的情况下,每个消费者将会获得相同数量的消息。这种分配消息的方式称为循环(Round-Robin)。请尝试开启三个或更多的Worker程序来验证。

4.消息确认

处理一项任务可能会需要几秒钟的时间。如果其中一个消费者开启了一项长期的任务并且只完成了部分就挂掉了,您可能想知道会发生什么?在我们当前的代码中,一旦RabbitMQ把消息分发给了消费者,它会立即将这条消息标记为删除。在这种情况下,如果您停掉某一个Worker,我们将会丢失这条正在处理的消息,也将丢失所有分发到该Worker但尚未处理的消息。

但是我们不想丢失任何一个任务。如果一个Worker挂掉了,我们希望这个任务能被重新分发给其他Worker。

为了确保消息永远不会丢失,RabbitMQ支持消息确认机制。消费者回发一个确认信号Ack(nowledgement)给RabbitMQ,告诉它某个消息已经被接收、处理并且可以自由删除它。

如果一个消费者在还没有回发确认信号之前就挂了(其通道关闭,连接关闭或者TCP连接丢失),RabbitMQ会认为该消息未被完全处理,并将其重新排队。如果有其他消费者同时在线,该消息将会被会迅速重新分发给其他消费者。这样,即便Worker意外挂掉,也可以确保消息不会丢失。

没有任何消息会超时;当消费者死亡时,RabbitMQ将会重新分发消息。即使处理消息需要非常非常长的时间也没关系。

默认情况下,手动消息确认模式是开启的。在前面的例子中,我们通过将autoAck(“自动确认模式”)参数设置为true来明确地关闭手动消息确认模式。一旦完成任务,是时候删除这个标志并且从Worker手动发送一个恰当的确认信号给RabbitMQ。

// 构建消费者实例。

var consumer = new EventingBasicConsumer(channel);

// 绑定消息接收事件。

consumer.Received += (model, ea) =>

{

var body = ea.Body;

var message = Encoding.UTF8.GetString(body);

Console.WriteLine(" [x] Received {0}", message);

// 模拟耗时操作。

int dots = message.Split('.').Length - 1;

Thread.Sleep(dots * 1000);

Console.WriteLine(" [x] Done");

// 手动发送消息确认信号。

channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

};

// autoAck:false - 关闭自动消息确认,调用`BasicAck`方法进行手动消息确认。

// autoAck:true  - 开启自动消息确认,当消费者接收到消息后就自动发送ack信号,无论消息是否正确处理完毕。

channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);

使用上面这段代码,我们可以确定的是,即使一个Worker在处理消息时,我们通过使用CTRL + C来终止它,也不会丢失任何消息。Worker挂掉不久,所有未确认的消息将会被重新分发。

忘记确认
遗漏BasicAck是一个常见的错误。这是一个很简单的错误,但导致的后果却是严重的。当客户端退出时(看起来像是随机分发的),消息将会被重新分发,但是RabbitMQ会吃掉越来越多的内存,因为它不能释放未确认的消息。
为了调试这种错误,您可以使用rabbitmqctl来打印messages_unacknowledged字段:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

在Windows上,删除sudo

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

5.消息持久化

我们已经学习了如何确保即使消费者挂掉,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务还是会丢失。

当RabbitMQ退出或崩溃时,它会忘记已存在的队列和消息,除非告诉它不要这样做。为了确保消息不会丢失,有两件事是必须的:我们需要将队列和消息标记为持久

首先,我们需要确保RabbitMQ永远不会丢失我们的队列。为了做到这一点,我们需要把队列声明是持久的(Durable)

// 声明队列,通过指定durable参数为`true`,对消息进行持久化处理。

channel.QueueDeclare(queue: "hello",

durable: true,

exclusive: false,

autoDelete: false,

arguments: null);

虽然这个命令本身是正确的,但是它在当前设置中不会起作用。那是因为我们已经定义过一个名为hello的队列,并且这个队列不是持久化的。RabbitMQ不允许使用不同的参数重新定义已经存在的队列,并会向尝试执行该操作的程序返回一个错误。但有一个快速的解决办法 - 让我们用不同的名称声明一个队列,例如task_queue

channel.QueueDeclare(queue: "task_queue",

durable: true,

exclusive: false,

autoDelete: false,

arguments: null);

注意,该声明队列QueueDeclare方法的更改需要同时应用于生产者和消费者代码。

此时,我们可以确定的是,即使RabbitMQ重新启动,task_queue队列也不会丢失。现在我们需要将我们的消息标记为持久的(Persistent) - 通过将IBasicProperties.Persistent设置为true

// 将消息标记为持久性。

var properties = channel.CreateBasicProperties();

properties.Persistent = true;

关于消息持久性的说明
将消息标记为Persistent并不能完全保证消息不会丢失。尽管它告诉RabbitMQ将消息保存到磁盘,但当RabbitMQ接收到消息并且尚未保存消息时仍有一段时间间隔。此外,RabbitMQ不会为每条消息执行fsync(2) - 它可能只是保存到缓存中,并没有真正写入磁盘。消息的持久化保证并不健壮,但对于简单的任务队列来说已经足够了。如果您需要一个更加健壮的保证,可以使用发布者确认。

6.公平调度

您可能已经注意到调度仍然无法完全按照我们期望的方式工作。例如,在有两个Worker的情况下,假设所有奇数消息都很庞大、偶数消息都很轻量,那么一个Worker将会一直忙碌,而另一个Worker几乎不做任何工作。是的,RabbitMQ并不知道存在这种情况,它仍然会平均地分发消息。

发生这种情况是因为RabbitMQ只是在消息进入队列后就将其分发。它不会去检查每个消费者所拥有的未确认消息的数量。它只是盲目地将第n条消息分发给第n位消费者。

为了改变上述这种行为,我们可以使用参数设置prefetchCount = 1basicQos方法。

这就告诉RabbitMQ同一时间不要给一个Worker发送多条消息。或者换句话说,不要向一个Worker发送新的消息,直到它处理并确认了前一个消息。
相反,它会这个消息调度给下一个不忙碌的Worker。

channel.BasicQos(0, 1, false);

关于队列大小的说明
如果所有的Worker都很忙,您的队列可能会被填满。请留意这一点,可以尝试添加更多的Worker,或者使用其他策略。

7.组合在一起

我们NewTask.cs类的最终代码:

using System;

using RabbitMQ.Client;

using System.Text;

class NewTask

{

public static void Main(string[] args)

{

// 实例化连接工厂。

var factory = new ConnectionFactory() { HostName = "localhost" };

// 创建连接、信道。

using(var connection = factory.CreateConnection())

using(var channel = connection.CreateModel())

{

// 声明队列,标记为持久性。

channel.QueueDeclare(queue: "task_queue",

durable: true,

exclusive: false,

autoDelete: false,

arguments: null);

// 获取发送消息。

var message = GetMessage(args);

var body = Encoding.UTF8.GetBytes(message);

// 将消息标记为持久性。

var properties = channel.CreateBasicProperties();

properties.Persistent = true;

// 发送数据包。

channel.BasicPublish(exchange: "",

routingKey: "task_queue",

basicProperties: properties,

body: body);

Console.WriteLine(" [x] Sent {0}", message);

}

Console.WriteLine(" Press [enter] to exit.");

Console.ReadLine();

}

private static string GetMessage(string[] args)

{

return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");

}

}

(NewTask.cs源码)

还有我们的Worker.cs

using System;

using RabbitMQ.Client;

using RabbitMQ.Client.Events;

using System.Text;

using System.Threading;

class Worker

{

public static void Main()

{

// 实例化连接工厂。

var factory = new ConnectionFactory() { HostName = "localhost" };

// 创建连接、信道。

using(var connection = factory.CreateConnection())

using(var channel = connection.CreateModel())

{

// 声明队列,标记为持久性。

channel.QueueDeclare(queue: "task_queue",

durable: true,

exclusive: false,

autoDelete: false,

arguments: null);

// 告知RabbitMQ,在未收到当前Worker的消息确认信号时,不再分发给消息,确保公平调度。

channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

Console.WriteLine(" [*] Waiting for messages.");

// 构建消费者实例。

var consumer = new EventingBasicConsumer(channel);

// 绑定消息接收事件。

consumer.Received += (model, ea) =>

{

var body = ea.Body;

var message = Encoding.UTF8.GetString(body);

Console.WriteLine(" [x] Received {0}", message);

// 模拟耗时操作。

int dots = message.Split('.').Length - 1;

Thread.Sleep(dots * 1000);

Console.WriteLine(" [x] Done");

// 手动发送消息确认信号。

channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

};

channel.BasicConsume(queue: "task_queue",

autoAck: false,

consumer: consumer);

Console.WriteLine(" Press [enter] to exit.");

Console.ReadLine();

}

}

}

(Worker.cs源码)

使用消息确认机制和BasicQ您可以创建一个工作队列。即使RabbitMQ重新启动,通过持久性选项也可让任务继续存在。

有关IModel方法和IBasicProperties的更多信息,您可以在线浏览RabbitMQ .NET客户端API参考。

现在,我们可以继续阅读教程[3],学习如何向多个消费者发送相同的消息。

8.写在最后

本文翻译自RabbitMQ官方教程C#版本。本文介绍如与官方有所出入,请以官方最新内容为准。

水平有限,翻译的不好请见谅,如有翻译错误还请指正。

  • 原文链接:RabbitMQ tutorial - Work Queues

  • 实验环境:RabbitMQ 3.7.4 、.NET Core 2.1.3、Visual Studio Code

  • 最后更新:2018-04-03

相关文章:

  • .net core 使用Redis的发布订阅

  • RabbitMQ知多少

  • RabbitMQ系列教程之四:路由(Routing)

  • RabbitMQ系列教程之三:发布/订阅(Publish/Subscribe)

  • RabbitMQ系列教程之二:工作队列(Work Queues)

  • 如何优雅的使用RabbitMQ

  • .NET 使用 RabbitMQ 图文简介

  • RabbitMQ 高可用集群搭建及电商平台使用经验总结

  • .NET Core 使用RabbitMQ

  • ASP.NET Core Web API下事件驱动型架构的实现(三):基于RabbitMQ的事件总线

  • RabbitMQ教程C#版 “Hello World”

原文地址:https://www.cnblogs.com/esofar/p/rabbitmq-work-queues.html


.NET社区新闻,深度好文,欢迎访问公众号文章汇总 http://www.csharpkit.com 

RabbitMQ教程C#版 - 工作队列相关推荐

  1. [译]RabbitMQ教程C#版 - 远程过程调用(RPC)

    先决条件 本教程假定 RabbitMQ 已经安装,并运行在localhost标准端口(5672).如果你使用不同的主机.端口或证书,则需要调整连接设置. 从哪里获得帮助 如果您在阅读本教程时遇到困难, ...

  2. [译]RabbitMQ教程C#版 - 发布订阅

    先决条件 本教程假定RabbitMQ已经安装,并运行在localhost标准端口(5672).如果你使用不同的主机.端口或证书,则需要调整连接设置. 从哪里获得帮助 如果您在阅读本教程时遇到困难,可以 ...

  3. RabbitMQ教程C#版 “Hello World”

    先决条件 本教程假定RabbitMQ已经安装,并运行在localhost标准端口(5672).如果你使用不同的主机.端口或证书,则需要调整连接设置. 从哪里获得帮助 如果您在阅读本教程时遇到困难,可以 ...

  4. RabbitMQ教程大全看这一篇就够了-java版本

    目录 什么是RabbitMQ? RabbitMQ 核心概念 Docker 安装 RabbitMQ RabbitMQ 控制台页面介绍 RabbitMQ 交换机 Exchange 介绍 Direct Ex ...

  5. RabbitMQ教程总结

    [译]RabbitMQ教程一 主要通过Hello Word对RabbitMQ有初步认识 [译]RabbitMQ教程二 工作队列,即一个生产者对多个消费者 循环分发.消息确认.消息持久.公平分发 [译] ...

  6. RabbitMQ教程远程过程调用RPC

    前言:在前面的教程里我们学习了工作队列,实现了将工作任务发给不同的工人,如果任务是需要在另一台计算机上运行,我们如何实现运行远程计算机上的一个函数任务并等待其返回的结果呢,这种模式通常被称为远程过程调 ...

  7. RabbitMQ教程 3.发布/订阅(Publish/Subscribe)

    搜索:Java课代表,关注公众号,及时获取更多Java干货. 3 发布/订阅(Publish/Subscribe) 在上一节中,我们创建了一个工作队列.其目的是将每个任务只分发给一个worker.本节 ...

  8. serv-u 自定义html,Serv-U架设教程_Serv-U使用教程图文版

    Serv-U是一款很好用的FTP服务器软件,本文就给大家详细介绍一下<Serv-U架设教>,希望对广大新手有用. Serv-U架设教程_Serv-U使用教程图文版: 1.到文末下载 Ser ...

  9. iOS游戏框架Sprite Kit基础教程——Swift版上册

    iOS游戏框架Sprite Kit基础教程--Swift版上册 试读下载地址:http://pan.baidu.com/s/1qWBdV0C  介绍:本教程是国内唯一的Swift版的Spritekit ...

最新文章

  1. 值得推荐的C/C++框架和库 【强烈推荐】
  2. 使用GetProcAddress获取ZwUnmapViewOfSection函数指针
  3. dom4kj解析xml
  4. openstack及组件简要介绍
  5. MATLAB基本用法介绍
  6. DataList多行数据后如何添加一条分隔线
  7. 一天一个Java基础——序列化
  8. Linux expect脚本使用详细说明及示例
  9. Ubuntu gerrit 安装配置
  10. python3怎么安装opencv_Python:即使安装了opencv,也无法导入cv2(如何为python3安装opencv3)...
  11. 基于BlueZ 的BLE蓝牙开发
  12. 修改整合的Jfinal的Model自动绑定表插件 AutoTableBindPlugin
  13. Intouch2020安装与授权
  14. 蓝海灵豚发票管理系统
  15. matlab中gen2par函数,R语言中的par()函数终于明白了
  16. 那些年陪伴我们搬砖的心灵的音乐
  17. Android分步注册,Activity由B返回A修改再前往B,B中已填项不变
  18. 05.SQL Server(高级查询)
  19. 学习(一)C#利用窗体打开Excel文件进行正常访问和写入
  20. 20个非常有用的Python单行代码

热门文章

  1. 基于Azure Blob冷存储的数据压缩备份总结
  2. bitmapdata的知识点
  3. 程序内存一直在泄漏,原来是异步死循环了 !
  4. Visual Studio 2022 Preview 3和2019 16.11发布
  5. 记一次 .NET 某云采购平台API 挂死分析
  6. 【12图】你管这破玩意叫Pulsar
  7. 如何在没有 System.Drawing.Common 的情况下使用 C# 获取图片格式
  8. 在 Azure App Service 上运行 .NET 6 预览版
  9. 当 .NET 5 遇上OpenTelemetry,会碰撞出怎样的火花?
  10. 如何在 .Net Core 中使用 IHostedService