目录

Windows安装RabbitMQ

环境工具下载

rabbitMQ是Erlang语言开发的所以先下载Erlang;

RabbitMQ官网地址: https://www.rabbitmq.com/
Erlang下载: https://www.erlang.org/downloads

Erlang环境安装

直接运行: otp_win64_23.0.exe 程序一直next即可,如需改变安装位置自行选择,安装完成后对系统环境变量新增ERLANG_HOME地址为:

C:\Program Files\erl-23.0

双击系统变量path,点击“新建”,将%ERLANG_HOME%\bin加入到path中。

win+R键,输入cmd,再输入erl,看到erlang版本号就说明erlang安装成功了。

RabbitMQ安装

直接运行: rabbitmq-server-3.8.8 程序一直next即可,如需改变安装位置自行选择.

RabbitMQ Web管理端安裝

进入安装后的RabbitMQ的sbin目录中(C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.8\sbin)

Cmd命令执行: rabbitmq-plugins enable rabbitmq_managementr

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.8\sbin>rabbitmq-plugins enable rabbitmq_management
Enabling plugins on node rabbit@LX-P1DMPLUV:
rabbitmq_management
The following plugins have been configured:rabbitmq_managementrabbitmq_management_agentrabbitmq_web_dispatch
Applying plugin configuration to rabbit@LX-P1DMPLUV...
Plugin configuration unchanged

常用命令:

# 启动RabbitMQ
rabbitmq-service start# 停止RabbitMQ
rabbitmq-service stop# 启用RabbitMQ Web可视化界面插件
rabbitmq-plugins enable rabbitmq_management# 停用RabbitMQ Web可视化界面插件
rabbitmq-plugins disable rabbitmq_management# 查看RabbitMQ状态
rabbitmqctl status

访问管理端页面,默认账号密码为: guest

可视化界面: http://127.0.0.1:15672/#/

RabbitMQ新增超级管理员

进入安装后的RabbitMQ的sbin目录中(C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.8\sbin)

<# 创建用户root用户 密码为123456
rabbitmqctl add_user root 123456
# 为该用户分配所有权限
rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
# 设置该用户为管理员角色
rabbitmqctl set_user_tags root administrator

RabbitMQ特点

RabbitMQ是一款使用Erlang语言开发的,实现AMQP(高级消息队列协议)的开源消息中间件。首先要知道一些RabbitMQ的特点:

  • 可靠性:支持持久化,传输确认,发布确认等保证了MQ的可靠性。
  • 灵活的分发消息策略:在消息进入MQ前由Exchange(交换机)进行路由消息。
  • 分发消息策略:简单模式、工作队列模式、发布订阅模式、路由模式、通配符模式。
  • 支持集群:多台RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
  • 多种协议:RabbitMQ支持多种消息队列协议,比如 STOMP、MQTT 等等。
  • 支持多种语言客户端:RabbitMQ几乎支持所有常用编程语言,包括 Java、.NET、Ruby 等等。
  • 可视化管理界面:RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker。
  • 插件机制:RabbitMQ提供了许多插件,可以通过插件进行扩展,也可以编写自己的插件。

RabbitMQ 3种常用交换机

  • Direct Exchange 直连型交换机:根据消息携带的路由键将消息投递给对应队列。
  • Fanout Exchange 扇型交换机:这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。
  • Topic Exchange 主题交换机:这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的

RabbitMQ 5种常用模式

  • Simple Work Queue 简单工作队列:该模式是很少用到的一个场景,一般都会通过Exchange进行消息分配到队列从而为以后扩展预留一个入口。
  • Publish/Subscribe 发布订阅模式:该模式性能最好,拿到消息直接放入队列。
  • Routing 路由模式:该模式通过routing key 进行全字匹配,匹配上将相关消息放入相关队列。
  • Topics 主题模式:该模式通过routng key进行模糊匹配,匹配上将相关信息放入相关队列。
  • Header 模式:通过message header头部信息进行比对,可以根据定义全匹配、部分匹配等规则。

RabbitMQ名词解释

  • Producer/Publisher:生产者,投递消息的一方。
  • Consumer:消费者,接收消息的一方。
  • Message消息:实际的数据,如demo中的order订单消息载体。
  • Queue队列:是RabbitMQ的内部对象,用于存储消息,最终将消息传输到消费者。
  • Exchange交换机:在RabbitMQ中,生产者发送消息到交换机,由交换机将消息路由到一个或者多个队列中
  • RoutingKey路由键:生产者将消息发给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则。
  • Binding绑定:RabbitMQ中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键(BindingKey),这样RabbitMQ就知道如何正确地将消息路由到队列。

MQ适用场景

异步处理场景

如当一个站点新增用户时需要走以下流程:验证账号信息->用户入库->发送注册成功欢迎邮箱给用户;

从该流程中分析用户注册成功后首先期望的是能够成功登录上站点,而对于能否收到注册成功的邮件对于用户而言并不重要,

而邮件发送对于如遇到网络问题可能导致发送邮件缓慢从来导致整个用户注册流程响应很慢;

对于通知邮件发送对于功能而言并不重要的时候,这个时候就可以将该业务放在MQ中异步执行从而可以从一定程度上提升整个流程的性能。

应用解耦

如当一个站点新增用户时需要走以下流程:验证账号信息->用户入库->发送注册成功欢迎邮箱给用户;

通常通过系统划分会划分为:用户模块,消息模块;

以Spring Cloud的为例按照原始做法会在用户入库成功后会通过Feign调用消息模块的发送邮件接口,但是如果消息模块全集群宕机就会导致Feign请求失败从而导致业务不可用;

使用MQ就不会造成上述的问题,因为我们在用户注册完成后想消息模块对应的邮件发送业务队列去发送消息即可,队列会监督消息模块完成,如果完不成队列会一直监督,直到完成为止

流量削峰

秒杀和抢购等场景经常使用 MQ 进行流量削峰。活动开始时流量暴增,用户的请求写入 MQ,超过 MQ 最大长度丢弃请求,业务系统接收 MQ 中的消息进行处理,达到流量削峰、保证系统可用性的目的。

影响:MQ是排队执行的所以对性能有一定的影响,并且请求过多后会导致请求被丢弃问题

消息通讯

点对点或者订阅发布模式,通过消息进行通讯。如微信的消息发送与接收、聊天室等。

SpringBoot中使用RabbitMQ

工程创建&准备

说明该工程按照包区分同时担任生产者与消费者

POM导入依赖:

<dependencies><!-- RabbitMQ依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- 导入Web服务方便测试 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- 代码简化工具 --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
</dependencies>

创建SpringBoot启动类:

@SpringBootApplication
public class SimpleRabbitMQCaseApplication {public static void main(String[] args) {SpringApplication.run(SimpleRabbitMQCaseApplication.class,args);}
}

创建applicatin.yaml:

server:port: 8021
spring:application:name: rabbitmq-simple-case#配置rabbitMq 服务器rabbitmq:host: 127.0.0.1port: 5672username: rootpassword: 123456virtual-host: / # 虚拟host 可以不设置,使用server默认hostlistener:simple:concurrency: 10 # 消费端的监听个数(即@RabbitListener开启几个线程去处理数据。)max-concurrency: 10 # 消费端的监听最大个数prefetch: 5acknowledge-mode: auto # MANUAL:手动处理 AUTO:自动处理default-requeue-rejected: true # 消费不成功的消息拒绝入队retry:enabled: true # 开启消息重试max-attempts: 5 # 重试次数max-interval: 10000   # 重试最大间隔时间initial-interval: 2000  # 重试初始间隔时间

简单队列生产消费

生产者:

/*** 简单队列消息生产* @author wuwentao*/
@RestController
@RequestMapping("/simple/queue")
@AllArgsConstructor
public class SimpleQueueProducer {private RabbitTemplate rabbitTemplate;// 发送到的队列名称public static final String AMQP_SIMPLE_QUEUE = "amqp.simple.queue";/*** 发送简单消息* @param message 消息内容*/@GetMapping("/sendMessage")public String sendMessage(@RequestParam(value = "message") String message){rabbitTemplate.convertAndSend(AMQP_SIMPLE_QUEUE, message);return "OK";}
}

消费者:

/*** 简单队列消息消费者* @author wuwentao*/
@Component
@Slf4j
public class SimpleQueueConsumer {/*** 监听一个简单的队列,队列不存在时候会创建* @param content 消息*/@RabbitListener(queuesToDeclare = @Queue(name = SimpleQueueProducer.AMQP_SIMPLE_QUEUE))public void consumerSimpleMessage(String content, Message message, Channel channel) throws IOException {// 通过Message对象解析消息String messageStr = new String(message.getBody());log.info("通过参数形式接收的消息:{}" ,content);//log.info("通过Message:{}" ,messageStr); // 可通过Meessage对象解析消息// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 手动确认消息消费成功// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // 手动确认消息消费失败}
}

测试生成消息访问接口地址:

http://localhost:8021/simple/queue/sendMessage?message=这是一条简单的消息序号1
http://localhost:8021/simple/queue/sendMessage?message=这是一条简单的消息序号2
http://localhost:8021/simple/queue/sendMessage?message=这是一条简单的消息序号3

控制台打印消费信息:

2022-08-22 09:45:26.846  INFO 14400 --- [ntContainer#0-1] c.g.b.s.consumer.SimpleQueueConsumer     : 通过参数形式接收的消息:这是一条简单的消息序号1
2022-08-22 09:45:29.064  INFO 14400 --- [tContainer#0-10] c.g.b.s.consumer.SimpleQueueConsumer     : 通过参数形式接收的消息:这是一条简单的消息序号2
2022-08-22 09:45:31.441  INFO 14400 --- [ntContainer#0-4] c.g.b.s.consumer.SimpleQueueConsumer     : 通过参数形式接收的消息:这是一条简单的消息序号3

注意事项:在YAML中开启的配置acknowledge-mode为auto也是默认的所以消息不需要手动确认默认没有异常则消费成功,如果需要定制ACK方式可以将acknowledge-mode修改为MANUAL则要在消费完成后自行ACK或NACK否则将导致消息重复消费

Fanout Exchange 扇形交换机 广播模式

fanout模式也叫广播模式,每一条消息可以被绑定在同一个交换机上的所有队列的消费者消费

生产者:

@RestController
@RequestMapping("/exchange/fanout")
@AllArgsConstructor
public class ExchangeFanoutProducer {private RabbitTemplate rabbitTemplate;// 扇形交换机定义public static final String EXCHANGE_FANOUT = "exchange.fanout";// 绑定扇形交换机的队列1public static final String EXCHANGE_FANOUT_QUEUE_1 = "exchange.fanout.queue1";// 绑定扇形交换机的队列2public static final String EXCHANGE_FANOUT_QUEUE_2 = "exchange.fanout.queue2";/*** 发送扇形消息消息能够被所有绑定该交换机的队列给消费* @param message 消息内容*/@GetMapping("/sendMessage")public String sendMessage(@RequestParam(value = "message") String message){// routingkey 在fanout模式不使用,会在direct和topic模式使用,所以这里给空rabbitTemplate.convertAndSend(EXCHANGE_FANOUT,"", message);return "OK";}
}

消费者:

这里定义两个消费者同时绑定同一个扇形交换机,这里主要声明交换机Type为ExchangeTypes.FANOUT

/*** 扇形交换机队列消费者* @author wuwentao*/
@Component
@Slf4j
public class ExchangeFanoutConsumer {/*** 创建交换机并且绑定队列(队列1)** @param content 内容* @param channel 通道* @param message 消息* @throws IOException      ioexception* @throws TimeoutException 超时异常*/@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = ExchangeFanoutProducer.EXCHANGE_FANOUT, durable = "true", type = ExchangeTypes.FANOUT),value = @Queue(value = ExchangeFanoutProducer.EXCHANGE_FANOUT_QUEUE_1, durable = "true")))@RabbitHandlerpublic void exchangeFanoutQueue1(String content, Channel channel, Message message) {log.info("EXCHANGE_FANOUT_QUEUE_1队列接收到消息:{}",content);}/*** 创建交换机并且绑定队列(队列2)*/@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = ExchangeFanoutProducer.EXCHANGE_FANOUT, durable = "true", type = ExchangeTypes.FANOUT),value = @Queue(value = ExchangeFanoutProducer.EXCHANGE_FANOUT_QUEUE_2, durable = "true")))@RabbitHandlerpublic void exchangeFanoutQueue2(String content, Channel channel, Message message) {log.info("EXCHANGE_FANOUT_QUEUE_2队列接收到消息:{}",content);}}

测试生成消息访问接口地址:

http://localhost:8021/exchange/fanout/sendMessage?message=这是一条扇形交换机中的消息序号1
http://localhost:8021/exchange/fanout/sendMessage?message=这是一条扇形交换机中的消息序号2
http://localhost:8021/exchange/fanout/sendMessage?message=这是一条扇形交换机中的消息序号3

控制台打印消费信息:

2022-08-22 10:10:43.285  INFO 12016 --- [ntContainer#1-2] c.g.b.s.consumer.ExchangeFanoutConsumer  : EXCHANGE_FANOUT_QUEUE_2队列接收到消息:这是一条扇形交换机中的消息序号1
2022-08-22 10:10:43.285  INFO 12016 --- [ntContainer#0-7] c.g.b.s.consumer.ExchangeFanoutConsumer  : EXCHANGE_FANOUT_QUEUE_1队列接收到消息:这是一条扇形交换机中的消息序号1
2022-08-22 10:10:49.151  INFO 12016 --- [tContainer#0-10] c.g.b.s.consumer.ExchangeFanoutConsumer  : EXCHANGE_FANOUT_QUEUE_1队列接收到消息:这是一条扇形交换机中的消息序号2
2022-08-22 10:10:49.151  INFO 12016 --- [ntContainer#1-4] c.g.b.s.consumer.ExchangeFanoutConsumer  : EXCHANGE_FANOUT_QUEUE_2队列接收到消息:这是一条扇形交换机中的消息序号2
2022-08-22 10:10:54.254  INFO 12016 --- [ntContainer#0-6] c.g.b.s.consumer.ExchangeFanoutConsumer  : EXCHANGE_FANOUT_QUEUE_1队列接收到消息:这是一条扇形交换机中的消息序号3
2022-08-22 10:10:54.255  INFO 12016 --- [ntContainer#1-3] c.g.b.s.consumer.ExchangeFanoutConsumer  : EXCHANGE_FANOUT_QUEUE_2队列接收到消息:这是一条扇形交换机中的消息序号3

Direct Exchange 直连型交换机,

直连交换机与扇形交换机的区别在于,队列都是绑定同一个交换机,但是在队列上会添加routingkey标识,消费者会根据对应的不同routingkey消费对应的消息。

生产者:

@RestController
@RequestMapping("/exchange/direct")
@AllArgsConstructor
public class ExchangeDirectProducer {private RabbitTemplate rabbitTemplate;// 直连交换机定义public static final String EXCHANGE_DIRECT = "exchange.direct";// 直连交换机队列定义1public static final String EXCHANGE_DIRECT_QUEUE_1 = "exchange.direct.queue1";// 直连交换机队列定义2public static final String EXCHANGE_DIRECT_QUEUE_2 = "exchange.direct.queue2";// 直连交换机路由KEY定义1public static final String EXCHANGE_DIRECT_ROUTING_KEY_1 = "exchange.direct.routing.key1";// 直连交换机路由KEY定义2public static final String EXCHANGE_DIRECT_ROUTING_KEY_2 = "exchange.direct.routing.key2";/*** 发送消息到直连交换机并且指定对应routingkey* @param message 消息内容*/@GetMapping("/sendMessage")public String sendMessage(@RequestParam(value = "message") String message,@RequestParam(value = "routingkey") int routingkey){if(routingkey == 1){rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,EXCHANGE_DIRECT_ROUTING_KEY_1, message);} else if (routingkey == 2){rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,EXCHANGE_DIRECT_ROUTING_KEY_2, message);}else{return "非法参数!";}return "OK";}
}

消费者:

这里定义多个消费者同时绑定同一个直连交换机,这里主要声明交换机Type为ExchangeTypes.DIRECT,不声明则默认为DIRECT。

/*** 直连交换机队列消费者* @author wuwentao*/
@Component
@Slf4j
public class ExchangeDirectConsumer {/*** 创建交换机并且绑定队列1(绑定routingkey1)** @param content 内容* @param channel 通道* @param message 消息* @throws IOException      ioexception* @throws TimeoutException 超时异常*/@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = ExchangeDirectProducer.EXCHANGE_DIRECT, durable = "true", type = ExchangeTypes.DIRECT),value = @Queue(value = ExchangeDirectProducer.EXCHANGE_DIRECT_QUEUE_1, durable = "true"),key = ExchangeDirectProducer.EXCHANGE_DIRECT_ROUTING_KEY_1))@RabbitHandlerpublic void exchangeDirectRoutingKey1(String content, Channel channel, Message message) {log.info("队列1 KEY1接收到消息:{}",content);}/*** 创建交换机并且绑定队列2(绑定routingkey2)*/@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = ExchangeDirectProducer.EXCHANGE_DIRECT, durable = "true", type = ExchangeTypes.DIRECT),value = @Queue(value = ExchangeDirectProducer.EXCHANGE_DIRECT_QUEUE_2, durable = "true"),key = ExchangeDirectProducer.EXCHANGE_DIRECT_ROUTING_KEY_2))@RabbitHandlerpublic void exchangeDirectRoutingKey2(String content, Channel channel, Message message) {log.info("队列2 KEY2接收到消息:{}",content);}}

测试生成消息访问接口地址:

http://localhost:8021/exchange/direct/sendMessage?routingkey=1&message=这是一条发给路由key为1的消息
http://localhost:8021/exchange/direct/sendMessage?routingkey=2&message=这是一条发给路由key为2的消息

控制台打印消费信息:

2022-08-22 10:37:22.173  INFO 4380 --- [ntContainer#0-1] c.g.b.s.consumer.ExchangeDirectConsumer  : 队列1 KEY1接收到消息:这是一条发给路由key为1的消息
2022-08-22 10:37:26.882  INFO 4380 --- [ntContainer#1-3] c.g.b.s.consumer.ExchangeDirectConsumer  : 队列2 KEY2接收到消息:这是一条发给路由key为2的消息

Topic Exchange 主题交换机

这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的;规则如下:

Topic交换机接收的消息RoutingKey必须是多个单词,以 . 分割

Topic交换机与队列绑定时的routingKey可以指定通配符

#:代表0个或多个词
*:代表1个词

生产者:

@RestController
@RequestMapping("/exchange/topic")
@AllArgsConstructor
public class ExchangeTopicProducer {private RabbitTemplate rabbitTemplate;// 主題交换机定义public static final String EXCHANGE_TOPIC = "exchange.topic";// 主題交换机队列定义1public static final String EXCHANGE_TOPIC_QUEUE_1 = "exchange.topic.queue1";// 主題交换机队列定义1public static final String EXCHANGE_TOPIC_QUEUE_2 = "exchange.topic.queue2";// 主題交换机队列路由Key定义1public static final String EXCHANGE_TOPIC_ROUTING1_KEY_1 = "#.routingkey.#";// 主題交换机队列路由Key定义2public static final String EXCHANGE_TOPIC_ROUTING2_KEY_2 = "routingkey.*";// 案例KEY1 可以被EXCHANGE_TOPIC_ROUTING1_KEY_1匹配不能被EXCHANGE_TOPIC_ROUTING2_KEY_2匹配public static final String EXCHANGE_TOPIC_CASE_KEY_1 = "topic.routingkey.case1";// 案例KEY2 同时可以被EXCHANGE_TOPIC_ROUTING1_KEY_1和EXCHANGE_TOPIC_ROUTING2_KEY_2匹配public static final String EXCHANGE_TOPIC_CASE_KEY_2 = "routingkey.case2";/*** 发送消息到主题交换机并且指定对应可通配routingkey* @param message 消息内容*/@GetMapping("/sendMessage")public String sendMessage(@RequestParam(value = "message") String message,@RequestParam(value = "routingkey") int routingkey){if(routingkey == 1){// 同时匹配 topic.routingkey.case1 和 routingkey.case2rabbitTemplate.convertAndSend(EXCHANGE_TOPIC,EXCHANGE_TOPIC_CASE_KEY_1, message);} else if (routingkey == 2){// 只能匹配 routingkey.case2rabbitTemplate.convertAndSend(EXCHANGE_TOPIC,EXCHANGE_TOPIC_CASE_KEY_2, message);}else{return "非法参数!";}return "OK";}
}

消费者:

这里定义多个消费者同时绑定同一个直主题交换机,这里主要声明交换机Type为ExchangeTypes.TOPIC,当routingkey发送的消息能够被消费者给匹配仅能够接收到消息。

@Component
@Slf4j
public class ExchangeTopicConsumer {/*** 创建交换机并且绑定队列1(绑定routingkey1)** @param content 内容* @param channel 通道* @param message 消息* @throws IOException      ioexception* @throws TimeoutException 超时异常*/@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = ExchangeTopicProducer.EXCHANGE_TOPIC, durable = "true", type = ExchangeTypes.TOPIC),value = @Queue(value = ExchangeTopicProducer.EXCHANGE_TOPIC_QUEUE_1, durable = "true"),key = ExchangeTopicProducer.EXCHANGE_TOPIC_ROUTING1_KEY_1))@RabbitHandlerpublic void exchangeTopicRoutingKey1(String content, Channel channel, Message message) {log.info("#号统配符号队列1接收到消息:{}",content);}/*** 创建交换机并且绑定队列2(绑定routingkey2)*/@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = ExchangeTopicProducer.EXCHANGE_TOPIC, durable = "true", type = ExchangeTypes.TOPIC),value = @Queue(value = ExchangeTopicProducer.EXCHANGE_TOPIC_QUEUE_2, durable = "true"),key = ExchangeTopicProducer.EXCHANGE_TOPIC_ROUTING2_KEY_2))@RabbitHandlerpublic void exchangeTopicRoutingKey2(String content, Channel channel, Message message) {log.info("*号统配符号队列2接收到消息:{}",content);}}

测试生成消息访问接口地址:

http://localhost:8021/exchange/topic/sendMessage?routingkey=1&message=前后多重匹配
http://localhost:8021/exchange/topic/sendMessage?routingkey=2&message=后一个词匹配

控制台打印消费信息:

2022-08-22 15:10:50.444  INFO 1376 --- [ntContainer#4-8] c.g.b.s.consumer.ExchangeTopicConsumer   : #号统配符号队列1接收到消息:前后多重匹配
2022-08-22 15:10:55.118  INFO 1376 --- [ntContainer#5-8] c.g.b.s.consumer.ExchangeTopicConsumer   : *号统配符号队列2接收到消息:后一个词匹配
2022-08-22 15:10:55.119  INFO 1376 --- [ntContainer#4-9] c.g.b.s.consumer.ExchangeTopicConsumer   : #号统配符号队列1接收到消息:后一个词匹配

手动ACK与消息确认机制

新增SpringBoot配置文件YAML,这里主要将自动ACK修改为手工ACK并且开启消息确认模式与消息回退:

spring:rabbitmq:listener:acknowledge-mode: manual # MANUAL:手动处理 AUTO:自动处理# NONE值是禁用发布确认模式,是默认值# CORRELATED值是发布消息成功到交换器后会触发回调方法,如1示例# SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker;publisher-confirm-type: simple #消息确认机制publisher-returns: true # 消息回退确认机制

定义消息回调确认实现类:

/*** 消费者确认收到消息后,手动ack回执回调处理* @author wuwentao*/
@Slf4j
@Component
public class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info("===================================================");log.info("消息确认机制回调函数参数信息如下:");log.info("ACK状态:{}",ack);log.info("投递失败原因:{}",cause);log.info("===================================================");}
}

消费者:

/*** RabbitMQ Message 回调地址消费者测试* @author wuwentao*/
@Component
@Slf4j
public class MessagesCallbackConsumer {@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = MessagesCallbackProducer.MESSAGE_CALLBACK_EXCHANGE, durable = "true", type = ExchangeTypes.DIRECT),value = @Queue(value = MessagesCallbackProducer.MESSAGE_CALLBACK_QUEUE, durable = "true"),key = MessagesCallbackProducer.MESSAGE_CALLBACK_ROUTINGKEY))@RabbitHandlerpublic void consumer(String content, Channel channel, Message message) throws IOException {if("成功".equals(content)){log.info("消息处理成功:{}",content);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 手动确认消息消费成功}else{if(message.getMessageProperties().getRedelivered()){log.info("消息已被处理过了请勿重复处理消息:{}",content);channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝该消息,消息会被丢弃,不会重回队列}else{log.info("消息处理失败等待重新处理:{}",content);channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}}
}

生产者:

/*** 消息回调机制测试* @author wuwentao*/
@RestController
@RequestMapping("/message/callback")
@AllArgsConstructor
public class MessagesCallbackProducer {private RabbitTemplate rabbitTemplate;private MessageConfirmCallback messageConfirmCallback;// 发送到的队列名称public static final String MESSAGE_CALLBACK_QUEUE = "amqp.message.callback.queue";public static final String MESSAGE_CALLBACK_EXCHANGE = "amqp.message.callback.exchange";public static final String MESSAGE_CALLBACK_ROUTINGKEY = "amqp.message.callback.routingkey";/*** 测试消息确认机制* @param message 消息内容*/@GetMapping("/sendMessage")public String sendMessage(@RequestParam(value = "message") String message){// 设置失败和确认回调函数rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback(messageConfirmCallback);//构建回调id为uuidString callBackId = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(callBackId);if("失败的消息".equals(message)){// 写一个不存的交换机器 和不存在的路由KEYrabbitTemplate.convertAndSend("sdfdsafadsf","123dsfdasf",message,msg -> {msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return msg;},correlationData);}else{rabbitTemplate.convertAndSend(MESSAGE_CALLBACK_EXCHANGE,MESSAGE_CALLBACK_ROUTINGKEY,message,msg -> {msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return msg;},correlationData);}return "OK";}
}

测试生成消息访问接口地址:

# 发送找不到交换机的消息
http://localhost:8021/message/callback/sendMessage?message=失败的消息
# 发送手动ACK成功的消息
http://localhost:8021/message/callback/sendMessage?message=成功
# 发送手动ACK失败的消息
http://localhost:8021/message/callback/sendMessage?message=失败

控制台打印消费信息:

2022-08-24 09:11:50.122  INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback  : ===================================================
2022-08-24 09:11:50.122  INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback  : 消息确认机制回调函数参数信息如下:
2022-08-24 09:11:50.123  INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback  : ACK状态:false
2022-08-24 09:11:50.127  INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback  : 投递失败原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'sdfdsafadsf' in vhost '/', class-id=60, method-id=40)
2022-08-24 09:11:50.127  INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback  : ===================================================
2022-08-24 09:12:02.704  INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback  : ===================================================
2022-08-24 09:12:02.705  INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback  : 消息确认机制回调函数参数信息如下:
2022-08-24 09:12:02.705  INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback  : ACK状态:true
2022-08-24 09:12:02.705  INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback  : 投递失败原因:null
2022-08-24 09:12:02.705  INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback  : ===================================================
2022-08-24 09:12:02.735  INFO 11440 --- [ntContainer#6-1] c.g.b.s.c.MessagesCallbackConsumer       : 消息处理成功:成功
2022-08-24 09:12:16.680  INFO 11440 --- [ntContainer#6-4] c.g.b.s.c.MessagesCallbackConsumer       : 消息处理失败等待重新处理:失败
2022-08-24 09:12:16.688  INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback  : ===================================================
2022-08-24 09:12:16.689  INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback  : 消息确认机制回调函数参数信息如下:
2022-08-24 09:12:16.689  INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback  : ACK状态:true
2022-08-24 09:12:16.689  INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback  : 投递失败原因:null
2022-08-24 09:12:16.689  INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback  : ===================================================
2022-08-24 09:12:16.693  INFO 11440 --- [ntContainer#6-7] c.g.b.s.c.MessagesCallbackConsumer       : 消息已被处理过了请勿重复处理消息:失败

SpringBoot RabbitMQ 注解版 基本概念与基本案例相关推荐

  1. SpringBoot + MyBatis(注解版),常用的SQL方法

    一.新建项目及配置 1.1 新建一个SpringBoot项目,并在pom.xml下加入以下代码 <dependency> <groupId>org.mybatis.spring ...

  2. SpringBoot集成Mybatis(0配置注解版)

    Mybatis初期使用比较麻烦,需要各种配置文件.实体类.dao层映射关联.还有一大推其它配置.当然Mybatis也发现了这种弊端,初期开发了generator可以根据表结构自动生成实体类.配置文件和 ...

  3. SpringBoot数据访问Mybatis注解版,配置版,注解与配置一体版

    SpringBoot数据访问Mybatis注解版,配置版,注解与配置一体版 注解版: 1.改druid 连接池,不改可以跳过这步 添加依赖 <dependency><groupId& ...

  4. springboot update数据_SpringBoot整合Mybatis+Druid+数据库(注解版)

    运行展示 正题 Spring boot :2.1.5RELEASE :数据库(Mysql.Oracle):Mybatis:阿里云的连接池 : Druid : 步骤 1.POM依赖 <!-- My ...

  5. Springboot整合Poi导出excel(注解版)

    简介 博客专栏: Springboot整合Poi导出excel(简单版) Springboot整合Poi导出excel(注解版) 上文提到通过poi简单导出Excel后,很多读者反应需要解决导出自适应 ...

  6. 我也没想到 springboot + rabbitmq 做智能家居,会这么简单

    前一段有幸参与到一个智能家居项目的开发,由于之前都没有过这方面的开发经验,所以对智能硬件的开发模式和技术栈都颇为好奇. 智能可燃气体报警器 产品是一款可燃气体报警器,如果家中燃气泄露浓度到达一定阈值, ...

  7. SpringBoot +RabbitMQ 做智能家居,居然如此简单!

    前一段有幸参与到一个智能家居项目的开发,由于之前都没有过这方面的开发经验,所以对智能硬件的开发模式和技术栈都颇为好奇. 智能可燃气体报警器 产品是一款可燃气体报警器,如果家中燃气泄露浓度到达一定阈值, ...

  8. 第四十六章:SpringBoot RabbitMQ完成消息延迟消费

    在2018-3-1日SpringBoot官方发版了2.0.0.RELEASE最新版本,新版本完全基于Spring5.0来构建,JDK最低支持也从原来的1.6也改成了1.8,不再兼容1.8以下的版本,更 ...

  9. springboot + rabbitmq 用了消息确认机制,感觉掉坑里了

    最近部门号召大伙多组织一些技术分享会,说是要活跃公司的技术氛围,但早就看穿一切的我知道,这 T M 就是为了刷KPI.不过,话说回来这的确是件好事,与其开那些没味的扯皮会,多做技术交流还是很有助于个人 ...

最新文章

  1. 制作ubuntu16.04的docker镜像
  2. Exact跻身全球发展最快的云企业行列
  3. oom 如何避免 高并发_【面试题】如何设计一个高并发系统?
  4. axios学习笔记(一):学习HTTP相关的技术知识点
  5. 心情有些复杂,不知道还能做多久,未来也不知道该如何选择
  6. Ubuntu部署python3.7的开发和运行环境
  7. 11gR2 Grid Infrastructure Installation prerequisites On Windows
  8. java图片色阶调整、亮度调整
  9. windows守护进程工具_Linux进程管理
  10. matlab画图一片空白的问题
  11. gsp计算机管理制度,GSP飞检项目——质量体系文件
  12. BSH验厂介绍BSH博世社会责任审核内容
  13. Leetcode典型题解答和分析、归纳和汇总——T51(N皇后)
  14. 什么是MT4软件?炒汇MT4软件有哪些功能和特点?
  15. potplayer录制视频包含字幕
  16. 我裁完兄弟后,辞职了,转行做了一名小职员
  17. 考计算机基础a的ap考试,用AP考试,敲开计算机名校大门!
  18. 五面拿下阿里飞猪offer,mongodbmysqlredis
  19. 锐捷网络认证客户端RG-SU苹果MAC OSX官方版下载,附使用说明
  20. 手游平台源码搭建有什么好处?

热门文章

  1. 机器视觉算法与应用:2.2 镜头
  2. 09年美国最热门的100个B2C网站,他们是怎么成功的?
  3. 手机通话记录重复显示怎么处理_当手机出现陌生号码,且有通话记录后,我恐慌了...
  4. 【软件推荐系列第 1 篇】Quicker 一个相见恨晚的Windows效率神器
  5. 世上再无武侠梦,无剑,也无女朋友
  6. Springboot 统一异常处理 Assert @ControllerAdvice
  7. JSON Web Token (JWT)笔记(token实现单点登录功能)
  8. 碾转相除法求 两个值 的最大公约数
  9. facade外观模式/门面模式(通过积分商城的例子理解外观模式)-设计模式
  10. 简单易懂:iPhone USB PD快充从入门到精通(转)