RabbitMQ (五)实现类似Dubbo的RPC调用
springboot对rabbitMQ的接口做了封装,要实现 request/reponse 模式的调用,只需要调用 rabbitTemplate.convertSendAndReceive
方法即可,队列和交换器的设置使用topic模式即可。
Object res = rabbitTemplate.convertSendAndReceive(exchangeName, routingKey, reqJson,message -> {MessageProperties messageProperties = message.getMessageProperties();messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);// 持久化消息// messageProperties.setContentType("application/json");String correlationId = UUID.randomUUID().toString().replaceAll("-", "");messageProperties.setCorrelationId(correlationId);
return message;}, null);
下面通过spring aop、反射、rabbitmq实现一个类似dubbo的rpc调用系统:
rabbitmq 使用topic工作模式
springboot 创建client和server两个应用
通过注解
@Service
声明远程调用客户端通过
RPCAspect
切面拦截本地服务调用,然后通过rabbitMQ发起远程调用,调用服务端的service并返回结果
这样就实现了一个简单的rpc过程,这里因为使用反射,并且序列化直接使用json,所以性能上可能弱鸡,后面有时间再跟dubbo的方式做个压测对比看看。
1 项目结构
项目结构图如下:
2 pom依赖
这里把项目用到的依赖全部直接贴出来
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.7.3</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><scope>compile</scope></dependency>
3 common模块
主要定义了接口和接口实现,以及自定义注解,很简单,直接贴代码
1 IService接口
package com.fmi110.rabbitmq.service;
public interface IService {public Object sayHello(String reqObjStr);
}
2 IService实现类
这里需要注意 @Service
是自定义的注解类 com.fmi110.rpc.Service
!!!
package com.fmi110.rabbitmq.service;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.lang.UUID;
import com.fmi110.rpc.Service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/*** @author fmi110* @description 远程服务实现类* @date 2021/7/3 20:51*/
@Slf4j
@Component
@Service // 这个是自定义的注解类 com.fmi110.rpc.Service!!!
public class ServiceImpl implements IService {@Overridepublic Object sayHello(String reqObjStr) {
// 模拟耗时 50 mstry { Thread.sleep(50);} catch (InterruptedException e) {}
String now = DateUtil.now();String uuid = UUID.fastUUID().toString();String result = now + ":" + uuid;log.info(">>>>> rpcService >>>>>");log.info(result);return result;}
}
3 自定义注解
package com.fmi110.rpc;
import java.lang.annotation.*;
/*** @author fmi110* @description rpc服务注解* @date 2021/7/3 21:09*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Service {String value() default "";
}
4 服务端
服务端主要执行如下内容:
监听消息队列内容
获取消息内容,反序列化请求数据,通过反射从spring上下文中获取service对象,并调用对应的方法,返回结果
1 application.properties
rabbitMQ配置:
每次只消费一条消息
使用自动ack机制,消息投递给消费者后自动从消息队列中移除
开启spring提供的消费失败自动重试机制,每条消息最多消费3次
# 应用名称
spring.application.name=rabbitmq
server.port=9089
server.servlet.context-path=/
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
# 指定连接的虚拟主机,可以在rabbitMQ控制台查看对应的虚拟主机的名字
spring.rabbitmq.virtual-host=my_vhost
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
# 每次只消费一条消息
spring.rabbitmq.listener.simple.prefetch=1
# 开启消费者应答 ack 机制
#spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 开启spring提供的retry
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.max-attempts=3
spring.rabbitmq.listener.simple.retry.initial-interval=3000
2 RabbitConfigRPC
配置消息队列和交换器,这里使用了 topic 模式
package com.fmi110.rabbitmq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/*** @author fmi110* @description 配置交换器、队列* @date 2021/7/3 9:58*/
@Slf4j
@Configuration
public class RabbitConfigRPC {
String exchangeName = "rpc-exchange";String queueName = "rpc-request-queue";
@Beanpublic TopicExchange exchange() {boolean durable = true; // 持久化boolean autoDelete = false; // 消费者全部解绑时不自动删除return new TopicExchange(exchangeName, durable, autoDelete);}
/*** 持久化队列* @return*/@Beanpublic Queue queue() {return new Queue(queueName, true, false, false);}
@Beanpublic Binding binding(Queue queue,TopicExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("rabbit.rpc");}
}
3 RabbitConsumer
消息消费者,主要执行:
从消息中提取反射调用需要的
class
method
arg
等信息从
applicationContext
获取服务对象,确定调用的 method 对象,通过反射调用方法并返回结果
package com.fmi110.rabbitmq;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONUtil;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;
import java.util.stream.Collectors;
/*** @author fmi110* @description 消息消费者* @date 2021/7/1 16:08*/
@Component
@Slf4j
public class RabbitConsumer {
@Autowiredprivate ApplicationContext applicationContext;
@RabbitListener(queues = "rpc-request-queue")public Object consumeRPC(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {return this.doInvoke(data);}
/*** 反射调用** @param data* @return* @throws ClassNotFoundException*/private Object doInvoke(String data) throws ClassNotFoundException, NoSuchMethodException {Map map = JSONUtil.toBean(data, Map.class);String className = (String) map.get("class");String methodName = (String) map.get("method");JSONArray args = (JSONArray) map.get("args");JSONArray parameterTypes = (JSONArray) map.get("parameterTypes");log.info(">>>> RPC请求: {}", data);Class<?> clazz = Class.forName(className);Object service = applicationContext.getBean(clazz);Class<?>[] clzArray = new Class[parameterTypes.size()];
for (int i = 0; i < parameterTypes.size(); i++) {String clz = (String) parameterTypes.get(i);Class<?> class_ = Class.forName(clz.replace("class ", ""));clzArray[i] = class_;
// Object arg = args.get(i);}Method method = clazz.getMethod(methodName, clzArray);return ReflectionUtils.invokeMethod(method, service, args.stream().toArray());}
}
5 客户端
客户端需要做的事情:
定义aop切面,拦截远程服务的方法
获取服务调用的类、方法、参数等信息,封装成rpc调用的参数
通过rabbitMQ发起rpc调用,并获取返回结果
1 application.properties
主要配置rabbitMQ的链接信息
# 应用名称
spring.application.name=rabbitmq
server.port=8080
server.servlet.context-path=/
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
# 指定连接的虚拟主机,可以在rabbitMQ控制台查看对应的虚拟主机的名字
spring.rabbitmq.virtual-host=my_vhost
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.listener.simple.prefetch=1
# 开启 publish-comfirm 机制和消息路由匹配失败退回机制
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.publisher-confirm-type=correlated
# 开启消费者应答 ack 机制
#spring.rabbitmq.listener.simple.acknowledge-mode=manual
2 RabbitConfigRPC
package com.fmi110.rabbitmq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/*** @author fmi110* @description 配置交换器、队列* @date 2021/7/3 9:58*/
@Slf4j
@Configuration
public class RabbitConfigRPC {
String exchangeName = "rpc-exchange";String queueName = "rpc-request-queue";
@Beanpublic TopicExchange exchange() {boolean durable = true; // 持久化boolean autoDelete = false; // 消费者全部解绑时不自动删除return new TopicExchange(exchangeName, durable, autoDelete);}
/*** 持久化、队列* @return*/@Beanpublic Queue queue() {return new Queue(queueName, true, false, false);}
@Beanpublic Binding binding(Queue queue,TopicExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("rabbit.rpc");}
}
3 RPCAspect切面
这是实现远程调用的核心,拦截请求,使用 rabbitmq 实现远程调用
package com.fmi110.rabbitmq.aspect;
import cn.hutool.json.JSONUtil;
import com.fmi110.rabbitmq.RabbitProducer;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.HashMap;
/*** @author fmi110* @description RPC服务 aop切面* @date 2021/7/3 20:54*/
@Component
@Slf4j
@Aspect
public class RPCAspect {
@AutowiredRabbitProducer rabbitProducer;
@Pointcut("execution(public * com.fmi110.rabbitmq.service.ServiceImpl.*(..))")public void rpcPointCut() {}
@Around("rpcPointCut()")public Object around(ProceedingJoinPoint point) throws Throwable {MethodSignature signature = (MethodSignature) point.getSignature();
Method method = signature.getMethod();
// 类名Class target = point.getTarget().getClass();
String className = target.getName();// 方法名String methodName = method.getName();Type[] genericParameterTypes = method.getGenericParameterTypes();Class<?>[] parameterTypes = method.getParameterTypes();//请求的参数Object[] args = point.getArgs();String argsStr = JSONUtil.toJsonStr(args);log.info("class:{}",className);log.info("method:{}",methodName);log.info("args:{}",argsStr);HashMap<String, Object> map = new HashMap<>();map.put("class", className);map.put("method", methodName);map.put("parameterTypes", parameterTypes);map.put("args", args);String reqJson = JSONUtil.toJsonStr(map);// Object proceed = point.proceed();Object proceed = rabbitProducer.sendRPC(reqJson); // rabbitMQ rpc调用return proceed;}
}
4 RabbitProducer
rabbitMQ发送消息的逻辑
package com.fmi110.rabbitmq;
import cn.hutool.json.JSONUtil;
import com.rabbitmq.client.AMQP;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.security.AlgorithmConstraints;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
/*** @author fmi110* @description 消息生产者* @date 2021/7/1 15:08*/
@Component
@Slf4j
public class RabbitProducer {@AutowiredRabbitTemplate rabbitTemplate;
/*** 1 设置 confirm 回调,消息发送到 exchange 时回调* 2 设置 return callback ,当路由规则无法匹配到消息队列时,回调* <p>* correlationData:消息发送时,传递的参数,里边只有一个id属性,标识消息用*/@PostConstructpublic void enableConfirmCallback() {// #1/*** 连接不上 exchange或exchange不存在时回调*/rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (!ack) {log.error("消息发送失败");// TODO 记录日志,发送通知等逻辑}});
// #2/*** 消息投递到队列失败时,才会回调该方法* message:发送的消息* exchange:消息发往的交换器的名称* routingKey:消息携带的路由关键字信息*/rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {log.error("{},exchange={},routingKey={}",replyText,exchange,routingKey);// TODO 路由失败后续处理逻辑});}
public Object sendRPC(String reqJson) {
String exchangeName = "rpc-exchange";String routingKey = "rabbit.rpc";Object res = rabbitTemplate.convertSendAndReceive(exchangeName, routingKey, reqJson,message -> {MessageProperties messageProperties = message.getMessageProperties();messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);// 持久化消息messageProperties.setContentType("application/json");String correlationId = UUID.randomUUID().toString().replaceAll("-", "");messageProperties.setCorrelationId(correlationId);
return message;}, null);log.info(">>>>>服务端返回的响应:");log.info(JSONUtil.toJsonStr(res));return res;}
}
5 Controller
触发远程调用使用的
package com.fmi110.rabbitmq.controller;
@Slf4j
@RestController
public class TestController {@Resource(name = "serviceImpl")IService service;
@GetMapping("/rpcCall")public Object rpcCall(String rpc) {Object result =service.sayHello("aaa");return result;}
}
6 运行效果
客户端截图:
服务端截图:
7 使用场景
在网络可以进行双向通信的场景下,实现远程调用的方式有很多,比如dubbo、EJB等,但是在某些特殊的场景下,比如某些政务系统网络,对网络通信的方向有严格的限制,有些限制外网区不能主动向内网区发起通信,只能内网通外网,这个时候如果要实现外网调用内网的服务,使用dubbo,EJB就无法实现了,这个时候就可以借助消息队列来绕过这个问题,消息服务部署在外网区,消息消费者在内网,消费者就可以连接上外网的服务,进而实现双向通信。
RabbitMQ (五)实现类似Dubbo的RPC调用相关推荐
- Dubbo面试 - 如何自己设计一个类似 Dubbo 的 RPC 框架?
Dubbo面试 - 如何自己设计一个类似 Dubbo 的 RPC 框架? 面试题 如何自己设计一个类似 Dubbo 的 RPC 框架? 面试官心理分析 说实话,就这问题,其实就跟问你如何自己设计一个 ...
- SpringCloud集成Dubbo实现RPC调用
SpringCloud轻松集成Dubbo实现RPC调用 很久之前在做微服务架构选型的时候就听说阿里的微服务RPC框架dubbo,当时与Spring Cloud以http协议调用的架构做对比.发现dub ...
- SpringCloud Alibaba实战(12:引入Dubbo实现RPC调用)
源码地址:https://gitee.com/fighter3/eshop-project.git 持续更新中-- 大家好,我是老三,断更了半年,我又滚回来继续写这个系列了,还有人看吗-- 在前面的章 ...
- Dubbo实现RPC调用使用入门
使用Dubbo进行远程调用实现服务交互,它支持多种协议,如Hessian.HTTP.RMI.Memcached.Redis.Thrift等等.由于Dubbo将这些协议的实现进行了封装了,无论是服务端( ...
- Dubbo的RPC调用流程
首先在客户端启动时会从注册中心拉去和订阅对应的服务列表,Cluster会把拉取到的服务列表聚合成一个cluster,每次RPC调用前会通过Directory#list获取providers地址(已经生 ...
- Dubbo——远程(RPC)调用原理
摘要 服务暴露和服务引入两个流程了,而这两个流程就是为了服务的调用,本博文将详细的介绍Dubbo的服务调用流程. PRC架构组件 一个基本的RPC架构里面应该至少包含以下4个组件: 客户端(Clien ...
- 7.Spring Cloud Alibaba教程:整合Dubbo实现RPC调用
概述 Apache Dubbo 是一款高性能的.基于Java的开源RPC框架,它提供了以下特性: 基于接口的远程方法调用 智能负载均衡 服务自动注册和发现 高可扩展性 运行期流量调度 可视化的服务治理 ...
- nodejs基于RabbitMq的RPC调用
在微服务架构中,SpringCloud,Eureka,Dubbo,ZooKeeper这些框架再熟悉不过了,其中面向接口的远程方法调用是其主要核心功能之一,而MQ主要用来应用解耦,削峰填谷等作用:最近在 ...
- 不满足于RPC,详解Dubbo的服务调用链路
系列文章目录 [收藏向]从用法到源码,一篇文章让你精通Dubbo的SPI机制 面试Dubbo ,却问我和Springcloud有什么区别? 超简单,手把手教你搭建Dubbo工程(内附源码) Dubbo ...
最新文章
- Flutter开发之认识Flutter(一)
- Razor master page
- 工程代码_特征工程学习,19 项实践 Tips!代码已开源!
- 什么是Google Play保护以及如何确保Android安全?
- 小学计算机课教学设计,小学信息技术教学设计三篇
- 迈克尔 杰克逊mv_用杰克逊流式传输大型JSON文件– RxJava常见问题解答
- [css] 用css画一个五边形和一个六边形
- LeetCode 1484. 克隆含随机指针的二叉树(哈希/递归)
- Java笔记-使用ServerSocket构建HTTP服务器
- 送你一份价值5800元的技术干货PPT | 技术管理者工作坊强势来袭!
- LinkedList遍历方式区别
- 190720每日一句
- 一文让你详细了解CPU的内部架构和工作原理(好文)
- java三猴分桃多线程,浅谈数学趣题:三翁垂钓和五猴分桃
- vue 扁平化_JS数组扁平化(flat)
- linux系统开机自动锁定键盘,设置linux开机启动小键盘的详细教程设置linux开机启动小键盘的图文教程...
- 博客图片html代码,【html博客代码】图片羽化代码
- 摩尔纹的原理与产生条件
- 用计算机专业起情侣网名,带对方名字的情侣网名最新精选
- idea如何给main函数中的args[] 字符串数组赋值