环境:

  • win10
  • rabbitmq-3.8.8
  • .net core 3.1
  • RabbitMQ.Client 6.2.1
  • vs2019

安装RabbitMq环境参照:

  • window下安装rabbitmq
  • linux下安装rabbitmq

实验目的:

  • 探索RabbitMq消费者的处理模式
  • 验证RabbitMq的队列在自动应答和手动应答中的表现
  • 探索Qos的作用

一、说明

这里探索队列向消费者派发消息的模式;
消费者拿到消息后的处理模式;
自动应答时的消息处理情况;
开启Qos并关闭自动应答时的消息处理情况;

这里实验使用的代码风格是工作者模式

二、RabbitMq的消费者处理模式

结论:
给消费者绑定的事件是单线程顺序执行的,并不会并发执行,也就是说消费者接收到的消息会在单线程模式下按照分发的顺序一个一个去处理。
这个结论可以《消息队列5:rabbitmq的WorkQueue模式》很容易看到。

三、RabbitMq的开启消息自动应答时的消息处理情况

结论:
队列默认将消息依次分发到不同的消费者,就像发牌一样,按顺序一个一个来;
当消费者绑定队列开启自动应答时(autoAck:true),队列就只管按顺序将消息派发了,不管消费者的处理速度如何(自动应答嘛),此时如果消费者处理速度慢的话,消息可能在消费者那里产生积压,所以当消费者处理速度慢的时候不要开启自动应答。

下面实验消息在消费者端产生积压的情况:
准备:

  1. 一个生产者,每秒产生消息;
  2. 两个消费者,每秒消费一条消息(速度和产出一致);

实验过程:

先开启生产者,当产出到第10条消息时,开启消费者1,此时消费者1和生产者将始终保持10条消息的差距(但生产者的消息已经全部发送到消费者1了,只是消费者1自己处理的慢)。
当生产者产出到第20条消息时,开启消费者2,此时消费者2将从第20条消息开始消费,并且是隔条消费,这个很好理解,因为从第20条消息开始,队列将以此派分消息到两个消费者,而此时消费者1仍然在第10条消息处不紧不慢的消费者,因为它已经积压的10-20条连续消息还未处理完。
等生产者产出到第30条消息时,消费者1也已经消费掉积压的10-20条的连续消息,然而消费者1仍然有20-30中的其中5条消息积压者,而消费者2的速度从一开始就是紧紧根据生产者的。
当生产者产出到第40条消息时,消费者1经过同样的时间(10秒)也将消息从第20条消费到了40条,这之后,生产者和两个消费者的速度将保持一致。

它们的消费情况如下图所示:

他们的代码如下:
生产者:

using RabbitMQ.Client;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;namespace Send
{class Program{static void Main(string[] args){//1. 实例化连接工厂var factory = new ConnectionFactory();//2. 建立连接using (var connection = factory.CreateConnection()){var t = Task.Factory.StartNew(() =>{//3. 创建信道var channel = connection.CreateModel();{//4. 声明队列channel.QueueDeclare(queue: "hello",durable: false,exclusive: false,autoDelete: false,arguments: null);//5. 发送数据包var index = 1;while (true){string message = $"chanel1-{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 测试消息:{index}";var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);Console.WriteLine(" [x] Sent {0}", message);Thread.Sleep(1000);index++;}}}, TaskCreationOptions.LongRunning);t.Wait();}}}
}

消费者:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;namespace Receive
{class Program{static void Main(string[] args){//1.实例化连接工厂var factory = new ConnectionFactory(){HostName = "localhost",Port = 5672,UserName = "guest",Password = "guest",VirtualHost = "/"};//2. 建立连接using (var connection = factory.CreateConnection()){//3. 创建信道using (var channel = connection.CreateModel()){//4. 声明队列channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);//5. 构造消费者实例var consumer = new EventingBasicConsumer(channel);//6. 绑定消息接收后的事件委托consumer.Received += (model, ea) =>{var message = Encoding.UTF8.GetString(ea.Body.ToArray());Console.WriteLine($" [x] [Thread:{Thread.CurrentThread.ManagedThreadId}] Received {message}");Thread.Sleep(1000);//模拟耗时Console.WriteLine(" [x] Done");};//7. 启动消费者channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}}}}
}

运行后效果如下:

实验代码下载:https://download.csdn.net/download/u010476739/18168224

四、关于Qos

参照:https://www.rabbitmq.com/consumer-prefetch.html

我们知道在RabbitMq中:

  • IConnection:表示一个真实的Tcp连接;
  • IModel:表示Tcp连接上的信道,为的是复用Tcp连接;
  • 生产者和消费者的消息收发都是通过信道传递的。
  • 生产者通过信道将消息发送到RabbitMq;
  • 消费者通过信道接收RabbitMq队列分发的消息;

从上面的实验也看出来了,当消费者绑定队列并指定自动应答时,队列会将消息依次分发到所有的消费者,而不管消费者的处理速度情况(是否有积压)。
所以,默认应答仅适合那种消费非常快的情况。

RabbitMq允许消费者绑定队列时关闭自动应答,关闭后,消费者处理完消息后手动向RabbitMq发送确认完成消息,并拉取下一条消息(如果有的话)。这样就有个好处是,能者多干,有利于整个系统的负载均衡。
不过有个新的问题,就是每次新消息的获取都需要先手动和RabbitMq确认上次消息已消费完成,这样就影响了消费的速度。RabbitMq考虑到这种问题,并提供了一种模式Qos,就是允许消费者在处理速度有限的情况下也可以多储存一部分消息,而不用每次手动确认拉取新消息。基于这种模式,队列在分发消息时也就观察消费者的储存积压消息的容量,如果还有消息积压的空间就派发消息,没有的话就不派发。

如下图所示:

在RabbitMq中允许对队列和信道做消息的数量限制,但只推荐作用在队列上,这里实验也只在队列上做实验,因为在信道上限制消息的数量影响太大。

4.1 情形1: 关闭自动应答、不设置qos

准备一个生产者和一个消费者,速度都是1个/每秒。
先让生产者先跑到第20条消息,这样队列里面就积压了20条消息,然后开启消费者,可以看到生产者和消费者之间始终有20条消息的差距。
此时,观察RabbitMq的web面板,可以看到,这个队列hello未确认的消息稳定在20条。。。

代码如下:
生产者:

using RabbitMQ.Client;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;namespace Send
{class Program{static void Main(string[] args){//1. 实例化连接工厂var factory = new ConnectionFactory();//2. 建立连接using (var connection = factory.CreateConnection()){var t = Task.Factory.StartNew(() =>{//3. 创建信道var channel = connection.CreateModel();{//4. 声明队列channel.QueueDeclare(queue: "hello",durable: false,exclusive: false,autoDelete: false,arguments: null);//5. 发送数据包var index = 1;while (true){string message = $"chanel1-{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 测试消息:{index}";var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);Console.WriteLine(" [x] Sent {0}", message);Thread.Sleep(1000);index++;}}}, TaskCreationOptions.LongRunning);t.Wait();}}}
}

消费者:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;namespace Receive
{class Program{static void Main(string[] args){//1.实例化连接工厂var factory = new ConnectionFactory(){HostName = "localhost",Port = 5672,UserName = "guest",Password = "guest",VirtualHost = "/"};//2. 建立连接using (var connection = factory.CreateConnection()){//3. 创建信道using (var channel = connection.CreateModel()){//4. 声明队列channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);5. 声明信道上的Qos限制,作用在队列上//channel.BasicQos(//    prefetchSize: 0,//最多传输的内容的大小的限制,0为不限制,但据说prefetchSize参数,RabbitMQ暂未对其没有实现。//    prefetchCount: 5,//会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该消费者Consumer将阻塞block掉,直到有消息进行ack确认//    global: false // true/false,表示是否将上面设置应用于channel,简单点说,就是上面限制是信道channel级别的还是消费者consumer级别。//);//6. 构造消费者实例var consumer = new EventingBasicConsumer(channel);//7. 绑定消息接收后的事件委托consumer.Received += (model, ea) =>{var message = Encoding.UTF8.GetString(ea.Body.ToArray());Console.WriteLine($" [x] [Thread:{Thread.CurrentThread.ManagedThreadId}] Received {message}");Thread.Sleep(1000);//模拟耗时Console.WriteLine(" [x] Done");//multiple: 是否批量确认,这里单独确认即可channel.BasicAck(ea.DeliveryTag, multiple: false);};//8. 启动消费者channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}}}}
}

效果截图:

4.2 情形2:关闭自动应答,设置Qos的个数为5

准备一个生产者和一个消费者,速度都是1个/每秒;
还上面一样的步骤,先让生产者跑一会,到第20条消息时开启消费者,在上面的实验中,RabbitMq的web面板中稳定显示有20条消息未应答(因为没有Qos,队列中有消息就会分发到消费者)。
但是在这里,我们会观察到RabbitMq的web面板中稳定显示有5条消息未应答,有15条消息处于Ready状态,总共有20条消息。

代码:
生产者的代码未改动,消费者中的代码去掉Qos的注释即可。

实验效果如下图所示:

4.3 关于Qos的声明

Qos的声明会作用到后面新绑定的消费者上,所以,当一个队列上需要绑定多个消费者的时候,我们可以在每次绑定前重新声明Qos,如下面的代码:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;namespace Receive
{class Program{static void Main(string[] args){//1.实例化连接工厂var factory = new ConnectionFactory();//2. 建立连接using (var connection = factory.CreateConnection()){//3. 创建信道using (var channel = connection.CreateModel()){//4. 声明队列channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);// 5.声明信道上的Qos限制,作用在队列上channel.BasicQos(prefetchSize: 0,prefetchCount: 5,global: false);//6. 构造消费者实例var consumer = new EventingBasicConsumer(channel);//7. 绑定消息接收后的事件委托consumer.Received += (model, ea) =>{var message = Encoding.UTF8.GetString(ea.Body.ToArray());Console.WriteLine($" [x] [Thread:{Thread.CurrentThread.ManagedThreadId}] Received {message}");Thread.Sleep(1000);//模拟耗时Console.WriteLine(" [x] Done");channel.BasicAck(ea.DeliveryTag, multiple: false);};//8. 启动消费者channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);//连续声明5个消费者绑定到同一队列for (var i = 0; i < 5; i++){channel.BasicQos(prefetchSize: 0,prefetchCount: (ushort)(i + 1),global: false);consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var message = Encoding.UTF8.GetString(ea.Body.ToArray());Console.WriteLine($" [x] 2 [Thread:{Thread.CurrentThread.ManagedThreadId}] Received {message}");Thread.Sleep(1000);Console.WriteLine(" [x] 2 Done");channel.BasicAck(ea.DeliveryTag, multiple: false);};channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);}Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}}}}
}

运行后,查看rabbitmq的web面板,可以看到每个消费者消息预取的长度如下:

4.4 单个消费者应用程序如何并发消费队列

场景是这样的,有一个pdf转word队列,我写了一个消费者处理器,它会连续不断的从队列中拉取消息。
如果,我们想提高并发数量,只需要多部署几台机器就可以了。
但实际情况是,单台机器的性能还可以,同时转换个几十个不成问题,总不能每提高一个并发就买一台机器吧?
这时,我们在一台机器上将消费者处理器运行几十次,这样单个机器就能拥有几十个并发了,但是,开启几十个进程合适嘛?显然不合适!
所以,最终我们期望可以配置单个进程的并发数量。比如,我们可以配置消费者处理器的并发处理能力为30,这样起一个进程就可以满足30个并发了。

不过,我们写代码的时候要注意一下,一个消费者要对应一个信道。
因为,一个通道内的消息处理顺序是串行的,无论是几个队列或几个消费者。(这里实验的效果是这样的,欢迎找到反例。。。)

正确的写法如下:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;namespace Receive
{class Program{static void Main(string[] args){var factory = new ConnectionFactory();using (var connection = factory.CreateConnection()){//加入有10个并发var concurrent = 10;for (int i = 0; i < concurrent; i++){//每一个并发单独创建一个信道var channel = connection.CreateModel();channel.QueueDeclare(queue: "hello0", durable: false, exclusive: false, autoDelete: false, arguments: null);channel.BasicQos(prefetchSize: 0,prefetchCount: 5,global: false);var consumer = new EventingBasicConsumer(channel);var cc = i;consumer.Received += (model, ea) =>{var message = Encoding.UTF8.GetString(ea.Body.ToArray());Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")} [x] {cc} [Thread:{Thread.CurrentThread.ManagedThreadId}] Received {message}");Thread.Sleep(6000);Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")} [x] {cc} Done");channel.BasicAck(ea.DeliveryTag, multiple: false);};channel.BasicConsume(queue: "hello0", autoAck: false, consumer: consumer);}Console.ReadLine();}}}
}

4.5 其他注意事项:

当调用chanel的消息确认时,必须使用接收消息时的chanel对象,如下所示:

private void Consumer_Received(object sender, BasicDeliverEventArgs e){try{}catch (Exception ex){logger.LogError($"[{e.DeliveryTag}]:{ex?.Message}");}finally{var consumer = sender as EventingBasicConsumer;consumer.Model.BasicAck(e.DeliveryTag, false);}}

消息队列8:RabbitMq的QOS实验相关推荐

  1. RabbitMQ,Apache的ActiveMQ,阿里RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可实现消息队列,RabbitMQ的应用场景以及基本原理介绍,RabbitMQ

    RabbitMQ,Apache的ActiveMQ,阿里RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可实现消息队列,RabbitMQ的应用场景以及基本原理介绍,RabbitMQ ...

  2. 消息队列之 RabbitMQ

    消息队列之 RabbitMQ 关于消息队列,从前年开始断断续续看了些资料,想写很久了,但一直没腾出空,近来分别碰到几个朋友聊这块的技术选型,是时候把这块的知识整理记录一下了. 市面上的消息队列产品有很 ...

  3. 消息队列mysql redis那个好_Redis作为消息队列与RabbitMQ的比较

    Redis作为消息队列与RabbitMQ的比较 RabbitMQ RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性.扩 ...

  4. 【消息队列之rabbitmq】Rabbitmq之消息可靠性投递和ACK机制实战

    目录 一.绪论 二.生产者 2.1事务机制 2.2confirm模式 串行模式 批量模式 异步模式 三.消费者 3.1手动ACK 一.绪论 上篇文章介绍了rabbitmq的基本知识.交换机类型实战&l ...

  5. 【重难点】【RabbitMQ 01】消息队列的作用、主流的消息队列、RabbitMQ 基于什么传输消息、RabbitMQ 模型架构、死信队列和延迟队列

    [重难点][RabbitMQ 01]消息队列的作用.主流的消息队列.RabbitMQ 基于什么传输消息.RabbitMQ 模型架构.死信队列和延迟队列 文章目录 [重难点][RabbitMQ 01]消 ...

  6. vs如何实现tcp连续发送多条消息_消息队列之 RabbitMQ

    为什么要使用MQ消息中间件?它解决了什么问题?关于为什么要使用消息中间件?消息中间件是如何做到同步变异步.流量削锋.应用解耦的?网上已经有很多说明,我这里就不再说明.我在接下来的RabbitMq系列博 ...

  7. 消息队列探秘-RabbitMQ消息队列介绍 侵立删

    1. 历史 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现.AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消息通讯的世界里有 ...

  8. 消息队列探秘 – RabbitMQ 消息队列工作原理

    1. 历史 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现.AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消息通讯的世界里有 ...

  9. 消息队列之RabbitMQ

    知识预览 RabbitMQ 回到顶部 RabbitMQ 什么叫消息队列 消息(Message)是指在应用间传送的数据.消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象. 消息队列 ...

最新文章

  1. Android架构篇-5 CI/CD(持续集成、持续交付、持续部署)
  2. dex文件结构(二):dex文件加载基本原理
  3. antd源码解读(4)- ButtonGroup
  4. python和c#交互_python与C#的互相调用
  5. python的pip_Python3中安装pip3
  6. DescribingDesign Patterns 描述设计模式
  7. MATLAB模糊控制算法,驾驶员制动意图识别
  8. H264压缩码率与GOP
  9. 投屏软件_duet for Mac(Mac投屏软件)
  10. android 微信输入法表情,分析Android 搜狗输入法在微信和QQ中发送图片和表情
  11. 计算机工作面试需要准备什么,视频面试手机还是电脑 面试前的准备工作要做好...
  12. LeetCode 图解 | 237.删除链表中的节点
  13. 如何写一份合格的SAP功能开发说明书--报表类
  14. 常见的激活函数及其优缺点分析
  15. 计算机视频追踪方向,基于深度学习的目标视频跟踪算法综述
  16. uniapp引入字体图标库
  17. 用telnet+openocd+jtag_dpi+vcs仿真调试RISCV的cpu
  18. 【JavaEE】进入Web开发的世界-CSS
  19. 基于Matlab脚本实现对串口数据的实时采集绘图
  20. 基于虚拟同步机的柔性直流输电(HVDC)控制Matlab/simulink仿真模型

热门文章

  1. 沙盒隔离技术的本质与背景
  2. 数据人需要掌握的技能,从底层到应用
  3. 生物信息学软件_基因测序、生物信息分析平台工作站硬件配置探讨2020
  4. linux系统中开机自启的三种方式
  5. Xilinx Zynq mpsoc 的 pcie Tandem 配置
  6. Github上开源项目readme里好看的高大上的有趣的徽章从何而来
  7. MMDetection 快速开始,训练自定义数据集
  8. Matlab修改背景色
  9. 05 第四章 一阶逻辑基本概念
  10. 自然语言处理中的分词问题总结