前言

什么是RPC?

RPC(Remote Procedure Call)远程过程调用协议,一种通过网络从远程计算机上请求服务,而不需要了解底层网络技术的协议。简单地说就是能使应用像调用本地方法一样的调用远程的过程或服务,可以应用在分布式服务、分布式计算、远程服务调用等许多场景。

业界有很多开源的优秀 RPC 框架,例如 Dubbo、Thrift、gRPC、Hprose 等等。

RPC 协议只规定了 Client 与 Server 之间的点对点调用流程,包括 stub、通信协议、RPC 消息解析等部分,在实际应用中,还需要考虑服务的高可用、负载均衡等问题,除了点对点的 RPC 协议的具体实现外,还可以包括服务的发现与注销、提供服务的多台 Server 的负载均衡、服务的高可用等更多的功能。 目前的 RPC 框架大致有两种不同的侧重方向,一种偏重于服务治理,另一种偏重于跨语言调用 。

RPC框架特点

RPC 调用方式

RPC 调用方式分以下两种:

1. 同步调用  客户方等待调用执行完成并返回结果。2. 异步调用  客户方调用后不用等待执行结果返回,但依然可以通过回调通知等方式获取返回结果。 若客户方不关心调用返回结果,则变成单向异步调用,单向调用不用返回结果。

RPC协议的组成

在一个典型的RPC的使用场景中,包含了服务发现,负载,容错,网络传输,序列化等组件,其中RPC协议指明了程序如何进行网络传输和序列化。

![](https://imgconvert.csdnimg.cn/aHR0cHM6Ly91cGxvYWQtaW1hZ2VzLmppYW5zaHUuaW8vdXBsb2FkX2ltYWdlcy8xMjEzNTMzOS0wNDcxYjM0MGMwOWE3NTczLnBuZz9pbWFnZU1vZ3IyL2F1dG8tb3JpZW50L3N0cmlwfGltYWdlVmlldzIvMi93LzU5Ni9mb3JtYXQvd2VicA?x-oss-
process=image/format,png)

RPC协议主要由以下几部分组成:

  1. 地址:服务提供者地址

  2. 端口:协议指定开放的端口

  3. 报文编码:协议报文编码,分为请求头和请求体两部分

  4. 序列化方式:将请求体序列化成对象,具体的方式有Hessian2Serialization,DubboSerialization,JavaSerialization,JsonSerization等

  5. 运行服务:网络传输实现,实现方式主要有netty,mina,RMI服务,Servlet容器(jetty,tomcat,jboss)

RPC框架原理

![](https://imgconvert.csdnimg.cn/aHR0cHM6Ly91cGxvYWQtaW1hZ2VzLmppYW5zaHUuaW8vdXBsb2FkX2ltYWdlcy8xMjEzNTMzOS1kNDUzOGVhNmQ3ZTJjZWJlLnBuZz9pbWFnZU1vZ3IyL2F1dG8tb3JpZW50L3N0cmlwfGltYWdlVmlldzIvMi93LzkyMS9mb3JtYXQvd2VicA?x-oss-
process=image/format,png)

服务暴露

远程提供者需要以某种形式提供服务调用相关的信息,包括但不限于服务接口定义、数据结构或者中间态的服务定义文件,web
service的WSDL文件;服务调用者需要通过一定的途径获取远程服务调用相关的信息

序列化

得到服务地址后,客户端在发起调用前需要对调用信息进行编码,这就要考虑需要编码些什么信息并以什么格式传输到服务端才能让服务端完成调用。
出于效率考虑,编码的信息越少越好(传输数据少),编码的规则越简单越好(执行效率高)。

需要信息如下:

调用编码

1. 接口方法  包括接口名、方法名2. 方法参数  包括参数类型、参数值3. 调用属性  包括调用属性信息,例如调用附件隐式参数、调用超时时间等

返回编码

1. 返回结果  接口方法中定义的返回值2. 返回码  异常返回码3. 返回异常信息  调用异常信息

通信

协议编码之后,自然就是需要将编码后的 RPC 请求消息传输到服务方,服务方执行后返回结果消息或确认消息给客户方。 RPC的应用场景实质是一种可靠的请求应答消息流,和 HTTP 类似。 因此选择长连接方式的 TCP 协议会更高效,与 HTTP不同的是在协议层面我们定义了每个消息的唯一 id,因此可以更容易的复用连接。

RPC框架的通信其实与具体的协议无关,RPC可基于HTTP或者TCP协议

远程代理

服务调用者使用的服务实际上是远程服务的本地代理,说白了就是通过动态代理实现 java中至少提供了两种动态代码的生成:

  • 一种是jdk动态代理

  • 一种是字节码生成

动态代理比字节码生成使用起来更加方便,但是性能上没有字节码生成好,字节码生成在代码可读性上要差一些。

RPC 异常处理

无论 RPC 怎样努力把远程调用伪装的像本地调用,但它们依然有很大的不同点,而且有一些异常情况是在本地调用时绝对不会碰到的。
在说异常处理之前,我们先比较下本地调用和 RPC 调用的一些差异:

1. 本地调用一定会执行,而远程调用则不一定,调用消息可能因为网络原因并未发送到服务方。
2. 本地调用只会抛出接口声明的异常,而远程调用还会跑出 RPC 框架运行时的其他异常。
3. 本地调用和远程调用的性能可能差距很大,这取决于 RPC 固有消耗所占的比重。

正是这些区别决定了使用 RPC 时需要更多考量。 当调用远程接口抛出异常时,异常可能是一个业务异常, 也可能是 RPC
框架抛出的运行时异常(如:网络中断等)。 业务异常表明服务方已经执行了调用,可能因为某些原因导致未能正常执行, 而 RPC
运行时异常则有可能服务方根本没有执行,对调用方而言的异常处理策略自然需要区分。

由于 RPC 固有的消耗相对本地调用高出几个数量级,本地调用的固有消耗是纳秒级,而 RPC 的固有消耗是在毫秒级。
那么对于过于轻量的计算任务就并不合适导出远程接口由独立的进程提供服务, 只有花在计算任务上时间远远高于 RPC 的固有消耗才值得导出为远程接口提供服务。

RPC 框架设计与实现

RPC框架功能组件主要分为以下几类:

RpcServer

​ 负责导出(export)远程接口

@Slf4j
public class RpcServer {private String serverAddress;private EventLoopGroup bossGroup = new NioEventLoopGroup();private EventLoopGroup workerGroup = new NioEventLoopGroup();private volatile Map<String /* interface name */, Object> handlerMap = new HashMap<String, Object>();public RpcServer(String serverAddress) throws InterruptedException {this.serverAddress = serverAddress;this.start();}/***     $start* @throws InterruptedException*/private void start() throws InterruptedException {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)// tpc = sync + accept  = backlog.option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline cp = ch.pipeline();cp.addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 0));cp.addLast(new RpcDecoder(RpcRequest.class));cp.addLast(new RpcEncoder(RpcResponse.class));cp.addLast(new RpcSeverHandler(handlerMap));}});String[] array = serverAddress.split(":");String host = array[0];int port = Integer.parseInt(array[1]);ChannelFuture channelFuture = serverBootstrap.bind(host, port).sync();channelFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if(future.isSuccess()) {log.info("server success bing to " + serverAddress);} else {log.info("server fail bing to " + serverAddress);throw new Exception("server start fail, cause: " + future.cause());}}});try {channelFuture.await(5000, TimeUnit.MILLISECONDS); if(channelFuture.isSuccess()) {log.info("start rapid rpc success! ");}} catch (InterruptedException e) {log.error("start rapid rpc occur Interrupted, ex: " + e);}}/***    $registerProcessor 程序注册器*/public void registerProcessor(ProviderConfig providerConfig) {//key : providerConfig.insterface (userService接口权限命名)//value : providerConfig.ref (userService接口下的具体实现类 userServiceImpl实例对象)handlerMap.put(providerConfig.getInterface(), providerConfig.getRef());}/***  $close*/public void close() {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}

RpcClient

负责导入(import)远程接口的代理实现

public class RpcClient {private String serverAddress;private long timeout;private final Map<Class<?>, Object> syncProxyIntanceMap = new ConcurrentHashMap<Class<?>, Object>();private final Map<Class<?>, Object> asyncProxyIntanceMap = new ConcurrentHashMap<Class<?>, Object>();public void initClient(String serverAddress, long timeout) {this.serverAddress = serverAddress;this.timeout = timeout;connect();}private void connect() {RpcConnectManager.getInstance().connect(serverAddress);}public void stop() {RpcConnectManager.getInstance().stop();}/***     $invokeSync 同步调用方法* @param <T>* @param interfaceClass* @return*/@SuppressWarnings("unchecked")public <T> T invokeSync(Class<T> interfaceClass) {if(syncProxyIntanceMap.containsKey(interfaceClass)) {return (T)syncProxyIntanceMap.get(interfaceClass);} else {Object proxy = Proxy.newProxyInstance(interfaceClass.getClassLoader(),new Class<?>[] {interfaceClass},new RpcProxyImpl<>(interfaceClass, timeout));syncProxyIntanceMap.put(interfaceClass, proxy);return (T)proxy;          }}/***  $invokeAsync 异步调用方式的方法* @param <T>* @param interfaceClass* @return*/public <T> RpcAsyncProxy invokeAsync(Class<T> interfaceClass) {if(asyncProxyIntanceMap.containsKey(interfaceClass)) {return (RpcAsyncProxy) asyncProxyIntanceMap.get(interfaceClass);} else {RpcProxyImpl<T> asyncProxyInstance = new RpcProxyImpl<>(interfaceClass, timeout);asyncProxyIntanceMap.put(interfaceClass, asyncProxyInstance);return asyncProxyInstance;}}}

RpcProxy

远程接口的代理实现

public class RpcProxyImpl<T> implements InvocationHandler, RpcAsyncProxy {private Class<T> clazz;private long timeout;public RpcProxyImpl(Class<T> clazz, long timeout) {this.clazz = clazz;this.timeout = timeout;}/***     invoke代理接口调用方式*/@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {//1.设置请求对象RpcRequest request = new RpcRequest();request.setRequestId(UUID.randomUUID().toString());request.setClassName(method.getDeclaringClass().getName());request.setMethodName(method.getName());request.setParamterTypes(method.getParameterTypes());request.setParamters(args);//2.选择一个合适的Client任务处理器RpcClientHandler handler = RpcConnectManager.getInstance().chooseHandler();//3. 发送真正的客户端请求 返回结果RpcFuture future = handler.sendRequest(request);return future.get(timeout, TimeUnit.SECONDS);}/***   $call 异步的代理接口实现, 真正的抱出去RpcFuture 给业务方做实际的回调等待处理*/@Overridepublic RpcFuture call(String funcName, Object... args) {//1.设置请求对象RpcRequest request = new RpcRequest();request.setRequestId(UUID.randomUUID().toString());request.setClassName(this.clazz.getName());request.setMethodName(funcName);request.setParamters(args);// TODO: 对应的方法参数类型应该通过 类类型 + 方法名称 通过反射得到parameterTypesClass<?>[] parameterTypes = new Class[args.length];for(int i = 0; i < args.length; i++) {parameterTypes[i] = getClassType(args[i]);}request.setParamterTypes(parameterTypes);//2.选择一个合适的Client任务处理器RpcClientHandler handler = RpcConnectManager.getInstance().chooseHandler();RpcFuture future = handler.sendRequest(request);return future;}private Class<?> getClassType(Object obj) {Class<?> classType = obj.getClass();String typeName = classType.getName();if (typeName.equals("java.lang.Integer")) {return Integer.TYPE;} else if (typeName.equals("java.lang.Long")) {return Long.TYPE;} else if (typeName.equals("java.lang.Float")) {return Float.TYPE;} else if (typeName.equals("java.lang.Double")) {return Double.TYPE;} else if (typeName.equals("java.lang.Character")) {return Character.TYPE;} else if (typeName.equals("java.lang.Boolean")) {return Boolean.TYPE;} else if (typeName.equals("java.lang.Short")) {return Short.TYPE;} else if (typeName.equals("java.lang.Byte")) {return Byte.TYPE;}return classType;}
}

RpcInvoker

客户方实现:负责编码调用信息和发送调用请求到服务方并等待调用结果返回
服务方实现:负责调用服务端接口的具体实现并返回调用结果

@Slf4j
public class RpcFuture implements Future<Object> {private RpcRequest request;private RpcResponse response;private long startTime;private static final long TIME_THRESHOLD = 5000;private List<RpcCallback> pendingCallbacks = new ArrayList<RpcCallback>();private Sync sync ;private ReentrantLock lock = new ReentrantLock();private ThreadPoolExecutor executor = new ThreadPoolExecutor(16, 16, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(65536));public RpcFuture(RpcRequest request) {this.request = request;this.startTime = System.currentTimeMillis();this.sync = new Sync();}/***     $done 实际的回调处理* @param response*/public void done(RpcResponse response) {this.response = response;boolean success = sync.release(1);if(success) {invokeCallbacks();}// 整体rpc调用的耗时long costTime = System.currentTimeMillis() - startTime;if(TIME_THRESHOLD < costTime) {log.warn("the rpc response time is too slow, request id = " + this.request.getRequestId() + " cost time: " + costTime);}}/***  依次执行回调函数处理*/private void invokeCallbacks() {lock.lock();try {for(final RpcCallback callback : pendingCallbacks) {runCallback(callback);}            } finally {lock.unlock();}}private void runCallback(RpcCallback callback) {final RpcResponse response = this.response;executor.submit(new Runnable() {@Overridepublic void run() {if(response.getThrowable() == null) {callback.success(response.getResult());} else {callback.failure(response.getThrowable());}}});}@Overridepublic boolean cancel(boolean mayInterruptIfRunning) {throw new UnsupportedOperationException();}@Overridepublic boolean isCancelled() {throw new UnsupportedOperationException();}@Overridepublic boolean isDone() {return sync.isDone();}@Overridepublic Object get() throws InterruptedException, ExecutionException {sync.acquire(-1);if(this.response != null) {return this.response.getResult();} else {return null;}}@Overridepublic Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {boolean success = sync.tryAcquireNanos(-1, unit.toNanos(timeout));if(success) {if(this.response != null) {return this.response.getResult();} else {return null;}} else {throw new RuntimeException("timeout excetion requestId: " + this.request.getRequestId() + ",className: " + this.request.getClassName()+ ",methodName: " + this.request.getMethodName());}}class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = -3989844522545731058L;private final int done = 1;private final int pending = 0;protected boolean tryAcquire(int acquires) {return getState() == done ? true : false;}protected boolean tryRelease(int releases) {if(getState() == pending) {if(compareAndSetState(pending, done)) {return true;}}return false;}public boolean isDone() {return getState() == done;}}/***    可以在应用执行的过程中添加回调处理函数* @param callback* @return*/public RpcFuture addCallback(RpcCallback callback) {lock.lock();try {if(isDone()) {runCallback(callback);} else {this.pendingCallbacks.add(callback);}} finally {lock.unlock();}return this;}}

RpcProtocol

负责协议编/解码

public class Serialization {private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();private static Objenesis objenesis = new ObjenesisStd(true);public Serialization() {}private static <T> Schema<T> getSchema(Class<T> cls) {@SuppressWarnings("unchecked")Schema<T> schema = (Schema<T>) cachedSchema.get(cls);if (schema == null) {schema = RuntimeSchema.createFrom(cls);if (schema != null) {cachedSchema.put(cls, schema);}}return schema;}/***   序列化:对象->字节数组*/public static <T> byte[] serialize(T obj) {@SuppressWarnings("unchecked")Class<T> cls = (Class<T>) obj.getClass();LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);try {Schema<T> schema = getSchema(cls);return ProtostuffIOUtil.toByteArray(obj, schema, buffer);} catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);} finally {buffer.clear();}}/***    反序列化(字节数组->对象)*/public static <T> T deserialize(byte[] data, Class<T> cls) {try {T message = objenesis.newInstance(cls);Schema<T> schema = getSchema(cls);ProtostuffIOUtil.mergeFrom(data, message, schema);return message;} catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);}}
}

RpcConnector

负责维持客户方和服务方的连接通道和发送数据到服务方

@Slf4j
public class RpcConnectManager {private static volatile RpcConnectManager  RPC_CONNECT_MANAGER = new RpcConnectManager();private RpcConnectManager() {}public static RpcConnectManager getInstance() {return RPC_CONNECT_MANAGER;}/*   一个连接的地址,对应一个实际的业务处理器(client) */private Map<InetSocketAddress, RpcClientHandler> connectedHandlerMap = new ConcurrentHashMap<InetSocketAddress, RpcClientHandler>();/*   所有连接成功的地址 所对应的 任务执行器列表 connectedHandlerList */private CopyOnWriteArrayList<RpcClientHandler> connectedHandlerList = new CopyOnWriteArrayList<RpcClientHandler>();/*    用于异步的提交连接请求的线程池 */private ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(16, 16, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(65536));private EventLoopGroup eventLoopGroup = new NioEventLoopGroup(4);private ReentrantLock connectedLock = new ReentrantLock();private Condition connectedCondition = connectedLock.newCondition();private long connectTimeoutMills = 6000;private volatile boolean isRunning = true;private volatile AtomicInteger handlerIdx = new AtomicInteger(0);//1. 异步连接 线程池 真正的发起连接,连接失败监听,连接成功监听//2. 对于连接进来的资源做一个缓存(做一个管理)updateConnectedServer/***   $connect 发起连接方法* @param serverAddress*/public void connect(final String serverAddress) {List<String> allServerAddress = Arrays.asList(serverAddress.split(","));updateConnectedServer(allServerAddress);}/***     $更新缓存信息 并 异步发起连接*   192.168.11.111:8765,192.168.11.112:8765* @param allServerAddress*/public void updateConnectedServer(List<String> allServerAddress) {if(CollectionUtils.isNotEmpty(allServerAddress)) {// 1.解析allServerAddress地址 并且临时存储到我们的newAllServerNodeSet HashSet集合中HashSet<InetSocketAddress> newAllServerNodeSet = new HashSet<InetSocketAddress>();for(int i =0; i < allServerAddress.size(); i++) {String[] array = allServerAddress.get(i).split(":");if(array.length == 2) {String host = array[0];int port = Integer.parseInt(array[1]);final InetSocketAddress remotePeer = new InetSocketAddress(host, port);newAllServerNodeSet.add(remotePeer);}}//    2.调用建立连接方法 发起远程连接操作for(InetSocketAddress serverNodeAddress : newAllServerNodeSet) {if(!connectedHandlerMap.keySet().contains(serverNodeAddress)) {connectAsync(serverNodeAddress);}}//  3. 如果allServerAddress列表里不存在的连接地址,那么我需要从缓存中进行移除for(int i = 0; i< connectedHandlerList.size(); i++) {RpcClientHandler rpcClientHandler = connectedHandlerList.get(i);SocketAddress remotePeer = rpcClientHandler.getRemotePeer();if(!newAllServerNodeSet.contains(remotePeer)) {log.info(" remove invalid server node " + remotePeer);RpcClientHandler handler = connectedHandlerMap.get(remotePeer);if(handler != null) {handler.close();connectedHandlerMap.remove(remotePeer);}connectedHandlerList.remove(rpcClientHandler);}}} else {// 添加告警log.error(" no available server address! ");// 清除所有的缓存信息clearConnected();}}/***    $connectAsync 异步发起连接的方法* @param serverNodeAddress*/private void connectAsync(InetSocketAddress remotePeer) {threadPoolExecutor.submit(new Runnable() {@Overridepublic void run() {Bootstrap b = new Bootstrap();b.group(eventLoopGroup).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new RpcClientInitializer());connect(b, remotePeer);}});}private void connect(final Bootstrap b, InetSocketAddress remotePeer) {//    1.真正的建立连接final ChannelFuture channelFuture = b.connect(remotePeer);//  2.连接失败的时候添加监听 清除资源后进行发起重连操作channelFuture.channel().closeFuture().addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {log.info("channelFuture.channel close operationComplete, remote peer =" + remotePeer);future.channel().eventLoop().schedule(new Runnable() {@Overridepublic void run() {log.warn(" connect fail, to reconnect! ");clearConnected();connect(b, remotePeer);}}, 3, TimeUnit.SECONDS);}});//  3.连接成功的时候添加监听 把我们的新连接放入缓存中channelFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if(future.isSuccess()) {log.info("successfully connect to remote server, remote peer = " + remotePeer);RpcClientHandler handler = future.channel().pipeline().get(RpcClientHandler.class);addHandler(handler);}}});}/***  $clearConnected*    连接失败时,及时的释放资源,清空缓存*   先删除所有的connectedHandlerMap中的数据*  然后再清空connectedHandlerList中的数据*/private void clearConnected() {for(final RpcClientHandler rpcClientHandler : connectedHandlerList) {// 通过RpcClientHandler 找到具体的remotePeer, 从connectedHandlerMap进行移除指定的 RpcClientHandlerSocketAddress remotePeer = rpcClientHandler.getRemotePeer();RpcClientHandler handler = connectedHandlerMap.get(remotePeer);if(handler != null) {handler.close();connectedHandlerMap.remove(remotePeer);}}connectedHandlerList.clear();}/***    $addHandler 添加RpcClientHandler到指定的缓存中*  connectedHandlerMap & connectedHandlerList*     * @param handler*/private void addHandler(RpcClientHandler handler) {connectedHandlerList.add(handler);InetSocketAddress remoteAddress = //(InetSocketAddress) handler.getRemotePeer();(InetSocketAddress) handler.getChannel().remoteAddress();connectedHandlerMap.put(remoteAddress, handler);//signalAvailableHandler 唤醒可用的业务执行器signalAvailableHandler();}/***     唤醒另外一端的线程(阻塞的状态中) 告知有新连接接入*/private void signalAvailableHandler() {connectedLock.lock();try {connectedCondition.signalAll();} finally {connectedLock.unlock();}}/***    $waitingForAvailableHandler 等待新连接接入通知方法* @return* @throws InterruptedException*/private boolean waitingForAvailableHandler() throws InterruptedException {connectedLock.lock();try {return connectedCondition.await(this.connectTimeoutMills, TimeUnit.MICROSECONDS);} finally {connectedLock.unlock();}}/*** $chooseHandler 选择一个实际的业务处理器* @return RpcClientHandler*/public RpcClientHandler chooseHandler() {CopyOnWriteArrayList<RpcClientHandler> handlers = (CopyOnWriteArrayList<RpcClientHandler>)this.connectedHandlerList.clone();int size = handlers.size();while(isRunning && size <= 0) {try {boolean available = waitingForAvailableHandler();if(available) {handlers = (CopyOnWriteArrayList<RpcClientHandler>)this.connectedHandlerList.clone();size = handlers.size();}} catch (InterruptedException e) {log.error(" wating for available node is interrupted !");throw new RuntimeException("no connect any servers!", e);}}if(!isRunning) {return null;}// 最终使用取模方式取得其中一个业务处理器进行实际的业务处理return handlers.get(((handlerIdx.getAndAdd(1) + size) % size));}/***     $stop 关闭的方法*/public void stop() {isRunning = false;for(int i = 0; i< connectedHandlerList.size(); i++) {RpcClientHandler rpcClientHandler = connectedHandlerList.get(i);rpcClientHandler.close();}// 在这里要调用一下唤醒操作signalAvailableHandler();threadPoolExecutor.shutdown();eventLoopGroup.shutdownGracefully();}/*** $reconnect 发起重连方法 需要把对应的资源进行释放* @param handler* @param remotePeer*/public void reconnect(final RpcClientHandler handler , final SocketAddress remotePeer) {if(handler != null) {handler.close();connectedHandlerList.remove(handler);connectedHandlerMap.remove(remotePeer);}connectAsync((InetSocketAddress) remotePeer);}}

RpcAcceptor

负责接收客户方请求并返回请求结果

@Slf4j
public class RpcSeverHandler extends SimpleChannelInboundHandler<RpcRequest> {private Map<String, Object> handlerMap;private ThreadPoolExecutor executor = new ThreadPoolExecutor(16, 16, 600L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(65536));public RpcSeverHandler(Map<String, Object> handlerMap) {this.handlerMap = handlerMap;}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcRequest rpcRequest) throws Exception {executor.submit(new Runnable() {@Overridepublic void run() {RpcResponse response = new RpcResponse();response.setRequestId(rpcRequest.getRequestId());try {Object result = handle(rpcRequest);response.setResult(result);} catch (Throwable t) {response.setThrowable(t);log.error("rpc service handle request Throwable: " + t);}ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if(future.isSuccess()) {// afterRpcHook}}});}});}/***     $handle 解析request请求并且去通过反射获取具体的本地服务实例后执行具体的方法* @param request* @return* @throws InvocationTargetException*/private Object handle(RpcRequest request) throws InvocationTargetException {String className = request.getClassName();Object serviceRef = handlerMap.get(className);Class<?> serviceClass = serviceRef.getClass();String methodName = request.getMethodName();Class<?>[] paramterTypes = request.getParamterTypes();Object[] paramters = request.getParamters();// JDK relect// CglibFastClass serviceFastClass = FastClass.create(serviceClass);FastMethod servicFastMethod = serviceFastClass.getMethod(methodName, paramterTypes);return servicFastMethod.invoke(serviceRef, paramters);}/***  $exceptionCaught 异常处理关闭连接*/public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {log.error("server caught Throwable: " + cause);ctx.close();}}

RpcRequest

封装RPC调用客户端请求

@Data
public class RpcRequest implements Serializable {private static final long serialVersionUID = 3424024710707513070L;private String requestId;private String className;private String methodName;private Class<?>[] paramterTypes;private Object[] paramters;}

RpcResponse

封装RPC调用服务端返回结果

@Data
public class RpcResponse implements Serializable {private static final long serialVersionUID = -7989400623370901861L;private String requestId;private Object result;private Throwable throwable;}

RpcProcessor

负责在服务方控制调用过程,包括管理调用线程池、超时时间等

RpcChannel

数据传输通道

RPC框架原理与实现相关推荐

  1. 一篇文章了解RPC框架原理

    转载自   一篇文章了解RPC框架原理 1.RPC框架的概念 RPC(Remote Procedure Call)–远程过程调用,通过网络通信调用不同的服务,共同支撑一个软件系统,微服务实现的基石技术 ...

  2. RPC框架原理及从零实现系列博客(二):11个类实现简单RPC框架

    项目1.0版本源码 https://github.com/wephone/Me... 在上一博文中 跟大家讲了RPC的实现思路 思路毕竟只是思路 那么这篇就带着源码给大家讲解下实现过程中的各个具体问题 ...

  3. 七个步骤,带你快速读懂 RPC 框架原理

    1. RPC框架的概念 RPC(Remote Procedure Call)–远程过程调用,通过网络通信调用不同的服务,共同支撑一个软件系统,微服务实现的基石技术.使用RPC可以解耦系统,方便维护,同 ...

  4. 一个高性能RPC框架原理剖析

    业务与底层网络通信分离 Server大部分主要分为两层: 网络接收层:负责监听端口,负责收包,编码,解码工作,负责将响应包回传给客户端. 业务处理层:负责接收网络接收层完整的包,如果是RPCserve ...

  5. Dubbo面试 - 如何自己设计一个类似 Dubbo 的 RPC 框架?

    Dubbo面试 - 如何自己设计一个类似 Dubbo 的 RPC 框架? 面试题 如何自己设计一个类似 Dubbo 的 RPC 框架? 面试官心理分析 说实话,就这问题,其实就跟问你如何自己设计一个 ...

  6. 从零开始实现RPC框架 - RPC原理及实现

    从零开始实现RPC框架 - RPC原理及实现 RPC概述 RPC(Remote Procedure Call)即远程过程调用,允许一台计算机调用另一台计算机上的程序得到结果,而代码中不需要做额外的编程 ...

  7. 1、RPC框架解析:开篇-什么是RPC?

    1.1.简介 从客户端转后台开发已经快三年了,决定沉淀一些系统性的东西,想了很多题目,最终决定写一篇关于RPC框架相关的吧. 准备从概念,应用,到实践总结出一个系列. 1.2.涉及知识 以gRPC为示 ...

  8. 走进Dubbo——RPC框架简介

    前言 dubbo是阿里开源的分布式rpc框架,在许多中小企业的微服务化过程中发挥着核心作用.但是想把dubbo运行起来也不是那么简单的,这几天我想搭个dubbo环境玩玩,一路受阻. 相信前来了解rpc ...

  9. RPC 就好像是谈一场异地恋

    RPC 作为目前的主流技术之一,它打破了某一项任务所需的计算资源只能靠一台计算机来实现的固有想法,对分布式计算.微服务等领域都有着重要而深远的影响. 从20世纪80年代至今近四十年的时间内,由RPC衍 ...

最新文章

  1. CSS超出部分隐藏,显示滚动条
  2. luogu P4035 [JSOI2008]球形空间产生器(高斯消元 / 模拟退火)
  3. Redis 高负载排查记录
  4. const的使用CC++
  5. stm32f103 spi slave从机模式miso需要上拉
  6. 第一阶段unity基础
  7. BZOJ3209(n的二进制表示中1的个数的乘积)
  8. 关于移动端的一些tip
  9. com.haodf.android,有坑!Android新版QQ获取packageInfo引发异常崩溃
  10. suse tomcat mysql_JDK TOMCAT MYSQL SUSE LINUX 环境搭建
  11. 技术团队管理实践及心得
  12. kali linux如何更新软件源
  13. 直流无刷电机工作原理
  14. python化学公式配平_最简单易懂的化学方程式的配平方法
  15. 【TouchDesigner】用Replicator制作选择器
  16. 2022年上海应届生落户公司要求!打分不够72的同学可以考虑!
  17. 大连医科大学中山学院计算机科学与技术,大连医科大学中山学院计算机科学与技术专业2016年在山西理科高考录取最低分数线...
  18. arduino 下16进制转2进制
  19. CRA 5.0.0加入代理后项目启动报错
  20. 红米k30至尊纪念版和华为p30pro的区别 哪个好

热门文章

  1. 《十年》中的没有颤抖的那两个字——“始于你好,终于你好”
  2. 服务器文件怎么删,怎么删除服务器文件
  3. 3、★☛基于STM32的手机通过wifi控LED灯√♠★
  4. java成神之路——网络编程
  5. Laravel Excel(maatwebsite/excel )导入
  6. 炫酷的生日快乐网页 【附带源码】
  7. eclipse下连接MYSQL教程
  8. 石家庄康业软件科技有限公司
  9. 江苏东方四通科技股份有限公司参观学习有感
  10. 怎么给word插入页码,详细图文教学,轻松学会