Dubbo之服务导入流程解析

接着上回《Dubbo之服务暴露流程浅析》,上一篇文章已经介绍完了Dubbo的服务提供者的服务暴露的整个流程,本文主要介绍Dubbo服务消费者的服务导入流程。

前言:

Dubbo服务消费者的服务相关代码下:

xml配置:配置了两个

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"xmlns="http://www.springframework.org/schema/beans"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd"><!-- consumer's application name, used for tracing dependency relationship (not a matching criterion),don't set it same as provider --><dubbo:application name="demo-consumer"/><!-- use multicast registry center to discover service --><dubbo:registry group="aaa" address="zookeeper://127.0.0.1:2181"/><!-- generate proxy for the remote service, then demoService can be used in the same way as thelocal regular interface --><dubbo:reference id="helloService" check="false" interface="org.apache.dubbo.samples.api.client.HelloService"/><dubbo:reference id="greetingService" check="false" interface="org.apache.dubbo.samples.api.client.GreetingService"  init="true"/></beans>

消费者启动类:

public class BasicConsumerBootstrap {public static void main(String[] args) {ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring/dubbo-demo-consumer.xml"});context.start();HelloService helloService = (HelloService) context.getBean("helloService"); /while (true) {try {Thread.sleep(1000);String hello = helloService.sayHello("world"); System.out.println(hello); } catch (Throwable throwable) {throwable.printStackTrace();}}}
}

Dubbo的引入服务的方式

Dubbo引入服务的方法有如下两种方式:

1.懒汉式引入服务

​ Dubbo默认的引入服务的方式为懒汉式,当我们的程序从容器中获取对应的Bean的时候会进行Dubbo的Consumer Bean的装载。

具体配置如下:

PS:对应具体的代码则是在 (HelloService) context.getBean(“helloService”)的时候装载Bean。

​ Spring懒汉式引入Dubbo的Bean的流程如下:

2.饿汉式引入服务

饿汉式引入服务需要我们在Dubbo的Consumer 配置中额外配置reference 的init属性为true。然后Spring则会在容器启动时,装载Dubbo的Bean,即 new ClassPathXmlApplicationContext(new String[]{“spring/dubbo-demo-consumer.xml”})的时候。

具体配置如下:

Spring饿汉式引入Dubbo服务的流程如下:

Dubbo之服务引用流程

从Dubbo的引入服务方式的流程来看,无论是懒汉式引入服务的方式 还是说饿汉式引入服务的方式,最终都会调用到ReferenceBean#getObject()的方法。接下来我们会着重解析ReferenceBean#getObject()这个方法。

然而ReferenceBean的getObject()是实现于FactoryBean的getObject方法,那么什么时候FactoryBean呢?下面是网上较多的解释:

FactoryBean是一个工厂Bean,可以生成某一个类型Bean实例,它最大的一个作用是:可以让我们自定义Bean的创建过程。FactoryBean本质就是用来给我们实例化、或者动态的注入一些比较复杂的Bean,比如像一些接口的代理对象。

可以总结以下几点:

  1. FactoryBean是个Bean,泛指一种类型。
  2. 可以自定义创建过程,比较灵活
  3. 通常配合代理模式一起使用
  4. 常用的使用场景:
    1. Mybatis的DAO的Bean的生成。
    2. Dubbo consumer Bean的生成。
    3. Feign Client Bean的生成。

关于Dubbo引入服务流程,由于直接撸源码会比较干,我们先简单过一下Dubbo整体流程:

整体流程:

1.校验Dubbo的配置及初始化

关于Dubbo 的配置类的介绍 可以查看上一篇文章《Dubbo地址服务暴露流程浅析》,相关的类继承关系上一篇文章也有介绍。有兴趣的可以翻一下上一篇文章。

2.导入服务,构造Invoker

通过Protocol的refer方法进行Invoker对象的构造,具体流程如下:

1.将服务注册到注册中心

Dubbo会通过ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension() 构造一个Protocol的Adaptive类,然后根据适配类走整个调用链路。

在把服务注册到注册中心的过程中,不会涉及到Filter、Listener 链式组装(在调用DubboProtocol.refer的过程中会涉及到),但是Dubbo默认会开启QOS的服务,用于服务的监控。

最后调用RegistryProtocol的refer方法,将Dubbo的服务注册到注册中心中。

2.开启Netty 客户端

在RegistryProtocol的refer方法中, 第一步会将Dubbo服务注册到注册中心中,然后会调用directory的subscribe方法,间接调用Protocol.refer方法从而触发DubboProtocol的refer调用生成DubboInvoker,并将DubboInvoker 缓存到RegistryDirectory中,以便后续调用时使用。

同理,Dubbo会通过ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension() 构造一个Protocol的Adaptive类,然后根据适配类走整个调用链路。

1.ProtocolFilterWrapper的refer方法,组装一个Filter的链
Dubbo默认包含:ConsumerContextFilter、FutureFilter、MonitorFilter。

2.ProtocolListenerWrapper的refer方法,会组装一个Listener的列表(Dubbo默认不组装)

3.QosProtocolWrapper:默认啥都不做。

4.DubboProtocol的refer方法,构造一个DubboInvoker,DubboInvoker中包含了NettyClient。

3.构造Invoker,缓存到注册表

仍然是通过Dubbo 的SPI机制加载的Cluster的服务提供者,具体的调用链路如下:

通过这个流程会构造一个MockClusterInbvoker,具体的结构如下

3.生成代理Bean

1.将步骤2生成的Invoker 通过ProxyFactory.getProxy(invoker) 生成代理类。

相关代理类的结构:

源码的主要流程

1.会通过调用ReferenceBean的getObject() 执行Dubbo 服务导入的流程。

(1)通过refProtocol.refer的方法获取invoker(会通过Dubbo的SPI 生成调用链: Adaptive、 ProtocolFilterWrapper、ProtocolListenerWrapper、QosProtocolWrapper、RegistryProtocol)

(2)最终会调用RegistryProtocol的refer方法:
1> 注册服务

​ 2> 通过DubboProtocol的refer生成DubboInvoker(DubboInvoker中持有一个NettyClient用于网络请求),然后由各层Wapper类装饰(其中FilterWrapper会封装Filter链)。

​ 3> new InvokerDelegate(invoker)

​ 4> 在通过cluster.join(directory) 生成MockClusterInvoker 如上图。

  1. 通过proxyFactory.getBean(invoker)创建一个Bean。

源码解析

由于ReferenceBean实现了InitializingBean接口,所以会现在ReferenceBean的afterPropertiesSet()的方法

具体源码如下:

@Override
@SuppressWarnings({"unchecked"})
public void afterPropertiesSet() throws Exception {// 判断是否有consumer 配置,有则设置(一般情况下不用)if (getConsumer() == null) {Map<String, ConsumerConfig> consumerConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ConsumerConfig.class, false, false);if (consumerConfigMap != null && consumerConfigMap.size() > 0) {ConsumerConfig consumerConfig = null;for (ConsumerConfig config : consumerConfigMap.values()) {if (config.isDefault() == null || config.isDefault().booleanValue()) {if (consumerConfig != null) {throw new IllegalStateException("Duplicate consumer configs: " + consumerConfig + " and " + config);}consumerConfig = config;}}if (consumerConfig != null) {setConsumer(consumerConfig);}}}// 判断是否有 Application配置, 有则设置application 属性 (xml配置的application配置,默认有值,因为主要配置application)if (getApplication() == null&& (getConsumer() == null || getConsumer().getApplication() == null)) {Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class, false, false);if (applicationConfigMap != null && applicationConfigMap.size() > 0) {ApplicationConfig applicationConfig = null;for (ApplicationConfig config : applicationConfigMap.values()) {if (config.isDefault() == null || config.isDefault().booleanValue()) {if (applicationConfig != null) {throw new IllegalStateException("Duplicate application configs: " + applicationConfig + " and " + config);}applicationConfig = config;}}if (applicationConfig != null) {setApplication(applicationConfig);}}}// 判断是否有Module配置,有则设置moudle属性(一般情况下 没有值)if (getModule() == null&& (getConsumer() == null || getConsumer().getModule() == null)) {Map<String, ModuleConfig> moduleConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ModuleConfig.class, false, false);if (moduleConfigMap != null && moduleConfigMap.size() > 0) {ModuleConfig moduleConfig = null;for (ModuleConfig config : moduleConfigMap.values()) {if (config.isDefault() == null || config.isDefault().booleanValue()) {if (moduleConfig != null) {throw new IllegalStateException("Duplicate module configs: " + moduleConfig + " and " + config);}moduleConfig = config;}}if (moduleConfig != null) {setModule(moduleConfig);}}}// 判断是否有Registry配置,有则设置Registry属性(一般情况下 有值)if ((getRegistries() == null || getRegistries().isEmpty())&& (getConsumer() == null || getConsumer().getRegistries() == null || getConsumer().getRegistries().isEmpty())&& (getApplication() == null || getApplication().getRegistries() == null || getApplication().getRegistries().isEmpty())) {Map<String, RegistryConfig> registryConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, RegistryConfig.class, false, false);if (registryConfigMap != null && registryConfigMap.size() > 0) {List<RegistryConfig> registryConfigs = new ArrayList<RegistryConfig>();for (RegistryConfig config : registryConfigMap.values()) {if (config.isDefault() == null || config.isDefault().booleanValue()) {registryConfigs.add(config);}}if (registryConfigs != null && !registryConfigs.isEmpty()) {super.setRegistries(registryConfigs);}}}// 判断是否有Monitor配置,有则设置if (getMonitor() == null&& (getConsumer() == null || getConsumer().getMonitor() == null)&& (getApplication() == null || getApplication().getMonitor() == null)) {Map<String, MonitorConfig> monitorConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class, false, false);if (monitorConfigMap != null && monitorConfigMap.size() > 0) {MonitorConfig monitorConfig = null;for (MonitorConfig config : monitorConfigMap.values()) {if (config.isDefault() == null || config.isDefault().booleanValue()) {if (monitorConfig != null) {throw new IllegalStateException("Duplicate monitor configs: " + monitorConfig + " and " + config);}monitorConfig = config;}}if (monitorConfig != null) {setMonitor(monitorConfig);}}}// 判断xml配置中 是否配置了init属性为true, 如果没有设置,则为懒加载,等这个Consumer 真正需要使用到的时候再去加载。Boolean b = isInit();if (b == null && getConsumer() != null) {b = getConsumer().isInit();}// 判断init属性 是不是不为空,且为true ? 不满足条件则懒加载, 满足条件则直接加载if (b != null && b.booleanValue()) {// 直接预加载(**重点**)getObject();}
}
核心逻辑:
1. 判断是否有Consumer配置,有则设值。
2. 判断是否有Application配置,有则设值。
3. 判断是否有Module配置,有则设值。
4. 判断是否有Registry配置,有则设置。
5. 判断是否有Monitor配置,有则设置。
6. 判断对应的配置是否为懒加载, 不是懒加载则直接加载。

接下来 我们看一下getObject()的逻辑:

注:ReferenceBean为FactoryBean,不了解FactoryBean可以先自行百度一下。

@Override
public Object getObject() throws Exception {// 调用get()方法进行加载return get();
}/*** get()方法*/
public synchronized T get() {// 如果destroyed了 则直接抛错if (destroyed) {throw new IllegalStateException("Already destroyed!");}// 判断ref 是否为空(默认为空),为空则加载init()方法if (ref == null) {init();}return ref;
}/*** init()方法*/
private void init() {// 如果已经初始化了,则返回if (initialized) {return;}initialized = true;// 如果interfaceName 为空,则报错if (interfaceName == null || interfaceName.length() == 0) {throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!");}// get consumer's global configuration// 获取consumer的全局配置,有则加载,没有则初始化一个checkDefault();// 获取当前配置的全局配置,有则加载appendProperties(this);// 设置泛化调用if (getGeneric() == null && getConsumer() != null) {setGeneric(getConsumer().getGeneric());}// 如果是泛化调用,则设值interfaceClass 为GenericService.classif (ProtocolUtils.isGeneric(getGeneric())) {interfaceClass = GenericService.class;} else {// 获取接口类型try {interfaceClass = Class.forName(interfaceName, true, Thread.currentThread().getContextClassLoader());} catch (ClassNotFoundException e) {throw new IllegalStateException(e.getMessage(), e);}// 校验接口和方法(不是接口报错,没有方法报错)checkInterfaceAndMethods(interfaceClass, methods);}String resolve = System.getProperty(interfaceName);String resolveFile = null;if (resolve == null || resolve.length() == 0) {resolveFile = System.getProperty("dubbo.resolve.file");if (resolveFile == null || resolveFile.length() == 0) {File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");if (userResolveFile.exists()) {resolveFile = userResolveFile.getAbsolutePath();}}if (resolveFile != null && resolveFile.length() > 0) {Properties properties = new Properties();FileInputStream fis = null;try {fis = new FileInputStream(new File(resolveFile));properties.load(fis);} catch (IOException e) {throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e);} finally {try {if (null != fis) fis.close();} catch (IOException e) {logger.warn(e.getMessage(), e);}}resolve = properties.getProperty(interfaceName);}}if (resolve != null && resolve.length() > 0) {url = resolve;if (logger.isWarnEnabled()) {if (resolveFile != null) {logger.warn("Using default dubbo resolve file " + resolveFile + " replace " + interfaceName + "" + resolve + " to p2p invoke remote service.");} else {logger.warn("Using -D" + interfaceName + "=" + resolve + " to p2p invoke remote service.");}}}if (consumer != null) {if (application == null) {application = consumer.getApplication();}if (module == null) {module = consumer.getModule();}if (registries == null) {registries = consumer.getRegistries();}if (monitor == null) {monitor = consumer.getMonitor();}}if (module != null) {if (registries == null) {registries = module.getRegistries();}if (monitor == null) {monitor = module.getMonitor();}}if (application != null) {if (registries == null) {registries = application.getRegistries();}if (monitor == null) {monitor = application.getMonitor();}}// 检验ApplicationConfig不为空,并append系统参数checkApplication();// stub、mock的合理性校验checkStub(interfaceClass);checkMock(interfaceClass);// 构造URL的paramMap<String, String> map = new HashMap<String, String>();Map<Object, Object> attributes = new HashMap<Object, Object>();map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));if (ConfigUtils.getPid() > 0) {map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));}if (!isGeneric()) {String revision = Version.getVersion(interfaceClass, version);if (revision != null && revision.length() > 0) {map.put("revision", revision);}String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();if (methods.length == 0) {logger.warn("NO method found in service interface " + interfaceClass.getName());map.put("methods", Constants.ANY_VALUE);} else {map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));}}map.put(Constants.INTERFACE_KEY, interfaceName);appendParameters(map, application);appendParameters(map, module);appendParameters(map, consumer, Constants.DEFAULT_KEY);appendParameters(map, this);String prefix = StringUtils.getServiceKey(map);if (methods != null && !methods.isEmpty()) {for (MethodConfig method : methods) {appendParameters(map, method, method.getName());String retryKey = method.getName() + ".retry";if (map.containsKey(retryKey)) {String retryValue = map.remove(retryKey);if ("false".equals(retryValue)) {map.put(method.getName() + ".retries", "0");}}appendAttributes(attributes, method, prefix + "." + method.getName());checkAndConvertImplicitConfig(method, map, attributes);}}String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);if (hostToRegistry == null || hostToRegistry.length() == 0) {hostToRegistry = NetUtils.getLocalHost();} else if (isInvalidLocalHost(hostToRegistry)) {throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);}map.put(Constants.REGISTER_IP_KEY, hostToRegistry);//attributes are stored by system context.// 将属性放到容器中StaticContext.getSystemContext().putAll(attributes);// 通过createProxy(map) 构造InvokerInvocationHandler(***重要**)ref = createProxy(map);// 构造ConsumerModel并放入consumedServices中ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
}

最终map信息如下:

主要流程如下:
1.校验参数,并初始化配置信息
2.组装InvokerInvocationHandler参数map如上图.

通过createProxy构造InvokerHandler

private T createProxy(Map<String, String> map) {// temp://localhost?application=demo-consumer&check=false&dubbo=2.0.2&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=12248&register.ip=10.166.66.100&side=consumer&timestamp=1669037035244URL tmpUrl = new URL("temp", "localhost", 0, map);final boolean isJvmRefer;// 判断isJvmRefer是否为trueif (isInjvm() == null) {if (url != null && url.length() > 0) { // if a url is specified, don't do local referenceisJvmRefer = false;} else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {// by default, reference local service if there isisJvmRefer = true;} else {isJvmRefer = false;}} else {isJvmRefer = isInjvm().booleanValue();}// 如果isJvmRefer 为trueif (isJvmRefer) {// injvm://127.0.0.1/org.apache.dubbo.samples.api.client.HelloService?application=demo-consumer&check=false&dubbo=2.0.2&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=12248&register.ip=10.166.66.100&side=consumer&timestamp=1669037035244URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);// 1.本地导入invoker(**重要**)invoker = refprotocol.refer(interfaceClass, url);if (logger.isInfoEnabled()) {logger.info("Using injvm service " + interfaceClass.getName());}} else {// 默认情况下 url 为空if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);if (us != null && us.length > 0) {for (String u : us) {URL url = URL.valueOf(u);if (url.getPath() == null || url.getPath().length() == 0) {url = url.setPath(interfaceName);}if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));} else {urls.add(ClusterUtils.mergeUrl(url, map));}}}} else { // assemble URL from register center's configuration// 获取注册中心的URL列表List<URL> us = loadRegistries(false);if (us != null && !us.isEmpty()) {// 遍历注册中心的URL列表,并加载监控(没配置的话monitor 为空)for (URL u : us) {URL monitorUrl = loadMonitor(u);if (monitorUrl != null) {map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));}// 缓存注册中心URL到urls中// registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.2&group=aaa&pid=12248&refer=application=demo-consumer&check=false&dubbo=2.0.2&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=12248&register.ip=10.166.66.100&side=consumer&timestamp=1669037035244&registry=zookeeper&timestamp=1669037368525urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));}}if (urls.isEmpty()) {throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");}}if (urls.size() == 1) {// 2.远程导入invoker(**重要**)invoker = refprotocol.refer(interfaceClass, urls.get(0));} else {List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();URL registryURL = null;for (URL url : urls) {invokers.add(refprotocol.refer(interfaceClass, url));if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {registryURL = url; // use last registry url}}if (registryURL != null) { // registry url is available// use AvailableCluster only when register's cluster is availableURL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);invoker = cluster.join(new StaticDirectory(u, invokers));} else { // not a registry urlinvoker = cluster.join(new StaticDirectory(invokers));}}}Boolean c = check;if (c == null && consumer != null) {c = consumer.isCheck();}if (c == null) {c = true; // default true}if (c && !invoker.isAvailable()) {// make it possible for consumer to retry later if provider is temporarily unavailableinitialized = false;throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());}if (logger.isInfoEnabled()) {logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());}// create service proxy// 3.通过proxyFactory.getProxy(invoker) 生成代理对象 (**重要**)return (T) proxyFactory.getProxy(invoker);
}

上述标注了几个重要的方法

  1. 本地暴露的生成invoker方法: refprotocol.refer(interfaceClass, urls.get(0))
  2. 远程暴露的生成invoker方法: refprotocol.refer(interfaceClass, urls.get(0))
  3. 生成代理对象的方法: (T) proxyFactory.getProxy(invoker)

重要方法解析

1.本地暴露生成invoker方法的逻辑如下

具体的逻辑和《Dubbo之服务暴露流程浅析》该文中的本地暴露流程类似,这边就不详细介绍了。我们重点看远程暴露生成invoker的方法。

2.远程暴露生成invoker的方法解析

1.在执行的时候对应的URL的协议为registry,所以最终走的是RegistryProtocol的refer方法,入参的URL如下:

registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.2&group=aaa&pid=25887&refer=application=demo-consumer&check=false&dubbo=2.0.2&init=true&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=25887&register.ip=10.166.66.100&side=consumer&timestamp=1669080319427&registry=zookeeper&timestamp=1669080594410

具体的调用链如下:由于其他的逻辑和《Dubbo之服务暴露流程浅析》该文中的远程暴露流程类似,我们主要看一下RegistryProtocol的refer方法


RegistryProtocol的refer方法:

@Override
@SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {// 设置协议,我们配置的为ZK,URL 如下:// zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.2&group=aaa&pid=25887&refer=application=demo-consumer&check=false&dubbo=2.0.2&init=true&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=25887&register.ip=10.166.66.100&side=consumer&timestamp=1669080319427&timestamp=1669080594410url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);// 根据URL获取注册中心ZKRegistry registry = registryFactory.getRegistry(url);// 如果入参type的类型为RegistryService.class, 则直接通过proxyFactory获取Invoker(一般情况下不走该逻辑)if (RegistryService.class.equals(type)) {return proxyFactory.getInvoker((T) registry, type, url);}// group="a,b" or group="*"// 获取 URL 的参数group,如果配置中有配置group相关的信息,则走第一个doRefer,不然则走第二个doRefer// 两个 doRefer的 区别为Cluster不一样Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));String group = qs.get(Constants.GROUP_KEY);if (group != null && group.length() > 0) {if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1|| "*".equals(group)) {// getMergeableCluster() 是通过Dubbo的SPI 获取name为'mergeable'的Cluster提供者.return doRefer(getMergeableCluster(), registry, type, url);}}// 最后调用doRefer获取Invokerreturn doRefer(cluster, registry, type, url);
}

RegistryProtocol的doRefer方法:

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {// 构造注册目录,并设置注册中心及协议RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);directory.setRegistry(registry);directory.setProtocol(protocol);// all attributes of REFER_KEY// 获取所有的属性Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());// 构造真正的服务消费者的URL// consumer://10.166.66.100/org.apache.dubbo.samples.api.client.HelloService?application=demo-consumer&check=false&dubbo=2.0.2&init=true&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=25887&side=consumer&timestamp=1669080319427URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);// 如果URL中的interface不是*if (!Constants.ANY_VALUE.equals(url.getServiceInterface())&& url.getParameter(Constants.REGISTER_KEY, true)) {// 补充url的属性信息(category)// consumer://10.166.66.100/org.apache.dubbo.samples.api.client.HelloService?application=demo-consumer&category=consumers&check=false&dubbo=2.0.2&init=true&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=25887&side=consumer&timestamp=1669080319427URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);// 将url 注册到注册中心, 并将url设置缓存到注册目录中registry.register(registeredConsumerUrl);directory.setRegisteredConsumerUrl(registeredConsumerUrl);}// 注册目录订阅URL,内部逻辑会调用Protocol的refer,然后将invoker封装到RegistryDirectory的methodInvokerMap中(**重要**)// 具体的RegistryDirectory的methodInvokerMap的属性可以看下图。// consumer://10.166.66.100/org.apache.dubbo.samples.api.client.HelloService?application=demo-consumer&category=providers,configurators,routers&check=false&dubbo=2.0.2&init=true&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=25887&side=consumer&timestamp=1669080319427directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,Constants.PROVIDERS_CATEGORY+ "," + Constants.CONFIGURATORS_CATEGORY+ "," + Constants.ROUTERS_CATEGORY));// 通过Dubbo的SPI构造一个Invoker对象(**重要**)Invoker invoker = cluster.join(directory);// 缓存到consumerInvokers中ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);return invoker;
}
2.1 RegistryDirectory#subscribe的流程图如下:

最后会在RegistryDirectory的toInvokers方法中调用protocol#refer方法,通过protocol最终生成ProtocolFilterWrapper,然后由InvokerDelegate封装一下ProtocolFilterWrapper对象,放入到urlInvokerMap和methodInvokerMap中。所以在RegistryDirectory中的urlInvokerMap和methodInvokerMap已经缓存了通过Protocol链的refer方法一层层生成的Invoker。
(ProtocolFilterWrapper -> ListenerInvokerWrapper -> DubboInvoker)

Protocol的refer的调用链和《Dubbo之服务暴露流程浅析》的暴露流程类似,具体流程代码不做详细介绍。我们主要看DubboProtocol的refer方法

DubboProtocol的refer方法流程如下:

@Override
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {// 如果url中有配置optimizer,则将类放入到优化器中optimizeSerialization(url);// create rpc invoker.// 创建一个DubboInvoker(**重要**)会创建一个Client用于网络请求DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);// 将invoker 缓存到invokers中invokers.add(invoker);return invoker;
}/*** 构造一个Client*/
private ExchangeClient[] getClients(URL url) {// whether to share connectionboolean service_share_connect = false;// url中的connections的值是否为0,没有connections值 默认为0int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);// if not configured, connection is shared, otherwise, one connection for one serviceif (connections == 0) {service_share_connect = true;connections = 1;}ExchangeClient[] clients = new ExchangeClient[connections];for (int i = 0; i < clients.length; i++) {// 默认为trueif (service_share_connect) {// Client 为共享模式clients[i] = getSharedClient(url);} else {// 新创建一个Clientclients[i] = initClient(url);}}return clients;
}/*** Get shared connection*/
private ExchangeClient getSharedClient(URL url) {// 访问地址(ip:port)String key = url.getAddress();ReferenceCountExchangeClient client = referenceClientMap.get(key);// 第一次调用 client 为空if (client != null) {if (!client.isClosed()) {client.incrementAndGetCount();return client;} else {referenceClientMap.remove(key);}}// 加锁locks.putIfAbsent(key, new Object());// 同步锁synchronized (locks.get(key)) {// duble check,有值则返回if (referenceClientMap.containsKey(key)) {return referenceClientMap.get(key);}// 构造一个ExchangeClientExchangeClient exchangeClient = initClient(url);// 构造一个ReferenceCountExchangeClientclient = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);// 设置到缓存中referenceClientMap.put(key, client);ghostClientMap.remove(key);//解锁locks.remove(key);return client;}
}/*** Create new connection*/
private ExchangeClient initClient(URL url) {// client type setting.// 获取url中的client属性(默认为netty)String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));// 添加codec 属性为dubbourl = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);// enable heartbeat by default// 添加心跳属性 heartbeat 为 60surl = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));// BIO is not allowed since it has severe performance issue.// 校验name为str(client属性,默认为netty) 的Transporter提供者是否存在,不存在则抛错if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {throw new RpcException("Unsupported client type: " + str + "," +" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));}ExchangeClient client;try {// connection should be lazy// 判断URL的lazy属性是否为true, 默认为fasleif (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {// 构建一个Lazy的Client,等要网路请求时,会调用私有的initClient方法构造一个HeaderExchangeClient,和下面的Exchangers.connect(url, requestHandler) 逻辑类似client = new LazyConnectExchangeClient(url, requestHandler);} else {// 调用ExchangeClient的connect方法创建client(**重点接口**)client = Exchangers.connect(url, requestHandler);}} catch (RemotingException e) {throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);}return client;
}
上述代码主要逻辑为:
1.构造一个DubboInvoker
2.DubboInvoker会持有一个ExchangeClient
3.ExchangeClient为根据是 是否为共享模式,去初始化
3.1. 如果是共享模式,则根据url的address(ip:port) 查询本地缓存中是否有对应的client,没有则会根据是否要懒加载创建一个征程的或者懒加载的Client
3.2. 如果不是共享模式,则直接创建一个Client
2.2 通过Exchangers.connect生成Client的流程

1.获取Exchanger真正的服务提供者

public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");}if (handler == null) {throw new IllegalArgumentException("handler == null");}url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");// 获取Exchanger服务提供者(默认为HeaderExchanger对象),然后调用connect方法return getExchanger(url).connect(url, handler);
}public static Exchanger getExchanger(URL url) {// 获取URL的exchanger属性,默认为header,获取HeaderExchanger对象String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);return getExchanger(type);
}public static Exchanger getExchanger(String type) {// 通过Dubbo的SPI获取服务提供者,为HeaderExchangerreturn ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}

2.通过HeaderExchanger.connect生成client

@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {// 1.通过Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))) 创建client// 2.将Client封装为HeaderExchangeClientreturn new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}

3.通过Transporters.connect生成Client

public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");}ChannelHandler handler;if (handlers == null || handlers.length == 0) {handler = new ChannelHandlerAdapter();} else if (handlers.length == 1) {handler = handlers[0];} else {handler = new ChannelHandlerDispatcher(handlers);}// getTransporter() 最终获取NettyTransporter,// 通过NettyTransporter.connect方法获取NettyClient.return getTransporter().connect(url, handler);
}public static Transporter getTransporter() {// 通过Dubbo SPI获取Transporter的适配类对象(Transporter$Adaptive -> NettyTransporter)return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}

NettyTransporter代码

public class NettyTransporter implements Transporter {public static final String NAME = "netty";@Overridepublic Server bind(URL url, ChannelHandler listener) throws RemotingException {return new NettyServer(url, listener);}@Overridepublic Client connect(URL url, ChannelHandler listener) throws RemotingException {// 创建NettyClientreturn new NettyClient(url, listener);}
}

关于NettyClient后续的代码就不跟了,有兴趣的小伙伴可以自行去了解。

2.3 cluster.join(directory)的流程如下:

最终生成MockClusterInvoker,具体结构如下:

3.构造Invoker,缓存到注册表

主要流程如下:

StubProxyFactoryWrapper的getProxy方法

@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public <T> T getProxy(Invoker<T> invoker) throws RpcException {// 继续通过Dubbo SPI的方式获取proxyT proxy = proxyFactory.getProxy(invoker);// 如果不是泛化类if (GenericService.class != invoker.getInterface()) {// 如果有stub配置(默认没有),不走下面的逻辑直接返回proxyString stub = invoker.getUrl().getParameter(Constants.STUB_KEY, invoker.getUrl().getParameter(Constants.LOCAL_KEY));if (ConfigUtils.isNotEmpty(stub)) {Class<?> serviceType = invoker.getInterface();if (ConfigUtils.isDefault(stub)) {if (invoker.getUrl().hasParameter(Constants.STUB_KEY)) {stub = serviceType.getName() + "Stub";} else {stub = serviceType.getName() + "Local";}}try {Class<?> stubClass = ReflectUtils.forName(stub);if (!serviceType.isAssignableFrom(stubClass)) {throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + serviceType.getName());}try {Constructor<?> constructor = ReflectUtils.findConstructor(stubClass, serviceType);proxy = (T) constructor.newInstance(new Object[]{proxy});//export stub serviceURL url = invoker.getUrl();if (url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT)) {url = url.addParameter(Constants.STUB_EVENT_METHODS_KEY, StringUtils.join(Wrapper.getWrapper(proxy.getClass()).getDeclaredMethodNames(), ","));url = url.addParameter(Constants.IS_SERVER_KEY, Boolean.FALSE.toString());try {export(proxy, (Class) invoker.getInterface(), url);} catch (Exception e) {LOGGER.error("export a stub service error.", e);}}} catch (NoSuchMethodException e) {throw new IllegalStateException("No such constructor \"public " + stubClass.getSimpleName() + "(" + serviceType.getName() + ")\" in stub implementation class " + stubClass.getName(), e);}} catch (Throwable t) {LOGGER.error("Failed to create stub implementation class " + stub + " in consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", cause: " + t.getMessage(), t);// ignore}}}return proxy;
}

AbstractProxyFactory的getProxy方法

@Override
public <T> T getProxy(Invoker<T> invoker) throws RpcException {// 调用重载的方法return getProxy(invoker, false);
}@Override
public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {Class<?>[] interfaces = null;String config = invoker.getUrl().getParameter("interfaces");if (config != null && config.length() > 0) {String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);if (types != null && types.length > 0) {interfaces = new Class<?>[types.length + 2];interfaces[0] = invoker.getInterface();interfaces[1] = EchoService.class;for (int i = 0; i < types.length; i++) {interfaces[i + 1] = ReflectUtils.forName(types[i]);}}}if (interfaces == null) {interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};}if (!invoker.getInterface().equals(GenericService.class) && generic) {int len = interfaces.length;Class<?>[] temp = interfaces;interfaces = new Class<?>[len + 1];System.arraycopy(temp, 0, interfaces, 0, len);interfaces[len] = GenericService.class;}// 调用子类的getProxy方法return getProxy(invoker, interfaces);
}

JavassistProxyFactory的getProxy方法

@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {// 构造一个InvokerInvocationHandler对象,其中的入参为MockClusterInvoker对象return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}

自此Dubbo 的服务导入流程就结束啦。

Dubbo之服务导入流程解析相关推荐

  1. Dubbo笔记 ⑤ : 服务发布流程 - Protocol#export

    文章目录 一.前言 二.RegistryProtocol#export 1. URL解析 1.1 获取注册中心URL 1.2 获取服务URL 1.3 获取订阅URL 2. 服务暴露 3. 服务注册 4 ...

  2. Dubbo暴露服务源码解析

    Dubbo服务提供方的JavaBean对应的就是ServiceBean.ServiceBean除了继承dubbo自己的配置抽象类以外,还实现了一系列的spring接口用来参与到spring容器的启动以 ...

  3. Dubbo笔记 ㉗ : 服务自省-提供者

    文章目录 一.前言 1. 概念 二.服务自省 1. 相关配置 3.1 dubbo.application.metadata-type 3.2 dubbo.application.register-co ...

  4. Dubbo原理何源码解析之服务暴露

    2019独角兽企业重金招聘Python工程师标准>>> 一.框架设计 在官方<Dubbo 用户指南>架构部分,给出了服务调用的整体架构和流程: 另外,在官方<Dub ...

  5. dubbo服务暴露流程总结

    这篇文章主要总结一下dubbo服务端启动的时候服务暴露过程,虽然官方网站和各种博客上已经有很多介绍服务暴露的帖子,但还是想把自己跟源码过程中遇到的问题和心得记录下来,算是个总结,并且本篇文章是基于du ...

  6. Dubbo中暴露服务的过程解析

    原文链接 dubbo暴露服务有两种情况,一种是设置了延迟暴露(比如delay="5000"),另外一种是没有设置延迟暴露或者延迟设置为-1(delay="-1" ...

  7. Dubbo源码分析(三) -- Dubbo的服务发现源码深入解析4万字长文

    前言 前面两篇基本上已经对dubbo的SPI,服务发布,注册等功能进行了分析,那么在消费端是如何发现服务,并进行透明的远程调用的呢?带着这个疑问,走入今天的篇章,Dubbo的服务发现 服务发现的流程 ...

  8. dubbo服务暴露原理解析

    配置解析 dubbo 的各个配置项,详细的可以参考官网 只有 group,interface,version 是服务的匹配条件,三者决定是不是同一个服务,其它配置项均为调优和治理参数 所有的配置最终都 ...

  9. Netty 源码解析系列-服务端启动流程解析

    netty源码解析系列 Netty 源码解析系列-服务端启动流程解析 Netty 源码解析系列-客户端连接接入及读I/O解析 五分钟就能看懂pipeline模型 -Netty 源码解析 1.服务端启动 ...

最新文章

  1. 深度学习 免费课程_深入学习深度学习,提供15项免费在线课程
  2. 带宽计算方法 及 大B与小b 说明
  3. 8.2 TensorFlow实现KNN与TensorFlow中的损失函数,优化函数
  4. SpringBoot基础篇AOP之基本使用姿势小结
  5. 使用svnsync同步svn
  6. win 10 java 安装_win10---Java安装及环境变量配置
  7. js截取字符串区分汉字字母代码
  8. 把数字翻译成字符串python_python中如何将字符串强制转为数字
  9. 【笔记】具有O-DU和O-RU的eNB / gNB架构
  10. python语法学习第三天--列表
  11. 安卓使用MediaPlayer播放RTSP无画面的解决办法
  12. 需求分析说明书SRS
  13. CLM 陆面过程模式
  14. Win10黑屏的时候显示时钟怎么设置
  15. 二进制与格雷码相互转换
  16. word2vec算法
  17. 体育教学硕士毕业论文题目
  18. java-net-php-python-jsp安利达物流公司管理系统计算机毕业设计程序
  19. 去中心化存储的QoS是什么?
  20. android系统 vender添加自定义的预编译的应用程序

热门文章

  1. 抱抱脸(hugging face)教程-中文翻译-任务总结
  2. 2014南京甲骨文实习生面试
  3. 阿里云建站之模板建站的核心优势有哪些?
  4. krpano1.20版本正式发布!
  5. UEFI 基础教程 (一) - 基于QEMU搭建UEFI开发环境(win/linux)
  6. 软件众包网站有哪些?
  7. Android系统各个版本系统特性整理(1.1-6.0)
  8. 《编码的奥秘》——手电筒剖析——读后感!
  9. SAP 物料标准价和移动平均价详解
  10. 深入计算机组成原理(四)穿越功耗墙,我们该从哪些方面提升“性能”?