1.服务实例心跳续约

com.alibaba.nacos.naming.controllers.InstanceController#beat

/*** 给某个实例发送心跳** @param request http request* @return 实例详细信息* @throws Exception any error during handle*/
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {ObjectNode result = JacksonUtils.createEmptyJsonNode();result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());// 获取实例心跳内容String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);RsInfo clientBeat = null;if (StringUtils.isNotBlank(beat)) {clientBeat = JacksonUtils.toObj(beat, RsInfo.class);}// 获取集群名称String clusterName = WebUtils.optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);// 获取实例ipString ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);// 获取实例端口int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));if (clientBeat != null) {// 如果心跳内容中指定了集群名称,那么就以心跳内容中指定的为准if (StringUtils.isNotBlank(clientBeat.getCluster())) {clusterName = clientBeat.getCluster();} else {// fix #2533clientBeat.setCluster(clusterName);}// 如果心跳内容中指定了实例的ip和端口,那么就以该指定的为准ip = clientBeat.getIp();port = clientBeat.getPort();}// 获取命名空间idString namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);// 获取服务名称String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);// 校验服务名称是否合法NamingUtils.checkServiceNameFormat(serviceName);Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);// 条件成立:说明对应的实例还没有注册if (instance == null) {if (clientBeat == null) {result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);return result;}Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "+ "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);instance = new Instance();instance.setPort(clientBeat.getPort());instance.setIp(clientBeat.getIp());instance.setWeight(clientBeat.getWeight());instance.setMetadata(clientBeat.getMetadata());instance.setClusterName(clusterName);instance.setServiceName(serviceName);instance.setInstanceId(instance.getInstanceId());instance.setEphemeral(clientBeat.isEphemeral());serviceManager.registerInstance(namespaceId, serviceName, instance);}// 获取到心跳实例对应的服务对象Service service = serviceManager.getService(namespaceId, serviceName);if (service == null) {throw new NacosException(NacosException.SERVER_ERROR,"service not found: " + serviceName + "@" + namespaceId);}if (clientBeat == null) {clientBeat = new RsInfo();clientBeat.setIp(ip);clientBeat.setPort(port);clientBeat.setCluster(clusterName);}// 处理客户端发送过来的实例心跳service.processClientBeat(clientBeat);result.put(CommonParams.CODE, NamingResponseCode.OK);if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());}result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());return result;
}

我们可以直接找到nacos服务端进行心跳续约的接口,可以看到前面都是对请求参数进行的一些解析操作,先是根据请求参数中的命名空间id和服务名称去双层map中找到对应的服务对象,然后通过请求参数中的ip,port,clusterName构造出一个心跳包对象,然后把这个心跳包对象作为参数去调用服务对象的processClientBeat方法,这个方法中就是进行实例心跳续约的关键方法了

com.alibaba.nacos.naming.core.Service#processClientBeat

/*** 处理客户端发送过来的实例心跳** @param rsInfo metrics info of server*/
public void processClientBeat(final RsInfo rsInfo) {ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();clientBeatProcessor.setService(this);clientBeatProcessor.setRsInfo(rsInfo);HealthCheckReactor.scheduleNow(clientBeatProcessor);
}

可以看到该方法创建了一个ClientBeatProcessor对象,该对象实现了Runnable接口,所以它是一个可执行的线程任务,在创建该对象之后又把当前服务对应和传进来的心跳包传给这个ClientBeatProcessor对象,然后把这个任务对象交给线程池去执行,所以我们下面就来看下这个任务的具体执行逻辑

com.alibaba.nacos.naming.healthcheck.ClientBeatProcessor

/*** 客户端心跳检测器** @author nkorange*/
public class ClientBeatProcessor implements Runnable {public static final long CLIENT_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);/*** 客户端发送过来的心跳包*/private RsInfo rsInfo;/*** 当前心跳检测器所检测的服务*/private Service service;@JsonIgnorepublic PushService getPushService() {return ApplicationUtils.getBean(PushService.class);}public RsInfo getRsInfo() {return rsInfo;}public void setRsInfo(RsInfo rsInfo) {this.rsInfo = rsInfo;}public Service getService() {return service;}public void setService(Service service) {this.service = service;}@Overridepublic void run() {Service service = this.service;if (Loggers.EVT_LOG.isDebugEnabled()) {Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());}// 获取心跳包中的ipString ip = rsInfo.getIp();// 获取心跳包中的集群名称String clusterName = rsInfo.getCluster();// 获取心跳包中的端口int port = rsInfo.getPort();// 根据集群名称获取到对应的集群对象Cluster cluster = service.getClusterMap().get(clusterName);// 获取该集群下面所有的临时实例List<Instance> instances = cluster.allIPs(true);// 遍历这些临时实例for (Instance instance : instances) {// 条件成立:说明这个实例就是要心跳续约的实例if (instance.getIp().equals(ip) && instance.getPort() == port) {if (Loggers.EVT_LOG.isDebugEnabled()) {Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());}// 更新心跳时间instance.setLastBeat(System.currentTimeMillis());if (!instance.isMarked()) {if (!instance.isHealthy()) {// 如果实例是非健康状态就设置为健康状态instance.setHealthy(true);Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",cluster.getService().getName(), ip, port, cluster.getName(),UtilsAndCommons.LOCALHOST_SITE);// 推送最新的服务实例信息给客户端getPushService().serviceChanged(service);}}}}}
}

首先会去从心跳包对象中去获取到ip,port,clusterName,根据clusterName去从传进来的服务对象中获取到具体的集群对象,再从集群对象中获取到它下面的所有实例对象,然后遍历这些实例对象,对每一个实例的ip和port都去与心跳包中的ip和port进行比较,筛选出具体要进行心跳续约的实例对象,筛选出来之后就更新它最近一次收到心跳包的时间戳,并且如果检查到此时这个实例是非健康状态的,那么就会把它更新为健康状态,最后通知给所有可推送的客户端,这个通知客户端的详细逻辑之前在订阅拉取的时候就讲过了,也就是说服务下的实例健康状态的改变都需要通知给客户端

2.服务端对实例进行过期下线检查

com.alibaba.nacos.naming.core.Service#init

在服务对象被创建加入双层map中的同时,会调用服务对象的init方法进行初始化的工作,其中就包括了启动线程对实例进行过期下线的检查

/*** 对该服务进行初始化的工作*/
public void init() {// 往实例健康检查组件中添加了健康检查任务,延迟5s开始,每5s执行一次HealthCheckReactor.scheduleCheck(clientBeatCheckTask);for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {entry.getValue().setService(this);entry.getValue().init();}
}

把ClientBeatCheckTask这个任务放到线程池中去执行,看下ClientBeatCheckTask这个任务具体的执行逻辑

com.alibaba.nacos.naming.healthcheck.ClientBeatCheckTask#run

public void run() {try {// 条件成立:说明当前节点不需要进行服务实例的心跳检查(由其他节点执行)if (!getDistroMapper().responsible(service.getName())) {// 直接返回return;}// 条件成立:没有开启服务实例的健康检查if (!getSwitchDomain().isHealthCheckEnabled()) {// 直接返回return;}// 获取到要检查的服务下面的所有临时实例List<Instance> instances = service.allIPs(true);for (Instance instance : instances) {// 条件成立:当前时间与这个实例上一次心跳续约的时间超过15s,说明该实例已经是非健康状态的了if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {if (!instance.isMarked()) {if (instance.isHealthy()) {// 把这个实例的健康状态设置为falseinstance.setHealthy(false);Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",instance.getIp(), instance.getPort(), instance.getClusterName(),service.getName(), UtilsAndCommons.LOCALHOST_SITE,instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());// 推送该服务下最新的实例信息给客户端getPushService().serviceChanged(service);// 发送一个实例心跳非健康的事件ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));}}}}// 是否需要判断实例的过期状态,默认需要if (!getGlobalConfig().isExpireInstance()) {// 如果不需要就不走下面检查实例过期状态的逻辑了return;}for (Instance instance : instances) {if (instance.isMarked()) {continue;}// 条件成立:当前时间与这个实例上一次心跳续约的时间超过30s,说明该实例已经是过期的了if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {// delete instanceLoggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),JacksonUtils.toJson(instance));// 删除实例deleteIp(instance);}}} catch (Exception e) {Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);}}

一开始会有一个if逻辑的判断,这个判断是与nacos集群有关的,这是因为在nacos的AP架构中一个nacos集群中的每一个nacos节点都会具体负责分配给自己的服务,比如说服务A是分配给节点A的,那么节点A在这里需要对这个服务A进行实例是否过期下线的判断,而其他的节点则不需要,最终由节点A同步这个服务A的最新信息给到其他节点即可,关于nacos集群后面的文章会详细说,这里我们先直接忽略。然后再判断nacos服务是否开启了健康检查,默认都是开启的,如果没有开启则直接返回,接着就获取这个服务中所有的实例对象,并且遍历这些实例对象,对每一个实例对象中的最近一次心跳续约时间与当前时间进行比较,默认如果当前时间与该实例最近心跳续约的时间差大于15s,就把这个实例的健康状态更新为非健康状态,注意这里仅仅是该实例的健康状态改成了非健康状态,并没有把实例进行下线,然后接着会再一次遍历服务的所有实例,在这一次遍历中,如果当前时间与该实例最近心跳续约的时间差大于30s的,就对该实例进行真正的下线了,也就是把这个实例从双层map中移除掉

Nacos注册中心之服务实例心跳续约与实例过期下线源码解析相关推荐

  1. Nacos注册中心和服务消费方式

    哈喽朋友们本次小無分享Nacos注册中心和服务消费方式 前言:本期文章操作性不多,多在于详细的理论说明 还各位看官耐心看完 一,服务治理介绍 目录 一,服务治理介绍 二,nacos简介 nacos实战 ...

  2. Nacos注册中心和服务消费方式(服务治理)

    目录 一.服务治理介绍 什么是服务治理? 二.nacos简介 三.nacos实战入门 1.搭建nacos环境 2.将商品.订单.微服务注册到nacos 四.实现服务调用的负载均衡 1.什么是负载均衡 ...

  3. 阿里巴巴Nacos注册中心(服务注册,服务调用)

    一.Nacos 1.基本概念 (1)Nacos 是阿里巴巴推出来的一个新开源项目,是一个更易于构建云原生应用的动态服务发现.配置管理和服务管理平台.Nacos 致力于帮助您发现.配置和管理微服务.Na ...

  4. Nacos服务注册中心(微服务)

    为什么要用服务注册中心? 在微服务中,首先需要面对的问题就是如何查找服务(软件即服务), 其次就是如何在不同的服务之间进行通信? 如何更好更方便的管理应用中的每一个服务,如何建立各个服务之间联系的纽带 ...

  5. 2:Alibaba微服务组件Nacos注册中心

    Spring Cloud Alibaba系列目录 提示:这里是第二章:Alibaba微服务组件Nacos注册中心 微服务和Spring Cloud Alibaba介绍 Alibaba微服务组件Naco ...

  6. 微服务笔记:第一章_微服务简介|Eureka注册中心|Nacos注册中心|Nacos配置管理|Feign|Gateway服务网关

    微服务笔记:第一章_微服务简介|Eureka注册中心|Nacos注册中心|Nacos配置管理|Feign|Gateway服务网关 1. 微服务简介 1.1 服务架构演变 1.2 SpringCloud ...

  7. 【Spring Cloud Alibaba】(一)微服务介绍 及 Nacos注册中心实战

    文章目录 前言 I.微服务与Spring Cloud II.Nacos 注册中心 III.Spring Cloud Alibaba Nacos 实战 1.新建父工程 2.新建demo-a 服务 3.新 ...

  8. springboot+Nacos注册中心+sentinel高可用流量框架

    1.下载阿里巴巴Nacos注册中心,下载地址https://github.com/alibaba/nacos/releases,我这里下载的是windows版本 2.下载sentinel高可用流量框架 ...

  9. Spring boot整合nacos注册中心/配置中心报错:java.lang.IllegalArgumentException: no server available

    1.问题描述 我是近期在使用Springboot整合nacos,由于springboot和springcloud都是用最新版本,啪的一下,很快啊,就出现问题了,于是自己把版本降下来了,年轻人不讲武德降 ...

  10. 【微服务】—— Nacos注册中心

    文章目录 一.Nacos 注册中心的设计原理 1.数据模型 2.数据⼀致性 3.负载均衡 4.健康检查 二.Nacos 注册中心服务数据模型 1.服务(Service)和服务实例(Instance) ...

最新文章

  1. 高并发场景下数据库的常见问题及解决方案
  2. [C++] C++风格的强制类型转换探讨
  3. 对于WIFI版ipad(无GPS芯片)定位功能的释疑
  4. plsql连不上oracle6,是否遇到PLSQL Developer连不到oracle数据库呢
  5. html子布局不超出父布局,详解flex布局中保持内容不超出容器的解决办法
  6. 创业初期,处理好事情的优先级
  7. redis-数据类型-有序集合
  8. Redis系统管理相关指令简介
  9. DNS练习之反向解析
  10. 苹果ttc转ttf_iOS使用自定义字体的方法(内置和任意下载ttf\otf\ttc字体文件)
  11. Ubuntu20系统添加右键菜单:新建文档
  12. afx是什么意思呀,什么时候要include呢,这个头文件的作用是??
  13. android上拉菜单和下拉菜单的实现
  14. 99%的创业公司都不值得加入
  15. 无法连接到mysql数据库_无法连接到数据库服务器(mysql workbench)
  16. OOAD教学管理系统 设计类图
  17. js不改变原数组的情况,添加或删除指定的元素
  18. 竞价排名中的道德争议
  19. Matlab在线使用
  20. 最后三天,平头哥携手Unitimes带来AIoT行业大咖盛宴

热门文章

  1. an argument for principle #1:thoreau's new economics 36-38
  2. oracle 九阴真精,《九阴真经》真的很强吗?其实它杂而不精,顶级高手都不愿意用...
  3. python快速编程入门课本中的名片管理器_Python-名片管理器
  4. 2019年 AI 顶会速递
  5. 【数字图像处理系列五】图像滤波之空间滤波:图像平滑降噪和图像锐化
  6. Spring Cloud随记----远程配置文件资源库的建立-涉及一些简单的git操作
  7. 读论文 + 总结 + 笔记
  8. MATLAB 生成随机数
  9. python画图案 使用循环完成_利用python在终端模拟下雪的效果
  10. Scanner类(next,nextLine,nextInt)的用法与常见问题