Springboot RabbitMQ 基础使用、消息发送确认、签收
概述
rabbitMQ 会做一个系列,包括:安装、基础使用、高级队列、集群。
使用环境: jdk 8
、springboot 2.4.10
常见概念:
- AMQP:高级消息队列协议,这是一个消息应用的规范。
- Broker: 接收和分发消息的应用,RabbitMQ Server 就是 Message Broker。
- Channel:Channel 作为轻量级 Connection 极大减少了操作系统建立 TCP connection 的开销。
- Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout(multicast)
- Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据。
- Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
docker 安装
使用 docker 安装测试环境,在 dockerHub 可以查找版本:https://hub.docker.com/
选择带有控制界面的 management 版本(包含web管理页面):
docker pull rabbitmq:3.9.5-management
rabbit mq 默认两个端口:
- 5672 是默认应用访问端口
- 15672 是默认控制台 Web 端口号
docker run -d --name rabbitMqDocker -p 52365:5672 -p 32512:15672 -v /usr/local/docker/rabbit:/var/lib/rabbitmq --hostname rabbitMq -e RABBITMQ_DEFAULT_VHOST=mqDocker76 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3.9.5-management
--hostname
: 主机名,RabbitMQ 的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名;-e
: 指定环境变量。- RABBITMQ_DEFAULT_VHOST:默认虚拟机名
- RABBITMQ_DEFAULT_USER:默认的用户名
- RABBITMQ_DEFAULT_PASS:默认用户名的密码
后续的控制台新增用户、权限细节略过。
Exchange 不同模式
准备环境,先添加依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application 配置:
spring:rabbitmq:host: 127.0.0.1port: 52365username: adminpassword: adminvirtual-host: mqDocker76
RabbitMQ 基本架构如下,然后开始分别测试三种模式。
Direct Exchange
直连模式基础用法,配置文件、生产、消费代码如下:
@Configuration
public class DirectRabbitConfig {/*** 队列 起名:directQueue*/@Beanpublic Queue directQueue() {// durable: 是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive: 默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete: 是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。return new Queue("directQueue", true, false, false);}/*** Direct交换机 起名:directExchange*/@BeanDirectExchange directExchange() {return new DirectExchange("directExchange", true, false);}/*** 绑定,将队列和交换机绑定, 并设置用于匹配键:directRouting*/@BeanBinding bindingDirect() {return BindingBuilder.bind(directQueue()).to(directExchange()).with("directRouting");}
}
@RestController
@RequestMapping
public class DirectProducer {@AutowiredRabbitTemplate rabbitTemplate;@GetMapping("/directMsg")public String directMsg() {Map<String, Object> map = new HashMap<>();map.put("messageId", String.valueOf(UUID.randomUUID()));map.put("data", "发送数据体" + System.currentTimeMillis());map.put("createTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));//将消息携带绑定键值:directRouting 发送到交换机 directExchangerabbitTemplate.convertAndSend("directExchange", "directRouting", map);return JSONObject.toJSONString(map);}}
@Component
@RabbitListener(queues = "directQueue")
public class DirectConsumer {@RabbitHandlerpublic void process(Map<String, Object> message) {System.out.println("DirectReceiver 消费者收到消息 : " + message.toString());}}
运行请求,控制台输出:
DirectReceiver 消费者收到消息 : {data=发送数据体1630769307037, createTime=2021-09-04 23:28:27, messageId=c09bbfc8-5018-4f9f-b8fc-678cb42348d2}
上面是一对一的生产消费模式。
实际业务中,对消息生产者没有多少限制,只需要生产发送就可以,但是 消息消费需要保证不能出现重复消费 。
而消费端也不是一个服务在进行,工作队列就是这种情况:一个生产者,多个消费者。
spring:rabbitmq:listener:simple:# 多消费者轮询模式prefetch: 1 # 每个消费者都能收到的未被消费的最大消息数量
再新建一个消费者:
@Component
@RabbitListener(queues = "directQueue")
public class DirectConsumerTwo {@RabbitHandlerpublic void process(Map<String, Object> message) {System.out.println("消费者2,DirectReceiver 消费者收到消息 : " + message.toString());}
}
其实就是 轮询模式 。
Fanout Exchange
扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。
这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。
代码基本一样:
@Configuration
public class FanoutConfig {@Beanpublic Queue queueA() {return new Queue("fanout.A", true, false, false);}@Beanpublic Queue queueB() {return new Queue("fanout.B", true, false, false);}@Beanpublic Queue queueC() {return new Queue("fanout.C", true, false, false);}@BeanFanoutExchange fanoutExchange() {return new FanoutExchange("fanoutExchange");}@BeanBinding bindingExchangeA() {return BindingBuilder.bind(queueA()).to(fanoutExchange());}@BeanBinding bindingExchangeB() {return BindingBuilder.bind(queueB()).to(fanoutExchange());}@BeanBinding bindingExchangeC() {return BindingBuilder.bind(queueC()).to(fanoutExchange());}}
@GetMapping("/fanoutMsg")
public String confirmMsg() {Map<String, Object> map = new HashMap<>();map.put("messageId", String.valueOf(UUID.randomUUID()));map.put("data", "发送数据体" + System.currentTimeMillis());map.put("createTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));rabbitTemplate.convertAndSend("fanoutExchange", null, map);return JSONObject.toJSONString(map);
}
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {@RabbitHandlerpublic void process(Map<String, Object> message) {System.out.println("fanout.A 收到消息 : " + message.toString());}
}
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {@RabbitHandlerpublic void process(Map<String, Object> message) {System.out.println("fanout.B 收到消息 : " + message.toString());}
}
Topic Exchange
主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是:
在它的路由键和绑定键之间是有规则的,大致如下:
- 路由键必须是一串字符,用小数点(.) 隔开
- 通配符 * ,代表一个占位符,或者说一个单词 ,比如路由为 user.*,那么 user.email 可以匹配,但是 user.aaa.email 就匹配不了
- 通配符 # ,代表一个或多个占位符,或者说一个或多个单词 ,比如路由为 user.#,那么 user.email 可以匹配,user.aaa.email 也可以匹配
代码相似:
@Configuration
public class TopicConfig {@Beanpublic Queue queue1() {return new Queue("topic.queue1", true, false, false);}@Beanpublic Queue queue2() {return new Queue("topic.queue2", true, false, false);}@BeanTopicExchange topicExchange() {return new TopicExchange("topicExchange", true, false);}@BeanBinding bindingQueue1() {return BindingBuilder.bind(queue1()).to(topicExchange()).with("topic.queue1");}@BeanBinding bindingQueue2() {return BindingBuilder.bind(queue2()).to(topicExchange()).with("topic.#");}}
@RestController
@RequestMapping
public class TopicProducer {@AutowiredRabbitTemplate rabbitTemplate;@GetMapping("/queue1")public String queue1() {Map<String, Object> map = new HashMap<>();map.put("messageId", String.valueOf(UUID.randomUUID()));map.put("data", "发送数据体" + System.currentTimeMillis());map.put("createTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));//将消息携带绑定键值:directRouting 发送到交换机 directExchangerabbitTemplate.convertAndSend("topicExchange", "topic.queue1", map);return JSONObject.toJSONString(map);}@GetMapping("/queue2")public String queue2() {Map<String, Object> map = new HashMap<>();map.put("messageId", String.valueOf(UUID.randomUUID()));map.put("data", "发送数据体" + System.currentTimeMillis());map.put("createTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));//将消息携带绑定键值:directRouting 发送到交换机 directExchangerabbitTemplate.convertAndSend("topicExchange", "topic.queue2", map);return JSONObject.toJSONString(map);}}
@Component
@RabbitListener(queues = "topic.queue1")
public class TopicReceiver1 {@RabbitHandlerpublic void process(Map<String, Object> message) {System.out.println("topic.queue1 收到消息 : " + message.toString());}
}@Component
@RabbitListener(queues = "topic.queue2")
public class TopicReceiver2 {@RabbitHandlerpublic void process(Map<String, Object> message) {System.out.println("topic.queue2 收到消息 : " + message.toString());}
}
向 topic.queue1
发送消息,打印:
topic.queue1 收到消息 : {data=发送数据体1631024400445, createTime=2021-09-07 22:20:00, messageId=b4dc33cc-42d6-42e7-a828-4a5a3d2e1678}
topic.queue2 收到消息 : {data=发送数据体1631024400445, createTime=2021-09-07 22:20:00, messageId=b4dc33cc-42d6-42e7-a828-4a5a3d2e1678}
向 topic.queue2
发送消息,打印:
topic.queue2 收到消息 : {data=发送数据体1631024437342, createTime=2021-09-07 22:20:37, messageId=c92d5dab-84b7-4530-86f3-0bc31e5c4036}
消息可靠性
使用了 RabbitMQ 以后,我们的业务链路明显变长了,但造成消息丢失的场景也增加了。
主要存在以下三个关键环节:
- 消息生产者 - rabbitmq服务器,发送消息失败
- rabbitmq服务器自身故障导致消息丢失
- 消息消费者 - rabbitmq服务,消息消费失败
针对这三个环节分别有对应的解决方案。
发送确认
发送确认分为两步,第一步是消息到达 exchange 交换机,第二步是从交换机路由到队列。两步同时成功则消息发送成功。
先添加配置:
spring:rabbitmq:# 确认消息已发送到交换机(Exchange)publisher-returns: true# 确认消息已发送到队列(Queue)publisher-confirm-type: correlated
rabbitMQ 有以下两个接口供实现:
- ConfirmCallback:通过实现 ConfirmCallback 接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中
- ReturnsCallback:通过实现 ReturnsCallback 接口,启动消息失败返回,如果正确到达队列不执行。比如路由不到队列时触发回调
PS: RabbitTemplate.ReturnCallback
已经过时了,改用上面那个,加个 s
;
配置文件:
@Slf4j
@Component
public class RabbitTemplateConfig {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {// 通过实现 ReturnsCallback 接口,启动消息失败返回,如果正确到达队列不执行。rabbitTemplate.setReturnsCallback(returnedMessage -> {System.out.println("消息主体 message : " + returnedMessage.getMessage());System.out.println("消息主体 message : " + returnedMessage.getReplyCode());System.out.println("描述:" + returnedMessage.getReplyText());System.out.println("消息使用的交换器 exchange : " + returnedMessage.getExchange());System.out.println("消息使用的路由键 routing : " + returnedMessage.getRoutingKey());});// 消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中rabbitTemplate.setConfirmCallback((correlationData, arrival, cause) -> {assert correlationData != null;if (arrival) {log.info("消息已发送到交换机,MessageId:{}", correlationData.getId());} else {log.info("消息发送失败,MessageId:{},失败原因:{}", correlationData.getId(), cause);}});}
失败测试只需要写错路由,或者队列就行了。测试发送:
rabbitTemplate.convertAndSend("directExchange", "queue", map, new CorrelationData());
打印日志:
消息主体 message : (Body:'{data=发送数据体1631111311090, createTime=2021-09-08 22:28:31, messageId=206749b5-d3bd-4ebe-8acb-3070d99a40a2}' MessageProperties [headers={spring_returned_message_correlation=8b124ce9-f8a1-4196-bb2a-ca1170842e05}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
消息主体 message : 312
描述:NO_ROUTE
消息使用的交换器 exchange : directExchange
消息使用的路由键 routing : queue2021-09-08 22:28:31.104 INFO 10536 --- [nectionFactory1] c.d.shiva.confirm.RabbitTemplateConfig : 消息已发送到交换机,MessageId:8b124ce9-f8a1-4196-bb2a-ca1170842e05
消息持久化
消息持久化,需要把 queue
,exchange
都持久化。
上面创建交换机和队列时,已经使用了以下参数进行持久化:
durable: 是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
消息签收
rabbitMQ有个 ack 签收机制,简单来说就是三种模式:
AcknowledgeMode.NONE:默认推送的所有消息都已经消费成功,会不断地向消费端推送消息。所以推送出去的消息不会暂存在server
端
AcknowledgeMode.AUTO: 由 spring-rabbit
依据消息处理逻辑是否抛出异常自动发送 ack
(无异常)或 nack
(异常)到 server
端。
AcknowledgeMode.MANUAL:模式需要人为地获取到 channel
之后调用方法向 server
发送 ack
(或消费失败时的 nack
)信息
总结就是:无
ack
模式:效率高,存在丢失大量消息的风险。有ack
模式:效率低,不会丢消息。
在配置文件添加:
spring:rabbitmq:listener:simple:# 多消费者轮询模式prefetch: 1 #每个消费者都能收到的未被消费的最大消息数量# manual:手动,auto:根据情况确认,none:自动确认# 设置消费端手动,返回分为:ack(无异常),nack(存在异常),reject(存在异常)acknowledge-mode: manual
在消费结果方面,也有三种结果:
消费结果 | 结果 | 批量操作 |
---|---|---|
ack |
表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除void basicAck(long deliveryTag, boolean multiple)
|
允许 |
nack |
表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。void basicNack(long deliveryTag, boolean multiple, boolean requeue)
|
允许 |
reject |
拒绝消息,与 basicNack 区别在于不能进行批量操作,其他用法很相似。void basicReject(long deliveryTag, boolean requeue)
|
不允许 |
- deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag 都会递增。手动消息确认模式下,我们可以对指定deliveryTag的消息进行ack、nack、reject等操作。
- multiple:为了减少网络流量,手动确认可以被批处理,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。
举个栗子: 假设我先发送三条消息
deliveryTag
分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag
为8,multiple
设置为 true,会将5、6、7、8的消息全部进行确认。
下面看代码:
@Slf4j
@Component
@RabbitListener(queues = "directQueue")
public class ConfigDirectConsumer {@RabbitHandlerpublic void process(Map<String, Object> message, Channel channel, Message mqMsg) throws IOException {try {System.out.println("消费者收到消息 : " + message.toString());channel.basicAck(mqMsg.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {if (mqMsg.getMessageProperties().getRedelivered()) {log.error("消息已重复处理失败,不在返回队列...");channel.basicReject(mqMsg.getMessageProperties().getDeliveryTag(), false);} else {log.error("消息即将再次返回队列处理...");channel.basicNack(mqMsg.getMessageProperties().getDeliveryTag(), false, true);}}}
}
循环消费
如果对一条异常处理的消息,进行重新入队,就会无限循环重复消费,用确认处理然后返回队尾可以稍微缓减:
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 重新发送消息到队尾
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,JSON.toJSONBytes(msg));
参考文章
docker 安装rabbitMQ - 风止雨歇 - 博客园 (cnblogs.com)
尚硅谷2021最新版RabbitMQ教程丨快速掌握MQ消息中间件_哔哩哔哩_bilibili
Springboot 整合RabbitMq ,用心看完这一篇就够了_默默不代表沉默-CSDN博客
RabbitMQ的应用场景以及基本原理介绍_杨龙飞的博客-CSDN博客_rabbitmq使用场景
SpringBoot+RabbitMQ 实现"工作队列"_Felix-CSDN博客
springboot + rabbitmq 消息确认机制_不忘初心 砥砺前行-CSDN博客
spring-rabbit消费过程解析及AcknowledgeMode选择_JinchaoLv的博客-CSDN博客_acknowledge-mode
RabbitMQ:消息发送确认 与 消息接收确认(ACK) - 简书 (jianshu.com)
Springboot中整合RabbitMq之Topic模式(单个springboot项目)_我的博客-CSDN博客_rabbitmq topic模式
RabbitMq从入门到精通-ConfirmCallback ReturnCallback 区别及使用_wxb880114的专栏-CSDN博客
rabbitTemplate.setReturnCallback()显示过时_kano_2525的博客-CSDN博客
Springboot RabbitMQ 基础使用、消息发送确认、签收相关推荐
- springboot + rabbitmq 用了消息确认机制,感觉掉坑里了
最近部门号召大伙多组织一些技术分享会,说是要活跃公司的技术氛围,但早就看穿一切的我知道,这 T M 就是为了刷KPI.不过,话说回来这的确是件好事,与其开那些没味的扯皮会,多做技术交流还是很有助于个人 ...
- rabbitmq 启动异常_RabbitMQ:消息发送确认 与 消息接收确认(ACK)
默认情况下如果一个 Message 被消费者所正确接收则会被从 Queue 中移除 如果一个 Queue 没被任何消费者订阅,那么这个 Queue 中的消息会被 Cache(缓存),当有消费者订阅时则 ...
- RabbitMQ:消息发送确认 与 消息接收确认(ACK)
默认情况下如果一个 Message 被消费者所正确接收则会被从 Queue 中移除 如果一个 Queue 没被任何消费者订阅,那么这个 Queue 中的消息会被 Cache(缓存),当有消费者订阅时则 ...
- RabbitMq 消息发送确认(可靠生产和推送确认)
RabbitMq 消息发送确认(可靠生产和推送确认) 此文档只是本人在项目中碰到的一些问题而产生的个人相关总结,实际上的消息确认机制可以做得更多(比如分布式事务等,但此处不做阐述). 一.消息发送确认 ...
- SpringBoot + RabbitMQ (保证消息100%投递成功并被消费)
点击上方蓝色"程序猿DD",选择"设为星标" 回复"资源"获取独家整理的学习资料! 来源 | jianshu.com/p/dca01aad6 ...
- SpringBoot+RabbitMQ ,保证消息100%投递成功并被消费(附源码)
点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试资料 来源:rrd.me/f2cxz 一.先扔一张图 说明: 本文涵盖了 ...
- springboot + rabbitmq发送邮件(保证消息100%投递成功并被消费)
前言: RabbitMQ相关知识请参考: https://www.jianshu.com/p/cc3d2017e7b3 Linux安装RabbitMQ请参考: https://www.jianshu. ...
- RabbitMq(十三)消息发送确认与回调机制
在rabbitmq中如何确认消息发送到交换机上以及是否正常交换机转发到消息队列上,在rabbitmq上均有其确认实现接口. confirm-callback :消息确认机制,也就是消息是否正常发送到交 ...
- SCRM:SpringBoot + RabbitMQ + 企微 实现发送消息到企业微信
使用MQ实现企业微信应用程序发送消息到指定人或部门 提前部署好RabbitMQ服务,不会的同学请看我的另一篇部署文章 引入相关依赖 代码实现 提前部署好RabbitMQ服务,不会的同学请看我的另一篇部 ...
最新文章
- 学习 慕课网 PHP工程师学习计划--我的笔记汇总
- 让Qt Creator支持Windows Phone 8开发
- Mobile RDA 同步数据库的类--转
- 【深度学习】——DNN后向传播、CNN后向传播文章汇总
- 程序人生:给程序员的18个建议
- GitHub开源贡献榜:微软超越Facebook和Google排第一
- C++判断是否为素数、求一个数的因数、质因数分解
- 【蓝鸥Unity开发基础三】课时3 Terrain地形系统【未完】
- 【图像配准】基于matlab GUI光流场模型医学图像配准【含Matlab源码 747期】
- Canvas常用API
- ArcGIS Server Flex API接入MapABC地图
- Oracle数据库基础练习(一):Oracle数据库查询操作练习81题
- Docker【部署 02】可视化工具DockerUI和Shipyard安装使用实例
- 为什么 wps输入文字 是红色下划线的 ???????????
- 金立android怎么升级到7.o,金立手机怎么刷机?只需七步轻松刷机
- 编译apache安装
- 数据结构:双栈共享的初始化、入栈、出栈
- 幅值单位是v吗_振幅的单位是什么?急求
- java实现未读消息提醒_JS(jQuery)实现聊天接收到消息语言自动提醒功能详解【提示“您有新的消息请注意查收”】...
- 什么是销售管理软件及其重要性
热门文章
- 学习笔记26-- 在solidwork里面绘制世界环境,制作成gazebo的.world世界文件基本方法
- 用户运营都做些什么?你需要知道这些
- JQ中prop与attr区别
- 54篇不合格!一批硕士论文抽检结果公布
- A fast surrogate-assisted particle swarm optimization algorithm for computationally expensive proble
- 2021年茶艺师(中级)考试题及茶艺师(中级)复审考试
- Zabbix之ZABBIX WEB界面安装
- c语言geohash算法,geohash实现(c语言)
- JavaScript禁用浏览器后退按钮
- r语言进行go富集分析_R语言实现GO分析