一.概述

什么是RPC?

  • 远程服务调用
  • 官方:一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的思想
  • 通俗一点:客户端在不知道调用细节的情况下,调用存在于远程计算机上的某个对象,就像调用本地应用程序中的对象一样。
  • 市面上常见的rpc框架:dobbo,springCloud,gRPC...

那为什么要有 RPC,HTTP 不好么?

  • 因为 RPC 和 HTTP 就不是一个层级的东西,所以严格意义上这两个没有可比性,也不应该来作比较。
  • HTTP 只是传输协议,协议只是规范了一定的交流格式
  • RPC 对比的是本地过程调用,是用来作为分布式系统之间的通信,它可以用 HTTP 来传输,也可以基于 TCP 自定义协议传输。
  • HTTP 协议比较冗余,所以 RPC 大多都是基于 TCP 自定义协议,定制化的才是最适合自己的。

项目总体结构


整体架构

接下来,分别解释上述的过程

二.自定义注解

服务的提供者和消费者公用一个接口,@ServiceExpose是为了暴露服务,放在生产者的某个实现类上;@ServiceReference是为了引用服务,放在消费者的需要注入的属性上。

  • Target:指定被修饰的Annotation可以放置的位置(被修饰的目标)

    • @Target(ElementType.TYPE) //接口、类
    • @Target(ElementType.FIELD) //属性
    • @Target(ElementType.METHOD) //方法
    • @Target(ElementType.PARAMETER) //方法参数
    • @Target(ElementType.CONSTRUCTOR) //构造函数
    • @Target(ElementType.LOCAL_VARIABLE) //局部变量
    • @Target(ElementType.ANNOTATION_TYPE) //注解
    • @Target(ElementType.PACKAGE) //包
  • Retention:定义注解的保留策略
    • @Retention(RetentionPolicy.SOURCE) //注解仅存在于源码中,在class字节码文件中不包含
    • @Retention(RetentionPolicy.CLASS) //默认的保留策略,注解会在class字节码文件中存在,但运行时无法获得
    • @Retention(RetentionPolicy.RUNTIME) //注解会在class字节码文件中存在,在运行时可以通过反射获取到
  • Documented:指定被修饰的该Annotation可以被javadoc工具提取成文档
  • Inherited:指定被修饰的Annotation将具有继承性

二.启动配置

主要是加载一些rpc相关的配置类,使用SpringBoot自动装配。可以使用SPI机制加入一些自定义的类,放到指定文件夹中。

三.rpc接口注入/rpc服务扫描

这里主要就是通过反射获得对应注解的属性/类,进行服务暴露/服务引用。 这里需要关注的是什么时候进行服务暴露/引用?如下:

  • 客户端:一般有俩种方案
  • 饿汉式:饿汉式是通过实现 Spring 的InitializingBean接口中的 afterPropertiesSet方法,容器通过调用 ReferenceBean的 afterPropertiesSet方法时引入服务。(在Spring启动时,给所有的属性注入实现类,包含远程和本地的实现类)懒汉式:只有当这个服务被注入到其他类中时启动引入流程,也就是说用到了才会开始服务引入。在应用的Spring IOC 容器刷新完毕(spring Context初始化)之后,扫描所有的Bean,将Bean中带有@ServiceExpose/@ServiceReference注解的field获取到,然后创建field类型的代理对象,创建完成后,将代理对象set给此field。后续就通过该代理对象创建服务端连接,并发起调用。(dubbo默认)
  • 服务端:与懒汉式一样。

那么怎么知道Spring IOC刷新完成,这里就使用一个Spring提供的监听器,当Spring IOC刷新完成,就会触发监听器。

四.服务注册到ZK/从Zk获得服务

Zookeeper采用节点树的数据模型,类似linux文件系统,/,/node1,/node2 比较简单。不懂Zookeeper请移步:Zookeeper原理

我们采用的是对每个服务名创建一个持久节点,服务注册时实际上就是在zookeeper中该持久节点下创建了一个临时节点,该临时节点存储了服务的IP、端口、序列化方式等。

客户端获取服务时通过获取持久节点下的临时节点列表,解析服务地址数据:

客户端监听服务变化:

五.生成代理类对象

这里使用JDK的动态代理,也可以使用cglib或者Javassist(dobbo使用)。

public class ClientProxyFactory {/*** 获取代理对象,绑定 invoke 行为** @param clazz 接口 class 对象* @param <T>   类型* @return 代理对象*/public <T> T getProxyInstance(Class<T> clazz) {return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, new InvocationHandler() {final Random random = new Random();@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {// 第一步:通过服务发现机制选择一个服务提供者暴露的服务String serviceName = clazz.getName();final List<ServiceInfo> serviceInfos = serviceDiscovery.listServices(serviceName);logger.info("Rpc server instance list: {}", serviceInfos);if (CollectionUtils.isEmpty(serviceInfos)) {throw new RpcException("No rpc servers found.");}// TODO: 这里模拟负载均衡,从多个服务提供者暴露的服务中随机挑选一个,后期写方法实现负载均衡final ServiceInfo serviceInfo = serviceInfos.get(random.nextInt(serviceInfos.size()));// 第二步:构造 rpc 请求对象final RpcRequest rpcRequest = new RpcRequest();rpcRequest.setServiceName(serviceName);rpcRequest.setMethod(method.getName());rpcRequest.setParameterTypes(method.getParameterTypes());rpcRequest.setParameters(args);// 第三步:编码请求消息, TODO: 这里可以配置多种编码方式byte[] data = messageProtocol.marshallingReqMessage(rpcRequest);// 第四步:调用 rpc client 开始发送消息byte[] byteResponse = rpcClient.sendMessage(data, serviceInfo);// 第五步:解码响应消息final RpcResponse rpcResponse = messageProtocol.unmarshallingRespMessage(byteResponse);// 第六步:解析返回结果进行处理if (rpcResponse.getException() != null) {throw rpcResponse.getException();}return rpcResponse.getRetValue();}});}
}

六.负载均衡

本实现支持两种主要负载均衡策略,随机和轮询,其中他们都支持带权重的随机和轮询,其实也就是四种策略。

七.Netty通信

服务端和客户端基本一样,这里只展示服务端的代码。代理对象在Spring启动的时候就生成了,但是没有调用,每一个调用(请求)都会生成一个Netty的连接。

public class NettyRpcServer extends RpcServer {@Overridepublic void start() {// 创建两个线程组EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {// 创建服务端的启动对象ServerBootstrap serverBootstrap = new ServerBootstrap()// 设置两个线程组.group(bossGroup, workerGroup)// 设置服务端通道实现类型.channel(NioServerSocketChannel.class)// 服务端用于接收进来的连接,也就是boosGroup线程, 线程队列大小.option(ChannelOption.SO_BACKLOG, 100).childOption(ChannelOption.SO_KEEPALIVE, true)// child 通道,worker 线程处理器.childHandler(new ChannelInitializer<SocketChannel>() {// 给 pipeline 管道设置自定义的处理器@Overridepublic void initChannel(SocketChannel channel) {ChannelPipeline pipeline = channel.pipeline();pipeline.addLast(new NettyServerHandler());}});// 绑定端口号,同步启动服务ChannelFuture channelFuture = serverBootstrap.bind(port).sync();channel = channelFuture.channel();// 对关闭通道进行监听,变为同步channelFuture.channel().closeFuture().sync();} catch (Exception e) {logger.error("server error.", e);} finally {// 释放线程组资源bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}

实现具体handler

public class NettyServerHandler extends ChannelInboundHandlerAdapter {//当通道就绪就会触发该方法@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {//进行记录logger.info("channel active: {}", ctx);}//读取数据实际(这里我们可以读取客户端发送的消息)@Overridepublic void channelRead(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {//将数据读到buffer中final ByteBuf msgBuf = (ByteBuf) msg;final byte[] reqBytes = new byte[msgBuf.readableBytes()];msgBuf.readBytes(reqBytes);}//数据读取完毕@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {//使用反射获找到目标方法进行返回final byte[] respBytes = requestHandler.handleRequest(reqBytes);ctx.writeAndFlush(respBytes);}//处理异常, 一般是需要关闭通道@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}
}

八.序列化协议

对计算机网络稍微有一点了解的同学都知道,数据在网络中传输是二进制的:01010101010101010,类似这种,只有二进制数据才能在网络中传输。但是在编码之前我们一般先进行序列化,目的是为了优化传输的数据量。因为有的数据太大,需要进行空间优化。

那么我们来区分一下序列化和编码:我画一张图大家都全明白了


定义一个序列化协议,放入作为一个handler放入pipeline中。

Netty支持多种序列化,比如jdk,Json,ProtoBuf 等,这里使用ProtoBuf,其序列化后码流小性能高,非常适合RPC调用。接下来看怎么使用ProtoBuf?

  • 1.编写需要序列化的类xxx.proto:ProtoBuf有自己的语法规则(自行百度)

  • 2.通过官网提供的protoc.exe生成对应的Java代码
  • 3.前面通过工具生成的代码(AnimalProto)已经帮我们封装好了序列化和反序列化的方法,我们只需要调用对应方法即可

引入Protobuf的依赖

<dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>2.4.1</version>
</dependency>

序列化:

/*** 调用对象构造好的Builder,完成属性赋值和序列化操作* @return*/
public static byte[] protobufSerializer(){AnimalProto.Animal.Builder builder = AnimalProto.Animal.newBuilder();builder.setId(1L);builder.setName("小猪");List<String> actions = new ArrayList<>();actions.add("eat");actions.add("run");builder.addAllActions(actions);return builder.build().toByteArray();
}

反序列化:

/*** 通过调用parseFrom则完成反序列化* @param bytes* @return* @throws InvalidProtocolBufferException*/
public static Animal deserialize(byte[] bytes) throws Exception {AnimalProto.Animal pAnimal = AnimalProto.Animal.parseFrom(bytes);Animal animal = new Animal();animal.setId(pAnimal.getId());animal.setName(pAnimal.getName());animal.setActions(pAnimal.getActionsList());return animal;
}

测试:

public static void main(String[] args) throws Exception {byte[] bytes = serializer();Animal animal = deserialize(bytes);System.out.println(animal);
}

以下看到是能正常序列化和反序列化的:

九.通信协议

通信协议主要是解决网络传输问题,比如TCP拆包粘包问题。

TCP问题:

  • TCP拆包粘包主要就是把一些数据合并或者分割开进行发送,这时候有的数据就不完整,有的数据就多出一部分,就会造成问题。一般使用TCP协议都需要考虑拆包粘包问题
  • tcp粘包和半包问题就是因为滑动窗口。 因为不管你的数据是多少长度,怎么分割每一条数据。但是tcp只按照我滑动窗口的长度发送。
  • 本质是因为TCP是流式协议,消息无边界。

解决方案:业界的主流协议的解决方案可以归纳如下

  • 消息定长:例如每个报文的大小为固定长度100字节,如果不够用空格补足。(定长解码器)
  • 在包尾加特殊结束符进行分割。(分隔符编码器)

  • 消息长度+消息:将消息分为消息头和消息体,消息头中包含表示消息总长度(或者消息体长度)的字段。Netty自带:

  • 自定义编解码器

这里只是列举出来编码过程,解码是逆过程。(说白了,编码就是找着固定的格式进行写入,解码就是照着固定的格式读)


恭喜你,已经学会写RPC框架了,想深入了解的朋友可以参照源码。进行学习,升级。

原文链接:
https://www.cnblogs.com/monkey-xuan/p/15893604.html

作者:Monkey-X

如果觉得本文对你有帮助,可以转发关注支持一下

大厂程序员都会的分布式RPC框架,直接无私打包分享,手慢无相关推荐

  1. Java 程序员必备的 15 个框架,前 3 个地位无可动摇!

    2019独角兽企业重金招聘Python工程师标准>>> Java 程序员方向太多,且不说移动开发.大数据.区块链.人工智能这些,大部分 Java 程序员都是 Java Web/后端开 ...

  2. 如何实现一个分布式 RPC 框架

    远程过程调用(Remote Procedure Call,RPC)是一个计算机通信协议.该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程.RPC的主要目标 ...

  3. 自己动手从0开始实现一个分布式RPC框架

    简介: 如果一个程序员能清楚的了解RPC框架所具备的要素,掌握RPC框架中涉及的服务注册发现.负载均衡.序列化协议.RPC通信协议.Socket通信.异步调用.熔断降级等技术,可以全方位的提升基本素质 ...

  4. 还发愁项目经验吗?基于Netty实现分布式RPC框架[附完整代码]

    写给大家的话 最近我收到很多读者的来信,对如何学习分布式.如何进行项目实践和提高编程能力,存在很多疑问. 分布式那么难,怎么学?为什么看了那么多书还是掌握不了? 开源的框架比如Dubbo代码太多了,完 ...

  5. 分布式RPC框架dubbo、motan、rpcx、gRPC、thrift简介与性能比较

    分布式RPC框架性能大比拼 dubbo.motan.rpcx.gRPC.thrift的性能比较  Dubbo 是阿里巴巴公司开源的一个Java高性能优秀的服务框架,使得应用可通过高性能的 RPC 实现 ...

  6. 某大厂程序员哀叹:千万不要从大厂往小厂跳,后悔死了!小厂只会逼迫压榨,刚来就一个劲要产出!...

    都说大厂程序员出路多,既可以往其他大厂跳,也可以往中小厂跳.如果从大厂跳小厂非常香,不仅薪资待遇高,而且小厂也没有大厂那么忙和内卷,没准还能因为大厂背景成为管理层. 但一个从大厂跳到小厂的程序员却跳出 ...

  7. 某大厂程序员抱怨:“大厂镀金”是鬼话!从大厂裸辞后,面阿里、字节全都挂掉,连货拉拉都不要自己!...

    坊间传言:程序员可以先在大厂镀金,以后去中小厂毫无压力,基本不会被卡,事实果真如此吗? 近日,一个大厂程序员发帖抱怨:谁说的大厂镀金?信了你们的鬼话,从大厂裸辞两个多月,一个offer都没拿到.早知道 ...

  8. 深圳某女孩身家上千万,却称自己不配追求大厂程序员

    程序员收入高,给人的印象也比较老实稳重,算是比较理想的择偶对象.但一个小姐姐却发出感叹,称自己不敢去撩大厂程序员,觉得自己月薪几千不配,不想拖别人后腿,虽然家庭条件不错,但感觉还是有点自卑. 这个小姐 ...

  9. 做大厂程序员是一种怎样的体验?这四位“百度程序员”说出了自己的看法!

    程序员是个特殊"物种",在外界印象中,他们身穿格子衬衫.顶着秃头还对女朋友说些听不懂的技术黑话. "程序猿""攻城狮"等调侃话术随处可见,但 ...

最新文章

  1. 待飞日记(第四天和第五天)
  2. 项目四-用循环求(2)
  3. 看下巴识心情,这个AI项链挂胸前也能识别面部表情
  4. Linux内核的l2tp实现,Linux Kernel gdth实现内核内存破坏漏洞
  5. OpenGL 点光源的多遍阴影贴图
  6. 求凸函数极值 CSF迭代法(雾)
  7. 为什么我们应该担心华尔街的人工智能
  8. 《Web异步与实时交互——iframe AJAX WebSocket开发实战》—— 1.4 内容安排
  9. plsql 存储过程 批量提交_Spring Batch 批量处理策略
  10. Bootstrap 列平移/列偏移
  11. 织梦网站服务器配置,织梦本地服务器配置
  12. 关于SQL Server的日期时间数据类型
  13. a = a + 1, a++, ++a ,a+=1区别在哪
  14. servlet输出中文乱码
  15. qt使用QZxing生成二维码
  16. android按钮半透明
  17. Visual Basic快捷教程——Visual Basic 2017 破冰
  18. 巨人网络18年春招java答案_巨人网络18春招Java开发笔试题,希望对大家能有帮助...
  19. EXCEL如何隔三行设置背景色
  20. 2-二、安装CUDA

热门文章

  1. Windows系统下的CMD Route路由配置
  2. Linux: lo 网络接口
  3. 语音识别-关键词检测
  4. 施乐3030工程机驱动安装
  5. 大数据就业前景分析-好程序员
  6. 短视频app开发:如何实现视频直播功能
  7. 解决nodemon : 无法加载文件 C:\Users\admin\AppData\Roaming\npm\nodemon.ps1
  8. 帆软加密狗注册配置安装步骤
  9. 【无标题】JAVA解压ZIP文件并解析Excel(easyExcel)
  10. 常见算法的时间复杂度 Ο(1)<Ο(log2n)<Ο(n)<Ο(nlog2n)<Ο(n2)<Ο(n3)<…