无提供者

消费端初次注册当没有提供者时或者提供者都取消注册时,监听器会通知消费端节点变更,即各个节点下无子节点时会返回一个空集合,然后组装一个空协议的URL返回


private List<URL> toUrlsWithEmpty(URL consumer, String path, List<String> providers) {List<URL> urls = toUrlsWithoutEmpty(consumer, providers);if (urls == null || urls.isEmpty()) {int i = path.lastIndexOf(Constants.PATH_SEPARATOR);String category = i < 0 ? path : path.substring(i + 1);URL empty = URLBuilder.from(consumer).setProtocol(Constants.EMPTY_PROTOCOL).addParameter(Constants.CATEGORY_KEY, category).build();urls.add(empty);}return urls;
}

准备通知监听器

protected void notify(URL url, NotifyListener listener, List<URL> urls) {if (url == null) {throw new IllegalArgumentException("notify url == null");}if (listener == null) {throw new IllegalArgumentException("notify listener == null");}try {doNotify(url, listener, urls);} catch (Exception t) {// Record a failed registration request to a failed list, retry regularlyaddFailedNotified(url, listener, urls);logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);}
}protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {super.notify(url, listener, urls);
}

把该URL添加到通知集合中,并把最新的通知信息写入到文件中。

protected void notify(URL url, NotifyListener listener, List<URL> urls) {if (url == null) {throw new IllegalArgumentException("notify url == null");}if (listener == null) {throw new IllegalArgumentException("notify listener == null");}if ((CollectionUtils.isEmpty(urls))&& !Constants.ANY_VALUE.equals(url.getServiceInterface())) {logger.warn("Ignore empty notify urls for subscribe url " + url);return;}if (logger.isInfoEnabled()) {logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);}// keep every provider's category.Map<String, List<URL>> result = new HashMap<>();for (URL u : urls) {if (UrlUtils.isMatch(url, u)) {String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());categoryList.add(u);}}if (result.size() == 0) {return;}Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());for (Map.Entry<String, List<URL>> entry : result.entrySet()) {String category = entry.getKey();List<URL> categoryList = entry.getValue();categoryNotified.put(category, categoryList);listener.notify(categoryList);// We will update our cache file after each notification.// When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.saveProperties(url);}
}

通知注册明细类监听器,对各个节点进行校验并分类

public synchronized void notify(List<URL> urls) {Map<String, List<URL>> categoryUrls = urls.stream().filter(Objects::nonNull).filter(this::isValidCategory).filter(this::isNotCompatibleFor26x).collect(Collectors.groupingBy(url -> {if (UrlUtils.isConfigurator(url)) {return CONFIGURATORS_CATEGORY;} else if (UrlUtils.isRoute(url)) {return ROUTERS_CATEGORY;} else if (UrlUtils.isProvider(url)) {return PROVIDERS_CATEGORY;}return "";}));List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());toRouters(routerURLs).ifPresent(this::addRouters);// providersList<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());refreshOverrideAndInvoker(providerURLs);
}

重置覆盖明细URL后刷新invoker代理,设置禁止标志,清除所有的提供端代理信息

private void refreshOverrideAndInvoker(List<URL> urls) {// mock zookeeper://xxx?mock=return nulloverrideDirectoryUrl();refreshInvoker(urls);
}if (invokerUrls.size() == 1&& invokerUrls.get(0) != null&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {this.forbidden = true; // Forbid to accessthis.invokers = Collections.emptyList();routerChain.setInvokers(this.invokers);destroyAllInvokers(); // Close all invokers
}

有提供者

返回带有具体子节点的集合,清除掉集合中不满足匹配的URL


private List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) {List<URL> urls = new ArrayList<>();if (CollectionUtils.isNotEmpty(providers)) {for (String provider : providers) {provider = URL.decode(provider);if (provider.contains(Constants.PROTOCOL_SEPARATOR)) {URL url = URL.valueOf(provider);if (UrlUtils.isMatch(consumer, url)) {urls.add(url);}}}}return urls;
}

匹配规则如下

public static boolean isMatch(URL consumerUrl, URL providerUrl) {String consumerInterface = consumerUrl.getServiceInterface();String providerInterface = providerUrl.getServiceInterface();//FIXME accept providerUrl with '*' as interface name, after carefully thought about all possible scenarios I think it's ok to add this condition.if (!(Constants.ANY_VALUE.equals(consumerInterface)|| Constants.ANY_VALUE.equals(providerInterface)|| StringUtils.isEquals(consumerInterface, providerInterface))) {return false;}if (!isMatchCategory(providerUrl.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY),consumerUrl.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY))) {return false;}if (!providerUrl.getParameter(Constants.ENABLED_KEY, true)&& !Constants.ANY_VALUE.equals(consumerUrl.getParameter(Constants.ENABLED_KEY))) {return false;}String consumerGroup = consumerUrl.getParameter(Constants.GROUP_KEY);String consumerVersion = consumerUrl.getParameter(Constants.VERSION_KEY);String consumerClassifier = consumerUrl.getParameter(Constants.CLASSIFIER_KEY, Constants.ANY_VALUE);String providerGroup = providerUrl.getParameter(Constants.GROUP_KEY);String providerVersion = providerUrl.getParameter(Constants.VERSION_KEY);String providerClassifier = providerUrl.getParameter(Constants.CLASSIFIER_KEY, Constants.ANY_VALUE);return (Constants.ANY_VALUE.equals(consumerGroup) || StringUtils.isEquals(consumerGroup, providerGroup) || StringUtils.isContains(consumerGroup, providerGroup))&& (Constants.ANY_VALUE.equals(consumerVersion) || StringUtils.isEquals(consumerVersion, providerVersion))&& (consumerClassifier == null || Constants.ANY_VALUE.equals(consumerClassifier) || StringUtils.isEquals(consumerClassifier, providerClassifier));
}

获取满足规则的URL,判断是否为空,不为空的话就代表还有满足条件的提供端,设置允许访问标志

this.forbidden = false; // Allow to access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls == Collections.<URL>emptyList()) {invokerUrls = new ArrayList<>();
}
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {invokerUrls.addAll(this.cachedInvokerUrls);
} else {this.cachedInvokerUrls = new HashSet<>();this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
}
if (invokerUrls.isEmpty()) {return;
}
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map/*** If the calculation is wrong, it is not processed.** 1. The protocol configured by the client is inconsistent with the protocol of the server.*    eg: consumer protocol = dubbo, provider only has other protocol services(rest).* 2. The registration center is not robust and pushes illegal specification data.**/
if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));return;
}List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
// pre-route and build cache, notice that route cache should build on original Invoker list.
// toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.
routerChain.setInvokers(newInvokers);
this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
this.urlInvokerMap = newUrlInvokerMap;try {destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {logger.warn("destroyUnusedInvokers error. ", e);
}

转化URL为invoker代理,验证协议是否满足,排除空协议,查看扩展是否支持该协议,合并提供端URL和本地消费端URL的相关参数,主要以消费端的参数优先,然后引用该URL返回invoker代理,给路由链设置代理集合属性,保存URL代理对应关系

private Map<String, Invoker<T>> toInvokers(List<URL> urls) {Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();if (urls == null || urls.isEmpty()) {return newUrlInvokerMap;}Set<String> keys = new HashSet<>();String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);for (URL providerUrl : urls) {// If protocol is configured at the reference side, only the matching protocol is selectedif (queryProtocols != null && queryProtocols.length() > 0) {boolean accept = false;String[] acceptProtocols = queryProtocols.split(",");for (String acceptProtocol : acceptProtocols) {if (providerUrl.getProtocol().equals(acceptProtocol)) {accept = true;break;}}if (!accept) {continue;}}if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {continue;}if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() +" in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +" to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));continue;}URL url = mergeUrl(providerUrl);String key = url.toFullString(); // The parameter urls are sortedif (keys.contains(key)) { // Repeated urlcontinue;}keys.add(key);// Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer againMap<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local referenceInvoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);if (invoker == null) { // Not in the cache, refer againtry {boolean enabled = true;if (url.hasParameter(Constants.DISABLED_KEY)) {enabled = !url.getParameter(Constants.DISABLED_KEY, false);} else {enabled = url.getParameter(Constants.ENABLED_KEY, true);}if (enabled) {invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);}} catch (Throwable t) {logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);}if (invoker != null) { // Put new invoker in cachenewUrlInvokerMap.put(key, invoker);}} else {newUrlInvokerMap.put(key, invoker);}}keys.clear();return newUrlInvokerMap;
}

清除失效的invoker代理信息

private void destroyUnusedInvokers(Map<String, Invoker<T>> oldUrlInvokerMap, Map<String, Invoker<T>> newUrlInvokerMap) {if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {destroyAllInvokers();return;}// check deleted invokerList<String> deleted = null;if (oldUrlInvokerMap != null) {Collection<Invoker<T>> newInvokers = newUrlInvokerMap.values();for (Map.Entry<String, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) {if (!newInvokers.contains(entry.getValue())) {if (deleted == null) {deleted = new ArrayList<>();}deleted.add(entry.getKey());}}}if (deleted != null) {for (String url : deleted) {if (url != null) {Invoker<T> invoker = oldUrlInvokerMap.remove(url);if (invoker != null) {try {invoker.destroy();if (logger.isDebugEnabled()) {logger.debug("destroy invoker[" + invoker.getUrl() + "] success. ");}} catch (Exception e) {logger.warn("destroy invoker[" + invoker.getUrl() + "] failed. " + e.getMessage(), e);}}}}}
}

Dubbo源码分析之构建远程Invoker相关推荐

  1. Dubbo 源码分析 - 服务导出

    1.服务导出过程 本篇文章,我们来研究一下 Dubbo 导出服务的过程.Dubbo 服务导出过程始于 Spring 容器发布刷新事件,Dubbo 在接收到事件后,会立即执行服务导出逻辑.整个逻辑大致可 ...

  2. 志宇-dubbo源码分析

    dubbo源码分析 文档 dubbo加载配置文件 dubboSPI dubbo服务提供 1.校验配置信息 2.创建URL 3.本地注册 4.远程注册 4.1 开启netty服务端 4.2 连接注册中心 ...

  3. Dubbo 源码分析 - 集群容错之 Cluster

    1.简介 为了避免单点故障,现在的应用至少会部署在两台服务器上.对于一些负载比较高的服务,会部署更多台服务器.这样,同一环境下的服务提供者数量会大于1.对于服务消费者来说,同一环境下出现了多个服务提供 ...

  4. dubbo源码分析7 之 服务本地暴露

    在上一篇文章我们分析了一下 dubbo 在服务暴露发生了哪些事,今天我们就来分析一下整个服务暴露中的本地暴露.(PS:其实我感觉本地暴露蛮鸡肋的).本地暴露需要服务提供方与服务消费方在同一个 JVM. ...

  5. Dubbo 源码分析 - 集群容错之 LoadBalance

    1.简介 LoadBalance 中文意思为负载均衡,它的职责是将网络请求,或者其他形式的负载"均摊"到不同的机器上.避免集群中部分服务器压力过大,而另一些服务器比较空闲的情况.通 ...

  6. Dubbo 源码分析 - 集群容错之 Router

    1. 简介 上一篇文章分析了集群容错的第一部分 – 服务目录 Directory.服务目录在刷新 Invoker 列表的过程中,会通过 Router 进行服务路由.上一篇文章关于服务路由相关逻辑没有细 ...

  7. dubbo源码分析系列(1)扩展机制的实现

    1 系列目录 dubbo源码分析系列(1)扩展机制的实现 dubbo源码分析系列(2)服务的发布 dubbo源码分析系列(3)服务的引用 dubbo源码分析系列(4)dubbo通信设计 2 SPI扩展 ...

  8. apache dubbo 源码分析系列汇总

    Dubbo(读音[ˈdʌbəʊ])是阿里巴巴公司开源的一个高性能优秀的服务框架,使得应用可通过高性能的 RPC 实现服务的输出和输入功能,可以和 Spring框架无缝集成.后面捐献给了知名的开源社区 ...

  9. Dubbo源码分析:小白入门篇

    关注公众号"java后端技术全栈" 回复"000"获取优质面试资料 大家好,我是老田 答应了小伙伴的Dubbo源码分析系列,今天终于来了,希望不是很晚. 主要也 ...

最新文章

  1. 厉害!从电影花瓶到 Wi-Fi 之母,这才是乘风破浪的姐姐!
  2. 题目 1083: Hello, world!
  3. java mysql乱码_41、java与mysql乱码的问题
  4. LeetCode 40. 组合总和 II(排列组合 回溯)
  5. 每日一题(C语言基础篇)1
  6. 安卓案例:注册用户免启动时的广告页面
  7. 快手短视频领域为例的领域数据建设探索
  8. DBA_实践指南系列7_Oracle Erp R12监控OAM(案例)
  9. 【BAT面试题系列】Java面试必考题JVM详解,BAT师兄深度解析背后原理
  10. jvm参数-verbose:gc和-XX:+PrintGC有区别?
  11. java fri星期转_Java日期时间以及日期相互转换
  12. vue 中使用海康威视视频插件
  13. 论文查重率这么高,是由什么原因造成的?
  14. 天翼网关-无线光纤猫的设置和有WIFI信号无网络问题排查与解决
  15. 股市最好用的大数据软件_最实用的5款炒股软件
  16. “斗”转星移,高精度(NTP网络授时)北斗授时系统
  17. 规划--如何成为一名web安全工程师
  18. MATLAB获取Excel指定行列数据及删除指定行数据
  19. python将多张图片合并成一张
  20. Field [price] of type [text] is not supported for aggregation [avg]

热门文章

  1. MySQL 时间戳操作
  2. .net简单web开发
  3. 2023年十大流媒体发展趋势展望
  4. mysql temporary table
  5. MYSQL ‘S APPLY
  6. backtrack 5 虚拟机 安装
  7. Linux常用命令——mailq命令
  8. jsf取js变量_JSF页面中的JS取得受管bean的数据(受管bean发送数据到页面)
  9. web开发框架_Web开发的最佳PHP框架
  10. ThinkPhp5.2加减法验证码