SpringBoot整合RabbitMQ(六大消息模式、消息手动应答机制)
目录
- 1、环境搭建
- 2、队列模式
- 3、发布订阅模式
- 4、路由模式
- 5、主题模式
- 6、消息手动应答机制
- 7、回调函数-确认机制(发布确认模式)
1、环境搭建
引入pom:
<!-- rabbitMQ -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置yaml:
server:port: 9090servlet:context-path: /rabbit
spring:application:name: rabbit# rabbitmq配置rabbitmq:host: 127.0.0.1port: 5672username: adminpassword: adminvirtual-host: /# 消息开启手动确认listener:direct:acknowledge-mode: manual
常量类配置:
配置使用过程中可能用到的常量
public class RabbitmqContext {/*** 工作队列模式*/public static String QUEUE_WORK = "queue_work";/*** 订阅发布模式*/public static String QUEUE_FANOUT_ONE = "queue_fanout_one";public static String QUEUE_FANOUT_TWO = "queue_fanout_two";public static String EXCHANGE_FANOUT = "exchange_fanout";/*** 路由模式*/public static String QUEUE_DIRECT_ONE = "queue_direct_one";public static String QUEUE_DIRECT_TWO = "queue_direct_two";public static String EXCHANGE_DIRECT = "exchange_direct";public static String ROUTING_DIRECT_ONE = "routing_direct_one";public static String ROUTING_DIRECT_TWO = "routing_direct_two";public static String ROUTING_DIRECT_THREE = "routing_direct_three";/*** 主题模式*/public static String QUEUE_TOPIC_ONE = "queue_topic_one";public static String QUEUE_TOPIC_TWO = "queue_topic_two";public static String EXCHANGE_TOPIC = "exchange_topic";public static String ROUTING_TOPIC_ONE = "topic.one";public static String ROUTING_TOPIC_TWO = "topic.one.two";/*** 手动确认机制演示*/public static String QUEUE_ACK = "queue_ack";/*** 延时队列模式*/public static String QUEUE_DELAY = "delay_queue";public static String EXCHANGE_DELAY = "delay_exchange";public static String ROUTING_DELAY = "delay";
}
消息监听与配置:
整合SpringBoot后,通过RabbitListener
监听器与Config
配置实现消息的监听(消费);
消息发送:
// 引入RabbitTemplate
@Resource
private RabbitTemplate rabbitTemplate;// 发送消息
rabbitTemplate.convertAndSend(String routingKey, Object object);
rabbitTemplate.convertSendAndReceive(Object object);
发送消息时,通过**rabbitTemplate.convertAndSend()或者rabbitTemplate.convertSendAndReceive()**方法进行发送,区别在于:
- convertAndSend: 消息没有顺序,不管是否消费者是否确认,会一直发送消息;
- convertSendAndReceive: 按照一定的顺序,只有确定消费者接收到消息,才会发送下一条信息,每条消息之间会有间隔时间;
2、队列模式
Controller:
@RestController
public class RabbitmqController { @Resourceprivate RabbitTemplate rabbitTemplate;@PostMapping("/sendWork")public Object sendWordQueue() {String msg = "工作模式消息,ID:";// 绑定队列发送消息for (int i = 1; i <= 10; i++) {String sendMsg = msg + i;rabbitTemplate.convertAndSend(RabbitmqContext.QUEUE_WORK, sendMsg);}return "发送成功...";}
}
Config:
@Configuration
public class WorkConfig {/*** 配置队列名称 - 工作模式** @return*/@Beanpublic Queue queueWork() {return new Queue(RabbitmqContext.QUEUE_WORK);}
}
Listener:
@Component
@Slf4j
public class WorkReceiveListener {/*** @param msg 接收的文本消息* @param channel 通道信息* @param message 附加的参数信息*/@RabbitListener(queues = "queue_work")public void receiveQueueWork1(String msg, Channel channel, Message message) {log.info("消费者01-接收到消息:" + msg);}@RabbitListener(queues = "queue_work")public void receiveQueueWork2(String msg, Channel channel, Message message) {log.info("消费者02-接收到消息:" + msg);}
}
结果:
3、发布订阅模式
Controller:
@RestController
public class RabbitmqController { @Resourceprivate RabbitTemplate rabbitTemplate;@PostMapping("/sendFanout")public String sendFanout() {String msg = "发布|订阅模式消息,ID:";for (int i = 1; i <= 10; i++) {String sendMsg = msg + i;// 绑定交换机发送消息,路由key为 ""rabbitTemplate.convertAndSend(RabbitmqContext.EXCHANGE_FANOUT, "", sendMsg);}return "发送成功...";}
}
Config:
@Configuration
public class FanoutConfig {/*** 配置队列名称 - 发布订阅模式** @return*/@Beanpublic Queue queueFanoutOne() {return new Queue(RabbitmqContext.QUEUE_FANOUT_ONE);}@Beanpublic Queue queueFanoutTwo() {return new Queue(RabbitmqContext.QUEUE_FANOUT_TWO);}/*** 定义FanoutExchange类型的交换机** @return*/@Beanpublic FanoutExchange exchangeFanout() {return new FanoutExchange(RabbitmqContext.EXCHANGE_FANOUT);}/*** 将交换机和队列进行绑定** @return*/@Beanpublic Binding bindingExchangeOne() {return BindingBuilder.bind(queueFanoutOne()).to(exchangeFanout());}@Beanpublic Binding bindingExchangeTwo() {return BindingBuilder.bind(queueFanoutTwo()).to(exchangeFanout());}
}
Listener:
@Component
@Slf4j
public class FanoutReceiveListener {/*** @param msg 接收的文本消息* @param channel 通道信息* @param message 附加的参数信息*/@RabbitListener(queues = "queue_fanout_one")public void consumerOne1(String msg, Channel channel, Message message) {System.out.println("queue_fanout_one队列 消费者1:收到消息:" + msg);}@RabbitListener(queues = "queue_fanout_one")public void consumerOne2(String msg, Channel channel, Message message) {System.out.println("queue_fanout_one队列 消费者2:收到消息:" + msg);}/*** -------------一个队列绑定两个消费者 --------------------------------*/@RabbitListener(queues = "queue_fanout_two")public void consumerTwo1(String msg, Channel channel, Message message) {System.out.println("queue_fanout_two队列 消费者1:收到消息:" + msg);}@RabbitListener(queues = "queue_fanout_two")public void consumerTwo2(String msg, Channel channel, Message message) {System.out.println("queue_fanout_two队列 消费者2:收到消息:" + msg);}
}
结果:
4、路由模式
路由模式与发布订阅模式相同,就是定义了路由,在将队列与交换机绑定时 以及 发送消息时设置路由名称
Controller:
@RestController
public class RabbitmqController { @Resourceprivate RabbitTemplate rabbitTemplate;@PostMapping("/sendDirect")public String sendDirect() {String msg = "路由模式消息,ID:";for (int i = 1; i <= 12; i++) {String sendMsg = null;String routingKey = "";if (i % 2 == 0) {sendMsg = msg + i + ",路由为:" + RabbitmqContext.ROUTING_DIRECT_TWO;routingKey = RabbitmqContext.ROUTING_DIRECT_TWO;} else if (i % 3 == 0) {sendMsg = msg + i + ",路由为:" + RabbitmqContext.ROUTING_DIRECT_THREE;routingKey = RabbitmqContext.ROUTING_DIRECT_THREE;} else {sendMsg = msg + i + ",路由为:" + RabbitmqContext.ROUTING_DIRECT_ONE;routingKey = RabbitmqContext.ROUTING_DIRECT_ONE;}// 绑定交换机,并且设置 路由Key 发送消息rabbitTemplate.convertAndSend(RabbitmqContext.EXCHANGE_DIRECT, routingKey, sendMsg);}return "发送成功...";}
}
Config:
@Configuration
public class DirectConfig {/*** 队列一** @return*/@Beanpublic Queue directQueueOne() {return new Queue(RabbitmqContext.QUEUE_DIRECT_ONE);}/*** 队列二** @return*/@Beanpublic Queue directQueueTwo() {return new Queue(RabbitmqContext.QUEUE_DIRECT_TWO);}/*** 定义交换机 direct类型** @return*/@Beanpublic DirectExchange myDirectExchange() {return new DirectExchange(RabbitmqContext.EXCHANGE_DIRECT);}/*** 队列 绑定到交换机 再指定一个路由键* directQueueOne() 会找到上方定义的队列bean** @return*/@Beanpublic Binding directExchangeOne() {return BindingBuilder.bind(directQueueOne()).to(myDirectExchange()).with(RabbitmqContext.ROUTING_DIRECT_ONE);}@Beanpublic Binding directExchangeTwo() {return BindingBuilder.bind(directQueueOne()).to(myDirectExchange()).with(RabbitmqContext.ROUTING_DIRECT_TWO);}/*** 队列 绑定到交换机 再指定一个路由键** @return*/@Beanpublic Binding directExchangeThree() {return BindingBuilder.bind(directQueueTwo()).to(myDirectExchange()).with(RabbitmqContext.ROUTING_DIRECT_THREE);}
}
Listener:
@Component
@Slf4j
public class DirectReceiveListener {/*** @param msg 接收的文本消息* @param channel 通道信息* @param message 附加的参数信息*/@RabbitListener(queues = "queue_direct_one")public void consumerOne1(String msg, Channel channel, Message message) {System.out.println("queue_direct_one队列 消费者1:收到消息:" + msg);}@RabbitListener(queues = "queue_direct_one")public void consumerTwo(String msg, Channel channel, Message message) {System.out.println("queue_direct_one队列 消费者2:收到消息:" + msg);}@RabbitListener(queues = "queue_direct_two")public void consumerOne(String msg, Channel channel, Message message) {System.out.println("queue_direct_two队列 消费者1:收到消息:" + msg);}@RabbitListener(queues = "queue_direct_two")public void consumerDirect(String msg, Channel channel, Message message) {System.out.println("queue_direct_two队列 消费者2:收到消息:" + msg);}
}
结果:
5、主题模式
与路由模式的区别在于:可以通过不同规则匹配多个路由;
Controller:
@RestController
public class RabbitmqController { @Resourceprivate RabbitTemplate rabbitTemplate;@PostMapping("/sendTopic")public String sendTopic() {String msg = "消息ID:";for (int i = 1; i <= 10; i++) {String sendMsg = null;String routingKey = "";if (i % 2 == 0) {sendMsg = msg + i + ",路由为:" + RabbitmqContext.ROUTING_TOPIC_TWO;routingKey = RabbitmqContext.ROUTING_TOPIC_TWO;} else {sendMsg = msg + i + ",路由为:" + RabbitmqContext.ROUTING_TOPIC_ONE;routingKey = RabbitmqContext.ROUTING_TOPIC_ONE;}// 绑定交换机,并且设置 路由Key 发送消息rabbitTemplate.convertAndSend(RabbitmqContext.EXCHANGE_TOPIC, routingKey, sendMsg);}return "发送成功...";}
}
Config:
@Configuration
public class TopicConfig {/*** 队列定义** @return*/@Beanpublic Queue topicQueueOne() {return new Queue(RabbitmqContext.QUEUE_TOPIC_ONE);}/*** 队列定义** @return*/@Beanpublic Queue topicQueueTwo() {return new Queue(RabbitmqContext.QUEUE_TOPIC_TWO);}/*** 定义 TopicExchange 类型交换机** @return*/@Beanpublic TopicExchange exchangeTopic() {return new TopicExchange(RabbitmqContext.EXCHANGE_TOPIC);}/*** 队列一绑定到交换机 且设置路由键为 topic.#** @return*/@Beanpublic Binding bindingTopic1() {return BindingBuilder.bind(topicQueueOne()).to(exchangeTopic()).with("topic.#");}/*** 队列二绑定到交换机 且设置路由键为 topic.*** @return*/@Beanpublic Binding bindingTopic2() {return BindingBuilder.bind(topicQueueTwo()).to(exchangeTopic()).with("topic.*");}
}
Listener:
@Component
@Slf4j
public class TopicReceiveListener {/*** @param msg 接收的文本消息* @param channel 通道信息* @param message 附加的参数信息*/@RabbitListener(queues = "queue_topic_one")public void listenOne1(String msg, Channel channel, Message message) {System.out.println("queue_topic_one队列 消费者1,路由匹配:topic.#,收到消息:" + msg);}@RabbitListener(queues = "queue_topic_one")public void listenOne2(String msg, Channel channel, Message message) {System.out.println("queue_topic_one队列 消费者2,路由匹配:topic.#,收到消息:" + msg);}@RabbitListener(queues = "queue_topic_two")public void listenTwo1(String msg, Channel channel, Message message) {System.out.println("queue_topic_two队列 消费者1,路由匹配:topic.*,收到消息:" + msg);}@RabbitListener(queues = "queue_topic_two")public void listenTwo2(String msg, Channel channel, Message message) {System.out.println("queue_topic_two队列 消费者2,路由匹配:topic.*,收到消息:" + msg);}
}
结果:
6、消息手动应答机制
说明:
通过配置自定义的SimpleRabbitListenerContainerFactory
,并且在监听器中通过@RabbitListener
注解通过containerFactory
属性设置的SimpleRabbitListenerContainerFactory
,比如:@RabbitListener(containerFactory = “listenerContainerFactory”)
yml配置:
在原有的yaml
基础上追加,以下配置:
spring:rabbitmq:# 确认消息已发送到队列 returnpublisher-returns: true# 开启消息确认机制 confirm 异步publisher-confirm-type: correlatedlistener:direct:# 消息开启手动确认acknowledge-mode: manual# 拒绝消息是否重回队列default-requeue-rejected: true
配置类:
@Configuration
@Slf4j
public class RabbitConfig {@Bean("listenerContainerFactory")public SimpleRabbitListenerContainerFactory listenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);// 如果需要批量确认消息,则做以下设置// 设置批量// factory.setBatchListener(true);// 设置BatchMessageListener生效// factory.setConsumerBatchEnabled(true);// 设置批量确认数量// factory.setBatchSize(10);//设置消息为手动确认factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);return factory;}
}
Controller:
@RestController
public class RabbitmqController { @Resourceprivate RabbitTemplate rabbitTemplate;@PostMapping("/sendAck")public String sendAck() {Object msg = "这是手动确认机制的消息";rabbitTemplate.convertAndSend(RabbitmqContext.QUEUE_ACK, msg);return "发送成功...";}
}
Config:
@Configuration
public class AckConfig {/*** 配置队列名称 - 工作模式** @return*/@Beanpublic Queue queueConfirm() {return new Queue(RabbitmqContext.QUEUE_ACK);}
}
Listener:
@Component
@Slf4j
public class AckReceiveListener {/*** @param msg 接收的文本消息* @param channel 通道信息* @param message 附加的参数信息*/@RabbitListener(queues = "queue_ack", containerFactory = "listenerContainerFactory")public void queueAck(String msg, Channel channel, Message message) {try {long deliveryTag = message.getMessageProperties().getDeliveryTag();channel.basicAck(deliveryTag, false);System.out.println("手动确认机制,tag:" + deliveryTag + ",接收到消息:" + msg);} catch (Exception e) {e.printStackTrace();}}/*** 批量手动确认机制** @param messages* @param channel*/
// @RabbitListener(queues = "queue_ack", containerFactory = "listenerContainerFactory")
// public void queueAck(List<Message> messages, Channel channel) {// try {// for (Message message : messages) {// long deliveryTag = message.getMessageProperties().getDeliveryTag();
// String msg = new String(message.getBody(), StandardCharsets.UTF_8);
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// System.out.println("手动确认机制,tag:" + deliveryTag + ",接收到消息:" + msg);
// channel.basicAck(deliveryTag, false);
// }
// } catch (Exception e) {// e.printStackTrace();
// }
// }
}
7、回调函数-确认机制(发布确认模式)
通过配置消息确认机制,可以监听到:消息发送的情况,消息接收的情况;
yaml:
在原有yaml
的基础上,追加以下配置:
spring:rabbitmq:# 开启消息回退 returnpublisher-returns: true# 开启消息确认机制 confirm 异步publisher-confirm-type: correlated
配置类:
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Author: LiHuaZhi* @Date: 2021/10/24 23:06* @Description:**/
@Configuration
@Slf4j
public class RabbitConfig {@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数rabbitTemplate.setMandatory(true);/** 触发条件:* 1、消息推送到server,但是在server里找不到交换机* 2、消息推送到server,找到交换机了,但是没找到队列* 3、消息推送到sever,交换机和队列啥都没找到* 4、消息推送成功*/rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {log.info("ConfirmCallback-------消息成功推送到MQ");} else {log.error("ConfirmCallback------消息推送到MQ失败,原因:{}", cause);}});/** 触发条件:* 1、消息推送到server,找到交换机了,但是没找到队列,被交换机退回;*/rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {log.error(" 消 息 {}, 被交换机 {} 退回,退回原因 :{}, 路 由 key:{}", newString(message.getBody()), exchange, replyText, routingKey);});return rabbitTemplate;}
}
使用:
当消息发送后,会自动回调RabbitConfig
配置类中的rabbitTemplate.setConfirmCallback
方法;
SpringBoot整合RabbitMQ(六大消息模式、消息手动应答机制)相关推荐
- RabbitMQ(八):SpringBoot 整合 RabbitMQ(三种消息确认机制以及消费端限流)
说明 本文 SpringBoot 与 RabbitMQ 进行整合的时候,包含了三种消息的确认模式,如果查询详细的确认模式设置,请阅读:RabbitMQ的三种消息确认模式 同时消费端也采取了限流的措施, ...
- SpringBoot整合RabbitMQ 实现五种消息模型
目录 SpringBoot中使用RabbitMQ 搭建初始环境 引入依赖 配置配置文件 测试类 注入 rabbitTemplate 消息队列RabbitMQ之五种消息模型 第一种直连模型使用 开发生产 ...
- Springboot整合RabbitMQ,包含direct,topic,fanout三种模式的整合
一 RabbitMQ的介绍 RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件.这些软件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿 ...
- SpringBoot整合RabbitMQ 消息可靠投递、手动ack、延迟队列、死信队列、消息幂等性保障、消息积压
1.消息可靠投递 在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景.RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式. confirm 确认模式 ...
- RabbitMQ,RabbitMQ 的工作模式,Spring 整合 RabbitMQ,Springboot 整合RabbitMQ
什么是RabbitMQ 1.1 MQ概述 MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器.多用于分布式系统之间进行通信. ⚫ MQ,消息队列,存储消息的中间件 ⚫ ...
- 【分布式】SpirngBoot 整合RabbitMQ、Exchagne模式、确认消费
分布式 内容管理 SpringBoot 整合RabbitMQ 整合过程 多种消息模型 --- Exchange调度策略 Fanout 订阅 .广播模式 ---- 适用于 业务数据需要广播场景: 用户操 ...
- Springboot——整合Rabbitmq之Confirm和Return详解
文章目录 前言 为什么会有Confirm Springboot 整合 Mq 实现 Confirm 监听机制 依赖引入 增加配置文件,设定连接信息 配置队列.交换机,以及对其进行绑定 编写mq消息发送服 ...
- Springboot 整合RabbitMq ,用心看完这一篇就够了
该篇文章内容较多,包括有rabbitMq相关的一些简单理论介绍,provider消息推送实例,consumer消息消费实例,Direct.Topic.Fanout的使用,消息回调.手动确认等. (但是 ...
- SpringBoot整合 ActiveMQ、SpringBoot整合RabbitMQ、SpringBoot整合Kafka
1.概念:SpringBoot 整合消息服务2.具体内容对于异步消息组件在实际的应用之中会有两类:· JMS:代表作就是 ActiveMQ,但是其性能不高,因为其是用 java 程序实现的:· AMQ ...
最新文章
- 行业 AI 落地新范式,华为云下午茶等你来聊知识计算
- android WebView总结
- libSVM 参数选择
- Spring boot登录错误提示
- 多个项目怎么配置到服务器上,多个项目怎么配置到服务器
- 摘自《解析极限编程-拥抱变化》
- python 关键字大全_一日一技:用实例列举python中所有的关键字(01)
- 通用mapper 如何处理多表条件查询通过list封装(一对多)
- 信息学奥赛一本通(1411:区间内的真素数)
- Linux运维之如何查看目录被哪些进程所占用,lsof命令、fuser命令
- [单片机框架][drivers层][ADC] fuelgauge 软件电量计(二)
- mysql查找数据库文件位置
- YUV420图像旋转
- 院校-国外-美国:斯坦福大学( Stanford)
- PREEMPT RT 实现原理
- 零钱兑换(完全背包)
- 小彩蛋:springboot banner 在线生成
- 计算机休眠拖动鼠标不起作用,电脑待机后按鼠标无法唤醒怎么办
- MinGW安装包下载及下载失败解决
- 罗克韦尔 DeviceNet配置软件