文章目录

  • 1. RPC
  • 2. 实现原理
  • 3. 代码实现
    • 3.1 客户端实现
    • 3.2 服务端实现
    • 3.3 测试
  • 4. 小结

1. RPC

对于微服务开发者,对于 RPCRemote Procedure Call Protocol 远程过程调用协议)并不会陌生吧, RESTful APIDubboWebService等都是RPC的实现调用

RabbitMQ中也提供了 RPC 功能,并且使用起来很简单,下面就来学习一下

2. 实现原理

再来熟悉下原理图

上图把RPC的过程描述的很清楚:

  • Client先发送一条消息,和普通的消息相比,消息多了两个关键内容:一个是 correlation_id,表示这条消息的唯一 id,一个是 reply_to,表示回复队列的名字
  • Server从消息发送队列获取消息并处理相应的业务逻辑,处理完成后,将处理结果发送到 reply_to指定的回调队列中
  • Client从回调队列中读取消息,就可知道执行结果

3. 代码实现

3.1 客户端实现

客户端配置文件:application.properties

server.port=8889
spring.rabbitmq.host=192.168.3.157
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
# 开启消息确认
spring.rabbitmq.publisher-confirm-type=correlated
# 开启发送失败退回
spring.rabbitmq.publisher-returns=true

spring.rabbitmq.publisher-confirm-type=correlated这项配置作用是:通过 correlated来确认消息。

只有开启了这个配置,将来的消息中才会带 correlation_id,只有通过 correlation_id才能将发送的消息和返回值之间关联起来

客户端配置类:

package com.scorpios.rabbitmq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RPCRabbitMQConfig {// 交换机的名称public static final String SCORPIOS_RPC_EXCHANGE_NAME = "scorpios_rpc_exchange_name";// 发送队列名称public static final String SCORPIOS_RPC_MSG_QUEUE = "scorpios_rpc_msg_queue";// 返回队列名称public static final String SCORPIOS_RPC_REPLY_QUEUE = "scorpios_rpc_reply_queue";@BeanTopicExchange topicExchange(){return new TopicExchange(RPCRabbitMQConfig.SCORPIOS_RPC_EXCHANGE_NAME,true,false);}@BeanQueue queueOne() {return new Queue(RPCRabbitMQConfig.SCORPIOS_RPC_MSG_QUEUE,true,false,false);}@BeanQueue queueTwo() {return new Queue(RPCRabbitMQConfig.SCORPIOS_RPC_REPLY_QUEUE,true,false,false);}/*** 请求队列和交换器绑定*/@BeanBinding bindingMsg(){return BindingBuilder.bind(queueOne()).to(topicExchange()).with(RPCRabbitMQConfig.SCORPIOS_RPC_MSG_QUEUE);}/*** 返回队列和交换器绑定*/@BeanBinding bindingReply(){return BindingBuilder.bind(queueTwo()).to(topicExchange()).with(RPCRabbitMQConfig.SCORPIOS_RPC_REPLY_QUEUE);}/*** 自定义 RabbitTemplate发送和接收消息,因为要设置回调队列地址*/@BeanRabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setReplyAddress(RPCRabbitMQConfig.SCORPIOS_RPC_REPLY_QUEUE);rabbitTemplate.setReplyTimeout(5000);return rabbitTemplate;}/*** 给返回队列设置监听器*/@BeanSimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames(RPCRabbitMQConfig.SCORPIOS_RPC_REPLY_QUEUE);container.setMessageListener(rabbitTemplate(connectionFactory));return container;}}

上面代码解释说明:

  • 定义一个TopicExchange交换机,一个MsgQueue队列,一个ReplyQueue,并与交换机进行绑定
  • 自定义一个RabbitTemplate用户发送消息,虽然在 SpringBoot中,默认情况下系统自动提供RabbitTemplate,但是这里需要对该RabbitTemplate重新进行定制,因为要给RabbitTemplate添加返回队列,最后还需要给返回队列设置一个监听器

下面来编写消息发送代码:

@Slf4j
@RestController
public class RabbitMQController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/send/message")public String send(String message) {// 创建消息对象Message newMessage = MessageBuilder.withBody(message.getBytes()).build();log.info("Client 发送的消息为:{}", newMessage);// 客户端给消息队列发送消息,并返回响应结果Message result = rabbitTemplate.sendAndReceive(RPCRabbitMQConfig.SCORPIOS_RPC_EXCHANGE_NAME, RPCRabbitMQConfig.SCORPIOS_RPC_MSG_QUEUE, newMessage);String response = "";if (result != null) {// 获取已发送的消息的 correlationIdString correlationId = newMessage.getMessageProperties().getCorrelationId();log.info("发送消息的correlationId为:{}", correlationId);// 获取响应头信息HashMap<String, Object> headers = (HashMap<String, Object>) result.getMessageProperties().getHeaders();// 获取 server 返回的消息 correlationIdString msgId = (String) headers.get("spring_returned_message_correlation");// 将已发送的消息的 correlationId与server返回的消息 correlationId进行对比,相同则取出响应结果if (msgId.equals(correlationId)) {response = new String(result.getBody());log.info("client 收到的响应结果为:{}", response);}}return response;}}

解释说明:

  • 消息发送调用 sendAndReceive方法,该方法自带返回值,返回值就是服务端返回的消息
  • 服务端返回的消息中,头信息中包含了 spring_returned_message_correlation字段,这就是消息发送时的 correlation_id,通过消息发送时的 correlation_id以及返回消息头中的 spring_returned_message_correlation字段值,就可以将返回的消息内容和发送的消息绑定到一起,确认出这个返回的内容就是针对这个发送的消息的

注意:如果没有在 application.properties 中配置 correlated,发送的消息中就没有 correlation_id,这样就无法将返回的消息内容和发送的消息内容关联起来

3.2 服务端实现

服务端配置文件 application.properties与客户端中的配置文件一致

服务端配置类:

@Configuration
public class RPCServerRabbitMQConfig {// 交换机的名称public static final String SCORPIOS_RPC_EXCHANGE_NAME = "scorpios_rpc_exchange_name";// 发送队列名称public static final String SCORPIOS_RPC_MSG_QUEUE = "scorpios_rpc_msg_queue";// 返回队列名称public static final String SCORPIOS_RPC_REPLY_QUEUE = "scorpios_rpc_reply_queue";@BeanTopicExchange topicExchange(){return new TopicExchange(RPCServerRabbitMQConfig.SCORPIOS_RPC_EXCHANGE_NAME,true,false);}@BeanQueue queueOne() {return new Queue(RPCServerRabbitMQConfig.SCORPIOS_RPC_MSG_QUEUE,true,false,false);}@BeanQueue queueTwo() {return new Queue(RPCServerRabbitMQConfig.SCORPIOS_RPC_REPLY_QUEUE,true,false,false);}@BeanBinding bindingMsg(){return BindingBuilder.bind(queueOne()).to(topicExchange()).with(RPCServerRabbitMQConfig.SCORPIOS_RPC_MSG_QUEUE);}@BeanBinding bindingReply(){return BindingBuilder.bind(queueTwo()).to(topicExchange()).with(RPCServerRabbitMQConfig.SCORPIOS_RPC_REPLY_QUEUE);}}

最后我们再来看下消息的消费:

@Slf4j
@Component
public class RpcServerConsumer {@Autowiredprivate RabbitTemplate rabbitTemplate;// 此消费者消费msgQueue队列中的消息@RabbitListener(queues = RPCServerRabbitMQConfig.SCORPIOS_RPC_MSG_QUEUE)public void process(Message msg) {log.info("server 收到msgQueue队列中的消息为 : {}",msg.toString());Message response = MessageBuilder.withBody(("我是服务端Server,收到的消息为:"+new String(msg.getBody())).getBytes()).build();// 把收到的原消息的CorrelationId取出CorrelationData correlationData = new CorrelationData(msg.getMessageProperties().getCorrelationId());// 想replyQueue队列发送确认消息rabbitTemplate.sendAndReceive(RPCServerRabbitMQConfig.SCORPIOS_RPC_EXCHANGE_NAME, RPCServerRabbitMQConfig.SCORPIOS_RPC_REPLY_QUEUE, response, correlationData);}}

解释说明:

  • 服务端首先收到消息并打印出来
  • 服务端提取出原消息中的 correlation_id
  • 服务端调用 sendAndReceive方法,将消息发送给 replyQueue队列,同时带上 correlation_id参数

3.3 测试

启动ClientServer服务,并在浏览器中输入:http://localhost:8889/send/scorpios

Client服务日志:

Server服务日志:

浏览器响应结果:

4. 小结

再来看一下这个原理图:

  • 定义一个Exchange交换机,两个队列:MsgQueueReplyQueue
  • Client调用 sendAndReceive方法向MsgQueue队列中发送消息,该方法自带返回值,返回值就是服务端返回的消息
  • Server端消费MsgQueue队列消息后,往ReplayQueue中发送消息

代码地址:https://github.com/Hofanking/springboot-rabbitmq-example

springboot-rabbitmq-rpc-client

springboot-rabbitmq-rpc-server

消息中间件RabbitMQ(五)——实现RPC调用相关推荐

  1. RabbitMQ (五)实现类似Dubbo的RPC调用

    springboot对rabbitMQ的接口做了封装,要实现 request/reponse 模式的调用,只需要调用 rabbitTemplate.convertSendAndReceive 方法即可 ...

  2. 基于消息中间件RabbitMQ实现简单的RPC服务

    转载自  基于消息中间件RabbitMQ实现简单的RPC服务 RPC(Remote Procedure Call,远程过程调用),是一种计算机通信协议.对于两台机器而言,就是A服务器上的应用程序调用B ...

  3. rabbitmq 简易RPC调用示例

    rabbitmq 简易RPC调用示例(后附go代码)) rabbimq 库代码获取 用例概述 客户端 服务端 rabbimq 库代码获取 https://github.com/streadway/am ...

  4. RabbitMQ教程远程过程调用RPC

    前言:在前面的教程里我们学习了工作队列,实现了将工作任务发给不同的工人,如果任务是需要在另一台计算机上运行,我们如何实现运行远程计算机上的一个函数任务并等待其返回的结果呢,这种模式通常被称为远程过程调 ...

  5. nodejs基于RabbitMq的RPC调用

    在微服务架构中,SpringCloud,Eureka,Dubbo,ZooKeeper这些框架再熟悉不过了,其中面向接口的远程方法调用是其主要核心功能之一,而MQ主要用来应用解耦,削峰填谷等作用:最近在 ...

  6. rpc 调用webservice怎样传递参数_五分钟让你了解RPC原理详解

    欢迎关注专栏[以架构赢天下]--每天持续分享Java相关知识点 以架构赢天下​zhuanlan.zhihu.com 以架构赢天下--持续分享Java相关知识点 每篇文章首发此专栏 欢迎各路Java程序 ...

  7. 分布式系统消息中间件-RabbitMQ介绍及其应用

    分布式系统消息中间件-RabbitMQ 一.消息中间件 1.1 中间件 1.1.1 什么是中间件? 中间件(Middleware)是处于操作系统和应用程序之间的软件.人们在使用中间件时,往往是一组中间 ...

  8. java分布式 mq_分布式系统消息中间件—RabbitMQ的使用进阶篇

    前言: 这篇文章主要总结一下RabbitMQ在日常项目开发中比较常用的几个特性. 一. mandatory 参数 上一篇文章中我们知道,生产者将消息发送到RabbitMQ的交换器中通过RoutingK ...

  9. rabbitmq接口异常函数方法_分布式系统消息中间件——RabbitMQ的使用进阶篇

    一 mandatory 参数 上一篇文章中我们知道,生产者将消息发送到RabbitMQ的交换器中通过RoutingKey与BindingKey的匹配将之路由到具体的队列中以供消费者消费.那么当我们通过 ...

  10. python rpc调用_从0到1:全面理解 RPC 远程调用

    上一篇关于 WSGI 的硬核长文,不知道有多少同学,能够从头看到尾的,不管你们有没有看得很过瘾,反正我是写得很爽,总有一种将一样知识吃透了的错觉. 今天我又给自己挖坑了,打算将 rpc 远程调用的知识 ...

最新文章

  1. 2019年不可错过的45个AI开源工具,你想要的都在这里
  2. Android之Socket通信、List加载更多、Spinner下拉列表
  3. Android--Genymotion虚拟机(模拟器)的配置
  4. C和C++Everything教程的简介
  5. XSS之xssprotect
  6. 数据库:MYSQL相关设计规范梳理,值得收藏!
  7. php io select,Python IO多路复用之——select方案服务端和客户端代码【python源码详解】...
  8. 滴滴不倒闭,世界和中国的奇迹!
  9. 监控摄像机的区别和分类
  10. SQLi LABS Less 10 时间盲注
  11. Cerberus 银行木马开发团队解散,源代码5万美元起拍
  12. git 第二次提交_win10 将本地项目上传到github (第一次+再次上传)
  13. easyUI 属性总结
  14. FZU 2148 Moon Game --判凹包
  15. wordpress文章发布时区时间延迟8小时解决方法
  16. 0代码实现接口自动化测试-RF框架实践
  17. 华为手机计算机快捷,快速让华为手机变成一台电脑,INNOCN便携显示器的更多玩法...
  18. 书蠹诗魔——张岱《湖心亭看雪》
  19. 《供应链架构师》读书笔记
  20. 从计算机网络系统组成看 计算机网络可分为,从计算机网络系统组成的角度看,计算机网络可以分为 子网和资源子网。...

热门文章

  1. 模糊聚类及matlab实现,模糊聚类分析及matlab程序实现
  2. net3.5离线一键安装工具_一键获取抖音直播源地址(无水印高清下载),无需安装Fiddler抓包工具...
  3. 药物临床试验数据递交FDA的规定
  4. js定义对象时属性名是否加引号问题
  5. [ustc]那些杀手不太冷
  6. 项目开发计划——机房收费系统
  7. c语言编程串级控制,组态王-串级控制
  8. 牛X的规则引擎urule2
  9. wps纸张大小设置成A4_pdf两页合并一页a4,只需这招轻松搞定
  10. 工程项目管理工作流程图大全(打包带走)