01 RPC 概述

下面的这张图,大概很多小伙伴都见到过,这是 Dubbo 官网中的一张图描述了项目架构的演进过程。

它描述了每一种架构需要的具体配置和组织形态。当网站流量很小时,只需一个应用,将所有功能都部署在一起, 以减少部署节点和成本,我们通常会采用单一应用架构。之后出现了 ORM 框架,主要用于简化增删改查工作流的,数 据访问框架 ORM 是关键。

随着用户量增加,当访问量逐渐增大,单一应用增加机器,带来的加速度越来越小 ,我们需要将应用拆分成互不 干扰的几个应用,以提升效率,于是就出现了垂直应用架构。MVC 架构就是一种非常经典的用于加速前端页面开发的 架构。

当垂直应用越来越多,应用之间交互不可避免,将核心业务抽取出来,作为独立的服逐渐形成稳定的服务中心, 使前端应用能更快速的响应,多变的市场需求,就出现了分布式服务架构。分布式架构下服务数量逐渐增加,为了提 高管理效率,RPC 框架应运而生。RPC 用于提高业务复用及整合的,分布式服务框架下 RPC 是关键。

下一代框架,将会是流动计算架构占据主流。当服务越来越多,容量的评估,小服务的资源浪费等问题,逐渐明 显。此时,需要增加一个调度中心 ,基于访问压力实时管理集群容量,提高集群利用率。SOA 架构就是用于提高及其 利用率的,资源调度和治理中心 SOA 是关键。

Netty 基本上是作为架构的技术底层而存在的,主要完成高性能的网络通信。

02 环境预设

第一步:我们先将项目环境搭建起来,创建 pom.xml 配置文件如下:

 <dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.6.Final</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.10</version></dependency>

第二步:创建项目结构。

在没有 RPC 框架以前,我们的服务调用是这样的,如下图:

从上图可以看出接口的调用完全没有规律可循,想怎么调,就怎么调。这导致业务发展到一定阶段之后,对接口 的维护变得非常困难。于是有人提出了服务治理的概念。所有服务间不允许直接调用,而是先到注册中心进行登记, 再由注册中心统一协调和管理所有服务的状态并对外发布,调用者只需要记住服务名称,去找注册中心获取服务即可。这样,极大地规范了服务的管理,可以提高了所有服务端可控性。整个设计思想其实在我们生活中也能找到活生生的 案例。例如:我们平时工作交流,大多都是用 IM 工具,而不是面对面吼。大家只需要相互记住运营商(也就是注册中 心)提供的号码(如:腾讯 QQ)即可。再比如:我们打电话,所有电话号码有运营商分配。我们需要和某一个人通 话时,只需要拨通对方的号码,运营商(注册中心,如中国移动、中国联通、中国电信)就会帮我们将信号转接过去。

目前流行的 RPC 服务治理框架主要有 Dubbo 和 Spring Cloud,下面我以比较经典的 Dubbo 为例。Dubbo 核 心模块主要有四个:Registry 注册中心、Provider 服务端、Consumer 消费端、Monitor 监控中心,如下所示:

注册中心 (Registry)
消费端 (Consumer)
服务端 (Provider)
监控中心 (Monitor)

为了方便,我们将所有模块全部放到一个项目中,主要模块包括:

api:主要用来定义对外开放的功能与服务接口。
protocol:主要定义自定义传输协议的内容。
registry:主要负责保存所有可用的服务名称和服务地址。
provider:实现对外提供的所有服务的具体功能。
consumer:客户端调用。
monitor:完成调用链监控。

下面,我们先把项目结构搭建好,具体的项目结构截图如下:

03 代码实战

3.1 创建 API 模块

首先创建 API 模块,provider 和 consumer 都遵循 API 模块的规范。为了简化,创建两个 Service 接口,分别是:

package com.xinfan.netty.rpc.api;/*** IRpcHelloService** @author Lss* @date 2020/2/18 22:50* @Version 1.0*/
public interface IRpcHelloService {String hello(String name);
}

创建 IRpcService 接口,完成模拟业务加、减、乘、除运算,具体代码如下:

package com.xinfan.netty.rpc.api;/*** IRpcService** @author Lss* @date 2020/2/18 22:51* @Version 1.0*/
public interface IRpcService {/** 加 */public int add(int a,int b);/** 减 */public int sub(int a,int b);/** 乘 */public int mult(int a,int b);/** 除 */public int div(int a,int b);
}

至此,API 模块就定义完成了,非常简单。接下来,我们要确定传输规则,也就是传输协议,协议内容当然要自定义, 才能体现出 Netty 的优势。

3.2 创建自定义协议

Netty 中内置的 HTTP 协议,需要 HTTP 的编、解码器来完成解析。我们来 看自定义协议如何设定?

在 Netty 中要完成一个自定义协议,其实非常简单,只需要定义一个普通的 Java 类即可。我们现在手写 RPC 主要 是完成对 Java 代码的远程调用(类似于 RMI,大家应该都很熟悉了),远程调用 Java 代码哪些内容是必须由网络来 传输的呢?譬如,服务名称?需要调用该服务的哪个方法?方法的实参是什么?这些信息都需要通过客户端传送到服 务端去。

下面我们来看具体的代码实现,定义 InvokerProtocol 类:

import lombok.Data;import java.io.Serializable;
/*** InvokerProtocol** @author Lss* @date 2020/2/18 22:54* @Version 1.0*/
@Data
public class InvokerProtocol implements Serializable {//类名private String className;//函数名称(方法名)private String methodName;//参数类型private Class<?>[] parames;//参数列表private Object[] values;
}

从上面的代码看出来,协议中主要包含的信息有类名、函数名、形参列表和实参列表,通过这些信息就可以定位到一 个具体的业务逻辑实现。

3.3 实现 Provider 服务端业务逻辑

我们将 API 中定义的所有功能在 provider 模块中实现,分别创建两个实现类:

RpcHelloServiceImpl 类:

package com.xinfan.netty.rpc.provider;import com.xinfan.netty.rpc.api.IRpcHelloService;/*** RpcHelloServiceImpl** @author Lss* @date 2020/2/18 23:00* @Version 1.0*/
public class RpcHelloServiceImpl implements IRpcHelloService{@Overridepublic String hello(String name) {return "Hello " + name + "!";}
}

RpcServiceImpl 类:

package com.xinfan.netty.rpc.provider;import com.xinfan.netty.rpc.api.IRpcService;/*** RpcServiceImpl** @author Lss* @date 2020/2/18 23:00* @Version 1.0*/
public class RpcServiceImpl implements IRpcService {@Overridepublic int add(int a, int b) {return a + b;}@Overridepublic int sub(int a, int b) {return a - b;}@Overridepublic int mult(int a, int b) {return a * b;}@Overridepublic int div(int a, int b) {return a / b;}
}

3.4 完成 Registry 服务注册

Registry 注册中心主要功能就是负责将所有 Provider 的服务名称和服务引用地址注册到一个容器中,并对外发布。 Registry 应该要启动一个对外的服务,很显然应该作为服务端,并提供一个对外可以访问的端口。先启动一个 Netty 服务,创建 RpcRegistry 类,具体代码如下:

package com.xinfan.netty.rpc.registry;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;/*** RpcRegistry** @author Lss* @date 2020/2/18 23:27* @Version 1.0*/
public class RpcRegistry {private int port;public RpcRegistry(int port){this.port = port;}public void start(){EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//自定义协议解码器/** 入参有5个,分别解释如下maxFrameLength:框架的最大长度。如果帧的长度大于此值,则将抛出TooLongFrameException。lengthFieldOffset:长度字段的偏移量:即对应的长度字段在整个消息数据中得位置lengthFieldLength:长度字段的长度。如:长度字段是int型表示,那么这个值就是4(long型就是8)lengthAdjustment:要添加到长度字段值的补偿值initialBytesToStrip:从解码帧中去除的第一个字节数*/pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));//自定义协议编码器pipeline.addLast(new LengthFieldPrepender(4));//对象参数类型编码器pipeline.addLast("encoder",new ObjectEncoder());//对象参数类型解码器pipeline.addLast("decoder",new ObjectDecoder(Integer.MAX_VALUE,ClassResolvers.cacheDisabled(null)));                            pipeline.addLast(new RegistryHandler());}}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);ChannelFuture future = b.bind(port).sync();System.out.println("GP RPC Registry start listen at " + port );future.channel().closeFuture().sync();} catch (Exception e) {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}public static void main(String[] args) throws Exception {new RpcRegistry(8080).start();}
}

在 RegistryHandler 中实现注册的具体逻辑,上面的代码,主要实现服务注册和服务调用的功能。因为所有模块创 建在同一个项目中,为了简化,服务端没有采用远程调用,而是直接扫描本地 Class,然后利用反射调用。代码实现如 下:

package com.xinfan.netty.rpc.registry;import com.xinfan.netty.rpc.protocol.InvokerProtocol;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;import java.io.File;
import java.io.FileInputStream;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Method;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;/*** RegistryHandler** @author Lss* @date 2020/2/18 23:58* @Version 1.0*/
public class RegistryHandler extends ChannelInboundHandlerAdapter {//用保存所有可用的服务public static ConcurrentHashMap<String, Object> registryMap = new ConcurrentHashMap<String,Object>();//保存所有相关的服务类private List<String> classNames = new ArrayList<String>();public RegistryHandler(){//完成递归扫描scannerClass("com.xinfan.netty.rpc.provider");doRegister();}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {Object result = new Object();InvokerProtocol request = (InvokerProtocol)msg;//当客户端建立连接时,需要从自定义协议中获取信息,拿到具体的服务和实参//使用反射调用if(registryMap.containsKey(request.getClassName())){Object clazz = registryMap.get(request.getClassName());Method method = clazz.getClass().getMethod(request.getMethodName(), request.getParames());result = method.invoke(clazz, request.getValues());}ctx.write(result);ctx.flush();ctx.close();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}/** 递归扫描*/private void scannerClass(String packageName){URL url = this.getClass().getClassLoader().getResource(packageName.replaceAll("\\.", "/"));File dir = new File(url.getFile());for (File file : dir.listFiles()) {//如果是一个文件夹,继续递归if(file.isDirectory()){scannerClass(packageName + "." + file.getName());}else{classNames.add(packageName + "." + file.getName().replace(".class", "").trim());}}}/*** 完成注册*/private void doRegister(){if(classNames.size() == 0){ return; }for (String className : classNames) {try {Class<?> clazz = Class.forName(className);Class<?> i = clazz.getInterfaces()[0];registryMap.put(i.getName(), clazz.newInstance());} catch (Exception e) {e.printStackTrace();}}}
}

至此,注册中心的基本功能就已完成,下面来看客户端的代码实现。

3.5 实现 Consumer 远程调用

梳理一下基本的实现思路,主要完成一个这样的功能:API 模块中的接口功能在服务端实现(并没有在客户端实现)。 因此,客户端调用 API 中定义的某一个接口方法时,实际上是要发起一次网络请求去调用服务端的某一个服务。而这 个网络请求首先被注册中心接收,由注册中心先确定需要调用的服务的位置,再将请求转发至真实的服务实现,最终 调用服务端代码,将返回值通过网络传输给客户端。整个过程对于客户端而言是完全无感知的,就像调用本地方法一 样。具体调用过程如下图所示:

下面来看代码实现,创建 RpcProxy 类:

package com.xinfan.netty.rpc.consumer.proxy;import com.xinfan.netty.rpc.protocol.InvokerProtocol;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;/*** RpcProxy** @author Lss* @date 2020/2/19 12:10* @Version 1.0*/
public class RpcProxy  {public static <T> T create(Class<?> clazz){//clazz传进来本身就是interfaceMethodProxy proxy = new MethodProxy(clazz);Class<?> [] interfaces = clazz.isInterface() ?new Class[]{clazz} :clazz.getInterfaces();T result = (T) Proxy.newProxyInstance(clazz.getClassLoader(),interfaces,proxy);return result;}private static class MethodProxy implements InvocationHandler {private Class<?> clazz;public MethodProxy(Class<?> clazz){this.clazz = clazz;}@Overridepublic Object invoke(Object proxy, Method method, Object[] args)  throws Throwable {//如果传进来是一个已实现的具体类(本次演示略过此逻辑)if (Object.class.equals(method.getDeclaringClass())) {try {return method.invoke(this, args);} catch (Throwable t) {t.printStackTrace();}//如果传进来的是一个接口(核心)} else {return rpcInvoke(proxy,method, args);}return null;}/*** 实现接口的核心方法* @param method* @param args* @return*/public Object rpcInvoke(Object proxy,Method method,Object[] args){//传输协议封装InvokerProtocol msg = new InvokerProtocol();msg.setClassName(this.clazz.getName());msg.setMethodName(method.getName());msg.setValues(args);msg.setParames(method.getParameterTypes());final RpcProxyHandler consumerHandler = new RpcProxyHandler();EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//自定义协议解码器(服务端的代码可以复制过来)/** 入参有5个,分别解释如下maxFrameLength:框架的最大长度。如果帧的长度大于此值,则将抛出TooLongFrameException。lengthFieldOffset:长度字段的偏移量:即对应的长度字段在整个消息数据中得位置lengthFieldLength:长度字段的长度:如:长度字段是int型表示,那么这个值就是4(long型就是8)lengthAdjustment:要添加到长度字段值的补偿值initialBytesToStrip:从解码帧中去除的第一个字节数*/pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));//自定义协议编码器pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));//对象参数类型编码器pipeline.addLast("encoder", new ObjectEncoder());//对象参数类型解码器pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));pipeline.addLast("handler",consumerHandler);}});ChannelFuture future = b.connect("localhost", 8080).sync();future.channel().writeAndFlush(msg).sync();future.channel().closeFuture().sync();} catch(Exception e){e.printStackTrace();}finally {group.shutdownGracefully();}return consumerHandler.getResponse();}}
}

接收网络调用的返回值 RpcProxyHandler 类:

package com.xinfan.netty.rpc.consumer.proxy;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;/*** RpcProxyHandler** @author Lss* @date 2020/2/19 12:36* @Version 1.0*/
public class RpcProxyHandler extends ChannelInboundHandlerAdapter{private Object response;public Object getResponse() {return response;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {response=msg;}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("client exception is general");}
}

完成客户端调用代码 RpcConsumer类:

package com.xinfan.netty.rpc.consumer;import com.xinfan.netty.rpc.api.IRpcHelloService;
import com.xinfan.netty.rpc.api.IRpcService;
import com.xinfan.netty.rpc.consumer.proxy.RpcProxy;/*** RpcConsumer** @author Lss* @date 2020/2/19 12:55* @Version 1.0*/
public class RpcConsumer {public static void main(String[] args) {IRpcHelloService rpcHello= RpcProxy.create(IRpcHelloService.class);System.out.println(rpcHello.hello("四川情场浪子"));IRpcService service=RpcProxy.create(IRpcService.class);System.out.println("8 + 2 = " + service.add(8, 2));System.out.println("8 - 2 = " + service.sub(8, 2));System.out.println("8 * 2 = " + service.mult(8, 2));System.out.println("8 / 2 = " + service.div(8, 2));//Dubbo 中的 Monitor 是用 Spring 的 AOP 埋点来实现的,我没有引入 Spring 框架}
}

3.6 Monitor 监控

Dubbo 中的 Monitor 是用 Spring 的 AOP 埋点来实现的,我没有引入 Spring 框架,在本代码中不实现监控的功 能。感兴趣的小伙伴,可以回顾之前 Spring AOP 的课程自行完善此功能。

04 运行效果演示

第一步,启动注册中心,运行结果如下:

第二步,运行客户端,调用结果如下:

通过以上案例演示,相信小伙伴们对 Netty 的应用已经有了一个比较深刻的印象,本次只是对 RPC 的基本实现原理做了一个简单的实现,感兴趣的小伙伴可以在本项 目的基础上继续完善 RPC 的其他细节。欢迎留言

基于 Netty 重构 RPC 框架相关推荐

  1. Java编写基于netty的RPC框架

    一 简单概念RPC: ( Remote Procedure Call),远程调用过程,是通过网络调用远程计算机的进程中某个方法,从而获取到想要的数据,过程如同调用本地的方法一样.阻塞IO :当阻塞I/ ...

  2. 手动实现一个基于netty的RPC框架(模拟dubble)

    轻量级RPC框架开发 内容安排: 1.掌握RPC原理 2.掌握nio操作 3.掌握netty简单的api 4.掌握自定义RPC框架 RPC原理学习 什么是RPC RPC(Remote Procedur ...

  3. 基于Netty的RPC框架

    概述 RPC(Remote Procedure Call),远程过程调用,是一个计算机通信协议,该协议允许运行一个进程调用另一个进程,而程序员无需额外为这个交互作用编程. 两个或者多个应用程序分布在不 ...

  4. Netty和RPC框架线程模型分析

    <Netty 进阶之路>.<分布式服务框架原理与实践>作者李林锋深入剖析Netty和RPC框架线程模型.李林锋已在 InfoQ 上开设 Netty 专题持续出稿,感兴趣的同学可 ...

  5. Netty 和 RPC 框架线程模型分析

    https://www.infoq.cn/article/9Ib3hbKSgQaALj02-90y 1. 背景 1.1 线程模型的重要性 对于 RPC 框架而言,影响其性能指标的主要有三个要素: I/ ...

  6. 基于Netty的RPC简易实现

    代码地址如下: http://www.demodashi.com/demo/13448.html 可以给你提供思路 也可以让你学到Netty相关的知识 当然,这只是一种实现方式 需求 看下图,其实这个 ...

  7. 基于Netty的RPC架构实战演练

    基于Netty的RPC架构实战演练 NIO netty服务端 netty客户端 netty线程模型源码分析(一) netty线程模型源码分析(二) netty5案例学习 netty学习之心跳 prot ...

  8. Courier:Dropbox 基于gRPC 的 RPC 框架开发过程

    Dropbox运行着数百个用不同语言编写的服务,每秒交换数百万次请求.Courier是我们面向服务的架构的核心,这是一个基于gRPC的远程过程调用(RPC)框架.在开发Courier时,我们学习了很多 ...

  9. 晁岳攀---基于go的 rpc框架实践

    晁岳攀:软件开发的老兵,Scala集合技术手册(简/繁版)的作者, 高性能的服务治理rpcx (Go)框架的开发者,先前在同方.Motorola.comcast从事软件开发工作,现在在微博平台研发部做 ...

  10. 基于RDMA的RPC框架(一) 环境搭建

    文章目录 一.简介 二.Ubuntu 14安装SoftRoce 2.1.编译内核 2.2.安装用户库 三.Ubuntu 18安装Soft Roce 引用 一.简介 二.Ubuntu 14安装SoftR ...

最新文章

  1. HDU1114 Piggy-Bank 【全然背包】
  2. Scala模式匹配:对象匹配
  3. 2018 ACM-ICPC亚洲区域赛 北京赛区
  4. [转]如何写出让同事无法维护的代码?
  5. Linq无聊练习系列7----Insert,delete,update,attach操作练习
  6. SqlDependency不起作用
  7. Java核心编程总结(九、File文件类),王道训练营Java百度云盘
  8. 在北理珠,如何快速被动了解(社工)一个学生
  9. linux 分区格式化类型,Linux分区格式化
  10. 微信公众服务号如何快速申请注册并认证开通支付功能
  11. 我(阿朱)再说两句新零售
  12. 基于Spring Boot的宠物猫店管理系统的设计与实现毕业设计源码140909
  13. 解决连接深信服vp无法通过burpsuit抓包的问题
  14. linux修改文件图标,Gnome怎么修改应用图标icon
  15. 流行的ORM框架简介
  16. ldpc译码讲解_LDPC码编译码原理及应用
  17. 悼念图灵奖得主、ML语言之父Robin Milner
  18. 会跳舞的钢珠力学分析
  19. 华为30岁了,73岁的任正非管理哲学是怎样迭代的
  20. 推荐3款在线编辑器(IDE)

热门文章

  1. 报错“The C compiler identification is unknown……”解决办法
  2. Linu笔记-管线命令pipe
  3. 用计算机采集光栅尺的数据,基于PLC的光栅尺数据采集系统及方法与流程
  4. 愚你相遇,好幸运:遇见你,遇见了最好的自己
  5. pvs-stdio ue4_PlatformIO中的PVS-Studio集成
  6. C#给图片加水印文字或图片
  7. Creo 9.0 基准特征:基准平面
  8. 计算机硬盘从盘的设置,图文解说:电脑硬盘的主从盘设置方法_清风一笑
  9. 基于python毕业设计毕设课题选题参考
  10. CSU 2166: 卖萌表情(2018湖南省赛)