从零实现RPC框架1:RPC框架架构设计

1.什么是 RPC?

RPC 的全称是 Remote Procedure Call,即远程过程调用。简单解读字面上的意思,远程肯定是指要跨机器而非本机,所以需要用到网络编程才能实现,但是不是只要通过网络通信访问到另一台机器的应用程序,就可以称之为 RPC 调用了?显然并不够。

我理解的 RPC 是帮助我们屏蔽网络编程细节,实现调用远程方法就跟调用本地(同一个项目中的方法)一样的体验,我们不需要因为这个方法是远程调用就需要编写很多与业务无关的代码。

所以我认为,RPC 的作用就是体现在这样两个方面:

1.拼比底层网络通信的复杂性,让我们感觉就是调用项目内的方法。

2.比起http请求 通信的效率 带宽利用率更高 也就是在报文中 有效数据的占比更高

  1. 屏蔽远程调用跟本地调用的区别,让我们感觉就是调用项目内的方法;
  2. 隐藏底层网络通信的复杂性,让我们更专注于业务逻辑。

2.RPC 框架的结构

一下是dubbo的架构图

一个最简单的 RPC 框架分成三个部分:注册中心、服务端、客户端。以下是一个最简单的结构流程图。

组成部分:

  1. 注册中心:用于注册和获取服务。
  2. 服务端:指提供服务的一方,也叫服务提供方 Provider
  3. 客户端:指调用服务的一方,也叫服务消费者 Consumer

流程:

  1. 服务端把服务信息注册到注册中心,通常包含服务端地址、接口类和方法
  2. 客户端从注册中心获取对应服务的信息
  3. 客户端根据服务的信息,通过网络调用到服务端的接口

当然 其实客户端是可以通过服务端的ip和端口 直连调用的,而不需要向注册中心去获取服务

3.RPC 框架的设计

上面的流程

  1. 服务端以什么形式注册到注册中心?
  2. 客户端是怎么做到像调用接口一样调用服务?
  3. 调用服务的网络协议是怎样的?

一个基本的 RPC 框架,需要包含以下部分:

  1. 注册中心:注册中心负责服务信息的注册与查找。服务端在启动的时候,扫描所有的服务,然后将自己的服务地址和服务名注册到注册中心。客户端在调用服务之前,通过注册中心查找到服务的地址,就可以通过服务的地址调用到服务啦。常见的注册中心有 ZookeeperEureka 等。
  2. 动态代理:客户端调用接口,需要框架能自己根据接口去远程调用服务,这一步是用户无感知的。这样一来,就需要使用到动态代理,用户调用接口,实际上是在调用动态生成的代理类。常见的动态代理有:JDK ProxyCGLibJavassist 等。
  3. 网络传输:RPC 远程调用实际上就是网络传输,所以网络传输是 RPC 框架中必不可少的部分。网络框架有 Java NIONetty 框架等。
  4. 自定义协议:网络传输需要制定好协议,一个良好的协议能提高传输的效率。
  5. 序列化:网络传输肯定会涉及到序列化,常见的序列化有JsonProtostuffKyro 等。
  6. 负载均衡:当请求调用量大的时候,需要增加服务端的数量,一旦增加,就会涉及到符合选择服务的问题,这就是负载均衡。常见的负载均衡策略有:轮询、随机、加权轮询、加权随机、一致性哈希等等。
  7. 集群容错:当请求服务异常的时候,我们是应该直接报错呢?还是重试?还是请求其他服务?这个就是集群容错策略啦。
  8. SPI机制 作为一个框架,支持插件化的必要功能
  9. 支持rpc 同步调用和异步调用

3.系统设计

3.1分层

有了大致的需求,接下来就可以开始着手设计了。首先我们将框架划分为若干层,层与层之间约定通过接口交互。这里就不要问为什么需要分层了,非要问就是经验。分层作为一种经典到不能在经典的设计模式,几乎在软件开发过程中无处不在,在RPC框架当中也十分适用,下面画出大致的层次图:

代码实现概览

下面我们从代码的角度上,来看看以上几部分是如何组织的:

1.服务注册与监听

1.2注册中心

服务注册最终的表现就是:把服务信息注册到注册中心中。
根据注册中心的特性,可以抽出两个接口 ServiceDiscovery,包含了、查找服务的方法。

注册、取消注册

通过实现 ServiceDiscoveryServiceRegistry 接口,可以扩展出多种类型的注册中心。

@SPI
public interface ServiceDiscovery {/*** lookup service by rpcServiceName** @param rpcRequest rpc service pojo* @return service address*/InetSocketAddress lookupService(RpcRequest rpcRequest);
}
@SPI
public interface ServiceRegistry {/***  向注册中心注册服务* register service** @param rpcServiceName    rpc service name* @param inetSocketAddress service address*/void registerService(String rpcServiceName, InetSocketAddress inetSocketAddress);/*** 向注册中心取消注册服务*/void unregister(URL url);}
public interface ServiceProvider {/*** @param rpcServiceConfig rpc service related attributes*/void addService(RpcServiceConfig rpcServiceConfig);/*** @param rpcServiceName rpc service name* @return service object*/Object getService(String rpcServiceName);/*** @param rpcServiceConfig rpc service related attributes*/void publishService(RpcServiceConfig rpcServiceConfig);}

1.2.扫描服务

服务要注册到注册中心,第一步是需要扫描到需要注册的接口。
我们通过 SpringBeanPostProcessor#postProcessBeforeInitialization,将带有 @RpcService 注解的接口进行发布。

@Slf4j
@Component
public class SpringBeanPostProcessor implements BeanPostProcessor {private final ServiceProvider serviceProvider;private final RpcRequestTransport rpcClient;
//    private final AsyncNettyRpcClient asyncNettyRpcClient;public SpringBeanPostProcessor() {this.serviceProvider = SingletonFactory.getInstance(ZkServiceProviderImpl.class);this.rpcClient = ExtensionLoader.getExtensionLoader(RpcRequestTransport.class).getExtension("netty");}@SneakyThrows@Overridepublic Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {if (bean.getClass().isAnnotationPresent(RpcService.class)) {log.info("[{}] is annotated with  [{}]", bean.getClass().getName(), RpcService.class.getCanonicalName());// get RpcService annotationRpcService rpcService = bean.getClass().getAnnotation(RpcService.class);// build RpcServicePropertiesRpcServiceConfig rpcServiceConfig = RpcServiceConfig.builder().group(rpcService.group()).version(rpcService.version()).service(bean).build();serviceProvider.publishService(rpcServiceConfig);}return bean;}.....
}

那么是如何扫描自定义注解到Spring容器中的呢?

可以实现 ImportBeanDefinitionRegistrar

ImportBeanDefinitionRegistrar接口是也是spring的扩展点之一,它可以支持我们自己写的代码封装成BeanDefinition对象;实现此接口的类会回调postProcessBeanDefinitionRegistry方法,注册到spring容器中。把bean注入到spring容器不止有 @Service @Component等注解方式;还可以实现此接口。

@Slf4j
public class CustomScannerRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware {private static final String SPRING_BEAN_BASE_PACKAGE = "github.xsj";private static final String BASE_PACKAGE_ATTRIBUTE_NAME = "basePackage";private ResourceLoader resourceLoader;@Overridepublic void setResourceLoader(ResourceLoader resourceLoader) {this.resourceLoader = resourceLoader;}@Overridepublic void registerBeanDefinitions(AnnotationMetadata annotationMetadata, BeanDefinitionRegistry beanDefinitionRegistry) {//get the attributes and values ​​of RpcScan annotationAnnotationAttributes rpcScanAnnotationAttributes = AnnotationAttributes.fromMap(annotationMetadata.getAnnotationAttributes(RpcScan.class.getName()));String[] rpcScanBasePackages = new String[0];if (rpcScanAnnotationAttributes != null) {// get the value of the basePackage propertyrpcScanBasePackages = rpcScanAnnotationAttributes.getStringArray(BASE_PACKAGE_ATTRIBUTE_NAME);}if (rpcScanBasePackages.length == 0) {rpcScanBasePackages = new String[]{((StandardAnnotationMetadata) annotationMetadata).getIntrospectedClass().getPackage().getName()};}// Scan the RpcService annotationCustomScanner rpcServiceScanner = new CustomScanner(beanDefinitionRegistry, RpcService.class);// Scan the Component annotationCustomScanner springBeanScanner = new CustomScanner(beanDefinitionRegistry, Component.class);if (resourceLoader != null) {rpcServiceScanner.setResourceLoader(resourceLoader);springBeanScanner.setResourceLoader(resourceLoader);}int springBeanAmount = springBeanScanner.scan(SPRING_BEAN_BASE_PACKAGE);log.info("springBeanScanner扫描的数量 [{}]", springBeanAmount);int rpcServiceCount = rpcServiceScanner.scan(rpcScanBasePackages);log.info("rpcServiceScanner扫描的数量 [{}]", rpcServiceCount);}}

这样就不仅将@Component 注解 还将@rpcService注解bean也扫描spring容器

1.3IO与监听

RPC 的请求响应本质上是网络请求,作为服务方,需要开启端口监听客户端的请求。
Netty 、Mina是目前java最流行的网络开发框架。

 @SneakyThrowspublic void start() {CustomShutdownHook.getCustomShutdownHook().clearAll();String host = InetAddress.getLocalHost().getHostAddress();EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();DefaultEventExecutorGroup serviceHandlerGroup = new DefaultEventExecutorGroup(RuntimeUtil.cpus() * 2,ThreadPoolFactoryUtils.createThreadFactory("service-handler-group", false));try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)// TCP默认开启了 Nagle 算法,该算法的作用是尽可能的发送大数据快,减少网络传输。TCP_NODELAY 参数的作用就是控制是否启用 Nagle 算法。.childOption(ChannelOption.TCP_NODELAY, true)// 是否开启 TCP 底层心跳机制.childOption(ChannelOption.SO_KEEPALIVE, true)//表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数.option(ChannelOption.SO_BACKLOG, 128).handler(new LoggingHandler(LogLevel.INFO))// 当客户端第一次进行请求的时候才会进行初始化.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {// 30 秒之内没有收到客户端请求的话就关闭连接ChannelPipeline p = ch.pipeline();p.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));p.addLast(new RpcMessageEncoder());p.addLast(new RpcMessageDecoder());p.addLast(serviceHandlerGroup, new NettyRpcServerHandler());}});// 绑定端口,同步等待绑定成功ChannelFuture f = b.bind(host, PORT).sync();// 等待服务端监听端口关闭f.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("occur exception when start server:", e);} finally {log.error("shutdown bossGroup and workerGroup");bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();serviceHandlerGroup.shutdownGracefully();}}

2.客户端发现、请求

2.1. 扫描

客户端要是用 RPC 接口,首先要用 @RpcReference 注解标出。
通过 SpringBeanPostProcessor#postProcessAfterInitialization 初始化 Bean 之后,生成代理类。
调用接口的时候,这个代理类,就会在背地里偷偷找到服务,并请求到结果返回。

@Slf4j
@Component
public class SpringBeanPostProcessor implements BeanPostProcessor {private final ServiceProvider serviceProvider;private final RpcRequestTransport rpcClient;public SpringBeanPostProcessor() {this.serviceProvider = SingletonFactory.getInstance(ZkServiceProviderImpl.class);this.rpcClient = ExtensionLoader.getExtensionLoader(RpcRequestTransport.class).getExtension("netty");}....@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {Class<?> targetClass = bean.getClass();Field[] declaredFields = targetClass.getDeclaredFields();for (Field declaredField : declaredFields) {RpcReference rpcReference = declaredField.getAnnotation(RpcReference.class);if (rpcReference != null) {RpcServiceConfig rpcServiceConfig = RpcServiceConfig.builder().group(rpcReference.group()).version(rpcReference.version()).build();RpcClientProxy rpcClientProxy = new RpcClientProxy(rpcClient, rpcServiceConfig);Object clientProxy = rpcClientProxy.getProxy(declaredField.getType());declaredField.setAccessible(true);try {declaredField.set(bean, clientProxy);} catch (IllegalAccessException e) {e.printStackTrace();}}}return bean;}
}

2.2 服务发现

客户端要请求服务,首先需要找到服务对应的域名/IP 和 端口,这个过程就是服务发现。
服务发现就是从注册中心找到对应服务的地址,上面注册中心的接口有提供对应的方法。

2.3. 负载均衡

从注册中心找到的地址可能是多个,那我们如何从多个地址中选择一个地址,这就是负载均衡。
负载均衡抽象出一个接口 LoadBalance,方法只有一个,就是选择 selectServiceAddress

@SPI
public interface LoadBalance {/*** Choose one from the list of existing service addresses list** @param serviceAddresses Service address list* @return target service address*/String selectServiceAddress(List<String> serviceAddresses, RpcRequest rpcRequest);
}

2.4. 集群容错

当请求服务失败之后,应该如何处理?重试?快速失败?这个就是集群容错策略啦。我们来简单看一下重试策略吧。

public class RetryInvoker extends AbstractFaultTolerantInvoker {/*** 默认重试次数*/private static final Integer DEFAULT_RETRY_TIMES = 3;@Overrideprotected RpcResult doInvoke(RpcRequest request, Invoker invoker, List<URL> candidateUrls, LoadBalance loadBalance) throws RpcException {// 获取重试次数int retryTimes = Optional.ofNullable(clusterConfig.getRetryTimes()).orElse(DEFAULT_RETRY_TIMES);RpcException rpcException = null;for (int i = 0; i < retryTimes; i++) {try {// 执行,如果成功则返回结果,失败继续尝试RpcResult result = invoker.invoke(request);if (result.isSuccess()) {return result;}} catch (RpcException ex) {log.error("invoke error. retry times=" + i, ex);rpcException = ex;}}if (rpcException == null) {rpcException = new RpcException("invoker error. request=" + request);}throw rpcException;}
}

3.网络传输

3.1. 序列化

网络传输不可或缺的就是序列化,序列化就是怎么把一个对象的状态信息转化为可以存储或传输的形式的过程。我们常见的序列化方式有JSONProtobuf等二进制流的序列方式 以及 xml json等文本格式。
序列化和反序列化是一对,共同组成序列化器

@SPI
public interface Serializer {/*** 序列化** @param obj 要序列化的对象* @return 字节数组*/byte[] serialize(Object obj);/*** 反序列化** @param bytes 序列化后的字节数组* @param clazz 目标类* @param <T>   类的类型。举个例子,  {@code String.class} 的类型是 {@code Class<String>}.*              如果不知道类的类型的话,使用 {@code Class<?>}* @return 反序列化的对象*/<T> T deserialize(byte[] bytes, Class<T> clazz);
}

3.2. 自定义协议

如果传输层的协议是使用TCP的话,TCP又是基于字节流的,那么因为TCP有缓冲区限制的问题就会发生粘包和拆包

为什么会发生 TCP 粘包、拆包?

  • 要发送的数据大于 TCP 发送缓冲区剩余空间大小,将会发生拆包。
  • 待发送数据大于 MSS(最大报文长度),TCP 在传输前将进行拆包。
  • 要发送的数据小于 TCP 发送缓冲区的大小,TCP 将多次写入缓冲区的数据一次发送出去,将会发生粘包。
  • 接收数据端的应用层没有及时读取接收缓冲区中的数据,将发生粘包。

粘包、拆包解决办法

由于 TCP 本身是面向字节流的,无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,归纳如下:

  • **消息定长:**发送端将每个数据包封装为固定长度(不够的可以通过补 0 填充),这样接收端每次接收缓冲区中读取固定长度的数据就自然而然的把每个数据包拆分开来。
  • **设置消息边界:**服务端从网络流中按消息边界分离出消息内容。在包尾增加回车换行符进行分割,例如 FTP 协议。
  • **将消息分为消息头和消息体:**消息头中包含表示消息总长度(或者消息体长度)的字段。
  • 更复杂的应用层协议比如 Netty 中实现的一些协议都对粘包、拆包做了很好的处理。

网络传输中,收发两端如何正确解析请求,统一的协议是必不可少的

RPC 协议就是围绕应用层协议展开的

以下是body长度固定的 rpc协议

Netty 中的表现就是编码解码器 codec

后序会在此协议上升级为 包头长度也可扩展的协议。

0     1     2       3    4    5    6    7           8        9        10   11   12   13   14   15   16   17   18
+-----+-----+-------+----+----+----+----+-----------+---------+--------+----+----+----+----+----+----+----+---+
|   magic   |version|    full length    |messageType|serialize|compress|              RequestId               |
+-----+-----+-------+----+----+----+----+-----------+----- ---+--------+----+----+----+----+----+----+----+---+
|                                                                                                             |
|                                         body                                                                |
|                                                                                                             |
|                                        ... ...                                                              |
+-------------------------------------------------------------------------------------------------------------+
2B magic(魔法数)
1B version(版本)
4B full length(消息长度)
1B messageType(消息类型)
1B serialize(序列化类型)
1B compress(压缩类型)
8B requestId(请求的Id)
body(object类型数据)

4.总结

RPC 的组成包括: 注册中心、动态代理、网络传输、自定义协议、序列化、负载均衡、集群容错等等。
想要深入了解,先要知道他们是怎么组合运作的,其简单的运作都在上面提到了。

从零实现RPC框架1:RPC框架架构设计相关推荐

  1. 可以伪装mysql子节点框架是啥_kingbus 架构设计之如何伪装成 MySQL Master 角色

    1.背景 kingbus 是一个基于 raft 强一致协议实现的分布式 MySQL binlog 存储系统.它能够充当一个 MySQL Slave 从真正的 Master 上同步 binglog,并存 ...

  2. 徒手撸框架--实现 RPC 远程调用

    微服务,已经是每个互联网开发者必须掌握的一项技术.而 RPC 框架,是构成微服务最重要的组成部分之一.趁最近有时间.又看了看 dubbo 的源码.dubbo 为了做到灵活和解耦,使用了大量的设计模式和 ...

  3. java高性能rpc,企业级rpc,zk调度,负载均衡,泛化调用一体的rpc服务框架

    先放出链接,喜欢的给个star:https://gitee.com/a1234567891/koalas-rpc 一:项目介绍 koalas-RPC 个人作品,提供大家交流学习,有意见请私信,欢迎拍砖 ...

  4. 【RPC框架、RPC框架必会的基本知识、手写一个RPC框架案例、优秀的RPC框架Dubbo、Dubbo和SpringCloud框架比较】

    一.RPC框架必会的基本知识 1.1 什么是RPC? RPC(Remote Procedure Call --远程过程调用),它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络的技术. ...

  5. java基础巩固-宇宙第一AiYWM:为了维持生计,手写RPC~Version07(RPC原理、序列化框架们、网络协议框架们 、RPC 能帮助我们做什么呢、RPC异常排查:ctrl+F搜超时)整起

    上次Version06说到了咱们手写迷你版RPC的大体流程, 对咱们的迷你版RPC的大体流程再做几点补充: 为什么要封装网络协议,别人说封装好咱们就要封装?Java有这个特性那咱就要用?好像是这样.看 ...

  6. RPC框架和HTTP框架的区别

    RPC框架和HTTP框架的区别 有了HTTP协议,为什么还需要RPC远程过程调用协议? 由于RPC直接通过自定义TCP协议实现通信,而HTTP服务通过Http协议(Http在TCP之上),相当于多了一 ...

  7. 远程过程调用RPC 2:RPC思想与RPC框架

    RPC思想与RPC框架 RPC思想 组成部分 RPC框架 完整的RPC框架 RPC调用关键点 RPC框架分类对比 RPC和REST REST主要原则 对比 RPC思想 上一篇笔记:远程过程调用RPC ...

  8. 腾讯零反射全动态Android插件框架Shadow解析

    简介 最近几年,腾讯对于开源事业也是越来越支持,今天要说的就是在腾讯被广泛使用的Shadow框架,一个经过线上亿级用户量检验的反射全动态Android插件框架. 首先,让我们来看一下官方对于Shado ...

  9. 【零】ODB - C++ 持久层框架ODB

    [零]ODB - C++持久层框架ODB http://www.codesynthesis.com/products/odb/ 11款C++持久层框架 http://www.oschina.net/p ...

最新文章

  1. Abiword对话框资源
  2. BIOS中断相关资料和应用
  3. Oracle 用户、对象权限、系统权限
  4. 手写 单隐藏层神经网络_反向传播(Matlab实现)
  5. CodeForces - 1141D Colored Boots(暴力+水题)
  6. 技术管理—管理书籍推荐
  7. JavaSE简单实现多线程聊天
  8. MyBatis 为什么需要通用 Mapper ? 1
  9. php和会计,财务跟会计有什么区别
  10. 编程之美二进制一的个数
  11. vue点击改变data_vue 中自定义指令改变data中的值
  12. 2021-05-24
  13. 学习笔记03_测试用例
  14. 菜单栏、工具栏、状态栏——QT
  15. 进击的蚂蚁金融云与场景焦虑的银行
  16. 如何从面试官中知道自己需要准备什么
  17. git clone 报错 Permission denied (publickey,password).git配置ssh key
  18. 台式计算机负荷,简单计算台式电脑功率
  19. 新机器导致显示器发黄的问题
  20. 微信服务号开发-获取用户位置信息

热门文章

  1. proxy跨域不生效_配置proxy解决跨域问题
  2. Xray-强大的漏洞扫描工具
  3. Unity实战问题-双击脚本打不开的原因和解决方法
  4. Istio client-go 使用 patch 更新资源
  5. Java环境变量配置及完全卸载
  6. linux查看服务器内存
  7. 关于框架Spring------学习的第三天(AOP开发)
  8. 计算机毕业设计ssm拼车平台0k47u系统+程序+源码+lw+远程部署
  9. css3图标一直旋转样式,css3 – 问题使Bootstrap3图标旋转
  10. 1141 PAT Ranking of Institutions (25 分)