一、RPC处理流程如下

  1. 当客户端启动时,创建一个匿名的回调队列(名称由RabbitMQ自动创建,如下图中的amqp.gen-Xa2…)。
  2. 客户端为RPC请求设置2个属性:replyTo,设置回调队列名字;correlationId,标记request。
  3. 请求被发送到rpc_queue队列中。
  4. RPC服务器端监听rpc_queue队列中的请求,当请求到来时,服务器端会处理并且把带有结果的消息发送给客户端。接收的队列就是replyTo设定的回调队列。
  5. 客户端监听回调队列,当有消息时,检查correlationId属性,如果与request中匹配,那就是结果了。

二、服务端代码

这里基本跟《RabbitMQ实战指南一样》

public class RpcServer {private static final String RPC_QUEUE_NAME = "rpc_queue";public static void main(String args[]) throws Exception {Connection connection = BaseRabbitmq.initConnection();Channel channel = connection.createChannel();channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);channel.basicQos(1);DefaultConsumer consumer = new DefaultConsumer(channel) {public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {AMQP.BasicProperties replyProps = new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();String repsonse = "";try {String message = new String(body, "UTF-8");int n = Integer.parseInt(message);System.out.println(" [.] fib(" + message + ")");repsonse += fib(n);}catch (Exception e) {e.printStackTrace();} finally {channel.basicPublish("", properties.getReplyTo(), replyProps, repsonse.getBytes("UTF-8"));channel.basicAck(envelope.getDeliveryTag(), false);}}};channel.basicConsume(RPC_QUEUE_NAME, false, consumer);}private static int fib(int n) throws Exception {if (n == 0) return 0;if (n == 1) return 1;return fib(n - 1) + fib(n - 2);}
}

三、客户端代码

这里由于QueuingConsumer已经在amqp-client4.x版本中被删除,所以稍作修改

public class RpcClient {private final Connection connection;private final Channel channel;private final String replyQueueName;private final DefaultConsumer consumer;String corrId = UUID.randomUUID().toString();public RpcClient() throws IOException, TimeoutException {connection = BaseRabbitmq.initConnection();channel = connection.createChannel();replyQueueName = channel.queueDeclare().getQueue();consumer = new DefaultConsumer(channel) {public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {try {if (properties.getCorrelationId().equals(corrId)){String response = new String(body);System.out.println("[.] Got" + response);}}catch (Exception e) {e.printStackTrace();}}};}public void call(String message) throws IOException,ShutdownSignalException, ConsumerCancelledException {AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();String requestQueueName = "rpc_queue";channel.basicPublish("", requestQueueName, props, message.getBytes());while (true) {channel.basicConsume(replyQueueName, true, consumer);}}public void close() throws Exception{connection.close();}public static void main(String args[]) throws Exception{RpcClient fibRpc = new RpcClient();System.out.println(" [x] Requesting fib(30)");fibRpc.call("30");fibRpc.close();}
}

四、RabbitMQ属性配置

public class BaseRabbitmq {public static Connection initConnection() throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setUsername("root");connectionFactory.setPassword("123456");return connectionFactory.newConnection();}
}

五、结果

1、最终client端输出

RabbitMQ实现RPC相关推荐

  1. RabbitMQ中RPC的实现及其通信机制

    RabbitMQ中RPC的实现:客户端发送请求消息,服务端回复响应消息,为了接受响应response,客户端需要发送一个回调队列的地址来接受响应,每条消息在发送的时候会带上一个唯一的correlati ...

  2. rabbitmq 简易RPC调用示例

    rabbitmq 简易RPC调用示例(后附go代码)) rabbimq 库代码获取 用例概述 客户端 服务端 rabbimq 库代码获取 https://github.com/streadway/am ...

  3. 精通RabbitMQ之RPC同步调用

    精通RabbitMQ之RPC同步调用 前面我们对应用解耦做过分析,我们能够使用消息中间件来完成应用解耦,很大一部分原因是因为我们的系统之间可以异步处理并且不关心结果回执.假如我们现在需要异步处理的结果 ...

  4. RabbitMQ之RPC实现

    2019独角兽企业重金招聘Python工程师标准>>> 什么是RPC? RPC是指远程过程调用,也就是说两台服务器A,B,一个应用部署在A服务器上,想要调用B服务器上应用提供的函数/ ...

  5. RabbitMQ除开RPC的五种消模型----原生API

    2.五种消息模型 RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此不予学习.那么也就剩下5种. 但是其实3.4.5这三种都属于订阅模型,只不过进行路由的方式不同. 通过一个 ...

  6. nodejs基于RabbitMq的RPC调用

    在微服务架构中,SpringCloud,Eureka,Dubbo,ZooKeeper这些框架再熟悉不过了,其中面向接口的远程方法调用是其主要核心功能之一,而MQ主要用来应用解耦,削峰填谷等作用:最近在 ...

  7. RabbitMQ (五)实现类似Dubbo的RPC调用

    springboot对rabbitMQ的接口做了封装,要实现 request/reponse 模式的调用,只需要调用 rabbitTemplate.convertSendAndReceive 方法即可 ...

  8. RabbitMQ消息队列(七):适用于云计算集群的远程调用(RPC)

    在云计算环境中,很多时候需要用它其他机器的计算资源,我们有可能会在接收到Message进行处理时,会把一部分计算任务分配到其他节点来完成.那么,RabbitMQ如何使用RPC呢?在本篇文章中,我们将会 ...

  9. RabbitMQ+PHP 教程六(RPC)

    (using php-amqplib) 前提必读 本教程假设RabbitMQ是安装在标准端口上运行(5672).如果您使用不同的主机.端口或凭据,则连接设置需要调整. 如果您在本教程中遇到困难,可以通 ...

最新文章

  1. MATLAB线型和颜色对应说明
  2. kvm直通sata_基于KVM的SRIOV直通配置及性能测试
  3. 在Spring 框架中如何更有效的使用JDBC?
  4. Java单元测试技巧之PowerMock
  5. sql concat函数_使用SQL Plus(+)和SQL CONCAT函数SQL Server CONCATENATE操作
  6. python的dropna 和notna的性能_python数据分析学习(7)数据清洗与准备(1)
  7. python数据类型:序列(字符串,元组,列表,字典)
  8. vsftpd不支持目录软链接的解决办法
  9. 兼容IE8以下,获取className节点的元素(document.getElementsByClassName()兼容写法)。
  10. Python学习(列表)
  11. 【菜鸟小屁的成长日记】之ElasticSearch中的TimeStamp时间戳篇
  12. Bundle-Adjustment并行求解器
  13. 停车场车牌识别摄像机,传统提成行业颠覆者
  14. Android后台切回到应用显示广告页
  15. C#语言实例源码系列-实现Word转换RTF
  16. ubuntu/linux命令记录 长期更新
  17. HTML画布与SVG(Canvas vs. SVG)
  18. 【数据库技术】2PL(两阶段锁)下的死锁与饥饿处理手段
  19. NVT SDK 67X获取文件时长的一种方式
  20. 中国驻越南大使馆当地有关部门联系电话

热门文章

  1. php mongo in 查询语句,PHP 怎么执行mongodb 的 $in 和$size查询
  2. 接口文档要写在概要设计里吗_写代码的五个步骤,你会几个?
  3. tensorflow saver_机器学习入门(6):Tensorflow项目Mnist手写数字识别-分析详解
  4. 学习opencv之cvtColor
  5. 百度云域名解析如何添加? - [未完待续]
  6. phpcmsV9 自定义分页函数与调用 - 不影响后台SQL分页
  7. bootstrap悬停下拉导航的实现
  8. python图像增强_【Tool】Augmentor和imgaug——python图像数据增强库
  9. PHP在线无人值守源码交易网站源码,集成支付宝微信接口
  10. java 一一对应的替换_SpringMVC的Controller是如何将参数和前端传来的数据一一对应的...