文章目录

  • 1. 什么是幂等性?
    • 1.1 消息队列的幂等性
    • 1.2 模拟重试机制
      • 1.2.1 生产者代码
      • 1.2.2 消费者代码
      • 1.2.3 消费者 application.yml 配置
  • 2. 如何保证消息幂等性,不被重复消费?
    • 解决方法

1. 什么是幂等性?

在编程中一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。

HTTP方法的幂等性是指一次和多次请求某一个资源应该具有同样的副作用。幂等性属于语义范畴,正如编译器只能帮助检查语法错误一样,HTTP规范也没有办法通过消息格式等语法手段来定义它。

简之:一个请求,不管重复来多少次,结果是不会改变的。

1.1 消息队列的幂等性

如同HTTP方法的幂等性,消息队列同样会出现幂等性问题。

消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断,故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息;注意,RabbitMQ 这种消息重试(补偿)机制是默认的。

所以,MQ 消费者的幂等性问题,主要在于 MQ 的重试机制,因为网络原因或客户端延迟消费导致重复消费。

那么,如何合适选择重试机制?我们来看两种情况。

情况1: 消费者获取到消息后,调用第三方接口,但接口暂时无法访问,是否需要重试?

需要重试

情况2: 消费者获取到消息后,抛出数据转换异常,是否需要重试?

不需要重试

总结:对于情况2,如果消费者代码抛出异常是需要发布新版本才能解决的问题,那么不需要重试,重试也无济于事。应该采用日志记录+定时任务 job 健康检查+人工进行补偿

1.2 模拟重试机制

我们采用一种短信消费者客户端异常的情况来模拟 RabbitMQ 的重试机制。

@RabbitListener(queues = "fanout_sms_queue")
public void process(String msg) {System.out.println("短信消费者获取生产者消息msg:" + msg);int i = 1/0;
}

如上代码,很显然会报错,一担报错生产者的消息时不会被消费的?

@RabbitListener 底层使用 AOP 进行异常通知拦截,如果程序没有抛出异常信息,那么就会自动提交事务;如果 AOP 异常通知拦截有捕获到异常信息的话,就会自动实现重试(补偿)机制,同时,这个补偿机制的消息会缓存到 RabbitMQ 服务器端进行存放,一直重试到不抛出异常为止。

1.2.1 生产者代码
@Component
public class FanoutProducer {@Autowiredprivate AmqpTemplate amqpTemplate;/*** 发送消息** @param queueName 队列名称*/public void send(String queueName) {String msg = "my_fanout_msg:" + System.currentTimeMillis();Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8").setMessageId(UUID.randomUUID() + "").build();System.out.println(msg + ":" + msg);amqpTemplate.convertAndSend(queueName, message);}
}
1.2.2 消费者代码
@Component
public class FanoutEamilConsumer {@RabbitListener(queues = "fanout_eamil_queue")public void process(Message message) throws Exception {String revMessage = Thread.currentThread().getName() + ",邮件消费者获取生产者消息msg:" + new String(message.getBody(), "UTF-8")+ ",messageId:" + message.getMessageProperties().getMessageId();System.out.println(revMessage);}
}
1.2.3 消费者 application.yml 配置
spring:rabbitmq:####连接地址host: 127.0.0.1####端口号   port: 5672####账号 username: guest####密码  password: guest### 地址virtual-host: /admin_hostlistener:simple:retry:####开启消费者重试enabled: true####最大重试次数max-attempts: 5####重试间隔次数initial-interval: 3000server:port: 8081

我们通过 RabbitMQ 配置,增加了 RabbitMQ 重试时间以及重试次数限制,在一定程度上解决了重复消费的问题,接下来看一道常问的面试题。

2. 如何保证消息幂等性,不被重复消费?

其实,这个问题也算是 MQ 面试当中经常考察的一点,因为无论是什么 MQ 都会有这个问题。

首先通过上边我们了解了什么是“幂等性”,以及 MQ 幂等性问题的产生,所以我们要清楚为什么会出现重复性消费?在什么场景会出现重复消费?

解决方法

使用全局 MessageID 判断消费方使用同一个,解决幂等性问题。
或者使用业务逻辑保证唯一(比如订单号码)

生产者关键代码:

@Autowired
private AmqpTemplate amqpTemplate;/*** 发送消息** @param queueName 队列名称*/
public void send(String queueName) {String msg = "my_fanout_msg:" + System.currentTimeMillis();Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8").setMessageId(UUID.randomUUID() + "").build();System.out.println(msg + ":" + msg);amqpTemplate.convertAndSend(queueName, message);
}

如上,生产者在发送消息时(convertAndSend),给消息对象设置了唯一的 MessageID,只有该 MessageID 没有被消费者标记方能在重试机制中再次被消费。

消费者关键代码:

@RabbitListener(queues = "fanout_eamil_queue")
public void process(Message message) throws Exception {String revMessage = Thread.currentThread().getName()+ ",邮件消费者获取生产者消息msg:"+ new String(message.getBody(), "UTF-8")+ ",messageId:" + message.getMessageProperties().getMessageId();System.out.println(revMessage);发送邮件的逻辑XXX
}

如上,通过 message.getMessageProperties().getMessageId() 获取 MessageID,获取的 MessageID 可以用来判断是否已经被消费者消费过了,如果已经消费则取消再次消费。

通常怎么判断呢?

比如上方是一个邮件发送的消费者,在做补偿时,假如上一步邮件发送成功了,我们会把该 ID 存至 redis中,下次再调用时,先去 redis 判断是否存在该 ID 了,如果存在表明已经消费过了则直接返回,不再消费,否则消费,然后将记录存至 redis。

我创建了一个java相关的公众号,用来记录自己的学习之路,感兴趣的小伙伴可以关注一下微信公众号哈:niceyoo

RabbitMQ消息幂等性问题相关推荐

  1. rabbitmq系列(三)消息幂等性处理

    一.springboot整合rabbitmq 小说网 m.198200.com 我们需要新建两个工程,一个作为生产者,另一个作为消费者.在pom.xml中添加amqp依赖: <dependenc ...

  2. SpringBoot整合RabbitMQ 消息可靠投递、手动ack、延迟队列、死信队列、消息幂等性保障、消息积压

    1.消息可靠投递 在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景.RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式. confirm 确认模式 ...

  3. RabbitMQ(mq) 如何处理高并发、负载均衡、消息幂等性、丢失、消息顺序错乱问题?

    目录 介绍: 1.连接器(connection): 2.信道.通道(channel): 3.交换机(exchange): 4.队列(queue): 以下通过两个例子,让我们先来对rabbitmq 有个 ...

  4. 四种策略确保 RabbitMQ 消息发送可靠性!你用哪种?

    微服务可以设计成消息驱动的微服务,响应式系统也可以基于消息中间件来做,从这个角度来说,在互联网应用开发中,消息中间件真的是太重要了. 今天,以 RabbitMQ 为例,松哥来和大家聊一聊消息中间消息发 ...

  5. Rabbitmq消息可靠投递和重复消费等问题解决方案

    消息的可靠性投递 在一些对数据一致性要求较高的业务场景里面,如果消息在发布和消费过程中出现了问题(消息丢失,消息重复消费),就会导致数据不一致,要做到消息的可靠性投递. 在RabbitMq里面提供了很 ...

  6. RabbitMQ消息100%不丢失?

    消息消费流程: 1.生产端发送消息到RabbitMQ; 2.RabbitMQ发送消息到消费端: 3.消费端消费消息: 以上3个步骤每个步骤都可能导致消息丢失,消息丢失并不可怕,可怕的是丢失了我们还不知 ...

  7. RabbitMQ常见幂等性、可靠性、顺序性问题及解决方案

    目录 如何保证幂等性 什么是幂等性 重复消费产生的场景 解决方案 如何保证可靠性 产生原因 解决方案 如何保证顺序性 产生原因 解决方案 参考 如何保证幂等性 如果消息的重复消费对业务有影响,那么就需 ...

  8. RabbitMQ消息队列常见面试题总结

    1.什么是消息队列: 1.1.消息队列的优点: (1)解耦:将系统按照不同的业务功能拆分出来,消息生产者只管把消息发布到 MQ 中而不用管谁来取,消息消费者只管从 MQ 中取消息而不管是谁发布的.消息 ...

  9. RabbitMQ 入门系列(6)— 如何保证 RabbitMQ 消息不丢失

    1. 消息丢失源头 RabbitMQ 消息丢失的源头主要有以下三个: 生产者丢失消息 RabbitMQ 丢失消息 消费者丢失消息 下面主要从 3 个方面进行说明并提供应对措施 2. 生产者丢失消息 R ...

最新文章

  1. Linux关机命令和重启命令
  2. 利用 Flash 远程检测客户端安装的杀软
  3. 生病了女朋友说要「陪床」,结果真的是陪床不是陪我......
  4. PostgreSQL 9.6 keepalived主从部署
  5. matlab模拟简单孔径衍射图样,夫琅和费衍射实验文献综述
  6. stm32产生100k时钟信号_stm32f105/107系统时钟变慢
  7. 重新签名IOS .ipa文件 (包含第三方框架和插件)
  8. 一分钟搞懂的算法之BPE算法
  9. windows系统下运行bat脚本实现后台运行及停止jar文件
  10. 宿主机支持avx2指令集,为什么虚拟机cpu就不支持avx2指令集了
  11. URPF - 单播逆向路径转发
  12. BoundsChecker教程
  13. QQ桌球瞄准器开发(1)桌球瞄准器介绍与使用方法
  14. verilogVGA显示太极图案
  15. 【支付】网络支付-支付网关模式与虚拟账户模式
  16. 【风马一族_php】PHP运算
  17. 求有多少个连续字串中所有的字母都出现了偶数次
  18. 简单谈谈STM32(一) - 走近嵌入式
  19. 推荐几本经典计算机书籍
  20. 【算法题】2309. 兼具大小写的最好英文字母

热门文章

  1. [html] 说说你对H5的ServiceWorker的理解,它有什么运用场景?
  2. PS教程第二十一课:有了选区就有了界限
  3. [vue] 说说你对vue的template编译的理解?
  4. 前端学习(2621):更新品牌
  5. 工作57:element格式化内容
  6. “约见”面试官系列之常见面试题第三十一篇之vue-router得守卫(建议收藏)
  7. 前端学习(1752):前端调试值之网络请求的监控
  8. “睡服”面试官系列第八篇之iterator(建议收藏学习)
  9. 前端学习(1179):vue概述
  10. 前端学习(729):函数导读