转载自  基于消息中间件RabbitMQ实现简单的RPC服务

RPC(Remote Procedure Call,远程过程调用),是一种计算机通信协议。对于两台机器而言,就是A服务器上的应用程序调用B服务器上的函数或者方法,由于不在同一个内存空间或机器上运行,因此需要借助于网络通信。

1. RPC框架

我们首先通过一张图理解RPC的工作流程:

因此,实现一个最简单的RPC服务,只需要Client、Server和Network,本文就是利用消息中间件RabbitMQ作为Network载体传输信息,实现简单的RPC服务。简单原理可如下图所示:

即:当Client发送RPC请求时,Client端是消息生产者,Server端是消息消费者;当Server返回结果时,Server端是消息生产者,Client是消息消费者;发送和返回使用不同的队列。

接下来我们通过代码,详细展示一个计算斐波那契数列的RPC服务。

2. RPCServer实现

2.1 Server初始化

/*** 队列名、交换机名、路由键*/
private static final String EXCHANGE_NAME = "rpc_exchange";
private static final String QUEUE_NAME = "request_rpc_queue";
private static final String ROUTING_KEY = "rpc_routing_key";private Connection connection = null;
private Channel channel = null;
private QueueingConsumer consumer = null;/*** Server的构造函数*/
private RPCServer() {try {//创建链接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Config.HOST);factory.setPort(Config.PORT);factory.setUsername(Config.USER);factory.setPassword(Config.PASSWORD);connection = factory.newConnection();//创建信道channel = connection.createChannel();//设置AMQP的通信结构channel.exchangeDeclare(EXCHANGE_NAME, "direct");channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);//设置消费者consumer = new QueueingConsumer(channel);channel.basicConsume(QUEUE_NAME, false, QUEUE_NAME, consumer);} catch (Exception e) {LOG.error("build connection failed!", e);}
}

初始化就是声明RabbitMQ的链接工厂、链接、信道、队列、交换机等等,并做了绑定,由此构成了AMQP的通信结构。

2.2 监听队列并反馈

/*** 开启server*/
private void startServer() {try {LOG.info("Waiting for RPC calls.....");while (true) {//获得文本消息QueueingConsumer.Delivery delivery = consumer.nextDelivery();BasicProperties props = delivery.getProperties();//返回消息的属性BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build();long receiveTime = System.currentTimeMillis();JSONObject json = new JSONObject();try {String message = new String(delivery.getBody(), "UTF-8");int n = Integer.parseInt(message);LOG.info("Got a request: fib(" + message + ")");json.put("status", "success");json.put("result", fib(n));} catch (Exception e) {json.put("status", "fail");json.put("reason", "Not a Number!");LOG.error("receive message failed!", e);} finally {long responseTime = System.currentTimeMillis();json.put("calculateTime", (responseTime - receiveTime));channel.basicPublish("", props.getReplyTo(), replyProps, json.toString().getBytes("UTF-8"));channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}} catch  (Exception e) {LOG.error("server failed!", e);} finally {if (connection != null) {try {connection.close();} catch (Exception e) {LOG.error("close failed!", e);}}}
}

在该方法中使用了一个无限循环,每次处理一条消息。通过调用消费者对象的nextDelivery方法来获得RabbitMQ队列的最新一条消息。同时通过getProperties获取到消息中的反馈信息属性,用于标记客户端Client的属性。然后计算斐波那契数列的结果。最后通过basicAck使用消息信封向RabbitMQ确认了该消息。

到这里就实现了计算斐波那契数列RPC服务的Server端。

3. RPCClient实现

3.1 初始化CLient

/*** 消息请求的队列名、交换机名、路由键*/
private static final String EXCHANGE_NAME = "rpc_exchange";
private static final String QUEUE_NAME = "request_rpc_queue";
private static final String ROUTING_KEY = "rpc_routing_key";/*** 消息返回的队列名、交换机名、路由键*/
private static final String RESPONSE_QUEUE = "response_rpc_queue";
private static final String RESPONSE_ROUTING_KEY = "response_rpc_routing_key";/*** RabbitMQ的实体*/
private Connection connection = null;
private Channel channel = null;
private QueueingConsumer consumer = null;/*** 构造客户端* @throws Exception*/
private RPCClient() throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Config.HOST);factory.setPort(Config.PORT);factory.setUsername(Config.USER);factory.setPassword(Config.PASSWORD);connection = factory.newConnection();channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "direct");channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);channel.queueDeclare(RESPONSE_QUEUE, false, false, false, null);channel.queueBind(RESPONSE_QUEUE, EXCHANGE_NAME, RESPONSE_ROUTING_KEY);consumer = new QueueingConsumer(channel);channel.basicConsume(RESPONSE_QUEUE, true, consumer);
}

这里声明AMQP结构体的方式和Server端类似,只不过Client端需要多声明一个队列,用于RPC的response。

3.2 发送/接收消息

/*** 请求server* @param message* @return* @throws Exception*/
private String requestMessage(String message) throws Exception {String response = null;String corrId = UUID.randomUUID().toString();BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(RESPONSE_QUEUE).build();channel.basicPublish("", QUEUE_NAME, props, message.getBytes("UTF-8"));while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();if (delivery.getProperties().getCorrelationId().equals(corrId)) {response = new String(delivery.getBody(),"UTF-8");break;}}return response;
}

BasicProperties用于存储你请求消息的属性,这里我设置了correlationId和replyTo属性,用于Server端的返回识别。

4. 运行测试

Client端发送:

Server端接收并处理:

Client收到计算结果:

由于我运行RabbitMQ的服务器是租用的阿里云的,差不多传输时延在60ms左右,如果把RPC服务和消息中间件同机房部署的话延时基本上就在ms级别。

5. FAQ

5.1 说明

需要体验完整的过程,你需要如下环境:

JDK1.6以上 + Maven + RabbitMQ

5.2 源代码

完整代码代码请戳:github

其中Server的代码在:

rpc.RPCServer

Client端的代码位置:

rpc.RPCClient

基于消息中间件RabbitMQ实现简单的RPC服务相关推荐

  1. 利用java实现简单的RPC服务调用

    一.前言 RPC(Remote Procedure Call Protocol)--远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议.RPC协议假定某些传输 ...

  2. go 简单的RPC服务与客户端通讯

    2019独角兽企业重金招聘Python工程师标准>>> // 服务器代码 package main// rpc 服务 import ("net/rpc"" ...

  3. jax java_JAX-WS 学习一:基于java的最简单的WebService服务

    JAVA 1.6 之后,自带的JAX-WS API,这使得我们可以很方便的开发一个基于Java的WebService服务. 基于JAVA的WebService 服务 1.创建服务端WebService ...

  4. 构建简单的微服务架构

    前言 本篇仅作引导,内容较多,如果阅读不方便,可以使用电脑打开我们的文档官网进行阅读.如下图所示: 文档官网地址:docs.xin-lai.com 目录 总体介绍   微服务架构的好处    微服务架 ...

  5. 利用Vert.x构建简单的API 服务、分布式服务

    目前已经使用Vertx已经一年多了,虽然没有太多的造诣,但也已在项目中推广了下:从最初的vertx搭建web服务,到项目上线运营,还算比较稳定.再到后来尝试搭建基于vertx的分布式服务,一路下来也积 ...

  6. RabbitMQ(六):回调队列callback queue、关联标识correlation id、实现简单的RPC系统

    博客翻译自:RabbitMQ Tutorials Java版 RabbitMQ(一):Hello World程序 RabbitMQ(二):Work Queues.循环分发.消息确认.持久化.公平分发 ...

  7. NET Core微服务之路:自己动手实现Rpc服务框架,基于DotEasy.Rpc服务框架的介绍和集成...

    原文:NET Core微服务之路:自己动手实现Rpc服务框架,基于DotEasy.Rpc服务框架的介绍和集成 本篇内容属于非实用性(拿来即用)介绍,如对框架设计没兴趣的朋友,请略过. 快一个月没有写博 ...

  8. voyage java_GitHub - yezilong9/voyage: 采用Java实现的基于netty轻量的高性能分布式RPC服务框架...

    Voyage Overview 采用Java实现的基于netty轻量的高性能分布式RPC服务框架.实现了RPC的基本功能,开发者也可以自定义扩展,简单,易用,高效. Features 服务端支持注解配 ...

  9. 基于Asp.Net Core打造轻量级内部服务治理RPC(二 远程服务设计)

    紧接上一篇<基于Asp.Net Core打造轻量级内部服务治理RPC(一)>文章.本文主要讲解基于Asp.Net Core的远程服务设计和实现. 在上一篇中讲过,服务提供者提供的服务实际上 ...

最新文章

  1. goland 调试运行路径
  2. Tensorflow 错误总结:ImportError: cannot import name add_newdocs.
  3. [OS复习]进程管理3
  4. 前端学习(2077):开始回顾
  5. 2017.10.9 DZY Loves Math V 失败总结
  6. 华为Mate 30系列发布日期、地点再曝光:9月19日 慕尼黑见?
  7. python安装的模块在pycharm中能用吗_pycharm安装python模块
  8. 《OSPF网络设计解决方案(第2版)》一第2章 介绍OSPF
  9. C++ STL学习笔记(5) Vector容器, array容器,deque容器
  10. 训练作用_感觉统合是什么意思,感觉统合训练有什么作用
  11. $.getJSON()不执行回调函数
  12. @Builder 实际参数列表和形式参数列表长度不同
  13. FoxMail7.2信纸设置(适用于7.0及以上版本)
  14. wx.uploadFile上传图片 在正式环境无响应问题
  15. 三阶魔方大中小魔公式_三阶魔方花样玩法,公式汇总
  16. Latex论文用bibtex实现期刊/会议缩写
  17. html5画椭圆的完整代码,HTML5 Canvas中绘制椭圆的4种方法
  18. Restful 风格请求
  19. 为什么安装step7时要重启计算机,step7安装提示重启怎么解决
  20. cocos creator 牌面翻转

热门文章

  1. [MyBatisPlus]常用注解_@TableName_@TableId_@TableField_@TableLogic通过全局配置配置主键生成策略
  2. [蓝桥杯][算法提高VIP]摆花-多重背包计数问题
  3. 判别学习与生成学习的区别
  4. char *c = abc和char c[]=abc
  5. Codeforces Round #599 (Div. 2) E. Sum Balance 图转换 + 子集dp + 环
  6. Codeforces Round #709 (Div. 1) B. Playlist 链表维护 + bfs
  7. CF908G. New Year and Original Order
  8. Sumdiv POJ - 1845
  9. P3332 [ZJOI2013]K大数查询(整体二分做法)
  10. hdu 5094 Maze