Cool-Rpc

前言

此博客所述项目代码已在github开源,欢迎大家一起贡献!
点此进入:Cool-RPC

最近一次写博客还是17年底,谢谢大家持久以来的关注
本篇博文将会教大家如何从0到1,搭建一个简单、高效且拓展性强的rpc框架.

什么是RPC

相信大家都或多或少使用过RPC框架,比如阿里的Dubbo、谷歌的grpc、Facebook的Thrift等等

那么究竟什么是rpc?
rpc翻译成中文叫做远程过程调用,通俗易懂点:将单应用架构成分布式系统架构后,多个系统间数据怎么交互,这就是rpc的职责.

从服务的角度来看,rpc分为服务提供者(provider)和服务消费者(consumer)两大类,中间会有一些共用java接口,叫做开放api接口
也就是说,接口服务实现类所处的地方叫做provider,接口服务调用类所处的地方叫consumer

因为处于分布式环境中,那consumer调用provider时,如何知道对方服务器的IP和开放端口呢?
这时需要一个组件叫做注册中心,consumer通过服务名后,去注册中心上查找该服务的IP+Port,拿到地址数据后,再去请求该地址的服务

如图:

Cool-Rpc技术简介

此项目基于传输层(TCP/IP协议)进行通讯,传输层框架使用netty编写,github上会有mina版本
提供多套序列化框架,默认使用Protostuff序列化,可配置使用java序列化等
注册中心默认zookeeper,可配置使用redis(只要有节点数据存储和消息通知功能的组件即可)

consumer通过java动态代理的方式使用执行远程调用
将所要执行的类名,方法,参数等通知provider,之后provider拿着数据调用本地实现类,将处理后得到的结果通知给consumer

注册中心

废话了那么多,开始上干货,建议大家从github克隆完整代码,本篇博文只讲重点代码

注册中心以api接口名为key,IP+Port为value,将数据持久化,以供消费者查询调用

以zookeeper为例:

为了更灵活地实现服务注册者和发现者,这里添加一个注册中心适配器

public abstract class ServiceCenterAdapter implements ServiceCenter{String host;int port = 0;String passWord;ServiceCenterAdapter(){}ServiceCenterAdapter(String host){this.host = host;}ServiceCenterAdapter(String host, int port) {this.host = host;this.port = port;}@Overridepublic String discover(String serviceName) {return null;}@Overridepublic void register(String serviceName, String serviceAddress) {}@Overridepublic void setHost(String host){this.host = host;};@Overridepublic void setPort(int port){this.port = port;};@Overridepublic void setPassWord(String passWord){this.passWord = passWord;};//获取 IP:端口@Overridepublic String getAddress(){if ("".equals(host) || host == null || port == 0){throw new RuntimeException("the zookeeper host or port error");}return host+":"+String.valueOf(port);};
}

zookeeper的服务注册(provider使用):
在实际项目中,需要构造此类,并注入相应的IP和端口,最后以bean的形式注入到IOC容器中

public class ZooKeeperServiceRegistry extends ServiceCenterAdapter {private static final Logger log = LoggerFactory.getLogger(ZooKeeperServiceRegistry.class);private ZkClient zkClient;{this.port = 2181;zkClient = new ZkClient(getAddress(), CoolConstant.ZK_SESSION_TIMEOUT, CoolConstant.ZK_CONNECTION_TIMEOUT);log.info("connect zookeeper");}public ZooKeeperServiceRegistry(String zkHost) {super(zkHost);}public ZooKeeperServiceRegistry(String zkHost, int zkPort) {super(zkHost, zkPort);}// 注册服务 serviceName=接口名  serviceAddress=IP+Port@Overridepublic void register(String serviceName, String serviceAddress) {// create cool node permanentString registryPath = CoolConstant.ZK_REGISTRY_PATH;if (!zkClient.exists(registryPath)) {zkClient.createPersistent(registryPath);log.info("create registry node: {}", registryPath);}// create service node permanentString servicePath = registryPath + "/" + serviceName;if (!zkClient.exists(servicePath)) {zkClient.createPersistent(servicePath);log.info("create service node: {}", servicePath);}// create service address node tempString addressPath = servicePath + "/address-";String addressNode = zkClient.createEphemeralSequential(addressPath, serviceAddress);log.info("create address node: {}", addressNode);}}

zookeeper的服务发现者(consumer使用):
同上,也需要配置相应的IP和端口,并以bean注入到项目ioc容器中

public class ZooKeeperServiceDiscovery extends ServiceCenterAdapter {private static final Logger log = LoggerFactory.getLogger(ZooKeeperServiceDiscovery.class);{super.port = 2181;}public ZooKeeperServiceDiscovery(){};public ZooKeeperServiceDiscovery(String zkHost){super(zkHost);}public ZooKeeperServiceDiscovery(String zkHost, int zkPort){super(zkHost, zkPort);}// 服务发现    name=api接口名@Overridepublic String discover(String name) {ZkClient zkClient = new ZkClient(getAddress(), CoolConstant.ZK_SESSION_TIMEOUT, CoolConstant.ZK_CONNECTION_TIMEOUT);log.debug("connect zookeeper");try {String servicePath = CoolConstant.ZK_REGISTRY_PATH + "/" + name;if (!zkClient.exists(servicePath)) {throw new RuntimeException(String.format("can not find any service node on path: %s", servicePath));}List<String> addressList = zkClient.getChildren(servicePath);if (addressList.size() == 0) {throw new RuntimeException(String.format("can not find any address node on path: %s", servicePath));}String address;int size = addressList.size();if (size == 1) {address = addressList.get(0);log.debug("get only address node: {}", address);} else {address = addressList.get(ThreadLocalRandom.current().nextInt(size));log.debug("get random address node: {}", address);}String addressPath = servicePath + "/" + address;return zkClient.readData(addressPath);} finally {zkClient.close();}}}

服务端TCP处理器

此篇博文的TCP数据(包括编解码器、处理器)全部以netty编写

服务端的netty引导类:

public class CoolRpcServer implements ApplicationContextAware {private static Logger log = LoggerFactory.getLogger(CoolRpcServer.class);private Channel channel;private EventLoopGroup bossGroup;private EventLoopGroup workerGroup;private ServerBootstrap bootstrap;private HandlerInitializer handlerInitializer;private ServiceCenter serviceRegistry;private String serviceIP;private int port;public static Map<String, Object> servicesMap ;{bossGroup = new NioEventLoopGroup(1);workerGroup = new NioEventLoopGroup();bootstrap = new ServerBootstrap();handlerInitializer = new HandlerInitializer();servicesMap = new HashMap<>(16);}public CoolRpcServer(ServiceCenter serviceRegistry, String serviceIP, int port){this.serviceRegistry = serviceRegistry;this.serviceIP = serviceIP;this.port = port;}/*** start and init tcp server if ioc contain is booting*/@SuppressWarnings("unchecked")public void initServer() throws InterruptedException {bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(handlerInitializer);bootstrap.option(ChannelOption.SO_BACKLOG, 128);bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);// the most send bytes ( 256KB )bootstrap.childOption(ChannelOption.SO_SNDBUF, 1024 * 256);// the most receive bytes ( 2048KB )bootstrap.childOption(ChannelOption.SO_RCVBUF, 1024 * 1024 * 2);channel = bootstrap.bind(serviceIP,port).sync().channel();if (servicesMap != null && servicesMap.size() > 0){for (String beanName: servicesMap.keySet()){serviceRegistry.register(beanName, serviceIP + ":" + String.valueOf(port));log.info("register service name = {}", beanName);}}log.info("TCP server started successfully, port:{}", port);channel.closeFuture().sync();}/*** close ioc contain and stop tcp server*/public void stopServer(){if (channel != null && channel.isActive()) {channel.close();}if (bossGroup != null) {bossGroup.shutdownGracefully();}if (workerGroup != null) {workerGroup.shutdownGracefully();}log.info("TCP server stopped successfully, port: {}", port);}/***  scan Annotation of CoolService*/@Overridepublic void setApplicationContext(ApplicationContext ctx) throws BeansException {Map<String, Object> beans = ctx.getBeansWithAnnotation(CoolService.class);if (beans != null && beans.size()>0){for (Object bean : beans.values()){String name = bean.getClass().getAnnotation(CoolService.class).value().getName();servicesMap.put(name, bean);}}}}

此项目的开放api接口实现类需要用@CoolService注解标识,服务端容器启动时,会扫描所有带有此注解的实现类,并注入到注册中心

服务端处理器(netty handler):

@ChannelHandler.Sharable
public class CoolServerHandler extends ChannelInboundHandlerAdapter {private static Logger log = LoggerFactory.getLogger(CoolServerHandler.class);@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {CoolResponse response = new CoolResponse();CoolRequest request = (CoolRequest) msg;try {Object result = invoke(request);response.setRequestID(request.getRequestID());response.setResult(result);} catch (Throwable error) {response.setError(error);}ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);}private Object invoke(CoolRequest request) throws Throwable{if (request == null){throw new Throwable("cool rpc request not found");}String className = request.getClassName();String methodName = request.getMethodName();Object[] parameters = request.getParameters();Object service = CoolRpcServer.servicesMap.get(className);if (service == null){throw new Throwable("cool rpc service not exist");}Class<?> serviceClass = service.getClass();Class<?>[] parameterTypes = request.getParameterTypes();FastClass fastClass = FastClass.create(serviceClass);FastMethod fastMethod = fastClass.getMethod(methodName, parameterTypes);return fastMethod.invoke(service, parameters);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {log.error("server caught exception", cause);ctx.close();}}

将客户端传输过来的请求数据(类名,方法,参数)在本地以cglib的方式反射调用
调用成功后,将处理完毕的结果编码返回给客户端,并且关闭TCP连接

客户端TCP处理器

consumer只有api接口,并没有其实现类,所以我们可以用java动态代理的方式去自定义方法实现,代理的方法实现便是建立TCP握手连接,有provider来执行方法,将得到的结果返回给代理类,由此造成一种单凭接口就能调用实现类方法的假象

第一步: 使用java动态代理new出代理对象

public class CoolProxy {private static Logger log = LoggerFactory.getLogger(CoolProxy.class);private ServiceCenter serviceDiscovery;public CoolProxy(ServiceCenter serviceDiscovery){this.serviceDiscovery = serviceDiscovery;}@SuppressWarnings("unchecked")public <T> T getInstance(Class<T> cls){return (T)Proxy.newProxyInstance(cls.getClassLoader(),new Class<?>[]{cls},(proxy, method, args) -> {CoolRequest request = new CoolRequest();request.setRequestID(UUID.randomUUID().toString());request.setClassName(method.getDeclaringClass().getName());request.setMethodName(method.getName());request.setParameters(args);request.setParameterTypes(method.getParameterTypes());String[] addr = serviceDiscovery.discover(cls.getName()).split(":",2);CoolRpcClient client = new CoolRpcClient(addr[0],Integer.parseInt(addr[1]));CoolResponse response = client.send(request);if (response.getError()!=null){throw response.getError();} else {return response.getResult();}});}}

第二步: 在代理方法中,使用远程过程调用(rpc)

客户端引导类:

public class CoolRpcClient {private static Logger log = LoggerFactory.getLogger(CoolRpcClient.class);private CountDownLatch countDownLatch;private EventLoopGroup group;private Bootstrap bootstrap;private CoolResponse response;private String serviceIP;private int port;{response = new CoolResponse();countDownLatch = new CountDownLatch(1);group = new NioEventLoopGroup();bootstrap = new Bootstrap();}public CoolRpcClient(String serviceIP, int port){this.serviceIP = serviceIP;this.port = port;}public CoolResponse send(CoolRequest request){try {bootstrap.group(group);bootstrap.channel(NioSocketChannel.class);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new CoolRpcDecoder(CoolResponse.class)).addLast(new CoolRpcEncoder(CoolRequest.class)).addLast(new CoolClientHandler(countDownLatch, response));}});bootstrap.option(ChannelOption.TCP_NODELAY, true);Channel channel = bootstrap.connect(serviceIP, port).sync().channel();channel.writeAndFlush(request).sync();countDownLatch.await();channel.closeFuture().sync();return response;} catch (Exception e){e.printStackTrace();return null;} finally {group.shutdownGracefully();}}}

客户端处理器(handler):

@ChannelHandler.Sharable
public class CoolClientHandler extends ChannelInboundHandlerAdapter {private static Logger log = LoggerFactory.getLogger(CoolClientHandler.class);private CountDownLatch latch;private CoolResponse response;public CoolClientHandler(CountDownLatch latch, CoolResponse response){this.latch = latch;this.response = response;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {CoolResponse enResponse = (CoolResponse) msg;this.response.sync(enResponse);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {latch.countDown();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("api caught exception", cause);ctx.close();}}

最后使用CountDownLatch同步通知调用者,rpc调用完毕

结束语

以上便是Cool-Rpc的简单讲解,如有更好的想法请联系我
热烈欢迎大家一起维护此项目Cool-RPC

从0到1搭建RPC框架相关推荐

  1. 从头搭建rpc框架_#LearnByDIY-如何从头开始创建JavaScript单元测试框架

    从头搭建rpc框架 by Alcides Queiroz 通过Alcides Queiroz #LearnByDIY-如何从头开始创建JavaScript单元测试框架 (#LearnByDIY - H ...

  2. 平安夜福利,送3本《从0到1搭建自动化测试框架》

    VOL 338 24 2021-12 今天距2022年8天 这是ITester软件测试小栈第338次推文 点击上方蓝字"ITester软件测试小栈"关注我,每周一.三.五早上 09 ...

  3. RPC框架:从原理到选型,一文带你搞懂RPC

    大家好,我是华仔,RPC系列的文章是我去年写的,当时写的比较散,现在重新进行整理.对于想学习RPC框架的同学,通过这篇文章,让你知其然并知其所以然,便于以后技术选型,下面是文章内容目录: RPC 什么 ...

  4. 搭建新浪RPC框架motan Demo

    motan是新浪微博开源的RPC框架,github官网是:https://github.com/weibocom/motan 今天就先搭建一个Hello world demo,本demo基于motan ...

  5. 从0开始搭建编程框架——主框架和源码

    一个良好的结构是"对修改关闭,对扩展开放"的.(转载请指明出于breaksoftware的csdn博客) 这个过程就像搭建积木.框架本身需要有足够的向内扩展能力以使自身有进化能力, ...

  6. Spark2.1.0之内置RPC框架

    版权声明:本文为博主原创文章,未经博主允许不得转载. https://blog.csdn.net/beliefer/article/details/80799622 在Spark中很多地方都涉及网络通 ...

  7. Google高性能RPC框架gRPC 1.0.0发布

    鉴于gRPC已进入稳定版分支,并对应用于生产中准备就绪,Google发布了gRPC 1.0. gRPC源于被称为Stubby的Google内部项目,早期是用于一些Google内部服务间的通信.18个月 ...

  8. 自己动手从0开始实现一个分布式RPC框架

    简介: 如果一个程序员能清楚的了解RPC框架所具备的要素,掌握RPC框架中涉及的服务注册发现.负载均衡.序列化协议.RPC通信协议.Socket通信.异步调用.熔断降级等技术,可以全方位的提升基本素质 ...

  9. 从0开始搭建自动化测试框架之PO分层架构

    一.什么是PO模式 全称:page object model  简称:POM/PO PO模式最核心的思想是分层,实现松耦合!实现脚本重复使用,实现脚本易维护性! 主要分三层: 1.基础层BasePag ...

  10. 高并发架构系列:如何从0到1设计一个类Dubbo的RPC框架

    优知学院 2019-01-22 18:43:51 之前持续分享的几十期阿里Java面试题中,几乎每次必问Dubbo,比如:"如何从0到1设计一个Dubbo的RPC框架",其实主要考 ...

最新文章

  1. stm32 窗口看门狗学习(二)
  2. ext grid 重新布局_如何让你的 CSS Grid 布局有良好的可访问性
  3. angularJs-脏检查
  4. python一对一_Python - Django - ORM 一对一表结构
  5. python --- 使用socket创建tcp服务
  6. 选中内容_Excel – 选中的单元格自动显示在A1,报表演示数据再多也能看清
  7. 2014 UESTC Training for Data Structures D - 长使英雄泪满襟
  8. 学习OpenCV研究报告指出系列(二)源代码被编译并配有实例project
  9. 5、LiveCharts--简介(一)
  10. Pandas数据分析教程(2)-数据读取之普通索引、loc/iloc索引
  11. Pygame实战:用 Python 写个贪吃蛇大冒险,保姆级教程。
  12. 前端---antd中的日期选择组件
  13. selenium防爬无头浏览器和模拟手机浏览器
  14. wordpress 安全保密hacks
  15. c语言计算平时成绩30%和期末成绩,C语言程序设计C
  16. bootstrap使用及解析
  17. 【CentOS】make cc Command not found,make: *** [adlist.o] Error 127”
  18. 计算机原理及硬件,计算机原理及硬件介绍
  19. 学习自旋电子学的笔记01:微磁模拟软件OOMMF的教程(中文版)7-7.3.4章
  20. 【已解决】win10 “你不能访问此共享文件夹,因为你组织的安全策略阻止未经身份验证的来宾访问。

热门文章

  1. Quartz.net官方开发指南系列篇
  2. Observer模式(观察者设计模式)
  3. ARM汇编中值滤波实验
  4. WPF 获取程序路径的一些方法,根据程序路径获取程序集信息
  5. 编译VCL(android)错误
  6. jQuery AJAX实现调用页面后台方法
  7. Pytorch——对应点相乘+矩阵相乘
  8. 【ACwing 95】费解的开关——枚举 + 搜索
  9. Lua 函数参数 默认实参
  10. JDK5后的特性整理