在微服务架构中,SpringCloud,Eureka,Dubbo,ZooKeeper这些框架再熟悉不过了,其中面向接口的远程方法调用是其主要核心功能之一,而MQ主要用来应用解耦,削峰填谷等作用;最近在RabbitMq官网上看到,竟然还支持RPC调用,处于好奇,动手用js跑了一遍。

共三个文件:RpcUtil.js, server.js,client.js

client.js

const { RpcUtil } = require('./rpcUtil');const rpcUtil = new RpcUtil(userName, password, host, port);
rpcUtil.init().then(() => {setInterval(() => {for (let i = 0; i < 100; i++) {rpcUtil.sendMsg(rpcUtil.generateUuid(), {method,params,}, result => {console.log(i + '<<>>' + JSON.stringify(result))});}}, 3000);
}).catch(e => console.log(e));

server.js

class DealerMsgRpcUtil extends RpcUtil {// @Overrideasync dealerMsg(msgContent) {msgContent = JSON.parse(msgContent);const { method, params } = msgContent;let result;try {// 这里调用各种方法if (method === 'listMember') {result = await foo(params);} else {result = { code: -1, msg: '找不到对应方法' };}} catch (e) {result = { code: -1, msg: e.message };}return result;}
}const rpcUtil = new DealerMsgRpcUtil(rabbitMQAccount, rabbitMQPassword, proxy_host, port);
rpcUtil.init().then(() => {rpcUtil.listenMsg();
}).catch(e => LOG(e));

RpcUtil.js

const amqp = require('amqplib/callback_api');class RpcUtil {constructor(rabbitMQAccount, rabbitMQPassword, rabbitMQHost, rabbitMQPort) {this.connection;this.channel;this.rpcQueueServer = 'rpc_queue_server';this.rpcQueueClient = 'rpc_queue_client';this.correlationIdPool = new Map();this.amqpUrl = 'amqp://' + rabbitMQAccount + ':' + rabbitMQPassword + '@' + rabbitMQHost + ':' + rabbitMQPort;}// 创建通道async _createChannel() {this.channel = await new Promise(resolve => {this.connection.createChannel((err, channel) => {resolve(channel);});});}// 发送端生成correlationIdgenerateUuid() {return Math.random().toString() +Math.random().toString() +Math.random().toString();}// 初始化async init() {await new Promise((resolve, reject) => {amqp.connect(this.amqpUrl, (error, conn) => {if (error) {reject(error);} else {this.connection = conn;resolve();}});});await this._createChannel();}// 被调用端,子类需要重写async dealerMsg(msgContent) {msgContent = JSON.parse(msgContent);return { code: 200, msg: '', data: msgContent };}// 被调用端监听消息async listenMsg() {this.channel.assertQueue(this.rpcQueueServer, { durable: false });this.channel.prefetch(1);this.channel.consume(this.rpcQueueServer, async msg => {const msgContent = msg.content.toString();let dealerResult;try {dealerResult = await this.dealerMsg(msgContent);} catch (e) {dealerResult = { code: -1, msg: e.message };}this.channel.sendToQueue(msg.properties.replyTo, Buffer.from(JSON.stringify(dealerResult)), {correlationId: msg.properties.correlationId,});}, { noAck: true });}// 调用端发送消息async sendMsg(correlationId, params, cb) {const q = await new Promise(resolve => {this.channel.assertQueue(this.rpcQueueClient, { exclusive: true }, (e, q) => {resolve(q);});});this.correlationIdPool.set(correlationId, cb);this.channel.consume(q.queue, msg => {if (this.correlationIdPool.has(msg.properties.correlationId)) {const cb = this.correlationIdPool.get(msg.properties.correlationId);cb(msg.content.toString());this.correlationIdPool.delete(msg.properties.correlationId);}}, { noAck: true });this.channel.sendToQueue(this.rpcQueueServer, Buffer.from(JSON.stringify(params)), {correlationId,replyTo: q.queue,});}
}exports.RpcUtil = RpcUtil;

以上代码仅学习用,生产环境不建议mq去实现rpc。

nodejs基于RabbitMq的RPC调用相关推荐

  1. rabbitmq 简易RPC调用示例

    rabbitmq 简易RPC调用示例(后附go代码)) rabbimq 库代码获取 用例概述 客户端 服务端 rabbimq 库代码获取 https://github.com/streadway/am ...

  2. RabbitMQ (五)实现类似Dubbo的RPC调用

    springboot对rabbitMQ的接口做了封装,要实现 request/reponse 模式的调用,只需要调用 rabbitTemplate.convertSendAndReceive 方法即可 ...

  3. RabbitMQ中RPC的实现及其通信机制

    RabbitMQ中RPC的实现:客户端发送请求消息,服务端回复响应消息,为了接受响应response,客户端需要发送一个回调队列的地址来接受响应,每条消息在发送的时候会带上一个唯一的correlati ...

  4. RabbitMQ学习之基于spring-rabbitmq的RPC远程调用

    http://blog.csdn.net/zhu_tianwei/article/details/40920985 spring-rabbitmq中实现远程接口调用,主要在com.rabbitmq.s ...

  5. 〖Demo〗-- 基于RabbitMQ rpc实现的主机管理

    [基于RabbitMQ rpc实现的主机管理] 要求: 文件分布: 流程图: import pika import os import socketclass Server(object):def _ ...

  6. 精通RabbitMQ之RPC同步调用

    精通RabbitMQ之RPC同步调用 前面我们对应用解耦做过分析,我们能够使用消息中间件来完成应用解耦,很大一部分原因是因为我们的系统之间可以异步处理并且不关心结果回执.假如我们现在需要异步处理的结果 ...

  7. 基于akka的flink RPC调用

    flink中的rpc框架使用的akka.在本节并不详细讲述akka,而是就flink中rpc来讲述akka的部分内容.本节,我从AkkaRpcActor.handleRpcInvocation方法讲起 ...

  8. RabbitMQ之RPC实现

    2019独角兽企业重金招聘Python工程师标准>>> 什么是RPC? RPC是指远程过程调用,也就是说两台服务器A,B,一个应用部署在A服务器上,想要调用B服务器上应用提供的函数/ ...

  9. 开发中的坑:MQ 也能做 RPC 调用?

    hi, 大家好,我是 haohongfan. 最近浏览 帖子[1] 的时候看到一个有意思的吐槽. 大概意思是架构师没有选用 RPC 框架来做服务间调用,而选择用 MQ 来代替.是不是很意外? 当然不出 ...

最新文章

  1. springboot添加多数据源连接池并配置Mybatis
  2. CVPR2020中关于3D点云分割
  3. org.apache.poi 读取数字问题
  4. Pretty girl,你一定要去旅行
  5. C语言实用算法系列之行指针
  6. python廖雪峰教程 学习笔记
  7. Vue2 使用Volar 报错:<template v-for> key should be placed on the <template> tag
  8. 拓端tecdat|R语言社区发现算法检测心理学复杂网络:spinglass、探索性图分析walktrap算法与可视化
  9. HTTP 连接管理进化论
  10. 越狱装源未能连接到服务器,科普cydia无法加载源地址插件安装错误解决方法及Cydia怎么备份shsh...
  11. Excel在筛选后进行排序的函数
  12. js word 预览_Word页眉横线怎么去掉与插入、修改、删除页眉页脚
  13. Ubuntu压缩、解压
  14. selenium爬虫模拟登录PayPal
  15. 从微信跳转到appstore下载App
  16. 基于Python生成Markdown的标题序号
  17. 电脑是linux下安装win7,Linux下安装win7
  18. swing-基础Graphics画布
  19. java 折扣_java会员折扣
  20. 原来RA是Router Advertisement的意思

热门文章

  1. java shiro盐值加密_java中spring-shiro实现密码的MD5盐值加密
  2. 网页调用activex控件
  3. 常用的sql语句(持续更新)
  4. 苹果应用商店AppStore审核中文指南
  5. 【重排版】番外4 宁夏酒庄的斯巴达克斯
  6. 室内P1.8超清LED无缝拼接LED显示屏详细介绍及显示效果和尺寸长高比例
  7. SCN时间序列预测模型详解(Matlab代码实现)
  8. 内存优化二Bitmap优化
  9. 【杂文】杂文记录(一)
  10. 华为Mate40系列手机正式发布