在消费者通过refer()方法获得Invoker的时候同时完成对于相关方法的订阅。以默认的dubbo作为注册中心为例子。

当消费者调用refer()方法的时候由于当中的Url中的protocol为registry,所调用的refer()方法实则为registryProtocol的refer()方法。

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);Registry registry = registryFactory.getRegistry(url);if (RegistryService.class.equals(type)) {return proxyFactory.getInvoker((T) registry, type, url);}// group="a,b" or group="*"Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));String group = qs.get(Constants.GROUP_KEY);if (group != null && group.length() > 0) {if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1|| "*".equals(group)) {return doRefer(getMergeableCluster(), registry, type, url);}}return doRefer(cluster, registry, type, url);
}

首先,尝试获得registry服务,通过registryFactory的getRegistry()方法。这里采用默认的dubbo实现。但是getRegistry()方法实现在AbstractRegistryFactory中。

@Override
public Registry getRegistry(URL url) {url = url.setPath(RegistryService.class.getName()).addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()).removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);String key = url.toServiceString();// Lock the registry access process to ensure a single instance of the registryLOCK.lock();try {Registry registry = REGISTRIES.get(key);if (registry != null) {return registry;}registry = createRegistry(url);if (registry == null) {throw new IllegalStateException("Can not create registry " + url);}REGISTRIES.put(key, registry);return registry;} finally {// Release the lockLOCK.unlock();}
}

这里直接把url所要注册的服务改为RegistryService,作为注册中心也需要实现的接口。在这个方法里,会根据生成的注册中心服务url找寻注册中心服务的实例,如果已经生成过,直接返回,否则,通过createRegistry()方法生成新的注册中心。

这里默认的dubbo实现的registryFatctory为DubboRegistryFactory,createRegistryFactory()方法也实现在这里。

public Registry createRegistry(URL url) {url = getRegistryURL(url);List<URL> urls = new ArrayList<URL>();urls.add(url.removeParameter(Constants.BACKUP_KEY));String backup = url.getParameter(Constants.BACKUP_KEY);if (backup != null && backup.length() > 0) {String[] addresses = Constants.COMMA_SPLIT_PATTERN.split(backup);for (String address : addresses) {urls.add(url.setAddress(address));}}RegistryDirectory<RegistryService> directory = new RegistryDirectory<RegistryService>(RegistryService.class, url.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()).addParameterAndEncoded(Constants.REFER_KEY, url.toParameterString()));Invoker<RegistryService> registryInvoker = cluster.join(directory);RegistryService registryService = proxyFactory.getProxy(registryInvoker);DubboRegistry registry = new DubboRegistry(registryInvoker, registryService);directory.setRegistry(registry);directory.setProtocol(protocol);directory.notify(urls);directory.subscribe(new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, RegistryService.class.getName(), url.getParameters()));return registry;
}

这里构造了registryService的代理,在这里,注册中心被当做一个dubbo应用,与其连接实则为registryService服务的远程调用,不同的是,注册中心被作为生产者,采用的是地址直连的方式与当前消费者完成连接。这里subscribe()方法的调用实则调用的是DubboRegistry中的RegistryServce的方法,这个RegistryServce也是与一般方法一样经过代理最后由InvokerInvoctionHandler所调用的,其实就是一次普通的方法发远程调用。这里的url被加入了回调参数,尤其是subscribe()方法的第二个参数也就是listener的回调属性被设置为true,也就是说注册中心对于这个参数的调用会回调消费者。

在获得了registry之后,就可以回到registryProtocpol中,继续通过doRefer()进行所需要接口方法的订阅。

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);directory.setRegistry(registry);directory.setProtocol(protocol);// all attributes of REFER_KEYMap<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);if (!Constants.ANY_VALUE.equals(url.getServiceInterface())&& url.getParameter(Constants.REGISTER_KEY, true)) {registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,Constants.CHECK_KEY, String.valueOf(false)));}directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,Constants.PROVIDERS_CATEGORY+ "," + Constants.CONFIGURATORS_CATEGORY+ "," + Constants.ROUTERS_CATEGORY));Invoker invoker = cluster.join(directory);ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);return invoker;
}

这里的registry就是刚才生成的registryService,通过远程调用的方式以接口作为参数,来完成接口方法的订阅。在默认的dubbo协议下,异步等待注册中心的回复。而相应的invoker也通过join()方法得到,默认得到的是FailoverClusterInvoker。

注册中心需要在subscribe()方法中的末尾远程回调lintener的notify()方法,达到服务地址的回调,保证消费者所订阅的服务消息能够被消费者所得知,并调用。

Dubbo消费者服务的订阅相关推荐

  1. Dubbo笔记 ㉘ : 服务自省-消费者

    文章目录 一.前言 二.ReferenceAnnotationBeanPostProcessor#doGetInjectedBean 1. newProxyInstance 2. ReferenceB ...

  2. Dubbo—— 一个服务既是消费者又是提供者

    Dubbo-- 一个服务既是消费者又是提供者 实现目标 使用dubbo实现每个服务互相远程调用 ssm实现 依赖 在每个服务中添加: <dependency><groupId> ...

  3. 【SpringCloudAlibaba学习 04】整合Dubbo(实现数据消费者服务调用数据生产者服务)

    文章目录 引言 1.[数据生产者工程]创建service-api子工程 1.1.[数据生产者服务]创建user-service-api服务 目录结构 UserService服务接口 UserServi ...

  4. dubbo k8s 服务发现_工商银行基于 Dubbo 构建金融微服务架构的实践-服务发现篇

    作者 | 张远征来源|阿里巴巴云原生公众号 导读:Dubbo 作为分布式微服务框架,众多公司在实践中基于 Dubbo 进行分布式系统架构.重启开源后,我们不仅看到 Dubbo 3.0 最新的 Road ...

  5. Dubbo消费者代理的调用

    当消费者调用服务接口的方法时,实际调用的是接口代理类的InvokerInvocationHandler的invoke()方法. public Object invoke(Object proxy, M ...

  6. Dubbo之服务导入流程解析

    Dubbo之服务导入流程解析 接着上回<Dubbo之服务暴露流程浅析>,上一篇文章已经介绍完了Dubbo的服务提供者的服务暴露的整个流程,本文主要介绍Dubbo服务消费者的服务导入流程. ...

  7. Dubbo笔记 ⑤ : 服务发布流程 - Protocol#export

    文章目录 一.前言 二.RegistryProtocol#export 1. URL解析 1.1 获取注册中心URL 1.2 获取服务URL 1.3 获取订阅URL 2. 服务暴露 3. 服务注册 4 ...

  8. Dubbo笔记 ㉗ : 服务自省-提供者

    文章目录 一.前言 1. 概念 二.服务自省 1. 相关配置 3.1 dubbo.application.metadata-type 3.2 dubbo.application.register-co ...

  9. Spring Cloud Alibaba配置实例nacos+sentinel+dubbo实行服务注册、配置中心、熔断限流

    通过Spring Cloud Alibaba相关组件nacos+sentinel+dubbo实行服务注册.配置中心.熔断限流等功能 1.本机安装nacos和sentinel-dashboard服务端 ...

最新文章

  1. 深度学习浪潮下的自然语言处理,百度NeurIPS 2019展现领域新突破
  2. QT中文显示乱码解决
  3. 如何使方法行数达到最优、常量与变量如何优雅的定义?
  4. 2021年速卖通828年中大促活动报名攻略
  5. SQL Server 数据库优化文章
  6. 后台启动_Windows10中如何阻止程序在后台运行
  7. kubernetes+Azure DevOps实现.Net Core项目的自动化部署均衡负载
  8. linux read recv,read()/ recv()成功,但缓冲区未更改且为空
  9. 基于.Net + SqlServer的分库分表设计方案
  10. php上传完没进度条_php使用APC实现实时上传进度条功能
  11. hadoop之Combiner
  12. 简明python教程 答案1
  13. WiFi6模块 RW6852S-50 2T2R蓝牙模块 SDIO接口 WIFI6
  14. 细说SDRAM控制器
  15. et200sp系统服务器模块,西门子ET200
  16. 小葵花妈妈课堂开课了:《AsyncTask源码分析》
  17. 上海宝付高谈保险扣费如何追回
  18. WinDBG 技巧:列出模块(DLL/EXE)里面所有的符号(symbol)
  19. 【前端单元测试入门03】Sinon
  20. 用 PHP 来刷leetCode 之 电话号码的字母组合

热门文章

  1. SpringBoot集成Cache缓存(Redis缓存,RedisTemplate方式)
  2. maven+Spring boot 打包发布运行
  3. python设计小游戏谁先走到17_蜗牛绕墙爬(python小游戏源码)
  4. java作业不能运行_从Windows运行时,YARN作业失败
  5. mysql select命令加速_数据库教程
  6. 【转】file_get_contents(php://input)的使用方法
  7. 如何保护 .NET 应用的安全?
  8. global语句(python学习手册422页)
  9. 如何通过使用fiddler对安卓系统设备抓包总结
  10. XHTML的使用规范