1 RabbitMQ自带的重试机制

1 示例代码

rabbitMQ为自带了消息重试机制:当消费者消费消息失败时,可以选择将消息重新“推送”给消费者,直至消息消费成功为止

开启自带的重试机制,需要如下几个配置:

1 开启消费者手动应答机制,对应的springboot配置项:

spring.rabbitmq.listener.simple.acknowledge-mode=manual

2 消费异常时,设置消息重新入列

 boolean multiple = false; // 单条确认boolean requeue  = true; // 重新进入队列,谨慎设置!!!很容易导致死循环,cpu 100%channel.basicNack(tag, multiple, requeue);

以下是运行例子:

消费者代码如下:

package com.fmi110.rabbitmq;
​
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
​
import java.util.concurrent.atomic.AtomicInteger;
​
/*** @author fmi110* @description 消息消费者* @date 2021/7/1 16:08*/
@Component
@Slf4j
public class RabbitConsumer {
​AtomicInteger count = new AtomicInteger();
​@RabbitListener(queues="my-queue")public void consumer1(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{
​log.info(">>>> consumer1 消费 tag = {},次数count={},消息内容 : {}",tag, count.incrementAndGet(),data);try {Thread.currentThread().sleep(1000);int i = 1/0;channel.basicAck(tag,true); // 确认消息消费成功} catch (Exception e) {log.error(">>>>消费异常,消息重新进入队列并消费");boolean multiple = false; // 单条确认boolean requeue  = true; // 重新进入队列,谨慎设置!!!channel.basicNack(tag, multiple, requeue);}}
​@RabbitListener(queues="my-queue")public void consumer2(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{log.info(">>>> consumer2 消费 tag = {},次数count={},消息内容 : {}",tag, count.incrementAndGet(),data);
​try {Thread.currentThread().sleep(1000);int i = 1/0;channel.basicAck(tag,true); // 确认消息消费成功} catch (Exception e) {log.error(">>>>消费异常,消息重新进入队列并消费");boolean multiple = false; // 单条确认boolean requeue  = true; channel.basicNack(tag, multiple, requeue);}}
}
​

这里模拟了两个消费者 consumer1consumer2 ,并在逻辑中人为设置异常 int 1/0 , 在异常捕获中通过

 channel.basicNack(tag, false, true);

设置消息重新进入队列,最终推给消费者再次消费。运行结果如下:

日志里包含了几个信息点:

  1. 消费者每次只消费一条消息,因为我设置了 spring.rabbitmq.listener.simple.prefetch=1

  2. 消息推送使用的 round-robin 算法

  3. rabbitMQ的消费方式有推和拉两种方式,springboot创建的消费者模式使用的推的方式消费 this.channel.basicConsume()

2 潜在问题

如运行日志所示,重进进入队列的消息,会在队列头部,直接再次推送给消费者消费,如果是因为代码逻辑问题,将会导致消息一直消费失败,导致死循环!!!

比较合理的做法是,重试一定次数消费后,如果仍然失败,则终止重试,将消费异常的消息保存,并上报异常,由人工介入处理。

2 结合spring-retry和死信队列实现消息重试

一个比较合理的重试机制如下:

  1. 消息消费出现异常时,借助springboot提供的重试机制进行重试

    因为使用的spring-retry,所以方法中必须抛出异常,否则spring-retry不会被触发!!!

  2. 重试仍然失败时,消息转发到死信队列,死信队列的消费者记录并上报异常信息

    要实现消息消费失败自动转发到死信队列,则rabbitmq在创建消息队列时,需要指定与之绑定的死信队列

完整的实例代码如下:

1 配置文件 application.properties:

这里注释掉了 spring.rabbitmq.listener.simple.acknowledge-mode=manual ,这样在消息消费失败时,会自动转到死信队列,如果开启手动确认机制,必须调用 chanel.basicNack(tag,false,false) 消息才会进入死信队列!!!

# 应用名称
spring.application.name=rabbitmq
server.port=8080
server.servlet.context-path=/
​
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
# 指定连接的虚拟主机,可以在rabbitMQ控制台查看对应的虚拟主机的名字
spring.rabbitmq.virtual-host=my_vhost
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
​
spring.rabbitmq.listener.simple.prefetch=1
​
# 开启 publish-comfirm 机制和消息路由匹配失败退回机制
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.publisher-confirm-type=correlated
# 开启消费者应答 ack 机制
# spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 开启spring提供的retry
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.max-attempts=3
spring.rabbitmq.listener.simple.retry.initial-interval=3000

2 RabbitConfig

主要在程序启动时,做如下设置:

  1. 创建死信队列和死信交换器,并将死信队列绑定到死信交换器。

  2. 创建普通队列和普通交换器,并将普通队列绑定到普通交换器,同时将死信队列与普通队列关联,这样当消息消费失败时,消息会进入死信队列(使用了自动 ack模式)。

package com.fmi110.rabbitmq.config;
​
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
​
import java.util.HashMap;
​
​
/*** @author fmi110* @description rabbitMQ 配置类* @date 2021/7/1 15:08*/
@Configuration
@Slf4j
public class RabbitConfig {
​String dlQueueName  = "my-queue-dl"; // 普通队列名称String dlExchangeName = "my-exchange-dl"; // 死信交换器名称String dlRoutingKey   = "rabbit.test";
​String queueName = "retry-queue";String exchangeName = "my-exchange"; // 普通交换器名称
​/*** 创建死信队列** @return*/@Beanpublic Queue queueDL() {
​return QueueBuilder.durable(dlQueueName) // 持久化队列.build();}
​/*** 创建死信交换机** @return*/@Beanpublic TopicExchange exchangeDL() {return new TopicExchange(dlExchangeName, true, false);}
​/*** 绑定操作*/@Beanpublic Binding bindQueueDL2ExchangeDL(Queue queueDL, TopicExchange exchangeDL) {log.info(">>>> 队列与交换器绑定");return BindingBuilder.bind(queueDL).to(exchangeDL).with(dlRoutingKey);}
​/*** 创建持久化队列,同时绑定死信交换器** @return*/@Beanpublic Queue queue() {log.info(">>>> 创建队列 retry-queue");HashMap<String, Object> params = new HashMap<>();params.put("x-dead-letter-exchange", dlExchangeName);params.put("x-dead-letter-routing-key", dlRoutingKey);
​return QueueBuilder.durable(queueName) // 持久化队列.withArguments(params) // 关联死信交换器.build();}
​
​/*** 创建交换机** @return*/@Beanpublic TopicExchange exchange() {log.info(">>>> 创建交换器 my-exchange");boolean durable    = true; // 持久化boolean autoDelete = false; // 消费者全部解绑时不自动删除return new TopicExchange(exchangeName, durable, autoDelete);}
​/*** 绑定队列到交换机** @param queue* @param exchange* @return*/@Beanpublic Binding bindQueue2Exchange(Queue queue, TopicExchange exchange) {log.info(">>>> 队列与交换器绑定");return BindingBuilder.bind(queue).to(exchange).with("rabbit.test");}
​
//    /**
//     * spring-retry重试机制:当重试次数达到最大,消息仍然消费失败时回调。
//     * 如果开启这个类,则死信队列失效,消息消费失败,即使配置了死信队列,消息也不会进入死信队列。
//     * 重试失败回调和死信队列只能二选一!!!spring 提供回调实现类有如下几个:
//     * RejectAndDontRequeueRecoverer :消费失败,并且消息不再入列,spring默认使用。
//     * ImmediateRequeueMessageRecoverer :将消息重新入列
//     * RepublishMessageRecoverer:转发消息到指定的队列,
//     * @return
//     */
//    @Bean
//    public MessageRecoverer messageRecoverer(){
//        return new MessageRecoverer() {
//            @Override
//            public void recover(Message message, Throwable cause) {
//                log.info(message.toString());
//                log.info("spring-retry重试次数达到最大,消息仍然失败的回调");
//                // TODO: 记录错误信息并上报
//            }
//        };
//    }
}

3 消息生产者 RabbitProducer

这里为了保证消息能确保消息发送,配置了 confirm 确认机制

package com.fmi110.rabbitmq;
​
import com.rabbitmq.client.AMQP;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
​
import javax.annotation.PostConstruct;
​
/*** @author fmi110* @description 消息生产者* @date 2021/7/1 15:08*/
@Component
@Slf4j
public class RabbitProducer {@AutowiredRabbitTemplate rabbitTemplate;
​/*** 1 设置 confirm 回调,消息发送到 exchange 时回调* 2 设置 return callback ,当路由规则无法匹配到消息队列时,回调** correlationData:消息发送时,传递的参数,里边只有一个id属性,标识消息用*/@PostConstructpublic void enableConfirmCallback(){// #1/*** 连接不上 exchange或exchange不存在时回调*/rabbitTemplate.setConfirmCallback((correlationData,ack,cause)->{if (!ack) {log.error("消息发送失败");// TODO 记录日志,发送通知等逻辑}});
​// #2/*** 消息投递到队列失败时,才会回调该方法* message:发送的消息* exchange:消息发往的交换器的名称* routingKey:消息携带的路由关键字信息*/rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) ->{log.error("消息路由失败");// TODO 路由失败后续处理逻辑});}
​public void send(String msg){String exchangeName = "my-exchange";// String routingKey   = "aaa.xxx";String routingKey   = "rabbit.test";rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);}
}

4 消息消费者 RabbitConsumer

package com.fmi110.rabbitmq;
​
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
​
import java.util.concurrent.atomic.AtomicInteger;
​
​
/*** @author fmi110* @description 消息消费者* @date 2021/7/1 16:08*/
@Component
@Slf4j
public class RabbitConsumer {
​AtomicInteger count = new AtomicInteger();
​/*** 普通队列消费者* @param data* @param channel* @param tag* @throws Exception*/@RabbitListener(queues="retry-queue")public void consumer(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{
​log.info(">>>> consumer 消费 tag = {},次数count={},消息内容 : {}",tag, count.incrementAndGet(),data);// TODO 消息处理逻辑throw new RuntimeException("抛出异常,模拟消费失败,触发spring-retry");}
​/*** 死信队列消费者* @param data* @param channel* @param tag* @throws Exception*/@RabbitListener(queues="my-queue-dl")public void consumeDL(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{log.info(">>>> 死信队列消费 tag = {},消息内容 : {}",tag,data);
//        channel.basicNack(tag, false, false);}
}

5 Controller

用于触发发送消息

package com.fmi110.rabbitmq.controller;
​
​
import com.fmi110.rabbitmq.RabbitProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
​
import java.util.HashMap;
​
@RestController
public class TestController {@AutowiredRabbitProducer rabbitProducer;
​@GetMapping("/test")public Object test() {
​rabbitProducer.send("this is a message");
​HashMap<String, Object> result = new HashMap<>();result.put("code", 0);result.put("msg", "success");return result;}
}

6 运行结果

运行日志如下:

: >>>> consumer 消费 tag = 1,次数count=1,消息内容 : this is a message
: >>>> consumer 消费 tag = 1,次数count=2,消息内容 : this is a message
: >>>> consumer 消费 tag = 1,次数count=3,消息内容 : this is a message
o.s.a.r.r.RejectAndDontRequeueRecoverer  : Retries exhausted for message
(Body:'this is a message' MessageProperties
[headers={spring_listener_return_correlation=2840e95b-8544-4ed8-b3ed-8ba02aee2729},
contentType=text/plain, contentEncoding=UTF-8, contentLength=0,
receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false,
receivedExchange=my-exchange, receivedRoutingKey=rabbit.test,
deliveryTag=1, consumerTag=amq.ctag-a5AZEb9AYpOzL6mQJQIvaQ,
consumerQueue=retry-queue])
​
...
Caused by: java.lang.RuntimeException: 抛出异常,模拟消费失败,触发spring-retryat com.fmi110.rabbitmq.RabbitConsumer.consumer(RabbitConsumer.java:36) ~[classes/:na]at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_181]
....
​
: >>>> 死信队列消费 tag = 1,消息内容 : this is a message

从日志可看出,普通队列的消费者一共消费了三次仍然失败,最后回调spring提供 RejectAndDontRequeueRecoverer ,然后消息进入死信队列被消费。

7 pom依赖

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><scope>compile</scope></dependency></dependencies>

RabbitMQ (三)消息重试相关推荐

  1. RabbitMQ之消息重试机制

    1.消息重试机制 消费者消费消息的时候,发生异常情况,导致消息未确认,该消息会被重复消费(默认没有重复次数,即无限循环消费),但可以通过设置重试次数以及达到重试次数之后的消息处理 spring:rab ...

  2. rabbitmq 持久化_RabbitMQ原理与相关操作(三)消息持久化

    现在聊一下RabbitMQ消息持久化: 问题及方案描述 1.当有多个消费者同时收取消息,且每个消费者在接收消息的同时,还要处理其它的事情,且会消耗很长的时间.在此过程中可能会出现一些意外,比如消息接收 ...

  3. RabbitMQ 可靠消息传输实战--云平台技术栈12

    导读:之前发布了云平台技术栈(ps:点击可查看),本文主要说一下其中的RabbitMQ! 作者:极客慧 https://my.oschina.net/jikeh/blog/2207127 可能是缓存架 ...

  4. RabbitMQ,Springboot整合RabbitMQ实现 消息可靠性投递,Consumer ACK,TTL,死信队列,使用TTL+死信队列=延迟队列

    搭建SpringBoot项目,用于演示 springboot版本 <!-- spring boot --><dependency><groupId>org.spri ...

  5. RabbitMQ之消息确认机制(事务+Confirm)

    概述 在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达brok ...

  6. RabbitMQ的消息确认ACK机制

    1.什么是消息确认ACK. 答:如果在处理消息的过程中,消费者的服务器在处理消息的时候出现异常,那么可能这条正在处理的消息就没有完成消息消费,数据就会丢失.为了确保数据不会丢失,RabbitMQ支持消 ...

  7. rabbitmq 取消消息_认识RabbitMQ从这篇文章开始

    关于RabbitMQ 出身:诞生于金融行业的消息队列 语言:Erlang 协议:AMQP(Advanced Message Queuing Protocol 高级消息队列协议) 关键词:内存队列,高可 ...

  8. rabbitmq 消费端代码获取队列名称_C#调用RabbitMQ实现消息队列的示例代码

    前言 我在刚接触使用中间件的时候,发现,中间件的使用并不是最难的,反而是中间件的下载,安装,配置才是最难的. 所以,这篇文章我们从头开始学习RabbitMq,真正的从头开始. 关于消息队列 其实消息队 ...

  9. RabbitMQ的消息确认机制

    转:https://www.toutiao.com/a6583957771840913934/?tt_from=mobile_qq&utm_campaign=client_share& ...

最新文章

  1. python爬取文章保存_爬取博主所有文章并保存到本地(.txt版)--python3.6
  2. Cell子刊:大鱼大肉吃三天,体重未动大脑先变,不仅发胖还会发炎
  3. PHP ORM框架ezpdo(2)之EZPDOSQL
  4. CSS清浮动处理(Clear与BFC)
  5. Linux系统的启动引导过程(转)
  6. Java算法之寻找旋转数组中的最小值
  7. IOCP 下行为投递的关键点
  8. 全国计算机演示文稿,全国计算机等级考试二级office演示文稿题目[文].pdf
  9. 8乘8led点阵显示数字_光立方8乘8乘8立体点阵
  10. 优思学院|2021中质协六西格玛绿带考题及答案
  11. 安卓第三方支付之微信支付
  12. 关于AC6003、6005、6605版本关联WIFI6代产品方法
  13. MIME格式详细介绍
  14. JVM---数据存储和访问(类文件结构)
  15. JD token解密之旅
  16. 怎样判断一个诊断(风险预测)模型的好坏?
  17. HDoj 2604 queuing
  18. PHP将图片验证码转换成base64格式
  19. MySQL-存储过程与函数(PRECEDURE/FUNCTION/DECLARE/SET/CALL/DELIMITER)
  20. 深富策略:机构配置核心赛道股催生A股年底翘尾行情

热门文章

  1. Bitcoin.com| 使用加密的三大驱动因素​——BCH City总结
  2. 对于非技术人员来说,闪电网络和BCH分别意味着什么?
  3. Could not find action or result
  4. flutter dio+rxdart
  5. 使用Ranger对Hive数据进行脱敏
  6. 《Python基础教程第二版》第二章-Python列表和元祖
  7. 在Windows上编译UEFI SDK 2018/OVMF的方法
  8. kali开启禁止或删除ssh 开机启动
  9. Windows 8计算机管理器怎么打开?
  10. android studio无线真机调试------Android