文章目录

  • 一、关于消息的可靠性
  • 二、生产者发送消息对象
  • 三、将消息发送给交换机
  • 四、将消息发送给队列
  • 五、将消息发送给消费者
  • 六、保证消息的幂等性
  • 七、死信消息的补偿【存在问题,待完善】

一、关于消息的可靠性

如上图,关于消息的可靠性,无非就是要保证这三个关键:

  1. 保证生产者发送的消息一定能发送到交换机
  2. 在消息队列中,交换机的消息一定能够发送给路由
  3. 保证消息一定能够从队列发送到消费者,且消费者一定能够消费成功

二、生产者发送消息对象

首先,创建接收消息的交换机与队列:

@Configuration
public class ProviderConfig {// 创建交换机@Beanpublic DirectExchange getProviderExchange() {// 第一个参数是交换机的名字// 第二个参数是交换机是否持久化(即使出现问题,服务器重启,交换机依然存在)// 第三个参数是交换机是否会自动删除(如果没有绑定的话)// 第四个参数是扩展参数,可设置为nullreturn new DirectExchange("pro_exchange", true, false, null);}// 创建队列@Beanpublic Queue getProviderQueue() {// 第一个参数是队列的名字// 第二个参数是队列是否持久化(即使出现问题,服务器重启,队列依然存在)// 第三个参数是队列是否独占(独占则表示只能自己去监听这个队列)// 第四个参数是队列是否会自动删除(如果没有绑定的话)// 第五个参数是扩展参数,可设置为nullreturn new Queue("pro_queue", true, false, false, null);}// 将交换机与队列绑定起来,并指定路由key为pro@Beanpublic Binding getProviderBinding() {return BindingBuilder.bind(getProviderQueue()).to(getProviderExchange()).with("pro");}
}

然后,创建需要发送的消息对象,并发送消息对象:

// 创建消息属性对象,在消息属性对象中,会存放许多消息相关的信息
MessageProperties properties = new MessageProperties();
// 接收消息的交换机
properties.setReceivedExchange("pro_exchange");
// 接收消息的路由的key
properties.setReceivedRoutingKey("pro");
// 通过UUID给消息一个唯一ID
properties.setMessageId(UUID.randomUUID().toString());
// 创建消息对象,第一个参数为消息内容,要求是字节数组,第二参数则是前面创建的消息属性对象
Message message = new Message("这是一条消息".getBytes(), properties);
// 创建CorrelationData对象,在这个对象中会有个id属性,用来表示消息的唯一性
// 用于后面的消息重发
CorrelationData correlationData = new CorrelationData();
correlationData.setReturnedMessage(message);
rabbitTemplate.convertAndSend("pro_exchange", "pro", message, correlationData);

三、将消息发送给交换机

在这一步中,我们要做到的是,保证生产者生成的消息要一定能够发送到交换机,那么具体该如何做呢?

因为通常生产者发送消息给交换机失败是因为网络波动所引起的,所以我们只需要配置发送失败后的确认回调,然后重新发送消息就可以了,具体做法如下:

首先,我们要修改生产者的application.yml配置:

# 添加如下配置
spring:rabbitmq:# 配置开启如果消息发送失败的确认机制publisher-confirm-type: simple

然后,创建消息发送失败的回调类:

// 当消息发送给交换机失败之后,就会回调这个类中的方法
@Component
public class TransCallback implements RabbitTemplate.ConfirmCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 关于@PostContruct这个注解,这个注解标注的方法是在构造方法调用之后调用的* 构造方法是构造对象,这个注解标注的方法则是对对象进行初始化*/@PostConstructpublic void init() {// 配置生产者发送消息给交换机失败后的确认回调rabbitTemplate.setConfirmCallback(this);}/*** 生产者发送消息给交换机失败之后就会进入这个方法* 注:如果第一次发送就成功了也就不会进入这个方法了** @param correlationData 之前在发送消息对象的时候创建的CorrelationData对象* @param b               消息是否发送成功* @param s               消息发送失败的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {//如果消息发送失败,就重新发送消息if (!b) {Message message = correlationData.getReturnedMessage();MessageProperties properties = message.getMessageProperties();// 这里获取到之前放在消息属性对象中的交换机和路由String receivedExchange = properties.getReceivedExchange();String receivedRoutingKey = properties.getReceivedRoutingKey();// 重新发送消息// 如果还是失败会一直重试rabbitTemplate.convertAndSend(receivedExchange, receivedRoutingKey, message, correlationData);System.out.println("发送给交换机失败");} else {// 在进入这个方法后,经过之前的重试,如果最终发送消息成功了// 才会执行到这个else里面,并打印这句话System.out.println("消息发送给交换机成功");}}
}

四、将消息发送给队列

在上一步中,我们已经确保消息从生产者发送给交换机是没有问题的,而在这一步中,我们需要做的就是,保证消息从交换机中发送到队列中也是没有问题的,在这里我们是通过配置消息发送失败后的返回回调来实现,具体做法如下:

首先,依然是配置生产者的application.yml文件:

# 添加如下配置
spring:rabbitmq:publisher-confirm-type: simple# 配置开启如果消息发送失败的返回机制publisher-returns: true

然后,在前面的回调类中加上关于返回回调的部分:

// 当消息发送给交换机失败(确认机制),或者交换机发送给路由失败之后(返回机制),都会回调这个类中的方法
@Component
public class TransCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 关于@PostContruct这个注解,这个注解标注的方法是在构造方法调用之后调用的* 构造方法是构造对象,这个注解标注的方法则是对对象进行初始化*/@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);// 配置交换机发送消息给队列失败后的返回回调rabbitTemplate.setReturnCallback(this);}@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {if (!b) {System.out.println("发送给交换机失败");Message message = correlationData.getReturnedMessage();MessageProperties properties = message.getMessageProperties();String receivedExchange = properties.getReceivedExchange();String receivedRoutingKey = properties.getReceivedRoutingKey();rabbitTemplate.convertAndSend(receivedExchange, receivedRoutingKey, message, correlationData);} else {System.out.println("消息发送给交换机成功");}}/*** 与前面的确认机制类似,只有在交换机发送消息给路由失败后才会进入这个方法* 如果第一次就发送成功了,就不会进入这个方法了* * @param message 发送的消息对象* @param i 响应码* @param s 响应内容* @param s1 交换机* @param s2 路由*/@Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {System.out.println("发送给队列失败");// 重新构建新的CorrelationData对象CorrelationData correlationData = new CorrelationData();correlationData.setReturnedMessage(message);// 重新发送消息// 如果不成功会一直重试rabbitTemplate.convertAndSend(s1, s2, message, correlationData);}
}

五、将消息发送给消费者

在前面的过程中,我们已经确保了从生产者到路由这一部分都是不会出现问题的(即使出现问题也已经有了对应的处理方案),那么接下来,就是最后同时也是最重要的一步,如何确保消费者一定能够消费到这条消息。

要做到这一点,首先,我们必须保证三个持久化:

  1. 交换机必须是持久化的
// 在配置交换机bean的时候,参数中指定交换机是否持久化
@Bean
public DirectExchange getProviderExchange() {// 第二个参数是交换机是否持久化(即使出现问题,服务器重启,交换机依然存在)return new DirectExchange("pro_exchange", true, false, null);
}
  1. 路由必须是持久化的
// 在配置路由bean的时候,参数中指定路由是否持久化
@Bean
public Queue getProviderQueue() {// 第二个参数是队列是否持久化(即使出现问题,服务器重启,队列依然存在)return new Queue("pro_queue", true, false, false, null);
}
  1. 消息必须是持久化的
MessageProperties properties = new MessageProperties();
properties.setReceivedRoutingKey("pro");
properties.setReceivedExchange("pro_exchange");
properties.setMessageId(UUID.randomUUID().toString());
// 消息的持久化就是,即使出现问题,服务器重启,消息也不会丢
// 如果不给消息属性对象指定,默认创建的消息对象就是持久化的
// 可以指定消息对象不持久化,传入参数为MessageDeliveryMode.NON_PERSISTENT
properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message message = new Message(jianshe.getId().toString().getBytes(), properties);
CorrelationData correlationData = new CorrelationData();
correlationData.setReturnedMessage(message);
rabbitTemplate.convertAndSend("pro_exchange", "pro", message, correlationData);

在保证了交换机、队列和消息持久化的前提下,如何去确保消费者一定能够消费到这条消息呢?

这是通过重试机制来确保的,那么什么是重试机制呢?

重试机制其实就是消费者在消费消息成功之后,需要给MQ作出应答,如果MQ没有接收到这个应答,就会重新给消费者发送消息,一直到消费成功,MQ服务器接收到应答后结束,或者是超过重试次数之后,把消息放入死信队列,如果没有绑定死信队列的话,就会记录日志,然后丢掉这个消息。

那么重试机制具体该怎么操作?

首先,我们要修改消费者的application.yml配置:

springrabbitmq:listener:simple:# 消费者默认是自动应答方式,就是,只要消费者一监听到这个消息,MQ服务器就认为这个消息已经被消费了# 这样是不安全的,因为有可能消费者监听到了消息,但是在消费之前出异常了,消息其实并没有被消费# 配置手动应答acknowledge-mode: manual   # 不直接丢到死信队列default-requeue-rejected: false # 配置重试策略retry:   # 允许重试enabled: true# 第一次重试的间隔时间initial-interval: 3000ms  # 最大重试次数[加上最开始那一次]max-attempts: 3  # 最大间隔时间max-interval: 20000ms  # 重试因子   3    6   12   20multiplier: 2

然后,在消费者的监听方法中,增加消息消费成功或者失败后的逻辑:

@RabbitListener(queuesToDeclare = @Queue("pro_queue"))
public void getMsg(String msg, Channel channel, Message message) throws IOException {// 获取的消息的唯一IDString messageId = message.getMessageProperties().getMessageId();// 使用redis来保存重试次数,messageId作为键,次数作为值ValueOperations value = redisTemplate.opsForValue();try {System.out.println("消息正常消费");if (true) {throw new IOException();}// 如果消息正常消费了,就给MQ服务器作出应答channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);// 如果是IO异常,才重试// 因为通常是因为网络波动,作出应答失败抛出IO异常} catch (IOException e) {System.out.println("消息消费失败,重试");// 如果redis中不存在当前messageId,这个键,就添加if (value.get(messageId) == null) {value.set(messageId, 1);throw e;// 如果获取到重试次数<2(因为最开始进来那里也是一次,所以到2就已经算是3次了)} else if (((Integer) value.get(messageId)) < 2) {value.increment(messageId, 1);throw e;}// 如果重试了三次还没有消费成功,就将这个消息给死信(如果没有绑定死信,就会记录日志,然后丢掉这个消息)channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 如果是其他异常的话,就直接拒绝,然后给死信// 因为这类异常通常是逻辑的异常,重试依然会抛channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);}
}

六、保证消息的幂等性

在确定消息已经会被消费者消费后,我们需要考虑一个新的问题:消息的幂等性问题,也就是消息的重复消费问题,我们可以想象一下,在MQ服务器发送消息给到消费者,消费者也已经成功消费了,在向MQ服务器作出应答的时候,出现了问题,导致MQ服务器并没有收到消费者的应答,由于MQ服务器并没有收到应答,MQ服务器会再次向消费者发送消息,如果这条消息再次被消费者消费的话,就出现了消息的幂等性问题。

那么这个问题应该如何解决呢?

在这里,我们可以使用自定义注解+AOP的方式来解决消息的幂等性问题。通过将消息的唯一ID和状态保存在redis中,如果我获取到redis中消息的状态是已经被消费了,那我就直接给MQ服务器作出应答,而不再对消息进行消费,具体操作如下:

首先,我们要自定义一个注解:

// 这个注解可以添加在方法上
@Target({ElementType.METHOD})
// 这个注解在运行时生效
@Retention(RetentionPolicy.RUNTIME)
public @interface CheckIdempotence {}

将注解添加到消费者监听消息的方法上:

@RabbitListener(queuesToDeclare = @Queue("pro_queue"))
@CheckIdempotence
public void getMsg(String msg, Channel channel, Message message) throws IOException {String messageId = message.getMessageProperties().getMessageId();ValueOperations value = redisTemplate.opsForValue();try {System.out.println("消息正常消费");if (true) {throw new IOException();}// 如果消费成功了,就将消息的状态修改为1(已消费)value.set(messageId + "lock", 1);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (IOException e) {System.out.println("消息消费失败,重试");if (value.get(messageId) == null) {value.set(messageId, 1);throw e;} else if (((Integer) value.get(messageId)) < 2) {value.increment(messageId, 1);throw e;}channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);}
}

编写切面,对方法进行环绕增强,在进入方法前先获取一次消息的状态,如果没有消费过,那就放行,如果已经消费过了,就给MQ作出应答,不再放行方法:

// 切面注解
@Aspect
// 切面需要配置成一个bean
@Component
public class ListenerAspect {@Autowiredprivate RedisTemplate redisTemplate;// 对标注了@CheckIdempotence注解的方法进行增强@Around("@annotation(cn.tcx.hub.bank.customer.config.anno.CheckIdempotence)")public Object adviceListener(ProceedingJoinPoint joinPoint) throws Throwable {// 通过连接点对象获取到方法参数中的消息对象Message message = (Message) joinPoint.getArgs()[2];// 通过消息唯一ID获取redis中存放的消息状态if("1".equals(redisTemplate.opsForValue().get(message.getMessageProperties().getMessageId() + "lock"))) {// 如果获取到消息的状态为 1,则说明当前消息已经被消费过 // 直接给MQ服务器作出应答((Channel)joinPoint.getArgs([1]).basicAck(message.getMessageProperties().getDeliveryTag(), false);// 返回一个null,不会再去执行目标方法return null;}// 如果redis中不存在这个键,或者状态不是 1,则说明当前消息未被消费过// 那么久放行执行目标方法return joinPoint.proceed();}
}

七、死信消息的补偿【存在问题,待完善】

在前面的重试机制中,如果消息重试超过了重试次数还没有被消费的话,就会把消息丢弃或者发送给死信队列,那最终消息还是没有得到处理,通常来说,会有以下两种解决方案:

一是没有绑定死信交换机的情况下,通常会将消息的关键信息写入日志,然后丢掉消息,后续在通过查看日志做出统一的处理,但是通常日志内容会比较多,比较杂,处理起来比较繁琐;

二是绑定了死信交换机的情况下,我们可以写一个方法去监听死信队列,如果这个方法监听到了死信队列中的消息,就将消息的关键信息写入数据库,这样的话,信息都在数据库中,查看和处理都会比较方便。

这里我们采用第二种方案,具体做法如下:

首先,我们需要创建死信交换机和死信队列(其实与之前的创建交换机和队列的方式一样,只是其他队列可以将这个交换机绑定为死信交换机):

@Configuration
public class ProviderDlxConfig {@Beanpublic DirectExchange getDlxExchange() {return new DirectExchange("pro_dlx_exchange", true, false, null);}@Beanpublic Queue getDlxQueue() {return new Queue("pro_dlx_queue", true, false, false, null);}@Beanpublic Binding getDlxBinding() {return BindingBuilder.bind(getDlxQueue()).to(getDlxExchange()).with("dlx");}
}

接着,让之前创建的接收消息的队列将上面创建的交换机绑定为死信交换机:

@Bean
public Queue getProviderQueue() {// 扩展参数Map<String, Object> map = new HashMap<>();// 绑定死信交换机map.put("x-dead-letter-exchange", "pro_dlx_exchange");map.put("x-dead-letter-routing-key", "dlx");return new Queue("pro_queue", true, false, false, map);
}

然后,编写监听死信队列的方法,并对进入死信的消息进行处理,将其写入数据库:

@Component
public class DlxListener {@Autowiredprivate BcService bcService;// 监听死信队列的方法// 参数和之前的监听方法一样,不再赘述@RabbitListener(queuesToDeclare = @Queue("pro_dlx_queue"))public void dlxListener(String msg, Channel channel, Message message) {// 只要监听到了消息,直接获取到关键信息,写入数据库MessageProperties properties = message.getMessageProperties();// 通常关键信息有这些://        1.消息ID//        2.接收消息的交换机//        3.接收消息的队列//     4.消息的内容Bc bc = Bc.builder().messageid(properties.getMessageId()).body(new String(message.getBody())).exchange(properties.getReceivedExchange()).routingKey(properties.getReceivedRoutingKey())// 消息进入死信的时间.endTime(new Date())// 消息在数据库中的状态 0(未处理)1(已处理).status("0").build();bcService.insert(bc);}
}

保证RabbitMQ消息的可靠性总结相关推荐

  1. 四种策略确保 RabbitMQ 消息发送可靠性!你用哪种?

    微服务可以设计成消息驱动的微服务,响应式系统也可以基于消息中间件来做,从这个角度来说,在互联网应用开发中,消息中间件真的是太重要了. 今天,以 RabbitMQ 为例,松哥来和大家聊一聊消息中间消息发 ...

  2. RabbitMQ 入门系列(6)— 如何保证 RabbitMQ 消息不丢失

    1. 消息丢失源头 RabbitMQ 消息丢失的源头主要有以下三个: 生产者丢失消息 RabbitMQ 丢失消息 消费者丢失消息 下面主要从 3 个方面进行说明并提供应对措施 2. 生产者丢失消息 R ...

  3. 11 RabbitMQ消息的可靠性保障

    RabbitMQ消息的补偿机制--确保消息百分百发送成功 首先producer操作自己的数据库,发送一条消息,将这个消息存储在Q1这个队列中. 然后Consumer拿Q1中的消息去消费,同时操作自己的 ...

  4. 如何保证RabbitMQ消息队列的高可用?

    RabbitMQ 有三种模式:单机模式,普通集群模式,镜像集群模式. 单机模式:就是demo级别的,一般就是你本地启动了玩玩儿的,没人生产用单机模式 普通集群模式:意思就是在多台机器上启动多个Rabb ...

  5. 【MQ】如何确保RabbitMQ消息可靠性传递?

    文章内容 1. Producer-to-Exchange 2. Exchange-to-Queue 3. Queue Sotrage 4. Queue-to-Consumer 5. Others 正在 ...

  6. RabbitMQ消息可靠性投递及分布式事务最终一致性实现

    RabbitMQ消息可靠性投递就是保证消息生产者能够将消息百分百投递到RabbitMQ服务器,并在传递过程中不丢失.然而在生产环境中由于网络中断.网络不稳定等原因导致消息在投递过程中丢失,这或许会造成 ...

  7. RabbitMQ消息队列(一)《Java-2021面试谈资系列》

    RabbitMQ RabbitMQ消息队列 一.中间件 1.什么是中间件 2.中间件技术及架构概述 3.消息中间件 1.消息中间件的分布式架构 2.消息中间件使用场景 3.常见的消息中间件 4.消息中 ...

  8. RabbitMQ消息

    如何确保RabbitMQ消息的可靠性? 开启生产者确认机制,确保生产者的消息能到达队列 开启持久化功能,确保消息未消费前在队列中不会丢失 开启消费者确认机制为auto,由spring确认消息处理成功后 ...

  9. RabbitMQ(十):RabbitMQ 如何保证消息的可靠性

    一条消费成功被消费经历了生产者->MQ->消费者,因此在这三个步骤中都有可能造成消息丢失. 一 消息生产者没有把消息成功发送到MQ 1.1 事务机制 AMQP协议提供了事务机制,在投递消息 ...

最新文章

  1. signature=eddfa127dafaf9a7c1ea87598961fbc9,Slingerende vrachtwagens verleden tijd?
  2. python sub 不区分大小写_解决Python列表字符不区分大小写的问题
  3. 1.void main
  4. 【转】sharepoint foundation 2013升级sp1补丁后无法使用搜索功能
  5. python递归算法案例教案_python教案
  6. shell技巧(sed 断句、读取指定行) 【ZT】
  7. cocos2dx 3.0 触摸机制
  8. table内容保存到Excel中
  9. ssh与tcp wappers
  10. linux进程sleep硬盘,linux下线程调用sleep,进程挂起
  11. mac教程:磁力种子qBittorrent 使用教程
  12. php唤起微信打开网址,点击链接打开微信再跳转到微信内部浏览器的解决方案
  13. SQL Server 2014下载及安装教程
  14. html div.menus,性感的CSS菜单(Menus)
  15. 【C语言】输出国际象棋棋盘
  16. 我写的Javascript贪食蛇v1
  17. 【3d建模】全网最全3dmax快捷键【附软件安装包和角色基础教程下载】
  18. 最精简的QQ2005:只用最核心的几个文件
  19. matlab中simple函数怎么用,matlab里simple函数
  20. 计算机PS实验报告范文,PHOTOSHOP实验报告范文

热门文章

  1. 开发者必备的顶级Android开发工具,成功入职阿里
  2. tp+layui 时间戳转换
  3. 进制转换之十进制转换为十六进制
  4. 什么是大数据分析 主要应用于哪些行业?
  5. 三位数求最大公因数c语言,求最大公因数的三种算法
  6. hdmi怎么支持2k分辨率_为什么显示器闪瞎眼 HDMI线版本有讲究
  7. 腾讯TBS浏览服务打开word.pdf.ppt等文档的使用
  8. Java程序员跳槽之旅,离开京东,14面面试回顾和一点感想
  9. 如何正确的下载安装使用别人的laravel项目?
  10. 中等专业学校计算机教师,中等专业学校计算机老师年度工作总结