前言:前面我们讲解的都是本地服务器,现在如果需要远程计算机上运行一个函数,等待结果。这就是一个不同的故事了,这种模式通常被称为远程过程调用或者RPC。

本章教程我们使用RabbitMQ搭建一个RPC系统,一个客户端和一个可扩展的RPC服务器,现在我们开始吧。

Callback queue

一般做rpc在RabbitMQ是比较容易的,一个客户端发送一个请求信息和一个响应信息的服务器回复,为了得到一个响应,我们需要发送一个回调队列地址请求。如下

Message属性:

AMQP协议一共预定义了14个属性,但是大多数属性很少使用,下面几个可能用的比较多

deliveryMode:有2个值,一个是持久,另一个表示短暂(第二篇说过)

contentType:内容类型:用来描述编码的MIME类型。例如,经常使用JSON编码是将此属性设置为一个很好的做法:application/json。

replyTo:经常使用的是回调队列的名字

correlationid:RPC响应请求的相关应用

Correlation Id

在队列上接收到一个响应,但它并不清楚响应属于哪一个,当我们使用CorrelationId属性的时候,我们就可以将它设置为每个请求的唯一值,稍后当我们在回调队列中接收消息的时候,我们会看到这个属性,如果我们看到一个未知的CorrelationId,我们就可以安全地忽略信息-它不属于我们的请求。为什么我们应该忽略未知的消息在回调队列中,而不是失败的错误?这是由于服务器端的一个竞争条件的可能性。比如还未发送了一个确认信息给请求,但是此时RPC服务器挂了。如果这种情况发生,将再次重启RPC服务器处理请求。这就是为什么在客户端必须处理重复的反应。

需求

我们的rpc工作方式如下:

1:当客户端启动时,它创建一个匿名的独占回调队列。

2:对于rpc请求,客户端发送2个属性,一个是replyTo设置回调队列,另一是correlationId为每个队列设置唯一值

3:请求被发送到一个rpc_queue队列中

4:rpc服务器是等待队列的请求,当收到一个请求的时候,他就把消息返回的结果返回给客户端,使请求结束。

5:客户端等待回调队列上的数据,当消息出现的时候,他检查correlationId,如果它和从请求返回的值匹配,就进行响应。

编码

RPCServer.Java

public class RPCServer {private static final String RPC_QUEUE_NAME = "rpc_queue";private static int fib(int n) {if (n == 0) {return 0;}if (n == 1) {return 1;}return fib(n - 1) + fib(n - 1);}public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);channel.basicQos(1);QueueingConsumer consumer = new QueueingConsumer(channel);channel.basicConsume(RPC_QUEUE_NAME, false, consumer);System.out.println("RPCServer Awating RPC request");while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();BasicProperties props = delivery.getProperties();BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(props.getCorrelationId()).build();String message = new String(delivery.getBody(), "UTF-8");int n = Integer.parseInt(message);System.out.println("RPCServer fib(" + message + ")");String response = "" + fib(n);channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}
}

服务器代码比较简单

1:建立连接,通道,队列

2:我们可能运行多个服务器进程,为了分散负载服务器压力,我们设置channel.basicQos(1);

3:我们用basicconsume访问队列。然后进入循环,在其中我们等待请求消息并处理消息然后发送响应。

RPCClient.java

public class RPCClient {private Connection connection;private Channel channel;private String requestQueueName = "rpc_queue";private String replyQueueName;private QueueingConsumer consumer;public RPCClient() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");connection = factory.newConnection();channel = connection.createChannel();replyQueueName = channel.queueDeclare().getQueue();consumer = new QueueingConsumer(channel);channel.basicConsume(replyQueueName, true, consumer);}public String call(String message) throws IOException, InterruptedException {String response;String corrID = UUID.randomUUID().toString();AMQP.BasicProperties props = new AMQP.BasicProperties().builder().correlationId(corrID).replyTo(replyQueueName).build();channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();if (delivery.getProperties().getCorrelationId().equals(corrID)) {response = new String(delivery.getBody(), "UTF-8");break;}}return response;}public void close() throws Exception {connection.close();}public static void main(String[] args) throws Exception {RPCClient rpcClient = null;String response;try {rpcClient = new RPCClient();System.out.println("RPCClient  Requesting fib(20)");response = rpcClient.call("20");System.out.println("RPCClient  Got '" + response + "'");} catch (Exception e) {e.printStackTrace();} finally {if (rpcClient != null) {rpcClient.close();}}}
}

客户端代码解读

1:建立一个连接和通道,并声明了一个唯一的“回调”队列的答复

2:我们订阅回调队列,这样就可以得到RPC的响应

3:定义一个call方法用于发送当前的回调请求

4:生成一个唯一的correlationid,然后通过while循环来捕获合适的回应

5:我们请求信息,发送2个属性,replyTo 和correlationId

6:然后就是等待直到有合适的回应到达

7:while循环是做一个非常简单的工作,对于每一个响应消息,它检查是否有correlationid然后进行匹配。然后是就进行响应。

8:最后把响应返回到客户端。

转载于:https://www.cnblogs.com/LipeiNet/p/5980802.html

rabbitMQ第四篇:远程调用相关推荐

  1. RabbitMQ(四)远程连接RabbitMQ

    为了避免污染宿主系统环境,于是在虚拟机中搭建了一个linux环境并且安装了rabbitmq-server.然后在远程连接的时候一直连接失败.官网上面给的例子都是在本地使用系统默认的guest用户连接的 ...

  2. 【Android 逆向】Android 进程注入工具开发 ( 注入代码分析 | 获取注入的 libbridge.so 动态库中的 load 函数地址 并 通过 远程调用 执行该函数 )

    文章目录 一.dlsym 函数简介 二.获取 目标进程 linker 中的 dlsym 函数地址 三.远程调用 目标进程 linker 中的 dlsym 函数 获取 注入的 libbridge.so ...

  3. RabbitMQ消息队列(七):适用于云计算集群的远程调用(RPC)

    在云计算环境中,很多时候需要用它其他机器的计算资源,我们有可能会在接收到Message进行处理时,会把一部分计算任务分配到其他节点来完成.那么,RabbitMQ如何使用RPC呢?在本篇文章中,我们将会 ...

  4. Dubbo(四) 消费者、提供者工程搭建并使用注解实现远程调用

    在上一章节<Dubbo(三) 消费者.提供者工程搭建并实现远程调用>中我们简单介绍了Dubbo的概念以及使用xml方式实现了一个消费者和提供者工程,本章介绍使用注解方式实现消费者调用服务提 ...

  5. Dubbo(十四)源码解析 之 远程调用

    远程调用主要处理三个流程: 消费者向提供者发起请求 提供者处理消费者请求 消费者处理提供者响应 1. NettyClient 的创建 上一章服务订阅,有两个地方没有说完,其中之一:无论是本地注册表方式 ...

  6. python rpc调用_从0到1:全面理解 RPC 远程调用

    上一篇关于 WSGI 的硬核长文,不知道有多少同学,能够从头看到尾的,不管你们有没有看得很过瘾,反正我是写得很爽,总有一种将一样知识吃透了的错觉. 今天我又给自己挖坑了,打算将 rpc 远程调用的知识 ...

  7. Gateway+Nacos+Sleuth+Zipkin网关链路追踪(测试及源码),Gateway+FeignClient+Nacos通过网关远程调用微服务(一)

    Gateway+Nacos+Sleuth+Zipkin网关链路追踪(测试及源码),Gateway+FeignClient+Nacos通过网关远程调用微服务(一) 问题背景 Gateway+Nacos+ ...

  8. zookeeper 密码_「附源码」Dubbo+Zookeeper 的 RPC 远程调用框架

    技术博文,及时送达 作者 | 码农云帆哥 链接 | blog.csdn.net/sinat_27933301 上一篇:从零搭建创业公司后台技术栈 这是一个基于Dubbo+Zookeeper 的 RPC ...

  9. 架构设计:远程调用服务架构设计及zookeeper技术详解(上篇)

    一.序言 Hadoop是一个技术生态圈,zookeeper是hadoop生态圈里一个非常重要的技术,当我研究学习hadoop的相关技术时候,有两块知识曾经让我十分的困惑,一个是hbase,一个就是zo ...

  10. cloud 异步远程调用_异步远程工作的意外好处-以及如何拥抱它们

    cloud 异步远程调用 In this article, I'll discuss the positive aspects of being a little out of sync with y ...

最新文章

  1. 烂泥:vcenter5.5无AD下的安装与配置
  2. 广度优先遍历算法-01寻找制高点问题
  3. 11 个重要的数据库设计规则
  4. 美国能源局投2100万美元加速光伏软成本下降
  5. C++基础::shared_ptr 编程细节(一)
  6. 如何构建批流一体数据融合平台的一致性语义保证?
  7. Android接口回调
  8. 360黑客攻防技术分享会
  9. 贝叶斯自举法(BayesianBootstrap)简介
  10. 二元一次方程有唯一解的条件_线性方程组在什么时候有唯一解/无穷个解/无解?...
  11. 常识性知识,高速快捷知识
  12. oracle清空实例数据,Linux下删除oracle实例
  13. AT指令详解,错误代码详解
  14. 阿里云电脑无影云桌面收费标准(CPU内存/云盘/互联网访问带宽)
  15. JavaDay29 CSS
  16. 一起talk C栗子吧(第一百八十七回:C语言实例--反余弦函数 )
  17. 四轴自适应控制算法的一些尝试开源我的山猫飞控和梯度在线辨识自适应等算法—(转)...
  18. 蜡烛图(K线图)-2反转形态
  19. Android开发--实现Android引导页
  20. 程序员必须要掌握哪些语言

热门文章

  1. 防盗链Nginx设置图片防盗链,设置无效的请仔细看红字
  2. Fedora7安装后的配置
  3. java编程剪刀石头布_Java实现的剪刀石头布游戏示例
  4. maxdea如何计算指数_10分钟计算出指数温度,开始基金定投之旅~
  5. 解决VIM打开U盘文件中文乱码的问题以及VIM有用的配置
  6. Linux内存管理之vmalloc与low_memory
  7. H.264中整数DCT变换,量化,反量化,反DCT究竟是如何实现的?(无代码,无真相)
  8. linux内核奇遇记之md源代码解读之一
  9. 空头平仓什么意思_什么是白糖期货期权仿真交易套利机会?
  10. 第五届CCPC中国大学生程序设计竞赛河南省赛-网络模拟赛