RabbitMQ学习笔记(高级篇)

文章目录

  • RabbitMQ学习笔记(高级篇)
    • RabbitMQ的高级特性
      • 消息的可靠投递
        • 生产者确认 —— confirm确认模式
        • 生产者确认 —— return确认模式
      • 消费者确认 Consumer ACK
      • 消费端限流
      • TTL
      • 死信队列
      • 延迟队列
      • 日志与监控
      • 消息追踪
    • RabbitMQ的应用问题
      • 消息可靠性保障
      • 消息幂等性保障

RabbitMQ的高级特性

消息的可靠投递

  • 在使用RabbitMQ的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ为我们提供了两种方式用来控制消息的投递可靠性模式。

    • confirm确认模式
    • return退回模式
  • rabbitmg整个消息投递的路径为:
    producer—>rabbitmq broker—>exchange—>queue—>consumer

  • 我们将利用这两个callback控制消息的可靠性投递

生产者确认 —— confirm确认模式

  • 消息从producer到exchange则会返回一个confirmCallback。
  • 需要在配置文件中添加 publisher-confirm-type: correlated # 如果不加这个,confirm回调方法无法执行
spring:rabbitmq:port: 5672host: 192.168.253.129virtual-host: /domousername: lmxpassword: 123456publisher-confirm-type: correlated # 如果不加这个,confirm回调方法无法执行
@Testvoid contextLoads() {rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println("回调方法被执行======");if (b){System.out.println("接受信息成功"+s);}else {System.out.println("失败原因是"+s);}}});String s="waring级别的数据";rabbitTemplate.convertAndSend(RabbitMQConfigure.TocpicExchangeName,"serve.order.warning",s);
//        rabbitTemplate.convertAndSend("12","serve.order.warning",s);
  • 函数参数说明:confirm(CorrelationData correlationData, boolean b, String s)

    • b:消息是否发送成功
    • s:发送失败的原因,发送成功为null
    • correlationData:额外添加的参数

生产者确认 —— return确认模式

  • 消息从exchange->queue投递失败则会返回一个returnCallback
  • 需要配置publisher-returns: true属性
spring:rabbitmq:port: 5672host: 192.168.253.129virtual-host: /domousername: lmxpassword: 123456publisher-confirm-type: correlated # 如果不加这个,confirm回调方法无法执行publisher-returns: true
@Testvoid contextLoadsRunturnCallback() {rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.println("returncallback方法被执行");System.out.println("信息是"+returnedMessage.getMessage());
//               System.out.println(new String(returnedMessage.getMessage().getBody()));System.out.println("交换机名称:"+returnedMessage.getExchange());System.out.println("路由key民称:"+returnedMessage.getRoutingKey());System.out.println("编码:"+returnedMessage.getReplyCode());System.out.println("编码信息:"+returnedMessage.getReplyText());}});String s="error级别的数据";rabbitTemplate.convertAndSend(RabbitMQConfigure.TocpicExchangeName,"serve.order.error",s);
//        rabbitTemplate.convertAndSend("12","serve.order.warning",s);}

消费者确认 Consumer ACK

  • ack指Acknowledge,确认。表示消费端收到消息后的确认方式。
  • 有三种确认方式:
    • 自动确认:acknowledge="none
    • 手动确认:acknowledge=“manual”
    • 根据异常情况确认:acknowledge=“auto”,(这种方式使用麻烦,不作讲解)
  • 其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应message从RabbitMQ的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。
  • 如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用**channel…basicNack()**方法,让其自动重新发送消息。
spring:rabbitmq:port: 5672host: 192.168.253.129virtual-host: /domousername: lmxpassword: 123456listener:direct: # 就是pub、sub模式acknowledge-mode: manualprefetch: 1000simple: # 简单模式与workquene模式acknowledge-mode: manualprefetch: 1000
@RabbitListener(queues = "quneq1")public void Lister(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("error级别的消息是" + new String(message.getBody()));
//            System.out.println("交换机" + message.getMessageProperties().getReceivedExchange());
//            System.out.println("路由信息" + message.getMessageProperties().getReceivedRoutingKey());System.out.println("============================");System.out.println("处理业务逻辑");
//            int a=1/0;channel.basicAck(deliveryTag,true);}catch (Exception e){e.printStackTrace();channel.basicNack(deliveryTag,true,true);}
  • 在rabbit:listener-container标签中设置acknowledge属性,设置ack方式none:自动确认,manual:手动确认
  • 如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息
  • 如果出现异常,则在catch中调用basicNack或basicReject,拒绝消息,让MQ重新发送消息。
  • channel.basicAck(deliveryTag,false)参数:第一个就是message对象中的deliveryTag ,消息的标识符,第二个参数就是是否接收消息
  • basicNack(long deliveryTag, boolean multiple, boolean requeue)
    • 第一个参数message对象中的deliveryTag,第二个参数,true的时候不接收消息,第三个参数,是否将消息恢复到Rabbitmq的消息队列中,重复发送,直到异常处理结束

消费端限流

  • 配置prefetch属性设置消费端一次拉去多少消息
  • 注意:在限流的时候需要将消费端的消费确认模式改为manel(手动确认),否则,限流不生效
spring:rabbitmq:port: 5672host: 192.168.253.129virtual-host: /domousername: lmxpassword: 123456listener:direct: # 就是pub、sub模式acknowledge-mode: manualprefetch: 1 #  每次拉去一条消息simple: # 简单模式与workquene模式acknowledge-mode: manualprefetch: 1  #  每次拉去一条消息
@RabbitListener(queues = "quneq2")public void Lister2(Message message, Channel channel) throws IOException, InterruptedException {System.out.println("info或waring级别的消息是"+new String(message.getBody()));
//        System.out.println("交换机"+message.getMessageProperties().getReceivedExchange());
//        System.out.println("路由信息"+message.getMessageProperties().getReceivedRoutingKey());
//        System.out.println("============================");Thread.sleep(1000);System.out.println("处理业务逻辑");//        签收消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);}

TTL

  • TTL全称Time To Live(存活时间/过期时间)。

  • 当消息到达存活时间后,还没有被消费,会被自动清除。

  • RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

  • 向队列添加过期时间

    @Beanpublic Queue ttlqueue(){//        设置队列过期时间return QueueBuilder.durable(ttlqueue).ttl(5000).build();}
  • 向单独的消息添加过期时间
    @Testpublic void TestTTl() {MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//                设置Expiration属性message.getMessageProperties().setExpiration("5000");return message;}};rabbitTemplate.convertAndSend(RabbitMQConfigure.TocpicExchangeName, "123.ttl", "单独过期消息发送".getBytes(StandardCharsets.UTF_8),messagePostProcessor);}

死信队列

  • 死信队列,英文缩写:DLX。Dead Letter Exchange(死信交换机),当消息成为Dead message)后,可以被重新发送到另一个交换机,这个交换机就是DLX。

  • 消息成为死信的三种情况:

    • 1.队列消息长度到达限制
    • 2.消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false
    • 3.原队列存在消息过期设置,消息到达超时时间未被消费:
  • 队列绑定死信交换机
    • 给队列设置参数:x-dead-letter-exchange和X-dead-letter-routing-key
  • 定义死信交换机,死信队列(同正常交换机,队列相同)
    //    设置死信交换机@Beanpublic Exchange dlxexchange() {return ExchangeBuilder.topicExchange(dlxexchange).durable(true).build();}@Beanpublic Queue dlxqueue() {return QueueBuilder.durable(dlxqueue).build();}@Beanpublic Binding dlxbinding() {return BindingBuilder.bind(dlxqueue()).to(dlxexchange()).with("dlx.#").and(null);}
  • 将正常队列与死信队列绑定
@Beanpublic Queue dlxtestqueue() {return QueueBuilder.durable(dlxtestqueue).deadLetterExchange(dlxexchange).deadLetterRoutingKey("dlx.123").maxLength(10).build();}@Beanpublic Binding Exchangedlxtest(){return  BindingBuilder.bind(dlxtestqueue()).to(TocpicExchange()).with("testdlx.#").and(null);}

延迟队列


  • 配置deadLetterExchange(dlxexchange).ttl(10000).deadLetterRoutingKey(“dlx.yanchi”),属性
//    延迟队列,设置队列的过期时间是10秒@Beanpublic Queue yanchiqueue(){return QueueBuilder.durable("yanchiqueue").deadLetterExchange(dlxexchange).ttl(10000).deadLetterRoutingKey("dlx.yanchi").build();}@Beanpublic Binding yanchibing(){return BindingBuilder.bind(yanchiqueue()).to(TocpicExchange()).with("yanchi.#").and(null);}
@Testpublic void errorTestYanchi2() throws InterruptedException {LocalDateTime now = LocalDateTime.now();rabbitTemplate.convertAndSend(RabbitMQConfigure.TocpicExchangeName, "yanchi.123", "商品订单2" + now);for (int i=0;i<10;i++){Thread.sleep(1000);System.out.println(i+"秒----");}}

日志与监控

  • 日志文件存放在 /var/log/rabbitmq下面,可以查看
  • 通过web管控台进行监控

消息追踪

  • 在使用任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。对于RabbitMQ而言,可能是因为生产者或消费者与RabbitMQ断开了连接,而它们与RabbitMQ采用了不同的确认机制;也有可能是因为交换器与队列之间不同的转发策略;甚至是交换器并没有与任何队列进行绑定,生产者又不感知或者没有采取相应的措施;另外RabbitMQ本身的集群策略也可能导致消息的丢失。这个时候就需要有一个较好的机制跟踪记录消息的投递过程,以此协助开发和运维人员进行问题的定位。
  • 在RabbitMQ中可以使用Firehose和rabbitmg tracing插件功能来实现消息追踪。
  • Firehose
  • tracing方式

RabbitMQ的应用问题

消息可靠性保障

消息幂等性保障

  • 幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。
  • 在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。

RabbitMQ学习笔记(高级篇)相关推荐

  1. R语言学习笔记——高级篇:第十四章-主成分分析和因子分析

    R语言 R语言学习笔记--高级篇:第十四章-主成分分析和因子分析 文章目录 R语言 前言 一.R中的主成分和因子分析 二.主成分分析 2.1.判断主成分的个数 2.2.提取主成分 2.3.主成分旋转 ...

  2. 数据库MySQL学习笔记高级篇(周阳)

    数据库MySQL学习笔记高级篇 1. mysql的架构介绍 mysql简介 高级Mysql mysqlLinux版的安装 mysql配置文件 mysql逻辑架构介绍 mysql存储引擎 2. 索引优化 ...

  3. OpenLayers学习笔记高级篇(一、openlayers画点线面)

    话不多说直接上代码: <!doctype html> <html xmlns=http://www.w3.org/1999/xhtml> <head> <me ...

  4. OpenLayers学习笔记高级篇(二、地图控件)

    在OpenLayers 3中,地图控件指的是下图标注的这些,包括缩放按钮,标尺,版权说明,指北针等等. 他们不会随着地图的移动而移动,一直处于一个固定的位置. 在实现上,并不是在画布上绘制的,而是使用 ...

  5. OpenLayers学习笔记高级篇(四、地图开发实战之地图要素的增删改查)

    一切都准备好了,现在终于可以通过ol3加载配置好的数据了.上一节中最后的预览结果,大家已经看到了,此处我们自己通过ol来实现这个预览页面,直接上代码如下: 1.加载Geoserver发布的wfs地图服 ...

  6. Redis学习笔记①基础篇_Redis快速入门

    若文章内容或图片失效,请留言反馈.部分素材来自网络,若不小心影响到您的利益,请联系博主删除. 资料链接:https://pan.baidu.com/s/1189u6u4icQYHg_9_7ovWmA( ...

  7. Postgresql学习笔记-高级语法篇

    Postgresql学习笔记-高级语法篇 Postgresql 约束 Postgresql约束用于规定表中的数据规则. 如果存在违反约束的数据行为,行为会被约束终止. 约束可以在创建表的时候就规定(通 ...

  8. RabbitMQ学习总结 第一篇:理论篇

    目录 RabbitMQ学习总结 第一篇:理论篇 RabbitMQ学习总结 第二篇:快速入门HelloWorld RabbitMQ学习总结 第三篇:工作队列Work Queue RabbitMQ学习总结 ...

  9. RabbitMQ 学习笔记

    RabbitMQ 学习笔记 RabbitMQ 学习笔记 1. 中间件 1.1 什么是中间件 1.2 为什么要使用消息中间件 1.3 中间件特点 1.4 在项目中什么时候使用中间件技术 2. 中间件技术 ...

最新文章

  1. python分类算法_用Python实现KNN分类算法
  2. Tomcat6 内存和线程配置
  3. 安装SQL2000,挂起无法安装问题
  4. Apache——启动错误:[Cannot load modules/mod_actions.so into server]解决方案
  5. 实战绕过宝塔PHP disable_function 限制getshell
  6. SAP Cloud for Customer的CTI呼叫中心解决方案
  7. Dapr牵手.NET学习笔记:发布-订阅
  8. UBIFS - UBI File-System
  9. C++基础与深度解析第零章:C++基础笔记
  10. 科学计算与MATLAB语言之基础知识
  11. STM32接收红外遥控数据
  12. 计算机网络第四章课后答案(第七版谢希仁著)
  13. 1079:计算分数加减表达式的值
  14. 知识图谱从0到-1的笔记——6.知识推理
  15. 小米用户画像_小米10/10pro的目标用户画像是怎样的呢??
  16. 计算机组装如何配置更好更便宜,电脑组装越贵越好?小白DIY组装电脑的几个误区...
  17. excel 查询 表关联_在Excel中计算查询表
  18. 11年北漂老码农转行!黯然离场...
  19. C语言入门(初识C语言)
  20. Visual Studio 2019 及更低版本中 Microsoft C++ 编译器对 C++ ISO 标准支持情况

热门文章

  1. 多模态视频商品检索记录再刷新!第二届淘宝直播算法大赛完美落幕
  2. 【直播预告】SDWAN+安全,如何帮助企业组建安全智能的自有网络
  3. Python基础学习第三天——条件控制与while循环语句
  4. 学计算机的心理300字,【必备】心理作文300字10篇
  5. 拿什么拯救你 我的“游戏寡妇”、“游戏孤儿”们
  6. 泰拉瑞亚mod鸿蒙方舟,三款高评分的沙盒生存类手游,经典之作泰拉瑞亚你有玩过吗?...
  7. 生存类html5小游戏,紧张绝望!刺激爽爆!盘点最好玩的PC生存类游戏(中)
  8. C#微信登录-手机网站APP应用
  9. 【Linux】网络管理与相关应用
  10. GEE学习笔记:在Google Earth Engine(GEE)计算两个时间的时间间隔