基于 SpringBoot 手写 RPC 框架
目录
- GitHub地址
- 文件目录
- Message
- consumer
- @EnableRpcConsumer
- @Reference
- ReferenceInvokeProxy
- RpcHandler
- provider
- @EnableRpcProvider
- @Service
- BeanMethod
- InitialMediator
- Mediator
- ServiceHandler
- SocketServerInitial
- 示例
- consumer
- 启动类
- 配置文件
- TestController
- provider
- 启动类
- 配置文件
- ExampleServiceImpl
- 结果
GitHub地址
https://github.com/Asperger12345/shenrpc-spring-boot-starter
文件目录
Message
传递的消息载体类
package com.shen.api;import lombok.Builder;
import lombok.Data;import java.io.Serializable;@Data
@Builder
public class Message implements Serializable {//服务接口名private String className;//方法名private String methodName;//参数private Object[] args;//接口类型private Class[] types;
}
consumer
服务消费方
@EnableRpcConsumer
打在SpringBoot启动类上
package com.shen.api.consumer;import org.springframework.context.annotation.Import;import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import({ReferenceInvokeProxy.class,RpcHandler.class})
public @interface EnableRpcConsumer {}
@Reference
打在注入的接口上
package com.shen.api.consumer;import org.springframework.stereotype.Component;import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface Reference {}
ReferenceInvokeProxy
对于每个有 @Reference 注解的接口,生成代理
package com.shen.api.consumer;import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;import java.lang.reflect.Field;
import java.lang.reflect.Proxy;public class ReferenceInvokeProxy implements BeanPostProcessor {@AutowiredRpcHandler invocationHandler;public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {Field[] fields=bean.getClass().getDeclaredFields();for(Field field:fields){if(field.isAnnotationPresent(Reference.class)){field.setAccessible(true);Object proxy= Proxy.newProxyInstance(field.getType().getClassLoader(),new Class<?>[]{field.getType()},invocationHandler);try {field.set(bean,proxy);} catch (IllegalAccessException e) {e.printStackTrace();}}}return bean;}
}
RpcHandler
让接口生成代理
调用服务的方法,实际上是在进行 BIO 通信
通过在 application.properties 配置 provider.host,provider.port,确定提供方主机号、端口号
package com.shen.api.consumer;import com.shen.api.Message;
import org.springframework.beans.factory.annotation.Value;import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.net.Socket;public class RpcHandler implements InvocationHandler {@Value("${provider.host}")private String host;@Value("${provider.port}")private int port;public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {Socket socket = new Socket(host, port);ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());Message message = Message.builder().className(method.getDeclaringClass().getName()).methodName(method.getName()).args(args).types(method.getParameterTypes()).build();objectOutputStream.writeObject(message);objectOutputStream.flush();ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());return objectInputStream.readObject();}
}
provider
服务提供方
@EnableRpcProvider
打在SpringBoot启动类上
package com.shen.api.provider;import org.springframework.context.annotation.Import;import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import({SocketServerInitial.class,InitialMediator.class,})
public @interface EnableRpcProvider {}
@Service
打在提供的服务实现类上
package com.shen.api.provider;import org.springframework.stereotype.Component;import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface Service {}
BeanMethod
封装所有提供的类和方法
package com.shen.api.provider;import lombok.Data;import java.lang.reflect.Method;@Data
public class BeanMethod {private Object bean;private Method method;}
InitialMediator
对于每个打了自定义的 @Service 的实例化后的 Bean,遍历所有方法,以"接口名.方法名"为key,存储实现类类名和方法名的封装到 map 中
package com.shen.api.provider;import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;import java.lang.reflect.Method;public class InitialMediator implements BeanPostProcessor {public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {if(bean.getClass().isAnnotationPresent(Service.class)){Method[] methods=bean.getClass().getDeclaredMethods();for(Method method:methods){String key=bean.getClass().getInterfaces()[0].getName()+"."+method.getName();BeanMethod beanMethod=new BeanMethod();beanMethod.setBean(bean);beanMethod.setMethod(method);Mediator.map.put(key,beanMethod);}}return bean;}
}
Mediator
"接口名.方法名"为key,存储实现类类名和方法名的封装
根据实现类类名和方法名,反射调用本地方法,给出返回值
package com.shen.api.provider;import com.shen.api.Message;import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;public class Mediator {public static Map<String ,BeanMethod> map=new ConcurrentHashMap();private volatile static Mediator instance;private Mediator(){}public static Mediator getInstance(){if(instance==null){synchronized (Mediator.class){if(instance==null){instance=new Mediator();}}}return instance;}public Object process(Message message){String key=message.getClassName()+"."+message.getMethodName();BeanMethod beanMethod=map.get(key);if(beanMethod==null){return null;}Object bean=beanMethod.getBean();Method method=beanMethod.getMethod();try {return method.invoke(bean,message.getArgs());} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}return null;}}
ServiceHandler
伪异步处理 Message
package com.shen.api.provider;import com.shen.api.Message;import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;public class ServiceHandler implements Runnable {private Socket socket;public ServiceHandler(Socket socket) {this.socket = socket;}public void run() {ObjectInputStream objectInputStream = null;ObjectOutputStream outputStream = null;try {objectInputStream = new ObjectInputStream(socket.getInputStream());Message message = (Message) objectInputStream.readObject();Mediator mediator=Mediator.getInstance();Object rs=mediator.process(message);outputStream = new ObjectOutputStream(socket.getOutputStream());outputStream.writeObject(rs);outputStream.flush();} catch (Exception e) {e.printStackTrace();}finally {if(objectInputStream != null) {try {objectInputStream.close();} catch (IOException e) {e.printStackTrace();}}if(outputStream != null) {try {objectInputStream.close();} catch (IOException e) {e.printStackTrace();}}}}}
SocketServerInitial
容器 Refresh 后,调用此方法监听请求。
通过在 application.properties 配置 provider.port,确定提供方端口号。
package com.shen.api.provider;import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class SocketServerInitial implements ApplicationListener<ContextRefreshedEvent> {private final ExecutorService executorService= Executors.newCachedThreadPool();@Value("${provider.port}")private int port;public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {ServerSocket serverSocket=null;try {serverSocket=new ServerSocket(port);while(true){Socket socket=serverSocket.accept(); executorService.execute(new ServiceHandler(socket));}} catch (IOException e) {e.printStackTrace();}finally {if(serverSocket!=null){try {serverSocket.close();} catch (IOException e) {e.printStackTrace();}}}}}
示例
consumer
启动类
package com.shen.consumer;import com.shen.api.consumer.EnableRpcConsumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;@SpringBootApplication
@ComponentScan("com.shen.consumer")
@EnableRpcConsumer
public class BootStrap {public static void main(String[] args) {SpringApplication.run(BootStrap.class,args);}}
配置文件
provider.host = localhost
provider.port = 8888server.port=8080
TestController
package com.shen.consumer;import com.shen.api.ExampleService;
import com.shen.api.consumer.Reference;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class TestController {@ReferenceExampleService exampleService;@GetMapping("/test")public String test(){return exampleService.info();}}
provider
启动类
package com.shen.provider;import com.shen.api.provider.EnableRpcProvider;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;@SpringBootApplication
@ComponentScan("com.shen.provider")
@EnableRpcProvider
public class BootStrap {public static void main(String[] args) {SpringApplication.run(BootStrap.class,args);}}
配置文件
provider.port = 8888server.port=8081
ExampleServiceImpl
package com.shen.provider.service;import com.shen.api.ExampleService;
import com.shen.api.provider.Service;@Service
public class ExampleServiceImpl implements ExampleService {public String info() {return "example";}
}
结果
基于 SpringBoot 手写 RPC 框架相关推荐
- MyRPCDemo netty+jdk动态代理+反射+序列化,反序列化手写rpc框架
RPC RPC(remote procedure call)远程过程调用 RPC是为了在分布式应用中,两台主机的Java进程进行通信,当A主机调用B主机的方法时,过程简洁,就像是调用自己进程里的方法一 ...
- 手写篇:如何手写RPC框架?
手写篇:如何手写RPC框架? 首先我们讲下什么是RPC? RPC(Remote Procedure Call)远程过程调用协议,他是一种通过网络从远程计算机程序请求服务.简单的来说,就是通过网络进行远 ...
- 第四篇 - 手写RPC框架
Github源码下载地址:https://github.com/chenxingxing6/myrpc 一.前言 RPC(Remote Procedure Call)-远程过程调用,它是一种通过网络从 ...
- 手写RPC框架(六)
v1.4 小更新 更新事项 暂定目标对启动类进行修改 直接集合 这个就直接看代码吧 不是特别难 难的地方我会点出来 启动引导类直接进行修改 可以传参 可以这样 当然 我想到了可以注解传参 注解构造 注 ...
- 手写RPC框架(十六)
v2.7 更新:实现CGLIB动态代理 实现CGLIB动态代理 实现一下统一调用代理类,创建总调用类,和对应模板接口,调用注解,同时在每个consumerbootstrap进行修改 对应模板接口 pa ...
- 手写RPC框架(五)
v1.3 (启动器依旧使用1.2 1.3版本在启动服务版本上尚未做出大变动 主要是增加了方便学习的功能) 更新事项 以下更新均在非阻塞模块进行更新,阻塞模块可供读者自己尝试 使用注解方式 改造一下启动 ...
- 手写RPC框架(八)
v1.6 热补丁,nio目前来看最后的完善,使用Curator简化zookeeper的操作,优化调用体验 使用Curator创建服务注册和服务发现类(是看快速开始速成的) 服务注册类实现代码 pack ...
- Marco's Java【Dubbo 之手写Dubbo框架实现远程调用】
前言 关于Dubbo入门的网上教程也特别多,因此我没有专门出关于Dubbo的系列博文(主要呢- 也是在忙些工作上的事儿),用Dubbo特别简单,但是想要把Dubbo学好,学精还得花费不少时间的,特别是 ...
- 【手撸RPC框架】SpringBoot+Netty4实现RPC框架
[手撸RPC框架]SpringBoot+Netty4实现RPC框架 线程模型1:传统阻塞 I/O 服务模型 模型特点: 采用阻塞IO模式获取输入的数据 每个链接都需要独立的线程完成数据的输入,业务处理 ...
- java基础巩固-宇宙第一AiYWM:为了维持生计,手写RPC~Version07(RPC原理、序列化框架们、网络协议框架们 、RPC 能帮助我们做什么呢、RPC异常排查:ctrl+F搜超时)整起
上次Version06说到了咱们手写迷你版RPC的大体流程, 对咱们的迷你版RPC的大体流程再做几点补充: 为什么要封装网络协议,别人说封装好咱们就要封装?Java有这个特性那咱就要用?好像是这样.看 ...
最新文章
- 1000行 MySQL 学习笔记,不怕你不会,就怕你不学!
- python datetime.datetime 当前_python之time和datetime的常用方法
- 已知先序遍历和中序遍历,输出他的后序遍历序列.
- List集合ArrayList,LinkList
- 计算机组成原理--主存储器
- s5pv210——LCD基础理论
- leetcode练习:292. Nim Game
- Vivado 约束文件XDC使用经验总结
- Redmi K50标准版工信部入网:搭载骁龙870 没有12GB内存
- maven 打包java项目_如何使用maven打包java项目?
- 2021-2025年中国一次性生物处理系统行业市场供需与战略研究报告
- 出去转了一转,便利店......
- 6.GitLab 分支管理
- 雷霆战机单机老版本_雷霆战机单机版
- CI框架工作原理浅析
- Java selenium 设置代理
- 一篇文章说尽,中国互联网的30年(完结篇)
- 凯利公式(庄家必胜篇)——致放假在家的高薪程序员们
- android设备连接电脑无需授权
- code回归采访哭 ladies_LADIES’CODE再次提到高恩妃和权梨世的车祸事故 在节目中流泪...
热门文章
- axios拦截,api统一管理
- SSM整合案例分析(详解)
- mac下安装photoshop
- 【GO】panic: reflect.Value.Interface: cannot return value obtained from unexported field or method
- react-native获取农历日期和二十四节气
- 记事本改字体的代码java_记事本编程切换字体颜色 用java编写一个记事本程序
- 160多个android开源码汇总
- oracle 按天数统计数据
- VS Code 中解决 C++ 代码编写时的爆红
- python实现Content-Type:application/octet-stream