Rabbitmq专题:rabbitmq消费端如何做限流?
文章目录
- 1. 什么是消费端的限流?
- 2. 解决方案
- 3. 代码示例
1. 什么是消费端的限流?
场景:在订单高峰期,rabbitmq上已经堆积了很多消息等待消费,如果没有任何限流措施,贸然启动一个消费者时,如此多的消息瞬间推送给消费者,消费者可能因无法处理这么多的消息而承受巨大压力,甚至崩溃!
2. 解决方案
rabbitmq 提供了basicQos方法实现了限流,也就是在关闭了消费端的自动ack的前提 下,我们可以设置阈值(出队)的消息数。 没有手动确认,那么就不会推送新的消息过来!可以有效防止消费者压力过大而崩溃
/*** 限流设置: prefetchSize:每条消息大小的设置,0是无限制* prefetchCount:标识每次推送多少条消息 一般是一条* global:false标识channel级别的 true:标识消费者级别的*/channel.basicQos(0,1,false);
3. 代码示例
生产者:
public class Producer {public static void main(String[] args) throws Exception{// 1、创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setVirtualHost("/");factory.setPort(5672);factory.setHost("192.168.200.130");factory.setUsername("mqs");factory.setPassword("mqs123");// 2、创建连接Connection connection = factory.newConnection();// 3、获取通道Channel channel = connection.createChannel();// 4、声明交换机和路由String exchangeName = "limit_exchange";String routingKey = "limit.key";//消息体String msg = "send message test limit mandatory ";// 5、生产者发送消息for (int i = 0; i < 6; i++){channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());}}
}
消费者:
public static void main(String[] args)throws Exception {// 1、创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setVirtualHost("/");factory.setPort(5672);factory.setHost("192.168.200.130");factory.setUsername("mqs");factory.setPassword("mqs123");// 2、创建连接Connection connection = factory.newConnection();// 3、获取通道Channel channel = connection.createChannel();// 4、声明String exchangeName = "limit_exchange";String routingKey = "limit.key";String exchangeType = "direct";String queueName = "limit_queue";// 5、声明一个交换器channel.exchangeDeclare(exchangeName, exchangeType, true, false, null);// 6、声明一个队列channel.queueDeclare(queueName, true, false, false, null);// 7、绑定队列到交换器channel.queueBind(queueName, exchangeName, routingKey);/*** 限流设置: * prefetchSize:每条消息大小的设置,0是无限制* prefetchCount:标识每次推送多少条消息 一般是一条* global:false标识channel级别的 true:标识消费者级别的*/channel.basicQos(0, 1, false);// 8、消费者,要想做限流必须将自动ack设置为false,代表手动ack,一条条的消费// MyConsumer 自定义消费者channel.basicConsume(queueName, false, new MyConsumer(channel));}
}
MyConsumer 自定义消费者:
public class MyConsumer extends DefaultConsumer {private Channel channel;public MyConsumer(Channel channel) {super(channel);this.channel = channel;}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("==handle message====");System.out.println("==consumerTag: " + consumerTag);System.out.println("==envelope: " + envelope);System.out.println("==properties: " + properties);System.out.println("==body: " + new String(body));/*** multiple:false标识不批量签收* 这行代码如果注释掉,则只打印一条消息,因为没有手动ack* 这行代码如果放开,打印所有消息,但也是一条一条消费的!*/channel.basicAck(envelope.getDeliveryTag(), false);}
}
Rabbitmq专题:rabbitmq消费端如何做限流?相关推荐
- 面试官:RabbitMQ怎么实现消费端限流
哈喽!大家好,我是小奇,一位不靠谱的程序员 小奇打算以轻松幽默的对话方式来分享一些技术,如果你觉得通过小奇的文章学到了东西,那就给小奇一个赞吧 文章持续更新,可以微信搜索[小奇JAVA面试]第一时间阅 ...
- Rabbitmq如何设置优先级队列?如何限流?如何重试?如何处理幂等性?
优先级队列 方式一:可以通过RabbitMQ管理界面配置队列的优先级属性,如下图的x-max-priority 方式二:代码设置 Map<String,Object> args = new ...
- 系统不做限流,我看你是对中国人口数量有什么误解
目录 限流 怎么做限流? 获得系统能力上限.处理被限制流量 具体如何限流? 固定窗口 滑动窗口 漏桶 令牌桶 做限流的最佳实践 分布式系统中带来的新挑战 纵 在软件架构领域,"限流" ...
- 面试官 | 讲一下如何给高并发系统做限流?
作者 | nick hao 来源 | uee.me/cDuRD 在开发高并发系统时有三把利器用来保护系统:缓存.降级和限流.本文结合作者的一些经验介绍限流的相关概念.算法和常规的实现方式. 缓存 缓存 ...
- 程序员修神之路--高并发优雅的做限流(有福利)
点击上方蓝色字体,关注我们 菜菜哥,有时间吗? YY妹,什么事? 我最近的任务是做个小的秒杀活动,我怕把后端接口压垮,X总说这可关系到公司的存亡 简单呀,你就做个限流呗 这个没做过呀,菜菜哥,帮妹子写 ...
- 【Guava】使用Guava的RateLimiter做限流
2019独角兽企业重金招聘Python工程师标准>>> 一.常见的限流算法 目前常用的限流算法有两个:漏桶算法和令牌桶算法. 1.漏桶算法 漏桶算法的原理比较简单,请求进入到漏桶中, ...
- 1命名规则 sentinel_Sentinel实战:为系统做限流保护
我们已经知道了 Sentinel 的三大功能:限流 降级 系统保护.现在让我们来了解下具体的使用方法,以限流来演示具体的步骤. 引入依赖 首先肯定是要先引入需要的依赖,如下所示: com.alibab ...
- 使用Guava的RateLimiter做限流
一.问题描述 某天A君突然发现自己的接口请求量突然涨到之前的10倍,没多久该接口几乎不可使用,并引发连锁反应导致整个系统崩溃.如何应对这种情况呢?生活给了我们答案:比如老式电闸都安装了保险丝,一旦 ...
- 1-8 (4). RabbitMQ高级特性-消费端ACK
Consumer ACK 指Acknowledge,确认 有三种方式: (1)自动确认:acknowledge="none"(默认) (2)手动确认:acknowledge=&qu ...
最新文章
- 如何利用 notedown 完成 ipynb与markdown之间的格式转换?
- MSM8974 fastboot烧写软件
- skynet 报错 skynet 服务缺陷 Lua死循环
- boost::intrusive::auto_unlink_hook用法的测试程序
- 解决crontab 定时任务加载失败
- 修改Windows 2003/2008/2012远程桌面服务端口号
- 腾讯视频已上线超前点播选集解锁
- 数据仓库工具箱维度建模权威指南-第一章 数据仓库、商业智能及维度建模初步
- 三轴合并_用两套乐高60107合并成铰接式云梯消防车,看看和60112有什么区别
- 《HTML与CSS设计》课程总结,网页设计课程学习心得总结
- 29 伪造ICMP数据包的IP层
- 实验四:AC与AP间VLAN配置实验
- NES模拟器开发笔记(001)缘起、资料及开发准备
- 随记 C#读取TXT文件乱码
- Excel不用函数嵌套一键完成四舍六入五成双
- java 二元组_java里有类似于二元组之类的数据结构么?
- 14、MyBatis-Plus入门到进阶
- C# Hash字符串
- mount挂载基础点
- 计算机中丢失vba,打开Excel的时候提示visual basic项目错误导致VBA模块代码丢失