springboot-rabbitmq-reply 消息直接回复模式
文章目录
- 一、使用场景
- 二、Reply实战
- (1)依赖与YML配置
- (2)RabbitMq bean配置
- (3)消息生产端
- (1)生产消息
- (2)接受Reply响应
- (4)消息消费端
- (1)方法一:sendTo注解+方法返回值
- (2)方法二:读取生产端的消息使用模板发送
- (3)方法三:方法返回值
- (4)测试
一、使用场景
MQ的作用包括了解耦、异步等。
通常生产者只负责生产消息,而不关心消息谁去获取,或者消费结果如何;消费者只负责接收指定的消息进行业务处理而不关心消息从哪里来一级回复业务处理情况。但我们项目中有特殊的业务存在,我们作为消息生产者在生产消息后需要接收消费者的响应结果(说白了就是类似同步调用 请求响应的MQ使用),经过研究,MQ的Reply模式(直接回复模式)就是为此种业务模式而产生。
二、Reply实战
(1)依赖与YML配置
依赖
我这里只列出最核心的rabbitMq所需依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置
无其余特殊配置,因为reply就是rabbitmq的一种交互方式而已
spring:rabbitmq:host: 10.50.40.116port: 5673username: adminpassword: admin
(2)RabbitMq bean配置
package com.leilei.demo;import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author lei* @create 2022-09-19 21:44* @desc mq配置**/
@Configuration
public class RabbitMqConfig {@Beanpublic Queue bizQueue() {return new Queue("bizQueue");}@Beanpublic Queue replyQueue() {return new Queue("replyQueue");}@BeanFanoutExchange bizExchange() {return new FanoutExchange("bizExchange");}@Beanpublic Binding bizBind(Queue bizQueue, FanoutExchange bizExchange) {return BindingBuilder.bind(bizQueue).to(bizExchange);}
}
业务类
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Vehicle implements Serializable {private Integer id;private String name;
}
(3)消息生产端
消息生产端需要做的事情:有生产消息、接受消息消费响应
(1)生产消息
1、生产消息,看业务场景选择是否生成全局唯一自定义的消息ID
2、指定消息消费后响应的队列(Reply)
/*** 生产消息** @param* @return void* @author lei* @date 2022-09-19 21:59:18*/public void replySend() {MessageProperties messageProperties = new MessageProperties();messageProperties.setReplyTo("replyQueue");//todo 根据业务,做一个严谨的全局唯一ID,我这里暂时用UUIDString correlationId = UUID.randomUUID().toString();// 我这里指定了唯一消息ID,看业务场景,消费者消费响应后,生产者端可根据消息ID做业务处理messageProperties.setCorrelationId(correlationId);Vehicle vehicle = new Vehicle(1, "川A0001");Message message = new Message(JSON.toJSONString(vehicle).getBytes(), messageProperties);rabbitTemplate.convertAndSend("bizExchange","",message);System.out.println("生产者发送消息,自定义消息ID为:" + correlationId);}
(2)接受Reply响应
消费者消费消息后会将处理结果进行发送到一个队列,我们读取这里队列就可以拿到对应消息的响应结果进行业务处理了
/*** 接收消息响应** @param message* @return void* @author lei* @date 2022-09-19 21:59:27*/@RabbitListener(queues = "replyQueue")public void replyResponse(Message message) {String s = new String(message.getBody());String correlationId = message.getMessageProperties().getCorrelationId();System.out.println("收到客户端响应消息ID:" + correlationId);//todo 根据消息ID可判断这是哪一个消息的响应,我们就可做业务操作System.out.println("收到客户端响应消息:" + s);}
(4)消息消费端
消息消费端需要做的事有:接受消息然后进行业务处理、响应消息
(1)方法一:sendTo注解+方法返回值
一般来说,我们mq消费者监听方法不需要返回值,我们这里使用sendTo注解,则需要将要响应的消息定义为返回值,sendTo注解中指定要响应到哪个队列
重点:
1、sendTo注解指定要相应的队列(注意和生产端保持一致)
2、方法定义的返回值内容就是要响应的消息,最终会发送到sendTo注解指定要相应的队列
3、这种方法的缺点是消费端的主关性很高,因为sendTo指定的目标队列可以自己瞎写,导致生产者端无法正确收到消息响应,但我相信一般项目中也不会这么干
/*** 方式1 SendTo指定响应队列** @param message* @return String* @author lei* @date 2022-09-19 16:17:52*/@RabbitListener(queues ="bizQueue")@SendTo("replyQueue")public String handleEmailMessage(Message message) {try {String msg=new String(message.getBody(), StandardCharsets.UTF_8);log.info("---consumer接收到消息----{}",msg);return "客户端响应消息:"+msg+"处理完成!";} catch (Exception e) {log.error("处理业务消息失败",e);}return null;}
(2)方法二:读取生产端的消息使用模板发送
与普通的消费者方法一样,只需要RabbitListener注解监听业务队列;但还需要根据消息获取出ReplyTo地址,然后自己消费者方法内部手动发送消息
1、优点,更强烈的感受到消息请求 响应的交互性,流程看起来更清晰
2、缺点,代码不雅
/*** 方式2 message消息获取内部reply rabbitmq手动发送** @param message* @return String* @author lei* @date 2022-09-19 16:17:52*/@RabbitListener(queues = "bizQueue")public void handleEmailMessage2(Message message) {try {String msg = new String(message.getBody(), StandardCharsets.UTF_8);log.info("---consumer接收到消息----{}", msg);String replyTo = message.getMessageProperties().getReplyTo();System.out.println("接收到的reply:" + replyTo);rabbitTemplate.convertAndSend(replyTo, "客户端响应消息:" + msg + "处理完成!", x -> {x.getMessageProperties().setCorrelationId(message.getMessageProperties().getCorrelationId());return x;});} catch (Exception e) {log.error("处理业务消息失败",e);}}
(3)方法三:方法返回值
这种方式与1其实是一致的,但我经过测试,因为生产者消息指定了ReplyTo的地址,消费者端无需自己再次手动指定,即生产消息到哪里,是否响应以及响应消息发送到哪里全由生产端自己空,消费者只需要处理自身业务以及返回结果
/*** 方式三 方法有返回值,返回要响应的数据 (reply 由生产者发送消息时指定,消费者不做任何处理)** @param message* @return String* @author lei* @date 2022-09-19 23:17:47*/@RabbitListener(queues ="bizQueue")public String handleEmailMessage3(Message message) {try {String msg=new String(message.getBody(), StandardCharsets.UTF_8);log.info("---consumer接收到消息----{}",msg);return "客户端响应消息:"+msg+"处理完成!";}catch (Exception e) {log.error("处理业务消息失败",e);}return null;}
(4)测试
生产消息:
消费消息与响应:
收到的响应:
链路:
如此,MQ版本的请求响应模式就完成了,其实很多大佬使用MQ来实现RPC就是用的ReplyTo啦!
项目源码:springboot-lean
springboot-rabbitmq-reply 消息直接回复模式相关推荐
- 第四十六章:SpringBoot RabbitMQ完成消息延迟消费
在2018-3-1日SpringBoot官方发版了2.0.0.RELEASE最新版本,新版本完全基于Spring5.0来构建,JDK最低支持也从原来的1.6也改成了1.8,不再兼容1.8以下的版本,更 ...
- java rabbitmq topic_java rabbitmq 发送消息是topic模式, 消费者 怎么消费多个不同名字的队列?...
这里有几个不同的队列 名字没有什么规则 就是xxx.xxx exchange也是和队列的名字一样的 package com.monitor.receiver.queue; import java.ut ...
- SpringBoot整合redis实现发布订阅模式
Redis的发布订阅模式 发布订阅(Pub/Sub):目前广泛使用的通信模型,它采用事件作为基本的通信机制,提供大规模系统所要求的松散耦合的交互模式:订阅者(如客户端)以事件订阅的方式表达出它有兴趣接 ...
- SpringBoot+RabbitMQ ,保证消息100%投递成功并被消费(附源码)
点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试资料 来源:rrd.me/f2cxz 一.先扔一张图 说明: 本文涵盖了 ...
- SpringBoot + RabbitMQ (保证消息100%投递成功并被消费)
点击上方蓝色"程序猿DD",选择"设为星标" 回复"资源"获取独家整理的学习资料! 来源 | jianshu.com/p/dca01aad6 ...
- springboot + rabbitmq 用了消息确认机制,感觉掉坑里了
最近部门号召大伙多组织一些技术分享会,说是要活跃公司的技术氛围,但早就看穿一切的我知道,这 T M 就是为了刷KPI.不过,话说回来这的确是件好事,与其开那些没味的扯皮会,多做技术交流还是很有助于个人 ...
- springboot + rabbitmq发送邮件(保证消息100%投递成功并被消费)
前言: RabbitMQ相关知识请参考: https://www.jianshu.com/p/cc3d2017e7b3 Linux安装RabbitMQ请参考: https://www.jianshu. ...
- RabbitMQ,Springboot整合RabbitMQ实现 消息可靠性投递,Consumer ACK,TTL,死信队列,使用TTL+死信队列=延迟队列
搭建SpringBoot项目,用于演示 springboot版本 <!-- spring boot --><dependency><groupId>org.spri ...
- 消息队列RabbitMQ入门与5种模式详解
1.RabbitMQ概述 简介: MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法: RabbitMQ是开源的,实现了AMQP协议的,采用Erlang(面向并发编程语言 ...
最新文章
- x86架构和arm架构_RISC-V架构1000核CPU登场 x86架构腹背受敌
- erp系统服务器怎么关机,服务器怎么设置自动关机
- 内蒙古高考2021年成绩查询,内蒙古招生考试信息网:2021年内蒙古高考查分入口、查分系统...
- 2021 年 6 月程序员工资统计,反作弊算法工程师太可怕了。。
- 幼师资格证综合素质计算机知识点,幼师资格证复习资料
- springboot 接口文档 请求 enum_Spring Boot集成SpringFox 3:生成Swagger接口文档
- react-native 异常处理 Execution failed for task ':app:mergeDebugResources'.
- 【读书笔记】摘自Google开源项目风格指南-C++风格指南
- NTP漏洞可致Windows系统触发DoS
- 它们把色情版 “微信” 的底裤都给挖出来了,网友: 草率了。。。
- html标签各属性之间用空格隔开对吗,03-HTML标签(二)
- 【洛谷题解】B2033 A*B 问题
- Thinkpad T420 Fn热键在Windows10中的解决方案
- 品牌如何赋能加盟商,攻克时艰
- Java 3种批量插入更新操作的效率横向比较
- Polar码快速入门
- linux终端显示打印记录,Ubuntu使用-记录终端输出的LOG
- aabbcc本质不同的排列数
- 推一款基于原生的小程序(埋点)插件:ani-weapp
- 【noi.ac#1771】ball