前言

服务续约

默认情况下,客户端的服务实例每隔30秒向Eureka服务端发送一次心跳。如果90秒之内Eureka服务端没有收到服务实例的心跳,该服务实例会被执行剔除任务的线程(每隔60秒执行一次)从注册的服务实例列表中剔除。

自我保护机制

如果15分钟之内,心跳发送失败的比例低于85%,就会触发Eureka服务端的自我保护机制。Eureka不会剔除通信不正常的服务实例,并且仍然接收客户端的服务的注册与服务的查询。但是不会与其它Eureka服务端节点进行同步。自我保护机制是一种针对网络异常波动的安全保护措施,可以使Eureka集群更加的健壮、稳定的运行。

Peer to peer 架构

Eureka对于多个副本之间的复制方式并没有采用主从复制,而是选择了对等复制,即peer to peer的模式。副本之间没有主从,每个副本都可以处理读写请求,通过彼此之间的数据复制使数据进行同步更新。但是数据同步可能会出现冲突。Eureka对此的解决方案是校验lastDirtyTimestamp。

校验lastDirtyTimestamp:在peer节点之间的复制请求中,如果其它peer节点传递的服务实例的lastDirtyTimestamp大于当前服务端本地存储的服务实例的lastDirtyTimestamp,则返回404,要求服务实例重新注册;如果小于,则返回409,要求其同步最新的数据信息。

源码分析

接下来对Eureka的服务续约的分析分为客户端逻辑和服务端逻辑两个部分。

服务续约(客户端逻辑)

一、DiscoveryClient

在 DiscoveryClient 构造器方法中的 initScheduledTasks 方法,有如下内容:

// 获取客户端需要向Eureka服务端发送心跳的时间间隔,默认30秒
// 可以在配置文件中指定 eureka.instance.leaseRenewalIntervalInSeconds 来修改默认值
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();// 获取心跳任务的指数补偿的相关属性,默认10
// 可以在配置文件中指定 eureka.client.heartbeatExecutorExponentialBackOffBound 来修改默认值
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);// 创建心跳任务
heartbeatTask = new TimedSupervisorTask("heartbeat",scheduler,heartbeatExecutor,renewalIntervalInSecs,TimeUnit.SECONDS,expBackOffBound,new HeartbeatThread()
);
// 初始延迟30秒执行心跳任务,之后每隔30秒重复执行一次
scheduler.schedule(heartbeatTask,renewalIntervalInSecs, TimeUnit.SECONDS);

二、TimedSupervisorTask

@Override
public void run() {Future<?> future = null;try {// 执行 HeartbeatThread#run 方法future = executor.submit(task);threadPoolLevelGauge.set((long) executor.getActiveCount());future.get(timeoutMillis, TimeUnit.MILLISECONDS); delay.set(timeoutMillis);threadPoolLevelGauge.set((long) executor.getActiveCount());successCounter.increment();} catch (TimeoutException e) {logger.warn("task supervisor timed out", e);timeoutCounter.increment();long currentDelay = delay.get();long newDelay = Math.min(maxDelay, currentDelay * 2);delay.compareAndSet(currentDelay, newDelay);} catch (RejectedExecutionException e) {if (executor.isShutdown() || scheduler.isShutdown()) {logger.warn("task supervisor shutting down, reject the task", e);} else {logger.warn("task supervisor rejected the task", e);}rejectedCounter.increment();} catch (Throwable e) {if (executor.isShutdown() || scheduler.isShutdown()) {logger.warn("task supervisor shutting down, can't accept the task");} else {logger.warn("task supervisor threw an exception", e);}throwableCounter.increment();} finally {if (future != null) {future.cancel(true);}if (!scheduler.isShutdown()) {// 每隔30秒重复执行一次scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);}}
}

三、HeartbeatThread

private class HeartbeatThread implements Runnable {public void run() {// 如果续约成功if (renew()) {// 更新lastSuccessfulHeartbeatTimestamp属性为当前时间lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();}}
}
boolean renew() {EurekaHttpResponse<InstanceInfo> httpResponse;try {// 向服务端发起服务续约httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());// 如果响应的状态码是404,则需要重新注册服务if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {REREGISTER_COUNTER.increment();logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());long timestamp = instanceInfo.setIsDirtyWithTime();// 服务重新注册boolean success = register();if (success) {instanceInfo.unsetIsDirty(timestamp);}return success;}// 用响应的状态码是否是200来判断是否续约成功return httpResponse.getStatusCode() == Status.OK.getStatusCode();} catch (Throwable e) {logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);return false;}
}

服务续约(服务端逻辑)

一、InstanceResource

@PUT
public Response renewLease(@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,@QueryParam("overriddenstatus") String overriddenStatus,@QueryParam("status") String status,@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {boolean isFromReplicaNode = "true".equals(isReplication);// 服务续约boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);// 如果续约失败,返回404if (!isSuccess) {logger.warn("Not Found (Renew): {} - {}", app.getName(), id);return Response.status(Status.NOT_FOUND).build();}Response response;// 客户端传递的lastDirtyTimestamp不为空,并且服务端开启了“时间戳不一致时进行同步”(默认开启)// 可以通过 eureka.server.syncWhenTimestampDiffers 属性来修改默认值if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {// 校验客户端传递的lastDirtyTimestamp与服务端本地的lastDirtyTimestampresponse = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()&& (overriddenStatus != null)&& !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))&& isFromReplicaNode) {registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));}// 其它情况,返回200} else {response = Response.ok().build();}logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());return response;
}

validateDirtyTimestamp

private Response validateDirtyTimestamp(Long lastDirtyTimestamp,boolean isReplication) {// 从registry缓存中获取指定应用名称、服务实例id对应的服务实例InstanceInfo appInfo = registry.getInstanceByAppAndId(app.getName(), id, false);if (appInfo != null) {if ((lastDirtyTimestamp != null) && (!lastDirtyTimestamp.equals(appInfo.getLastDirtyTimestamp()))) {Object[] args = {id, appInfo.getLastDirtyTimestamp(), lastDirtyTimestamp, isReplication};// 如果客户端传递的lastDirtyTimestamp大于服务端本地的dirtyTimestamp,则返回404if (lastDirtyTimestamp > appInfo.getLastDirtyTimestamp()) {logger.debug("Time to sync, since the last dirty timestamp differs -"+ " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",args);return Response.status(Status.NOT_FOUND).build();// 如果客户端传递的lastDirtyTimestamp小于服务端本地的dirtyTimestamp} else if (appInfo.getLastDirtyTimestamp() > lastDirtyTimestamp) {// 如果是peer节点的复制请求,则返回409if (isReplication) {logger.debug("Time to sync, since the last dirty timestamp differs -"+ " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",args);return Response.status(Status.CONFLICT).entity(appInfo).build();// 否则返回200} else {return Response.ok().build();}}}}// 其余情况返回200return Response.ok().build();
}

二、AbstractInstanceRegistry

public boolean renew(String appName, String id, boolean isReplication) {RENEW.increment(isReplication);// 从registry缓存中获取指定应用名称对应的gMapMap<String, Lease<InstanceInfo>> gMap = registry.get(appName);Lease<InstanceInfo> leaseToRenew = null;// 如果registry缓存命中if (gMap != null) {// 再从缓存gMap中获取指定实例id对应的Lease<InstanceInfo>实例// id:类似于“192.168.124.3:spring-cloud-order:8081”形式leaseToRenew = gMap.get(id);}// 如果缓存未命中if (leaseToRenew == null) {RENEW_NOT_FOUND.increment(isReplication);logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);// 返回false,表示续约失败return false;// 如果缓存命中} else {InstanceInfo instanceInfo = leaseToRenew.getHolder();if (instanceInfo != null) {// todoInstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(instanceInfo, leaseToRenew, isReplication);// 如果服务端获取到的实例状态是“UNKNOWN”,则返回false,表示续约失败if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"+ "; re-register required", instanceInfo.getId());RENEW_NOT_FOUND.increment(isReplication);return false;}// 如果客户端传递的实例状态与服务端获取到的实例状态不一致,则覆盖客户端的实例状态if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {logger.info("The instance status {} is different from overridden instance status {} for instance {}. "+ "Hence setting the status to overridden status", instanceInfo.getStatus().name(),overriddenInstanceStatus.name(),instanceInfo.getId());instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);}}renewsLastMin.increment();// 对缓存中的Lease<InstanceInfo>实例的lastUpdateTimestamp属性在当前时间基础上加90秒// 90秒是由客户端传递的 eureka.instance.leaseExpirationDurationInSeconds 属性的值leaseToRenew.renew();// 返回true,表示续约成功return true;}
}

自我保护机制

一、EurekaServerInitializerConfiguration

SmartLicycle 接口的实现类 - Spring容器加载完所有的bean,并且初始化完成之后执行其关于生命周期的方法,比如 start、stop 等。

@Override
public void start() {new Thread(() -> {try {// EurekaServerBootstrap#contextInitialized 核心方法eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);log.info("Started Eureka Server");// 发布EurekaRegistryAvailableEvent事件publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));EurekaServerInitializerConfiguration.this.running = true;// 发布EurekaServerStartedEvent事件publish(new EurekaServerStartedEvent(getEurekaServerConfig()));}catch (Exception ex) {log.error("Could not initialize Eureka servlet context", ex);}}).start();
}

二、EurekaServerBootstrap

public void contextInitialized(ServletContext context) {try {// 初始化Eureka的环境变量initEurekaEnvironment();// 初始化Eureka的上下文(其中包括调用PeerAwareInstanceRegistryImpl#openForTraffic)initEurekaServerContext();context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);}catch (Throwable e) {log.error("Cannot bootstrap eureka server :", e);throw new RuntimeException("Cannot bootstrap eureka server :", e);}
}

三、PeerAwareInstanceRegistryImpl

@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {this.expectedNumberOfClientsSendingRenews = count;// 更新服务端每分钟应该收到的服务续约数量updateRenewsPerMinThreshold();logger.info("Got {} instances from neighboring DS node", count);logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);this.startupTime = System.currentTimeMillis();if (count > 0) {this.peerInstancesTransferEmptyOnStartup = false;}DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();boolean isAws = Name.Amazon == selfName;if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {logger.info("Priming AWS connections for all replicas..");primeAwsReplicas(applicationInfoManager);}logger.info("Changing status to UP");// 设置实例状态为“UP”,并且触发StatusChangeListener监听器对StatusChangeEvent事件的处理applicationInfoManager.setInstanceStatus(InstanceStatus.UP);super.postInit();
}
protected void postInit() {renewsLastMin.start();if (evictionTaskRef.get() != null) {// 取消上一次未完成的驱逐任务的执行evictionTaskRef.get().cancel();}evictionTaskRef.set(new EvictionTask());// 默认每隔60秒执行一次驱逐任务// 可以通过 eureka.server.evictionIntervalTimerInMs 属性修改默认值evictionTimer.schedule(evictionTaskRef.get(),serverConfig.getEvictionIntervalTimerInMs(),serverConfig.getEvictionIntervalTimerInMs());
}
protected void updateRenewsPerMinThreshold() {// 计算每分钟应该收到的续约数量的阈值this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews* (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())* serverConfig.getRenewalPercentThreshold());
}

值得注意的是,PeerAwareInstanceRegistryImpl#init 方法中也会调用该方法,默认每隔15分钟统计一次每分钟应该收到的续约数量的阈值。可以通过配置文件中 eureka.server.renewalThresholdUpdateIntervalMs 属性修改,默认值15分钟。

四、EvictionTask

@Override
public void run() {try {// 获取补偿时间long compensationTimeMs = getCompensationTimeMs();logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);evict(compensationTimeMs);} catch (Throwable e) {logger.error("Could not run the evict task", e);}
}

五、AbstractInstanceRegistry

long getCompensationTimeMs() {// 获取当前时间long currNanos = getCurrentTimeNano();// 对lastExecutionNanosRef缓存设置新值并返回旧值long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);// 如果旧值为0,则直接返回0if (lastNanos == 0l) {return 0l;}// 计算执行时间,用当前时间减去旧值long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);// 计算补偿时间,用执行时间减去驱逐任务的执行间隔(默认60秒)// 可以通过 eureka.server.evictionIntervalTimerInMs 属性修改默认值long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs();// 补偿时间与0比较,取最大值return compensationTime <= 0l ? 0l : compensationTime;
}
public void evict(long additionalLeaseMs) {logger.debug("Running the evict task");// 如果服务端开启了自我保护机制(默认开启),并且触发了自我保护机制,即最近一分钟收到的续约数<=阈值,则直接返回// 可以通过 eureka.server.enableSelfPreservation 属性修改默认值if (!isLeaseExpirationEnabled()) {logger.debug("DS: lease expiration is currently disabled.");return;}// 执行到这儿,说明服务端没有开启自我保护机制,或者开启了但是没有触发自我保护机制List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();// 遍历registry缓存for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();if (leaseMap != null) {// 遍历leaseMapfor (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {Lease<InstanceInfo> lease = leaseEntry.getValue();// 如果服务实例的租约失效(也就是指定时间内服务端没有收到该服务实例的心跳)if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {// 添加到失效列表中expiredLeases.add(lease);}}}}// 获取registry缓存中的value数量int registrySize = (int) getLocalRegistrySize();int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());int evictionLimit = registrySize - registrySizeThreshold;int toEvict = Math.min(expiredLeases.size(), evictionLimit);if (toEvict > 0) {logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);Random random = new Random(System.currentTimeMillis());for (int i = 0; i < toEvict; i++) {int next = i + random.nextInt(expiredLeases.size() - i);Collections.swap(expiredLeases, i, next);// 从失效列表中选取服务实例Lease<InstanceInfo> lease = expiredLeases.get(i);String appName = lease.getHolder().getAppName();String id = lease.getHolder().getId();EXPIRED.increment();logger.warn("DS: Registry: expired lease for {}/{}", appName, id);// 剔除该失效的服务实例internalCancel(appName, id, false);}}
}
protected boolean internalCancel(String appName, String id, boolean isReplication) {read.lock();try {CANCEL.increment(isReplication);// 从registry缓存中获取指定应用名称的gMapMap<String, Lease<InstanceInfo>> gMap = registry.get(appName);Lease<InstanceInfo> leaseToCancel = null;if (gMap != null) {// 从gMap中删除该实例leaseToCancel = gMap.remove(id);}recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));// 从overriddenInstanceStatusMap缓存中删除该实例InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);if (instanceStatus != null) {logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());}if (leaseToCancel == null) {CANCEL_NOT_FOUND.increment(isReplication);logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);return false;} else {leaseToCancel.cancel();InstanceInfo instanceInfo = leaseToCancel.getHolder();String vip = null;String svip = null;if (instanceInfo != null) {instanceInfo.setActionType(ActionType.DELETED);// 将服务实例经过封装放到recentlyChangedQueue队列中recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));instanceInfo.setLastUpdatedTimestamp();vip = instanceInfo.getVIPAddress();svip = instanceInfo.getSecureVipAddress();}// 失效读写缓存invalidateCache(appName, vip, svip);logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);}} finally {read.unlock();}synchronized (lock) {if (this.expectedNumberOfClientsSendingRenews > 0) {// 对应该收到的客户端续约的数量减一this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;// 更新每分钟应该收到的续约数量的阈值updateRenewsPerMinThreshold();}}return true;
}

todo

服务续约-(服务端逻辑)

1、AbstractInstanceRegistry#getOverriddenInstanceStatus

Eureka架构篇 - 服务续约与自我保护机制相关推荐

  1. Eureka服务续约(Renew)源码分析

    主要对Eureka的Renew(服务续约),从服务提供者发起续约请求开始分析,通过阅读源码和画时序图的方式,展示Eureka服务续约的整个生命周期.服务续约主要是把服务续约的信息更新到自身的Eurek ...

  2. Eureka实现服务注册与发现,服务续约

    介绍 Eureka 目前是 2.x 版本,并且官方已经宣布不再维护更新.不过其实 Eureka 已经很稳定了,当做注册中心完全没有问题.Spring Cloud 集成了 Eureka ,并做了完善的封 ...

  3. springcloud工作笔记106---eureka实现服务监控_监控服务下线_服务注册_服务续约_判断注册中心可用_监控eurekaserver启动

    技术交流QQ群[JAVA,C++,Python,.NET,BigData,AI]:170933152 在一些业务场景下,需要对服务的上下线进行监控,比如上下线都需要进行邮件通知,可以通过eureka提 ...

  4. eureka服务续约机制

  5. nacis服务注册原理_服务注册和发现之Eureka原理篇

    概念 在传统应用组件间调用,通过接口规范约束来实现的,从而实现不同模块间良好协作:但是被拆分成微服务后,每个微服务实例的数量和网络地址都可能动态变化,使得原来硬编码的地址极不方便,故需要一个中心化的组 ...

  6. Eureka心跳续约机制

    本文来说下Eureka心跳续约机制 文章目录 概述 Eureka-Client发送心跳 DiscoverClient HeartbeatThread lastDirtyTimestamp Eureka ...

  7. 天荒地老修仙功-第六部第二篇:Spring Cloud Eureka自我保护机制

    Eureka Server 在运行期间会去统计心跳失败比例在 15 分钟之内是否低于 85%,如果低于 85%,Eureka Server 会将这些实例保护起来,让这些实例不会过期,但是在保护期内如果 ...

  8. Eureka服务治理 - 自我保护机制

    什么是自我保护机制? 在Eureka中,有两种角色: EurekaServer(注册中心服务端) EurekaClient(注册客户端) 自我保护机制其实就是当EurekaServer与EurekaC ...

  9. eureka自我保护时间_spring cloud中微服务之间的调用以及eureka的自我保护机制详解...

    上篇讲了spring cloud注册中心及客户端的注册,所以这篇主要讲一下服务和服务之间是怎样调用的 基于上一篇的搭建我又自己搭建了一个客户端微服务: 所以现在有两个微服务,我们所实现的就是微服务1和 ...

最新文章

  1. C#利用SerialPort类对串口发送接收数据
  2. wine 安装.netframework 2.0方法
  3. 中国金融出版社出版的2016版《个人贷款》
  4. 记Outlook插件与Web页面交互的各种坑 (含c# HttpWebRequest 连接https 的完美解决方法)
  5. shell脚本只运行一个实例
  6. Netlink 内核实现分析(二):通信
  7. 文件格式转换——DMG文件格式转换成ISO文件
  8. 使用Echarts绘制力导向图
  9. 二极管和稳压管的区别
  10. 《优柔有情人》读后感6000字
  11. 主流大数据调度工具对比(DolphinScheduler特点)
  12. 花生壳 linux客户端 命令
  13. 试除法解决分解质因数
  14. IIC协议下的OLED屏幕的三种寻址方式
  15. GitHub 里的笔记
  16. 微信公众号运营商,如何选择适合自己的微信第三方平台?
  17. 新库上线 | CnOpenData·A股上市公司现场检查随机抽查数据
  18. 相声《我的大学生活》台词
  19. mmap函数详解整理
  20. 搭建教程分享|祥云代刷网自助下单系统搭建教程【过程记录】赠源码

热门文章

  1. C++ 第13课 进阶面向对象 上 ---- (狄泰软件学院)
  2. 实时多人脸检测和识别
  3. 全国所有地级市不同所有制成分工业总产值(1999-2018年)
  4. ionic定位,签到
  5. 氙灯老化测试研究|汽车氙灯测试方法:相关性研究
  6. 怎样才可以通过Java培训拿到高薪?-粤嵌教育
  7. 电力系统IEEE33节点Simulink仿真研究(Matlab实现)
  8. 缺陷检测论文回顾(二)
  9. spark sql 出现 java.lang.RuntimeException: serious problem
  10. matlab实现悬臂梁非线性动力学分析