Rabbitmq如何设置优先级队列?如何限流?如何重试?如何处理幂等性?
优先级队列
方式一:可以通过RabbitMQ管理界面配置队列的优先级属性,如下图的x-max-priority
方式二:代码设置
Map<String,Object> args = new HashMap<String,Object>(); args.put("x-max-priority", 10); channel.queueDeclare("queue_priority", true, false, false, args); |
这里设置的是一个队列queue的最大优先级,之后要在发送的消息中设置消息本身的优先级,设置代码:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.priority(5); AMQP.BasicProperties properties = builder.build(); channel.basicPublish("exchange_priority","rk_priority",properties,("messages").getBytes()); |
完整代码:生产者
public class Producer { public static final String ip = "10.0.40.127"; public static final int port = 5672; public static final String username = "admin"; public static final String password = "123456"; public static void main(String[] args) throws IOException{ ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setPassword(password); connectionFactory.setUsername(username); connectionFactory.setPort(port); connectionFactory.setHost(ip); /* Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //create exchange channel.exchangeDeclare("exchange_priority", "direct", true); //create queue with priority Map<String, Object> params = new HashMap<>(); params.put("x-max-priority", 10); channel.queueDeclare("queue_priority", true, false, false, params); channel.queueBind("queue_priority", "exchange_priority", "rk_priority"); //send message with priority for (int i = 0; i < 10; i++) { AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); if (i % 2 == 0) { builder.priority(5); } AMQP.BasicProperties properties = builder.build(); channel.basicPublish("exchange_priority", "rk_priority", properties, ("produce messages-" + i).getBytes()); } channel.close(); connection.close();*/ } } |
消费者
public class Consumer { public static final String ip = "10.0.40.127"; public static final int port = 5672; public static final String username = "admin"; public static final String password = "123456"; public static void main(String[] args) throws IOException, InterruptedException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setPassword(password); connectionFactory.setUsername(username); connectionFactory.setPort(port); connectionFactory.setHost(ip); /* Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume("queue_priority",consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println(msg); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }*/ } } |
打印输出:先输出偶数,后输出奇数
如何限流?
1、为什么要对消费端限流?
如果Rabbitmq 服务器积压了有上万条未处理的消息,如果这时候连上了一个消费端,那么巨量的消息瞬间全部推送过来,但是单个客户端无法同时处理这么多。当数据量特别大的时候对消费端限流,用于保持消费端的稳定,当消息数量激增的时候很有可能造成资源耗尽,以及影响服务的性能,导致系统的卡顿甚至直接崩溃。
2、限流的实现方式—限流api
RabbitMQ 提供了一种 qos (服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于 consume 或者 channel 设置 Qos 的值)未被确认前,不进行消费新的消息。
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException; |
- prefetchSize:0,单条消息大小限制,0代表不限制
- prefetchCount:一次性消费的消息数量。告诉 RabbitMQ 不要同时给一个消费者推送多于 N 个消息,即一旦有 N 个消息还没有 ack,则该 consumer 将 block 掉,直到有消息 ack。
- global:true、false 是否将上面设置应用于 channel,就是上面限制 channel 级别还是 consumer 级别。当我们设置为 false 的时候生效
- prefetchCount 在 no_ask=false 的情况下才生效,即在自动应答的情况下这两个值是不生效的。
3、如何进行限流?
- 首先第一步,使用消费端限流需要关闭自动 ack,将 autoAck 设置为 falsechannel.basicConsume(queueName, false, consumer);
- 第二步设置具体的限流大小以及数量。channel.basicQos(0, 15, false);
- 第三步在消费者的 handleDelivery 消费方法中手动 ack,并且设置批量处理 ack 回应为 truechannel.basicAck(envelope.getDeliveryTag(), true);
消息确认机制
1、如果没有开启ack消息确认,rabbitmq会认为这条消息没有被消费,会将消息再次放入到队列中,再次让你消费,形成死循环;
2、消费端配置了手动ack,但是在异常捕获中设置了消息重新入队,那么还是会出现死循环
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); |
因为最后一个参数requeue一般都会为true,此次没调用到数据,把这个消息返回到队列中再消费,如果代码中出现了int a=1/0,那么还是会造成死循环。
消息重试机制
当你开启了手动ack的时候再消费端如果在消费的时候出现异常也会导致循环消费,所以要启动消息重试机制,默认是3次重试去消费一条消息,如果没有消费完成,则丢弃(删除)该消息或者放入死信队列中或者进行人工补偿。
erver.port=8889 spring.rabbitmq.host=192.168.221.150 spring.rabbitmq.port=5672 spring.rabbitmq.username=zl spring.rabbitmq.password=123 #开启消息确认机制 spring.rabbitmq.publisher-confirms=true #支持消息发送失败返回队列 spring.rabbitmq.publisher-returns=true #设置为 true 后 消费者在消息没有被路由到合适队列情况下会被return监听,而不会自动删除 spring.rabbitmq.template.mandatory=true spring.rabbitmq.connection-timeout=15000 #用户虚拟机权限名称 spring.rabbitmq.virtual-host=/ #设置消费端手动 ack none不确认 auto自动确认 manual手动确认 spring.rabbitmq.listener.simple.acknowledge-mode=manual #消费者最小数量 spring.rabbitmq.listener.simple.concurrency=1 #消费之最大数量 spring.rabbitmq.listener.simple.max-concurrency=1 #开启消费者重试机制(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息) spring.rabbitmq.listener.simple.retry.enabled=true #重试次数5 spring.rabbitmq.listener.simple.retry.max-attempts=5 #重试时间间隔 spring.rabbitmq.listener.simple.retry.initial-interval=5000 #重试次数超过上面的设置之后是否丢弃(false不丢弃时需要写相应代码将该消息加入死信队列) spring.rabbitmq.listener.simple.default-requeue-rejected=true #在单个请求中处理的消息个数,他应该大于等于事务数量(unack的最大数量) spring.rabbitmq.listener.simple.prefetch=2 |
1、触发重试机制需要消费者抛出异常,而不能try/catch捕捉异常,不然会死循环。
2、对于重试之后仍然异常的消息,mq默认的处理类是RejectAndDontRequeueRecoverer
见名知意。
SimpleRabbitListenerContainerFactoryConfigurer——>>RejectAndDontRequeueRecoverer(实现了MessageRecoverer接口)
MessageRecoverer接口实现类 |
RejectAndDontRequeueRecoverer |
RepublishMessageRecoverer |
|
ImmediateRequeueMessageRecoverer |
优化处理一:对于重试之后仍然异常的消息,可以采用RepublishMessageRecoverer,将消息发送到其他的队列中,再专门针对新的队列进行处理。
优化处理二:采用死信队列的方式处理重试失败的消息。
/** * 死信交换机 * @return */ @Bean public DirectExchange dlxExchange(){ return new DirectExchange(dlxExchangeName); } /** * 死信队列 * @return */ @Bean public Queue dlxQueue(){ return new Queue(dlxQueueName); } /** * 死信队列绑定死信交换机 * @param dlxQueue * @param dlxExchange * @return */ @Bean public Binding dlcBinding(Queue dlxQueue, DirectExchange dlxExchange){ return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(dlxRoutingKey); } |
业务代码添加死信交换机、死信路由配置
/** * 业务队列 * @return */ @Bean public Queue queue(){ Map<String,Object> params = new HashMap<>(); params.put("x-dead-letter-exchange",dlxExchangeName);//声明当前队列绑定的死信交换机 params.put("x-dead-letter-routing-key",dlxRoutingKey);//声明当前队列的死信路由键 return QueueBuilder.durable(queueName).withArguments(params).build(); //return new Queue(queueName,true); } |
注意点:
1、消费者在重试5次后,由于MessageCover默认的实现类是RejectAndDontRequeueRecoverer,也就是requeue=false,因为业务队列绑定了死信队列,消息会从业务队列中删除,同时发送到死信队列中。
2、如果ack模式是手动ack,那么需要调用channe.nack方法,同时设置requeue=false才会将异常消息发送到死信队列中
重试使用场景:
对于消费端异常的消息,如果在有限次重试过程中消费成功是最好,如果有限次重试之后仍然失败的消息,不管是采用RejectAndDontRequeueRecoverer还是使用死信队列都是可以的,同时也可以采用折中的方法,先将消息从业务队列中ack掉,再将消息发送到另外的一个队列中,后续再单独处理异常数据的队列
考虑下面两个场景:
1、http下载视频或者图片或者调用第三方接口
2、空指针异常或者类型转换异常(其他的受检查的运行时异常)
第一种重试有意义,第二种重试无意义,需要记录日志以及人工处理或者轮询任务方式处理。
重试的使用方式:
1、自动ack模式,不能catch异常
2、手动ack模式,不能try—catch异常
建议自动ack模式使用重试机制,如果一定要在手动ack模式下使用retry功能,最好还是确认在有限次重试过程中可以重试成功,否则超过重试次数,又没办法执行nack,会出现消息一直unack死循环
消息幂等性
问题 |
解决方案 |
消息重复消费问题:消费者消息处理了,没来的及提交offset,再重启可能导致重复消费 |
方式一:使用全局MessageID判断消费方使用同一个,解决幂等性。 方式二:用一个消息消费表来记录每一条消息,给每个一个消息设置一个id(uuid),消费了就保存到表中去。消息过来的时候先查询是否已经消费。 |
Rabbitmq如何设置优先级队列?如何限流?如何重试?如何处理幂等性?相关推荐
- Redis如何实现分布式锁延时队列以及限流应用丨Redis源码原理|跳表|B+树|分布式锁|中间件|主从同步|存储原理
Redis如何实现分布式锁延时队列以及限流应用 视频讲解如下,点击观看: Redis如何实现分布式锁延时队列以及限流应用丨Redis源码原理|跳表|B+树|分布式锁|中间件|主从同步|存储原理|数据模 ...
- 【高并发秒杀系统】对分布式锁、缓存、消息队列、限流等的原理分析及代码实现
前言:在一些商城项目中,秒杀是不可或缺的.但是,如果将普通的购买.消费等业务流程用于秒杀系统,不做任何的处理,会导致请求阻塞严重.超买超卖等严重后果,服务器.数据库也可能因为瞬时的流量而奔溃.所以,设 ...
- RabbitMQ认知篇 - 优先级队列
优先级队列 RabbitMQ在3.5.0版本的时候提供了优先级队列的实现.客户端通过配置队列的x-max-priority参数的方式设置一个队列支持的最大优先级(但是不能使用策略的方式配置)以此来声明 ...
- java队列实现限流,java中应对高并发的两种策略
目的:提高可用性 通过ExecutorService实现队列泄洪 //含有20个线程的线程池 private ExecutorService executorService = Executors.n ...
- Tomcat容器做到自我保护,设置最大连接数(服务限流:tomcat请求数限制)
http://itindex.net/detail/58707-%E5%81%87%E6%AD%BB-tomcat-%E5%AE%B9%E5%99%A8 为了确保服务不会被过多的http长连接压垮,我 ...
- RabbitMQ的优先级队列
优先级队列 队列需要设置优先级队列,消息需要设置消息的优先级.消费者需要等待消息已经发送到队列中,然后对队列中的消息进行排序,最后再去消费. Map<String, Object> arg ...
- Java限流解决方案
前言 说到限流,想必大家都不陌生,一个很简单的例子就是,在12306上面买票的时候,遇到某时刻开始抢票的时候,经常页面会弹出一个类似请稍后重试的提示,从后端的技术层面来看,大概有2层解释,第一是服务器 ...
- Java限流策略与算法
概要 在大数据量高并发访问时,经常会出现服务或接口面对暴涨的请求而不可用的情况,甚至引发连锁反映导致整个系统崩溃.此时你需要使用的技术手段之一就是限流,当请求达到一定的并发数或速率,就进行等待.排队. ...
- 十分钟搞懂Java限流及常见方案
点击关注公众号:互联网架构师,后台回复 2T获取2TB学习资源! 上一篇:Alibaba开源内网高并发编程手册.pdf 文章目录 限流基本概念 QPS和连接数控制 传输速率 黑白名单 分布式环境 限流 ...
最新文章
- 【转】 Android Fragment 真正的完全解析(下)
- PHP的urlencode
- TODO:Go语言goroutine和channel使用
- java工程师去字节飞书可以,字节跳动飞书Java后端开发暑假实习一面(过了)
- C++ 用new 动态创建多维数组
- 局部变量 和 全局变量
- 1.5不同类型的循环神经网络
- 第五章、使用复合赋值和循环语句
- python行业数据分析_python在数据分析
- java找出最高工资和下标_Java 8 lambda用于为每个部门选择最高薪资员工
- P5057 [CQOI2006]简单题
- 【DSP】CCS 5.5的安装教程
- 第一章 路径规划算法概述
- 手机号正则(2020年4月15日)
- 2022.04.13【读书笔记】|10X单细胞转录组分析流程介绍
- 复杂边坡的ansys三维建模
- java 龟兔赛跑_Java实现多线程模拟龟兔赛跑
- 数字信号处理中各种频率关系
- NCCL源码解析②:Bootstrap网络连接的建立
- win7进不了系统怎么办
热门文章
- 前端学习(2182):keep-alive及其他问题
- 前端学习(2154):webpack横幅plugin的使用
- 前端学习(670):分支流程控制if
- spring学习(8):log4j.properties 详解与配置步骤
- 第四十期:九个对Web开发者最有用的Python包,掌握这些,工资至少能涨涨
- 第二十八期:Notepad++ 新 Logo 出炉,官网全新改版采用自适应设计
- python appium自动化测试平台开发,Python+Appium实现自动化测试
- 元组、字典、集合的常用方法
- 【练习】实现一个parse方法(需要实现的效果见内容),方法总结
- HTML第二课——css