后续会迭代

文章目录

  • 模块目录
  • 流程
    • 测试实例
    • 服务端初始化
      • 1.配置端口
      • 2.网络处理模块初始化
      • 3.实例化后续用到的类
    • 注册服务
      • 1.服务信息作为map的key
      • 2.具体服务实例为map的value
    • 客户端初始化
      • 1.创建多个和服务端连接
      • 2.连接选择类的初始化
    • 方法调用之动态代理
      • 客户端获取代理实例
      • 客户端远程调用
      • 服务端执行服务
    • 方法调用之网络传输过程及序列化
      • 序列化
      • IO传输
      • 反序列化
  • 知识扩展
    • Jetty
      • 项目中的使用
      • 什么是Jetty?
      • 为什么使用Jetty?
      • 架构

模块目录

协议模块proto:定义了其他模块需要的bean,如ip端口,请求信息,响应信息,服务信息(注册的key)

序列化模块codec:定义了序列化和反序列化方法

网络模块transport:实现底层的网络通信,使用了jetty

server模块:rpc server端,接收请求报文并返回响应报文

client模块:rpc client端,发送请求报文收到响应报文

工具模块common: 反射工具类

流程

测试实例

服务端

public static void main(String[] args) throws Exception {RpcServer server = new RpcServer();server.register(CalcService.class,new CalcServiceImpl());server.start();}

客户端

public static void main(String[] args) throws InstantiationException, IllegalAccessException {RpcClient client = new RpcClient();   CalcService  service = client.getProxy(CalcService.class);      int r1 = service.add(1,2);int r2 = service.minus(10,8);System.out.println(r1);System.out.println(r2);}

调用类

public class CalcServiceImpl implements CalcService{@Overridepublic int add(int a, int b) {return a+b;}@Overridepublic int minus(int a, int b) {return a-b;}
}
服务端初始化

package: rpc-server class:RpcServer

public RpcServer() throws InstantiationException, IllegalAccessException {this.config=new RpcServerConfig(); //1this.net=ReflectionUtils.newInstance(config.getTransportClass());this.net.init(config.getPort(),this.handler);//2this.ecoder=ReflectionUtils.newInstance(config.getEcoderClass());//3this.dcoder=ReflectionUtils.newInstance(config.getDcoderClass());this.serviceInvoker=new ServiceInvoker();this.serviceManager=new ServiceManager();}
1.配置端口
this.config=new RpcServerConfig();

package: rpc-server class:RpcServerConfig

定义服务端port

public class RpcServerConfig {private Class<? extends TransportServer> transportClass=HttpTransportServer.class;private Class<? extends Ecoder>ecoderClass =JsonEcoder.class;private Class<? extends Dcoder>dcoderClass= JsonDcoder.class;private int port=3001;}
2.网络处理模块初始化
this.net=ReflectionUtils.newInstance(config.getTransportClass());
this.net.init(config.getPort(),this.handler);

package:rpc-transport class:HttpTransportServer

init函数

使用jetty(服务端的web容器类似tomcat,接收http)

@Override
public void init(int port, RequestHandler requestHandler) {this.requestHandler=requestHandler;this.server=new Server(port);//接收请求并处理ServletContextHandler servletContextHandler=new ServletContextHandler();server.setHandler(servletContextHandler);ServletHolder servletHolder=new ServletHolder(new RequestServlet() );servletContextHandler.addServlet(servletHolder,"/*");
}

package:rpc-transport class:HttpTransportServer

init()创建的servlet doPost函数用于接收处理网络请求

class RequestServlet extends HttpServlet{@SneakyThrows@Overrideprotected void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {log.info("client log");InputStream inputStream=req.getInputStream();OutputStream outputStream=resp.getOutputStream();if(requestHandler!=null){requestHandler.onRequest(inputStream,outputStream);}outputStream.flush();}}
3.实例化后续用到的类
注册服务

package:rpc-server class:ServiceManager

public ServiceManager(){this.services=new ConcurrentHashMap<>();}public <T> void register(Class<T> interfaceClass,T bean){Method[] methods=ReflectionUtils.getPublicMethod(interfaceClass);for(Method method:methods){ServiceInstance serviceInstance=new ServiceInstance(bean,method); //1ServiceDescriptor serviceDescriptor=ServiceDescriptor.from(interfaceClass,method);//2services.put(serviceDescriptor,serviceInstance);System.out.println(serviceDescriptor.getClazz());log.info("register————————{} {}",serviceDescriptor.getClazz(),serviceDescriptor.getMethod());}}
1.服务信息作为map的key
ServiceDescriptor serviceDescriptor=ServiceDescriptor.from(interfaceClass,method);//2

package:rpc-proto class:ServiceDescriptor

服务的类名,方法名,返回类型,参数

 private String clazz;private String method;private String returnType;private String[] parameterType;
2.具体服务实例为map的value
erviceInstance serviceInstance=new ServiceInstance(bean,method); //1

package:rpc-server class: ServiceInstance

bean实例和方法实例作为后续反射调用服务(方法)的基础

@Data
@AllArgsConstructor
public class ServiceInstance {private Object target;private Method method;
}
客户端初始化
public RpcClient(RpcClientConfig rpcClientConfig) throws InstantiationException, IllegalAccessException {this.config=rpcClientConfig; //1this.ecoder=ReflectionUtils.newInstance(this.config.getEcoderClass());this.dcoder=ReflectionUtils.newInstance(this.config.getDcoderClass());this.selector=ReflectionUtils.newInstance(this.config.getSelectorClass());this.selector.init(this.config.getServers(),this.config.getConnectCount(),this.config.getTransportClass());}
1.创建多个和服务端连接
 this.config=rpcClientConfig;

package:rpc-client class:RpcClientConfig

 private List<Peer> servers = Arrays.asList(new Peer("127.0.0.1",3001));
2.连接选择类的初始化
this.selector=ReflectionUtils.newInstance(this.config.getSelectorClass());this.selector.init(this.config.getServers(),this.config.getConnectCount(),this.config.getTransportClass());}

package:rpc-client class:RandomTransportSelector

后续连接时会在连接池中随机取出一个作为通信连接

private List<TransportClient> clients;public RandomTransportSelector()
{clients=new ArrayList<>();
}@Overridepublic synchronized void init(List<Peer> peers, int count, Class<? extends TransportClient> clazz) throws InstantiationException, IllegalAccessException {count=Math.max(count,1);for(Peer peer:peers) {for (int i = 0; i < count; i++) {TransportClient client=ReflectionUtils.newInstance(clazz);client.connect(peer);clients.add(client);}log.info("connect server:{}",peer);}}
方法调用之动态代理
客户端获取代理实例

package:rpc-client class:RpcClient

public <T> T getProxy(Class<T> clazz){System.out.println("enter getProxy");return (T)Proxy.newProxyInstance(getClass().getClassLoader(),new Class[]{clazz},new RemoteInvoker(clazz,ecoder,dcoder,selector));
}
客户端远程调用

package:rpc-client class:RemoteInvoker

动态代理的核心函数

将请求服务信息(方法所在类,方法名,参数等)封装在request对象中

request通过invokeRemote发送给服务端执行,服务端执行后的结果返回给客户端

从而完成了客户端远程方法调用(传输过程见后续)

 @Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws IllegalAccessException {Request request=new Request();request.setServiceDescriptor(ServiceDescriptor.from(clazz,method));request.setParameter(args);Response response=invokeRemote(request);if(response==null||response.getCode()!=0){System.out.println("response is null");throw new IllegalAccessException("fail to remoteInvoke");}return response.getData();}
服务端执行服务

package:rpc-server class:RpcServer

通过lookup函数(注册服务map的get的方法)通过request提取的serviceDescriptor(map的key)找到服务实例

通过反射执行serviceInstance,方法执行得到的结果封装为response发给客户端

(传输过程见后续)

private RequestHandler handler = new RequestHandler() {@Overridepublic void onRequest(InputStream inputStream, OutputStream outputStream) throws IOException {Response response=new Response();try {byte[] bytes = IOUtils.readFully(inputStream, inputStream.available());Request request = dcoder.decode(bytes, Request.class);log.info("get request", request);ServiceInstance serviceInstance = serviceManager.lookup(request);Object ret = serviceInvoker.serviceInvoke(serviceInstance, request);response.setData(ret);}catch(Exception e){response.setCode(1);response.setMessage("RpcSever error "+e.getClass().getName());}finally {byte outBytes[]=ecoder.ecode(response);outputStream.write(outBytes);}}};

package:rpc-server class:ServiceManage

public ServiceInstance lookup(Request request)
{ServiceDescriptor serviceDescriptor=request.getServiceDescriptor();return services.get(serviceDescriptor);
}
方法调用之网络传输过程及序列化

package:rpc-client class:remoteInvoke 函数:invokeRemote(Request request)——前面客户端远程调用中使用

byte[] outBytes=ecoder.ecode(request);
InputStream responseStream=transportClient.write(new ByteArrayInputStream(outBytes));
byte[] inBytes= IOUtils.readFully(responseStream,responseStream.available());
response=dcoder.decode(inBytes,Response.class);
序列化
byte[] outBytes=ecoder.ecode(request);

package:rpc-codec class:JsonEcoder

使用Json序列化,将request序列化成二进制数组

 private ObjectMapper objectMapper = new ObjectMapper();@Overridepublic byte[] ecode(Object obj) {try {return objectMapper.writeValueAsBytes(obj);} catch (JsonProcessingException e) {log.error("序列化时有错误发生: {}", e.getMessage());e.printStackTrace();return null;}}
IO传输

客户端:

InputStream responseStream=transportClient.write(new ByteArrayInputStream(outBytes));

建立连接,客户端通过IO输出流将二进制数组输出,并获得服务端传来的输入流

package:rpc-transport class:HttpTransportClient

public InputStream write(InputStream data) throws IOException {try {HttpURLConnection httpURLConnection = (HttpURLConnection) (new URL(url).openConnection());httpURLConnection.setDoInput(true); //设置是否向HttpURLConnection输入httpURLConnection.setDoOutput(true); //设置是否向HttpURLConnection输出httpURLConnection.setUseCaches(false); //设置是否使用缓存httpURLConnection.setRequestMethod("POST");httpURLConnection.connect();//建立连接IOUtils.copy(data, httpURLConnection.getOutputStream());//将data作为请求发送出去int resultCode = httpURLConnection.getResponseCode();//收到响应信息获取状态码System.out.println("resultCode "+resultCode);//System.out.println("HTTP");//System.out.println("Input "+httpURLConnection.getInputStream());if (resultCode == HttpURLConnection.HTTP_OK) {System.out.println("HTTPOK");return httpURLConnection.getInputStream();} elsereturn httpURLConnection.getInputStream();}catch (IOException e){throw new IllegalStateException();}}
public void connect(Peer peer) {this.url="http://"+peer.getHost()+":"+peer.getPort();}

服务端:

package:rpc-transport class:HttpTransportServer

requestHandler.onRequest(inputStream,outputStream);

package:rpc-transport class:HttpTransportServer

先将客户端发来input输入流的二进制数据读取出来,然后反序列化成Request类

进行中间处理(前文已说明)

然后将结果序列化为二进制数组,以输出流返回客户端

 public void onRequest(InputStream inputStream, OutputStream outputStream) throws IOException {Response response=new Response();try {byte[] bytes = IOUtils.readFully(inputStream, inputStream.available());Request request = dcoder.decode(bytes, Request.class);log.info("get request", request);ServiceInstance serviceInstance = serviceManager.lookup(request);Object ret = serviceInvoker.serviceInvoke(serviceInstance, request);response.setData(ret);}catch(Exception e){response.setCode(1);response.setMessage("RpcSever error "+e.getClass().getName());}finally {byte outBytes[]=ecoder.ecode(response);outputStream.write(outBytes);}}
反序列化
response=dcoder.decode(inBytes,Response.class);

package:rpc-codec class:JsonDcoder

public <T> T decode(byte[] bytes, Class<T> clazz) {try {Object obj = objectMapper.readValue(bytes, clazz);return (T)obj;} catch (IOException e) {log.error("反序列化时有错误发生: {}", e.getMessage());e.printStackTrace();return null;}}

知识扩展

Jetty
项目中的使用
this.server=new Server(port);//接收请求并处理ServletContextHandler servletContextHandler=new ServletContextHandler();server.setHandler(servletContextHandler);ServletHolder servletHolder=new ServletHolder(new RequestServlet() );servletContextHandler.addServlet(servletHolder,"/*");
什么是Jetty?

​ 简单来讲Jetty就是一个开源的HTTP服务器和Servlet引擎,它可以为JSP和Servlet提供运行时环境,比如Java Web应用最常用的Servlet容器Tomcat,由于其轻量、灵活的特性,Jetty也被应用于一些知名产品中,例如ActiveMQ、Maven、Spark、GoogleAppEngine、Eclipse、Hadoop等。

为什么使用Jetty?

​ ①异步的 Servlet,支持更高的并发量

​ ②模块化的设计,更灵活,更容易定制,也意味着更高的资源利用率

​ ③在面对大量长连接的业务场景下,Jetty 默认采用的 NIO 模型是更好的选择

​ ④将jetty嵌入到应用中,使一个普通应用可以快速支持 http 服务

架构

jetty 的架构比较简单,核心组件主要是由 Server 和 Handler 组成。其中 Server 的 Handler 是其比较重要的一个数据模型,Jetty 中所有的组件都是基于 Handler 来实现的。

项目笔记——简易RPC框架(待升级)相关推荐

  1. python go rpc_Go实现简易RPC框架的方法步骤

    本文旨在讲述 RPC 框架设计中的几个核心问题及其解决方法,并基于 Golang 反射技术,构建了一个简易的 RPC 框架. RPC RPC(Remote Procedure Call),即远程过程调 ...

  2. 知识笔记 - sekiro RPC框架的安装与简单使用

    文章旨在学习和记录,若有侵权,请联系删除 文章目录 前言 一.sekiro是什么 1. 简介 2. 逻辑结构 3. 运行流程 4. 下载和部署 二.如何开发sekiro客户端 1.打开平头哥项目 2. ...

  3. RPC框架几行代码就够了

    转于作者梁飞在公司的Blog:  http://pt.alibaba-inc.com/wp/experience_1330/simple-rpc-framework.html 因为要给百技上实训课,让 ...

  4. 使用C++开发RPC框架

    使用C++开发RPC框架 RPC(Remote Procedure Call)框架使得远端的一个进程可以调用远端另一个进程所提供的方法,是构建分布式系统的基础通信协议.本项目使用了C++开发了一个RP ...

  5. RPC框架与REST服务

    1.常见的RPC框架 Dubbo:阿里开源的框架,仅支持Java语言. gRPC:Google开源的框架,支持多种语言. Thrift:Facebook开源框架,支持多种语言. Tars:腾讯开源的框 ...

  6. RPC框架项目的学习

    学习RPC主要文章 声哥文章 https://blog.csdn.net/qq_40856284/category_10138756.html PANDA文章(更易懂一些) https://blog. ...

  7. 还发愁项目经验吗?基于Netty实现分布式RPC框架[附完整代码]

    写给大家的话 最近我收到很多读者的来信,对如何学习分布式.如何进行项目实践和提高编程能力,存在很多疑问. 分布式那么难,怎么学?为什么看了那么多书还是掌握不了? 开源的框架比如Dubbo代码太多了,完 ...

  8. motan学习笔记 一 微博轻量级RPC框架Motan

    前言 motan学习笔记 一 微博轻量级RPC框架Motan motan学习笔记 二 motan架构分析 motan学习笔记 三 motan Demo 分析 motan学习笔记 四 motan Dem ...

  9. 经典项目|手撸一个高质量RPC框架

    hi, 大家好,RPC是后端系统节点之间通信的核心技术,属于后端开发必须要学习的技能. 后端技术趋势指南|如何选择自己的技术方向 如何从0搭建公司的后端技术栈 远程过程调用(Remote Proced ...

最新文章

  1. 入门响应式Web?看懂这篇文章就够了!——Web前端系列学习笔记
  2. 物联网火爆,入门却太难了!
  3. MySQL8 全部类型
  4. Python Django开发中XSS内容过滤问题的解决
  5. 格雷码与二进制转换电路设计
  6. 把计算机网络关闭啦怎么打开,网络发现已关闭怎么办?Win7系统启用/关闭网络发现方法(图文)...
  7. jsp里table边框线_JSP表格边框颜色
  8. 我的appstore新游戏--LeBallon 拿码了
  9. 连续均匀聚苯乙烯纳米微球造孔剂/氨基化聚苯乙烯微球/羧基功能化马来酸酉干(MA)聚苯乙烯微球
  10. 使用虚拟机备份软件备份VMware vSphere虚拟机
  11. Linux搭建下载器
  12. 基于上下文感知计算的APT攻击组织追踪方法
  13. flex弹性盒子flex-grow 和flex的区别
  14. 特征选择-过滤式选择
  15. Android Adapter嵌套Adapter(文档类app,说明书类app)
  16. Kotlin——高阶函数详解与标准的高阶函数使用
  17. 企业在贴吧里面被人恶意诽谤的不实帖子要怎么删除?
  18. 图像的傅里叶变换和逆变换C++版
  19. 如何修改通达OA的登录首页的LOGO
  20. Android手机微信内置浏览器缓存怎么清理?

热门文章

  1. Unisoc RNDIS上网业务流程学习笔记
  2. FreeRDP的安装方法
  3. UE4 C++入门之路1-C++和蓝图的关系和介绍
  4. 深度学习细粒度分类综述
  5. 各行业分析研究报告 入口汇总
  6. 数码管:3位6脚的数码管分析和编码
  7. c语言记录键盘敲击次数,【转】你知道一天敲键盘的敲击次数能达到多少么
  8. 计算机使用水平怎么填,计算机水平怎么填写?
  9. JavaBean 技术与 JSP 开发模型练习题
  10. 解析:数组名a、数组名取地址a、数组首地址a[0]、数组指针*p