上一篇:Pigeon的一次调用服务端发生了什么
前面介绍了Pigeon请求调用的过程,那么Pigeon的服务是如何被调用方感知的呢,我们知道通常RPC都会有一个注册中心,以保存服务测的信息(ip,port等等),这样客户端就可以通过注册中心去拿到服务的信息,来完成请求的调用。那一起来看下Pigeon服务的注册和发现机制是如何实现的。

服务的注册

首先,我们来看下服务的注册机制。
在Pigeon中主要使用了@Reference和@Service注解,用来标志服务调用方接口和服务提供方实现类,而对这两个注解进行拦截处理的类是AnnotationBean类,这里是通过实现了Spring的BeanPostProcessor接口来完成处理的(Pigeon相关的@Service类扫描和其BeanDefinition的封装实通过实现了BeanFactoryPostProcessor接口实现的),看下postProcessAfterInitialization方法实现:

public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {// 先判断要注入相关属性的Bean是否在指定的注解扫描包下Class<?> beanClass = AopUtils.getTargetClass(bean);if (beanClass == null || !isMatchPackage(beanClass.getName())) {return bean;}// 判断类定义中是否存在@Service注解Service service = beanClass.getAnnotation(Service.class);if (service != null) {// 如果未自定义接口,则用当前beanClassClass serviceInterface = service.interfaceClass();if (void.class.equals(service.interfaceClass())) {serviceInterface = ServiceConfigUtils.getServiceInterface(beanClass);}if (serviceInterface == null) {serviceInterface = beanClass;}// 初始化ProviderConfig和ServerConfig,完成服务提供者配置和服务器配置的初始化ProviderConfig<Object> providerConfig = new ProviderConfig<Object>(serviceInterface, bean);providerConfig.setService(bean);providerConfig.setUrl(service.url());providerConfig.setVersion(service.version());providerConfig.setSharedPool(service.useSharedPool());providerConfig.setActives(service.actives());ServerConfig serverConfig = new ServerConfig();serverConfig.setPort(getDefaultPort(service.port()));serverConfig.setSuffix(service.group());serverConfig.setAutoSelectPort(service.autoSelectPort());providerConfig.setServerConfig(serverConfig);// 注册服务提供者,启动服务器,发布服务,完成pigeon提供方调用初始化ServiceFactory.addService(providerConfig);}// 解析bean中方法和属性是否包含Reference,完成bean作为服务调用方的依赖注入。postProcessBeforeInitialization(bean, beanName);return bean;}

这里我们关系的是注册过程,即ServiceFactory.addService(),进去看下它的最终触发的实现

public void doAddService(ProviderConfig providerConfig) {try {// 检查服务名checkServiceName(providerConfig);// 发布指定版本服务,同时解析服务方法ServicePublisher.addService(providerConfig);// 启动netty RPC服务器,作为服务提供方供调用方调用服务ServerConfig serverConfig = ProviderBootStrap.startup(providerConfig);// 更新serverConfigproviderConfig.setServerConfig(serverConfig);// 实际发布服务,会将服务注册到注册中心,供调用方发现调用ServicePublisher.publishService(providerConfig, false);} catch (Throwable t) {throw new RpcException("error while adding service:" + providerConfig, t);}}

其中在添加解析服务中,主要进行了以下流程:
判断是否配置了版本, 如果配置了,生成带版本的urlWithVersion,更新key=urlWithVersion的服务,同时如果大于key=url的对应服务版本,会用新版本覆盖默认url版本
如果服务实现了InitializingService接口,调用实现的initialize方法
调用ServiceMethodFactory.init(url)方法,用来初始化调用ServiceMethodFactory的ServiceMethodCache:遍历ServicePublisher的所有服务提供类,建立ServiceMethodCache,存储该类下所有满足要求的方法和方法id的映射关系
首先会忽略掉Object和Class的所有方法
过滤方法后,判断是否需要压缩,根据url+"#"+方法名的方式进行hash。
代码如下

public static <T> void addService(ProviderConfig<T> providerConfig) throws Exception {if (logger.isInfoEnabled()) {logger.info("add service:" + providerConfig);}String version = providerConfig.getVersion();String url = providerConfig.getUrl();// 默认版本,直接以url为keyif (StringUtils.isBlank(version)) {serviceCache.put(url, providerConfig);} else {// urlWithVersion = url + "_" + versionString urlWithVersion = getServiceUrlWithVersion(url, version);if (serviceCache.containsKey(url)) {// 如果已经存在,覆盖服务serviceCache.put(urlWithVersion, providerConfig);ProviderConfig<?> providerConfigDefault = serviceCache.get(url);String defaultVersion = providerConfigDefault.getVersion();// 如果默认服务存在默认版本,并且小于当前版本,用当前版本服务更新默认服务版本if (!StringUtils.isBlank(defaultVersion)) {if (VersionUtils.compareVersion(defaultVersion, providerConfig.getVersion()) < 0) {serviceCache.put(url, providerConfig);}}} else {// 将当前版本设为指定版本服务和默认版本服务serviceCache.put(urlWithVersion, providerConfig);// use this service as the default providerserviceCache.put(url, providerConfig);}}// 如果服务实现了InitializingService接口,调用实现的initialize方法T service = providerConfig.getService();if (service instanceof InitializingService) {((InitializingService) service).initialize();}// 解析接口自定义方法,根据方法名,参数等相关信息记录方法ServiceMethodFactory.init(url);
}// ServiceMethodFactory.init(url);方法实现如下:
public static void init(String url) {getServiceMethodCache(url);
}
// 具体调用了
public static ServiceMethodCache getServiceMethodCache(String url) {// 是否存在指定url的ServiceMethodCacheServiceMethodCache serviceMethodCache = methods.get(url);if (serviceMethodCache == null) {// 获取指定url的providerConfigMap<String, ProviderConfig<?>> services = ServicePublisher.getAllServiceProviders();ProviderConfig<?> providerConfig = services.get(url);if (providerConfig != null) {Object service = providerConfig.getService();Method[] methodArray = service.getClass().getMethods();serviceMethodCache = new ServiceMethodCache(url, service);// 遍历指定url的所有服务方法for (Method method : methodArray) {// 忽略掉Object和Class的所有方法if (!ingoreMethods.contains(method.getName())) {method.setAccessible(true);serviceMethodCache.addMethod(method.getName(), new ServiceMethod(service, method));if (isCompact) {// 压缩url,方法名等调用所需信息int id = LangUtils.hash(url + "#" + method.getName(), 0, Integer.MAX_VALUE);ServiceId serviceId = new ServiceId(url, method.getName());ServiceId lastId = CompactRequest.PROVIDER_ID_MAP.putIfAbsent(id, serviceId);// 检查如果存在相同id服务方法,抛异常if (lastId != null && !serviceId.equals(lastId)) {throw new IllegalArgumentException("same id for service:" + url + ", method:"+ method.getName());}}}}// 更新缓存methods.put(url, serviceMethodCache);}}return serviceMethodCache;
}

服务的启动主要是指netty服务端接口监听开启,请求处理线程池初始化及启动(处理请求调用链的线程池),代码如下:

这里我们重点看下服务的发布方法ServicePublisher.publishService主要逻辑

如果服务需要注册,这里调用了publishServiceToRegistry方法去注册服务,通过在ZK上创建一个持久节点完成注册,然后通知服务变化。

服务的发现

我们来看下调用方又是如何获得服务节点信息的

服务节点信息获取是在调用方获取serviceProxy动态代理时完成的,客户端动态代理的实现我们前面一篇已经说过,这里只看发现的部分

这里实现了对客户端的注册和服务发现

这一步主要将调用方相关信息注册到注册中心中,大致实现如下
1.获取服务地址列表,这里主要从注册中心zk获取,具体获取地址如/DP/SERVER/com.dianping.pigeon.demo.EchoService,获取到的实际值是ip:port,如172.23.51.30:6354,如果有多个ip:port,会以逗号分割
遍历服务列表,从注册红心获取每个服务地址的权重,如通过/DP/WEIGHT/172.23.51.30:6354获取,获取到ip,port,weight后,会封装成一个HostInfo
2.通过RegistryEventListener发布providerAdded和serverInfoChanged事件,对于providerAdded事件:
会导致ClientManager.InnerServiceProviderChangeListener触发进行客户端注册,首先会初始化一个ConnectInfo,包含serviceName, host, port, weight等信息,然后会进行以下两步:
进一步会触发相关的ClusterListener调用addConnect,这里主要包含:
触发NettyClientFactory#createClient调用,进一步创建NioClientSocketChannelFactory,建立和服务端的netty连接,并建立心跳检测
触发WeightFactorMaintainer#providerAdded,进一步调用addWeight方法,更新weights和weightFactors成员。
在RegistryManager中添加服务引用地址,更新referencedServiceAddresses和referencedAddresses
触发WeightFactorMaintainer#providerAdded,进一步调用addWeight方法,更新weights和weightFactors成员。这里和前面一步有重复

public Set<HostInfo> registerClients(InvokerConfig invokerConfig) {String remoteAppkey = invokerConfig.getRemoteAppKey();String serviceName = invokerConfig.getUrl();String group = RegistryManager.getInstance().getGroup(serviceName);String vip = invokerConfig.getVip();logger.info("start to register clients for service '" + serviceName + "#" + group + "'");String localHost = null;if (vip != null && vip.startsWith("console:")) {localHost = configManager.getLocalIp() + vip.substring(vip.indexOf(":"));}// 从注册中心获取服务地址String serviceAddress = getServiceAddress(invokerConfig);String[] addressArray = serviceAddress.split(",");Set<HostInfo> addresses = Collections.newSetFromMap(new ConcurrentHashMap<HostInfo, Boolean>());for (int i = 0; i < addressArray.length; i++) {if (StringUtils.isNotBlank(addressArray[i])) {// addressList.add(addressArray[i]);String address = addressArray[i];int idx = address.lastIndexOf(":");if (idx != -1) {String host = null;int port = -1;try {host = address.substring(0, idx);port = Integer.parseInt(address.substring(idx + 1));} catch (RuntimeException e) {logger.warn("invalid address:" + address + " for service:" + serviceName);}if (host != null && port > 0) {if (localHost != null && !localHost.equals(host + ":" + port)) {continue;}try {// 从注册中心获取相应服务权重int weight = RegistryManager.getInstance().getServiceWeight(address, serviceName, false);addresses.add(new HostInfo(host, port, weight));} catch (Throwable e) {logger.error("error while registering service invoker:" + serviceName + ", address:"+ address + ", env:" + configManager.getEnv(), e);throw new ServiceUnavailableException("error while registering service invoker:"+ serviceName + ", address:" + address + ", env:" + configManager.getEnv(), e);}}} else {logger.warn("invalid address:" + address + " for service:" + serviceName);}}}final String url = serviceName;long start = System.nanoTime();if (enableRegisterConcurrently) {final CountDownLatch latch = new CountDownLatch(addresses.size());// 并发发布providerAdded和serverInfoChanged时间给RegistryEventListenerfor (final HostInfo hostInfo : addresses) {Runnable r = new Runnable() {@Overridepublic void run() {try {RegistryEventListener.providerAdded(url, hostInfo.getHost(), hostInfo.getPort(),hostInfo.getWeight());RegistryEventListener.serverInfoChanged(url, hostInfo.getConnect());} catch (Throwable t) {logger.error("failed to add provider client:" + hostInfo, t);} finally {latch.countDown();}}};registerThreadPool.submit(r);}try {latch.await();} catch (InterruptedException e) {logger.info("", e);}} else {//顺序执行for (final HostInfo hostInfo : addresses) {RegistryEventListener.providerAdded(url, hostInfo.getHost(), hostInfo.getPort(), hostInfo.getWeight());RegistryEventListener.serverInfoChanged(url, hostInfo.getConnect());}}long end = System.nanoTime();logger.info("end to register clients for service '" + serviceName + "#" + group + "', cost:"+ ((end - start) / 1000000));return addresses;}

Pigeon服务的注册与发现相关推荐

  1. Chapter 1 快速搭建-服务的注册与发现(Eureka)

    Chapter 1 快速搭建-服务的注册与发现(Eureka) 一.Spring Cloud简介 为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智能路由,微代 ...

  2. 史上最简单的 SpringCloud 教程 | 第一篇: 服务的注册与发现(Eureka)

    最新Finchley版本请访问: https://www.fangzhipeng.com/springcloud/2018/08/30/sc-f1-eureka/ 或者 http://blog.csd ...

  3. .NET Core微服务之路:基于Consul最少集群实现服务的注册与发现(一)

    原文:.NET Core微服务之路:基于Consul最少集群实现服务的注册与发现(一) Consul介绍 Consul是HashiCorp公司推出的开源工具[开源地址:https://github.c ...

  4. 第一篇:服务的注册与发现Eureka(Finchley版本)V2.0_dev

    Eureka 简介: Eureka是Netflix 开源的服务发现组件, Spring Cloud 将其集成在 Spring Cloud Netflix 中,实现服务的注册和发现.Eureka 主要包 ...

  5. 基于Zookeeper实现简易版服务的注册与发现机制

    一.功能要求 基于Zookeeper实现简易版服务的注册与发现机制 启动2个服务端 将服务端IP和端口信息注册到Zookeeper上 启动1个客户端 从Zookeeper中获取2个服务端节点信息 客户 ...

  6. springboot使用curator实现服务的注册和发现

    本文来说下springboot使用curator实现服务的注册和发现 文章目录 概述 概述

  7. SpringCloud 教程 | 第一篇: 服务的注册与发现Eureka

    SpringCloud 教程 | 第一篇: 服务的注册与发现Eureka(Finchley版本) 原文首发于:https://www.fangzhipeng.com/springcloud/2018/ ...

  8. java B2B2C springmvc mybatis电子商务平台源码-服务的注册与发现(Eureka)

    1.介绍 对于微服务的治理而言,其核心就是服务的注册和发现.在SpringCloud 中提供了多种服务注册与发现组件:Eureka,Consul,Zookeeper.官方推荐使用Eureka. 需要J ...

  9. 服务引用定义配置(服务的注册与发现)

    本文大部分摘自极客时间胡忠想老师的<从0开始学微服务>课程,对原文做了简单概括和修改 XML 配置方式的服务发布和引用的具体流程,简单来说就是 服务提供者定义好接口,并且在服务发布配置文件 ...

  10. Spring Cloud学习笔记(三)Eureka 服务的注册与发现

    目录 注册中心 Eureka 介绍 Eureka Server 服务注册 提供注册表 同步状态 Eureka Client:注册中心客户端 Register: 服务注册 Renew: 服务续约 Evi ...

最新文章

  1. webstorm两个文件比对_webstorm/phpstorm配置连接ftp快速进行文件比较(上传下载/同步)操作...
  2. 时间组件选择一个时间段_衡南(光伏支架组件安装)施工队
  3. jeecg输入中文查询导表为空_学术利器—SCI期刊影响因子查询/中文核心期刊查询系统更新...
  4. php中函数前加符号的作用分解
  5. 期望为线性的选择算法
  6. 跨域cookie设置
  7. java 访问Domino LOtus 数据库
  8. json学习笔记,json与js对象格式的转换,js对象转json字符串,json格式转js对象
  9. Teclast/台电32G SM3267AC H27UDG8M2MTR 量产记录
  10. easyar android 开发,EasyAR 使用Unity如何导出android项目
  11. [转] 心态是最大的本钱;人无完人,重要的是怎么做人
  12. linux 计算程序运行时间
  13. bugtraq mysql,Oracle MySQL Server远程安全漏洞(CVE-2017-3459)
  14. 6617: Finite Encyclopedia of Integer Sequences
  15. Chrome Performance 页面性能分析
  16. 北京大学软件与微电子学院学习经验文章集78篇和1个专题
  17. fiilt1左耳无法同步_FIIL T1 X真无线运动耳机体验:闪连快充秒同步 媲美AirPods
  18. HTML实训实训心得
  19. catia怎样倒2d_CATIA如何使用实例化2D部件做块
  20. ubuntu18.04彻底卸载fcitx

热门文章

  1. Autoit+selenium+python实现文件上传功能
  2. python json.dumps(output) ^ SyntaxError: invalid syntax
  3. (转)《蜗居》带给校园男女多少悲喜
  4. CSS选择器(随笔)
  5. 3D打印:三维智能数字化创造(全彩)
  6. python 002 __小斌文档 | ipython的基本使用
  7. 耗时两天,Html实现小米官网
  8. Python练手项目:计算机自动还原魔方视频
  9. python_使用marshal模块序列化
  10. 路由器重温——MP配置管理