2019独角兽企业重金招聘Python工程师标准>>>

RabbitMQ流程简介(带Exchange)

RabbitMQ使用一些机制来保证可靠性,如持久化、消费确认及发布确认等。

先看以下这个图:

P为生产者,X为中转站(Exchange),红色部分为消息队列,C1、C2为消费者。

整个流程分成三部分:第一,生产者生产消息,发送到中转站;第二,中转站按定义的规则转发消息到消息队列;第三,消费者从消息队列获取消息进行消费(处理)。

RabbitMQ消息可靠性分析和应用

应用代码均使用C#客户端代码实现。

一、发布确认

生产者生产消息,发送到中转站的过程中,可能会因为网络丢包、网络故障等问题造成消息丢失。为了确保生产者发送的消息不会丢失,RabbitMQ提供了发布确认(Publisher Confirms)机制,从而提高消息的可靠性(注意:发布确认机制不能和事务机制一起使用)。

       单条消息发布确认:

1

2

3

4

5

6

7

8

9

10

channel.ConfirmSelect();//发布确认机制

string message = "msg";

var body = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(

        exchange: "MarkTopicChange",

        routingKey: "MarkRouteKey.one",

        basicProperties: null,

        body: body

        );

bool isPublished = channel.WaitForConfirms();//通道(channel)里消息发送成功返回true

使用channel.ConfirmSelect,一旦信道进入确认模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始)。消息被投递到所有匹配的队列之后,RabbitMQ就会发送(Basic.Ack)给生产者(包含消息的唯一ID),生产者从而知道消息发送成功。

       多条消息发布确认:

1

2

3

4

5

6

7

8

9

10

11

12

13

channel.ConfirmSelect();//发布确认机制

foreach (var itemMsg in lstMsg)

{

    byte[] sendBytes = Encoding.UTF8.GetBytes(itemMsg);

    //发布消息

    channel.BasicPublish(

        exchange: "MarkTopicChange",

        routingKey: "MarkRouteKey.one",

        basicProperties: null,

        body: sendBytes

        );

}

bool isAllPublished = channel.WaitForConfirms();//通道(channel)里所有消息均发送才返回true 

注意:多消息发布确认机制情况下,倘若要发送100条消息,发送90条后,突然网络故障,后面的消息发送失败了,那么isAllPublished返回的是false,而前面90条消息已经发送到消息队列了。我们还不知道哪些消息是发送失败的,所以很多条消息发布确认,建议分几次发送或多通道发送。

此外,需要确保在中转站(Exchange)的消息可以顺利到达消息队列。

(1)首先需要定义匹配的Exchange和Queue,根据Exchange的类型和routingKey确定转发的关系。

(2)设置BasicPublish方法中mandatory参数为true,然后监听Exchange中没有匹配的队列的消息,然后进行相操作。

(3)确保消息队列有足够内存存储消息。

RabbitMQ默认配置vm_memory_high_watermark为0.4。意思是控制消息占40%内存左右。vm_memory_high_watermark_paging_ratio为0.5,当消息占用内存超过50%,RabbitMQ会把消息转移到磁盘上以释放内存。当磁盘剩余空间小于阀值disk_free_limit(默认为50M),所有生产者阻塞,避免充满磁盘,导致所有的写操作失败。

RabbitMQ配置文件一般在%APPDATA%\RabbitMQ\rabbitmq.config.

%APPDATA% 一般为 C:\Users\%USERNAME%\AppData\Roaming(Windows环境)

二、持久化

消息存放到消息队列后,在不配置消息持久化的情况下,若服务器重启、关闭或宕机等,消息都会丢失。配置持久化可以有效提高消息的可靠性。持久化需要同时配置消息持久化和队列持久化。单配置消息持久化,队列消失了,消息没有地方存放;单配置队列持久化,队列还在,消息没了。

队列持久化在定义队列时候配置

1

2

3

4

5

6

7

8

//定义队列

channel.QueueDeclare(

    queue: "Mark_Queue"//队列名称

    durable: true//队列磁盘持久化                  

    exclusive: false,//是否排他的,false。如果一个队列声明为排他队列,该队列首次声明它的连接可见,并在连接断开时自动删除

    autoDelete: false,//是否自动删除,一般设成false

    arguments: null

    );

  消息持久化在发布消息时候配置

1

2

3

4

5

6

7

8

9

10

//消息持久化,把DeliveryMode设成2

IBasicProperties properties = channel.CreateBasicProperties();

properties.DeliveryMode = 2;

    //发布消息

    channel.BasicPublish(

        exchange: "MarkTopicChange",

        routingKey: "MarkRouteKey.one",

        basicProperties: properties,

        body: sendBytes

        );

如何配置了事务机制或发布确认(publisher confirm)机制,服务端的返回Basic.Ack是在消息落盘之后执行的,进一步的提高了消息的可靠性。

为了防止磁盘损坏带来的消息丢失,可以配置镜像队列,这里不作介绍。

三、消费确认

为了确保消息被消费者消费,RabbitMQ提供消费确认模式(consumer Acknowledgements)。自动确认模式,当消费者成功接收到消息后,自动通知RabbitMQ,把消息队列中相应消息删除。这很大程度上满足不了我们,假如消费者接收到消息后,服务器宕机,消息还没处理完成,这样就会造成消息丢失。手动确认模式,当消费者成功处理完消息后,手动发消息通知RabbitMQ,把消息队列中相应消息删除。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

consumer.Received += (model, ea) =>

{

    var body = ea.Body;

    var message = Encoding.UTF8.GetString(body);

    var routingKey = ea.RoutingKey;

    Console.WriteLine(" [x] Received '{0}':'{1}'",

                      routingKey,

                      message);

//确认该消息已被消费,发删除消息给RabbitMQ,把消息队列中的消息删除

channel.BasicAck(ea.DeliveryTag, false);

//消费消息失败,拒绝此消息,重回队列,让它可以继续发送到其他消费者

//channel.BasicReject(ea.DeliveryTag, true);

//消费消息失败,拒绝多条消息,重回队列,让它们可以继续发送到其他消费者

//channel.BasicNack(ea.DeliveryTag, true, true);

};

//手动确认消息,把autoAck设成false

channel.BasicConsume(queue: "Mark_Queue",

                     autoAck: false,

                     consumer: consumer);

这里值得注意的是,消息处理完成后,一定要把处理完成的消息发送到RabbitMQ(channel.BasicAck(ea.DeliveryTag, false)),不然RabbitMQ会一直等待,从而造成内存泄露。若处理消息过程中发生异常,可以使用channel.BasicReject(ea.DeliveryTag, true)来拒绝此消息,让它重回队列。若RabbitMQ收不到消费者任何确认消息的信号(包括确认信号,拒绝信号灯),直到此消费者断开连接,消息才能重回队列,继续发送到其他消费者。

提醒一下,假如消费者消费消息的方法不支持并发(取决于需求),可以限制消费者每次只接收一条消息。

1

channel.BasicQos(0, 1, false);

转载于:https://my.oschina.net/u/4052893/blog/3006677

RabbitMQ消息可靠性分析和应用相关推荐

  1. RabbitMQ消息可靠性投递及分布式事务最终一致性实现

    RabbitMQ消息可靠性投递就是保证消息生产者能够将消息百分百投递到RabbitMQ服务器,并在传递过程中不丢失.然而在生产环境中由于网络中断.网络不稳定等原因导致消息在投递过程中丢失,这或许会造成 ...

  2. 【MQ】如何确保RabbitMQ消息可靠性传递?

    文章内容 1. Producer-to-Exchange 2. Exchange-to-Queue 3. Queue Sotrage 4. Queue-to-Consumer 5. Others 正在 ...

  3. RabbitMq——消息积压分析和解决思路

    文章目录 前言 消息积压产生的原因 消息积压问题解决 前言 专栏中之前进行了一系列各种模式的配置.使用和测试操作.但是都只是应用于使用阶段,暂未面向问题解决分析方向. 最近看了一篇资料,有大佬说到了消 ...

  4. 【内部技术分享PPT】漫谈 RabbitMQ 消息可靠性

  5. Rabbitmq消息保存机制应用案例分析消息可靠性保证

    Rabbitmq 消息保存机制 mandatory参数和immediate参数作用 mandatory:当参数设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,Rabbitmq ...

  6. rabbitmq可靠性投递_解决RabbitMQ消息丢失问题和保证消息可靠性(一)

    工作中经常用到消息中间件来解决系统间的解耦问题或者高并发消峰问题,但是消息的可靠性如何保证一直是个很大的问题,什么情况下消息就不见了?如何防止消息丢失?下面通过这篇文章,我们就聊聊RabbitMQ 消 ...

  7. 详解SpringCloud中RabbitMQ消息队列原理及配置,一篇就够!

    作者:kosamino cnblogs.com/jing99/p/11679426.html 一.MQ用途 1.同步变异步消息 场景:用户下单完成后,发送邮件和短信通知. 运用消息队列之后,用户下单完 ...

  8. 四种策略确保 RabbitMQ 消息发送可靠性!你用哪种?

    微服务可以设计成消息驱动的微服务,响应式系统也可以基于消息中间件来做,从这个角度来说,在互联网应用开发中,消息中间件真的是太重要了. 今天,以 RabbitMQ 为例,松哥来和大家聊一聊消息中间消息发 ...

  9. 2.RabbitMQ 的可靠性消息的发送

      本篇包含 1. RabbitMQ 的可靠性消息的发送 2. RabbitMQ 集群的原理与高可用架构的搭建 3. RabbitMQ 的实践经验   上篇包含 1.MQ 的本质,MQ 的作用 2.R ...

最新文章

  1. Windows 10版星巴克应用现身官网
  2. 将动态库添加到VC程序中
  3. PHP和MySQL处理树状、分级、无限分类、分层数据的方法
  4. vmware网络桥接模式无法上网的解决办法
  5. 烂泥:SQL Server 2005数据库安装
  6. JS----window对象详解
  7. php中绘制长方体,php代码将常见的长方形图片修改为正方形的图片
  8. 浅析类的const成员函数,类的const对象
  9. python __file__怎么实现_python怎么实现文件上传界面
  10. python 两点曲线_ECC椭圆曲线加密算法:ECDH 和 ECDSA
  11. win7如何进入修复计算机,win7电脑故障怎么进入安全模式修复
  12. 如何在手机上查银行卡号?进来手把手教你!
  13. MODERN ROBOTICS MECHANICS, PLANNING, AND CONTROL
  14. 多重继承--读松本行弘的程序世界
  15. c# wifi串口通信_在C#中实现串口通信的方法
  16. 智能小车系列文章之小车简介
  17. 计算机网络复习(第五章)
  18. 洛谷 P3258 [JLOI2014]松鼠的新家 树上差分
  19. 【Web书城】书城前端开发
  20. 程序员有哪些靠谱的副业赚钱途径

热门文章

  1. HDU-1811 Rank of Tetris
  2. 什么是CPAN(安装NAGIOS使用到)
  3. VMM2012应用指南之3-安装VMM2012
  4. linux进程源码分析,Linux内核源代码分析——口述程序猿如何意淫进程(一)
  5. windows服务器部署jar包
  6. MySQL高级 - 案例 - 系统性能优化 - 分页优化
  7. Nacos-环境隔离
  8. Nginx负载均衡策略之url_hash
  9. Nginx的安全控制及SSL加密介绍
  10. RabbitMQ预取值