文章目录

  • 1. 什么是消费端的限流?
  • 2. 解决方案
  • 3. 代码示例

1. 什么是消费端的限流?

场景:在订单高峰期,rabbitmq上已经堆积了很多消息等待消费,如果没有任何限流措施,贸然启动一个消费者时,如此多的消息瞬间推送给消费者,消费者可能因无法处理这么多的消息而承受巨大压力,甚至崩溃!

2. 解决方案

rabbitmq 提供了basicQos方法实现了限流,也就是在关闭了消费端的自动ack的前提 下,我们可以设置阈值(出队)的消息数。 没有手动确认,那么就不会推送新的消息过来!可以有效防止消费者压力过大而崩溃

 /*** 限流设置:  prefetchSize:每条消息大小的设置,0是无限制* prefetchCount:标识每次推送多少条消息 一般是一条* global:false标识channel级别的  true:标识消费者级别的*/channel.basicQos(0,1,false);

3. 代码示例

生产者:


public class Producer {public static void main(String[] args) throws Exception{// 1、创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setVirtualHost("/");factory.setPort(5672);factory.setHost("192.168.200.130");factory.setUsername("mqs");factory.setPassword("mqs123");// 2、创建连接Connection connection = factory.newConnection();// 3、获取通道Channel channel = connection.createChannel();// 4、声明交换机和路由String exchangeName = "limit_exchange";String routingKey = "limit.key";//消息体String msg = "send message test limit mandatory ";// 5、生产者发送消息for (int i = 0; i < 6; i++){channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());}}
}

消费者:

    public static void main(String[] args)throws Exception {// 1、创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setVirtualHost("/");factory.setPort(5672);factory.setHost("192.168.200.130");factory.setUsername("mqs");factory.setPassword("mqs123");// 2、创建连接Connection connection = factory.newConnection();// 3、获取通道Channel channel = connection.createChannel();// 4、声明String exchangeName = "limit_exchange";String routingKey = "limit.key";String exchangeType = "direct";String queueName = "limit_queue";// 5、声明一个交换器channel.exchangeDeclare(exchangeName, exchangeType, true, false, null);// 6、声明一个队列channel.queueDeclare(queueName, true, false, false, null);// 7、绑定队列到交换器channel.queueBind(queueName, exchangeName, routingKey);/*** 限流设置:  * prefetchSize:每条消息大小的设置,0是无限制* prefetchCount:标识每次推送多少条消息 一般是一条* global:false标识channel级别的  true:标识消费者级别的*/channel.basicQos(0, 1, false);// 8、消费者,要想做限流必须将自动ack设置为false,代表手动ack,一条条的消费// MyConsumer  自定义消费者channel.basicConsume(queueName, false, new MyConsumer(channel));}
}

MyConsumer 自定义消费者:

public class MyConsumer extends DefaultConsumer {private Channel channel;public MyConsumer(Channel channel) {super(channel);this.channel = channel;}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("==handle message====");System.out.println("==consumerTag: " + consumerTag);System.out.println("==envelope: " + envelope);System.out.println("==properties: " + properties);System.out.println("==body: " + new String(body));/*** multiple:false标识不批量签收* 这行代码如果注释掉,则只打印一条消息,因为没有手动ack* 这行代码如果放开,打印所有消息,但也是一条一条消费的!*/channel.basicAck(envelope.getDeliveryTag(), false);}
}

Rabbitmq专题:rabbitmq消费端如何做限流?相关推荐

  1. 面试官:RabbitMQ怎么实现消费端限流

    哈喽!大家好,我是小奇,一位不靠谱的程序员 小奇打算以轻松幽默的对话方式来分享一些技术,如果你觉得通过小奇的文章学到了东西,那就给小奇一个赞吧 文章持续更新,可以微信搜索[小奇JAVA面试]第一时间阅 ...

  2. Rabbitmq如何设置优先级队列?如何限流?如何重试?如何处理幂等性?

    优先级队列 方式一:可以通过RabbitMQ管理界面配置队列的优先级属性,如下图的x-max-priority 方式二:代码设置 Map<String,Object> args = new ...

  3. 系统不做限流,我看你是对中国人口数量有什么误解

    目录 限流 怎么做限流? 获得系统能力上限.处理被限制流量 具体如何限流? 固定窗口 滑动窗口 漏桶 令牌桶 做限流的最佳实践 分布式系统中带来的新挑战 纵 在软件架构领域,"限流" ...

  4. 面试官 | 讲一下如何给高并发系统做限流?

    作者 | nick hao 来源 | uee.me/cDuRD 在开发高并发系统时有三把利器用来保护系统:缓存.降级和限流.本文结合作者的一些经验介绍限流的相关概念.算法和常规的实现方式. 缓存 缓存 ...

  5. 程序员修神之路--高并发优雅的做限流(有福利)

    点击上方蓝色字体,关注我们 菜菜哥,有时间吗? YY妹,什么事? 我最近的任务是做个小的秒杀活动,我怕把后端接口压垮,X总说这可关系到公司的存亡 简单呀,你就做个限流呗 这个没做过呀,菜菜哥,帮妹子写 ...

  6. 【Guava】使用Guava的RateLimiter做限流

    2019独角兽企业重金招聘Python工程师标准>>> 一.常见的限流算法 目前常用的限流算法有两个:漏桶算法和令牌桶算法. 1.漏桶算法 漏桶算法的原理比较简单,请求进入到漏桶中, ...

  7. 1命名规则 sentinel_Sentinel实战:为系统做限流保护

    我们已经知道了 Sentinel 的三大功能:限流 降级 系统保护.现在让我们来了解下具体的使用方法,以限流来演示具体的步骤. 引入依赖 首先肯定是要先引入需要的依赖,如下所示: com.alibab ...

  8. 使用Guava的RateLimiter做限流

    一.问题描述   某天A君突然发现自己的接口请求量突然涨到之前的10倍,没多久该接口几乎不可使用,并引发连锁反应导致整个系统崩溃.如何应对这种情况呢?生活给了我们答案:比如老式电闸都安装了保险丝,一旦 ...

  9. 1-8 (4). RabbitMQ高级特性-消费端ACK

    Consumer ACK 指Acknowledge,确认 有三种方式: (1)自动确认:acknowledge="none"(默认) (2)手动确认:acknowledge=&qu ...

最新文章

  1. 如何利用 notedown 完成 ipynb与markdown之间的格式转换?
  2. MSM8974 fastboot烧写软件
  3. skynet 报错 skynet 服务缺陷 Lua死循环
  4. boost::intrusive::auto_unlink_hook用法的测试程序
  5. 解决crontab 定时任务加载失败
  6. 修改Windows 2003/2008/2012远程桌面服务端口号
  7. 腾讯视频已上线超前点播选集解锁
  8. 数据仓库工具箱维度建模权威指南-第一章 数据仓库、商业智能及维度建模初步
  9. 三轴合并_用两套乐高60107合并成铰接式云梯消防车,看看和60112有什么区别
  10. 《HTML与CSS设计》课程总结,网页设计课程学习心得总结
  11. 29 伪造ICMP数据包的IP层
  12. 实验四:AC与AP间VLAN配置实验
  13. NES模拟器开发笔记(001)缘起、资料及开发准备
  14. 随记 C#读取TXT文件乱码
  15. Excel不用函数嵌套一键完成四舍六入五成双
  16. java 二元组_java里有类似于二元组之类的数据结构么?
  17. 14、MyBatis-Plus入门到进阶
  18. C# Hash字符串
  19. mount挂载基础点
  20. 计算机中丢失vba,打开Excel的时候提示visual basic项目错误导致VBA模块代码丢失

热门文章

  1. Day12-流Stream
  2. java多线程的安全问题与死锁(面向厕所编程)
  3. 为5—18岁青少年提供营地教育,漫族完成百万级天使轮融资
  4. 固态电池技术取得新突破,充电一分钟续航800公里
  5. 看看async,await 是如何简化异步的调用WCF!
  6. 资料汇总--java开发程序员必备技能
  7. Oracle安装出现报错
  8. ubuntu下chromium 安装flash player
  9. 框架设计之菜鸟漫漫江湖路系列 开篇
  10. 从面试题看考察知识点(四)