文章目录

  • dubbo服务发现-RegistryDirectory
    • 1.前言
    • 2.RegistryDirectory结构
    • 3.源码分析
    • 4.总结的流程图
    • 5.结语
    • 6.服务内方法重载了,会有影响吗?

dubbo服务发现-RegistryDirectory

1.前言

在dubbo请求处理流程中,在consumer端有个很重要的功能,从RegistryDirectory根据methodName获取Invoker集合,这些目标Invoker又是如何保存到RegistryDirectory的呢?在服务停止,又是如何感知的呢?下面进行分析

在进行深入分析之前,先来了解一下RegistryDirectory(服务目录)是什么。服务目录中存储了一些和服务提供者有关的信息,通过服务目录,服务消费者可获取到服务提供者的信息,比如 ip、端口、服务协议等。通过这些信息,服务消费者就可通过 Netty 等客户端进行远程调用。

在一个服务集群中,服务提供者数量并不是一成不变的,如果集群中新增了一台机器,相应地在服务目录中就要新增一条服务提供者记录。或者,如果服务提供者的配置修改了,服务目录中的记录也要做相应的更新。如果这样说,服务目录和注册中心的功能不就雷同了吗?确实如此,这里这么说是为了方便大家理解。实际上服务目录在获取注册中心的服务配置信息后,会为每条配置信息生成一个 Invoker 对象,并把这个 Invoker 对象存储起来,这个 Invoker 才是服务目录最终持有的对象。Invoker 有什么用呢?看名字就知道了,这是一个具有远程调用功能的对象。讲到这大家应该知道了什么是服务目录了,它可以看做是 Invoker 集合,且这个集合中的元素会随注册中心的变化而进行动态调整。

总结:Directory代表多个Invoker,可以把它看成List,但与List不同的是,它的值可能是动态变化的,比如注册中心推送变更。Cluster将Directory中的多个Invoker伪装成一个Invoker,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个。

从注册表RegistryDirectory(动态服务目录)获取Invoker集合源码分析记录

public List<Invoker<T>> list(Invocation invocation) throws RpcException {if (destroyed) {throw new RpcException("Directory already destroyed .url: " + getUrl());}List<Invoker<T>> invokers = doList(invocation);//@1List<Router> localRouters = this.routers; // local referenceif (localRouters != null && !localRouters.isEmpty()) {for (Router router : localRouters) {try {if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {//运行时执行invokers = router.route(invokers, getConsumerUrl(), invocation);//@2}} catch (Throwable t) {logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);}}}return invokers;
}

代码@1处,使用RegistryDirectory获取Invoker集合,查看但很简单,就是根据methodName从缓存RegistryDirectory.methodInvokerMap获取Invoker集合

代码@2处,使用RegistryDirectory上的路由集合对Invoker集合进行过滤,获取过滤后的Invoker集合。路由通常默认是[MockInvokersSelector,TagRouter],第一个路由是用于Mock,第二个是dubbo2.6.5后新增的标签路由,可以用于流量隔离,非常好用。

那么疑问来了methodInvokerMap是如何保存的,如何感知zk变化从而refresh的呢? 下面解答

2.RegistryDirectory结构

首先看下RegistryDirectory结构

使用了模板模式,RegistryDirectory的属性较多,下面做下说明

cluster:static属性,dubbo集群,具体是代理生成的自适应类型Cluster$Adaptive,可以根据url上集群类型生成对应Cluster,默认是failove快速容错集群策略

routerFactory:static属性,dubbo路由工厂,具体是代理生成的自适应类型RouterFactory,根据url上的路由类型生成对应Cluster

configuratorFactory:static属性,dubbo配置工厂,具体是代理生成的自适应类型ConfiguratorFactory$Adaptive,可以根据url上override/absent协议生成对应的配置工厂,作用是动态配置。override-覆盖,absent-清空。

serviceKey:用于表示一组服务,组成格式group+/+group+/+group+/+interfaceName+:+$version。仅仅用于打印日志

serviceType:interface的clazz

queryMap:即consumer url上的参数,用于refreshInvoker内与provider url merge生成新url

directoryUrl:

serviceMethods:服务提供方的methodName集合

multiGroup:多分组标识

protocol:是Protoco$Adaptive,在RegistryProtocol.doRefer(Cluster, Registry, Class, URL)操作内赋值

registry:是ZookeeperRegistry,在RegistryProtocol.doRefer(Cluster, Registry, Class, URL)操作内赋值

forbidden:

overrideDirectoryUrl:

registeredConsumerUrl:在zk上consumer临时节点值

configurators:对应zk上/dubbo/interfaceName/configuratiors下的节点

urlInvokerMap:保存的是merge后的providerUrl与Invoker集合

methodInvokerMap:缓存provider端Invoker

cachedInvokerUrls:保存的是prodivers节点下的url

父类consumerUrl:是consumer协议的url,其中category=providers,configurators,routers

父类routers:路由集合,默认是[MockInvokersSelector,TagRouter],在RegistryDirectory创建时候随之创建

3.源码分析

入口在com.alibaba.dubbo.registry.integration.RegistryProtocol.doRefer(Cluster, Registry, Class, URL)

//传入参数cluster是Cluster$Adaptive, registry是ZookeeperRegistry  接口clazz, url是zk协议
private <T> Invoker<T> doRefer(Cluster cluster , Registry registry, Class<T> type, URL url) {RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);//创建RegistryDirectory,directory.setRegistry(registry);directory.setProtocol(protocol);// all attributes of REFER_KEYMap<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);if (!Constants.ANY_VALUE.equals(url.getServiceInterface())&& url.getParameter(Constants.REGISTER_KEY, true)) {URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);//conusmer url添加参数category=consumers。写入到zk上的节点值如果没有category则会默认是provider,因为是consumer节点因此要增加category=consumers表示consumer节点registry.register(registeredConsumerUrl);//在zk创建consumer临时节点节点值保存到RegistryDirectodirectory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,Constants.PROVIDERS_CATEGORY+ "," + Constants.CONFIGURATORS_CATEGORY+ "," + Constants.ROUTERS_CATEGORY));//@3 订阅providers、configurators、routers三个节点,这三个节点下内容有变动,则会refreshInvokerInvoker invoker = cluster.join(directory);//集群策略生成Invoker,默认是FailoverCluster生成FailoverClusterInvoker,该Invoker封装了RegistryDirectory,也即封装了目标调用方ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);return invoker;
}

工作重要是代码@3,在consumer url上添加参数category=providers,configurators,routers,表示订阅这三个节点(监听这三个节点变化从而refresh)

接着在com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistry.doSubscribe(URL, NotifyListener)内处理

//url是consumer协议,其中有参数category=providers,configurators,routers, listener是RegistryDirectory,注册表也是个zk监听器
protected void doSubscribe(final URL url, final NotifyListener listener) {try {if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {//忽略} else {List<URL> urls = new ArrayList<URL>();for (String path : toCategoriesPath(url)) {//@4 toCategoriesPath获取订阅节点路径ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);if (listeners == null) {zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());listeners = zkListeners.get(url);}ChildListener zkListener = listeners.get(listener);if (zkListener == null) {listeners.putIfAbsent(listener, new ChildListener() {@Overridepublic void childChanged(String parentPath, List<String> currentChilds) {ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));//@5  consumer启动后,订阅的zk节点有变化,zk通知执行}});zkListener = listeners.get(listener);}zkClient.create(path, false);//在zk创建永久节点(节点存在则不创建),比如consumer端创建的是routers节点List<String> children = zkClient.addChildListener(path, zkListener);//@6  zk节点添加监听器ChildListener,即匿名内部类ZookeeperRegistry$3。返回结果children就是对应path下的数据,比如ptah是providers,则返回这个节点下的所有注册到zk上的服务urlif (children != null) {urls.addAll(toUrlsWithEmpty(url, path, children));//toUrlsWithEmpty返回providers、routers、configurators下的所有节点数据。因此urls保存的是这三个节点所有数据(url)。但是如果不匹配情况下,返回的是empty协议,空协议}} notify(url, listener, urls);//@7 consumer端启动执行}} catch (Throwable e) {throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);}
}

代码@4:toCategoriesPath获取consumer url上category=providers,configurators,routers的路径,即订阅的路径,然后循环遍历,添加监听节点

代码@5:功能同代码@7,不同之处是是由zk上监听节点变化触发

代码@6:分别监听providers、configurators、routers节点,返回各自节点下的所有数据(url)

代码@7:通知RegistryDirectory处理,核心方法

接着看notify方法处理逻辑

ZookeeperRegistry(AbstractRegistry).notify(URL, NotifyListener, List<URL>)

//传入参数 url是conumser协议,包含参数category=providers,configurators,routers
//listener是RegistryDirectory
//urls是/dubbo/interfaceName下每个providers,configurators,routers节点下的数据并集
protected void notify(URL url, NotifyListener listener, List<URL> urls) {//忽略检测Map<String, List<URL>> result = new HashMap<String, List<URL>>();//@8-startfor (URL u : urls) {if (UrlUtils.isMatch(url, u)) {//consumer url和provider/override/router url想匹配String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);List<URL> categoryList = result.get(category);if (categoryList == null) {categoryList = new ArrayList<URL>();result.put(category, categoryList);}categoryList.add(u);}}//@8-endif (result.size() == 0) {//result存放的是key是configurators/routers/providers,value是每个节点下所有的经过match后的url集合return;}//@9-startMap<String, List<URL>> categoryNotified = notified.get(url);if (categoryNotified == null) {notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());categoryNotified = notified.get(url);}for (Map.Entry<String, List<URL>> entry : result.entrySet()) {String category = entry.getKey();List<URL> categoryList = entry.getValue();categoryNotified.put(category, categoryList);saveProperties(url);listener.notify(categoryList);//RegistryDirectory.notify()}//@9-end
}

UrlUtils.isMatch:判断逻辑,取and结果

1.u的interfaceName和consumer url的interfaceName相同

2.u的category必须包含在consumer url的category内。因为consumer url的category是[providers,configurators,routers]

3.两者group相同,或u的group包含在consumer url的group内

4.两者version相同

5.两者的classifier相同,classifier是从url上获取参数classifier,这两个通常相同。

因此通常有用的就是group和version了,通过group、version过滤一波url。

代码@8:获取和consumer url匹配的providers/configurators/routers节点下的url,保存到result,其中key就是providers/configurators/routers,value是匹配的各自节点下的数据(即url)

代码@9:对providers/configurators/routers节点下的(match)url进行服务发现。执行RegistryDirectory.notify()。因此最终核心处理是在RegistryDirectory.notify(),下面分析这个方法

//com.alibaba.dubbo.registry.integration.RegistryDirectory.notify(List<URL>)
//参数urls是providers/configurators/routers节点下的match url
@Override
public synchronized void notify(List<URL> urls) {//加锁的原因是因为订阅的zk三个节点可能同时间都有变化,要加锁处理List<URL> invokerUrls = new ArrayList<URL>();List<URL> routerUrls = new ArrayList<URL>();List<URL> configuratorUrls = new ArrayList<URL>();//@10-startfor (URL url : urls) {String protocol = url.getProtocol();String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);if (Constants.ROUTERS_CATEGORY.equals(category)|| Constants.ROUTE_PROTOCOL.equals(protocol)) {routerUrls.add(url);} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {configuratorUrls.add(url);} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {invokerUrls.add(url);} else {logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());}}//@10-end// configuratorsif (configuratorUrls != null && !configuratorUrls.isEmpty()) {this.configurators = toConfigurators(configuratorUrls);//@11,获取OverrideConfigurator集合并保存到RegistryDirectory.configurators}// routersif (routerUrls != null && !routerUrls.isEmpty()) {List<Router> routers = toRouters(routerUrls);//@12if (routers != null) { // null - do nothingsetRouters(routers);}}List<Configurator> localConfigurators = this.configurators; // local reference// merge override parametersthis.overrideDirectoryUrl = directoryUrl;if (localConfigurators != null && !localConfigurators.isEmpty()) {for (Configurator configurator : localConfigurators) {this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);//@13}}// providersrefreshInvoker(invokerUrls);//@14
}

代码@10:

providers:节点下url是dubbo协议

configurators:节点下url是override协议

routers:节点下url是route协议

invokerUrls保存providers节点下的url。这个节点对应provider服务端注册

routerUrls保存routers节点下的url。这个节点对应dubbo-admin上的路由规则

configuratorUrls保存configurators节点下的url。这个节点对应dubbo-admin上的动态配置

注意:provider启动会在zk注册providers和configurators节点。consumer启动会在zk注册consumers和routers节点。

代码@11:通常都是获取OverrideConfigurator集合,保存到RegistryDirectory.configurators

代码@13:使用OverrideConfigurator集合配置RegistryDirectory.overrideDirectoryUrl

代码@12:根据路由url获取路由集合,通常都是条件路由,即ConditionRouter,保存路由集合到RegistryDirectory.routers,为[ConditionRouter,TagRouter,MockInvokersSelector]

代码@14:刷新Invoker,refreshInvoker方法只有providers节点下的url会真正执行

总结:该方法是consumer启动生成Invoker和zk节点变化通知刷新生成Invoker的入口。

private void refreshInvoker(List<URL> invokerUrls) {if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {this.forbidden = true; // Forbid to accessthis.methodInvokerMap = null; // Set the method invoker map to nulldestroyAllInvokers(); // Close all invokers} else {this.forbidden = false; // Allow to accessMap<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference 保存旧的Invoker,用于refresh后将旧的销毁if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {invokerUrls.addAll(this.cachedInvokerUrls);} else {this.cachedInvokerUrls = new HashSet<URL>();this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison}if (invokerUrls.isEmpty()) {//invokerUrls是providers节点下的urlreturn;//对于configurators/routers节点下的url来说,为空,直接return}//toInvokers 转换生成Invoker集合Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map//根据方法名分组Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map// state change// If the calculation is wrong, it is not processed.if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));return;}//缓存到RegistryDirectory.methodInvokerMap。通常不使用多分组,因此忽略toMergeMethodInvokerMapthis.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;this.urlInvokerMap = newUrlInvokerMap;try {//消耗不无用的Invoker。这里真正调用是zk节点变化通知destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker} catch (Exception e) {logger.warn("destroyUnusedInvokers error. ", e);}}
}

重点就在toInvokers和toMethodInvokers两个方法,下面先看toInvokers

//传入的参数urls是providers节点下的url
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();if (urls == null || urls.isEmpty()) {return newUrlInvokerMap;}Set<String> keys = new HashSet<String>();String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);for (URL providerUrl : urls) {// If protocol is configured at the reference side, only the matching protocol is selectedif (queryProtocols != null && queryProtocols.length() > 0) {boolean accept = false;String[] acceptProtocols = queryProtocols.split(",");for (String acceptProtocol : acceptProtocols) {if (providerUrl.getProtocol().equals(acceptProtocol)) {accept = true;break;}}if (!accept) {continue;}}if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {continue;}if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()+ ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));continue;}URL url = mergeUrl(providerUrl);//重点方法。merge consumer url上的参数和动态配置override url上的参数merge到providerUrl上,这就是两端都配置了相同参数,以consumer端为准的原因了,因为参数被覆盖为consumer端参数。RegistryDirectory.queryMap就是consumer url参数String key = url.toFullString(); // The parameter urls are sortedif (keys.contains(key)) { // Repeated urlcontinue;}keys.add(key);// Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer againMap<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local referenceInvoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);if (invoker == null) { // Not in the cache, refer againtry {boolean enabled = true;if (url.hasParameter(Constants.DISABLED_KEY)) {//经过merge后的providerUrl上disabled=true,则说明被禁用了,因此不生成Invoker。例如override://192.168.1.155:20880/org.pangu.api.ProductService?category=configurators&disabled=true&dynamic=false&enabled=true,则就是禁用。可以通过dubbo-adminenabled = !url.getParameter(Constants.DISABLED_KEY, false);} else {enabled = url.getParameter(Constants.ENABLED_KEY, true);//经过merge后的providerUrl上enabled=false,则说明不启用,因此不生成Invoker}if (enabled) {invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);//DubboProtocol.refer()生成consmer端Invoker,封装了nettyClient}} catch (Throwable t) {logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);}if (invoker != null) { // Put new invoker in cachenewUrlInvokerMap.put(key, invoker);}} else {newUrlInvokerMap.put(key, invoker);}}keys.clear();return newUrlInvokerMap;//返回Invoker集合,其中key是merge后的providerUrl
}

看mergeUrl的实现

//功能:merge consumer url上的参数到providerUrl,接着使用动态配置,把override上的参数也merge到providerUrl
private URL mergeUrl(URL providerUrl) {//把consumer url上的参数merge到providerUrl上,这就是两端都配置了相同参数,以consumer端为准的原因了,因为参数被覆盖为consumer端参数providerUrl = ClusterUtils.mergeUrl(providerUrl, queryMap); // Merge the consumer side parametersList<Configurator> localConfigurators = this.configurators; // local referenceif (localConfigurators != null && !localConfigurators.isEmpty()) {for (Configurator configurator : localConfigurators) {providerUrl = configurator.configure(providerUrl);//使用动态配置进行配置,即把上步merge后的providerUrl上加上符合的动态配置上的参数}}providerUrl = providerUrl.addParameter(Constants.CHECK_KEY, String.valueOf(false)); // Do not check whether the connection is successful or not, always create Invoker!//其它忽略return providerUrl;
}

PS:需要注意的是通过dubbo-admin的动态配置和路由配置,在zk上写的节点都是永久节点,而非临时节点,例如

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-n0oXlwBF-1649166212945)(https://cdn.jsdelivr.net/gh/zhangyj131/mdpicture/docs/20210712012104.png)]

曾经就遇到过服务启动,在zk上存在,但是consumer端就是无法发现,最后发现为了测试把这个节点服务给禁用了,因此导致一直consumer端启动无法发现这个服务。

接着看toMethodInvokers方法,代码如下

private Map<String, List<Invoker<T>>> toMethodInvokers(Map<String, Invoker<T>> invokersMap) {Map<String, List<Invoker<T>>> newMethodInvokerMap = new HashMap<String, List<Invoker<T>>>();// According to the methods classification declared by the provider URL, the methods is compatible with the registry to execute the filtered methodsList<Invoker<T>> invokersList = new ArrayList<Invoker<T>>();if (invokersMap != null && invokersMap.size() > 0) {//根据methodName进行分组for (Invoker<T> invoker : invokersMap.values()) {String parameter = invoker.getUrl().getParameter(Constants.METHODS_KEY);if (parameter != null && parameter.length() > 0) {String[] methods = Constants.COMMA_SPLIT_PATTERN.split(parameter);if (methods != null && methods.length > 0) {for (String method : methods) {if (method != null && method.length() > 0&& !Constants.ANY_VALUE.equals(method)) {List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);if (methodInvokers == null) {methodInvokers = new ArrayList<Invoker<T>>();newMethodInvokerMap.put(method, methodInvokers);}methodInvokers.add(invoker);}}}}invokersList.add(invoker);}}List<Invoker<T>> newInvokersList = route(invokersList, null);//使用路由规则进行过滤,获取路由后的Invoker集合newMethodInvokerMap.put(Constants.ANY_VALUE, newInvokersList);//key是*,value是路由过滤后的Invoker集合if (serviceMethods != null && serviceMethods.length > 0) {for (String method : serviceMethods) {List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);if (methodInvokers == null || methodInvokers.isEmpty()) {methodInvokers = newInvokersList;}newMethodInvokerMap.put(method, route(methodInvokers, method));//使用路由规则进行过滤并返回过滤Invoker集合,和methodName绑定保存。 这里又经过一个route()操作是因为前面的route()是为了key是*的情况,*是表示所有方法,这是dubbo的容错性,在dubbo请求时候,在RegistryDirectory获取不到Invoker时,就用*从RegistryDirectory.methodInvokerMap获取Invoker集合}}// sort and unmodifiablefor (String method : new HashSet<String>(newMethodInvokerMap.keySet())) {List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);Collections.sort(methodInvokers, InvokerComparator.getComparator());newMethodInvokerMap.put(method, Collections.unmodifiableList(methodInvokers));}return Collections.unmodifiableMap(newMethodInvokerMap);
}

接着需要关注下route()操作

//com.alibaba.dubbo.registry.integration.RegistryDirectory.route(List<Invoker<T>>, String)
//使用条件路由对invokers进行规则筛选,返回筛选后的invokers
private List<Invoker<T>> route(List<Invoker<T>> invokers, String method) {Invocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);List<Router> routers = getRouters();//获取RegistryDirectory.routers,默认是[TagRouter,MockInvokersSelector],如果通过dubbo-admin使用了动态配置,则configurators节点下有override协议url,则路由集合通常是[ConditionRouter,TagRouter,MockInvokersSelector],ConditionRouter可能有多个if (routers != null) {for (Router router : routers) {// If router's url not null and is not route by runtime,we filter invokers hereif (router.getUrl() != null && !router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {//RUNTIME_KEY表示是否是运行时执行//ConditionRouter的url上通常runtime=false,因此执行route()操作//TagRouter的url上runtime=true,因此不执行route()操作//MockInvokersSelector的url是null,因此不执行route()操作//因此总结下来就是使用条件路由规则对Invoker集合进行筛选,返回筛选后的Invoker集合invokers = router.route(invokers, getConsumerUrl(), invocation);}}}return invokers;
}
//ConditionRouter条件路由是用于静态路由,在refreshInvoker时候进行路由,非在请求时候
//TagRouter是在请求时候进行路由筛选

至此分析完毕,总体流程就是:

1.根据订阅的configurators和routers节点,分别生成Configurator集合和路由集合保存到注册表RegistryDirectory,分别用于配置url和路由规则筛选

2.merge consumer url上的参数和configurators节点下(匹配的)override url参数到providerUrl

3.使用DubboProtocol.refer()生成Invoker。该Invoker封装了NettyClient,可以用于网络通信。且providerUrl携带provider端的host port,创建nettyClient知道连接对应的服务端口。

4.使用条件路由规则进行路由过滤,然后按照methodName分组,分组结果最终保存到RegistryDirectory.methodInvokerMap。这样在请求调用时候,就可以根据methodName获取获取Invoker集合。

4.总结的流程图

[外链图片转存中…(img-fVdDhX14-1649166212946)]

5.结语

花不少时间分析RegistryDirectory的原因是这个类很重要,因为涉及到服务的动态发现、动态配置、路由规则等,比如可以使用路由规则进行灰度、蓝绿。理解了这个类,才真正能明白服务动态发现的实现。我们的蓝绿就是使用的路由规则(非tag路由),后续写下。

6.服务内方法重载了,会有影响吗?

一个服务有两个同名方法,属于重载,那么服务注册和发现会有什么问题吗?

对于provider端来说,注册到zk上的providerUrl上有methods=[所有方法],获取服务所有方法代码如下

String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();//获取所有方法
if (methods.length == 0) {logger.warn("NO method found in service interface " + interfaceClass.getName());map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));//使用set去重(去除方法名相同的方法,无论这些方法是否是重载)
}
//获取服务所有方法名,然后使用set去重,这样注册到zk节点的methods参数就是去重后的方法集合

接着服务生成和注册,服务暴露缓存到DubboProtocol.exporterMap中,其中key是port+interfaceName+version+group,和方法名无关。consumer调用provider时候,也是根据key是port+interfaceName+version+group从缓存DubboProtocol.exporterMap获取DubboExporter,接着从Exporter获取Invoker,然后通过反射调用,根据参数类型+参数调用目标服务方法(目标服务方法被重载并不会影响。)

对于consumer端来说,也是获取providerUrl上的methods,获取的methodName也是provider端去重后的。consumer端根据methodName获取Invoker集合时候。要明白Invoker是针对服务级别,而非method级别(一个服务内的所有method都可以映射到这个Invoker)

dubbo系列三、 服务发现RegistryDirectory相关推荐

  1. Dubbo接口级服务发现-数据结构

    目录 Dubbo服务治理易用性的原理: URL地址数据划分: Dubbo接口级服务发现---易用性的代价 Proposal,适应云原生.更大规模集群的服务发现类型. Dubbo3应用级服务发现---基 ...

  2. SpringCloud系列--eureka(三)服务发现DiscoveryClient

    对于注册到Eureka的微服务,可以通过服务发现来获得该服务的信息. 下面将介绍如何使用DiscoveryClient获取注册中心上的实例信息. 1.Controller中注入DiscoveryCli ...

  3. 源码分析Dubbo Invoker概述----服务发现、集群、负载均衡、路由体系

    Invoker,负载网络调用组件,底层依懒与网络通信,Invoker主要负责服务调用,自然与路由(比如集群)等功能息息相关,本节先从整体上把控一下Dubbo服务调用体系,服务发现.集群.负载均衡.路由 ...

  4. Dubbo系列之服务注册与发现

    文章目录 一.分布式基本理论 1.1.分布式基本定义 1.2 架构发展演变 1.3.RPC简介 二.Dubbo理论简介 三.Dubbo环境搭建 3.1 Zookeeper搭建 3.2 Dubbo管理页 ...

  5. dubbo源码-服务发现

    服务消费者 <!-- consumer's application name, used for tracing dependency relationship (not a matching ...

  6. Dubbo源码分析(三) -- Dubbo的服务发现源码深入解析4万字长文

    前言 前面两篇基本上已经对dubbo的SPI,服务发布,注册等功能进行了分析,那么在消费端是如何发现服务,并进行透明的远程调用的呢?带着这个疑问,走入今天的篇章,Dubbo的服务发现 服务发现的流程 ...

  7. Dubbo 迈出云原生重要一步 - 应用级服务发现解析

    作者 | 刘军(陆龟)  Apache Dubbo PMC 概述 社区版本 Dubbo 从 2.7.5 版本开始,新引入了一种基于实例(应用)粒度的服务发现机制,这是我们为 Dubbo 适配云原生基础 ...

  8. dubbo优势_Dubbo 迈出云原生重要一步 应用级服务发现解析

    作者 | 刘军(陆龟)  Apache Dubbo PMC 概述 社区版本 Dubbo 从 2.7.5 版本开始,新引入了一种基于实例(应用)粒度的服务发现机制,这是我们为 Dubbo 适配云原生基础 ...

  9. 《Dubbo迈出云原生重要一步-应用级服务发现解析》

    作者 | 刘军(陆龟) Apache Dubbo PMC 概述 社区版本 Dubbo 从 2.7.5 版本开始,新引入了一种基于实例(应用)粒度的服务发现机制,这是我们为 Dubbo 适配云原生基础设 ...

最新文章

  1. VS 2017显示“高级保存选项”命令操作方法
  2. 视图解析jstlView支持便捷的国际化功能
  3. 【PC工具】更新:在线智能抠图工具,在线视频、图片、音频等转换工具,绿色免安装抠图神奇抠图工具...
  4. 聊聊hystrix的execution.isolation.semaphore.maxConcurrentRequests属性
  5. centos 更换java版本_centos7更换jdk版本
  6. 联通突然从4g变成3g了_中国联通最快明年底2G全面退网 并推进3G逐步退网
  7. JavaScript继承
  8. ioslabel阴影,UILabel的内阴影
  9. poj 1083 Moving Tables
  10. PWN-PRACTICE-BUUCTF-10
  11. script 标签到底该放在哪里
  12. 内部类详解————静态内部类
  13. latex中report目录_在 LaTeX 中将不编号的章节列入目录
  14. Bailian4124 海贼王之伟大航路【DP】
  15. ATC空管系统的实时控制软件系统分析
  16. Qt三方库开发技术:Qt应用内部打开PDF文件
  17. 荣耀总裁赵明揭秘华为薪酬制度:不看资历,只看贡献
  18. Spring+Quartz实现文件中转站
  19. 强化学习RL学习笔记2-概述(2)
  20. 获取各大电商平台,item_get_app - 获得淘宝app商品详情原数据API返回数据说明

热门文章

  1. D3.js从入门指南
  2. 与NC对接传输凭证等数据, xml格式文件传输
  3. Spring Boot 整合 阿里云短信(模板模式)
  4. HTML+CSS静态网页作业:NBA勒布朗詹姆斯篮球明星带js(5页)
  5. 苏州博物馆计算机系统操作工,行车及铁钢包调度系统在炼钢厂应用.doc
  6. android camera 对焦大小,Android camera2对焦设置
  7. 如何搭建vue项目,完整搭建vue项目
  8. 淘宝maven镜像库是个好东西
  9. Java开源工具库使用之java源代码生成库JavaPoet
  10. 关于前端 后端 数据库 时间的设置与传递