目录

  • 1、环境搭建
  • 2、队列模式
  • 3、发布订阅模式
  • 4、路由模式
  • 5、主题模式
  • 6、消息手动应答机制
  • 7、回调函数-确认机制(发布确认模式)

1、环境搭建

引入pom:

<!-- rabbitMQ -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置yaml:

server:port: 9090servlet:context-path: /rabbit
spring:application:name: rabbit# rabbitmq配置rabbitmq:host: 127.0.0.1port: 5672username: adminpassword: adminvirtual-host: /# 消息开启手动确认listener:direct:acknowledge-mode: manual

常量类配置:
配置使用过程中可能用到的常量

public class RabbitmqContext {/*** 工作队列模式*/public static String QUEUE_WORK = "queue_work";/*** 订阅发布模式*/public static String QUEUE_FANOUT_ONE = "queue_fanout_one";public static String QUEUE_FANOUT_TWO = "queue_fanout_two";public static String EXCHANGE_FANOUT = "exchange_fanout";/*** 路由模式*/public static String QUEUE_DIRECT_ONE = "queue_direct_one";public static String QUEUE_DIRECT_TWO = "queue_direct_two";public static String EXCHANGE_DIRECT = "exchange_direct";public static String ROUTING_DIRECT_ONE = "routing_direct_one";public static String ROUTING_DIRECT_TWO = "routing_direct_two";public static String ROUTING_DIRECT_THREE = "routing_direct_three";/*** 主题模式*/public static String QUEUE_TOPIC_ONE = "queue_topic_one";public static String QUEUE_TOPIC_TWO = "queue_topic_two";public static String EXCHANGE_TOPIC = "exchange_topic";public static String ROUTING_TOPIC_ONE = "topic.one";public static String ROUTING_TOPIC_TWO = "topic.one.two";/*** 手动确认机制演示*/public static String QUEUE_ACK = "queue_ack";/*** 延时队列模式*/public static String QUEUE_DELAY = "delay_queue";public static String EXCHANGE_DELAY = "delay_exchange";public static String ROUTING_DELAY = "delay";
}

消息监听与配置:
整合SpringBoot后,通过RabbitListener监听器与Config配置实现消息的监听(消费);

消息发送:

// 引入RabbitTemplate
@Resource
private RabbitTemplate rabbitTemplate;// 发送消息
rabbitTemplate.convertAndSend(String routingKey, Object object);
rabbitTemplate.convertSendAndReceive(Object object);

发送消息时,通过**rabbitTemplate.convertAndSend()或者rabbitTemplate.convertSendAndReceive()**方法进行发送,区别在于:

  • convertAndSend: 消息没有顺序,不管是否消费者是否确认,会一直发送消息;
  • convertSendAndReceive: 按照一定的顺序,只有确定消费者接收到消息,才会发送下一条信息,每条消息之间会有间隔时间;

2、队列模式

Controller:

@RestController
public class RabbitmqController {  @Resourceprivate RabbitTemplate rabbitTemplate;@PostMapping("/sendWork")public Object sendWordQueue() {String msg = "工作模式消息,ID:";// 绑定队列发送消息for (int i = 1; i <= 10; i++) {String sendMsg = msg + i;rabbitTemplate.convertAndSend(RabbitmqContext.QUEUE_WORK, sendMsg);}return "发送成功...";}
}

Config:

@Configuration
public class WorkConfig {/*** 配置队列名称 - 工作模式** @return*/@Beanpublic Queue queueWork() {return new Queue(RabbitmqContext.QUEUE_WORK);}
}

Listener:

@Component
@Slf4j
public class WorkReceiveListener {/*** @param msg     接收的文本消息* @param channel 通道信息* @param message 附加的参数信息*/@RabbitListener(queues = "queue_work")public void receiveQueueWork1(String msg, Channel channel, Message message) {log.info("消费者01-接收到消息:" + msg);}@RabbitListener(queues = "queue_work")public void receiveQueueWork2(String msg, Channel channel, Message message) {log.info("消费者02-接收到消息:" + msg);}
}

结果:

3、发布订阅模式

Controller:

@RestController
public class RabbitmqController {  @Resourceprivate RabbitTemplate rabbitTemplate;@PostMapping("/sendFanout")public String sendFanout() {String msg = "发布|订阅模式消息,ID:";for (int i = 1; i <= 10; i++) {String sendMsg = msg + i;// 绑定交换机发送消息,路由key为 ""rabbitTemplate.convertAndSend(RabbitmqContext.EXCHANGE_FANOUT, "", sendMsg);}return "发送成功...";}
}

Config:

@Configuration
public class FanoutConfig {/*** 配置队列名称 - 发布订阅模式** @return*/@Beanpublic Queue queueFanoutOne() {return new Queue(RabbitmqContext.QUEUE_FANOUT_ONE);}@Beanpublic Queue queueFanoutTwo() {return new Queue(RabbitmqContext.QUEUE_FANOUT_TWO);}/*** 定义FanoutExchange类型的交换机** @return*/@Beanpublic FanoutExchange exchangeFanout() {return new FanoutExchange(RabbitmqContext.EXCHANGE_FANOUT);}/*** 将交换机和队列进行绑定** @return*/@Beanpublic Binding bindingExchangeOne() {return BindingBuilder.bind(queueFanoutOne()).to(exchangeFanout());}@Beanpublic Binding bindingExchangeTwo() {return BindingBuilder.bind(queueFanoutTwo()).to(exchangeFanout());}
}

Listener:

@Component
@Slf4j
public class FanoutReceiveListener {/*** @param msg     接收的文本消息* @param channel 通道信息* @param message 附加的参数信息*/@RabbitListener(queues = "queue_fanout_one")public void consumerOne1(String msg, Channel channel, Message message) {System.out.println("queue_fanout_one队列 消费者1:收到消息:" + msg);}@RabbitListener(queues = "queue_fanout_one")public void consumerOne2(String msg, Channel channel, Message message) {System.out.println("queue_fanout_one队列 消费者2:收到消息:" + msg);}/*** -------------一个队列绑定两个消费者 --------------------------------*/@RabbitListener(queues = "queue_fanout_two")public void consumerTwo1(String msg, Channel channel, Message message) {System.out.println("queue_fanout_two队列 消费者1:收到消息:" + msg);}@RabbitListener(queues = "queue_fanout_two")public void consumerTwo2(String msg, Channel channel, Message message) {System.out.println("queue_fanout_two队列 消费者2:收到消息:" + msg);}
}

结果:

4、路由模式

路由模式与发布订阅模式相同,就是定义了路由,在将队列与交换机绑定时 以及 发送消息时设置路由名称
Controller:

@RestController
public class RabbitmqController {  @Resourceprivate RabbitTemplate rabbitTemplate;@PostMapping("/sendDirect")public String sendDirect() {String msg = "路由模式消息,ID:";for (int i = 1; i <= 12; i++) {String sendMsg = null;String routingKey = "";if (i % 2 == 0) {sendMsg = msg + i + ",路由为:" + RabbitmqContext.ROUTING_DIRECT_TWO;routingKey = RabbitmqContext.ROUTING_DIRECT_TWO;} else if (i % 3 == 0) {sendMsg = msg + i + ",路由为:" + RabbitmqContext.ROUTING_DIRECT_THREE;routingKey = RabbitmqContext.ROUTING_DIRECT_THREE;} else {sendMsg = msg + i + ",路由为:" + RabbitmqContext.ROUTING_DIRECT_ONE;routingKey = RabbitmqContext.ROUTING_DIRECT_ONE;}// 绑定交换机,并且设置 路由Key 发送消息rabbitTemplate.convertAndSend(RabbitmqContext.EXCHANGE_DIRECT, routingKey, sendMsg);}return "发送成功...";}
}

Config:

@Configuration
public class DirectConfig {/*** 队列一** @return*/@Beanpublic Queue directQueueOne() {return new Queue(RabbitmqContext.QUEUE_DIRECT_ONE);}/*** 队列二** @return*/@Beanpublic Queue directQueueTwo() {return new Queue(RabbitmqContext.QUEUE_DIRECT_TWO);}/*** 定义交换机 direct类型** @return*/@Beanpublic DirectExchange myDirectExchange() {return new DirectExchange(RabbitmqContext.EXCHANGE_DIRECT);}/*** 队列 绑定到交换机 再指定一个路由键* directQueueOne() 会找到上方定义的队列bean** @return*/@Beanpublic Binding directExchangeOne() {return BindingBuilder.bind(directQueueOne()).to(myDirectExchange()).with(RabbitmqContext.ROUTING_DIRECT_ONE);}@Beanpublic Binding directExchangeTwo() {return BindingBuilder.bind(directQueueOne()).to(myDirectExchange()).with(RabbitmqContext.ROUTING_DIRECT_TWO);}/*** 队列 绑定到交换机 再指定一个路由键** @return*/@Beanpublic Binding directExchangeThree() {return BindingBuilder.bind(directQueueTwo()).to(myDirectExchange()).with(RabbitmqContext.ROUTING_DIRECT_THREE);}
}

Listener:

@Component
@Slf4j
public class DirectReceiveListener {/*** @param msg     接收的文本消息* @param channel 通道信息* @param message 附加的参数信息*/@RabbitListener(queues = "queue_direct_one")public void consumerOne1(String msg, Channel channel, Message message) {System.out.println("queue_direct_one队列 消费者1:收到消息:" + msg);}@RabbitListener(queues = "queue_direct_one")public void consumerTwo(String msg, Channel channel, Message message) {System.out.println("queue_direct_one队列 消费者2:收到消息:" + msg);}@RabbitListener(queues = "queue_direct_two")public void consumerOne(String msg, Channel channel, Message message) {System.out.println("queue_direct_two队列 消费者1:收到消息:" + msg);}@RabbitListener(queues = "queue_direct_two")public void consumerDirect(String msg, Channel channel, Message message) {System.out.println("queue_direct_two队列 消费者2:收到消息:" + msg);}
}

结果:

5、主题模式

与路由模式的区别在于:可以通过不同规则匹配多个路由;

Controller:

@RestController
public class RabbitmqController {  @Resourceprivate RabbitTemplate rabbitTemplate;@PostMapping("/sendTopic")public String sendTopic() {String msg = "消息ID:";for (int i = 1; i <= 10; i++) {String sendMsg = null;String routingKey = "";if (i % 2 == 0) {sendMsg = msg + i + ",路由为:" + RabbitmqContext.ROUTING_TOPIC_TWO;routingKey = RabbitmqContext.ROUTING_TOPIC_TWO;} else {sendMsg = msg + i + ",路由为:" + RabbitmqContext.ROUTING_TOPIC_ONE;routingKey = RabbitmqContext.ROUTING_TOPIC_ONE;}// 绑定交换机,并且设置 路由Key 发送消息rabbitTemplate.convertAndSend(RabbitmqContext.EXCHANGE_TOPIC, routingKey, sendMsg);}return "发送成功...";}
}

Config:

@Configuration
public class TopicConfig {/*** 队列定义** @return*/@Beanpublic Queue topicQueueOne() {return new Queue(RabbitmqContext.QUEUE_TOPIC_ONE);}/*** 队列定义** @return*/@Beanpublic Queue topicQueueTwo() {return new Queue(RabbitmqContext.QUEUE_TOPIC_TWO);}/*** 定义 TopicExchange 类型交换机** @return*/@Beanpublic TopicExchange exchangeTopic() {return new TopicExchange(RabbitmqContext.EXCHANGE_TOPIC);}/*** 队列一绑定到交换机 且设置路由键为 topic.#** @return*/@Beanpublic Binding bindingTopic1() {return BindingBuilder.bind(topicQueueOne()).to(exchangeTopic()).with("topic.#");}/*** 队列二绑定到交换机 且设置路由键为 topic.*** @return*/@Beanpublic Binding bindingTopic2() {return BindingBuilder.bind(topicQueueTwo()).to(exchangeTopic()).with("topic.*");}
}

Listener:

@Component
@Slf4j
public class TopicReceiveListener {/*** @param msg     接收的文本消息* @param channel 通道信息* @param message 附加的参数信息*/@RabbitListener(queues = "queue_topic_one")public void listenOne1(String msg, Channel channel, Message message) {System.out.println("queue_topic_one队列 消费者1,路由匹配:topic.#,收到消息:" + msg);}@RabbitListener(queues = "queue_topic_one")public void listenOne2(String msg, Channel channel, Message message) {System.out.println("queue_topic_one队列 消费者2,路由匹配:topic.#,收到消息:" + msg);}@RabbitListener(queues = "queue_topic_two")public void listenTwo1(String msg, Channel channel, Message message) {System.out.println("queue_topic_two队列 消费者1,路由匹配:topic.*,收到消息:" + msg);}@RabbitListener(queues = "queue_topic_two")public void listenTwo2(String msg, Channel channel, Message message) {System.out.println("queue_topic_two队列 消费者2,路由匹配:topic.*,收到消息:" + msg);}
}

结果:

6、消息手动应答机制

说明:
通过配置自定义的SimpleRabbitListenerContainerFactory,并且在监听器中通过@RabbitListener注解通过containerFactory属性设置的SimpleRabbitListenerContainerFactory,比如:@RabbitListener(containerFactory = “listenerContainerFactory”)

yml配置:
在原有的yaml基础上追加,以下配置:

spring:rabbitmq:# 确认消息已发送到队列 returnpublisher-returns: true# 开启消息确认机制 confirm 异步publisher-confirm-type: correlatedlistener:direct:# 消息开启手动确认acknowledge-mode: manual# 拒绝消息是否重回队列default-requeue-rejected: true

配置类:

@Configuration
@Slf4j
public class RabbitConfig {@Bean("listenerContainerFactory")public SimpleRabbitListenerContainerFactory listenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);// 如果需要批量确认消息,则做以下设置// 设置批量// factory.setBatchListener(true);// 设置BatchMessageListener生效// factory.setConsumerBatchEnabled(true);// 设置批量确认数量// factory.setBatchSize(10);//设置消息为手动确认factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);return factory;}
}

Controller:

@RestController
public class RabbitmqController {  @Resourceprivate RabbitTemplate rabbitTemplate;@PostMapping("/sendAck")public String sendAck() {Object msg = "这是手动确认机制的消息";rabbitTemplate.convertAndSend(RabbitmqContext.QUEUE_ACK, msg);return "发送成功...";}
}

Config:

@Configuration
public class AckConfig {/*** 配置队列名称 - 工作模式** @return*/@Beanpublic Queue queueConfirm() {return new Queue(RabbitmqContext.QUEUE_ACK);}
}

Listener:

@Component
@Slf4j
public class AckReceiveListener {/*** @param msg     接收的文本消息* @param channel 通道信息* @param message 附加的参数信息*/@RabbitListener(queues = "queue_ack", containerFactory = "listenerContainerFactory")public void queueAck(String msg, Channel channel, Message message) {try {long deliveryTag = message.getMessageProperties().getDeliveryTag();channel.basicAck(deliveryTag, false);System.out.println("手动确认机制,tag:" + deliveryTag + ",接收到消息:" + msg);} catch (Exception e) {e.printStackTrace();}}/*** 批量手动确认机制** @param messages* @param channel*/
//    @RabbitListener(queues = "queue_ack", containerFactory = "listenerContainerFactory")
//    public void queueAck(List<Message> messages, Channel channel) {//        try {//            for (Message message : messages) {//                long deliveryTag = message.getMessageProperties().getDeliveryTag();
//                String msg = new String(message.getBody(), StandardCharsets.UTF_8);
//                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//                System.out.println("手动确认机制,tag:" + deliveryTag + ",接收到消息:" + msg);
//                channel.basicAck(deliveryTag, false);
//            }
//        } catch (Exception e) {//            e.printStackTrace();
//        }
//    }
}

7、回调函数-确认机制(发布确认模式)

通过配置消息确认机制,可以监听到:消息发送的情况,消息接收的情况;

yaml:
在原有yaml的基础上,追加以下配置:

spring:rabbitmq:# 开启消息回退 returnpublisher-returns: true# 开启消息确认机制 confirm 异步publisher-confirm-type: correlated

配置类:

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Author: LiHuaZhi* @Date: 2021/10/24 23:06* @Description:**/
@Configuration
@Slf4j
public class RabbitConfig {@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数rabbitTemplate.setMandatory(true);/** 触发条件:* 1、消息推送到server,但是在server里找不到交换机* 2、消息推送到server,找到交换机了,但是没找到队列* 3、消息推送到sever,交换机和队列啥都没找到* 4、消息推送成功*/rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {log.info("ConfirmCallback-------消息成功推送到MQ");} else {log.error("ConfirmCallback------消息推送到MQ失败,原因:{}", cause);}});/** 触发条件:* 1、消息推送到server,找到交换机了,但是没找到队列,被交换机退回;*/rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {log.error(" 消 息 {}, 被交换机 {} 退回,退回原因 :{}, 路 由 key:{}", newString(message.getBody()), exchange, replyText, routingKey);});return rabbitTemplate;}
}

使用:
当消息发送后,会自动回调RabbitConfig配置类中的rabbitTemplate.setConfirmCallback方法;

SpringBoot整合RabbitMQ(六大消息模式、消息手动应答机制)相关推荐

  1. RabbitMQ(八):SpringBoot 整合 RabbitMQ(三种消息确认机制以及消费端限流)

    说明 本文 SpringBoot 与 RabbitMQ 进行整合的时候,包含了三种消息的确认模式,如果查询详细的确认模式设置,请阅读:RabbitMQ的三种消息确认模式 同时消费端也采取了限流的措施, ...

  2. SpringBoot整合RabbitMQ 实现五种消息模型

    目录 SpringBoot中使用RabbitMQ 搭建初始环境 引入依赖 配置配置文件 测试类 注入 rabbitTemplate 消息队列RabbitMQ之五种消息模型 第一种直连模型使用 开发生产 ...

  3. Springboot整合RabbitMQ,包含direct,topic,fanout三种模式的整合

    一 RabbitMQ的介绍 RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件.这些软件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿 ...

  4. SpringBoot整合RabbitMQ 消息可靠投递、手动ack、延迟队列、死信队列、消息幂等性保障、消息积压

    1.消息可靠投递 在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景.RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式. confirm 确认模式 ...

  5. RabbitMQ,RabbitMQ 的工作模式,Spring 整合 RabbitMQ,Springboot 整合RabbitMQ

    什么是RabbitMQ 1.1 MQ概述 MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器.多用于分布式系统之间进行通信. ⚫ MQ,消息队列,存储消息的中间件 ⚫ ...

  6. 【分布式】SpirngBoot 整合RabbitMQ、Exchagne模式、确认消费

    分布式 内容管理 SpringBoot 整合RabbitMQ 整合过程 多种消息模型 --- Exchange调度策略 Fanout 订阅 .广播模式 ---- 适用于 业务数据需要广播场景: 用户操 ...

  7. Springboot——整合Rabbitmq之Confirm和Return详解

    文章目录 前言 为什么会有Confirm Springboot 整合 Mq 实现 Confirm 监听机制 依赖引入 增加配置文件,设定连接信息 配置队列.交换机,以及对其进行绑定 编写mq消息发送服 ...

  8. Springboot 整合RabbitMq ,用心看完这一篇就够了

    该篇文章内容较多,包括有rabbitMq相关的一些简单理论介绍,provider消息推送实例,consumer消息消费实例,Direct.Topic.Fanout的使用,消息回调.手动确认等. (但是 ...

  9. SpringBoot整合 ActiveMQ、SpringBoot整合RabbitMQ、SpringBoot整合Kafka

    1.概念:SpringBoot 整合消息服务2.具体内容对于异步消息组件在实际的应用之中会有两类:· JMS:代表作就是 ActiveMQ,但是其性能不高,因为其是用 java 程序实现的:· AMQ ...

最新文章

  1. 行业 AI 落地新范式,华为云下午茶等你来聊知识计算
  2. android WebView总结
  3. libSVM 参数选择
  4. Spring boot登录错误提示
  5. 多个项目怎么配置到服务器上,多个项目怎么配置到服务器
  6. 摘自《解析极限编程-拥抱变化》
  7. python 关键字大全_一日一技:用实例列举python中所有的关键字(01)
  8. 通用mapper 如何处理多表条件查询通过list封装(一对多)
  9. 信息学奥赛一本通(1411:区间内的真素数)
  10. Linux运维之如何查看目录被哪些进程所占用,lsof命令、fuser命令
  11. [单片机框架][drivers层][ADC] fuelgauge 软件电量计(二)
  12. mysql查找数据库文件位置
  13. YUV420图像旋转
  14. 院校-国外-美国:斯坦福大学( Stanford)
  15. PREEMPT RT 实现原理
  16. 零钱兑换(完全背包)
  17. 小彩蛋:springboot banner 在线生成
  18. 计算机休眠拖动鼠标不起作用,电脑待机后按鼠标无法唤醒怎么办
  19. MinGW安装包下载及下载失败解决
  20. 罗克韦尔 DeviceNet配置软件

热门文章

  1. 第43届ACM亚洲区域赛(青岛)总结
  2. Vue 路由跳转至外界页面
  3. 高等工程热力学复习05
  4. JTopo绘制网络拓扑图
  5. fat16和fat32文件系统学习
  6. 使用PC Access实现WinCC v7.4和S7-200 PLC之间的通信
  7. 孙志忠 偏微分方程数值解作业算例的matlab程序
  8. 仿真测试 | HIL测试简单介绍
  9. 第六章 网络编程-SOCKET开发
  10. 【手机电子杂志制作】名编辑电子杂志大师教程 | 手机版模板以及主题设置