现在聊一下RabbitMQ消息持久化:

问题及方案描述

1.当有多个消费者同时收取消息,且每个消费者在接收消息的同时,还要处理其它的事情,且会消耗很长的时间。在此过程中可能会出现一些意外,比如消息接收到一半的时候,一个消费者死掉了。

这种情况要使用消息接收确认机制,可以执行上次宕机的消费者没有完成的事情。

2.在默认情况下,我们程序创建的消息队列以及存放在队列里面的消息,都是非持久化的。当RabbitMQ死掉了或者重启了,上次创建的队列、消息都不会保存。

这种情况可以使用RabbitMQ提供的消息队列的持久化机制。

相关理论描述

RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我个人觉得大多数开发人员都会选择持久化。

队列和交换机有一个创建时候指定的标志durabledurable的唯一含义就是具有这个标志的队列和交换机会在重启之后重新建立,它不表示说在队列当中的消息会在重启后恢复。

消息队列持久化包括3个部分:

1、exchange持久化,在声明时指定durable => true2、queue持久化,在声明时指定durable => true3、消息持久化,在投递时指定delivery_mode=> 2(1是非持久化)

如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。

注意:一旦创建了队列和交换机,就不能修改其标志了。例如,如果创建了一个non-durable的队列,然后想把它改变成durable的,唯一的办法就是删除这个队列然后重现创建

程序示例

生产者

class Producter    {        const string ExchangeName = "eric.exchange";        const string QueueName = "eric.queue";        static void Main(string[] args)        {            var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", };            using (var connection = factory.CreateConnection())            using (var channel = connection.CreateModel())            {                channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的                channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的                channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);                string message = "Eric is very handsome";                var body = Encoding.UTF8.GetBytes(message);                //将队列设置为持久化之后,还需要将消息也设为可持久化的                var props = channel.CreateBasicProperties();                props.SetPersistent(true);                channel.BasicPublish(ExchangeName, routingKey: QueueName, basicProperties: props, body: body);                Console.WriteLine("Producter Sent: {0}", message);                Console.ReadKey();            }        }    }

注:ack是 acknowledgments 的缩写,noAck 是("no manual acks")

因为我前段时间换了笔记本,所以用户的“eric”的操作出踩了个坑,下面进行介绍下:

如果调试运行时报错:None of the specified endpoints were reachable

innerException是:

{"The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=541, text="Unexpected Exception", classId=0, methodId=0, cause=System.IO.IOException: 无法从传输连接中读取数据: 远程主机强迫关闭了一个现有的连接。。 ---> System.Net.Sockets.SocketException: 远程主机强迫关闭了一个现有的连接。   在 System.Net.Sockets.Socket.Receive(Byte[] buffer, Int32 offset, Int32 size, SocketFlags socketFlags)   在 System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 size)   --- 内部异常堆栈跟踪的结尾 ---   在 RabbitMQ.Client.Impl.Frame.ReadFrom(NetworkBinaryReader reader)   在 RabbitMQ.Client.Impl.SocketFrameHandler.ReadFrame()   在 RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration()   在 RabbitMQ.Client.Framing.Impl.Connection.MainLoop()"}

这说明我们使用的用户 不是 系统默认的 guest 而是我们自己创建的用户,但是没有足够的权限进行操作。

解决办法:

rabbitmqctl set_user_tags username administratorrabbitmqctl set_permissions -p / username ".*" ".*" ".*"

执行结果:

相关其他操作见:windows下 安装 rabbitMQ 及操作常用命令

程序运行结果:

消费者

class Recevice    {        const string ExchangeName = "eric.exchange";        const string QueueName = "eric.queue";        public static void Main()        {            var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", VirtualHost = "/" };            using (var connection = factory.CreateConnection())            using (var channel = connection.CreateModel())            {                channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的                channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的                channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);                BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck: true);                //NoAck:true 告诉RabbitMQ立即从队列中删除消息,另一个非常受欢迎的方式是从队列中删除已经确认接收的消息,可以通过单独调用BasicAck 进行确认:                //BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck:false);                var msgContent = Encoding.UTF8.GetString(msgResponse.Body);                Console.WriteLine("The received content:"+msgContent);                channel.BasicAck(msgResponse.DeliveryTag, multiple: false);                //使用BasicAck方式来告之是否从队列中移除该条消息                //需要额外注意,比如从队列中获取消息并用它来操作数据库或日志文件时,如果出现操作失败时,则该条消息应该保留在队列中,只到操作成功时才从队列中移除。                Console.ReadKey();            }        }    }

接受消息还有一种方法,就是通过基于推送的事件订阅。可以使用内置的 QueueingBasicConsumer 提供简化的编程模型,允许在共享队列上阻塞,直到收到一条消息。

var consumer = new QueueingBasicConsumer(channel);                channel.BasicConsume(QueueName, noAck: true, consumer: consumer);                var msgResponse = consumer.Queue.Dequeue();                 var msgContent = Encoding.UTF8.GetString(msgResponse.Body);

程序运行结果:

原文链接:https://www.cnblogs.com/ericli-ericli/p/5938106.html

rabbitmq 持久化_RabbitMQ原理与相关操作(三)消息持久化相关推荐

  1. [RabbitMQ]RabbitMQ原理与相关操作(一)

    2019独角兽企业重金招聘Python工程师标准>>> RabbitMQ原理与相关操作(一) 小编是菜鸟一枚,最近想试试MQ相关的技术,所以自己看了下RabbitMQ官网,试着写下自 ...

  2. rabbitmq 同步策略_RabbitMQ(三):消息持久化策略

    一.前言 在正常的服务器运行过程中,时常会面临服务器宕机重启的情况,那么我们的消息此时会如何呢?很不幸的事情就是,我们的消息可能会消失,这肯定不是我们希望见到的结果.所以我们希望AMQP服务器崩溃了也 ...

  3. RabbitMQ 队列消息持久化

    参考链接: https://www.cnblogs.com/Keep-Ambition/p/8044752.html 假如消息队列test里面还有消息等待消费者(consumers)去接收,但是这个时 ...

  4. 从源码分析RocketMQ系列-RocketMQ消息持久化源码详解

    导语   在上篇分析中,提到了一个概念处理器,并且在进入到最终NettyIO的时候看到了一个Pair的对象,这个对象存储了两个对象,一个是执行器,一个是处理器,在进入Runable对象的时候看到封装到 ...

  5. Redis(一):Redis的持久化的原理和操作

    目录 写在前面 1.电商架构方案 1.1.页面静态化 2.大型网站 3.缓存架构 3.redis持久化对容灾意义 3.1.redis持久化意义 4.RDB和AOF 4.1.RDB和AOF两种持久化机制 ...

  6. 关于Linux中的apt-get的相关操作及原理

    关于Linux中的apt-get的相关操作及原理 Linux下的apt-get指令与相关文件夹 apt-get是linux下的一种简便的安装和更新软件的方法,在装软件的时候常用的命令就是 sudo a ...

  7. 《自然语言处理实战入门》 第三章 :中文分词原理及相关组件简介 ---- 语言学与分词技术简介

    文章大纲 0.内容梗概 1. 汉语语言学简介 1.1 汉语与汉字的起源 1.2 汉字的统一与演变 1.3 印欧语系与汉藏语系 1.4 语言区别对于NLP 的影响 2. 词汇与分词技术简介 2.1 汉语 ...

  8. Git入门与使用 (三) 使用GitHub进行代码托管的相关操作

    文章目录 一.前言 二.使用GitHub进行代码托管的相关操作 1.推送本地仓库内容至远程仓库 2.克隆远程仓库内容至本地仓库 3.邀请他人加入项目团队 4.拉取远程仓库修改的内容 5.解决协同开发时 ...

  9. rabbitmq消息队列,消息发送失败,消息持久化,消费者处理失败相关

    转:https://blog.csdn.net/u014373554/article/details/92686063 项目是使用springboot项目开发的,前是代码实现,后面有分析发送消息失败. ...

最新文章

  1. 数据结构与算法---队列
  2. centos7安装Filebeat采集日志文件存到Elasticsearch
  3. go 分段锁ConcurrentMap,map+读写锁,sync.map的效率测试
  4. 【CVPR2019】Workshops 研讨会列表及链接
  5. 为什么我不推荐敏捷开发?
  6. C 和 C ++ 再活 45 年不是梦
  7. linux---多线程---信号量--不懂
  8. zynq文档阅读pg144-axi-gpio之AXI GPIO IP核
  9. linux 谷歌浏览器设置代理_Linux用谷歌浏览器模拟手机访问
  10. 问什么说我的计算机主机名无效,为什么arcgis的许可管理器 修改为主机时,总是说我输入的是无效主机名...
  11. rpg服务器无限刷金币bug,魔兽世界怀旧服:邮箱交易BUG无限刷金币?小号回档一次1000金!...
  12. 2022年国庆节水篇
  13. Netkeeper联网时报pppoe拨号模块损坏
  14. 计算机视觉算法与应用 英文pdf,计算机视觉:算法与应用(套装共2册) [Computer Vision:Algorithms and Applications]...
  15. 【转】用户管理模块:如何保证用户数据安全?
  16. 联合会杯-内马尔破门锋霸2球 巴西3-0西班牙3连冠
  17. 甘肃阿克塞百余只“岩壁精灵”雪中觅食
  18. Java练习题——超市贴花
  19. UNIX痛恨者手册[转载]
  20. 分享一个基于QT的自定义串口助手

热门文章

  1. 关于Cocos2d-x的专属数据类型
  2. 求一个数字中1的个数
  3. iOS程序UI主线程和定时器相互阻塞的问题
  4. Java与Http协议
  5. 牛客18987 粉嘤花之恋(矩阵快速幂、斐波那契数列)
  6. 牛客16464 神奇的幻方
  7. 管理计算机域内置账户改为用户账户,“管理计算机(域)的内置帐户”我给它改了名...
  8. 三菱fx2n64mr说明书_FX2N-64MR-001原理及应用三菱FX2N-64MR-001使用说明书 - 三菱
  9. php读取某类型文件代码,php代码实现读取文件头判断文件类型
  10. 服务器数据库2008怎么备份数据库文件,怎么备份SQL Server2008数据库