在日常工作中使用RabbitMQ偶尔会遇不可预料的情况导致的消息积压,一般出现消息积压基本上分为几种情况:

  1. 消费者消费消息的速度赶不上生产速度,这种问题主要是业务逻辑没设计好消费者和生产者之间的平衡,需要改业务流程或逻辑以保证消费度跟上生产消息的速,譬如增加消费者的数量等。
  2. 消费者出现异常,导致一直无法接收新的消息,这种问题需要排查消费的逻辑是不是有问题,需要优化程序。

除了上面的这两种问题,还有一些其他情况会导致消息积压,譬如一些系统是无法预计成产消息的速度和频率,又或者消费者的速度已经被限制,不能通过加新的消费者来解决,譬如不同的系统间的API对接,对接那一方就做了请求频率的限制,或者对方系统承受不了太大的并发,还有一些系统如果是面对企业客户,譬如电商,物流,仓储等类似平台系统的客户的下单是没有规律的或者集种某一个时间段下单的,这种就不能简单的通过加消费者来解决,就需要分析具体业务来避免消息积压。

针对这种情况,我想到了4中解决思路:

  1. 拆分MQ,生产者一个MQ,消费者一个MQ,写一个程序监听生产者的MQ模拟消费速度(譬如线程休眠),然后发送到消费者的MQ,如果消息积压则只需要处理生产者的MQ的积压消息,不影响消费者MQ
  2. 拆分MQ,生产者一个MQ,消费者一个MQ,写一个程序监听生产者的MQ,定义一个全局静态变量记录上一次消费的时间,如果上一次时间和当前时间只差小于消费者的处理时间,则发送到一个延迟队列(可以使用死信队列实现)发送到消费者的MQ,如果消息积压则只需要处理生产者的MQ的积压消息,不影响消费者MQ
  3. 使用Redis的List或ZSET做接收消息缓存,写一个程序按照消费者处理时间定时从Redis取消息发送到MQ
  4. 设置消息过期时间,过期后转入死信队列,写一个程序处理死信消息(重新如队列或者即使处理或记录到数据库延后处理)

其中使用延时队列会相对来说逻辑简单,业务逻辑变更也不大,在RabbitMQ中,可使用死信来及延时队列插件rabbitmq_delayed_message_exchange两种方式实现延时队列。
使用插件可以在官网找到:https://www.rabbitmq.com/community-plugins.html

插件的安装及使用方式就不做介绍了,主要介绍下使用死信来实现延时队列,原理就是将消息发送到一个死信队列,并设置过期时间,过期后将私信转发到要处理的消息队列。
生产者相关代码:

          ///         /// 发送延时队列消息        ///         ///         ///         /// 默认20        public void SendDelayQueues(string message, string queueName,double delayMilliseconds,string beDeadLetterPrefix="beDeadLetter_")        {            #region 死信到期后转入的交换机及队列            //死信转入新的队列的路由键(消费者使用的路由键)            var routingKey = queueName;            var exchangeName = queueName;            //定义队列            Channel.QueueDeclare(queue: queueName,                durable: true,                exclusive: false,                autoDelete: false,                arguments: null);            //定义交换机            Channel.ExchangeDeclare(exchange: exchangeName,                type: "direct");            //队列绑定到交换机            Channel.QueueBind(queue: queueName,                exchange: exchangeName,                routingKey: routingKey);            #endregion            //将变成死信的队列名            var beDeadLetterQueueName = beDeadLetterPrefix + queueName;            //将变成死信的交换机名            var beDeadLetterExchangeName = beDeadLetterPrefix + queueName;            //定义一个有延迟的交换机来做死信(该消息不能有消费者,不然无法变成死信)            Channel.ExchangeDeclare(exchange:beDeadLetterExchangeName ,                type: "direct");                        //定义该延迟消息过期变成死信后转入的交换机(消费者需要绑定的交换机)            //Channel.ExchangeDeclare(exchange: queueName,type: "direct");            var dic = new Dictionary();            //dic.Add("x-expires", 30000);            //dic.Add("x-message-ttl", 12000);//队列上消息过期时间,应小于队列过期时间              dic.Add("x-dead-letter-exchange", queueName);//变成死信后转向的交换机            dic.Add("x-dead-letter-routing-key",routingKey);//变成死信后转向的路由键            //定义将变成死信的队列            Channel.QueueDeclare(queue: beDeadLetterQueueName,                durable: true,                exclusive: false,                autoDelete: false,                arguments: dic);            //队列绑定到交换机            Channel.QueueBind(queue: beDeadLetterQueueName,                exchange: beDeadLetterExchangeName,                routingKey: routingKey);            //不要同时给一个消费者推送多于prefetchCount个消息, ushort prefetchCount = 20            //Channel.BasicQos(prefetchSize: 0, prefetchCount: prefetchCount, global: false);            var body = Encoding.UTF8.GetBytes(message);            var properties = Channel.CreateBasicProperties();            properties.Persistent = true;            properties.DeliveryMode = 2;//持久化消息            //过期时间            properties.Expiration = delayMilliseconds.ToString();            Channel.BasicPublish(exchange: beDeadLetterExchangeName,                routingKey: routingKey,                basicProperties: properties,                body: body);        }

消费者相关代码:

        ///         /// 设置延迟队列接收的事件        ///         ///         ///         /// 默认1        ///         ///         public void SetDelayQueuesReceivedAction(Action action, string queueName, ushort prefetchCount = 1,            bool autoAck = false, int consumerCount = 1)        {            if (prefetchCount < 1)            {                throw new Exception("consumerCount must be greater than 1 !");            }            var exchangeName = queueName;            var routingKey = queueName;            for (int i = 0; i < consumerCount; i++)            {                var Channel = Connection.CreateModel();                //定义队列                Channel.QueueDeclare(queue: queueName,                    durable: true,                    exclusive: false,                    autoDelete: false,                    arguments: null);                //定义交换机                Channel.ExchangeDeclare(exchange: exchangeName,                    type: "direct");                //队列绑定到交换机                Channel.QueueBind(queue: queueName,                    exchange: exchangeName,                    routingKey: routingKey);                //不要同时给一个消费者推送多于prefetchCount个消息                Channel.BasicQos(prefetchSize: 0, prefetchCount: prefetchCount, global: false);                ChannelList.Add(Channel);                var consumer = new EventingBasicConsumer(Channel);                consumer.Received += (model, ea) =>                {                    var body = ea.Body.ToArray();                    var message = Encoding.UTF8.GetString(body);                    //Console.WriteLine("处理消费者ConsumerTag:" + ea.ConsumerTag);                    action(message);                    //手动确认消息应答                    Channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);                };                //autoACK自动消息应答设置为false                Channel.BasicConsume(queue: queueName, autoAck: autoAck, consumer: consumer);            }        }

如果感觉本文对你有帮助关注我一起学习进步

rabbitmq 限制速度_技术干货分享:RabbitMQ消息积压的几种解决思路相关推荐

  1. redis主从复制_技术干货分享:一文了解Redis主从复制

    Redis主从复制 主从复制 主从复制,将一台Redis服务器的数据,复制到其他Redis服务器.前者称为主(master)节点,后者称为从(slave)节点 . 在默认的情况下,Redis都是主节点 ...

  2. rabbitmq 限制速度_=(:) RabbitMQ详解

    本篇包含了RabbitMQ概念的一些东西,下篇会整理出SpringBoot结合RabbitMQ的使用案例. 目录 一.MQ概述 1.什么是消息 2.什么是消息队列 3.MQ的特点 二.MQ适用场景 1 ...

  3. 【线下沙龙免费报名】像阿里巴巴一样工作_听干货分享欣赏阿里西溪园区风景

    阅读原文请点击 [杭州] Work Like Alibaba,你也可以,像阿里巴巴一样工作 云栖社区大数据云市场云服务阿里云创业大学 活动时间:2017-08-26 13:30 ~ 18:30 报名时 ...

  4. python 自动化运维 读取交换机数据_技术干货|数据中心自动化运维技术探索之交换机零配置上线...

    近几年来,互联网行业处于一个快速发展的快车道,一个又一个风口不断地涌向周边行业.共享单车的出现解决了人们出行***一公里的问题:新零售概念的提出,无人货柜的出现,更是将线上和线下的数据打通,优化了人们 ...

  5. 数据库迁移_【干货分享】DM数据库迁移方法(物理迁移)

    在数据库的维护过程中,可能涉及换服务器,或者需要现网数据库环境测试的情况,这时,最简单快速的办法就是将源数据库相关的文件拷贝到目标主机,然后注册数据库实例服务.这就是数据库的物理迁移过程,可以是从wi ...

  6. 开关电源环路的零极点可以在反馈端补偿吗_【干货分享】开关电源环路补偿设计步骤讲解...

    微信公众号:硬核电子 ps:本文撰写过程较为漫长,要画各种示意图和编辑公式,如果你觉得文章不错的话,就请点个赞同吧. ​1. 理论讲解 在上一篇文章中电子小白菜:[干货分享]轻松弄懂开关电源TL431 ...

  7. keepalived vip ping不通_【干货分享】OpenStack LVS负载均衡为什么不通?

    背 景 介 绍 OpenStack环境Neutron 的安全组会向虚拟机默认添加 anti-spoof 的规则,将保证虚拟机只能发出/接收以本机Port为原地址或目的地址(IP.MAC)的流量,提高了 ...

  8. 太硬核了!十天十场技术干货分享,其中有一个大佬还是我的死党!

    引言 14 世纪的欧洲,佛罗伦萨瘟疫盛行,10名男女在乡村一所别墅里避难.他们终日游玩欢宴,每人每天讲一个故事,共住了10天讲了百个故事.这就是文艺复兴时期的第一部现实主义巨著--<十日谈> ...

  9. rabbitmq 限制速度_关于消息队列速率的解决方案

    消息队列在执行过程中, 如何统计消息队列执行一轮的时间以及效率呢? 如果消息队列中的任务变多, 则需要对应增加消费进程, 保证队列不被堆积. 一.一般消息队列生产和消费类型 1. 一次性任务消费 从某 ...

最新文章

  1. 参照STM32时钟树配置STM32CubeMX Clock Configuration(STM32L011G4U6为例)
  2. 编写文档_如何通过编写优质文档来使自己的未来快乐
  3. HTML5 3D旋转图片相册
  4. linux 文件和打印机共享文件夹,linux服务器向windows客户端提供文件/目录及打印机共享...
  5. php中控制面板折叠,微信小程序折叠面板的实现方法示例
  6. nyoj543遥控器
  7. 科大星云诗社动态20210305
  8. 基于JFinal框架开发的企业办公系统-JFinalOA v1.1源码
  9. 开启admin$共享
  10. php如何定义变量,它和c# 等语言有什么不同呢?,PHP 变量和常量的定义
  11. 循环队列的创建Java_Java版-数据结构-队列(循环队列)
  12. 交换机中tag、untag的理解
  13. php led显示屏控制软件下载,中航led控制软件
  14. python系列3—顺序结构和分支结构
  15. 疯狂springboot终极讲义笔记(一)
  16. 6.7 【实例】窗口查看器
  17. MT6737芯片技术资料集锦下载
  18. 基于Spring Boot框架的驾校学员信息管理系统
  19. unity3d俄罗斯方块源码教程+源码和程序下载
  20. manjaro安装配置美化

热门文章

  1. 网络爬虫--SAX处理xml
  2. pytorch VIF(VIT 改)快了两倍
  3. 六层感知神经网络系统
  4. 研究人工智能最应该注意的问题
  5. OGNL中的s:property /标签
  6. 什么叫编译时和运行时
  7. inux 后台执行命令
  8. Symfony2 - paginator bundle 复杂查询时候报错解决
  9. c++,不能声明为虚函数的函数
  10. (转)oracle 11g安装后用户名忘记怎么办