Eureka-Server是如何判断一个服务不可用的?

Eureka是通过心跳续约的方式来检查各个服务提供者的健康状态。

实际上,在判断服务不可用这个部分,会分为两块逻辑。

  1. Eureka-Server需要定期检查服务提供者的健康状态。
  2. Eureka-Client在运行过程中需要定期更新注册信息。

Eureka的心跳续约机制如下图所示。

  1. 客户端在启动时, 会开启一个心跳任务,每隔30s向服务单发送一次心跳请求。
  2. 服务端维护了每个实例的最后一次心跳时间,客户端发送心跳包过来后,会更新这个心跳时间。
  3. 服务端在启动时,开启了一个定时任务,该任务每隔60s执行一次,检查每个实例的最后一次心跳时间是否超过90s,如果超过则认为过期,需要剔除。

关于上述流程中涉及到的时间,可以通过以下配置来更改.

#Server 至上一次收到 Client 的心跳之后,等待下一次心跳的超时时间,在这个时间内若没收到下一次心跳,则将移除该 Instance。
eureka.instance.lease-expiration-duration-in-seconds=90
# Server 清理无效节点的时间间隔,默认60000毫秒,即60秒。
eureka.server.eviction-interval-timer-in-ms=60

客户端心跳发起流程

心跳续约是客户端发起的,每隔30s执行一次。

DiscoveryClient.initScheduledTasks

继续回到
DiscoveryClient.initScheduledTasks 方法中,

private void initScheduledTasks() {//省略....heartbeatTask = new TimedSupervisorTask("heartbeat",scheduler,heartbeatExecutor,renewalIntervalInSecs,TimeUnit.SECONDS,expBackOffBound,new HeartbeatThread());scheduler.schedule(heartbeatTask,renewalIntervalInSecs, TimeUnit.SECONDS);//省略....
}

renewalIntervalInSecs=30s, 默认每隔30s执行一次。

HeartbeatThread

这个线程的实现很简单,调用 renew() 续约,如果续约成功,则更新最后一次心跳续约时间。

private class HeartbeatThread implements Runnable {public void run() {if (renew()) {lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();}}
}

在 renew() 方法中,调用EurekaServer的 "apps/" + appName + '/' + id; 这个地址,进行心跳续约。

boolean renew() {EurekaHttpResponse<InstanceInfo> httpResponse;try {httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {REREGISTER_COUNTER.increment();logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());long timestamp = instanceInfo.setIsDirtyWithTime();boolean success = register();if (success) {instanceInfo.unsetIsDirty(timestamp);}return success;}return httpResponse.getStatusCode() == Status.OK.getStatusCode();} catch (Throwable e) {logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);return false;}
}

服务端收到心跳处理

服务端具体为调用[
com.netflix.eureka.resources]包下的InstanceResource类的renewLease方法进行续约,代码如下

@PUT
public Response renewLease(@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,@QueryParam("overriddenstatus") String overriddenStatus,@QueryParam("status") String status,@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {boolean isFromReplicaNode = "true".equals(isReplication);//调用renew进行续约boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);// Not found in the registry, immediately ask for a registerif (!isSuccess) { //如果续约失败,返回异常logger.warn("Not Found (Renew): {} - {}", app.getName(), id);return Response.status(Status.NOT_FOUND).build();}// Check if we need to sync based on dirty time stamp, the client// instance might have changed some valueResponse response;//校验客户端与服务端的时间差异,如果存在问题则需要重新发起注册if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);// Store the overridden status since the validation found out the node that replicates winsif (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()&& (overriddenStatus != null)&& !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))&& isFromReplicaNode) {registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));}} else {response = Response.ok().build(); // 续约成功,返回200}logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());return response;
}

InstanceRegistry.renew

renew的实现方法如下,主要有两个流程

  1. 从服务注册列表中找到匹配当前请求的实例
  2. 发布EurekaInstanceRenewedEvent事件
@Override
public boolean renew(final String appName, final String serverId,boolean isReplication) {log("renew " + appName + " serverId " + serverId + ", isReplication {}"+ isReplication);//获取所有服务注册信息List<Application> applications = getSortedApplications();for (Application input : applications) { //逐一遍历if (input.getName().equals(appName)) { //如果当前续约的客户端和某个服务注册信息节点相同InstanceInfo instance = null;for (InstanceInfo info : input.getInstances()) { //遍历这个服务集群下的所有节点,找到某个匹配的实例instance返回。if (info.getId().equals(serverId)) {instance = info; //break;}}//发布EurekaInstanceRenewedEvent事件,这个事件在EurekaServer中并没有处理,我们可以监听这个事件来做一些事情,比如做监控。publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId,instance, isReplication));break;}}return super.renew(appName, serverId, isReplication);
}

super.renew

public boolean renew(final String appName, final String id, final boolean isReplication) {if (super.renew(appName, id, isReplication)) { //调用父类的续约方法,如果续约成功replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication); //同步给集群中的所有节点return true;}return false;
}

AbstractInstanceRegistry.renew

在这个方法中,会拿到应用对应的实例列表,然后调用Lease.renew()去进行心跳续约。

public boolean renew(String appName, String id, boolean isReplication) {RENEW.increment(isReplication);Map<String, Lease<InstanceInfo>> gMap = registry.get(appName); //根据服务名字获取实例信息Lease<InstanceInfo> leaseToRenew = null;if (gMap != null) { leaseToRenew = gMap.get(id);  //获取需要续约的服务实例,}if (leaseToRenew == null) { //如果为空,说明这个服务实例不存在,直接返回续约失败RENEW_NOT_FOUND.increment(isReplication);logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);return false;} else { //表示实例存在InstanceInfo instanceInfo = leaseToRenew.getHolder(); //获取实例的基本信息if (instanceInfo != null) { //实例基本信息不为空// touchASGCache(instanceInfo.getASGName());//获取实例的运行状态InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(instanceInfo, leaseToRenew, isReplication);if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) { //如果运行状态未知,也返回续约失败logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"+ "; re-register required", instanceInfo.getId());RENEW_NOT_FOUND.increment(isReplication);return false;}//如果当前请求的实例信息if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {logger.info("The instance status {} is different from overridden instance status {} for instance {}. "+ "Hence setting the status to overridden status", instanceInfo.getStatus().name(),overriddenInstanceStatus.name(),instanceInfo.getId());instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);}}//更新上一分钟的续约数量renewsLastMin.increment();leaseToRenew.renew(); //续约return true;}
}

续约的实现,就是更新服务端最后一次收到心跳请求的时间。

public void renew() {lastUpdateTimestamp = System.currentTimeMillis() + duration;}

Eureka的自我保护机制

实际,心跳检测机制有一定的不确定性,比如服务提供者可能是正常的,但是由于网络通信的问题,导致在90s内没有收到心跳请求,那将会导致健康的服务被误杀。

为了避免这种问题,Eureka提供了一种叫 自我保护 机制的东西。简单来说,就是开启自我保护机制后,Eureka Server会包这些服务实例保护起来,避免过期导致实例被剔除的问题,从而保证Eurreka集群更加健壮和稳定。

进入自我保护状态后,会出现以下几种情况

  • Eureka Server不再从注册列表中移除因为长时间没有收到心跳而应该剔除的过期服务,如果在保护期内如果服务刚好这个服务提供者非正常下线了,此时服务消费者就会拿到一个无效的服务实例,此时会调用失败,对于这个问题需要服务消费者端要有一些容错机制,如重试,断路器等!
  • Eureka Server仍然能够接受新服务的注册和查询请求,但是不会被同步到其他节点上,保证当前节点依然可用。

Eureka自我保护机制,通过配置
eureka.server.enable-self-preservation 来【 true 】打开/【 false 禁用】自我保护机制,默认打开状态,建议生产环境打开此配置。

自我保护机制应该如何设计,才能更加精准地控制到 “是网络异常” 导致的通信延迟,而不是服务宕机呢?

Eureka是这么做的: 如果低于85%的客户端节点都没有正常的心跳,那么Eureka Server就认为客户端与注册中心出现了网络故障,Eureka Server自动进入自我保护状态 .

其中, 85% 这个阈值,可以通过下面这个配置来设置

# 自我保护续约百分比,默认是0.85
eureka.server.renewal-percent-threshold=0.85

但是还有个问题,超过谁的85%呢?这里有一个预期的续约数量,这个数量计算公式如下:

//自我保护阀值 = 服务总数 * 每分钟续约数(60S/客户端续约间隔) * 自我保护续约百分比阀值因子

假设如果有 100 个服务,续约间隔是 30S ,自我保护阈值 0.85 ,那么它的预期续约数量为:

自我保护阈值 =100 * 60 / 30 * 0.85 = 170。

自动续约的阈值设置

在EurekaServerBootstrap这个类的 contextInitialized 方法中,会调用 initEurekaServerContext 进行初始化

public void contextInitialized(ServletContext context) {try {initEurekaEnvironment();initEurekaServerContext();context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);}catch (Throwable e) {log.error("Cannot bootstrap eureka server :", e);throw new RuntimeException("Cannot bootstrap eureka server :", e);}
}

继续往下看。

protected void initEurekaServerContext() throws Exception {EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig();//...registry.openForTraffic(applicationInfoManager, registryCount);
}

在openForTraffic方法中,会初始化
expectedNumberOfClientsSendingRenews 这个值,这个值的含义是: 预期每分钟收到续约的客户端数量,取决于注册到eureka server上的服务数量

@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {// Renewals happen every 30 seconds and for a minute it should be a factor of 2.this.expectedNumberOfClientsSendingRenews = count; //初始值是1.updateRenewsPerMinThreshold();logger.info("Got {} instances from neighboring DS node", count);logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);this.startupTime = System.currentTimeMillis();if (count > 0) {this.peerInstancesTransferEmptyOnStartup = false;}DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();boolean isAws = Name.Amazon == selfName;if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {logger.info("Priming AWS connections for all replicas..");primeAwsReplicas(applicationInfoManager);}logger.info("Changing status to UP");applicationInfoManager.setInstanceStatus(InstanceStatus.UP);super.postInit();
}

updateRenewsPerMinThreshold

接着调用
updateRenewsPerMinThreshold 方法,会更新一个每分钟最小的续约数量,也就是Eureka Server期望每分钟收到客户端实例续约的总数的阈值。如果小于这个阈值,就会触发自我保护机制。

protected void updateRenewsPerMinThreshold() {this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews* (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())* serverConfig.getRenewalPercentThreshold());
}
//自我保护阀值 = 服务总数 * 每分钟续约数(60S/客户端续约间隔) * 自我保护续约百分比阀值因子
  • getExpectedClientRenewalIntervalSeconds,客户端的续约间隔,默认为30s
  • getRenewalPercentThreshold,自我保护续约百分比阈值因子,默认0.85。 也就是说每分钟的续约数量要大于85%

预期值的变化触发机制

expectedNumberOfClientsSendingRenews 和
numberOfRenewsPerMinThreshold 这两个值,会随着新增服务注册以及服务下线的触发而发生变化。

PeerAwareInstanceRegistryImpl.cancel

当服务提供者主动下线时,表示这个时候Eureka-Server要剔除这个服务提供者的地址,同时也代表这这个心跳续约的阈值要发生变化。所以在
PeerAwareInstanceRegistryImpl.cancel 中可以看到数据的更新

调用路径
PeerAwareInstanceRegistryImpl.cancel -> AbstractInstanceRegistry.cancel->internalCancel

服务下线之后,意味着需要发送续约的客户端数量递减了,所以在这里进行修改

protected boolean internalCancel(String appName, String id, boolean isReplication) {//....synchronized (lock) {if (this.expectedNumberOfClientsSendingRenews > 0) {// Since the client wants to cancel it, reduce the number of clients to send renews.this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;updateRenewsPerMinThreshold();}}
}

PeerAwareInstanceRegistryImpl.register

当有新的服务提供者注册到eureka-server上时,需要增加续约的客户端数量,所以在register方法中会进行处理

register ->super.register(AbstractInstanceRegistry)
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {//....    // The lease does not exist and hence it is a new registrationsynchronized (lock) {if (this.expectedNumberOfClientsSendingRenews > 0) {// Since the client wants to register it, increase the number of clients sending renewsthis.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;updateRenewsPerMinThreshold();}}
}

每隔15分钟刷新自我保护阈值

PeerAwareInstanceRegistryImpl.scheduleRenewalThresholdUpdateTask

每隔15分钟,更新一次自我保护阈值!

private void updateRenewalThreshold() {try {// 1. 计算应用实例数Applications apps = eurekaClient.getApplications();int count = 0;for (Application app : apps.getRegisteredApplications()) {for (InstanceInfo instance : app.getInstances()) {if (this.isRegisterable(instance)) {++count;}}}synchronized (lock) {// Update threshold only if the threshold is greater than the// current expected threshold or if self preservation is disabled.//当节点数量count大于最小续约数量时,或者没有开启自我保护机制的情况下,重新计算expectedNumberOfClientsSendingRenews和numberOfRenewsPerMinThresholdif ((count) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfClientsSendingRenews)|| (!this.isSelfPreservationModeEnabled())) {this.expectedNumberOfClientsSendingRenews = count;updateRenewsPerMinThreshold();}}logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);} catch (Throwable e) {logger.error("Cannot update renewal threshold", e);}
}

自我保护机制的触发

在 AbstractInstanceRegistry 的 postInit 方法中,会开启一个 EvictionTask 的任务,这个任务用来检测是否需要开启自我保护机制。

这个方法也是在EurekaServerBootstrap方法启动时触发。

protected void postInit() {renewsLastMin.start(); //开启一个定时任务,用来实现每分钟的续约数量,每隔60s归0重新计算if (evictionTaskRef.get() != null) {evictionTaskRef.get().cancel();}evictionTaskRef.set(new EvictionTask()); //启动一个定时任务EvictionTask,每隔60s执行一次evictionTimer.schedule(evictionTaskRef.get(),serverConfig.getEvictionIntervalTimerInMs(),serverConfig.getEvictionIntervalTimerInMs());
}

其中,EvictionTask的代码如下。

private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);@Override
public void run() {try {//获取补偿时间毫秒数long compensationTimeMs = getCompensationTimeMs();logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);evict(compensationTimeMs);} catch (Throwable e) {logger.error("Could not run the evict task", e);}
}

evict方法

public void evict(long additionalLeaseMs) {logger.debug("Running the evict task");// 是否需要开启自我保护机制,如果需要,那么直接RETURE, 不需要继续往下执行了if (!isLeaseExpirationEnabled()) {logger.debug("DS: lease expiration is currently disabled.");return;}//这下面主要是做服务自动下线的操作的。
}

isLeaseExpirationEnabled

numberOfRenewsPerMinThreshold
public boolean isLeaseExpirationEnabled() {if (!isSelfPreservationModeEnabled()) {// The self preservation mode is disabled, hence allowing the instances to expire.return true;}return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
}

Spring Cloud Eureka源码分析之心跳续约及自我保护机制相关推荐

  1. 【SpringClould】Spring Cloud Eureka源码分析

    文章目录 1.概述 1.1 Eureka的一些概念 2.源码分析 2.1 Eureka Server源码 2.1.1 `@EnableEurekaServer`注解 2.1.2 EurekaServe ...

  2. Spring Cloud Eureka 源码分析(一) 服务端启动过程

    2019独角兽企业重金招聘Python工程师标准>>> 一. 前言 我们在使用Spring Cloud Eureka服务发现功能的时候,简单的引入maven依赖,且在项目入口类根据服 ...

  3. Java微服务组件Spring cloud ribbon源码分析

    微服务组件Spring Cloud Ribbon源码分析_哔哩哔哩_bilibili Ribbon源码分析 | ProcessOn免费在线作图,在线流程图,在线思维导图 | 1.什么是ribbon? ...

  4. Spring Cloud部分源码分析Eureka,Ribbon,Feign,Zuul

    Eureka SpringCloud Eureka使用NetFlix Eureka来实现的,它包括了服务端组件和客户端组件,并且都是用java 编写的. Eureka服务端就是服务注册中心, Eure ...

  5. 微服务发现与注册之Eureka源码分析

    作者:陌北有棵树,Java人,架构师社区合伙人! [一]微服务之服务发现概述 关于微服务,近年来可谓是大火,业界也吹刮着一种实践微服务的风潮.本人有幸在去年参与到一个向微服务过渡的产品,再结合自己所学 ...

  6. 【SpringCloud微服务】第3章 服务治理SpringCloudEureka(五)——Eureka源码分析

    2.8 Eureka 源码分析   首先,对于服务注册中心.服务提供者.服务消费者这三个主要元素来说,后两者(也就是Eureka客户端)在整个运行机制中是大部分通信行为的主动发起者,而注册中心主要是处 ...

  7. Eureka源码分析

    Eureka源码分析 Eureka server 入口: Spring.factories PS: 意味着如果加载EurekaServerAutoConfiguration成功,需要 @Conditi ...

  8. Spring IOC 容器源码分析

    Spring IOC 容器源码分析 创建时间: 2017-11-15 00:00:00 [TOC] Spring 最重要的概念是 IOC 和 AOP,本篇文章其实就是要带领大家来分析下 Spring ...

  9. Spring IOC 容器源码分析系列文章导读

    1. 简介 前一段时间,我学习了 Spring IOC 容器方面的源码,并写了数篇文章对此进行讲解.在写完 Spring IOC 容器源码分析系列文章中的最后一篇后,没敢懈怠,趁热打铁,花了3天时间阅 ...

  10. Spring IOC 容器源码分析 - 余下的初始化工作

    1. 简介 本篇文章是"Spring IOC 容器源码分析"系列文章的最后一篇文章,本篇文章所分析的对象是 initializeBean 方法,该方法用于对已完成属性填充的 bea ...

最新文章

  1. 高性能的MySQL(6)查询慢与重构查询
  2. 关于Oracle 的url 连接 最后一个orcl的理解
  3. 武汉python培训哪一家好一些-武汉哪个Python培训机构比较好?
  4. python--微信小程序获取手机号码报错
  5. python分组求和_Python学习笔记之pandas索引列、过滤、分组、求和功能示例
  6. bzoj1670【Usaco2006 Oct】Building the Moat 护城河的挖掘
  7. html使用js的变量_2、温故而知新,再学一遍JavaScript-html中如何使用JS
  8. Pi 3B+编译安装python3.6.8
  9. DPDK内存篇(三): 标准大页、NUMA、DMA、IOMMU、IOVA、内存池
  10. openssl evp 对称加密(AES_ecb,ccb)
  11. HDU 1240 Asteroids!(DFS简单搜索)
  12. 邢台市初中计算机考试,2019年邢台中考总分多少分,邢台中考各个科目多少分
  13. VMware与Hyper-V不兼容
  14. 张小龙《微信产品观》PPT,经典值得收藏
  15. 【论文笔记】Face Anonymization by Manipulating Decoupled Identity Representation
  16. 从零开始学写脚本【第一天】
  17. linux mint借用deepin-wine安装QQ/微信
  18. mysql的identity_Mysql中Identity 详细介绍
  19. 对自己的反思 (闲暇中的面试总结)
  20. 苹果id登录_英雄联盟手游是用苹果ID好还是拳头好 账号选择建议

热门文章

  1. 使用Java的Graphics类进行绘图
  2. TCP/IP协议漏洞实验
  3. 本人的月末结账步骤备忘
  4. PS1应用之——修改linux终端命令行各字体颜色
  5. T410i开机显示fan error修复
  6. 基于java学生信息管理系统
  7. 计算机国际会议口头报告范例,国际会议报告开场白(共4篇).docx
  8. 移动Win7用户文件夹(我的文档)默认位置至D盘
  9. react-native 轮播组件 looped-carousel使用介绍
  10. 对国产操作操系统的一点看法