RabbitMQ学习笔记(二) 工作队列
什么是工作队列?
工作队列(又名任务队列)是RabbitMQ提供的一种消息分发机制。当一个Consumer实例正在进行资源密集任务的时候,后续的消息处理都需要等待这个实例完成正在执行的任务,这样就导致消息挤压,后续的消息不能及时的处理。
RabbitMQ的工作队列机制允许同一个Consumer的多个实例同时监听消息队列,如果发现某个Consumer实例的处理任务未完成,就自动将消息转给其他未工作的Consumer实例,从而达到平衡负载的效果。
消息确认机制
前一篇笔记中,Receive程序创建消息消费者的时候,我们将autoAck参数设置为true, 即自动确认消息。
channel.BasicConsume(queue: "hello", autoAck: true, consumer:consumer);
自动确认的意思就是当消费者程序接收到消息之后,会自动给RabbitMQ一个收到消息的反馈。RabbitMQ会自动将这条消息删除,而不去关心消费者程序实例是否正确处理了这条消息,这样的缺点是一旦消费者程序实例出错,这条消息就丢失了。
所以在正式项目中,很少会将这个参数设置为true。大部分情况下,我们需要在消费者处理程序的Received事件处理方法最后调用BasicAck方法,手动通知RabbitMQ。
//// Summary:// /// Acknowledge one or more delivered message(s). ///void BasicAck(ulong deliveryTag, bool multiple);
这样做的好处是,如果当前的消费者程序发生异常,RabbitMQ会自动分配一下一个可用的实例处理消息,或者等待当前实例重新连接,再发将消息发送过去
持久化机制
如果没有启动持久化机制,所有的消息队列和消息信息都是存储在服务器内存中。所以当RabbitMQ服务器停止运作或者发生错误的时候,所有的消息队列和消息队列中的消息都会丢失掉。为了能够避免丢失队列或者丢失消息,RabbitMQ提供了一种持久化机制。
RabbitMQ的持久化机制分为消息队列的持久化和消息的持久化。
消息队列持久化
前一篇笔记中,Send和Receive程序都是用如下代码,对消息队列进行声明
channel.QueueDeclare(queue: "hello",durable: false,exclusive: false,autoDelete: false,arguments: null);
durable就是是否启用消息队列持久化的参数。
注:RabbitMQ不允许重新定义一个已经存在的消息队列,即一个消息队列的参数只有当第一次创建的时候才能配置,后面如果修改配置参数不会起作用。
消息持久化
前一篇笔记中,当发送一个消息的时候,我们使用了如下代码
channel.BasicPublish(exchange: "",routingKey: "hello",basicProperties: null,body: body);
其中有一个参数basicProperties, 我们没有设置他的。
这里我们修改一下,创建一个BasicProperties配置,然后设置他的Persistent属性为true
var properties = channel.CreateBasicProperties();properties.Persistent = true;channel.BasicPublish(exchange: "",routingKey: "hello",basicProperties: properties,body: body);
Round-Robin调度
Round-Robin调度即循环调度,RabbitMQ的任务分发默认使用是循环调度。即按照消费者程序实例的连接顺序,依次发送消息。
例:如果有2个实例,实例A和实例B, 循环调度发送消息的顺序即A,B,A,B,A,B…..
循环调度的缺点是,它不会考虑实例是否正在处理消息,它只是按照实例的连接顺序,发送消息给实例进行处理,这样就可能导致某些消息处理实例一直处理资源密集型消息任务,导致消息任务处理速度下降。
以2个实例为例,实例A和实例B, 奇数次的消息任务都是资源密集型的消息任务,这样实例A就会堆积很多未完成的任务。
Fair调度
Fair调度,即公平调度,它与Round-Robin调度的区别就是,它可以为每个消费者程序实例设置一个处理任务的上限。
当消费者实例的消息任务数量(待进行任务数量+正在进行的任务数量)达到设置的上限,RabbitMQ就不会再给他分配新的消息任务,除非当该实例的消息任务数量小于设置的上限,RabbitMQ才有可能发送新的消息给该实例处理
RabbitMQ中,我们可以调用channel实例的BasicQos方法设置每个实例处理消息的上限。
例: 设置最大处理上限为1
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
修改程序
Send
- 定义新的消息队列task_queue, 启用队列持久化
- 启用消息持久化
- 使用程序启动参数来决定发送的
static void Main(string[] args){var factory = new ConnectionFactory(){HostName = "localhost"};using (var connection = factory.CreateConnection()){using (var channel = connection.CreateModel()){channel.QueueDeclare(queue: "work_queue",durable: true,exclusive: false,autoDelete: false,arguments: null);string message = args[0];var body = Encoding.UTF8.GetBytes(message);var properties = channel.CreateBasicProperties();properties.Persistent = true;channel.BasicPublish(exchange: "",routingKey: "work_queue",basicProperties: properties,body: body);Console.WriteLine("[x] Sent {0}", message);}}}
Receive
- 定义新的消息队列task_queue, 启用队列持久化
- 启用消息持久化
- 使用Fair调度,每个消费者实例最多处理一个消息
- Received事件中添加代码
Thread.Sleep(4000)
, 模拟资源密集操作 - 取消自动确认消息
- 手动确认消息
static void Main(string[] args){var factory = new ConnectionFactory() { HostName = "localhost" };using (var connection = factory.CreateConnection()){using (var channel = connection.CreateModel()){channel.QueueDeclare(queue: "work_queue",durable: true,exclusive: false,autoDelete: false,arguments: null);channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body;var message = Encoding.UTF8.GetString(body);Console.WriteLine("[x] Received {0}", message);channel.BasicAck(ea.DeliveryTag, false);};channel.BasicConsume(queue: "work_queue", autoAck: false, consumer: consumer);Console.Read();}}}
最终效果
转载于:https://www.cnblogs.com/lwqlun/p/9095120.html
RabbitMQ学习笔记(二) 工作队列相关推荐
- RabbitMQ 学习笔记
RabbitMQ 学习笔记 RabbitMQ 学习笔记 1. 中间件 1.1 什么是中间件 1.2 为什么要使用消息中间件 1.3 中间件特点 1.4 在项目中什么时候使用中间件技术 2. 中间件技术 ...
- Rabbitmq学习笔记(尚硅谷2021)
Rabbitmq学习笔记 (尚硅谷) 1.MQ 的概念 1.1 什么是 MQ? 1.2 为什么要用 MQ? 削峰 解耦 异步 1.3 MQ 的分类 ActiveMQ Kafka RocketMQ Ra ...
- Rabbitmq学习笔记教程-尚硅谷
Rabbitmq学习笔记 (尚硅谷) 尚硅谷 rabbitmq 教程 1.MQ 的概念 1.1 什么是 MQ? 存放消息的队列,互联网架构中常见的一种服务与服务之间通信的方式. 1.2 为什么要用 M ...
- qml学习笔记(二):可视化元素基类Item详解(上半场anchors等等)
原博主博客地址:http://blog.csdn.net/qq21497936 本文章博客地址:http://blog.csdn.net/qq21497936/article/details/7851 ...
- [转载]dorado学习笔记(二)
原文地址:dorado学习笔记(二)作者:傻掛 ·isFirst, isLast在什么情况下使用?在遍历dataset的时候会用到 ·dorado执行的顺序,首先由jsp发送请求,调用相关的ViewM ...
- RabbitMQ学习笔记四:RabbitMQ命令(附疑难问题解决)
RabbitMQ学习笔记四:RabbitMQ命令(附疑难问题解决) 参考文章: (1)RabbitMQ学习笔记四:RabbitMQ命令(附疑难问题解决) (2)https://www.cnblogs. ...
- PyTorch学习笔记(二)——回归
PyTorch学习笔记(二)--回归 本文主要是用PyTorch来实现一个简单的回归任务. 编辑器:spyder 1.引入相应的包及生成伪数据 import torch import torch.nn ...
- tensorflow学习笔记二——建立一个简单的神经网络拟合二次函数
tensorflow学习笔记二--建立一个简单的神经网络 2016-09-23 16:04 2973人阅读 评论(2) 收藏 举报 分类: tensorflow(4) 目录(?)[+] 本笔记目的 ...
- Scapy学习笔记二
Scapy学习笔记二 Scapy Sniffer的用法: http://blog.csdn.net/qwertyupoiuytr/article/details/54670489 Scapy Snif ...
- Ethernet/IP 学习笔记二
Ethernet/IP 学习笔记二 原文链接:http://wiki.mbalib.com/wiki/Ethernet/IP 1.通信模式 不同于源/目的通信模式,EtherNet/IP 采用生产/消 ...
最新文章
- highcharts的导出功能
- Spark数据分析实战:大型活动大规模人群的检测和疏散
- 三种运动让身高增长4-10cm
- Elasticsearch原理与调优
- 书籍推荐(2016-2020)--统计数学计算机为主,心理学为辅
- linux bash命令找不到,Linux下提示命令找不到:bash:command not found
- 依赖: ros-melodic-desktop 但是它将不会被安装_npm系列之依赖管理
- 快讯:2018 OOW Oracle技术大会PPT抢鲜下载
- 《『若水新闻』客户端开发教程》——03.设计新闻分类UI(1)
- Shuffle Cards(Rope大法)将一段区间的数字整体搬动
- FleaPHP和ThinkPHP(比较)
- 雷林鹏分享Node.js Buffer(缓冲区)
- SCI-HUB 印度被诉、twitter账号被封,是梁上君子还是罗宾汉?
- 青龙脚本合集(不定期更新版)
- power BI爬取网页数据方法
- 注释大全,神兽护体,佛祖保佑,永无bug
- Age of Information(AoI)大体介绍与相关工作
- echarts添加背景图片,背景色及水印
- Echo写入一句话木马+分段写入
- 通过Word或WLW离线发布CSDN博客
热门文章
- 电脑罗盘时钟代码_轻松吃透实时时钟芯片DS1302软硬件设计
- 计算机考试只读,计算机基础考试试题-20210710011550.docx-原创力文档
- c ++异常处理_C ++中的异常处理
- selenium中js定位_Selenium中的定位剂
- Android CheckBox
- java组合与继承始示例_Java 9功能与示例
- 数据库设计精选视频_11种精选工具和服务,可改善您的设计工作
- IntelliJ IDEA个人许可证赠品报告和获奖者
- 2021年Java大厂面试必备面试题
- Java基础:String类支持几种构造函数?