基于bio手写实现简单的rpc

1.bio基础知识

Java BIO:传统的网络通讯模型,就是BIO,同步阻塞IO, 其实就是服务端创建一个ServerSocket, 然后就是客户端用一个Socket去连接服务端的那个ServerSocket, ServerSocket接收到了一个的连接请求就创建一个Socket和一个线程去跟那个Socket进行通讯。接着客户端和服务端就进行阻塞式的通信,客户端发送一个请求,服务端Socket进行处理后返回响应,在响应返回前,客户端那边就阻塞等待,什么事情也做不了。 这种方式的缺点, 每次一个客户端接入,都需要在服务端创建一个线程来服务这个客户端,这样大量客户端来的时候,就会造成服务端的线程数量可能达到了几千甚至几万,这样就可能会造成服务端过载过高,最后崩溃死掉。

为了改进这种一连接一线程的模型,我们可以使用线程池来管理这些线程,实现 1 个或 多个线程处理 N 个客户端的模型(但是底层还是使用的同步阻塞 I/O),通常被称为“伪异 步 I/O 模型“。

2.BIO服务端代码示例

1、服务器驱动一个 ServerSocket。使用Acceptor监听链接
2、客户端启动 Socket 对服务器进行通信,默认情况下服务器端需要对每一个客户端建立一个线程进行通信。
3、客户端发出请求后,先咨询服务器时候否线程响应,如果没有则会等待,或者被拒绝。
4、如果有响应,客户端线程会等待请求结束后,再继续执行。
    private static ExecutorService executorService= Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());public static void main(String[] args) throws IOException {ServerSocket serverSocket = new ServerSocket(9999);System.out.println("start server......");while (true){//开启监听Socket socket = serverSocket.accept();executorService.execute(() ->hander(socket));}}private static void hander(Socket socket) {//实例化与客户端通信的输入输出流try(ObjectInputStream inputStream =new ObjectInputStream(socket.getInputStream());ObjectOutputStream outputStream =new ObjectOutputStream(socket.getOutputStream())){//接收客户端的输出,也就是服务器的输入String userName = inputStream.readUTF();System.out.println("Accept client message:"+userName);//服务器的输出,也就是客户端的输入outputStream.writeUTF("Hello,"+userName);outputStream.flush();}catch(Exception e){e.printStackTrace();}finally {try {socket.close();} catch (IOException e) {e.printStackTrace();}}}

3.BIO 应用-RPC 框架

3.1 什么是 RPC?

RPC(Remote Procedure Call ——远程过程调用),它是一种通过网络从远程计算机程 序上请求服务,而不需要了解底层网络的技术。

一次完整的 RPC 同步调用流程:

1)服务消费方(client)以本地调用方式调用客户端存根;

2)什么叫客户端存根?就是远程方法在本地的模拟对象,一样的也有方法名,也有方 法参数,client stub 接收到调用后负责将方法名、方法的参数等包装,并将包装后的信息通 过网络发送到服务端;

3)服务端收到消息后,交给代理存根在服务器的部分后进行解码为实际的方法名和参 数

4) server stub 根据解码结果调用服务器上本地的实际服务;

5)本地服务执行并将结果返回给 server stub;

6)server stub 将返回结果打包成消息并发送至消费方;

7)client stub 接收到消息,并进行解码;

8)服务消费方得到最终结果。 RPC 框架的目标就是要中间步骤都封装起来,让我们进行远程方法调用的时候感觉到就 像在本地调用一样。

3.1 RPC设计及时序图


3.2 RPC需要解决的问题

3.2.1 代理问题

代理本质上是要解决什么问题?要解决的是被调用的服务本质上是远程的服务,但是调 用者不知道也不关心,调用者只要结果,具体的事情由代理的那个对象来负责这件事。既然 是远程代理,当然是要用代理模式了。目前很多的rpc也是基于代理模式实现的。 代理(Proxy)是一种设计模式,即通过代理对象访问目标对象.这样做的好处是:可以在目 标对象实现的基础上,增强额外的功能操作,即扩展目标对象的功能。那我们这里额外的功能 操作是干什么,通过网络访问远程服务。 jdk 的代理有两种实现方式:静态代理和动态代理。

3.2.2 序列化问题

序列化问题在计算机里具体是什么?我们的方法调用,有方法名,方法参数,这些可能 是字符串,可能是我们自己定义的 java 的类,但是在网络上传输或者保存在硬盘的时候, 网络或者硬盘并不认得什么字符串或者 javabean,它只认得二进制的 01 串,怎么办?要进 行序列化,网络传输后要进行实际调用,就要把二进制的 01 串变回我们实际的 java 的类, 这个叫反序列化。java 里已经为我们提供了相关的机制 Serializable。

3.2.3 通信问题

我们在用序列化把东西变成了可以在网络上传输的二进制的 01 串,但具体如何通过网 络传输?使用 JDK 为我们提供的 BIO。

3.2.3 登记的服务实例化

登记的服务有可能在我们的系统中就是一个名字,怎么变成实际执行的对象实例,当然 是使用反射机制。

设计和理论分析完成之后,开始编码

3.3 RPC注册中心代码

@Service
public class RegisterCenter {/*key表示服务名,value代表服务提供者地址的集合*/private static final Map<String,Set<RegisterServiceVo>> serviceHolder= new HashMap<>();/*注册服务的端口号*/private int port;/*服务注册,考虑到可能有多个提供者同时注册,进行加锁*/private static synchronized void registerService(String serviceName,String host,int port){//获得当前服务的已有地址集合Set<RegisterServiceVo> serviceVoSet = serviceHolder.get(serviceName);if(serviceVoSet==null){//已有地址集合为空,新增集合serviceVoSet = new HashSet<>();serviceHolder.put(serviceName,serviceVoSet);}//将新的服务提供者加入集合serviceVoSet.add(new RegisterServiceVo(host,port));System.out.println("服务已注册["+serviceName+"]," +"地址["+host+"],端口["+port+"]");}/*取出服务提供者*/private static Set<RegisterServiceVo> getService(String serviceName){return serviceHolder.get(serviceName);}/*处理服务请求的任务,其实无非就是两种服务:1、服务注册服务2、服务查询服务*/private static class ServerTask implements Runnable{private Socket client = null;public ServerTask(Socket client){this.client = client;}public void run() {try(ObjectInputStream inputStream =new ObjectInputStream(client.getInputStream());ObjectOutputStream outputStream =new ObjectOutputStream(client.getOutputStream())){/*检查当前请求是注册服务还是获得服务*/boolean isGetService = inputStream.readBoolean();/*服务查询服务,获得服务提供者*/if(isGetService){String serviceName = inputStream.readUTF();/*取出服务提供者集合*/Set<RegisterServiceVo> result = getService(serviceName);/*返回给客户端*/outputStream.writeObject(result);outputStream.flush();System.out.println("将已注册的服务["+serviceName+"提供给客户端");}/*服务注册服务*/else{/*取得新服务提供方的ip和端口*/String serviceName = inputStream.readUTF();String host = inputStream.readUTF();int port = inputStream.readInt();/*在注册中心保存*/registerService(serviceName,host,port);outputStream.writeBoolean(true);outputStream.flush();}}catch(Exception e){e.printStackTrace();}finally {try {client.close();} catch (IOException e) {e.printStackTrace();}}}}/*启动注册服务*/public void startService() throws IOException {ServerSocket serverSocket = new ServerSocket();serverSocket.bind(new InetSocketAddress(port));System.out.println("服务注册中心 on:"+port+":运行");try{while(true){new Thread(new ServerTask(serverSocket.accept())).start();}}finally {serverSocket.close();}}@PostConstructpublic void init() {this.port = 9999;new Thread(new Runnable() {public void run() {try{startService();}catch(IOException e){e.printStackTrace();}}}).start();}
}

3.4 RPC服务端核心代码

3.4.1 RPC服务端向注册中心注册本地服务代码

/*** 类说明:注册服务*/
@Service
public class RegisterServiceWithRegCenter {/*本地可提供服务的一个名单,用缓存实现*/private static final Map<String,Class> serviceCache= new ConcurrentHashMap<>();/*往远程注册服务器注册本服务*/public void regRemote(String serviceName, String host, int port, Class impl)throws Throwable{//登记到注册中心Socket socket = null;ObjectOutputStream output = null;ObjectInputStream input = null;try{socket = new Socket();socket.connect(new InetSocketAddress("127.0.0.1",9999));output = new ObjectOutputStream(socket.getOutputStream());//注册服务output.writeBoolean(false);//提供的服务名output.writeUTF(serviceName);//服务提供方的IPoutput.writeUTF(host);//服务提供方的端口output.writeInt(port);output.flush();input = new ObjectInputStream(socket.getInputStream());if(input.readBoolean()){System.out.println("服务["+serviceName+"]注册成功!");}//可提供服务放入缓存serviceCache.put(serviceName,impl);} catch (IOException e) {e.printStackTrace();}  finally{if (socket!=null) socket.close();if (output!=null) output.close();if (input!=null) input.close();}}public Class getLocalService(String serviceName) {return serviceCache.get(serviceName);}}

3.4.2 RPC框架的服务端接收请求核心处理代码

/****类说明:rpc框架的服务端部分*/
@Service
public class RpcServerFrame {@Autowiredprivate RegisterServiceWithRegCenter registerServiceWithRegCenter;//服务的端口号private int port;//处理服务请求任务private static class ServerTask implements Runnable{private Socket client;private RegisterServiceWithRegCenter registerServiceWithRegCenter;public ServerTask(Socket client,RegisterServiceWithRegCenter registerServiceWithRegCenter){this.client = client;this.registerServiceWithRegCenter = registerServiceWithRegCenter;}public void run() {try(ObjectInputStream inputStream =new ObjectInputStream(client.getInputStream());ObjectOutputStream outputStream =new ObjectOutputStream(client.getOutputStream())){//方法所在类名接口名String serviceName = inputStream.readUTF();//方法的名字String methodName = inputStream.readUTF();//方法的入参类型Class<?>[] parmTypes = (Class<?>[]) inputStream.readObject();//方法入参的值Object[] args = (Object[]) inputStream.readObject();//从容器中拿到服务的Class对象Class serviceClass = registerServiceWithRegCenter.getLocalService(serviceName);if (serviceClass == null){throw new ClassNotFoundException(serviceName+" Not Found");}//通过反射,执行实际的服务Method method = serviceClass.getMethod(methodName,parmTypes);Object result = method.invoke(serviceClass.newInstance(),args);//将服务的执行结果通知调用者outputStream.writeObject(result);outputStream.flush();}catch(Exception e){e.printStackTrace();}finally {try {client.close();} catch (IOException e) {e.printStackTrace();}}}}public void startService(String serviceName, String host, int port, Class impl) throws Throwable{ServerSocket serverSocket = new ServerSocket();serverSocket.bind(new InetSocketAddress(port));System.out.println("RPC server on:"+port+":运行");registerServiceWithRegCenter.regRemote(serviceName,host,port,impl);try{while(true){new Thread(new ServerTask(serverSocket.accept(),registerServiceWithRegCenter)).start();}}finally {serverSocket.close();}}@PostConstructpublic void server() throws Throwable {Random r = new Random();int port = r.nextInt(100)+7778;this.startService(StockService.class.getName(),"127.0.0.1",port, StockServiceImpl.class);}
}

3.5 RPC客户端核心代码

3.5.1 客户端启动时候获取服务端接口存根代理对象

@Configuration
public class BeanConfig {@Autowiredprivate RpcClientFrame rpcClientFrame;@Beanpublic SendSms getSmsService() throws Exception{return rpcClientFrame.getRemoteProxyObject(SendSms.class);}
}

3.5.2 rpc框架的客户端代理部分

/***类说明:rpc框架的客户端代理部分*/
@Service
public class RpcClientFrame {/*远程服务的代理对象,参数为客户端要调用的的服务*/public static<T> T getRemoteProxyObject(final Class<?> serviceInterface) throws Exception {/*获得远程服务的一个网络地址*/InetSocketAddress addr =getService(serviceInterface.getName());/*拿到一个代理对象,由这个代理对象通过网络进行实际的服务调用*/return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(),new Class<?>[]{serviceInterface},new DynProxy(serviceInterface,addr));}/*动态代理,实现对远程服务的访问*/private static class DynProxy implements InvocationHandler{private Class<?> serviceInterface;private InetSocketAddress addr;public DynProxy(Class<?> serviceInterface, InetSocketAddress addr) {this.serviceInterface = serviceInterface;this.addr = addr;}@Overridepublic Object invoke(Object proxy, Method method, Object[] args)throws Throwable {Socket socket = null;ObjectInputStream inputStream = null;ObjectOutputStream outputStream = null;try {socket = new Socket();socket.connect(addr);outputStream = new ObjectOutputStream(socket.getOutputStream());outputStream.writeUTF(serviceInterface.getName());outputStream.writeUTF(method.getName());outputStream.writeObject(method.getParameterTypes());outputStream.writeObject(args);outputStream.flush();inputStream = new ObjectInputStream(socket.getInputStream());System.out.println(serviceInterface+" remote exec success!");return inputStream.readObject();}finally {inputStream.close();outputStream.close();socket.close();}}}/*----------------以下和动态获得服务提供者有关------------------------------*/private static Random r = new Random();/*获得远程服务的地址*/private static InetSocketAddress getService(String serviceName)throws Exception {//获得服务提供者的地址列表List<InetSocketAddress> serviceVoList = getServiceList(serviceName);InetSocketAddress addr= serviceVoList.get(r.nextInt(serviceVoList.size()));System.out.println("本次选择了服务器:"+addr);return addr;}/*获得服务提供者的地址*/private static List<InetSocketAddress> getServiceList(String serviceName)throws Exception {Socket socket = null;ObjectOutputStream output = null;ObjectInputStream input = null;try{socket = new Socket();socket.connect(new InetSocketAddress("127.0.0.1",9999));output = new ObjectOutputStream(socket.getOutputStream());//需要获得服务提供者output.writeBoolean(true);//告诉注册中心服务名output.writeUTF(serviceName);output.flush();input = new ObjectInputStream(socket.getInputStream());Set<RegisterServiceVo> result= (Set<RegisterServiceVo>)input.readObject();List<InetSocketAddress> services = new ArrayList<>();for(RegisterServiceVo serviceVo : result){String host = serviceVo.getHost();//获得服务提供者的IPint port = serviceVo.getPort();//获得服务提供者的端口号InetSocketAddress serviceAddr = new InetSocketAddress(host,port);services.add(serviceAddr);}System.out.println("获得服务["+serviceName+"]提供者的地址列表["+services+"],准备调用.");return services;}finally{if (socket!=null) socket.close();if (output!=null) output.close();if (input!=null) input.close();}}}

4. 思考

目前简单rpc存在很多问题如:
1、性能欠缺,表现在网络通信机制,序列化机制等等

2、负载均衡、容灾和集群功能很弱

3、服务的注册和发现机制也很差劲

后续如果有时间将会基于NIO的netty实现一个更好用的rpc

基于bio手写实现简单的rpc相关推荐

  1. 如何手写一个简单的RPC框架

    http://www.czhenblog.cn/article?articleId=29

  2. 手写一个简单的IOC容器

    手写一个简单的IOC容器 原文 http://localhost:4000/2020/02/25/SSM/spring/%E6%89%8B%E5%86%99%E4%B8%80%E4%B8%AA%E5% ...

  3. 机器学习算法(九): 基于线性判别LDA模型的分类(基于LDA手写数字分类实践)

    机器学习算法(九): 基于线性判别模型的分类 1.前言:LDA算法简介和应用 1.1.算法简介 线性判别模型(LDA)在模式识别领域(比如人脸识别等图形图像识别领域)中有非常广泛的应用.LDA是一种监 ...

  4. Python+OpenCV:基于SVM手写数据OCR(OCR of Hand-written Data using SVM)

    Python+OpenCV:基于SVM手写数据OCR(OCR of Hand-written Data using SVM) dsize = 20 affine_flags = lmc_cv.WARP ...

  5. Python+OpenCV:基于KNN手写数据OCR(OCR of Hand-written Data using kNN)

    Python+OpenCV:基于KNN手写数据OCR(OCR of Hand-written Data using kNN) OCR of Hand-written Digits ########## ...

  6. 基于JavaScript+css写一个简单的h5动态下雨效果

    基于JavaScript+css写一个简单的h5动态下雨效果 文章目录 什么是前端 展示效果 JavaScript是什么? 步骤 1.html 2.css 3.js 什么是前端 前端它是一个工作,它的 ...

  7. CV之IC之SpatialTransformer:基于ClutteredMNIST手写数字图片数据集分别利用CNN_Init、ST_CNN算法(CNN+ST)实现多分类预测案例训练过程记录

    CV之IC之SpatialTransformer:基于ClutteredMNIST手写数字图片数据集分别利用CNN_Init.ST_CNN算法(CNN+ST)实现多分类预测案例训练过程记录 目录 基于 ...

  8. 手写实现简单栈(练习题)

    一.手写实现简单栈 push(x) -- 将元素 x 推入栈中. pop() -- 删除栈顶的元素. top() -- 获取栈顶元素. getMin() -- 检索栈中的最小元素.示例:输入: [&q ...

  9. 基于vue手写一个分屏器,通过鼠标控制屏幕宽度。

    基于vue手写一个分屏器,通过鼠标控制屏幕宽度. 先来看看实现效果: QQ录屏20220403095856 下面是实现代码: <template><section class=&qu ...

最新文章

  1. SAP WMSD集成之Copy WM Quantity – Copy WM qty as delivery qty into delivery
  2. 【C++/C】【学习笔记】二分算法——处理“最小却最大”问题
  3. android studio 2.1 ndk,Android studio 2.1编辑器(CLint)无法找到使用原生(ndk)插件的模块的标题...
  4. 1.编译cartographer ROS
  5. Wormholes--POJ 3259
  6. leetcode 927. Three Equal Parts | 927. 三等分(Java)
  7. 华为荣耀30pro鸿蒙内测版,荣耀手机用户放心了 消息称荣耀30 Pro正在内测华为鸿蒙OS...
  8. 【牛客 - 368D】动态连通块(并查集+bitset优化)
  9. 经典面试题(12):关于事件循环,以下代码将输出什么?
  10. 计算机绘图实训任务书,autocad模块化实训任务书-2011.11
  11. .describe() python_Stataamp;Python | 分别实现多元线性回归
  12. Atitit 研发管理之道 attilax总结 艾龙 著 研发管理 1 简介 1 基本理念 2 基本原则 2 内容 3 团队建设 4 流程设计 4 成本管理 4 项目管理 4 绩效管理 4 风险管理
  13. 屏幕共享技术及相关软件使用测评
  14. python扫描局域网ip_Python实现扫描局域网活动ip
  15. android手机文件管理器,4 款 Android 文件管理器,总有一款适合你
  16. 配置mt7620a上的双SSID
  17. 沧海桑田:Linux 系统启动的演进模型与发展变化
  18. 绝对干货!初学者也能看懂的DPDK解析
  19. Windows11设置登录密码
  20. python strftime函数_PyThon中time strftime()函数用法

热门文章

  1. WINS服务器和DNS服务器有什么区别?
  2. POj-1091 跳蚤
  3. 《淘宝交付之道》出版!大淘宝技术36个月匠心之作
  4. windows 环境下在anaconda 3中安装python2和python3两个环境(python2和python3共存)
  5. 嵌入式Linux LED小灯点亮实验
  6. ContextCapture导入点云进行重建
  7. 采用计算机发布调度命令时 必须严格遵守,铁路行车规章复习题
  8. css vertical-align属性详解
  9. IOT2050 更改Debian源为中科大源
  10. Android简易音乐重构MVVM Java版-新增推荐、雷达歌单详情列表界面(十八)