文章目录

  • 1. 简述
  • 2. 特性示例:
    • 2.1 消息可靠性投递
    • 2.2 Consumer Ack
    • 2.3 消费端限流
    • 2.4 TTL
      • 2.5 死信队列
    • 2.6 延迟队列

1. 简述

在rabbitMQ都官方文档中,高级特性又叫做Our Extensions

常用的有:

  • 消息可靠性投递
  • Consumer ACK
  • 消费端限流
  • TTL
  • 死信队列
  • 延迟队列

2. 特性示例:

2.1 消息可靠性投递

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。

  • confirm 确认模式
  • return 退回模式

其中rabbitmq 整个消息投递的路径为:
producer ---> rabbitmq broker ---> exchange ---> queue ---> consumer

  • 消息从 producer 到 exchange 则会返回一个 confirmCallback 。
  • 消息从 exchange–>queue 投递失败则会返回一个 returnCallback 。

我们使用这两个callBack控制消息的可靠性投递

// 生产者/*** 确认模式:*      1. 开启确认模式,在yml中将 publisher-confirms 设置为true*      2. 在rabbitTemplate中定义回调函数 setConfirmCallback*/@Testpublic void testConfirm() throws InterruptedException {rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/**** @param correlationData 交换机相关配置* @param b 是否发送成功, true 为成功* @param s 发送失败的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println("发送消息....");if (b) {System.out.println("消息发送成功 " + s);} else {System.out.println("消息发送失败 " + s);}}});rabbitTemplate.convertAndSend("test_Exchange_confirm","confirm","mq confirm ~~~");Thread.sleep(2000);}/*** 回退模式: 当exchange将消息发送给 queue后,发送失败,才会执行returnCallBack*  步骤:*      1. 开启回退模式: publisher-returns: true*      2. 设置 returnCallBack*      3. 设置Exchange的处理模式*          1. 如果消息没有路由到queue,丢弃消息 (默认)*          2. 如果消息没有路由到queue, 返回给消息发送方 returnCallBack*/@Testpublic void testReturn() throws InterruptedException {// 设置交换机处理消息的模式rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {/*** @param message 消息对象* @param replyCode 错误码* @param replyText 错误信息* @param exchange 交换机信息* @param routingKey 路由键*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("执行returnCallBack");System.out.println(message);System.out.println(replyCode);System.out.println(replyText);System.out.println(exchange);System.out.println(routingKey);}});rabbitTemplate.convertAndSend("test_Exchange_confirm","confirm","mq confirm ~~~");Thread.sleep(3000);}

步骤:

  1. 开启消息发送的回调模式
  2. 重写ConfirmCallbackReturnCallBack 两个方法,用来接收消息是否完成发送的信息

小结

  • 设置ConnectionFactory的publisher-confirms=“true” 开启 确认模式。
  • 使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回
    调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发
    送失败,需要处理。
  • 设置ConnectionFactory的publisher-returns=“true” 开启 退回模式
  • 使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到
    queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退
    回给producer。并执行回调函数returnedMessage。

2.2 Consumer Ack

当 RabbitMQ 将消息传递给消费者时,RabbbitMQ需要知道这个消息是否到达,那么消费者共有三种模式:

  • 自动确认:acknowledge=“none”
  • 手动确认:acknowledge=“manual”
  • 根据异常情况确认:acknowledge=“auto”

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。
但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。


开启手动确认模式

acknowledge-mode: manual

注意
当节点将消息传递给使用者时,它必须决定是否应将该消息视为由使用者处理(或至少接收)。由于多个事物(客户端连接、使用者应用等)可能会失败,因此此决策是一个数据安全问题。消息传递协议通常提供一种确认机制,允许使用者确认对其所连接到的节点的传递。是否使用该机制是在消费者订阅时决定的。

根据所使用的确认模式,RabbitMQ 可以认为消息在发送出去(写入 TCP 套接字)或收到显式(“手动”)客户端确认后立即成功传递。手动发送的确认可以是正数或负数,并使用以下协议方法之一:

  • basic.ack : 用于正面确认
  • basic.nack : 用于否定确认,可以异常拒绝多个失败请求
  • basic.reject: 用于否定确认,只能拒绝一个
 /***  consumer Ack步骤:*      1. 在yml中开启 acknowledge-mode: manual*      2. 监听器加上RabbitHandler*      3. 定义方法参数为 (String msg, Channel channel, Message message)*      4. 如果方法执行成功,调用 channel.basicAck*      5. 如果方法执行失败,调用 channel.basicNAck* @param message*/@RabbitHandlerpublic void ListenerMessage(String msg, Channel channel, Message message) throws InterruptedException {Thread.sleep(1000);long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("接收到消息 :" + msg);System.out.println("开始处理消息");int a = 1 / 0;/*** basicAck(long deliveryTag, boolean multiple)*      deliveryTag 类似于TCP中的ACK号*      multiple 确认所有消息到达*/channel.basicAck(deliveryTag, true);System.out.println("消息处理成功");} catch (Exception e) {try {/*** basicNack(long deliveryTag, boolean multiple, boolean requeue)*      requeue : true 表示重新丢回队列*                  false 表示直接丢弃*/channel.basicNack(deliveryTag, true, true);System.out.println("消息发送失败。。。");} catch (IOException ioException) {ioException.printStackTrace();}}}

在自动确认模式下,消息在发送后立即被视为已成功传递。这种模式以更高的吞吐量(只要消费者能够跟上)换取降低交付和消费者处理的安全性。这种模式通常被称为“即发即弃”。与手动确认模型不同,如果消费者的TCP连接或通道在成功传递之前关闭,则服务器发送的消息将丢失。因此,应将自动消息确认视为不安全,并且不适合所有工作负荷。
使用自动确认模式时要考虑的另一件重要事情是使用者重载。手动确认模式通常与有界通道预取一起使用,该预取限制通道上未完成(“进行中”)传递的数量。但是,使用自动确认时,根据定义没有这样的限制。因此,消费者可能会被交付速度所淹没,这可能会在内存中积累积压并耗尽堆或使其进程作系统终止。某些客户端库将应用 TCP 背压(停止从套接字读取,直到未处理的交付的积压超过一定限制)。因此,自动确认模式仅推荐给能够高效且稳定地处理交付的消费者。

2.3 消费端限流

根据消费端处理能力来决定消费端一次最多可以拉取的消息
属性为: prefetch
例如: prefetch = 1 表示消费端一秒只接受一条消息,直到手动确认接收完毕,才接收下一条消息

值得注意的是,如果prefetch=0表示可以接收任意数量的未确认消息

2.4 TTL

TTL 全称 Time To Live(存活时间/过期时间)。当消息到达存活时间后,还没有被消费,会被自动清除。

RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

 @Testpublic void testSent() throws InterruptedException {rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println("发送消息....");if (b) {System.out.println("消息发送成功 " + s);} else {System.out.println("消息发送失败 " + s);}}});/*** 消息后处理对象,设置消息的一些参数*/MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 1. 设置过期时间message.getMessageProperties().setExpiration("10000");// 2. 返回消息return message;}};/*** TTL 过期有两种设置*      1. 队列整个消息过期 : 需要在队列参数中设置 "x-message-ttl", 10000*      2. 单个消息设置过期:*          如果设置了队列的过期时间和单个消息的过期时间,谁时间段谁就生效*          队列过期后,会将所有消息都清除*          RabbitMQ并不会轮询消息,而是消息到了队列顶端,才会判断消息是否过期*/for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend("test_Exchange_TTL","ttl.hhh","mq ttl ~~~" + i, messagePostProcessor);}Thread.sleep(100000);}

2.5 死信队列

消息成为死信的三种方式:

  1. 队列消息长度到达限制;
  2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
  3. 原队列存在消息过期设置,消息到达超时时间未被消费;

配置死信队列

     /** 测试死信队列* 分别创建正常的队列(test_queue_dlx)和正常的交换机(test-exchange_dlx)* 创建死信队列(queue_dlx)和死信交换机(exchange_dlx)* 正常队列绑定死信交换机,并且指定*      x-dead-letter-exchange*      x-dead-letter-routing-key*//*** 创建正常队列和正常交换机*/@Bean("test_queue_dlx")public Queue testQueueDlx() {return QueueBuilder.durable("test_queue_dlx").withArgument("x-dead-letter-exchange", "exchange_dlx") //指定死信交换机.withArgument("x-dead-letter-routing-key", "dlx.hhh")  // 指定路由key.withArgument("x-message-ttl", 10000) //设置队列过期时间.withArgument("x-max-length", 10) //设置最大长度.build();}@Bean("test_exchange_dlx")public Exchange testExchangeDlx() {return ExchangeBuilder.topicExchange("test_exchange_dlx").build();}@Beanpublic Binding DlxBinding(@Qualifier("test_queue_dlx") Queue queue, @Qualifier("test_exchange_dlx") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("test.dlx.#").noargs();}/*** 创建死信队列和死信交换机*/@Bean("queue_dlx")public Queue QueueDlx() {return QueueBuilder.durable("queue_dlx").build();}@Bean("exchange_dlx")public Exchange ExchangeDlx() {return ExchangeBuilder.topicExchange("exchange_dlx").build();}@Beanpublic Binding DlxBindingDead(@Qualifier("queue_dlx") Queue queue, @Qualifier("exchange_dlx") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("dlx.#").noargs();}

测试死信队列

 // 1. 队列消息过期rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.hhh", "我那些残梦灵翼九霄");// 2. 队列长度超出,设置队列长度为10for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.hhh", "我那些残梦灵翼九霄");}//3. 测试消息拒收rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.hhh", "我那些残梦灵翼九霄");

2.6 延迟队列

使用TTL + 死信队列,就是延迟队列
TTL用于定时,消费者从死信队列中取值

RabbitMQ高级特性相关推荐

  1. RabbitMQ(二):RabbitMQ高级特性

    RabbitMQ(二):RabbitMQ高级特性 RabbitMQ是目前非常热门的一款消息中间件,不管是互联网大厂还是中小企业都在大量使用.作为一名合格的开发者,有必要了解一下相关知识,RabbitM ...

  2. RabbitMQ 高级特性(吐血猝死整理篇)

    文章目录 RabbitMQ 高级特性 消息可靠性投递(可靠性发送) 事务机制 代码实现 发送方确认机制 为什么比事务性能好 示例代码 测试一下QPS 持久化存储 TTL 队列 死信队列(DLX) 延迟 ...

  3. 3 RabbitMQ高级特性 3

    主要为大家讲解RabbitMQ的高级特性和实际场景应用, 包括消息如何保障 100% 的投递成功 ? 幂等性概念详解,在海量订单产生的业务高峰期,如何避免消息的重复消费问题? Confirm确认消息. ...

  4. 【消息中间件】RabbitMQ 高级特性与应用问题

    消息的可靠投递 在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景.RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式. confirm 确认模式 r ...

  5. 消息中间件--RabbitMQ ---高级特性之消费端ACK与重回队列

    什么是消费端的ACK和重回队列? 消费端的手工ACK和NACK 消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿 如果由于服务器宕机等严重问题,那我们就需要手工进行ACK保障 ...

  6. RabbitMQ高级特性-惰性队列

    目录 一.消息堆积问题 二.解决消息堆积的三种思路 三.惰性队列 1.命令行修改惰性队列 2.用SpringAMQP声明惰性队列 @Bean的方式 注解方式 测试发送消息 3.惰性队列的优点 4.惰性 ...

  7. 1-8 (4). RabbitMQ高级特性-消费端ACK

    Consumer ACK 指Acknowledge,确认 有三种方式: (1)自动确认:acknowledge="none"(默认) (2)手动确认:acknowledge=&qu ...

  8. rabbitmq高级特性(消息手动确认)

    为了保证消息不丢失,从生产者两种模式(确认模式和返回模式)到消费者手动确认的一个大整合 生产者 1.maven依赖 <dependency><groupId>org.sprin ...

  9. RabbitMQ(13)RabbitMQ高级特性:TTL

    TTL全称Time To Live (存活时间/过期时间). 当消息到达存活时间后,还没有被消费,会被自动清除.. RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间. ...

最新文章

  1. 俄罗斯将用机器人当探月先锋
  2. android 命令行创建模拟器,在命令行创建、删除和浏览AVD、使用android模拟器
  3. 使用Swagger创建Api
  4. 简单干净的C#方法设计案例:SFCUI.AjaxLoadPage()之二
  5. C语言单片机等待询问_单片机很好玩8,温度太高就报警,制作一个智能室内温度湿度计...
  6. 【PL/SQL】学习笔记 (1)一个简单的PL/SQL程序
  7. java clob存储_java oracle clob string 大字符串存储
  8. efcore mysql坑,.net core 2.1 使用ef DBfirst 生成 实体映射遇到的坑 (Pomelo.EntityFrameworkCore.MySql 篇)...
  9. win11菜单栏的推荐项目怎么取消 windows11取消推荐项目的设置方法
  10. python统计word页码_使用Python(win32com)在MS Word表中插入带页码的字段
  11. day07 深浅拷贝
  12. 【目标跟踪】基于matlab红外图像弱小目标检测与跟踪【含Matlab源码 374期】
  13. c语言二级java难吗_计算机二级考JAVA还是C?
  14. Centos7防火墙iptables安装及设置图文并茂【实现防火墙管理功能】
  15. Android 隐藏状态栏
  16. 有关java的演讲稿_有关超级演说家励志的演讲稿
  17. 杰理之获取恒流充电的挡位值【篇】
  18. kubernetes 【组件】ingress 如何通过域名访问您的应用
  19. yolox的正负样本分配策略mmdet代码详解
  20. 华为路由器 静态路由

热门文章

  1. adlds文件服务器,WinServer2008之ADLDS轻型目录服务解析
  2. 新闻资讯博客小程序源码/支持微信、QQ、百度小程序/支持流量主
  3. android7.1root工具,Android模拟器Root,Android7.1.1
  4. 要想富,先有LU,超能鹿战队LU开盘涨幅240%
  5. 愿我们在路上聪明绝顶但不绝顶
  6. Android AccountManager帐号管理(一)
  7. International English Language Testing System - IELTS - 雅思
  8. 阿拉伯数字转换为中文大写数字
  9. 职场防背锅的有效方式
  10. 【文献阅读】A2-Nets: Double Attention Networks