导语
  上一篇博客中介绍了关于Eureka Client源码的基础部分,如果对于基础部分不是很了解的读者可以点击下面的连接进入到源码分析一中,从头开始学习

Spring Cloud微服务系列

DiscoveryClient构造函数

第一步 构造函数

public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args) {this(applicationInfoManager, config, args, new Provider<BackupRegistry>() {

第二步 找到实际实现操作

@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,Provider<BackupRegistry> backupRegistryProvider)

完成上面两步之后,首先来暂停一下,来两个参数ApplicationInfoManager和EurekaClientConfig,这两个参数
ApplicationInfoManager
  作为应用信息管理器,管理服务实例的信息类InstanceInfo和服务实例的配置信息类EurekaInstanceConfig。
EurekaClientConfig
  封装Eureka Client 自身服务实例的配置信息,主要用于构建InstanceInfo通常这些信息在配置文件中的eureka.instance前缀下进行配置,Spring Cloud 通过 EurekaInstanceConfigBean配置类提供默认配置。

对于AbstractDiscoveryClientOptionalArgs参数来说,主要是用来注入一些可选择参数,以及jersey1和jersey2通用的过滤器。BackupRegistry 参数则是被用来当做备份的注册中心,当EurekaClient 无法从任何的EurekaServer 中获取注册表信息的时候,BackupRegistry将会被调用用来获取注册信息。默认的实现是

@ImplementedBy(NotImplementedRegistryImpl.class)
public interface BackupRegistry {Applications fetchRegistry();Applications fetchRegistry(String[] includeRemoteRegions);
}

  在构造方法中,忽略掉啊了构造方法中赋值操作,其中有一部分代码还是比较关键的。

 if (config.shouldFetchRegistry()) {this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});} else {this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;}if (config.shouldRegisterWithEureka()) {this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});} else {this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;}

这里config.shouldFetchRegistry()对应的是eureka.client.fetch-register当这个配置为true的时候,表示Eureka Client 将从Eureka Server中拉取注册表信息。config.shouldRegisterWithEureka()则是对应eureka.client.register-with-eureka 当它为true的时候表示Eureka Client 将注册到Eureka Server中,如果上面两个配置都为false,那么Discovery的初始化将直接结束,表示这个客户端既不进行服务注册也不提供服务发现。

 // default size of 2 - 1 each for heartbeat and cacheRefreshscheduler = Executors.newScheduledThreadPool(2,new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-%d").setDaemon(true).build());heartbeatExecutor = new ThreadPoolExecutor(1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-HeartbeatExecutor-%d").setDaemon(true).build());  // use direct handoffcacheRefreshExecutor = new ThreadPoolExecutor(1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d").setDaemon(true).build());  // use direct handoff

  定义了一个基于线程池的定时器线程池ScheduledExecutorService,一个线程池用于发送心跳,一个线程池用于刷新缓存,同时定义了上面所示的线程池操作。

private final ScheduledExecutorService scheduler;// additional executors for supervised subtasks
private final ThreadPoolExecutor heartbeatExecutor;
private final ThreadPoolExecutor cacheRefreshExecutor;

  之后初始化Eureka Client 和 Eureka Server进行HTTP交互的Jersey客户端,其中EurekaTransport是DiscoveryClient的内部类

eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);

  EurekaTransport封装了,DiscoveryClient与Eureka Server进行HTTP调用的Jersey客户端。代码如下

  private static final class EurekaTransport {private ClosableResolver bootstrapResolver;private TransportClientFactory transportClientFactory;private EurekaHttpClient registrationClient;private EurekaHttpClientFactory registrationClientFactory;private EurekaHttpClient queryClient;private EurekaHttpClientFactory queryClientFactory;void shutdown() {if (registrationClientFactory != null) {registrationClientFactory.shutdown();}if (queryClientFactory != null) {queryClientFactory.shutdown();}if (registrationClient != null) {registrationClient.shutdown();}if (queryClient != null) {queryClient.shutdown();}if (transportClientFactory != null) {transportClientFactory.shutdown();}if (bootstrapResolver != null) {bootstrapResolver.shutdown();}}}

  接下来就是从Eureka Server中拉取注册表信息

if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {fetchRegistryFromBackup();
}

  如果clientConfig.shouldFetchRegistry()为true时,fetchRegistry方法将会被调用在EurekaClient 向Eureka Server 注册前,需要先从Eureka Server拉取注册表中的信息。这是服务发现的提前,通过将Eureka Server 中注册的信息缓存到本地,就可以获取其他服务的信息,减少了与Eureka Server 的网络通信消耗。
  拉取完服务注册表的信息之后,将对服务进行注册,代码如下

 // call and execute the pre registration handler before all background tasks (inc registration) is startedif (this.preRegistrationHandler != null) {this.preRegistrationHandler.beforeRegistration();}if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {try {//发起服务注册if (!register() ) {//服务注册失败抛出异常throw new IllegalStateException("Registration error at startup. Invalid server response.");}} catch (Throwable th) {logger.error("Registration error at startup: {}", th.getMessage());throw new IllegalStateException(th);}}// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetchinitScheduledTasks(); // 初始化定时任务

  在服务注册之前会进行注册预处理操作,Eureka 没有提供默认的实现,构造函数的最后初始化并且启动了发送心跳、缓存刷新和按需注册等定时任务。

最后总结一下在DiscoveryClient的构造函数中都进行了那些操作

  • 1、相关配置信息的赋值操作
  • 2、备份注册中心的初始化,默认没有实现
  • 3、拉取Eureka Server 注册表的信息
  • 4、注册前预处理操作
  • 5、向Eureka Server中注册自己
  • 6、初始化定时任务。

拉取注册表信息

  在上面代码中,知道在DiscoveryClient构造函数中,会调用fetchRegistry方法从EurekaServer中拉取注册表信息。

if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {fetchRegistryFromBackup();
}

具体拉取方法如下

  private boolean fetchRegistry(boolean forceFullRegistryFetch) {Stopwatch tracer = FETCH_REGISTRY_TIMER.start();try {// If the delta is disabled or if it is the first time, get all// applications//如果增量式拉取信息被禁止,或者Application 为null ,进行全量拉取Applications applications = getApplications();if (clientConfig.shouldDisableDelta()|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))|| forceFullRegistryFetch|| (applications == null)|| (applications.getRegisteredApplications().size() == 0)|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta{logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());logger.info("Force full registry fetch : {}", forceFullRegistryFetch);logger.info("Application is null : {}", (applications == null));logger.info("Registered Applications size is zero : {}",(applications.getRegisteredApplications().size() == 0));logger.info("Application version is -1: {}", (applications.getVersion() == -1));//全量拉取注册表信息getAndStoreFullRegistry();} else {//增量拉取注册表信息getAndUpdateDelta(applications);}//计算应用集合一致性哈希码applications.setAppsHashCode(applications.getReconcileHashCode());//打印注册表上所有服务的总数logTotalInstances();} catch (Throwable e) {logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);return false;} finally {if (tracer != null) {tracer.stop();}}// Notify about cache refresh before updating the instance remote status//在更新远程实例之前推送缓存刷新事件,但是Eureka中并没有提供默认事件监听器onCacheRefreshed();// Update remote status based on refreshed data held in the cache//基于缓存中被刷新的数据跟新远程实例状态updateInstanceRemoteStatus();// registry was fetched successfully, so return true//拉取成功返回为truereturn true;}

  一般在Eureka客户端,除了第一次拉取注册表信息,之后都会尝试增量拉取,下面就来看看拉取注册表信息的两种实现方式。

1、全量拉取注册表信息

  private void getAndStoreFullRegistry() throws Throwable {//获取拉取的注册表的版本,防止拉取版本落后,这个是多线程引起long currentUpdateGeneration = fetchRegistryGeneration.get();logger.info("Getting all instance registry info from the eureka server");Applications apps = null;EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()): eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());//获取成功if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {apps = httpResponse.getEntity();}logger.info("The response status is {}", httpResponse.getStatusCode());if (apps == null) {logger.error("The application is null for some reason. Not storing this information");//检查fetchRegistryGeneration是否发生版本变化,无改变的话说明本次拉取是最新的} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {//从apps中筛选出状态为UP的实例,同时打乱实例的顺序,防止同一个服务在不同实例在启动时接收流量。localRegionApps.set(this.filterAndShuffle(apps));logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());} else {logger.warn("Not updating applications as another thread is updating it already");}}

  从上面代码中可以看出全量拉取的注册表中的服务实例信息都被封装到了Applications里面,经过处理之后替换本地的缓存Applications。在com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient中实现了对于apps的调用操作。这个接口可以获取到所有的App信息。http://localhost:8761/eureka/apps

  @Overridepublic EurekaHttpResponse<Applications> getApplications(String... regions) {return getApplicationsInternal("apps/", regions);}

  getAndStoreFullRegistry方法可以被多个线程同时调用,导致新拉取的注册表被旧的注册表覆盖,多线程线程安全问题,从而导致不生效,或者产生脏数据。因此,Eureka通过类型为AtomicLong的currentUpdateGeneration对apps进行版本跟踪最终保留最新数据
2、增量式拉取注册信息

    private void getAndUpdateDelta(Applications applications) throws Throwable {long currentUpdateGeneration = fetchRegistryGeneration.get();Applications delta = null;EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {delta = httpResponse.getEntity();}if (delta == null) {logger.warn("The server does not allow the delta revision to be applied because it is not safe. "+ "Hence got the full registry.");getAndStoreFullRegistry();} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());String reconcileHashCode = "";if (fetchRegistryUpdateLock.tryLock()) {try {updateDelta(delta);reconcileHashCode = getReconcileHashCode(applications);} finally {fetchRegistryUpdateLock.unlock();}} else {logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");}// There is a diff in number of instances for some reasonif (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall}} else {logger.warn("Not updating application delta as another thread is updating it already");logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());}}

在根据从 Server 拉取信息缓存到本地的时候 ,Eureka定义了ActionType标记变更状态

  public enum ActionType {ADDED, // 添加Eureka ServerMODIFIED, // 在Eureka Server中信息发生改变DELETED  // 被从Eureka Server中删除}

更具在InstanceInfo.ActionType的不同,对于数据进行不同的操作。

private void updateDelta(Applications delta) {int deltaCount = 0;for (Application app : delta.getRegisteredApplications()) {for (InstanceInfo instance : app.getInstances()) {Applications applications = getApplications();String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {Applications remoteApps = remoteRegionVsApps.get(instanceRegion);if (null == remoteApps) {remoteApps = new Applications();remoteRegionVsApps.put(instanceRegion, remoteApps);}applications = remoteApps;}++deltaCount;//变更类型操作if (ActionType.ADDED.equals(instance.getActionType())) {Application existingApp = applications.getRegisteredApplications(instance.getAppName());if (existingApp == null) {applications.addApplication(app);}logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);} else if (ActionType.MODIFIED.equals(instance.getActionType())) {Application existingApp = applications.getRegisteredApplications(instance.getAppName());if (existingApp == null) {applications.addApplication(app);}logger.debug("Modified instance {} to the existing apps ", instance.getId());applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);} else if (ActionType.DELETED.equals(instance.getActionType())) {Application existingApp = applications.getRegisteredApplications(instance.getAppName());if (existingApp == null) {applications.addApplication(app);}logger.debug("Deleted instance {} to the existing apps ", instance.getId());applications.getRegisteredApplications(instance.getAppName()).removeInstance(instance);}}}logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount);getApplications().setVersion(delta.getVersion());getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());for (Applications applications : remoteRegionVsApps.values()) {applications.setVersion(delta.getVersion());applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());}
}

 本地注册表更新之后会通过一致性哈希做处理操作。保证数据的一致性,通过http://localhost:8761/eureka/apps/delta 接口可以看到如下效果

总结

  通过上面内容分析,了解了DiscoveryClient构造函数以及服务注册信息的拉取规则。了解了全量拉取服务注册信息和增量拉取服务注册信息的区别与联系。

Spring Cloud微服务系列-Eureka Client源码解析(二)相关推荐

  1. Spring Cloud微服务系列-Eureka Client源码解析(一)

    导语   Eureka Client 是为了简化开发人员的开发工作,将很多的Eureka Server交互的工作进行了封装,在使用的时候自动完成,在应用的不同阶段来完成不同的功能实现.下面就来了解一下 ...

  2. Spring Cloud微服务之eureka+client入门

    Spring Cloud微服务之eureka+client入门 谈到服务,想到一种"懒人思维".家政服务为懒人收拾家务提供一种便利,快餐服务为不爱做饭的懒人提供一种方便.....等 ...

  3. Spring Cloud微服务系列文,服务调用框架Feign

    之前博文的案例中,我们是通过RestTemplate来调用服务,而Feign框架则在此基础上做了一层封装,比如,可以通过注解等方式来绑定参数,或者以声明的方式来指定请求返回类型是JSON.    这种 ...

  4. 福利继续:赠书《Spring Cloud微服务-全栈技术与案例解析》

    <Spring Cloud微服务-全栈技术与案例解析> 在互联网时代,互联网产品的最大特点就是需要快速发布新功能,支持高并发和大数据.传统的架构已经慢慢不能支撑互联网业务的发展,这时候微服 ...

  5. Java生鲜电商平台-秒杀系统微服务架构设计与源码解析实战

    Java生鲜电商平台-秒杀系统微服务架构设计与源码解析实战 Java生鲜电商平台-  什么是秒杀 通俗一点讲就是网络商家为促销等目的组织的网上限时抢购活动 比如说京东秒杀,就是一种定时定量秒杀,在规定 ...

  6. spring cloud微服务治理eureka、hystrix、zuul代码例子

    spring cloud微服务中台服务代码例子,包括eureka.hystrix.zuul https://github.com/birdstudiocn/spring-cloud-sample/tr ...

  7. Spring Cloud Gateway一次请求调用源码解析

    简介: 最近通过深入学习Spring Cloud Gateway发现这个框架的架构设计非常简单.有效,很多组件的设计都非常值得学习,本文就Spring Cloud Gateway做一个简单的介绍,以及 ...

  8. 电商平台 高并发 微服务 方案_Java生鲜电商平台-秒杀系统微服务架构设计与源码解析实战...

    Java生鲜电商平台- 什么是秒杀 通俗一点讲就是网络商家为促销等目的组织的网上限时抢购活动 比如说京东秒杀,就是一种定时定量秒杀,在规定的时间内,无论商品是否秒杀完毕,该场次的秒杀活动都会结束.这种 ...

  9. 《Spring Cloud 微服务架构进阶》读书笔记

    前页 随着 DevOps 和以 Docker 为主的容器技术的发展,云原生应用架构和微服 务变得流行起来. 云原生包含的内容很多,如 DevOps.持续交付.微服务.敏捷等 第一章,微服务架构介绍 架 ...

最新文章

  1. 依赖注入框架Autofac学习笔记
  2. MAC Pro 同时安装 Python2 和 Python3
  3. 直播源码和短视频源码,相亲相爱的一家人
  4. 可视化_仓库管理可视化
  5. sqlserver evaluation是什么版本_使用SSMS扫描和查找SQL Server数据库的潜在安全漏洞...
  6. query row php,php – 如何在Codeigniter上使用$query- row获取类对象
  7. 太难了!用Python数据造假后,我被公司升职加薪了~
  8. Ubuntu18.0.4配置Hadoop1.2.1环境
  9. RMAN 学习过程之四,备份演练进阶篇
  10. 详细讲解黑客常用的远程控制木马
  11. 工行银企互联接入详解(1)--流程说明
  12. Struts+Spring+Hibernate处理Lob(Blob,Clob)--sessionFactory中注入 org.springframework.jdbc.support.lob.Def
  13. 外汇交易中正确的策略是什么标准呢?ForexClub相信要做到这5点
  14. 老熊的三分地-Oracle、UNIX、数据恢复
  15. python做cae库,基于Python的CAE自动后处理开发
  16. 多普勒效应多径效应慢衰落、快衰落
  17. Marvolo Gaunt's Ring ---CodeForces - 855B(思维题)
  18. 【创业笔记】团队建设--团队氛围的营造
  19. 天津大学与东南大学计算机专业,名气小但强大的985大学-东南大学尴尬名校?...
  20. Xshell如何配置并远程连接Linux服务器详解

热门文章

  1. html判断输入是否为空格,javascript怎么判断是否为空格?
  2. C语言CRC32 逆向算法源码
  3. spring+redis自主实现分布式session(非spring-session方式)
  4. 父类、派生类、方法重写、实例化后的执行顺序
  5. 使用sysbench来测试Row Cache解惑
  6. Oracle使用Sql把XML解析成表(Table)的方法
  7. vs2010 代码混淆 代码加密
  8. python库的学习系列之 13.2. ConfigParser — Configuration file parser
  9. 【leetcode】390. Elimination Game
  10. 【176天】黑马程序员27天视频学习笔记【Day11-上】