什么是消费端的·限流?

  1. 假设一个场景,首先,我们Rabbitmq服务器有上万条未处理的消息,我们随便打开一个消费者客户端,会出现下面的情况:
  2. 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据!

注意:高并发情景下,生产端我们没办法做限制,所以我们只能限制消费端,防止消费端口资源耗尽。

RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consume或者channel设置Qos的值)未被确认前,不进行消费新的消息。


消费端的方法
void BasicQos(uint prefetchSize, ushort prefetchCount,bool global)


prefetchSize: 0
prefetchCount : 最多消费的消息个数 ,一般设置为1
global : channel 级别或者 消费端级别,一般设置为false

注意:自动设置签收一定要设置为false,autoAck = false

代码如下:

package com.bfxy.rabbitmq.api.limit;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;public class Consumer {public static void main(String[] args) throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");
//      connectionFactory.setHost("192.168.43.223");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchangeName = "test_qos_exchange";String queueName = "test_qos_queue";String routingKey = "qos.#";channel.exchangeDeclare(exchangeName, "topic", true, false, null);channel.queueDeclare(queueName, true, false, false, null);channel.queueBind(queueName, exchangeName, routingKey);// 0代表不限制大小,1条结束再给我,false代表应用到consumer级别channel.basicQos(0, 1, false);//1 限流方式  第一件事就是 autoAck设置为 false , 第二个参数channel.basicConsume(queueName, false, new MyConsumer(channel));}
}
package com.bfxy.rabbitmq.api.limit;import java.io.IOException;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;public class MyConsumer extends DefaultConsumer {private Channel channel ;public MyConsumer(Channel channel) {super(channel);this.channel = channel;}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.err.println("-----------consume message----------");System.err.println("consumerTag: " + consumerTag);System.err.println("envelope: " + envelope);System.err.println("properties: " + properties);System.err.println("body: " + new String(body));// false 不批量签收//     channel.basicAck(envelope.getDeliveryTag(), false); 可以注释试试channel.basicAck(envelope.getDeliveryTag(), false);}}

生产者代码

package com.bfxy.rabbitmq.api.consumer;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer {public static void main(String[] args) throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");
//      connectionFactory.setHost("192.168.43.223");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchange = "test_consumer_exchange";String routingKey = "consumer.save";String msg = "Hello RabbitMQ Consumer Message";//这里可以改成 50000 , 可以看出来mq消费还是很快的for(int i =0; i<5; i ++){channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());}}
}

结果如下:消费者正常消费5条信息

消费端不签收的话:
结果如下:只能有一条,其他的都在ready

消息中间件--RabbitMQ --- 消费端限流 -- 非常重要相关推荐

  1. 面试官:说说RabbitMQ 消费端限流、TTL、死信队列

    欢迎关注方志朋的博客,回复"666"获面试宝典 1. 为什么要对消费端限流 假设一个场景,首先,我们 Rabbitmq 服务器积压了有上万条未处理的消息,我们随便打开一个消费者客户 ...

  2. RabbitMQ(七):RabbitMQ 消费端限流、TTL、死信队列是什么?

    消费端限流 1. 为什么要对消费端限流 假设一个场景,首先,我们 Rabbitmq 服务器积压了有上万条未处理的消息,我们随便打开一个消费者客户端,会出现这样情况: 巨量的消息瞬间全部推送过来,但是我 ...

  3. RabbitMQ 消费端限流、TTL、死信队列

    目录 消费端限流 1. 为什么要对消费端限流 2.限流的 api 讲解 3.如何对消费端进行限流 TTL 1.消息的 TTL 2.队列的 TTL 死信队列 实现死信队列步骤 总结 消费端限流 1. 为 ...

  4. 面试官:RabbitMQ怎么实现消费端限流

    哈喽!大家好,我是小奇,一位不靠谱的程序员 小奇打算以轻松幽默的对话方式来分享一些技术,如果你觉得通过小奇的文章学到了东西,那就给小奇一个赞吧 文章持续更新,可以微信搜索[小奇JAVA面试]第一时间阅 ...

  5. RabbitMQ(八):SpringBoot 整合 RabbitMQ(三种消息确认机制以及消费端限流)

    说明 本文 SpringBoot 与 RabbitMQ 进行整合的时候,包含了三种消息的确认模式,如果查询详细的确认模式设置,请阅读:RabbitMQ的三种消息确认模式 同时消费端也采取了限流的措施, ...

  6. rabbitmq 不同的消费者消费同一个队列_RabbitMQ 消费端限流、TTL、死信队列

    消费端限流 1. 为什么要对消费端限流 假设一个场景,首先,我们 Rabbitmq 服务器积压了有上万条未处理的消息,我们随便打开一个消费者客户端,会出现这样情况: 巨量的消息瞬间全部推送过来,但是我 ...

  7. RocketMQ 消费端限流

    RocketMQ 消费端限流 RocketMQ 消费端限流 首先我们要明白为什么需要限流?如果不使用限流呢? 通常情况下,当客户端生产的消息很多时,消费者消费消息速度低于生产者消费速度,我们该如何解决 ...

  8. RabbitMQ消费端停止问题

    RabbitMQ消费端过段时间会停止消费,为了解决这个问题: 可以在消费端的首页添加计时器进行刷新操作,这样消费端就可以一直工作. <script type="text/javascr ...

  9. Rabbitmq专题:rabbitmq消费端如何做限流?

    文章目录 1. 什么是消费端的限流? 2. 解决方案 3. 代码示例 1. 什么是消费端的限流? 场景:在订单高峰期,rabbitmq上已经堆积了很多消息等待消费,如果没有任何限流措施,贸然启动一个消 ...

最新文章

  1. 《LeetCode力扣练习》第31题 下一个排列 Java
  2. Java子线程中的异常处理(通用)
  3. 二分图模板(女生赛要用)
  4. Oracle启动操作
  5. C语言:fopen与open的总结
  6. 做数据产品经理要学习那些东西?
  7. gethostbyname与sockaddr_in的完美组合
  8. vue延迟渲染组件_做一个可复用的 echarts-vue 组件(延迟动画加载)
  9. 虚拟机CentOS系统没有UNIX2dos或dos2UNIX命令的解决方案(参考各路大佬后的总结)
  10. linux/windows对应的软件
  11. ping命令显示的TTL是什么意思
  12. 三种振幅调制AM、DSB、SSB
  13. Jenkins docker下JNLP slave节点远程连接报错port not reachable的解决
  14. 2u服务器支持29块硬盘,01-正文
  15. Win11快捷键切换输入法无反应怎么办?快捷键切换输入法没有反应
  16. 为什么你的同龄人在抛弃你?还要等着被谁抛弃?
  17. Z-score与修正的Z-score评分识别异常
  18. nslookup命令详解和实战例子(全)
  19. Spark 性能常规性能调优广播大变量_大数据培训
  20. tensorflow标签向量化

热门文章

  1. Scala的partition函数
  2. excel重复上一步快捷键_工作再忙也要学会的十个Excel快捷键
  3. python3怎么使用pyrex_用户指南 - Cython 和 Pyrex 之间的区别 - 《Cython 3.0 中文文档》 - 书栈网 · BookStack...
  4. 苹果将iOS应用带入macOS
  5. 《C++编程惯用法——高级程序员常用方法和技巧》——2.7 Const
  6. 实验8 SQLite数据库操作
  7. Elixir:可能成为下一代Web开发语言
  8. linux进程状态浅析
  9. IE Automation Tabs
  10. Web应用——焦点图自动浏览