目录:

  1. 轮询调度(Round-robin dispatching):即依次分配分配任务给worker。

  2. 消息答复(Message acknowledgement):在consumer处理完之后,进行消息答复。避免杀掉worker后,message消息。

  3. 消息持久化(Message durability):在RabbitMQ server停止后,确保message不会丢失。需要持久化queue和message

  4. 公平调度(Fair dispatch):为了使worker不会出现有的一直在busy,而有的一致很闲的状态。使用的是 channel.BasicQos(0, 1, false) ,确保worker确认完成上一个任务后,才会分配下一个。

  5. 代码

简述

在第一个教程中,我们讲了在一个指定的queue中发送和接收message. 下面我们讲一个用于在多个worker之间分配费时任务的工作队列(Work Queue).

Work Queue的主要思想就是避免立即做一个资源集中型任务并且还必须等待它完成。

我们把任务封装成一个message,并且发送到队列。这里面的worker实际就是consumer,之后会由它们执行这些任务。

我们会统计字符串中的 . 来使程序sleep。即使用Thread.Sleep()。例如Hello...会花费3秒。

在这里我们的producer叫做NewTask。而我们的consumer叫做worker。

它们可以在上一节的基础上做一些修改得到

发送消息代码修改

var message = GetMessage(args);var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;channel.BasicPublish(exchange: "",routingKey: "task_queue",basicProperties: properties,body: body);

private static string GetMessage(string[] args)
{return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}

接收消息代码修改

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{var body = ea.Body;var message = Encoding.UTF8.GetString(body);Console.WriteLine(" [x] Received {0}", message);int dots = message.Split('.').Length - 1;Thread.Sleep(dots * 1000);Console.WriteLine(" [x] Done");
};
channel.BasicConsume(queue: "task_queue", autoAck: true, consumer: consumer);

这个里面的消息依然是自动答复的

上面的修改是为了模拟真实耗时任务。

轮询调度(Round-robin dispatching)

使用队列任务的一个好处就是能很容易的进行并发任务。

首先,我们尝试同时运行两个worker。它们可以同时从队列中取到message。那么具体是怎样呢?

你需要打开三个控制台程序,两个运行worker程序。

# shell 1 cd Worker
dotnet run# => [*] Waiting for messages. To exit press CTRL+C

# shell 2 cd Worker
dotnet run# => [*] Waiting for messages. To exit press CTRL+C

第三个用来发布new tasks.

# shell 3 cd NewTask
dotnet run "First message."
dotnet run "Second message.."
dotnet run "Third message..."
dotnet run "Fourth message...."
dotnet run "Fifth message....."

我们看下两个worker中的结果:

默认情况下,RabbitMQ会轮询发送每个message。所以,平均来说,每个consumer会得到相同数量的messages . 这种分发message的方式叫做轮询。

同时,注意到,queue中的message只能发到一个worker里,即两个worker里的task不会重复,即这是一种点对点的方式。

消息答复(Message acknowledgement)

你想下,如果一个consumer正在进行一个长任务(long task),并且就完成了一部分就死掉了。那么会发生什么呢?在我们当前的代码里,一旦RabbitMQ发送了一个message到一个consumer,那么RabbitMQ里的message立刻就会被标记为删除(deletion)。 在这种情况下,如果你杀死一个worker,我们将会丢失这个worker正在处理的message,我们也会丢失所有已经分配到这个worker但还没处理的messages。

注意:默认情况下,并不是会等每个task在consumer中执行完才会分发下一个message,也有可能是一下分发好多条。具体可以通过设置。

但是,我们不想丢失任务tasks。如果一个worker死掉了,我们想要task会被发送到另一个worker。

为了message不再丢失,RabbitMQ引入了message acknowledge。一个ack 会在一个message被接收,处理后被consumer发送回来,并且RabbitMQ把它标记为删除。

如果一个consumer还没发送一个ack就死掉了。RabbitMQ会认为它没被完全处理,并且re-queue 它。如果线上同时还有其他的consumer,那么RabbitMQ会很快的把它发送到另一个consumer。这样即使worker突然死了,也没有message会丢失了。

消息过时是不存在的。RabbitMQ将重发这个message,当consumer死掉时。即使在处理message时花费很长时间,也没有关系(因为不存在过时)

Manual message acknowledgment(手动消息答复) 默认是开启的。在前一个例子中,我们通过设置autoAck为true把它关闭了。

现在,我们把手动消息答复打开(即autoAck设置为false),并且,一旦我们做完了一个task,我们就发送一个确认(a acknowledgment).

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{var body = ea.Body;var message = Encoding.UTF8.GetString(body);Console.WriteLine(" [x] Received {0}", message);int dots = message.Split('.').Length - 1;Thread.Sleep(dots * 1000);Console.WriteLine(" [x] Done");channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);  //消息答复
};
channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);//设置手动消息答复开启

这样,即使我们杀死了一个worker,我们的message也不会丢失了。

Acknowledgement必须和接收message的通道是同一个。否则会报 channel-level protocol exception。

那么,如果我们忘记发acknowledgement会怎么样呢?

忘记BasicAck是一个常发生的错误,但是后果却很严重。当你的client退出后,messages也会被重发。但是RabbitMQ会吃掉(消耗)越来越多的内存,随着它无法释放任何unacked messages.

你可以通过message_unacknowledged打印出没确认的message

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

Windows上

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

消息持久化(Message durability)

我们已经学了当consumer被杀死时,使task不丢失。但是如果我们的RabbitMQ server停止了,我们的task依然会丢失。

想要server停止时,messages不丢失,需要标记queue和message是持久化的(durable)。

首先,我们标记queue是durable

channel.QueueDeclare(queue: "hello",durable: true,  //标记queue为durableexclusive: false,autoDelete: false,arguments: null);

虽然上面的代码本身是正确的,但是在目前却不会生效。因为我们之前已经定义过了一个hello的queue,它是not durable。RabbitMQ不会允许你使用不同的参数重新定义一个已经存在的queue,并且会报错。

这里,我们先直接声明一个不同名称的queue。如下

channel.QueueDeclare(queue: "task_queue",durable: true,  //标记queue为durableexclusive: false,autoDelete: false,arguments: null);

其中,QueueDeclare需要被应用到producer和consumer的代码里。

现在,我们标记messages为persistent(永恒的)。通过设置IBasicProperties.SetPersistent为true.

var properties = channel.CreateBasicProperties();
properties.Persistent = true;  //设置message是persistent

公平调度(Fair dispatch)

你可能已经注意到,上面的调度仍然不能按我们想要的工作。它可能出现两个worker一个一直很忙,一个一直很闲(任务执行时间不一样)。

这是因为RabbitMQ会被分发,当message输入一个queue。它不会看一个consumer未完成的queue , 它仅仅盲目的分发第几个到第几个consumer.

为了改变行为,我们可以使用BasicQos,并且prefetchCount=1。这个会告诉RabbityMQ每次给只会给worker一个message。或者说,RabbitRQ在worker处理并且确认之前不会分发一个新的message。也可以说,RabbitMQ会分发给下一个不忙的worker。

channel.BasicQos(0, 1, false);

注意queue的大小

如果你的所有worker都是busy的,说明你的queue已经满了。你应该对此保持关注,并且或者你可以增加更多的worker或者有一些其他策略。

代码

NewTask.cs

using System;using RabbitMQ.Client;using System.Text;
class NewTask
{public static void Main(string[] args){var factory = new ConnectionFactory() { HostName = "localhost" };using(var connection = factory.CreateConnection())  //创建连接using(var channel = connection.CreateModel())   //创建channel
        {channel.QueueDeclare(queue: "task_queue",  //声明一个durable的queuedurable: true,exclusive: false,autoDelete: false,arguments: null);var message = GetMessage(args);   //取得messagevar body = Encoding.UTF8.GetBytes(message);var properties = channel.CreateBasicProperties();  //设置message是persistentproperties.Persistent = true;channel.BasicPublish(exchange: "",  //发送routingKey: "task_queue",basicProperties: properties,body: body);Console.WriteLine(" [x] Sent {0}", message);}Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}private static string GetMessage(string[] args){return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");}
}

Worker.cs

using System;using RabbitMQ.Client;using RabbitMQ.Client.Events;using System.Text;using System.Threading;
class Worker
{public static void Main(){var factory = new ConnectionFactory() { HostName = "localhost" };using(var connection = factory.CreateConnection())  //建立连接using(var channel = connection.CreateModel())  //建立通道(channel)
        {channel.QueueDeclare(queue: "task_queue",  //声明queue是durabledurable: true,exclusive: false,autoDelete: false,arguments: null);channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);  //设置公平的调度策略(fair dispatch)
Console.WriteLine(" [*] Waiting for messages.");var consumer = new EventingBasicConsumer(channel);  //回调函数consumer.Received += (model, ea) =>{var body = ea.Body;var message = Encoding.UTF8.GetString(body);Console.WriteLine(" [x] Received {0}", message);int dots = message.Split('.').Length - 1;  //模拟真实业务花费一些时间Thread.Sleep(dots * 1000);Console.WriteLine(" [x] Done");channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);  //消息答复
            };channel.BasicConsume(queue: "task_queue",  //发送并且设置手动消息答复开启autoAck: false,consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}}
}

参考网址:RabbitMQ

转载于:https://www.cnblogs.com/Vincent-yuan/p/10934973.html

RabbitMQ学习之Work Queues(2)相关推荐

  1. RabbitMQ学习

    RabbitMQ学习 1.概述 用于进程通信的中间件. 优势: 劣势: 1.应用解耦:提高了系统容错性和可维护性 1.系统依赖越多不能保证MQ的高可用 2.异步提速:提升用户体验和系统吞吐量 2.复杂 ...

  2. RabbitMQ 学习笔记

    RabbitMQ 学习笔记 RabbitMQ 学习笔记 1. 中间件 1.1 什么是中间件 1.2 为什么要使用消息中间件 1.3 中间件特点 1.4 在项目中什么时候使用中间件技术 2. 中间件技术 ...

  3. Rabbitmq学习笔记(尚硅谷2021)

    Rabbitmq学习笔记 (尚硅谷) 1.MQ 的概念 1.1 什么是 MQ? 1.2 为什么要用 MQ? 削峰 解耦 异步 1.3 MQ 的分类 ActiveMQ Kafka RocketMQ Ra ...

  4. RabbitMQ学习笔记(高级篇)

    RabbitMQ学习笔记(高级篇) 文章目录 RabbitMQ学习笔记(高级篇) RabbitMQ的高级特性 消息的可靠投递 生产者确认 -- confirm确认模式 生产者确认 -- return确 ...

  5. Rabbitmq学习笔记教程-尚硅谷

    Rabbitmq学习笔记 (尚硅谷) 尚硅谷 rabbitmq 教程 1.MQ 的概念 1.1 什么是 MQ? 存放消息的队列,互联网架构中常见的一种服务与服务之间通信的方式. 1.2 为什么要用 M ...

  6. RabbitMQ学习系列二:.net 环境下 C#代码使用 RabbitMQ 消息队列

    上一篇已经讲了Rabbitmq如何在Windows平台安装,不懂请移步:RabbitMQ学习系列一:windows下安装RabbitMQ服务 一.理论: .net环境下,C#代码调用RabbitMQ消 ...

  7. RabbitMQ学习总结 第一篇:理论篇

    目录 RabbitMQ学习总结 第一篇:理论篇 RabbitMQ学习总结 第二篇:快速入门HelloWorld RabbitMQ学习总结 第三篇:工作队列Work Queue RabbitMQ学习总结 ...

  8. RabbitMQ学习笔记四:RabbitMQ命令(附疑难问题解决)

    RabbitMQ学习笔记四:RabbitMQ命令(附疑难问题解决) 参考文章: (1)RabbitMQ学习笔记四:RabbitMQ命令(附疑难问题解决) (2)https://www.cnblogs. ...

  9. rabbitmq 学习-9- RpcClient发送消息和同步接收消息原理

    rabbitmq 学习-9- RpcClient发送消息和同步接收消息原理

最新文章

  1. 【Qt】qss样式表之:QCalendarWidget,日历窗口样式表设置
  2. 计算机主机的作用和性能指标,Cpu是什么 cpu性能指标主要有哪几个方面【详细介绍】...
  3. 用 Flask 来写个轻博客 (19) — 以 Bcrypt 密文存储账户信息与实现用户登陆表单
  4. python目录及文件操作
  5. Vue——Vue-Router的push和replace方法[Uncaught (in promise) Error]解决方案
  6. Java变量名的命名方式
  7. CoreAnimation编程指南(二)渲染架构
  8. python怎么用gamma函数_如何通俗的理解伽马(gamma)函数
  9. 身高2m,体重2kg,这样的数据“看上去很好”?
  10. Spark源码分析之Master注册机制原理
  11. Linux主机如何连接刀片机,刀片服务器RAID配置及Linux操作系统的安装.doc
  12. Java并发——Synchronized关键字和锁升级,详细分析偏向锁和轻量级锁的升级
  13. mysql查询每个表占用空间,【MySQL】查询所有数据库占用磁盘空间大小和单个库中所有表的大小...
  14. 171.Excel表列序号
  15. (转)左耳朵耗子:技术人如何更好地把控发展趋势?
  16. torch tensor去掉1维_代数拓扑笔记(1) —— 胞腔复形
  17. Hadoop技术之Hadoop HA 机制详解
  18. vb/vb.net开发技巧荟萃(七)
  19. Dell戴尔笔记本电脑G15 5515 Ryzen Edition原装出厂Windows11系统恢复原厂oem系统
  20. 赚钱大师小程序【最新版5.9.9】商城/佣金即时提现/分销推广/话费充值/美团饿了么外卖

热门文章

  1. oracle安装过程中内核参数详解
  2. [Python] L1-026. I Love GPLT-PAT团体程序设计天梯赛GPLT
  3. 蓝桥杯 ADV-96 算法提高 复数求和
  4. POJ 1321-棋盘问题-简单搜索DFS
  5. 音乐 美术 计算机期末考试表,贵阳初中学业水平考试信息技术、音乐、美术考评标准出炉!...
  6. 哪个计算机无法做到双屏显示,[工具/ PC]如何在计算机上实现双屏显示?
  7. 安装python时需要勾选_工业洗衣机安装时需要注意什么?
  8. 以太坊搭建联盟链_区块链知识普及:什么是以太坊
  9. sql中查询类型为int的字段,返回null的异常
  10. hbase Java API 介绍及使用示例