前面讲过一些RabbitMQ的安装和用法,也说了说RabbitMQ在一般的业务场景下如何使用。不知道的可以看我前面的博客,http://www.cnblogs.com/zhangweizhong/category/855479.html

不过,最近有朋友问我,RabbitMQ RPC 是干嘛的,有什么用。

其实,RabbitMQ RPC 就是通过消息队列(Message Queue)来实现rpc的功能,就是,客户端向服务端发送定义好的Queue消息,其中携带的消息就应该是服务端将要调用的方法的参数 ,并使用Propertis告诉服务端将结果返回到指定的Queue。

1.RabbitMQ RPC的特点

  • Message Queue把所有的请求消息存储起来,然后处理,和客户端解耦。
  • Message Queue引入新的结点,系统的可靠性会受Message Queue结点的影响。
  • Message Queue是异步单向的消息。发送消息设计成是不需要等待消息处理的完成。

所以对于有同步返回需求,Message Queue是个不错的方向。

2.普通PRC的特点

  • 同步调用,对于要等待返回结果/处理结果的场景,RPC是可以非常自然直觉的使用方式。当然RPC也可以是异步调用。
  • 由于等待结果,客户端会有线程消耗。

如果以异步RPC的方式使用,客户端线程消耗可以去掉。但不能做到像消息一样暂存消息请求,压力会直接传导到服务端。

3.适用场合说明

  • 希望同步得到结果的场合,RPC合适。
  • 希望使用简单,则RPC;RPC操作基于接口,使用简单,使用方式模拟本地调用。异步的方式编程比较复杂。
  • 不希望客户端受限于服务端的速度等,可以使用Message Queue。

4.RabbitMQ RPC工作流程:

 

基本概念:

Callback queue 回调队列客户端向服务器发送请求,服务器端处理请求后,将其处理结果保存在一个存储体中。而客户端为了获得处理结果,那么客户在向服务器发送请求时,同时发送一个回调队列地址reply_to。

Correlation id 关联标识客户端可能会发送多个请求给服务器,当服务器处理完后,客户端无法辨别在回调队列中的响应具体和那个请求时对应的。为了处理这种情况,客户端在发送每个请求时,同时会附带一个独有correlation_id属性,这样客户端在回调队列中根据correlation_id字段的值就可以分辨此响应属于哪个请求。

流程说明

  • 当客户端启动的时候,它创建一个匿名独享的回调队列。
  • 在 RPC 请求中,客户端发送带有两个属性的消息:一个是设置回调队列的 reply_to 属性,另一个是设置唯一值的 correlation_id 属性。
  • 将请求发送到一个 rpc_queue 队列中。
  • 服务器等待请求发送到这个队列中来。当请求出现的时候,它执行他的工作并且将带有执行结果的消息发送给 reply_to 字段指定的队列。
  • 客户端等待回调队列里的数据。当有消息出现的时候,它会检查 correlation_id 属性。如果此属性的值与请求匹配,将它返回给应用

 

5.完整代码:


  1. 创建两个控制台程序,作为RPC Server和RPC Client, 引用 RabbitMQ.Client,

  2. RPC Server

    class Program{static void Main(string[] args){var factory = new ConnectionFactory() { HostName = "localhost", VirtualHost = "OrderQueue", UserName = "zhangweizhong", Password = "weizhong1988", Port = 5672 };using (var connection = factory.CreateConnection())using (var channel = connection.CreateModel()){channel.QueueDeclare(queue: "rpc_queue",durable: false,exclusive: false,autoDelete: false,arguments: null);channel.BasicQos(0, 1, false);var consumer = new QueueingBasicConsumer(channel);channel.BasicConsume(queue: "rpc_queue",noAck: false,consumer: consumer);Console.WriteLine(" [x] Awaiting RPC requests");while (true){string response = null;var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();var body = ea.Body;var props = ea.BasicProperties;var replyProps = channel.CreateBasicProperties();replyProps.CorrelationId = props.CorrelationId;try{var message = Encoding.UTF8.GetString(body);int n = int.Parse(message);Console.WriteLine(" [.] fib({0})", message);response = fib(n).ToString();}catch (Exception e){Console.WriteLine(" [.] " + e.Message);response = "";}finally{var responseBytes = Encoding.UTF8.GetBytes(response);channel.BasicPublish(exchange: "",routingKey: props.ReplyTo,basicProperties: replyProps,body: responseBytes);channel.BasicAck(deliveryTag: ea.DeliveryTag,multiple: false);}}}}/// <summary>/// Assumes only valid positive integer input./// Don't expect this one to work for big numbers,/// and it's probably the slowest recursive implementation possible./// </summary>private static int fib(int n){if (n == 0 || n == 1){return n;}Thread.Sleep(1000 * 10);return n;}}

  3. RPC Client

    class Program{static void Main(string[] args){for (int i = 0; i < 10; i++){Stopwatch watch = new Stopwatch();watch.Start();var rpcClient = new RPCClient();Console.WriteLine(string.Format(" [x] Requesting fib({0})", i));var response = rpcClient.Call(i.ToString());Console.WriteLine(" [.] Got '{0}'", response);rpcClient.Close();watch.Stop();Console.WriteLine(string.Format(" [x] Requesting complete {0} ,cost {1} ms", i, watch.Elapsed.TotalMilliseconds));}Console.WriteLine(" complete!!!! ");Console.ReadLine();}}class RPCClient{private IConnection connection;private IModel channel;private string replyQueueName;private QueueingBasicConsumer consumer;public RPCClient(){var factory = new ConnectionFactory() { HostName = "localhost", VirtualHost = "OrderQueue", UserName = "zhangweizhong", Password = "weizhong1988", Port = 5672 };connection = factory.CreateConnection();channel = connection.CreateModel();replyQueueName = channel.QueueDeclare().QueueName;consumer = new QueueingBasicConsumer(channel);channel.BasicConsume(queue: replyQueueName,noAck: true,consumer: consumer);}public string Call(string message){var corrId = Guid.NewGuid().ToString();var props = channel.CreateBasicProperties();props.ReplyTo = replyQueueName;props.CorrelationId = corrId;var messageBytes = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: "",routingKey: "rpc_queue",basicProperties: props,body: messageBytes);while (true){var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();if (ea.BasicProperties.CorrelationId == corrId){return Encoding.UTF8.GetString(ea.Body);}}}public void Close(){connection.Close();}}

  4.分别运行Server和Client

6.最后

  1.参照RabbitMQ官方教程的RPC,地址:http://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html

  2.本文源代码下载,http://files.cnblogs.com/files/zhangweizhong/Weiz.RabbitMQ.RPC.rar

  3.博客原地址:http://fpeach.com/post/2016/12/01/RabbitMQ%E5%AD%A6%E4%B9%A0%E7%B3%BB%E5%88%97%EF%BC%88%E4%BA%94%EF%BC%89-RPC-%E8%BF%9C%E7%A8%8B%E8%BF%87%E7%A8%8B%E8%B0%83%E7%94%A8.aspx

作者:章为忠 
出处:http://www.fpeach.com/ 
本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接。如有问题,可以微信:18618243664 联系我,非常感谢。

扫下面的二维码关注我的微信公众号。 

RabbitMQ学习系列(五): RPC 远程过程调用相关推荐

  1. 130、RPC远程过程调用

    RPC简介 1. 什么是RPC 远程过程调用(英语:Remote Procedure Call,缩写为 RPC,也叫远程程序调用)是一个计算机通信协议.该协议允许运行于一台计算机的程序调用另一台计算机 ...

  2. Windows RPC 远程过程调用

    本文章转载自 http://blog.csdn.net/xxxluozhen/article/details/5605818  作者写的很详细并且通俗易懂 一.什么是远程过程调用 什么是远程过程调用 ...

  3. RPC 远程过程调用

    RPC简介 RPC,英文全称: Remote Procedure Call. 中文名: 远程过程调用,是一个计算机通讯协议.借助RPC可以像调用本地服务一样地调用远程服务.比如两台服务器订单服务器和商 ...

  4. 【GWT系列】实现远程过程调用

    在本节中,使用GWT调用服务器端的方法,返回股票数据.从客户端调用的服务端的代码,也被称作一个服务. 接下来,会介绍: 1. 在服务器上创建一个服务 2. 从客户端调用这个服务 3. 序列化数据对象 ...

  5. zkcli远程连接_高级框架第一天RPC:远程过程调用

    RPC:远程过程调用 主要内容 1.项目结构变化 2.RPC简介 3.RMI实现RPC 4.HttpClient实现RPC 5.Zookeeper安装 6.Zookeeper客户端常用命令 7.向Zo ...

  6. 架构师之路 — 分布式系统 — RPC 远程过程调用

    目录 文章目录 目录 RPC RPC 架构 成熟的开演 RPC 框架 RPC RPC(Remote Procedure Call,远程过程调用)是一种计算机程序通信方式,允许运行于一台计算机中的程序调 ...

  7. (扫盲)RPC远程过程调用

    https://blog.csdn.net/mindfloating/article/details/39473807 https://blog.csdn.net/mindfloating/artic ...

  8. 分布式机构 RPC远程过程调用

  9. RabbitMQ教程远程过程调用RPC

    前言:在前面的教程里我们学习了工作队列,实现了将工作任务发给不同的工人,如果任务是需要在另一台计算机上运行,我们如何实现运行远程计算机上的一个函数任务并等待其返回的结果呢,这种模式通常被称为远程过程调 ...

最新文章

  1. 寻找孪生素数(当p为素数时,p+2也为素数)
  2. fastText的原理剖析
  3. 「比人还会聊天」百度PLATO对话机器人开放体验
  4. 【LeetCode笔记】剑指 Offer 20. 表示数值的字符串(Java、字符串)
  5. 【BZOJ3191】卡牌游戏,概率DP
  6. Python生成器函数案例一则:理财收益计算
  7. @程序员,Python 这次彻底上位了!
  8. Python安装xlrd和xlwt的步骤以及使用报错的解决方法
  9. 多个切点 boot spring_spring基于aspectJ的切点表示
  10. linux添加隧道,linux配置多级服务器登录和隧道映射
  11. 平均销售额计算机公式,销售额是什么意思(销售额的基本计算公式)
  12. VM虚拟机上的网络设置
  13. PAT 1058 选择题 python
  14. Go | 一分钟掌握Go | 9 - 通道
  15. matlab 序列对称,共轭对称序列.PPT
  16. Matlab--绘制高逼格地形图
  17. phpmailer报 You must provide at least one recipient email address.解决办法
  18. 爱因斯坦阶梯问题及寻找完全数问题
  19. Ubuntu零基础教学-GParted磁盘分区工具使用|超级详细,手把手教学
  20. c# 陈景润 15 子问题

热门文章

  1. error 图片,加载错误-》实用笔记
  2. 一切都不能够想当然D
  3. yum源快速配置脚本
  4. fastjson过滤属性或函数
  5. shell脚本的测试与判断的基础实施
  6. 为什么在中国“公有云”落地那么难?
  7. 配置LYNC和Exchange 2010 SP1 OWA集成
  8. .NET Core微服务之路:不断更新中的目录 (v0.42)
  9. Python学习——模块的基本知识
  10. 一个流畅的iOS图表框架PNChart