优先级队列

方式一:可以通过RabbitMQ管理界面配置队列的优先级属性,如下图的x-max-priority

方式二:代码设置

Map<String,Object> args = new HashMap<String,Object>();

args.put("x-max-priority", 10);

channel.queueDeclare("queue_priority", true, false, false, args);

这里设置的是一个队列queue的最大优先级,之后要在发送的消息中设置消息本身的优先级,设置代码:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();

builder.priority(5);

AMQP.BasicProperties properties = builder.build();

channel.basicPublish("exchange_priority","rk_priority",properties,("messages").getBytes());

完整代码:生产者

public class Producer {

public static final String ip = "10.0.40.127";

public static final int port = 5672;

public static final String username = "admin";

public static final String password = "123456";

public static void main(String[] args) throws IOException{

ConnectionFactory connectionFactory = new ConnectionFactory();

connectionFactory.setPassword(password);

connectionFactory.setUsername(username);

connectionFactory.setPort(port);

connectionFactory.setHost(ip);

/*   Connection connection = connectionFactory.newConnection();

Channel channel = connection.createChannel();

//create exchange

channel.exchangeDeclare("exchange_priority", "direct", true);

//create queue with priority

Map<String, Object> params = new HashMap<>();

params.put("x-max-priority", 10);

channel.queueDeclare("queue_priority", true, false, false, params);

channel.queueBind("queue_priority", "exchange_priority", "rk_priority");

//send message with priority

for (int i = 0; i < 10; i++) {

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();

if (i % 2 == 0) {

builder.priority(5);

}

AMQP.BasicProperties properties = builder.build();

channel.basicPublish("exchange_priority", "rk_priority", properties, ("produce messages-" + i).getBytes());

}

channel.close();

connection.close();*/

}

}

消费者

public class Consumer {

public static final String ip = "10.0.40.127";

public static final int port = 5672;

public static final String username = "admin";

public static final String password = "123456";

public static void main(String[] args) throws IOException, InterruptedException {

ConnectionFactory connectionFactory = new ConnectionFactory();

connectionFactory.setPassword(password);

connectionFactory.setUsername(username);

connectionFactory.setPort(port);

connectionFactory.setHost(ip);

/*        Connection connection = connectionFactory.newConnection();

Channel channel = connection.createChannel();

QueueingConsumer consumer = new QueueingConsumer(channel);

channel.basicConsume("queue_priority",consumer);

while (true) {

QueueingConsumer.Delivery delivery = consumer.nextDelivery();

String msg = new String(delivery.getBody());

System.out.println(msg);

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

}*/

}

}

打印输出:先输出偶数,后输出奇数

如何限流?

1、为什么要对消费端限流?

如果Rabbitmq 服务器积压了有上万条未处理的消息,如果这时候连上了一个消费端,那么巨量的消息瞬间全部推送过来,但是单个客户端无法同时处理这么多。当数据量特别大的时候对消费端限流,用于保持消费端的稳定,当消息数量激增的时候很有可能造成资源耗尽,以及影响服务的性能,导致系统的卡顿甚至直接崩溃。

2、限流的实现方式—限流api

RabbitMQ 提供了一种 qos (服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于 consume 或者 channel 设置 Qos 的值)未被确认前,不进行消费新的消息。

void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

  • prefetchSize:0,单条消息大小限制,0代表不限制
  • prefetchCount:一次性消费的消息数量。告诉 RabbitMQ 不要同时给一个消费者推送多于 N 个消息,即一旦有 N 个消息还没有 ack,则该 consumer 将 block 掉,直到有消息 ack。
  • global:true、false 是否将上面设置应用于 channel,就是上面限制 channel 级别还是 consumer 级别。当我们设置为 false 的时候生效
  • prefetchCount 在 no_ask=false 的情况下才生效,即在自动应答的情况下这两个值是不生效的。

3、如何进行限流?

  • 首先第一步,使用消费端限流需要关闭自动 ack,将 autoAck 设置为 falsechannel.basicConsume(queueName, false, consumer);
  • 第二步设置具体的限流大小以及数量。channel.basicQos(0, 15, false);
  • 第三步在消费者的 handleDelivery 消费方法中手动 ack,并且设置批量处理 ack 回应为 truechannel.basicAck(envelope.getDeliveryTag(), true);

 消息确认机制

1、如果没有开启ack消息确认,rabbitmq会认为这条消息没有被消费,会将消息再次放入到队列中,再次让你消费,形成死循环;

2、消费端配置了手动ack,但是在异常捕获中设置了消息重新入队,那么还是会出现死循环

channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);

因为最后一个参数requeue一般都会为true,此次没调用到数据,把这个消息返回到队列中再消费,如果代码中出现了int a=1/0,那么还是会造成死循环。

 消息重试机制

当你开启了手动ack的时候再消费端如果在消费的时候出现异常也会导致循环消费,所以要启动消息重试机制,默认是3次重试去消费一条消息,如果没有消费完成,则丢弃(删除)该消息或者放入死信队列中或者进行人工补偿。

erver.port=8889

spring.rabbitmq.host=192.168.221.150

spring.rabbitmq.port=5672

spring.rabbitmq.username=zl

spring.rabbitmq.password=123

#开启消息确认机制

spring.rabbitmq.publisher-confirms=true

#支持消息发送失败返回队列

spring.rabbitmq.publisher-returns=true

#设置为 true 后 消费者在消息没有被路由到合适队列情况下会被return监听,而不会自动删除

spring.rabbitmq.template.mandatory=true

spring.rabbitmq.connection-timeout=15000

#用户虚拟机权限名称

spring.rabbitmq.virtual-host=/

#设置消费端手动 ack   none不确认  auto自动确认  manual手动确认

spring.rabbitmq.listener.simple.acknowledge-mode=manual

#消费者最小数量

spring.rabbitmq.listener.simple.concurrency=1

#消费之最大数量

spring.rabbitmq.listener.simple.max-concurrency=1

#开启消费者重试机制(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息)

spring.rabbitmq.listener.simple.retry.enabled=true

#重试次数5

spring.rabbitmq.listener.simple.retry.max-attempts=5

#重试时间间隔

spring.rabbitmq.listener.simple.retry.initial-interval=5000

#重试次数超过上面的设置之后是否丢弃(false不丢弃时需要写相应代码将该消息加入死信队列)

spring.rabbitmq.listener.simple.default-requeue-rejected=true

#在单个请求中处理的消息个数,他应该大于等于事务数量(unack的最大数量)

spring.rabbitmq.listener.simple.prefetch=2

1、触发重试机制需要消费者抛出异常,而不能try/catch捕捉异常,不然会死循环。

2、对于重试之后仍然异常的消息,mq默认的处理类是RejectAndDontRequeueRecoverer

见名知意。

SimpleRabbitListenerContainerFactoryConfigurer——>>RejectAndDontRequeueRecoverer(实现了MessageRecoverer接口)

MessageRecoverer接口实现类

RejectAndDontRequeueRecoverer

RepublishMessageRecoverer

ImmediateRequeueMessageRecoverer

优化处理一:对于重试之后仍然异常的消息,可以采用RepublishMessageRecoverer,将消息发送到其他的队列中,再专门针对新的队列进行处理

优化处理二:采用死信队列的方式处理重试失败的消息。

/**

* 死信交换机

* @return

*/

@Bean

public DirectExchange dlxExchange(){

return new DirectExchange(dlxExchangeName);

}

/**

* 死信队列

* @return

*/

@Bean

public Queue dlxQueue(){

return new Queue(dlxQueueName);

}

/**

* 死信队列绑定死信交换机

* @param dlxQueue

* @param dlxExchange

* @return

*/

@Bean

public Binding dlcBinding(Queue dlxQueue, DirectExchange dlxExchange){

return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(dlxRoutingKey);

}

业务代码添加死信交换机、死信路由配置

/**

* 业务队列

* @return

*/

@Bean

public Queue queue(){

Map<String,Object> params = new HashMap<>();

params.put("x-dead-letter-exchange",dlxExchangeName);//声明当前队列绑定的死信交换机

params.put("x-dead-letter-routing-key",dlxRoutingKey);//声明当前队列的死信路由键

return QueueBuilder.durable(queueName).withArguments(params).build();

//return new Queue(queueName,true);

}

注意点:

1、消费者在重试5次后,由于MessageCover默认的实现类是RejectAndDontRequeueRecoverer,也就是requeue=false,因为业务队列绑定了死信队列,消息会从业务队列中删除,同时发送到死信队列中。

2、如果ack模式是手动ack,那么需要调用channe.nack方法,同时设置requeue=false才会将异常消息发送到死信队列中

重试使用场景:

对于消费端异常的消息,如果在有限次重试过程中消费成功是最好,如果有限次重试之后仍然失败的消息,不管是采用RejectAndDontRequeueRecoverer还是使用死信队列都是可以的,同时也可以采用折中的方法,先将消息从业务队列中ack掉,再将消息发送到另外的一个队列中,后续再单独处理异常数据的队列

考虑下面两个场景:

1http下载视频或者图片或者调用第三方接口

2空指针异常或者类型转换异常(其他的受检查的运行时异常)

第一种重试有意义,第二种重试无意义,需要记录日志以及人工处理或者轮询任务方式处理。

重试的使用方式:

1、自动ack模式,不能catch异常

2、手动ack模式,不能try—catch异常

建议自动ack模式使用重试机制,如果一定要在手动ack模式下使用retry功能,最好还是确认在有限次重试过程中可以重试成功,否则超过重试次数,又没办法执行nack,会出现消息一直unack死循环

消息幂等性

问题

解决方案

消息重复消费问题:消费者消息处理了,没来的及提交offset,再重启可能导致重复消费

方式一:使用全局MessageID判断消费方使用同一个,解决幂等性。

方式二:用一个消息消费表来记录每一条消息,给每个一个消息设置一个id(uuid),消费了就保存到表中去。消息过来的时候先查询是否已经消费。

Rabbitmq如何设置优先级队列?如何限流?如何重试?如何处理幂等性?相关推荐

  1. Redis如何实现分布式锁延时队列以及限流应用丨Redis源码原理|跳表|B+树|分布式锁|中间件|主从同步|存储原理

    Redis如何实现分布式锁延时队列以及限流应用 视频讲解如下,点击观看: Redis如何实现分布式锁延时队列以及限流应用丨Redis源码原理|跳表|B+树|分布式锁|中间件|主从同步|存储原理|数据模 ...

  2. 【高并发秒杀系统】对分布式锁、缓存、消息队列、限流等的原理分析及代码实现

    前言:在一些商城项目中,秒杀是不可或缺的.但是,如果将普通的购买.消费等业务流程用于秒杀系统,不做任何的处理,会导致请求阻塞严重.超买超卖等严重后果,服务器.数据库也可能因为瞬时的流量而奔溃.所以,设 ...

  3. RabbitMQ认知篇 - 优先级队列

    优先级队列 RabbitMQ在3.5.0版本的时候提供了优先级队列的实现.客户端通过配置队列的x-max-priority参数的方式设置一个队列支持的最大优先级(但是不能使用策略的方式配置)以此来声明 ...

  4. java队列实现限流,java中应对高并发的两种策略

    目的:提高可用性 通过ExecutorService实现队列泄洪 //含有20个线程的线程池 private ExecutorService executorService = Executors.n ...

  5. Tomcat容器做到自我保护,设置最大连接数(服务限流:tomcat请求数限制)

    http://itindex.net/detail/58707-%E5%81%87%E6%AD%BB-tomcat-%E5%AE%B9%E5%99%A8 为了确保服务不会被过多的http长连接压垮,我 ...

  6. RabbitMQ的优先级队列

    优先级队列 队列需要设置优先级队列,消息需要设置消息的优先级.消费者需要等待消息已经发送到队列中,然后对队列中的消息进行排序,最后再去消费. Map<String, Object> arg ...

  7. Java限流解决方案

    前言 说到限流,想必大家都不陌生,一个很简单的例子就是,在12306上面买票的时候,遇到某时刻开始抢票的时候,经常页面会弹出一个类似请稍后重试的提示,从后端的技术层面来看,大概有2层解释,第一是服务器 ...

  8. Java限流策略与算法

    概要 在大数据量高并发访问时,经常会出现服务或接口面对暴涨的请求而不可用的情况,甚至引发连锁反映导致整个系统崩溃.此时你需要使用的技术手段之一就是限流,当请求达到一定的并发数或速率,就进行等待.排队. ...

  9. 十分钟搞懂Java限流及常见方案

    点击关注公众号:互联网架构师,后台回复 2T获取2TB学习资源! 上一篇:Alibaba开源内网高并发编程手册.pdf 文章目录 限流基本概念 QPS和连接数控制 传输速率 黑白名单 分布式环境 限流 ...

最新文章

  1. 【转】 Android Fragment 真正的完全解析(下)
  2. PHP的urlencode
  3. TODO:Go语言goroutine和channel使用
  4. java工程师去字节飞书可以,字节跳动飞书Java后端开发暑假实习一面(过了)
  5. C++ 用new 动态创建多维数组
  6. 局部变量 和 全局变量
  7. 1.5不同类型的循环神经网络
  8. 第五章、使用复合赋值和循环语句
  9. python行业数据分析_python在数据分析
  10. java找出最高工资和下标_Java 8 lambda用于为每个部门选择最高薪资员工
  11. P5057 [CQOI2006]简单题
  12. 【DSP】CCS 5.5的安装教程
  13. 第一章 路径规划算法概述
  14. 手机号正则(2020年4月15日)
  15. 2022.04.13【读书笔记】|10X单细胞转录组分析流程介绍
  16. 复杂边坡的ansys三维建模
  17. java 龟兔赛跑_Java实现多线程模拟龟兔赛跑
  18. 数字信号处理中各种频率关系
  19. NCCL源码解析②:Bootstrap网络连接的建立
  20. win7进不了系统怎么办

热门文章

  1. 前端学习(2182):keep-alive及其他问题
  2. 前端学习(2154):webpack横幅plugin的使用
  3. 前端学习(670):分支流程控制if
  4. spring学习(8):log4j.properties 详解与配置步骤
  5. 第四十期:九个对Web开发者最有用的Python包,掌握这些,工资至少能涨涨
  6. 第二十八期:Notepad++ 新 Logo 出炉,官网全新改版采用自适应设计
  7. python appium自动化测试平台开发,Python+Appium实现自动化测试
  8. 元组、字典、集合的常用方法
  9. 【练习】实现一个parse方法(需要实现的效果见内容),方法总结
  10. HTML第二课——css