springboot对rabbitMQ的接口做了封装,要实现 request/reponse 模式的调用,只需要调用 rabbitTemplate.convertSendAndReceive 方法即可,队列和交换器的设置使用topic模式即可。

  Object res = rabbitTemplate.convertSendAndReceive(exchangeName, routingKey, reqJson,message -> {MessageProperties messageProperties = message.getMessageProperties();messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);// 持久化消息// messageProperties.setContentType("application/json");String correlationId = UUID.randomUUID().toString().replaceAll("-", "");messageProperties.setCorrelationId(correlationId);
​return message;}, null);

下面通过spring aop、反射、rabbitmq实现一个类似dubbo的rpc调用系统:

  1. rabbitmq 使用topic工作模式

  2. springboot 创建client和server两个应用

  3. 通过注解 @Service 声明远程调用

  4. 客户端通过 RPCAspect 切面拦截本地服务调用,然后通过rabbitMQ发起远程调用,调用服务端的service并返回结果

这样就实现了一个简单的rpc过程,这里因为使用反射,并且序列化直接使用json,所以性能上可能弱鸡,后面有时间再跟dubbo的方式做个压测对比看看。

1 项目结构

项目结构图如下:

2 pom依赖

这里把项目用到的依赖全部直接贴出来

    
    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.7.3</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
​<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><scope>compile</scope></dependency>

3 common模块

主要定义了接口和接口实现,以及自定义注解,很简单,直接贴代码

1 IService接口

package com.fmi110.rabbitmq.service;
​
public interface IService {public Object sayHello(String reqObjStr);
}

2 IService实现类

这里需要注意 @Service 是自定义的注解类 com.fmi110.rpc.Service !!!

package com.fmi110.rabbitmq.service;
​
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.lang.UUID;
import com.fmi110.rpc.Service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
​
/*** @author fmi110* @description 远程服务实现类* @date 2021/7/3 20:51*/
@Slf4j
@Component
@Service // 这个是自定义的注解类 com.fmi110.rpc.Service!!!
public class ServiceImpl implements IService {@Overridepublic Object sayHello(String reqObjStr) {
​// 模拟耗时 50 mstry { Thread.sleep(50);} catch (InterruptedException e) {}
​String now = DateUtil.now();String uuid   = UUID.fastUUID().toString();String result = now + ":" + uuid;log.info(">>>>> rpcService  >>>>>");log.info(result);return result;}
}

3 自定义注解

package com.fmi110.rpc;
​
import java.lang.annotation.*;
​
/*** @author fmi110* @description rpc服务注解* @date 2021/7/3 21:09*/
​
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Service {String value() default "";
}

4 服务端

服务端主要执行如下内容:

  1. 监听消息队列内容

  2. 获取消息内容,反序列化请求数据,通过反射从spring上下文中获取service对象,并调用对应的方法,返回结果

1 application.properties

rabbitMQ配置:

  1. 每次只消费一条消息

  2. 使用自动ack机制,消息投递给消费者后自动从消息队列中移除

  3. 开启spring提供的消费失败自动重试机制,每条消息最多消费3次

# 应用名称
spring.application.name=rabbitmq
server.port=9089
server.servlet.context-path=/
​
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
# 指定连接的虚拟主机,可以在rabbitMQ控制台查看对应的虚拟主机的名字
spring.rabbitmq.virtual-host=my_vhost
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
​
# 每次只消费一条消息
spring.rabbitmq.listener.simple.prefetch=1
# 开启消费者应答 ack 机制
#spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 开启spring提供的retry
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.max-attempts=3
spring.rabbitmq.listener.simple.retry.initial-interval=3000

2 RabbitConfigRPC

配置消息队列和交换器,这里使用了 topic 模式

package com.fmi110.rabbitmq.config;
​
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
​
/*** @author fmi110* @description 配置交换器、队列* @date 2021/7/3 9:58*/
@Slf4j
@Configuration
public class RabbitConfigRPC {
​String exchangeName = "rpc-exchange";String queueName    = "rpc-request-queue";
​@Beanpublic TopicExchange exchange() {boolean durable    = true; // 持久化boolean autoDelete = false; // 消费者全部解绑时不自动删除return new TopicExchange(exchangeName, durable, autoDelete);}
​/*** 持久化队列* @return*/@Beanpublic Queue queue() {return new Queue(queueName, true, false, false);}
​@Beanpublic Binding binding(Queue queue,TopicExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("rabbit.rpc");}
}
​

3 RabbitConsumer

消息消费者,主要执行:

  1. 从消息中提取反射调用需要的 class method arg 等信息

  2. applicationContext 获取服务对象,确定调用的 method 对象,通过反射调用方法并返回结果

package com.fmi110.rabbitmq;
​
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONUtil;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import org.springframework.util.ReflectionUtils;
​
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;
import java.util.stream.Collectors;
​
​
/*** @author fmi110* @description 消息消费者* @date 2021/7/1 16:08*/
@Component
@Slf4j
public class RabbitConsumer {
​@Autowiredprivate ApplicationContext applicationContext;
​@RabbitListener(queues = "rpc-request-queue")public Object consumeRPC(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {return this.doInvoke(data);}
​/*** 反射调用** @param data* @return* @throws ClassNotFoundException*/private Object doInvoke(String data) throws ClassNotFoundException, NoSuchMethodException {Map       map            = JSONUtil.toBean(data, Map.class);String    className      = (String) map.get("class");String    methodName     = (String) map.get("method");JSONArray args           = (JSONArray) map.get("args");JSONArray parameterTypes = (JSONArray) map.get("parameterTypes");log.info(">>>>  RPC请求: {}", data);Class<?>   clazz   = Class.forName(className);Object     service = applicationContext.getBean(clazz);Class<?>[] clzArray   = new Class[parameterTypes.size()];
​for (int i = 0; i < parameterTypes.size(); i++) {String   clz    = (String) parameterTypes.get(i);Class<?> class_ = Class.forName(clz.replace("class ", ""));clzArray[i] = class_;
​// Object arg = args.get(i);}Method method = clazz.getMethod(methodName, clzArray);return ReflectionUtils.invokeMethod(method, service, args.stream().toArray());}
}
​

5 客户端

客户端需要做的事情:

  1. 定义aop切面,拦截远程服务的方法

  2. 获取服务调用的类、方法、参数等信息,封装成rpc调用的参数

  3. 通过rabbitMQ发起rpc调用,并获取返回结果

1 application.properties

主要配置rabbitMQ的链接信息

# 应用名称
spring.application.name=rabbitmq
server.port=8080
server.servlet.context-path=/
​
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
# 指定连接的虚拟主机,可以在rabbitMQ控制台查看对应的虚拟主机的名字
spring.rabbitmq.virtual-host=my_vhost
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
​
spring.rabbitmq.listener.simple.prefetch=1
​
# 开启 publish-comfirm 机制和消息路由匹配失败退回机制
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.publisher-confirm-type=correlated
# 开启消费者应答 ack 机制
#spring.rabbitmq.listener.simple.acknowledge-mode=manual

2 RabbitConfigRPC

package com.fmi110.rabbitmq.config;
​
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
​
/*** @author fmi110* @description 配置交换器、队列* @date 2021/7/3 9:58*/
@Slf4j
@Configuration
public class RabbitConfigRPC {
​String exchangeName = "rpc-exchange";String queueName    = "rpc-request-queue";
​@Beanpublic TopicExchange exchange() {boolean durable    = true; // 持久化boolean autoDelete = false; // 消费者全部解绑时不自动删除return new TopicExchange(exchangeName, durable, autoDelete);}
​/*** 持久化、队列* @return*/@Beanpublic Queue queue() {return new Queue(queueName, true, false, false);}
​@Beanpublic Binding binding(Queue queue,TopicExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("rabbit.rpc");}
}
​

3 RPCAspect切面

这是实现远程调用的核心,拦截请求,使用 rabbitmq 实现远程调用

package com.fmi110.rabbitmq.aspect;
​
import cn.hutool.json.JSONUtil;
import com.fmi110.rabbitmq.RabbitProducer;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
​
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.HashMap;
​
/*** @author fmi110* @description RPC服务 aop切面* @date 2021/7/3 20:54*/
@Component
@Slf4j
@Aspect
public class RPCAspect {
​@AutowiredRabbitProducer rabbitProducer;
​@Pointcut("execution(public * com.fmi110.rabbitmq.service.ServiceImpl.*(..))")public void rpcPointCut() {}
​
​@Around("rpcPointCut()")public Object around(ProceedingJoinPoint point) throws Throwable {MethodSignature signature = (MethodSignature) point.getSignature();
​Method          method    = signature.getMethod();
​// 类名Class target    =  point.getTarget().getClass();
​String className = target.getName();// 方法名String methodName = method.getName();Type[] genericParameterTypes = method.getGenericParameterTypes();Class<?>[] parameterTypes = method.getParameterTypes();//请求的参数Object[] args = point.getArgs();String   argsStr    = JSONUtil.toJsonStr(args);log.info("class:{}",className);log.info("method:{}",methodName);log.info("args:{}",argsStr);HashMap<String, Object> map = new HashMap<>();map.put("class", className);map.put("method", methodName);map.put("parameterTypes", parameterTypes);map.put("args", args);String reqJson = JSONUtil.toJsonStr(map);// Object proceed = point.proceed();Object proceed = rabbitProducer.sendRPC(reqJson); // rabbitMQ rpc调用return proceed;}
}
​

4 RabbitProducer

rabbitMQ发送消息的逻辑

package com.fmi110.rabbitmq;
​
import cn.hutool.json.JSONUtil;
import com.rabbitmq.client.AMQP;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
​
import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.security.AlgorithmConstraints;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
​
/*** @author fmi110* @description 消息生产者* @date 2021/7/1 15:08*/
@Component
@Slf4j
public class RabbitProducer {@AutowiredRabbitTemplate rabbitTemplate;
​/*** 1 设置 confirm 回调,消息发送到 exchange 时回调* 2 设置 return callback ,当路由规则无法匹配到消息队列时,回调* <p>* correlationData:消息发送时,传递的参数,里边只有一个id属性,标识消息用*/@PostConstructpublic void enableConfirmCallback() {// #1/*** 连接不上 exchange或exchange不存在时回调*/rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (!ack) {log.error("消息发送失败");// TODO 记录日志,发送通知等逻辑}});
​// #2/*** 消息投递到队列失败时,才会回调该方法* message:发送的消息* exchange:消息发往的交换器的名称* routingKey:消息携带的路由关键字信息*/rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {log.error("{},exchange={},routingKey={}",replyText,exchange,routingKey);// TODO 路由失败后续处理逻辑});}
​public Object sendRPC(String reqJson) {
​String exchangeName = "rpc-exchange";String routingKey = "rabbit.rpc";Object res = rabbitTemplate.convertSendAndReceive(exchangeName, routingKey, reqJson,message -> {MessageProperties messageProperties = message.getMessageProperties();messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);// 持久化消息messageProperties.setContentType("application/json");String correlationId = UUID.randomUUID().toString().replaceAll("-", "");messageProperties.setCorrelationId(correlationId);
​return message;}, null);log.info(">>>>>服务端返回的响应:");log.info(JSONUtil.toJsonStr(res));return res;}
}
​

5 Controller

触发远程调用使用的

package com.fmi110.rabbitmq.controller;
@Slf4j
@RestController
public class TestController {@Resource(name = "serviceImpl")IService service;
​@GetMapping("/rpcCall")public Object rpcCall(String rpc) {Object result =service.sayHello("aaa");return result;}
}

6 运行效果

客户端截图:

服务端截图:

7 使用场景

在网络可以进行双向通信的场景下,实现远程调用的方式有很多,比如dubbo、EJB等,但是在某些特殊的场景下,比如某些政务系统网络,对网络通信的方向有严格的限制,有些限制外网区不能主动向内网区发起通信,只能内网通外网,这个时候如果要实现外网调用内网的服务,使用dubbo,EJB就无法实现了,这个时候就可以借助消息队列来绕过这个问题,消息服务部署在外网区,消息消费者在内网,消费者就可以连接上外网的服务,进而实现双向通信。

RabbitMQ (五)实现类似Dubbo的RPC调用相关推荐

  1. Dubbo面试 - 如何自己设计一个类似 Dubbo 的 RPC 框架?

    Dubbo面试 - 如何自己设计一个类似 Dubbo 的 RPC 框架? 面试题 如何自己设计一个类似 Dubbo 的 RPC 框架? 面试官心理分析 说实话,就这问题,其实就跟问你如何自己设计一个 ...

  2. SpringCloud集成Dubbo实现RPC调用

    SpringCloud轻松集成Dubbo实现RPC调用 很久之前在做微服务架构选型的时候就听说阿里的微服务RPC框架dubbo,当时与Spring Cloud以http协议调用的架构做对比.发现dub ...

  3. SpringCloud Alibaba实战(12:引入Dubbo实现RPC调用)

    源码地址:https://gitee.com/fighter3/eshop-project.git 持续更新中-- 大家好,我是老三,断更了半年,我又滚回来继续写这个系列了,还有人看吗-- 在前面的章 ...

  4. Dubbo实现RPC调用使用入门

    使用Dubbo进行远程调用实现服务交互,它支持多种协议,如Hessian.HTTP.RMI.Memcached.Redis.Thrift等等.由于Dubbo将这些协议的实现进行了封装了,无论是服务端( ...

  5. Dubbo的RPC调用流程

    首先在客户端启动时会从注册中心拉去和订阅对应的服务列表,Cluster会把拉取到的服务列表聚合成一个cluster,每次RPC调用前会通过Directory#list获取providers地址(已经生 ...

  6. Dubbo——远程(RPC)调用原理

    摘要 服务暴露和服务引入两个流程了,而这两个流程就是为了服务的调用,本博文将详细的介绍Dubbo的服务调用流程. PRC架构组件 一个基本的RPC架构里面应该至少包含以下4个组件: 客户端(Clien ...

  7. 7.Spring Cloud Alibaba教程:整合Dubbo实现RPC调用

    概述 Apache Dubbo 是一款高性能的.基于Java的开源RPC框架,它提供了以下特性: 基于接口的远程方法调用 智能负载均衡 服务自动注册和发现 高可扩展性 运行期流量调度 可视化的服务治理 ...

  8. nodejs基于RabbitMq的RPC调用

    在微服务架构中,SpringCloud,Eureka,Dubbo,ZooKeeper这些框架再熟悉不过了,其中面向接口的远程方法调用是其主要核心功能之一,而MQ主要用来应用解耦,削峰填谷等作用:最近在 ...

  9. 不满足于RPC,详解Dubbo的服务调用链路

    系列文章目录 [收藏向]从用法到源码,一篇文章让你精通Dubbo的SPI机制 面试Dubbo ,却问我和Springcloud有什么区别? 超简单,手把手教你搭建Dubbo工程(内附源码) Dubbo ...

最新文章

  1. Flutter开发之认识Flutter(一)
  2. Razor master page
  3. 工程代码_特征工程学习,19 项实践 Tips!代码已开源!
  4. 什么是Google Play保护以及如何确保Android安全?
  5. 小学计算机课教学设计,小学信息技术教学设计三篇
  6. 迈克尔 杰克逊mv_用杰克逊流式传输大型JSON文件– RxJava常见问题解答
  7. [css] 用css画一个五边形和一个六边形
  8. LeetCode 1484. 克隆含随机指针的二叉树(哈希/递归)
  9. Java笔记-使用ServerSocket构建HTTP服务器
  10. 送你一份价值5800元的技术干货PPT | 技术管理者工作坊强势来袭!
  11. LinkedList遍历方式区别
  12. 190720每日一句
  13. 一文让你详细了解CPU的内部架构和工作原理(好文)
  14. java三猴分桃多线程,浅谈数学趣题:三翁垂钓和五猴分桃
  15. vue 扁平化_JS数组扁平化(flat)
  16. linux系统开机自动锁定键盘,设置linux开机启动小键盘的详细教程设置linux开机启动小键盘的图文教程...
  17. 博客图片html代码,【html博客代码】图片羽化代码
  18. 摩尔纹的原理与产生条件
  19. 用计算机专业起情侣网名,带对方名字的情侣网名最新精选
  20. idea如何给main函数中的args[] 字符串数组赋值

热门文章

  1. 没有内幕交易:Coinbase完成了比特币现金调查
  2. 【面试】Java基础中的那些事-One
  3. localhost与127.0.0.1的区别
  4. C# 实现FTP上传与下载
  5. 卸载linux系统自带JDK,安装自己的jdk
  6. 企业级应用能帮助闪存走出产能过剩时代吗?
  7. 使用Windows远程桌面(mstsc)通过RDP协议访问Ubuntu/Debian服务器
  8. Android消息机制 Looper源码阅读
  9. python3 统计文件夹下文件(含文件夹)的个数
  10. 泥瓦匠进阶:连接池原理设计并不难