RabbitMQ多消费者消息分配
一、 轮询分配
当有多个消费者同时监听一个队列时,RabbitMQ默认将消息逐一顺序分配给各消费者,该消息分配机制称为轮询(Round-Robin)。
为验证该机制,建立两个消费者,同时监听同一队列,消息生产者连续向队列中发送20条消息,查看消息的分配状况。
//生产者
public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();// 设置服务端的地址、端口、用户名和密码...Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare("Queue_Java", false, false, false, null);for(int i = 0; i < 20; i++) {byte[] message = ("message" + i).getBytes();channel.basicPublish("", "Queue_Java", null, message);}channel.close();connection.close();
}//消费者
public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();// 设置服务端的地址、端口、用户名和密码...Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare("Queue_Java", false, false, false, null);Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {String message = new String(body);System.out.println("Received: " + message);try {channel.basicAck(envelope.getDeliveryTag(), false);} catch (Exception e) {e.printStackTrace();}}};// 标识进程,第二个消费者将输出内容改为“Consumer2:”,再次运行程序即可System.out.println("Consumer2:");channel.basicConsume(QUEUE_NAME, false, consumer);
}
运行结果如下:
第一个消费者收到了所有偶数号的消息,第二个消费者收到了所有奇数号的消息,消息被顺序分配给了两个消费者。
二、 消息预取
消息转发到队列后,分配是提前一次性完成的,即RabbitMQ尽可能快速地将消息推送至客户端,由客户端缓存本地,而并非在消息消费时才逐一确定。再加入新的消费者时,队列已经为空,即使前面的消费者未处理完消息,新加入的消费者也不会接收到。
为验证该结论,在消费者处理消息的方法中,加入线程休眠。首先启动2个消费者,生产者将20个消息发送完毕后,断开2号消费者,启动3号消费者,观察消息消费情况。
public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();// 设置服务端的地址、端口、用户名和密码...Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare("Queue_Java", false, false, false, null);Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {String message = new String(body);System.out.println("Received: " + message);try {Thread.sleep(5000); // 加入线程休眠channel.basicAck(envelope.getDeliveryTag(), false);} catch (Exception e) {e.printStackTrace();}}};System.out.println("Consumer1:");channel.basicConsume(QUEUE_NAME, false, consumer);
}
程序运行结果:
可见,中途断开的2号消费者所应消费的余下的奇数号消息,既未分配给新加入的3号消费者,也未交给发送消息前已建立连接的1号消费者。
三、 公平分配
消息的轮询分配机制和尽可能快速推送消息的机制给实际使用带来困难。实际情况下,每个消费者处理消息的能力、每个消息处理所需时间可能都是不同的,若只是机械化地顺次分配,可能造成一个消费者由于处理的消息的业务复杂、处理能力低而积压消息,另一个消费者早早处理完所有的消息,处于空闲状态,造成系统的处理能力的浪费。且无法加入新的消费者以提高系统的处理能力。
希望达到的效果是每个消费者都根据自身处理能力合理分配消息处理任务,既无挤压也无空闲,新加入的消费者也能分担消息处理任务,使系统的处理能力能够平行扩展。
RabbitMQ客户端可通过Channel类的basicQos(int prefetchCount)设置消费者的预取数目,即消费者最大的未确认消息的数目。
假设prefetchCount=10,有两个消费者,两个消费者依次从队列中抓取10条消息缓存本地,若此时有新的消息到达队列,先判断信道中未确认的消息是否大于或等于20条,若是,则不向信道中投递消息,当信道中未确认消息数小于20条后,信道中哪个消费者未确认消息小于10条,就将消息投递给哪个消费者。
设置信道的预取数量为1,重复5.1.2节的测试。
public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();// 设置服务端的地址、端口、用户名和密码...Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare("Queue_Java", false, false, false, null);// 设置预取数量为1channel.basicQos(1);Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {String message = new String(body);System.out.println("Received: " + message);try {Thread.sleep(5000); // 加入线程休眠channel.basicAck(envelope.getDeliveryTag(), false);} catch (Exception e) {e.printStackTrace();}}};System.out.println("Consumer1:");channel.basicConsume(QUEUE_NAME, false, consumer);
}
程序运行结果(消费者2在打印5号消息后、返回确认前被停止):
在停止消费者2、加入消费者3后,消息被平均分配给消费者1和3,达到了所需的效果。
还可将两个消费者的休眠时间设为不同值(代表不同的处理消息耗时),观察运行情况。消息将会按照消息处理速度的比例分配给两个消费者,达到了消息均衡分配的效果。
四、 预取数量的优化
channel.basicQos()中设置的预取数量多少合适,是一个颇有讲究的问题。我们希望充分利用消费者的处理能力,因此不宜设置过小,否则在消费者处理消息后,RabbitMQ收到确认消息后才会投递新的消息,导致此期间消费者处于空闲状态,浪费消费者的处理能力;但设置过大,又可能使消息积压在消费者的缓存里,我们希望对于来不及处理的消息,应保留在队列中,便于加入新的消费者或空闲出来的消费者分担消息处理任务。
RabbitMQ官网的一篇文章详细讨论了预取数量的设置问题:
https://www.rabbitmq.com/blog/2012/05/11/some-queuing-theory-throughput-latency-and-bandwidth/
文章大致内容如下。
假设从RabbitMQ服务端队列取消息、传输消息到消费者耗时为50ms,消费者消费消息耗时4ms,消费者传输确认消息到服务端耗时为50ms。若网络状况、消费者处理速度稳定,则预取数量的最优数值为:(50 + 4 + 50)/4=26个。
最初服务端将向客户端发送26条消息,并缓存在客户端本地,当消费者处理好第一个消息后,向服务端发送确认消息并取本地缓存的第二个消息,确认消息由客户端传送到服务端耗时50ms,服务端收到确认后发送新的消息经过50ms又到达了客户端,而余下的25个消息被消费耗时为25×4=100ms,所以当新的消息达到时,第一轮的26个消息恰好全部处理完。依次类推,之后,每当处理完一个旧有的消息时,恰好会到达一个新的消息。既不会发生消息积压,消费者也不会空闲。
但实际情况是,网络的传输状况、消费者处理消息的速度都不会是恒定的,会时快时慢,造成消息积压或消费者空闲,这就要求预取数量要与网络和消费者的状况实时改变。
新近发表的一个称作“Controlled Delay”(控制延迟?)算法(参见https://queue.acm.org/detail.cfm?id=2209336),能够较好地解决此问题。作者实现了其Java版本:
https://gist.github.com/2658712
文章中说明了其中的参数,有兴趣者可自行研究。
RabbitMQ多消费者消息分配相关推荐
- rabbitmq怎样确认是否已经消费了消息_【朝夕专刊】RabbitMQ生产者/消费者消息确认...
欢迎大家阅读<朝夕Net社区技术专刊> 我们致力于.NetCore的推广和落地,为更好的帮助大家学习,方便分享干货,特创此刊!很高兴你能成为忠实读者,文末福利不要错过哦! 上篇文章介绍了R ...
- java实现rabbitmq任务模型(work queues), 生产者 消费者 消息队列 能者多劳
work queues也成为task queues,任务模型.当消息处理比较耗时的时候,可能生产消息的速度远远大于消费速度,长此以往,消息就会堆积,无法及时处理.此时,就恶意使用work模型,让多个消 ...
- RabbitMQ学习之消息可靠性及特性
转载自 https://blog.csdn.net/zhu_tianwei/article/details/53971296 下面主要从队列.消息发送.消息接收方面了解消息传递过的一些可靠性处理. ...
- rabbitmq 拉取消息太慢_面试官:消息队列这些我都要问
作者:mousycoder segmentfault.com/a/1190000021054802 消息队列连环炮 项目里怎么样使用 MQ 的? 为什么要使用消息队列? 消息队列有什么优点和缺点? k ...
- (转)RabbitMQ学习之消息可靠性及特性
http://blog.csdn.net/zhu_tianwei/article/details/53971296 下面主要从队列.消息发送.消息接收方面了解消息传递过的一些可靠性处理. 1.队列 ...
- Rabbitmq简单模式和消息的手动应答以及Rabbitmq持久化
Hello模式 在idea中新建一个空工程 设置项目 添加模块 选择模块类型 设置模块 在pom文件中导入jar包依赖 书写生产者代码: public class HelloProduct {// 创 ...
- RabbitMQ如何保证消息发送、消费成功
好记忆不如烂笔头,能记下点东西,就记下点,有时间拿出来看看,也会发觉不一样的感受. 目录 1.发送确认机制设置 2.消息丢失.非信任或失败 3.消息重复消费 4.消费成功通知 5.总结 消息因为其:削 ...
- RabbitMQ如何防止消息丢失及重复消费
RabbitMQ目录 文章目录 RabbitMQ如何防止消息丢失及重复消费 一.消息丢失 1.1.生产者没有成功把消息发送到MQ 1.1.1.confirm(发布确认)机制 1.1.2.事务机制 1. ...
- 2.RabbitMQ 的可靠性消息的发送
本篇包含 1. RabbitMQ 的可靠性消息的发送 2. RabbitMQ 集群的原理与高可用架构的搭建 3. RabbitMQ 的实践经验 上篇包含 1.MQ 的本质,MQ 的作用 2.R ...
最新文章
- Android系统编译so库提示error undefined reference to '__android_log_print问题的解决
- 自欺欺人的使用 NSTimer 销毁
- android h5 书,android与H5交互
- 【Python】分享几个好用到爆的Python内置模块
- 数学建模国赛 常考赛题类型(模拟退火算法、粒子群算法、遗传算法)
- Android -- AudioPlayer
- Java 实现固定长度队列,自动删除最早添加的数据
- 英语每日阅读---3、VOA慢速英语(翻译+字幕+讲解):哈佛大学被控歧视亚裔学生
- Oauth协议是否会泄露用户的密码
- lisp 设计盘形齿轮铣刀_钨钢铣刀制造厂
- [总结] Min-Max容斥学习笔记
- c语言编程软件我的世界,我的世界(Minecraft)
- C语言选择循环练习题
- openwrt编译smartdns_【萌新理解交流】浅谈openWRT中的smartDNS中各个选项如何设置及其含义。...
- Spring Security源码解析(一)
- 教程:如何成为日入斗金NFT数字艺术创作家
- 10个顶级商业思维:如何升级思维模式突破认知,让自己快速成长
- 美国国土安全部重点努力加强国家关键基础设施的网络安全
- 源码分析 There is no getter for property named '*' in 'class java.lang.String
- MyCAT数据库分片(一)
热门文章
- git管理github仓库详解
- Leetcode每日一题:345.reverse-vowels-of-a-string(反转字符串中的元音字母)
- 吴恩达机器学习【第一天】
- 第十三:Pytest参数化-@pytest.mark.parametrize装饰器来实现数据驱动测试
- 【八】有验证码登录配置:通过 Cookie 跳过验证码登录接口
- 【五】Jmeter:函数助手
- 服务器电源维修哪里便宜,服务器电源维修
- linux脚本程序是什么意思,什么是shell脚本编程?
- python for语句_从零开始py个thon3:循环语句(1)
- 利用Docker一键部署若依前后端分离项目详细教程