最近部门号召大伙多组织一些技术分享会,说是要活跃公司的技术氛围,但早就看穿一切的我知道,这 T M 就是为了刷KPI。不过,话说回来这的确是件好事,与其开那些没味的扯皮会,多做技术交流还是很有助于个人成长的。

于是乎我主动报名参加了分享,咳咳咳~ ,真的不是为了那点KPI,就是想和大伙一起学习学习!

在这里插入图片描述

这次我分享的是 springboot + rabbitmq 如何实现消息确认机制,以及在实际开发中的一点踩坑经验,其实整体的内容比较简单,有时候事情就是这么神奇,越是简单的东西就越容易出错。

可以看到使用了 RabbitMQ 以后,我们的业务链路明显变长了,虽然做到了系统间的解耦,但可能造成消息丢失的场景也增加了。例如:

  • 消息生产者 - > rabbitmq服务器(消息发送失败)

  • rabbitmq服务器自身故障导致消息丢失

  • 消息消费者 - > rabbitmq服务(消费消息失败)

所以说能不使用中间件就尽量不要用,如果为了用而用只会徒增烦恼。开启消息确认机制以后,尽管很大程度上保证了消息的准确送达,但由于频繁的确认交互,rabbitmq 整体效率变低,吞吐量下降严重,不是非常重要的消息真心不建议你用消息确认机制。


下边我们先来实现springboot + rabbitmq消息确认机制,再对遇到的问题做具体分析。

一、准备环境

1、引入 rabbitmq 依赖包

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、修改 application.properties 配置

配置中需要开启 发送端消费端 的消息确认。

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest# 发送者开启 confirm 确认机制
spring.rabbitmq.publisher-confirms=true
# 发送者开启 return 确认机制
spring.rabbitmq.publisher-returns=true
####################################################
# 设置消费端手动 ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 是否支持重试
spring.rabbitmq.listener.simple.retry.enabled=true

3、定义 Exchange 和 Queue

定义交换机 confirmTestExchange 和队列 confirm_test_queue ,并将队列绑定在交换机上。

@Configuration
public class QueueConfig {@Bean(name = "confirmTestQueue")public Queue confirmTestQueue() {return new Queue("confirm_test_queue", true, false, false);}@Bean(name = "confirmTestExchange")public FanoutExchange confirmTestExchange() {return new FanoutExchange("confirmTestExchange");}@Beanpublic Binding confirmTestFanoutExchangeAndQueue(@Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange,@Qualifier("confirmTestQueue") Queue confirmTestQueue) {return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);}
}

rabbitmq 的消息确认分为两部分:发送消息确认 和 消息接收确认。

在这里插入图片描述

二、消息发送确认

发送消息确认:用来确认生产者 producer 将消息发送到 brokerbroker 上的交换机 exchange 再投递给队列 queue的过程中,消息是否成功投递。

消息从 producerrabbitmq broker有一个 confirmCallback 确认模式。

消息从 exchangequeue 投递失败有一个 returnCallback 退回模式。

我们可以利用这两个Callback来确保消的100%送达。

1、 ConfirmCallback确认模式

消息只要被 rabbitmq broker 接收到就会触发 confirmCallback 回调 。

@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (!ack) {log.error("消息发送异常!");} else {log.info("发送者爸爸已经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);}}
}

实现接口 ConfirmCallback ,重写其confirm()方法,方法内有三个参数correlationDataackcause

  • correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。

  • ack:消息投递到broker 的状态,true表示成功。

  • cause:表示投递失败的原因。

但消息被 broker 接收到只能表示已经到达 MQ服务器,并不能保证消息一定会被投递到目标 queue 里。所以接下来需要用到 returnCallback

2、 ReturnCallback 退回模式

如果消息未能投递到目标 queue 里将触发回调 returnCallback ,一旦向 queue 投递消息未成功,这里一般会记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。

@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);}
}

实现接口ReturnCallback,重写 returnedMessage() 方法,方法有五个参数message(消息体)、replyCode(响应code)、replyText(响应内容)、exchange(交换机)、routingKey(队列)。

下边是具体的消息发送,在rabbitTemplate中设置 ConfirmReturn 回调,我们通过setDeliveryMode()对消息做持久化处理,为了后续测试创建一个 CorrelationData对象,添加一个id10000000000

@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate ConfirmCallbackService confirmCallbackService;@Autowiredprivate ReturnCallbackService returnCallbackService;public void sendMessage(String exchange, String routingKey, Object msg) {/*** 确保消息发送失败后可以重新返回到队列中* 注意:yml需要配置 publisher-returns: true*/rabbitTemplate.setMandatory(true);/*** 消费者确认收到消息后,手动ack回执回调处理*/rabbitTemplate.setConfirmCallback(confirmCallbackService);/*** 消息投递到队列失败回调处理*/rabbitTemplate.setReturnCallback(returnCallbackService);/*** 发送消息*/rabbitTemplate.convertAndSend(exchange, routingKey, msg,message -> {message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;},new CorrelationData(UUID.randomUUID().toString()));}

三、消息接收确认

消息接收确认要比消息发送确认简单一点,因为只有一个消息回执(ack)的过程。使用@RabbitHandler注解标注的方法要增加 channel(信道)、message 两个参数。

@Slf4j
@Component
@RabbitListener(queues = "confirm_test_queue")
public class ReceiverMessage1 {@RabbitHandlerpublic void processHandler(String msg, Channel channel, Message message) throws IOException {try {log.info("小富收到消息:{}", msg);//TODO 具体业务channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}  catch (Exception e) {if (message.getMessageProperties().getRedelivered()) {log.error("消息已重复处理失败,拒绝再次接收...");channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息} else {log.error("消息即将再次返回队列处理...");channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); }}}
}

消费消息有三种回执方法,我们来分析一下每种方法的含义。

1、basicAck

basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。

void basicAck(long deliveryTag, boolean multiple)

deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下,我们可以对指定deliveryTag的消息进行acknackreject等操作。

multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。

举个栗子: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。

2、basicNack

basicNack :表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

deliveryTag:表示消息投递序号。

multiple:是否批量确认。

requeue:值为 true 消息将重新入队列。

3、basicReject

basicReject:拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。

void basicReject(long deliveryTag, boolean requeue)

deliveryTag:表示消息投递序号。

requeue:值为 true 消息将重新入队列。

四、测试

发送消息测试一下消息确认机制是否生效,从执行结果上看发送者发消息后成功回调,消费端成功的消费了消息。用抓包工具Wireshark 观察一下rabbitmq amqp协议交互的变化,也多了 ack 的过程。

五、踩坑日志

1、别忘确认消息

这是一个非常没技术含量的坑,但却是非常容易犯错的地方。

开启消息确认机制,消费消息别忘了channel.basicAck,否则消息会一直存在,导致重复消费。

2、消息无限投递

在我最开始接触消息确认机制的时候,消费端代码就像下边这样写的,思路很简单:处理完业务逻辑后确认消息, int a = 1 / 0 发生异常后将消息重新投入队列。

@RabbitHandlerpublic void processHandler(String msg, Channel channel, Message message) throws IOException {try {log.info("消费者 2 号收到:{}", msg);int a = 1 / 0;channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}

但是有个问题是,业务代码一旦出现 bug 99.9%的情况是不会自动修复,一条消息会被无限投递进队列,消费端无限执行,导致了死循环。

在这里插入图片描述

本地的CPU被瞬间打满了,大家可以想象一下当时在生产环境导致服务死机,我是有多慌。

而且rabbitmq management 只有一条未被确认的消息。

在这里插入图片描述

经过测试分析发现,当消息重新投递到消息队列时,这条消息不会回到队列尾部,仍是在队列头部。

消费者会立刻消费这条消息,业务处理再抛出异常,消息再重新入队,如此反复进行。导致消息队列处理出现阻塞,导致正常消息也无法运行。

而我们当时的解决方案是,先将消息进行应答,此时消息队列会删除该条消息,同时我们再次发送该消息到消息队列,异常消息就放在了消息队列尾部,这样既保证消息不会丢失,又保证了正常业务的进行。

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 重新发送消息到队尾
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,JSON.toJSONBytes(msg));

但这种方法并没有解决根本问题,错误消息还是会时不时报错,后面优化设置了消息重试次数,达到了重试上限以后,手动确认,队列删除此消息,并将消息持久化入MySQL并推送报警,进行人工处理和定时任务做补偿。

3、重复消费

如何保证 MQ 的消费是幂等性,这个需要根据具体业务而定,可以借助MySQL、或者redis 将消息持久化,通过再消息中的唯一性属性校验。

特别推荐一个分享架构+算法的优质内容,还没关注的小伙伴,可以长按关注一下:长按订阅更多精彩▼如有收获,点个在看,诚挚感谢

springboot + rabbitmq 用了消息确认机制,感觉掉坑里了相关推荐

  1. RabbitMQ消息确认机制和消息重发机制

    一.机制 首先我们要知道一条消息的传递过程. 生产者 -> 交换机 ->  队列 我们的生产者生产消息,生产完成的消息发送到交换机,由交换机去把这个消息转发到对应的队列上.这其中我们可能在 ...

  2. RabbitMQ之消息确认机制(事务+Confirm)

    概述 在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达brok ...

  3. RabbitMQ消息确认机制

    文章目录 1. 事务机制 2. Confirm模式 2.1 生产者 2.1.1 普通Confirm模式 2.1.2 批量Confirm模式 2.1.3 异步Confirm模式 2.2 消费者 3. 其 ...

  4. rabbitmq消息确认机制及死信队列的使用

    关于rabbitmq的基本概念和相关的理论这里就不做过多介绍了,在之前的篇幅中有过相应的介绍,也可以查询一些资料详细了解一下rabbitmq的基础知识,下面要介绍的点主要包括两个方面, 1.rabbi ...

  5. RabbitMQ 基本消息模型和消息确认机制

    ​01 前言 关于 RabbitMQ 服务器的安装,本章节不做介绍,请培养个人动手能力,自行百度解决.RabbitMQ 成功安装后(win 版),浏览器输入:localhost:15672,则可以进入 ...

  6. RabbitMQ消息确认机制-可靠抵达

    消息发送到被消费的流程: JAVA的生产端的发送数据----->Broker(消息服务器)-------->达到Exchange交换机------------->通过路由键到达Que ...

  7. RabbitMQ ACK消息确认机制 快速入门

    RabbitMQ 消息确认机制ACK ack机制保证的是broker和消费者之间的可靠性 ack表示的是消费端收到消息后的确认方式,有三种确认方式 自动确认:acknowledge="non ...

  8. RabbitMQ学习笔记(四)-消息确认机制

    引入 在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景.RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式. confirm 确认模式 return ...

  9. RabbitMQ之订阅模式与主题模式,消息确认机制

    1.订阅模式 作用类似与微信公众号,你订阅了就可以接收到消息 解读: 1.一个生产者,多个消费者. 2.每一个消费者 都有自己的队列 3.生产者没有直搂把洧息发送到队列而是发到了交换机 转发器exch ...

最新文章

  1. 机器学习(11)线性回归(2)实战 -- 正规方程优化、梯度下降优化(波士顿房价预测)
  2. 8. An Introduction to MCMC for Machine Learning (2)
  3. 通过pxe远程安装linux,通过PXE远程安装多台Linux系统
  4. ubuntu安装-Caffe依赖
  5. URL的getFile()和getPath()方法的区别
  6. linux pv命令,pv命令_Linux pv命令使用详解:用来度量执行命令的具体信息
  7. bzoj 1878: [SDOI2009]HH的项链(主席树)
  8. Navicat连接Mysql报错:Client does not support authentication protocol requested by server;
  9. 三体第一部(13到22节)
  10. 【Axure】--原型设计简易手册
  11. 图像处理学习--银行卡号码识别(1)
  12. html加减乘除除数不能为零,0不能做除数(数学中0为什么不能做除数)
  13. 华夏出入口车牌识别摄像机
  14. 名帖330 王羲之 行草《平安帖》《何如帖》《奉橘帖》
  15. 搭建2048网页小游戏
  16. 1ppi等于多少dpi_图片知多少?
  17. 网易2018校园招聘编程题真题集合 详解
  18. 抄代码对自己编程提高有用吗?
  19. 二十六个英语字母相关的公司商标
  20. 网传阿里要裁员2万人,附我的一些思考!

热门文章

  1. Flask中的重定向redirect和url_for
  2. PTA数据结构与算法题目集(中文)7-38
  3. openwrt linux rootfs 分区表,Openwrt扩展rootfs到U盘
  4. spark 算子例子_10年大数据架构师,用一文带你玩转Spark计算框架,你能读懂吗?...
  5. 蓝桥杯国赛-大胖子走迷宫
  6. php遍历数组哪个效率高,PHP遍历数组的三种方法及效率对比分析
  7. 几何分布的期望和方差
  8. lenovo电脑_联想推出了其最新的ThinkBook笔记本电脑产品ThinkBookPlus
  9. 如何识别 Linux 上的文件分身
  10. HDU 2300 Crashing Robots