各节点关系:

这里的Invoker是Provider的一个可调用Service的抽象,Invoker封装了Provider地址及Service接口信息;

Directory代表多个Invoker,可以把它看成List,但与List不同的是,它的值可能是动态变化的,比如注册中心推送变更;
Cluster将Directory中的多个Invoker伪装成一个 Invoker,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个;
Router负责从多个Invoker中按路由规则选出子集,比如读写分离,应用隔离等;
LoadBalance负责从多个Invoker中选出具体的一个用于本次调用,选的过程包含了负载均衡算法,调用失败后,需要重选;

Cluster经过目录,路由,负载均衡获取到一个可用的Invoker,交给上层调用,接口如下:

@SPI(FailoverCluster.NAME)
public interface Cluster {/*** Merge the directory invokers to a virtual invoker.** @param <T>* @param directory* @return cluster invoker* @throws RpcException*/@Adaptive<T> Invoker<T> join(Directory<T> directory) throws RpcException;}

Cluster是一个集群容错接口,经过路由,负载均衡之后获取的Invoker,由容错机制来处理,dubbo提供了多种容错机制包括:

Failover Cluster:失败自动切换,当出现失败,重试其它服务器 [1]。通常用于读操作,但重试会带来更长延迟。可通过 retries=”2″ 来设置重试次数(不含第一次)。

Failfast Cluster:快速失败,只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录。

Failsafe Cluster:失败安全,出现异常时,直接忽略。通常用于写入审计日志等操作。

Failback Cluster:失败自动恢复,后台记录失败请求,定时重发。通常用于消息通知操作。

Forking Cluster:并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过 forks=”2″ 来设置最大并行数。

Broadcast Cluster:广播调用所有提供者,逐个调用,任意一台报错则报错 [2]。通常用于通知所有提供者更新缓存或日志等本地资源信息。

默认使用了FailoverCluster,失败的时候会默认重试其他服务器,默认为两次;当然也可以扩展其他的容错机制;看一下默认的FailoverCluster容错机制,具体源码在FailoverClusterInvoker中:

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {List<Invoker<T>> copyinvokers = invokers;checkInvokers(copyinvokers, invocation);int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;if (len <= 0) {len = 1;}// retry loop.RpcException le = null; // last exception.List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.Set<String> providers = new HashSet<String>(len);for (int i = 0; i < len; i++) {//Reselect before retry to avoid a change of candidate `invokers`.//NOTE: if `invokers` changed, then `invoked` also lose accuracy.if (i > 0) {checkWhetherDestroyed();copyinvokers = list(invocation);// check againcheckInvokers(copyinvokers, invocation);}Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);invoked.add(invoker);RpcContext.getContext().setInvokers((List) invoked);try {Result result = invoker.invoke(invocation);if (le != null && logger.isWarnEnabled()) {logger.warn("Although retry the method " + invocation.getMethodName()+ " in the service " + getInterface().getName()+ " was successful by the provider " + invoker.getUrl().getAddress()+ ", but there have been failed providers " + providers+ " (" + providers.size() + "/" + copyinvokers.size()+ ") from the registry " + directory.getUrl().getAddress()+ " on the consumer " + NetUtils.getLocalHost()+ " using the dubbo version " + Version.getVersion() + ". Last error is: "+ le.getMessage(), le);}return result;} catch (RpcException e) {if (e.isBiz()) { // biz exception.throw e;}le = e;} catch (Throwable e) {le = new RpcException(e.getMessage(), e);} finally {providers.add(invoker.getUrl().getAddress());}}throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "+ invocation.getMethodName() + " in the service " + getInterface().getName()+ ". Tried " + len + " times of the providers " + providers+ " (" + providers.size() + "/" + copyinvokers.size()+ ") from the registry " + directory.getUrl().getAddress()+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "+ Version.getVersion() + ". Last error is: "+ (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);}

invocation是客户端传给服务器的相关参数包括(方法名称,方法参数,参数值,附件信息),invokers是经过路由之后的服务器列表,loadbalance是指定的负载均衡策略;首先检查invokers是否为空,为空直接抛异常,然后获取重试的次数默认为2次,接下来就是循环调用指定次数,如果不是第一次调用(表示第一次调用失败),会重新加载服务器列表,然后通过负载均衡策略获取唯一的Invoker,最后就是通过Invoker把invocation发送给服务器,返回结果Result;

具体的doInvoke方法是在抽象类AbstractClusterInvoker中被调用的:

public Result invoke(final Invocation invocation) throws RpcException {checkWhetherDestroyed();LoadBalance loadbalance = null;List<Invoker<T>> invokers = list(invocation);if (invokers != null && !invokers.isEmpty()) {loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl().getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));}RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);return doInvoke(invocation, invokers, loadbalance);}protected List<Invoker<T>> list(Invocation invocation) throws RpcException {List<Invoker<T>> invokers = directory.list(invocation);return invokers;}

首先通过Directory获取Invoker列表,同时在Directory中也会做路由处理,然后获取负载均衡策略,最后调用具体的容错策略;下面具体看一下Directory;

Directory接口

接口定义如下:

public interface Directory<T> extends Node {/*** get service type.** @return service type.*/Class<T> getInterface();/*** list invokers.** @return invokers*/List<Invoker<T>> list(Invocation invocation) throws RpcException;}

目录服务作用就是获取指定接口的服务列表,具体实现有两个:StaticDirectory和RegistryDirectory,同时都继承于AbstractDirectory;从名字可以大致知道StaticDirectory是一个固定的目录服务,表示里面的Invoker列表不会动态改变;RegistryDirectory是一个动态的目录服务,通过注册中心动态更新服务列表;list实现在抽象类中:

public List<Invoker<T>> list(Invocation invocation) throws RpcException {if (destroyed) {throw new RpcException("Directory already destroyed .url: " + getUrl());}List<Invoker<T>> invokers = doList(invocation);List<Router> localRouters = this.routers; // local referenceif (localRouters != null && !localRouters.isEmpty()) {for (Router router : localRouters) {try {if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {invokers = router.route(invokers, getConsumerUrl(), invocation);}} catch (Throwable t) {logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);}}}return invokers;}

首先检查目录是否被销毁,然后调用doList,具体在实现类中定义,最后调用路由功能,下面重点看一下StaticDirectory和RegistryDirectory中的doList方法

1.RegistryDirectory

是一个动态的目录服务,所有可以看到RegistryDirectory同时也继承了NotifyListener接口,是一个通知接口,注册中心有服务列表更新的时候,同时通知RegistryDirectory,通知逻辑如下:

public synchronized void notify(List<URL> urls) {List<URL> invokerUrls = new ArrayList<URL>();List<URL> routerUrls = new ArrayList<URL>();List<URL> configuratorUrls = new ArrayList<URL>();for (URL url : urls) {String protocol = url.getProtocol();String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);if (Constants.ROUTERS_CATEGORY.equals(category)|| Constants.ROUTE_PROTOCOL.equals(protocol)) {routerUrls.add(url);} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {configuratorUrls.add(url);} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {invokerUrls.add(url);} else {logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());}}// configuratorsif (configuratorUrls != null && !configuratorUrls.isEmpty()) {this.configurators = toConfigurators(configuratorUrls);}// routersif (routerUrls != null && !routerUrls.isEmpty()) {List<Router> routers = toRouters(routerUrls);if (routers != null) { // null - do nothingsetRouters(routers);}}List<Configurator> localConfigurators = this.configurators; // local reference// merge override parametersthis.overrideDirectoryUrl = directoryUrl;if (localConfigurators != null && !localConfigurators.isEmpty()) {for (Configurator configurator : localConfigurators) {this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);}}// providersrefreshInvoker(invokerUrls);}

此通知接口会接受三种类别的url包括:router(路由),configurator(配置),provider(服务提供方);

路由规则:决定一次dubbo服务调用的目标服务器,分为条件路由规则和脚本路由规则,并且支持可扩展,向注册中心写入路由规则的操作通常由监控中心或治理中心的页面完成;

配置规则:向注册中心写入动态配置覆盖规则 [1]。该功能通常由监控中心或治理中心的页面完成;

provider:动态提供的服务列表

路由规则和配置规则其实就是对provider服务列表更新和过滤处理,refreshInvoker方法就是根据三种url类别刷新本地的invoker列表,下面看一下RegistryDirectory实现的doList接口:

public List<Invoker<T>> doList(Invocation invocation) {if (forbidden) {// 1. No service provider 2. Service providers are disabledthrow new RpcException(RpcException.FORBIDDEN_EXCEPTION,"No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +  NetUtils.getLocalHost()+ " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist).");}List<Invoker<T>> invokers = null;Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local referenceif (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {String methodName = RpcUtils.getMethodName(invocation);Object[] args = RpcUtils.getArguments(invocation);if (args != null && args.length > 0 && args[0] != null&& (args[0] instanceof String || args[0].getClass().isEnum())) {invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter}if (invokers == null) {invokers = localMethodInvokerMap.get(methodName);}if (invokers == null) {invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);}if (invokers == null) {Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();if (iterator.hasNext()) {invokers = iterator.next();}}}return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;}

refreshInvoker处理之后,服务列表已methodInvokerMap存在,一个方法对应服务列表Map>>;
通过Invocation中指定的方法获取对应的服务列表,如果具体的方法没有对应的服务列表,则获取”*”对应的服务列表;处理完之后就在父类中进行路由处理,路由规则同样是通过通知接口获取的,路由规则在下章介绍;

2.StaticDirectory

这是一个静态的目录服务,里面的服务列表在初始化的时候就已经存在,并且不会改变;StaticDirectory用得比较少,主要用在服务对多注册中心的引用;

protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {return invokers;}

因为是静态的,所有doList方法也很简单,直接返回内存中的服务列表即可;

Router接口

路由规则决定一次dubbo服务调用的目标服务器,分为条件路由规则和脚本路由规则,并且支持可扩展,接口如下:

public interface Router extends Comparable<Router> {/*** get the router url.** @return url*/URL getUrl();/*** route.** @param invokers* @param url        refer url* @param invocation* @return routed invokers* @throws RpcException*/<T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;}

接口中提供的route方法通过一定的规则过滤出invokers的一个子集;提供了三个实现类:ScriptRouter,ConditionRouter和MockInvokersSelector

ScriptRouter:脚本路由规则支持 JDK 脚本引擎的所有脚本,比如:javascript, jruby, groovy 等,通过type=javascript参数设置脚本类型,缺省为javascript;

ConditionRouter:基于条件表达式的路由规则,如:host = 10.20.153.10 => host = 10.20.153.11;=> 之前的为消费者匹配条件,所有参数和消费者的 URL 进行对比,=> 之后为提供者地址列表的过滤条件,所有参数和提供者的 URL 进行对比;

MockInvokersSelector:是否被配置为使用mock,此路由器保证只有具有协议MOCK的调用者出现在最终的调用者列表中,所有其他调用者将被排除;

下面重点看一下ScriptRouter源码

public ScriptRouter(URL url) {this.url = url;String type = url.getParameter(Constants.TYPE_KEY);this.priority = url.getParameter(Constants.PRIORITY_KEY, 0);String rule = url.getParameterAndDecoded(Constants.RULE_KEY);if (type == null || type.length() == 0) {type = Constants.DEFAULT_SCRIPT_TYPE_KEY;}if (rule == null || rule.length() == 0) {throw new IllegalStateException(new IllegalStateException("route rule can not be empty. rule:" + rule));}ScriptEngine engine = engines.get(type);if (engine == null) {engine = new ScriptEngineManager().getEngineByName(type);if (engine == null) {throw new IllegalStateException(new IllegalStateException("Unsupported route rule type: " + type + ", rule: " + rule));}engines.put(type, engine);}this.engine = engine;this.rule = rule;}

构造器分别初始化脚本引擎(engine)和脚本代码(rule),默认的脚本引擎是javascript;看一个具体的url:在此我向大家推荐一个架构学习交流裙。交流学习裙号:821169538,里面会分享一些资深架构师录制的视频录像

"script://0.0.0.0/com.foo.BarService?category=routers&dynamic=false&rule=" + URL.encode("(function route(invokers) { ... } (invokers))")

script协议表示一个脚本协议,rule后面是一段javascript脚本,传入的参数是invokers;

(function route(invokers) {var result = new java.util.ArrayList(invokers.size());for (i = 0; i < invokers.size(); i ++) {if ("10.20.153.10".equals(invokers.get(i).getUrl().getHost())) {result.add(invokers.get(i));}}return result;
} (invokers)); // 表示立即执行方法

如上这段脚本过滤出host为10.20.153.10,具体是如何执行这段脚本的,在route方法中:

public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {try {List<Invoker<T>> invokersCopy = new ArrayList<Invoker<T>>(invokers);Compilable compilable = (Compilable) engine;Bindings bindings = engine.createBindings();bindings.put("invokers", invokersCopy);bindings.put("invocation", invocation);bindings.put("context", RpcContext.getContext());CompiledScript function = compilable.compile(rule);Object obj = function.eval(bindings);if (obj instanceof Invoker[]) {invokersCopy = Arrays.asList((Invoker<T>[]) obj);} else if (obj instanceof Object[]) {invokersCopy = new ArrayList<Invoker<T>>();for (Object inv : (Object[]) obj) {invokersCopy.add((Invoker<T>) inv);}} else {invokersCopy = (List<Invoker<T>>) obj;}return invokersCopy;} catch (ScriptException e) {//fail then ignore rule .invokers.logger.error("route error , rule has been ignored. rule: " + rule + ", method:" + invocation.getMethodName() + ", url: " + RpcContext.getContext().getUrl(), e);return invokers;}}

首先通过脚本引擎编译脚本,然后执行脚本,同时传入Bindings参数,这样在脚本中就可以获取invokers,然后进行过滤;最后来看一下负载均衡策略

LoadBalance接口

在集群负载均衡时,Dubbo提供了多种均衡策略,缺省为random随机调用,可以自行扩展负载均衡策略;接口类如下:

@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {/*** select one invoker in list.** @param invokers   invokers.* @param url        refer url* @param invocation invocation.* @return selected invoker.*/@Adaptive("loadbalance")<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;}

SPI定义了默认的策略为RandomLoadBalance,提供了一个select方法,通过策略从服务列表中选择一个invoker;dubbo默认提供了多种策略:

Random LoadBalance:随机,按权重设置随机概率,在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重;

RoundRobin LoadBalance:轮询,按公约后的权重设置轮询比率;存在慢的提供者累积请求的问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,

久而久之,所有请求都卡在调到第二台上;

LeastActive LoadBalance:最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差;使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大;

ConsistentHash LoadBalance:一致性 Hash,相同参数的请求总是发到同一提供者;当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动;

下面重点看一下默认的RandomLoadBalance源码

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {int length = invokers.size(); // Number of invokersint totalWeight = 0; // The sum of weightsboolean sameWeight = true; // Every invoker has the same weight?for (int i = 0; i < length; i++) {int weight = getWeight(invokers.get(i), invocation);totalWeight += weight; // Sumif (sameWeight && i > 0&& weight != getWeight(invokers.get(i - 1), invocation)) {sameWeight = false;}}if (totalWeight > 0 && !sameWeight) {// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.int offset = random.nextInt(totalWeight);// Return a invoker based on the random value.for (int i = 0; i < length; i++) {offset -= getWeight(invokers.get(i), invocation);if (offset < 0) {return invokers.get(i);}}}// If all invokers have the same weight value or totalWeight=0, return evenly.return invokers.get(random.nextInt(length));}

首先计算总权重,同时检查是否每一个服务都有相同的权重;如果总权重大于0并且服务的权重都不相同,则通过权重来随机选择,否则直接通过Random函数来随机;

有帮到你的点赞、收藏关注一下吧

需要更多教程,微信扫码即可

深入分析之Cluster层相关推荐

  1. 深入分析redis cluster 集群安装配置详解

    Redis 集群是一个提供在多个Redis间节点间共享数据的程序集.redis3.0以前,只支持主从同步的,如果主的挂了,写入就成问题了.3.0出来后就可以很好帮我们解决这个问题. 目前redis 3 ...

  2. 聊聊并发(一)——深入分析Volatile的实现原理

    引言 在多线程并发编程中synchronized和Volatile都扮演着重要的角色,Volatile是轻量级的synchronized,它在多处理器开发中保证了共享变量的"可见性" ...

  3. 北大华为鹏城联合首次提出视觉 Transformer 后量化算法!

    点击上方"视学算法",选择加"星标"或"置顶" 重磅干货,第一时间送达 AI 科技评论报道 编辑 | 陈大鑫 AI 科技评论今天为大家介绍一 ...

  4. Dubbo开源现状与未来规划

    摘要: Dubbo 在过去一段时间疏于维护,去年阿里高调宣布重启 Dubbo 开源之后,社区里问的最多的问题是,这次开源与上次有什么一样,还有就是 Dubbo 和 Spring Boot.Spring ...

  5. vivo 互联网业务就近路由技术实战

    一.问题背景 在vivo互联网业务高速发展的同时,支撑的服务实例规模也越来越大,然而单个机房能承载的机器容量是有限的,于是同城多机房甚至多地域部署就成为了业务在实际部署过程中不得不面临的场景. 一般情 ...

  6. Java并发机制的底层实现原理

    Java代码在编译后会变成Java字节码,字节码被类加载器加载到JVM里,JVM执行字节码,最终需要转化为汇编指令在CPU上执行,Java中所使用的并发机制依赖于JVM的实现和CPU的指令.本章我们将 ...

  7. dubbo 内存溢出怎么处理_关于 Dubbo 的重要入门知识点总结

    目录: 一 重要的概念 1.1 什么是 Dubbo? 1.2 什么是 RPC?RPC原理是什么? 1.3 为什么要用 Dubbo? 1.4 什么是分布式? 1.5 为什么要分布式? 二 Dubbo 的 ...

  8. 北京沙龙报名 | 关于Dubbo开源的那些事儿

    自从去年7月份阿里宣布重启Dubbo维护以后,开源社区热度最高的两个问题:一是这次开源和以前有什么不一样的地方?二是阿里的Dubbo和Spring Boot. Spring Cloud 是什么关系? ...

  9. oracle 10g real application clusters introduction (RAC原理)

     1.什么是cluster     一个cluster是由两个或是多个独立的.通过网络连接的servers组成的.几个硬件供应商多年以来提供了Cluster性能的各种需求.一些Clusters仅仅为了 ...

最新文章

  1. 邓文迪撑杆跳,甩开老公要独立?_富杂志_新浪博客
  2. Macbook pro 2016/2017 接入扩展坞时断 WIFI 问题的解决办法
  3. markdown 笔记
  4. bat基础学习--bat批量执行其他bat(startcall),bat执行后暂停(调试)关闭,批量执行关闭exe----基础
  5. 大话数据结构:最短路径算法
  6. 制作点击文字变颜色_腾讯的动态时间轴PPT火了!制作简单又有逼格,都学起来啊...
  7. 微信亿级用户异常检测框架的设计与实践
  8. 亲测有效!实现Chrome浏览器下载速度提升3倍!
  9. 如何使用XGBoost开发随机森林集成
  10. php输出100以内素数(质数)
  11. 关于编程等宽字体 Cascadia Code
  12. 创建加密访问网站,端口443
  13. SIEBEL应用概述
  14. phalcon mysql_phalcon mysql_phalcon数据库操作
  15. Swift不深入只浅出入门教程-孟祥月-专题视频课程
  16. js实现彩票机选效果
  17. rails--图片的使用
  18. 通过百度API实现图片车牌号识别
  19. 富斯i6航模遥控器配apm(pix)飞控mission planner疑难杂症解决策略(上)
  20. msbuild 语法_MSBuild 编译asp.net 命令写法

热门文章

  1. 苹果大翻身!双11卖疯了,10分钟成交量超去年全天7倍
  2. 直降500+24期免息!天猫双11全线iPhone打折
  3. 过气旗舰不如?刘作虎确认一加新机:比一加7 Pro更超值
  4. 三星Galaxy Note10+最后的爆料:配备更大的S-Pen手写笔
  5. 2019年中国十大富豪排名出炉!马化腾力压马云位列榜首
  6. opencv qt5安装linux,Ubuntu OpenCV安装和设置(Qt5吗?)
  7. 4 相机切换_景明促销IIVlog小新机 索尼Vlog相机ZV1
  8. python大游戏_玩游戏就能学Python?太炫酷了!
  9. 触发器_PLCDCS组态中SR触发器介绍
  10. 【Spark】Spark ML 机器学习的一个案例