RabbitMQ消息队列之RPC调用
我们知道现在市面上的RPC框架很多,但是如何用我们的RabbitMQ去实现一个RPC调用呢?这就是我们这篇文章所要讲解的内容。
如果有阅读过我写的博客的大兄弟们,可能会知道,我有个习惯就是学习技术喜欢去看官方文档,同样对于RabbitMQ如何去实现RPC调用,我们先来看看官方文档怎么说。
如上图,进入RabbitMQ官网,找到get Started,然后里面会有很多基本使用方式(其他几种使用方式在我的另一篇博文里面有详细介绍:https://blog.csdn.net/baomw/article/details/84847769),找到我们的RPC,然后点及java。
来看官网说的这句话,什么意思呢,就是在我们声明一个客户端,然后去通过call方法去调用我们的服务端,然后实时去接收我们服务端处理后返回的结果值,显示在控制台。
这里呢大致说了下使用rpc调用的一些注意的地方,以及rpc调用的方法的一些不足,容易造成系统混乱,等等。旨在提醒我猿们,在开发rpc调用实现的时候需要明确调用及依赖关系。
当然由于我们是需要接受服务端处理结果并返回的,所有客户端这边还需要声明一个接受回调信息的回调队列。
好了下面我们来实现一个具体的rpc调用例子。首先声明我们的服务端,代码如下;
public class RPCServer {private static final String RPC_QUEUE_NAME = "rpc_queue";private static int fib(int n) {if (n == 0) return 0;if (n == 1) return 1;return fib(n - 1) + fib(n - 2);}public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);channel.queuePurge(RPC_QUEUE_NAME);channel.basicQos(1);System.out.println(" [x] Awaiting RPC requests");Object monitor = new Object();DeliverCallback deliverCallback = (consumerTag, delivery) -> {AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(delivery.getProperties().getCorrelationId()).build();String response = "";try {String message = new String(delivery.getBody(), "UTF-8");int n = Integer.parseInt(message);System.out.println(" [.] fib(" + message + ")");response += fib(n);} catch (RuntimeException e) {System.out.println(" [.] " + e.toString());} finally {channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);// RabbitMq consumer worker thread notifies the RPC server owner threadsynchronized (monitor) {monitor.notify();}}};channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { }));// Wait and be prepared to consume the message from RPC client.while (true) {synchronized (monitor) {try {monitor.wait();} catch (InterruptedException e) {e.printStackTrace();}}}}}
}
里面声明一个方法fib(int i),递归调用自己,有点算法基础的大兄弟们可能知道,这个是算我们的斐波那契数列求和。
public class RPCClient implements AutoCloseable {private Connection connection;private Channel channel;private String requestQueueName = "rpc_queue";public RPCClient() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");connection = factory.newConnection();channel = connection.createChannel();}public static void main(String[] argv) {try (RPCClient fibonacciRpc = new RPCClient()) {for (int i = 0; i < 32; i++) {String i_str = Integer.toString(i);System.out.println(" [x] Requesting fib(" + i_str + ")");String response = fibonacciRpc.call(i_str);System.out.println(" [.] Got '" + response + "'");}} catch (IOException | TimeoutException | InterruptedException e) {e.printStackTrace();}}public String call(String message) throws IOException, InterruptedException {final String corrId = UUID.randomUUID().toString();String replyQueueName = channel.queueDeclare().getQueue();AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {if (delivery.getProperties().getCorrelationId().equals(corrId)) {response.offer(new String(delivery.getBody(), "UTF-8"));}}, consumerTag -> {});String result = response.take();channel.basicCancel(ctag);return result;}public void close() throws IOException {connection.close();}}
客户端这边呢循环调用我们的call方法,在call方法中,实时去调用服务端并返回response,
然后String result = response.take();
获取到我们的返回结果。
到这里呢我们也就大致可以看出来,首先客户端发出一个数给服务端,服务端算出该数对应的斐波那契数并返回给我客户端打印到控制台,我们来看下具体实现效果。
可以看到,我们服务端处有31次打印,说明方法执行了31次调用,客户端对应有31个返回值打印出来。
由此,便利用我们的RabbitMQ实现了一个简单的rpc调用了!
RabbitMQ消息队列之RPC调用相关推荐
- RabbitMQ消息队列(七):适用于云计算集群的远程调用(RPC)
在云计算环境中,很多时候需要用它其他机器的计算资源,我们有可能会在接收到Message进行处理时,会把一部分计算任务分配到其他节点来完成.那么,RabbitMQ如何使用RPC呢?在本篇文章中,我们将会 ...
- 任务队列,消息队列和rpc的区别是什么?
2019独角兽企业重金招聘Python工程师标准>>> 首先,这几个概念本就不是同一层次上的东西,本身风马牛不相及. 先说RPC RPC通常指的是PRC框架(分布式框架),或者PRC ...
- 大数据互联网架构阶段 QuartZ定时任务+RabbitMQ消息队列
QuartZ定时任务+RabbitMQ消息队列 一 .QuartZ定时任务解决订单系统遗留问题 情景分析: 在电商项目中 , 订单生成后 , 数据库商品数量-1 , 但是用户迟迟不进行支付操作 , 这 ...
- 使用EasyNetQ组件操作RabbitMQ消息队列服务
RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现,是实现消息队列应用的一个中间件,消息队列中间件是分布式系统中重要的组件,主要解决应用耦合, ...
- php中rabbitmq消息乱码,PHP实现RabbitMQ消息队列(转)
本篇文章给大家带来的内容是关于PHP和RabbitMQ实现消息队列的完整代码,有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助. 先安装PHP对应的RabbitMQ,这里用的是 php_a ...
- RabbitMQ学习系列二:.net 环境下 C#代码使用 RabbitMQ 消息队列
上一篇已经讲了Rabbitmq如何在Windows平台安装,不懂请移步:RabbitMQ学习系列一:windows下安装RabbitMQ服务 一.理论: .net环境下,C#代码调用RabbitMQ消 ...
- rabbitMQ消息队列 – 面板介绍及简单demo
首先rabbit安装好之后,运维会给一个控制面板. 默认账号密码为guest 登入以后可以看到具体界面. 在此鸣谢百度翻译给予的大力支持.. ###写一个简单的demo 编写之前..虽然说可以直接用底 ...
- 消息队列——RabbitMQ消息队列集群
RabbitMQ消息队列集群 消息队列/中间件 RabbitMQ详解 RabbitMQ单机部署 RabbitMQ集群部署 消息队列/中间件 一.前言 在我们秒杀抢购商品的时候,系统会提醒我们稍等排队中 ...
- .NET为什么推荐它作为RabbitMQ消息队列的首选开发工具
[前言] 自2022年末推出此工具以来,相关文章已被圈内顶尖的几家.NET头条号转载,而且短短数月,已有超100个团队/个人开发者使用它来操控RabbitMQ消息队列,反响可谓十分火爆.故本次经典重现 ...
- RabbitMQ消息队列(十三)-VirtualHost与权限管理
像mysql有数据库的概念并且可以指定用户对库和表等操作的权限.那RabbitMQ呢?RabbitMQ也有类似的权限管理.在RabbitMQ中可以虚拟消息服务器VirtualHost,每个Virtua ...
最新文章
- live555 源码分析:子会话 SDP 行生成
- mha如何管理多套mysql集群_Mysql 集群高可用方案 MHA
- android 自定义loading,android_自定义Loading框
- python正则匹配html标签_Python正则获取、过滤或者替换HTML标签的方法
- python tkinter画笑脸_Python3 tkinter基础 Canvas create_polygon 画三角形
- 计算机控制用户自己编写什么软件吗,计算机控制软件技术基础.ppt
- u盘分为windows和linux启动,【电脑软件】Ventoy 官方版,一个U盘,同时拥有启动win+linux+Ubuntu...
- 微软终于屈服和妥协:宣布加入 OpenJDK,贡献构建Java生态
- man exportfs(exportfs命令中文手册)
- 复旦计算机系统基础课件,复旦大学软件工程考研(MSE)计算机系统基础复习资料PPT演示课件...
- FAT32文件系统快速入门
- VBA写一个下拉复选框,以及循环判断,附代码
- LOJ10068 秘密的牛奶运输
- Java加密体系结构(JCA)参考指南
- 部门年终总结会议有必要开吗?
- 前端页面嵌入word文档_word文档怎样加页面
- 一到两年工作经验的看完这些面试轻松拿offer
- 火山PC编辑框组件详解3
- matplotlib 入门之Sample plots in Matplotlib
- 聊聊C10K问题及解决方案