前言

关于Dubbo入门的网上教程也特别多,因此我没有专门出关于Dubbo的系列博文(主要呢… 也是在忙些工作上的事儿),用Dubbo特别简单,但是想要把Dubbo学好,学精还得花费不少时间的,特别是Dubbo的源码是非常值得研究的!有时间会再出一套关于Dubbo的源码解析,那么本篇博文呢,咱也不讲别的,主要是带各位看看Dubbo最核心的部分,也就是远程调用是如何实现的,如果这个给弄明白了,那么你对Dubbo的认知会更上一层楼!
如果是初学Dubbo的朋友建议不要往下看啦!以下代码纯本人手写,可能会有写的不严谨的地方,还请见谅啦!也欢迎各位指出不足~

Dubbo原理分析

好啦,废话咱也不多说,大家也知道Dubbo本质上就是使用JAVA开发的高性能RPC(Remote Procedure Call)框架,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议,当然这么说很笼统,很官方,也不好理解,那如何去理解这个RPC框架是干啥的呢?
还是举个栗子吧!也比方说以前呢,你一个单身狗(我不是针对在场的各位… 哈哈哈)在家,会自己给自己做饭吃,自给自足,做好了饭,自己就可以吃上了,这就是本地调用。
那有一天,你谈了个女朋友,女朋友在外面和闺蜜们逛街,肚子饿了,想回家就吃上饭,就call你,“晚上我回家吃饭哦”,那你不得屁颠屁颠的去买菜给准备准备,那么这种情况对于你女朋友来说就是远程调用!

懂了什么是远程调用,我们再来看这张Dubbo的基本架构图

首先Provider也就是服务的提供者会先运行,然后将服务注册到注册中心,这里的注册中心通常指的是Zookeeper,当Provider运行之后,会在Provider项目的Spring容器中生成相对应的服务对象,注意了!Provider和Consumer分别是在不同的JVM上运行的,因此Consumer是无法直接引用Provider中的对象的,这就是为什么我们要使用RPC框架来调用对象中的方法的根本原因了!

如上图,当Provider将AddServiceImpl服务注册到注册中心之后,其实并不是把这个对象传输给Zookeeper,而是注册了这个服务的 “身份信息”,我们通过 “身份信息” 可以找到这个方法, “身份信息” 具体包含哪些内容呢?
我这里就先卖个关子… 哈哈哈,下面手写框架会重点讲到。
当AddServiceImpl服务的信息被注册之后,Consumer会向注册中心订阅这个服务,此时Consumer就相当于是拥有了AddServiceImpl服务的所有信息,你可以把Consumer理解成你女朋友,Provider就是你了,你们俩约定了,做饭的事情你来,那么就好比在注册中心注册了 “做饭” 这个服务,当然可能还有其他的特殊服务我就不多说了… 当服务注册之后,接下来女朋友想去 “调用” 你去做饭,就只用打电话传呼你了,此时的电话号码就是服务的注册信息(当然这么说可能有些牵强,大致是这个意思啦!) 。

手写RPC框架

讲了这么多,主要是让大家熟悉一下Dubbo是如何工作的,以及核心功能是什么?接下来就进入今天的正题了,此次手写RPC框架,包含以下4个项目。

rpc是父项目,父项目怎么创建我就不多赘述啦,其余的几个项目都是子项目(Maven Moduel),rpc-core是核心项目,包含RPC调用的核心代码,rpc-myself扮演consumer的角色,rpc-roommate则扮演的是provider的角色,为了便于大家理解,我这里就以考试为例。
相信这么多年了,大家或多或少会有些抄袭的独门看家本领吧!(别说没有!我不信!) 那么此次呢,你将扮演的就是成绩很差的 “菜比”,而你的室友呢扮演的就是学霸的角色,你答应学霸,考试过了,就请吃饭!但前提是我这次复试你一定要把答案发给我,手机一定要保持开机状态,并且要一直盯着手机看!这次考试能不能过就看大神的了…
学霸一般都是老实人,就答应了,故事就这么开始了…

第一步:创建rpc父项目

注意要创建Maven的pom项目

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.marco</groupId><artifactId>rpc</artifactId><version>1.0</version><packaging>pom</packaging><name>远程调用</name><description>远程调用</description><modules><module>rpc-roommate</module><module>rpc-myself</module><module>rpc-domain</module></modules>
</project>
第二步:创建rpc-core项目

rpc-core项目包含手写Dubbo框架的核心代码,这里先简单介绍一下这个包里面的各个包的作用吧。anno是注解包,coder是用于对象的二进制字节码的转化,domain就不多解释啦,关于这里面的两个类我们下面会详细做介绍,invoker专门处理方法的调用,包含本地方法的调用,以及远程方法的调用,net主要用于网络传输,proxy则存放的代理类,service嘛主要是模拟服务接口。

rpc-core的内容就是上面这些啦!先看简单的anno注解包吧。
anno
@Expose 实则就是Dubbo中的@Service@Reference含义不变。@Expose 作用于类上,@Reference则作用于属性上。

package com.marco.anno;import static java.lang.annotation.RetentionPolicy.RUNTIME;import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;@Retention(RUNTIME)
@Target(ElementType.TYPE)
public @interface Expose {}
package com.marco.anno;import static java.lang.annotation.RetentionPolicy.RUNTIME;import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;@Retention(RUNTIME)
@Target(ElementType.FIELD)
public @interface Reference {}

coder
coder包的主要用途是为了将对象转换为二进制流,以及将二进制流转为对象,方便对象在网络上进行传输。

package com.marco.coder;import java.io.Serializable;public interface Coder {/*** 将对象编码为字节码* @param object* @return*/byte[] code(Serializable object);/*** 将字节数组转换为对象* @return*/Object decode(byte[] bt);
}

JDKCoderCoder的实现类,这里我们使用的还是最传统的JDK序列化方式传输对象,但是此种方式在实际开发中并不推荐,我们可以使用Hessian、Jackson…等等序列化框架优化对象的序列化,使之序列化后的二进制流成倍的缩小,提升传输的速度,当然,这里我们就不去引用别的框架啦!

package com.marco.coder;import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;public class JDKCoder implements Coder {@Overridepublic byte[] code(Serializable object) {try (ByteArrayOutputStream baos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(baos)) {// 将对象转为字节,写入到内存中并返回oos.writeObject(object);//为了避免客户端阻塞,因此采取在二进制流的最末尾设置标记(-1)的措施来规避此问题发生oos.writeByte(-1);return baos.toByteArray();} catch (Exception e) {e.printStackTrace();}return null;}@Overridepublic Object decode(byte[] bt) {try (ByteArrayInputStream bais = new ByteArrayInputStream(bt);ObjectInputStream ois = new ObjectInputStream(bais)) {// 将对象写入到内存中并返回Object object = ois.readObject();return object;} catch (Exception e) {e.printStackTrace();}return null;}
}

domain
上面咱们讲到,当Provider将AddServiceImpl服务注册到注册中心之后,其实并不是把这个对象传输给Zookeeper,而是注册了这个服务的 “身份信息”,我们通过 “身份信息” 可以找到这个方法,这个 “身份信息” 其实就是下面的Request对象(包含接口名称,方法名称,传入的参数信息),当然还包含你需要请求的服务注册在Zookeeper里的主机的IP地址和port端口号。
Reponse对象主要是用于回复信息给发送请求的消费端。


invoker
定义MethodInvoker接口,实现类为本地调用对象LocalInvoker,和远程调用对象RPCInvoker

package com.marco.invoker;import com.marco.domain.Request;
import com.marco.domain.Response;public interface MethodInvoker {Response invoker(Request request);
}
package com.marco.invoker;import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;import com.marco.domain.Request;
import com.marco.domain.Response;
import com.marco.spring.provider.RPCContextAware;/*** 本地调用* @author Marco**/
public class LocalInvoker implements MethodInvoker {/*** 用于缓存Class<?>对象*/private static Map<String,Class<?>> classCache = new HashMap<>();@Overridepublic Response invoker(Request request) {//获取request对象中本地调用的参数信息String interfaceName = request.getInterfaceName();String methodName = request.getMethod();Object[] args = request.getArgs();//获取被调用接口的实现类的完全限定名String implClassName = getImplClassName(interfaceName);Object answer = null;String msg = "success";Class<?> clazz = null;try {//通过反射获取到该实现类的Class对象if(classCache.containsKey(interfaceName)) {clazz = classCache.get(interfaceName);} else {clazz = Class.forName(implClassName);classCache.put(interfaceName,clazz);}Object obj = RPCContextAware.getBean(clazz);//将被调用方法的参数封装到paramTypes数组中Class<?>[] parameterTypes = new Class<?>[args.length];for (int i = 0; i < args.length; i++) {parameterTypes[i] = args[i].getClass();}//通过反射原理获取被调用方法的Method对象Method method = clazz.getMethod(methodName, parameterTypes);//通过invoke方法获取最终的结果并返回结果answer = method.invoke(obj,args);} catch (Exception e) {msg = e.getMessage();e.printStackTrace();}return new Response(answer, msg);}/*** 获取被调用接口的实现类的完全限定名,例如将com.marco.service.UserService* 转化为com.marco.service.impl.UserServiceImpl* @param interfaceName* @return*/private String getImplClassName(String interfaceName) {int lastIndexOf = interfaceName.lastIndexOf(".");String simpleClassName = interfaceName.substring(lastIndexOf);return interfaceName.replaceAll(simpleClassName, ".impl"+simpleClassName+"Impl");}
}
package com.marco.invoker;import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;import com.marco.coder.Coder;
import com.marco.coder.JDKCoder;
import com.marco.domain.Request;
import com.marco.domain.Response;
import com.marco.loadbalance.LoadBalance;
import com.marco.loadbalance.RandomLoadBalance;
import com.marco.net.ConsumerNet;
import com.marco.net.client.ConsumerNetImpl;
import com.marco.register.ServerDiscovery;
import com.marco.register.impl.ServerDiscoveryImpl;public class RPCInvoker implements MethodInvoker {private ConsumerNet consumerNet = new ConsumerNetImpl();private Coder coder = new JDKCoder();private LoadBalance loadBalance = new RandomLoadBalance();private ServerDiscovery discovery = new ServerDiscoveryImpl();@Overridepublic Response invoker(Request request) {//获取服务中所有的服务提供者List<String> servers = discovery.discovery(request.getInterfaceName());// 连接提供者String select = loadBalance.select(servers);String host = select.split(":")[0];Integer port = Integer.valueOf(select.split(":")[1]);consumerNet.connect(host, port);byte[] result = null;try (OutputStream outputStream = consumerNet.getOutputStream();BufferedOutputStream bos = new BufferedOutputStream(outputStream);InputStream inputStream = consumerNet.getInputStream();BufferedInputStream bis = new BufferedInputStream(inputStream)) {// 将request对象转成字节(二进制)并写出byte[] code = coder.code(request);bos.write(code);bos.flush();byte[] by = new byte[1024];int len = 0;ByteArrayOutputStream baos = new ByteArrayOutputStream();while ((len = bis.read(by)) != -1) {baos.write(by, 0, len);// 为了避免客户端阻塞,因此采取在最末尾设置标记(-1)的措施规避此问题发生byte lastByte = by[len - 1];if (lastByte == -1) {break;}}result = baos.toByteArray();} catch (IOException e) {e.printStackTrace();}// 将获取到的字节流转为对象Object decode = coder.decode(result);return (Response) decode;}
}

net
net包下的类,主要是封装了客户端和服务端的远程网络访问(Socket)方法,方便调用,当然Dubbo的底层肯定不是使用原生的Socket来进行远程通信,而是使用Netty框架来实现通信,毕竟Socket的效率实在是太低啦!
关于Netty的原理和解析大家可以参考博文 Netty高性能原理和框架架构解析

package com.marco.net;import java.io.InputStream;
import java.io.OutputStream;public interface ConsumerNet {/*** 连接提供者* @param ip* @param port*/void connect(String ip, int port);/*** 输入* @return*/InputStream getInputStream();/*** 输出* @return*/OutputStream getOutputStream();
}
package com.marco.net.client;import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;import com.marco.net.ConsumerNet;public class ConsumerNetImpl implements ConsumerNet {private Socket client;@Overridepublic void connect(String ip, int port) {try {//创建Socket客户对象client = new Socket(ip, port);} catch (Exception e) {e.printStackTrace();}}@Overridepublic InputStream getInputStream() {InputStream inputStream = null;try {inputStream = client.getInputStream();} catch (IOException e) {e.printStackTrace();}return inputStream;}@Overridepublic OutputStream getOutputStream() {OutputStream outputStream = null;try {outputStream = client.getOutputStream();} catch (IOException e) {e.printStackTrace();}return outputStream;}
}
package com.marco.net.server;import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;import com.marco.net.ProviderNet;public class ProviderNetImpl implements ProviderNet {private ServerSocket serverSocket = null;private Socket server;public ProviderNetImpl(int port) {//创建Socket服务对象try {serverSocket = new ServerSocket(port);} catch (IOException e) {e.printStackTrace();}}@Overridepublic void listen() {try {//开启监听server = serverSocket.accept();} catch (Exception e) {e.printStackTrace();}}@Overridepublic InputStream getInputStream() {InputStream inputStream = null;try {inputStream = server.getInputStream();} catch (IOException e) {e.printStackTrace();}return inputStream;}@Overridepublic OutputStream getOutputStream() {OutputStream outputStream = null;try {outputStream = server.getOutputStream();} catch (IOException e) {e.printStackTrace();}return outputStream;}
}

proxy
定义远程调用代理类接口RPCProxy

package com.marco.proxy;/*** 远程调用代理* @author Marco**/
public interface RPCProxy{/*** 获取代理对象* @param <T>* @param clazz* @return*/<T> T getProxy(Class<T> clazz);
}

创建代理类RPCObjectProxy实现RPCProxy接口,当消费者执行方法,例如addService.add(1,2)的时候会调用invoke方法,进而通过远程调用获取执行完成之后的结果。

package com.marco.proxy;import java.lang.reflect.Proxy;import com.marco.domain.Request;
import com.marco.invoker.MethodInvoker;public class RPCObjectProxy implements RPCProxy {private MethodInvoker methodInvoker;public RPCObjectProxy(MethodInvoker methodInvoker) {super();this.methodInvoker = methodInvoker;}@SuppressWarnings("unchecked")@Overridepublic <T> T getProxy(Class<T> clazz) {return (T) Proxy.newProxyInstance(RPCObjectProxy.class.getClassLoader(), new Class<?>[] { clazz },(proxy, method, args) -> {//创建请求对象Request request = new Request();request.setArgs(args);request.setInterfaceName(clazz.getName());request.setMethod(method.getName());return methodInvoker.invoker(request).getAnswer();});}
}

register
注册和发现服务的接口类ServerRegisterServerDiscovery

package com.marco.register;import java.util.List;public interface ServerDiscovery {List<String> discovery(String serverName);
}
package com.marco.register;public interface ServerRegister {void register(String serverName, String serverPath);
}

以上接口的实现类ServerRegisterImplServerDiscoveryImpl,顾名思义,用途就是方法的注册和方法的发现(引用)。

package com.marco.register.impl;import com.marco.register.ServerRegister;
import com.marco.utils.ZookeeperUtils;public class ServerRegisterImpl implements ServerRegister {@Overridepublic void register(String serverName, String serverPath) {ZookeeperUtils.addEphemeralNode(serverName, serverPath);}
}
package com.marco.register.impl;import java.util.List;import com.marco.register.ServerDiscovery;
import com.marco.utils.ZookeeperUtils;public class ServerDiscoveryImpl implements ServerDiscovery {@Overridepublic List<String> discovery(String serverName) {return ZookeeperUtils.getSubNode(serverName);}
}

utils
大家也发现了,上面的ServerRegisterImpl以及ServerDiscoveryImpl本质上就是调用了ZookeeperUtils中的addEphemeralNode()创建临时节点和getSubNode()获取子节点的方法,ZookeeperUtils中封装了我们手写Dubbo中所有的关于和Zookeeper集成的操作。
并且使用一个简单的 “缓存” serverCache,将被暴露并注册在Zookeeper的服务进行缓存处理,适当的提高了框架的性能,但是需要注意的一点是,当Provider服务端因为某种原因,导致服务停止之后,Zookeeper会监控到这个异常,并且会通知到Consumer,那么此时我们的缓存serverCache就必须得更新或者删除节点得信息。
否则,如果不更新缓存,假设当Provider的服务子节点有更新,从localhost:8888变成localhost:9999,那么此时我们不更新缓存读出来的数据依然是localhost:8888,此时我们根据这个 “身份信息” 再次去连接服务端时,必定是会报 java.net.ConnectException: Connection refused 等连接异常的。

因此在以下得代码中,我通过zkClient.subscribeChildChanges()的监听子节点方法,解决了缓存的脏数据读取问题。

package com.marco.utils;import java.util.HashMap;
import java.util.List;
import java.util.Map;import org.I0Itec.zkclient.ZkClient;/*** 操作一: zk 创建持久节点* 操作二: zk 创建临时节点* 操作三: zk 获取父节点里面的子节点* 操作四: zk 订阅节点* @author Marco**/
public class ZookeeperUtils {/*** Zookeeper的连接地址*/private static final String ZK_URL = "127.0.0.1:2181";/*** 声明ZkClient*/private static ZkClient zkClient = null;/*** 缓存,key为服务名称,value为该服务下的类的全类名*/private static Map<String,List<String>> serverCache = new HashMap<String,List<String>>();/*** 初始化ZkClient*/static {zkClient = new ZkClient(ZK_URL, 5*1000, 20*1000);}/*** 创建持久节点* @param nodeName*/public static void addPersistentNode(String nodeName) {if(!zkClient.exists("/" + nodeName)) {//若节点不存在,则创建新的节点zkClient.createPersistent("/" + nodeName);}System.out.println("节点创建完成");}/*** 创建临时节点* @param parentNodeName* @param nodeName*/public static void addEphemeralNode(String parentNodeName, String nodeName) {//若父节点不存在,则创建新的节点addPersistentNode(parentNodeName);zkClient.createEphemeral("/" + parentNodeName + "/" + nodeName);System.out.println("临时节点创建完成");/*try {System.in.read();} catch (IOException e) {e.printStackTrace();}*/}/*** 获取子节点* @param nodeName* @return*/public static List<String> getSubNode(String serverName) {//如果缓存中有serverName的服务,则直接从缓存中获取if(serverCache.containsKey(serverName)) {return serverCache.get(serverName);}if(!zkClient.exists("/" + serverName)) {throw new RuntimeException("没有" + serverName + "的服务提供者");}List<String> servers = zkClient.getChildren("/" + serverName);//如果缓存中没有serverName的服务,则存入缓存中serverCache.put(serverName, servers);/*** 采用节点订阅的方式,解决节点缓存的数据脏读问题*/zkClient.subscribeChildChanges("/" + serverName, (parentPath, currentChilds) -> {System.out.println("已解决缓存的脏读问题!");serverCache.put(serverName, currentChilds);});return servers;}
}

这里我们采用节点订阅的方式,解决节点缓存的数据脏读问题,另外需要提到的时,当服务挂掉的时候,该服务依然可以被访问,因为缓存中还有该服务的实例,zookeeper节点的上下线,实际上就是通过zookeeper中对子节点的监听来实现的。
spring
接下来就是我们的集成spring的包,可以算是咱们核心包中的核心了,直接亮出代码吧!

ReferenceBeanContext类中,我们实现了spring的BeanPostProcessor接口,并重写postProcessBeforeInitialization()postProcessAfterInitialization(),这两个方法很类似,但是依然有区别,前者是在bean实例化,依赖注入之后及自定义初始化方法(例如:配置文件中bean标签添加init-method属性指定Java类中初始化方法、@PostConstruct注解指定初始化方法,Java类实现InitailztingBean接口)之前调用。
而后置处理器的postProcessorAfterInitailization方法是在bean实例化、依赖注入及自定义初始化方法之后调用,实现BeanPostProcessor并借用此方法的主要目的是为了查找IoC容器中的是否拥有属性上设置@Reference注解的对象,如果有,则获取这个属性的名称,得到它的接口,并从IoC容器中取出这个和接口对应的实现类对象,而这个实现类对象,恰巧是我们使用@Expose(等同于Dubbo中的@Service)注解暴露并注册的服务类对象。

package com.marco.spring.consumer;import java.lang.reflect.Field;import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;import com.marco.anno.Reference;
import com.marco.invoker.RPCInvoker;
import com.marco.proxy.RPCObjectProxy;
import com.marco.proxy.RPCProxy;@Component
public class ReferenceBeanContext implements BeanPostProcessor{private RPCProxy rpcProxy = new RPCObjectProxy(new RPCInvoker());@Overridepublic Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {return bean;}@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {Field[] declaredFields = bean.getClass().getDeclaredFields();if(null != declaredFields && declaredFields.length > 0) {for (Field field : declaredFields) {//检查该属性上是否有@Reference注解Reference reference = field.getAnnotation(Reference.class);//如果属性上有@Reference注解,则将代理对象注入给该属性if(null != reference) {System.out.println(beanName + "类中的属性" + field.getName() + "上有" + reference + "注解");//设置强制注入field.setAccessible(true);Class<?> clazz = field.getType();//获取代理对象Object proxy = rpcProxy.getProxy(clazz);//注入代理对象try {field.set(bean, proxy);} catch (IllegalArgumentException | IllegalAccessException e) {e.printStackTrace();}}}}return bean;}
}
package com.marco.spring.provider;import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;import com.marco.anno.Expose;
import com.marco.coder.Coder;
import com.marco.coder.JDKCoder;
import com.marco.domain.Request;
import com.marco.domain.Response;
import com.marco.invoker.LocalInvoker;
import com.marco.invoker.MethodInvoker;
import com.marco.net.ProviderNet;
import com.marco.net.server.ProviderNetImpl;
import com.marco.register.ServerRegister;
import com.marco.register.impl.ServerRegisterImpl;@Component
public class RPCContextAware implements ApplicationContextAware, InitializingBean{/*** IoC容器对象*/private static ApplicationContext applicationContext = null;/*** 用于保存被暴露的服务*/private Map<String,String> exposedServers = new HashMap<String, String>();/*** 用于将服务注册到注册表中*/private ServerRegister register = new ServerRegisterImpl();/*** 用于编码、解码*/private Coder coder = new JDKCoder();/*** 本地调用处理器*/private MethodInvoker methodInvoker = new LocalInvoker();/*** 需要监听端口,默认不设置则为6666*/@Value("${port:6666}")private int port;/*** 提供服务者的网络连接装置*/private ProviderNet providerNet;@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {/*** 在IoC容器中搜索类上有Expose注解的类 * key 类的名称,如addServiceImpl  * value 对象*/RPCContextAware.applicationContext = applicationContext;Map<String, Object> beans = applicationContext.getBeansWithAnnotation(Expose.class);if(!beans.isEmpty()) {beans.forEach((k,v) -> {Class<?>[] interfaces = v.getClass().getInterfaces();//添加需要被暴露的服务exposedServers.put(interfaces == null ? v.getClass().getName():interfaces[0].getName(), "localhost:" + port);});}System.out.println("需要注册的服务有" + exposedServers);}@Overridepublic void afterPropertiesSet() throws Exception {/*** 将暴露的服务注册到Zookeeper中*/exposedServers.forEach((k,v) -> {register.register(k, v);System.out.println(k + "服务已经注册成功,暴露服务的地址是" + v);});listen();}private void listen() {System.out.println("*************监听就绪***************");providerNet = new ProviderNetImpl(port);// 循环监听port端口(等待菜比队友的通知),解决服务端被请求一次就死机的问题while(true) {providerNet.listen();// 当收到消息之后,监听中止,取消阻塞状态Request request = null;System.out.println("收到菜比室友的题目了...");try (InputStream inputStream = providerNet.getInputStream();BufferedInputStream bis = new BufferedInputStream(inputStream);OutputStream outputStream = providerNet.getOutputStream();BufferedOutputStream bos = new BufferedOutputStream(outputStream)) {byte[] by = new byte[1024];int len = 0;ByteArrayOutputStream baos = new ByteArrayOutputStream();while ((len = bis.read(by)) != -1) {baos.write(by, 0, len);//字节数组长度没有1024表示这是最后一次读取数据了if(len < 1024) {break;}}// 获取request对象,通过coder的decode解码方法将二进制流转为Request对象request = (Request) coder.decode(baos.toByteArray());System.out.println("题目为" + request.toString());// 执行invoker方法获取结果Response answer = methodInvoker.invoker(request);System.out.println("计算出来的答案为" + answer);bos.write(coder.code(answer));} catch (Exception e) {e.printStackTrace();}}}/*** 通过clazz从IoC容器中获取已经被创建出来的对象* @param clazz* @return*/public static Object getBean(Class<?> clazz) {return applicationContext.getBean(clazz);}
}

loadbalance

loadbalance包主要是为了实现负载均衡,目前只添加了Random随机选择的策略,有兴趣的朋友可以试着添加轮询RoundRobin 、ConsistentHash等策略。

package com.marco.loadbalance;import java.util.List;public interface LoadBalance {/*** 从list集合中挑选一个服务出来* @param servers* @return*/String select(List<String> servers);
}
package com.marco.loadbalance;import java.util.List;
import java.util.Random;public class RandomLoadBalance implements LoadBalance {private static Random random = new Random();@Overridepublic String select(List<String> servers) {if(null == servers || servers.isEmpty()) {throw new RuntimeException("当前注册服务列表为空!");}if(servers.size() == 1) {return servers.get(0);}int nextInt = random.nextInt(servers.size());return servers.get(nextInt);}
}

service
AddService类主要是模拟被暴露的服务接口,就暂且先放在这里吧。

package com.marco.service;public interface AddService {Integer add(Integer a, Integer b);
}

最后,我们需要导入rpc-core项目所依赖的jar包spring-context

<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>com.marco</groupId><artifactId>rpc</artifactId><version>1.0</version></parent><artifactId>rpc-core</artifactId><dependencies><!-- https://mvnrepository.com/artifact/com.101tec/zkclient --><dependency><groupId>com.101tec</groupId><artifactId>zkclient</artifactId><version>0.11</version></dependency><!-- https://mvnrepository.com/artifact/org.springframework/spring-context --><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>4.3.16.RELEASE</version></dependency></dependencies>
</project>

第三步:创建rpc-roommate项目

以上就是我们的难点部分,接下来创建的这两个子项目主要是为了测试我们的手写Dubbo框架是否能正常运行,达到我们想要的效果。

大家注意看AddServiceImpl类上的注解,添加@Expose注解的目的正是将服务暴露出去,并在Zookeeper中注册,关于这一块的逻辑业务代码请参照上面的RPCContextAware类的实现。

package com.marco.service.impl;import com.marco.anno.Expose;
import com.marco.service.AddService;@Expose
public class AddServiceImpl implements AddService {@Overridepublic Integer add(Integer a, Integer b) {return a + b;}
}

Roommate可以把它当作是一个启动类,因为provider的核心业务逻辑代码在RPCContextAware类中,但是它和我们当前的Roommate并不在一个项目里面,因此需要利用xml文件将RPCContextAware加载并置入Spring的IoC容器中,这样才能使RPCContextAware中得方法被执行调用,完成服务得注册,以及服务端的监听。
所以我们通过new ClassPathXmlApplicationContext()加载spring-rpc-provider.xml配置文件。

import org.springframework.context.support.ClassPathXmlApplicationContext;public class Roommate {public static void main(String[] args) {System.out.println("等待菜比室友发题目过来");ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring-rpc-provider.xml");context.start();}
}

spring-rpc-provider.xml配置文件内容如下。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd"><context:component-scan base-package="com.marco.service.impl"></context:component-scan><context:property-placeholder location="classpath:/properties/app.properties"/><bean name="rPCContextAware" class="com.marco.spring.provider.RPCContextAware"></bean>
</beans>

除了RPCContextAware需要被放入IoC容器中,我们的AddServiceImpl也应该被扫描,置入容器中,这就是为何我在@Expose注解中添加了@Component元注解的原因。

另外还有一个小细节,就是properties/app.properties文件的加载。

不知道大家有没有留意RPCContextAware中的这段代码,通过SpEL表达式,动态的获取端口的值,仅需要在app.properties文件中添加你要运行的服务的端口号(例如:port=6666)就可以了,:6666的意思是当配置文件什么都没有填写的时候,默认会以port=6666来启动我们的服务。

/*** 需要监听端口,默认不设置则为6666*/
@Value("${port:6666}")
private int port;

我们大可以尝试运行一下,比方说端口号是7777,运行之后的结果如下

假如我们没有写任何端口号,则执行port=6666

我们接着再运行一个port=8888的服务,并查看我们的Zookeeper注册中心,发现服务注册成功了!并且子节点有三个!显然是正常的状态。


第四步:创建rpc-myself项目

终于到我们的最后一步了。rpc-myself项目模拟的就是消费端,结构如下

AddController类主要为了模拟远程获取AddService服务,相关的业务逻辑核心代码请参照上面的ReferenceBeanContext类。

package com.marco.controller;import org.springframework.stereotype.Component;import com.marco.anno.Reference;
import com.marco.service.AddService;@Component
public class AddController {@Referenceprivate AddService addService;public Integer add(Integer a, Integer b) {return addService.add(a, b);}
}

最后是Consumer端的启动类Myself

package com.marco;import org.springframework.context.support.ClassPathXmlApplicationContext;import com.marco.controller.AddController;public class Myself {public static void main(String[] args){System.out.println("这道题目不会做 1+1=? 我要求教我的室友");ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring-rpc-consumer.xml");
//      AddService addService = new RPCObjectProxy(new RPCInvoker()).getProxy(AddService.class);AddController addController = context.getBean(AddController.class);while(true) {try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}Integer answer = addController.add(1, 4);System.out.println("室友发送过来的答案是" + answer.toString());}}
}

同样的,Myself中也是通过new ClassPathXmlApplicationContext("classpath:spring-rpc-consumer.xml")的方式将ReferenceBeanContext类和AddController类注入到IoC容器中。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"><context:component-scan base-package="com.marco.controller"></context:component-scan><bean name="referenceBeanContext" class="com.marco.spring.consumer.ReferenceBeanContext"></bean>
</beans>

终于完成了!赶紧启动看看效果吧!

通过测试发现,我们的random策略着实生效了,每间隔5秒钟,消费端就会随机挑选一个服务端,并通过Socket向服务户端发送一次请求,收到Request请求的服务户端,则会解析Request对象,找到对应的服务方法,将方法执行完成的结果,通过Socket返回给消费端,通过这种方式,将远程调用的所有过程全部 “隐藏”。这就是神奇的RPC框架!!

Marco's Java【Dubbo 之手写Dubbo框架实现远程调用】相关推荐

  1. 【Java ORM】手写ORM框架:源代码、jar、生成JavaDoc文档

    SORMSourceCode 把这个手写框架取名为SORM. (1)源代码(Eclipse项目文件):GitHub地址 (2)jar包.源代码.JavaDoc文档.使用说明:Github地址 一个简单 ...

  2. 手写篇:如何手写RPC框架?

    手写篇:如何手写RPC框架? 首先我们讲下什么是RPC? RPC(Remote Procedure Call)远程过程调用协议,他是一种通过网络从远程计算机程序请求服务.简单的来说,就是通过网络进行远 ...

  3. MyRPCDemo netty+jdk动态代理+反射+序列化,反序列化手写rpc框架

    RPC RPC(remote procedure call)远程过程调用 RPC是为了在分布式应用中,两台主机的Java进程进行通信,当A主机调用B主机的方法时,过程简洁,就像是调用自己进程里的方法一 ...

  4. 手写ORM框架----(数据库单表CRUD万能框架)

    目录 一.准备 1.1 ORM介绍 1.2 准备工作 二.手写ORM的CRUD 2.1 数据库准备 2.2 所需注解 2.3 实体类student 2.4 CRUD 2.4.1 添加功能 2.4.2 ...

  5. Golang之手写web框架

    Go手写Web框架 1.1 标准启动方式 通过定义接口,使用 net/http 库封装基础的功能,通过自定义函数的方式可以自定义 StandardStart.go // Handler 用于实现处理器 ...

  6. JAVA项目代码手写吗_一个老程序员是如何手写Spring MVC的

    见人爱的Spring已然不仅仅只是一个框架了.如今,Spring已然成为了一个生态.但深入了解Spring的却寥寥无几.这里,我带大家一起来看看,我是如何手写Spring的.我将结合对Spring十多 ...

  7. arraylist下标从几开始_剖析JAVA面试题 手写ArrayList的实现,在笔试中过关斩将?...

    面试官Q1:可以手写一个ArrayList的简单实现吗? 我们都知道ArrayList是基于数组实现,如果让你实现JDK源码ArrayList中add().remove().get()方法,你知道如何 ...

  8. java appdata_纯手写!!转移系统用户资料和更改AppData路径的方法 不接受反驳!...

    对于Windows用户来讲,当下最流行的还是win7操系.以后的趋势肯定是win10,只是时间的考验 相信大家在用windows系统的时,一直有个问题在困扰大家包括我 就是刚装的sys,无论是98.M ...

  9. 手写识别ocr java,怎么识别手写文字?迅捷OCR文字识别软件帮你快速完成!

    怎么识别手写文字?虽然现在手机.平板等设备已经普及开来,但是从小在学校养成的习惯,还是让大部分人选择会手写的方式.手写其实也有很大的缺陷,无论是在生活中还是在网络上进行分享都比较困难. 那么有没有将手 ...

最新文章

  1. C和C++安全编码笔记:总结
  2. qt快速加载图片_Qt实用技巧:使用Qt加载超大图片的耗时测试
  3. 进程在与Windows Process Activation Service通信时出现严重错误 w3wp.exe错误
  4. 华为Dorado固态存储技术手册合集
  5. IDEA将项目上传至码云/GitHub托管
  6. python迭代器是什么百度百科,python迭代器的接口是什么?
  7. pep3评估报告解读_quot;聚焦慢病、助力医改,检验项目风险评估培训计划“大兴区第四期培训班成功举办...
  8. 产生随机小数_如果取到小数区间内的任一数字?
  9. 分页技术与JDBC一览
  10. [再学Python] - 6 - 函数的定义调用与返回
  11. 20145206《Java程序设计》实验五Java网络编程及安全
  12. 插件 微信 自动 抢红包
  13. IE主页简单篡改修复
  14. 线上数据库增加字段导致服务请求超时总结
  15. 【网络通信 -- 直播】SRS 实战记录 -- 开源流媒体服务器对比与 SRS 直播效果测试
  16. 实现自动化测试,首先不是一个技术问题
  17. antv g2字体阴影_antv-g2学习手册-中
  18. Vue Antdv 上传组件(a-upload、a-upload-dragger)二次封装(DZMAntdvUpload)
  19. C++经典算法题-排列组合
  20. python调用图灵机器人实现微信公众号的自动回复功能

热门文章

  1. 常用的第三方api汇总[获取天气]
  2. 重装Linux系统初始化步骤
  3. decimal.TryParse int.TryParse DateTime.TryParse
  4. 2021-2027全球与中国磁吸假睫毛市场现状及未来发展趋势
  5. python元组和列表的联系_Python元组与列表
  6. 比例放大器设置接线US-DAS1/US-DAS2
  7. 量子多方秘密共享 OQQ-LOCC(三)
  8. Python-OpenCV改变摄像头输出编码格式和帧率
  9. uniapp 切换WIFI
  10. 分布式事务框架Seata原理详解