什么是工作队列?

工作队列(又名任务队列)是RabbitMQ提供的一种消息分发机制。当一个Consumer实例正在进行资源密集任务的时候,后续的消息处理都需要等待这个实例完成正在执行的任务,这样就导致消息挤压,后续的消息不能及时的处理。

RabbitMQ的工作队列机制允许同一个Consumer的多个实例同时监听消息队列,如果发现某个Consumer实例的处理任务未完成,就自动将消息转给其他未工作的Consumer实例,从而达到平衡负载的效果。

消息确认机制

前一篇笔记中,Receive程序创建消息消费者的时候,我们将autoAck参数设置为true, 即自动确认消息。

channel.BasicConsume(queue: "hello", autoAck: true, consumer:consumer);

自动确认的意思就是当消费者程序接收到消息之后,会自动给RabbitMQ一个收到消息的反馈。RabbitMQ会自动将这条消息删除,而不去关心消费者程序实例是否正确处理了这条消息,这样的缺点是一旦消费者程序实例出错,这条消息就丢失了。

所以在正式项目中,很少会将这个参数设置为true。大部分情况下,我们需要在消费者处理程序的Received事件处理方法最后调用BasicAck方法,手动通知RabbitMQ。

        //// Summary://     /// Acknowledge one or more delivered message(s). ///void BasicAck(ulong deliveryTag, bool multiple);

这样做的好处是,如果当前的消费者程序发生异常,RabbitMQ会自动分配一下一个可用的实例处理消息,或者等待当前实例重新连接,再发将消息发送过去

持久化机制

如果没有启动持久化机制,所有的消息队列和消息信息都是存储在服务器内存中。所以当RabbitMQ服务器停止运作或者发生错误的时候,所有的消息队列和消息队列中的消息都会丢失掉。为了能够避免丢失队列或者丢失消息,RabbitMQ提供了一种持久化机制。

RabbitMQ的持久化机制分为消息队列的持久化和消息的持久化。

消息队列持久化

前一篇笔记中,Send和Receive程序都是用如下代码,对消息队列进行声明

channel.QueueDeclare(queue: "hello",durable: false,exclusive: false,autoDelete: false,arguments: null);

durable就是是否启用消息队列持久化的参数。

注:RabbitMQ不允许重新定义一个已经存在的消息队列,即一个消息队列的参数只有当第一次创建的时候才能配置,后面如果修改配置参数不会起作用。

消息持久化

前一篇笔记中,当发送一个消息的时候,我们使用了如下代码

channel.BasicPublish(exchange: "",routingKey: "hello",basicProperties: null,body: body);

其中有一个参数basicProperties, 我们没有设置他的。

这里我们修改一下,创建一个BasicProperties配置,然后设置他的Persistent属性为true

var properties = channel.CreateBasicProperties();properties.Persistent = true;channel.BasicPublish(exchange: "",routingKey: "hello",basicProperties: properties,body: body);

Round-Robin调度

Round-Robin调度即循环调度,RabbitMQ的任务分发默认使用是循环调度。即按照消费者程序实例的连接顺序,依次发送消息。

例:如果有2个实例,实例A和实例B, 循环调度发送消息的顺序即A,B,A,B,A,B…..

循环调度的缺点是,它不会考虑实例是否正在处理消息,它只是按照实例的连接顺序,发送消息给实例进行处理,这样就可能导致某些消息处理实例一直处理资源密集型消息任务,导致消息任务处理速度下降。

以2个实例为例,实例A和实例B, 奇数次的消息任务都是资源密集型的消息任务,这样实例A就会堆积很多未完成的任务。

Fair调度

Fair调度,即公平调度,它与Round-Robin调度的区别就是,它可以为每个消费者程序实例设置一个处理任务的上限。

当消费者实例的消息任务数量(待进行任务数量+正在进行的任务数量)达到设置的上限,RabbitMQ就不会再给他分配新的消息任务,除非当该实例的消息任务数量小于设置的上限,RabbitMQ才有可能发送新的消息给该实例处理

RabbitMQ中,我们可以调用channel实例的BasicQos方法设置每个实例处理消息的上限。

例: 设置最大处理上限为1

channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

修改程序

Send

  1. 定义新的消息队列task_queue, 启用队列持久化
  2. 启用消息持久化
  3. 使用程序启动参数来决定发送的
static void Main(string[] args){var factory = new ConnectionFactory(){HostName = "localhost"};using (var connection = factory.CreateConnection()){using (var channel = connection.CreateModel()){channel.QueueDeclare(queue: "work_queue",durable: true,exclusive: false,autoDelete: false,arguments: null);string message = args[0];var body = Encoding.UTF8.GetBytes(message);var properties = channel.CreateBasicProperties();properties.Persistent = true;channel.BasicPublish(exchange: "",routingKey: "work_queue",basicProperties: properties,body: body);Console.WriteLine("[x] Sent {0}", message);}}}

Receive

  1. 定义新的消息队列task_queue, 启用队列持久化
  2. 启用消息持久化
  3. 使用Fair调度,每个消费者实例最多处理一个消息
  4. Received事件中添加代码Thread.Sleep(4000), 模拟资源密集操作
  5. 取消自动确认消息
  6. 手动确认消息
static void Main(string[] args){var factory = new ConnectionFactory() { HostName = "localhost" };using (var connection = factory.CreateConnection()){using (var channel = connection.CreateModel()){channel.QueueDeclare(queue: "work_queue",durable: true,exclusive: false,autoDelete: false,arguments: null);channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body;var message = Encoding.UTF8.GetString(body);Console.WriteLine("[x] Received {0}", message);channel.BasicAck(ea.DeliveryTag, false);};channel.BasicConsume(queue: "work_queue", autoAck: false, consumer: consumer);Console.Read();}}}

最终效果

转载于:https://www.cnblogs.com/lwqlun/p/9095120.html

RabbitMQ学习笔记(二) 工作队列相关推荐

  1. RabbitMQ 学习笔记

    RabbitMQ 学习笔记 RabbitMQ 学习笔记 1. 中间件 1.1 什么是中间件 1.2 为什么要使用消息中间件 1.3 中间件特点 1.4 在项目中什么时候使用中间件技术 2. 中间件技术 ...

  2. Rabbitmq学习笔记(尚硅谷2021)

    Rabbitmq学习笔记 (尚硅谷) 1.MQ 的概念 1.1 什么是 MQ? 1.2 为什么要用 MQ? 削峰 解耦 异步 1.3 MQ 的分类 ActiveMQ Kafka RocketMQ Ra ...

  3. Rabbitmq学习笔记教程-尚硅谷

    Rabbitmq学习笔记 (尚硅谷) 尚硅谷 rabbitmq 教程 1.MQ 的概念 1.1 什么是 MQ? 存放消息的队列,互联网架构中常见的一种服务与服务之间通信的方式. 1.2 为什么要用 M ...

  4. qml学习笔记(二):可视化元素基类Item详解(上半场anchors等等)

    原博主博客地址:http://blog.csdn.net/qq21497936 本文章博客地址:http://blog.csdn.net/qq21497936/article/details/7851 ...

  5. [转载]dorado学习笔记(二)

    原文地址:dorado学习笔记(二)作者:傻掛 ·isFirst, isLast在什么情况下使用?在遍历dataset的时候会用到 ·dorado执行的顺序,首先由jsp发送请求,调用相关的ViewM ...

  6. RabbitMQ学习笔记四:RabbitMQ命令(附疑难问题解决)

    RabbitMQ学习笔记四:RabbitMQ命令(附疑难问题解决) 参考文章: (1)RabbitMQ学习笔记四:RabbitMQ命令(附疑难问题解决) (2)https://www.cnblogs. ...

  7. PyTorch学习笔记(二)——回归

    PyTorch学习笔记(二)--回归 本文主要是用PyTorch来实现一个简单的回归任务. 编辑器:spyder 1.引入相应的包及生成伪数据 import torch import torch.nn ...

  8. tensorflow学习笔记二——建立一个简单的神经网络拟合二次函数

    tensorflow学习笔记二--建立一个简单的神经网络 2016-09-23 16:04 2973人阅读 评论(2) 收藏 举报  分类: tensorflow(4)  目录(?)[+] 本笔记目的 ...

  9. Scapy学习笔记二

    Scapy学习笔记二 Scapy Sniffer的用法: http://blog.csdn.net/qwertyupoiuytr/article/details/54670489 Scapy Snif ...

  10. Ethernet/IP 学习笔记二

    Ethernet/IP 学习笔记二 原文链接:http://wiki.mbalib.com/wiki/Ethernet/IP 1.通信模式 不同于源/目的通信模式,EtherNet/IP 采用生产/消 ...

最新文章

  1. highcharts的导出功能
  2. Spark数据分析实战:大型活动大规模人群的检测和疏散
  3. 三种运动让身高增长4-10cm
  4. Elasticsearch原理与调优
  5. 书籍推荐(2016-2020)--统计数学计算机为主,心理学为辅
  6. linux bash命令找不到,Linux下提示命令找不到:bash:command not found
  7. 依赖: ros-melodic-desktop 但是它将不会被安装_npm系列之依赖管理
  8. 快讯:2018 OOW Oracle技术大会PPT抢鲜下载
  9. 《『若水新闻』客户端开发教程》——03.设计新闻分类UI(1)
  10. Shuffle Cards(Rope大法)将一段区间的数字整体搬动
  11. FleaPHP和ThinkPHP(比较)
  12. 雷林鹏分享Node.js Buffer(缓冲区)
  13. SCI-HUB 印度被诉、twitter账号被封,是梁上君子还是罗宾汉?
  14. 青龙脚本合集(不定期更新版)
  15. power BI爬取网页数据方法
  16. 注释大全,神兽护体,佛祖保佑,永无bug
  17. Age of Information(AoI)大体介绍与相关工作
  18. echarts添加背景图片,背景色及水印
  19. Echo写入一句话木马+分段写入
  20. 通过Word或WLW离线发布CSDN博客

热门文章

  1. 电脑罗盘时钟代码_轻松吃透实时时钟芯片DS1302软硬件设计
  2. 计算机考试只读,计算机基础考试试题-20210710011550.docx-原创力文档
  3. c ++异常处理_C ++中的异常处理
  4. selenium中js定位_Selenium中的定位剂
  5. Android CheckBox
  6. java组合与继承始示例_Java 9功能与示例
  7. 数据库设计精选视频_11种精选工具和服务,可改善您的设计工作
  8. IntelliJ IDEA个人许可证赠品报告和获奖者
  9. 2021年Java大厂面试必备面试题
  10. Java基础:String类支持几种构造函数?