RPC(Remote Procedure Call) 在介绍分布是RPC前首先介绍一个下JAVA中简单的RPC实现

服务器端,通过SocketServer,持续接收客户端的请求,并将客户端的请求分发到指定的处理器出去处理。

阻塞通线模型,是server对每一个请求都开启一条线程去执行请求,此种方式的缺点是服务器端线程的数量和客户端并发访问请求树呈1:1的正比关系。

PRC 原理

请求方 按照一定格式发送请求到服务端

服务端 按照请求要求进行计算,并将结果按照约定格式进行返回

请求方从返回数据中解析出具体结果

此处对此作出了一定优化,伪异步IO通信,将所有用户请求放到线程池中处理。

/**** @author zhangwei_david* @version $Id: ServiceServer.java, v 0.1 2015年8月8日 上午11:40:41 zhangwei_david Exp $*/
public class ServiceServer implements InitializingBean, Lifecycle, ApplicationContextAware {/**服务端口号**/private int                port       = 12000;private ServerSocket       server;//线程池@Autowiredprivate Executor           executorService;public Map<String, Object> handlerMap = new ConcurrentHashMap<>();private void publishedService() throws Exception {server = new ServerSocket(port);// 一直服务for (;;) {try {// 获取socketfinal Socket socket = server.accept();executorService.execute(new Runnable() {@Overridepublic void run() {try {// 获取inputObjectInputStream input = new ObjectInputStream(socket.getInputStream());ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());try {// 获取引用String interfaceName = input.readUTF();//获取 方法名String methodName = input.readUTF();//Class<?>[] parameterTypes = (Class<?>[]) input.readObject();Object[] arguments = (Object[]) input.readObject();try {Object service = handlerMap.get(interfaceName);Method method = service.getClass().getMethod(methodName,parameterTypes);Object result = method.invoke(service, arguments);output.writeObject(result);} catch (Throwable t) {output.writeObject(t);} finally {input.close();}} finally {socket.close();}} catch (Exception e) {}}});} catch (Exception e) {}}}public void init() {}/*** @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()*/@Overridepublic void afterPropertiesSet() throws Exception {//发布服务publishedService();}/*** @see org.springframework.context.Lifecycle#start()*/@Overridepublic void start() {}/*** @see org.springframework.context.Lifecycle#stop()*/@Overridepublic void stop() {if (server != null) {try {server.close();} catch (IOException e) {}}}/*** @see org.springframework.context.Lifecycle#isRunning()*/@Overridepublic boolean isRunning() {return false;}/*** @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext)*/@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(SRPC.class);System.out.println(serviceBeanMap);if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) {for (Object serviceBean : serviceBeanMap.values()) {String interfaceName = serviceBean.getClass().getAnnotation(SRPC.class).interf().getName();handlerMap.put(interfaceName, serviceBean);}}}/*** Setter method for property <tt>executorService</tt>.** @param executorService value to be assigned to property executorService*/public void setExecutorService(Executor executorService) {this.executorService = executorService;}}/**** @author zhangwei_david* @version $Id: SRPC.java, v 0.1 2015年8月8日 下午12:51:17 zhangwei_david Exp $*/
@Documented
@Target({ ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface SRPC {public Class<?> interf();
}

  

至此就实现了服务的自动发现自动注册,当然这个仅针对单机环境下的自动注册。

/**** @author zhangwei_david* @version $Id: Client.java, v 0.1 2015年8月8日 下午12:28:44 zhangwei_david Exp $*/
public class Client {/*** 引用服务** @param <T> 接口泛型* @param interfaceClass 接口类型* @param host 服务器主机名* @param port 服务器端口* @return 远程服务* @throws Exception*/@SuppressWarnings("unchecked")public static <T> T refer(final Class<T> interfaceClass, final String host, final int port)throws Exception {if (interfaceClass == null || !interfaceClass.isInterface()) {throw new IllegalArgumentException("必须指定服务接口");}if (host == null || host.length() == 0) {throw new IllegalArgumentException("必须指定服务器的地址和端口号");}return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),new Class<?>[] { interfaceClass }, new InvocationHandler() {@Overridepublic Object invoke(Object proxy, Method method, Object[] arguments)throws Throwable {Socket socket = new Socket(host, port);try {ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());try {output.writeUTF(interfaceClass.getName());output.writeUTF(method.getName());output.writeObject(method.getParameterTypes());output.writeObject(arguments);ObjectInputStream input = new ObjectInputStream(socket.getInputStream());try {Object result = input.readObject();if (result instanceof Throwable) {throw (Throwable) result;}return result;} finally {input.close();}} finally {output.close();}} finally {socket.close();}}});}
}

  

上面在没有使用第三方依赖包实现了简单的RPC,优化增加 request和reponse,定义RPC协议。

/**** @author zhangwei_david* @version $Id: SrpcRequest.java, v 0.1 2015年8月8日 下午1:45:53 zhangwei_david Exp $*/
public class SrpcRequest implements Serializable {/**  */private static final long serialVersionUID = 6132853628325824727L;// 请求Idprivate String            requestId;// 远程调用接口名称private String            interfaceName;//远程调用方法名称private String            methodName;// 参数类型private Class<?>[]        parameterTypes;// 参数值private Object[]          parameters;/*** Getter method for property <tt>requestId</tt>.** @return property value of requestId*/public String getRequestId() {return requestId;}/*** Setter method for property <tt>requestId</tt>.** @param requestId value to be assigned to property requestId*/public void setRequestId(String requestId) {this.requestId = requestId;}/*** Getter method for property <tt>interfaceName</tt>.** @return property value of interfaceName*/public String getInterfaceName() {return interfaceName;}/*** Setter method for property <tt>interfaceName</tt>.** @param interfaceName value to be assigned to property interfaceName*/public void setInterfaceName(String interfaceName) {this.interfaceName = interfaceName;}/*** Getter method for property <tt>methodName</tt>.** @return property value of methodName*/public String getMethodName() {return methodName;}/*** Setter method for property <tt>methodName</tt>.** @param methodName value to be assigned to property methodName*/public void setMethodName(String methodName) {this.methodName = methodName;}/*** Getter method for property <tt>parameterTypes</tt>.** @return property value of parameterTypes*/public Class<?>[] getParameterTypes() {return parameterTypes;}/*** Setter method for property <tt>parameterTypes</tt>.** @param parameterTypes value to be assigned to property parameterTypes*/public void setParameterTypes(Class<?>[] parameterTypes) {this.parameterTypes = parameterTypes;}/*** Getter method for property <tt>parameters</tt>.** @return property value of parameters*/public Object[] getParameters() {return parameters;}/*** Setter method for property <tt>parameters</tt>.** @param parameters value to be assigned to property parameters*/public void setParameters(Object[] parameters) {this.parameters = parameters;}}/**** @author zhangwei_david* @version $Id: SrpcResponse.java, v 0.1 2015年8月8日 下午1:47:46 zhangwei_david Exp $*/
public class SrpcResponse implements Serializable {/**  */private static final long serialVersionUID = -5934073769679010930L;// 请求的Idprivate String            requestId;// 异常private Throwable         error;// 响应private Object            result;/*** Getter method for property <tt>requestId</tt>.** @return property value of requestId*/public String getRequestId() {return requestId;}/*** Setter method for property <tt>requestId</tt>.** @param requestId value to be assigned to property requestId*/public void setRequestId(String requestId) {this.requestId = requestId;}/*** Getter method for property <tt>error</tt>.** @return property value of error*/public Throwable getError() {return error;}/*** Setter method for property <tt>error</tt>.** @param error value to be assigned to property error*/public void setError(Throwable error) {this.error = error;}/*** Getter method for property <tt>result</tt>.** @return property value of result*/public Object getResult() {return result;}/*** Setter method for property <tt>result</tt>.** @param result value to be assigned to property result*/public void setResult(Object result) {this.result = result;}}/**** @author zhangwei_david* @version $Id: ServiceServer.java, v 0.1 2015年8月8日 上午11:40:41 zhangwei_david Exp $*/
public class ServiceServer implements InitializingBean, Lifecycle, ApplicationContextAware {/**服务端口号**/private int                port       = 12000;private ServerSocket       server;//线程池@Autowiredprivate Executor           executorService;public Map<String, Object> handlerMap = new ConcurrentHashMap<>();private void publishedService() throws Exception {server = new ServerSocket(port);// 一直服务for (;;) {try {// 获取socketfinal Socket socket = server.accept();executorService.execute(new Runnable() {@Overridepublic void run() {try {// 获取inputObjectInputStream input = new ObjectInputStream(socket.getInputStream());try {// 获取RPC请求SrpcRequest request = (SrpcRequest) input.readObject();ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());try {SrpcResponse response = doHandle(request);output.writeObject(response);} finally {input.close();}} finally {socket.close();}} catch (Exception e) {}}});} catch (Exception e) {}}}private SrpcResponse doHandle(SrpcRequest request) {SrpcResponse response = new SrpcResponse();response.setRequestId(request.getRequestId());try {Object service = handlerMap.get(request.getInterfaceName());Method method = service.getClass().getMethod(request.getMethodName(),request.getParameterTypes());response.setResult(method.invoke(service, request.getParameters()));} catch (Exception e) {response.setError(e);}return response;}public void init() {}/*** @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()*/@Overridepublic void afterPropertiesSet() throws Exception {//发布publishedService();}/*** @see org.springframework.context.Lifecycle#start()*/@Overridepublic void start() {}/*** @see org.springframework.context.Lifecycle#stop()*/@Overridepublic void stop() {if (server != null) {try {server.close();} catch (IOException e) {}}}/*** @see org.springframework.context.Lifecycle#isRunning()*/@Overridepublic boolean isRunning() {return false;}/*** @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext)*/@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(SRPC.class);System.out.println(serviceBeanMap);if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) {for (Object serviceBean : serviceBeanMap.values()) {String interfaceName = serviceBean.getClass().getAnnotation(SRPC.class).interf().getName();handlerMap.put(interfaceName, serviceBean);}}}/*** Setter method for property <tt>executorService</tt>.** @param executorService value to be assigned to property executorService*/public void setExecutorService(Executor executorService) {this.executorService = executorService;}}/**** @author zhangwei_david* @version $Id: Client.java, v 0.1 2015年8月8日 下午12:28:44 zhangwei_david Exp $*/
public class Client {/*** 引用服务** @param <T> 接口泛型* @param interfaceClass 接口类型* @param host 服务器主机名* @param port 服务器端口* @return 远程服务* @throws Exception*/@SuppressWarnings("unchecked")public static <T> T refer(final Class<T> interfaceClass, final String host, final int port)throws Exception {if (interfaceClass == null || !interfaceClass.isInterface()) {throw new IllegalArgumentException("必须指定服务接口");}if (host == null || host.length() == 0) {throw new IllegalArgumentException("必须指定服务器的地址和端口号");}return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),new Class<?>[] { interfaceClass }, new InvocationHandler() {@Overridepublic Object invoke(Object proxy, Method method, Object[] arguments)throws Throwable {Socket socket = new Socket(host, port);try {ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());try {SrpcRequest request = new SrpcRequest();request.setRequestId(UUID.randomUUID().toString());request.setInterfaceName(interfaceClass.getName());request.setMethodName(method.getName());request.setParameterTypes(method.getParameterTypes());request.setParameters(arguments);output.writeObject(request);ObjectInputStream input = new ObjectInputStream(socket.getInputStream());try {SrpcResponse response = (SrpcResponse) input.readObject();if (response.getError() != null&& response.getError() instanceof Throwable) {throw response.getError();}return response.getResult();} finally {input.close();}} finally {output.close();}} finally {socket.close();}}});}
}

  

转载于:https://www.cnblogs.com/wei-zw/p/8797749.html

实现RPC就是这么简单相关推荐

  1. 项目通信之RPC调用——java简单实现

    微服务项目通信方法很多,有像springcloud解决方案的http通信,还有像阿里Dubbo的RPC通信,这里简单实现RPC调用.一共2个端,客户端server和客户端client.项目demo很简 ...

  2. Yar并行的RPC框架的简单使用

    RPC,就是Remote Procedure Call的简称呀,翻译成中文就是远程过程调用 RPC要解决的两个问题: 解决分布式系统中,服务之间的调用问题. 远程调用时,要能够像本地调用一样方便,让调 ...

  3. c# 调用restful json_微服务调用为啥用RPC框架,http不更简单吗?

    背景 在一次的面试交谈中,聊到业务实现的技术架构.不管系统大小,一般都是微服务的架构,所以就产生了一个问题,为什么服务之间调用,选择用RPC,http 不也能实现服务之间的通信吗?怎么不用呢?或者 R ...

  4. 分布式 RPC架构简单理解

    RPC框架 RPC(Remote Promote Call) 一种进程间通信方式.允许像调用本地服务一样调用远程服务. RPC框架的主要目标就是让远程服务调用更简单.透明.RPC框架负责屏蔽底层的传输 ...

  5. 基于消息中间件RabbitMQ实现简单的RPC服务

    转载自  基于消息中间件RabbitMQ实现简单的RPC服务 RPC(Remote Procedure Call,远程过程调用),是一种计算机通信协议.对于两台机器而言,就是A服务器上的应用程序调用B ...

  6. 使用Akka实现简单RPC框架

    使用Akka实现简单RPC框架 最近简单看了看Flink的RPC通讯相关的源码,它是通过Akka实现的,为了更好的阅读理解代码,又大体看了看Akka相关的知识.这篇文章主要记录了如果使用Akka来实现 ...

  7. 手写一个简单rpc框架(一)

    扑街前言:前面说了netty的基本运用.Java的NIO等一系列的知识,这些知识已经可以做一个简单的rpc框架,本篇和下篇我们一起了解一个怎么完成一个rpc框架,当然个只是为了更好的了解rpc框架的基 ...

  8. 花了一个星期,我终于把RPC框架整明白了!

    " RPC(Remote Procedure Call):远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的思想. 作者:李金葵,来自:51CTO技术栈 R ...

  9. RabbitMQ之RPC实现

    2019独角兽企业重金招聘Python工程师标准>>> 什么是RPC? RPC是指远程过程调用,也就是说两台服务器A,B,一个应用部署在A服务器上,想要调用B服务器上应用提供的函数/ ...

最新文章

  1. python编程入门经典实例-【python】编程语言入门经典100例--30
  2. 一图解码数据中心数字化运维管理之道
  3. Vue开发规范1.0
  4. c# 获取excel单元格公式结果_excel公式应用技巧:文字和数字混合的单元格,如何求和?...
  5. 研究Mysql优化得出一些建设性的方案
  6. [转载] Java反射是什么?看这篇绝对会了!
  7. 6-2-JSP基本语法
  8. JWT教程_2 SpringSecurity与JWT整合
  9. ADO.NET五大对象详解(转)
  10. python源代码编译后的文件扩展名-python源代码被解释器转换后的格式是什么?
  11. HashMap,LinkedHashMap,TreeMap应用
  12. STC8A8K64D4 EEPROM读写失败
  13. 一个多道批处理系统中仅有 P1 和 P2 两个作业
  14. Android中的RAM、ROM、SD卡以及各种内存的区别
  15. HE4484E泛海微5V USB 输入双节锂电池串联应用升压充电IC管理芯片
  16. 【MATLAB统计分析与应用100例】案例016:matlab读取Excel数据,进行样品系统聚类分析
  17. 用脚本语言对 Excel 分组汇总
  18. WEB前端网页设计-Bootstrap 超大屏幕(Jumbotron)
  19. python中语法错误-Python语法错误与异常及异常处理方法
  20. oracle按非选列排序,如何选择和排序不在Groupy中的列按SQL语句 – Oracle

热门文章

  1. linux运行jps五行结果,Linux系统性能监控
  2. 苹果手机夜间模式怎么设置_微信怎么设置夜间模式?iPhone夜间模式设置教程 省电又护眼!...
  3. 2022-04-25 安装PostgreSQL的发现小bug
  4. addEventListener事件监听传递参数
  5. IOCP编程之基本原理
  6. 计算机信息管理专业教学改革,计算机信息管理专业实践教学改革探索
  7. mysql怎样搞一个项目_程序员如何快速上手一个自己不太熟悉的新项目?有什么技巧?...
  8. Python使用C++动态库的方法
  9. Windows和Linux内存检测工具:Valgrind,Visual Leak Detector,CppCheck, Cpplint
  10. Spark的三种运行模式