对于非临时实例(ephemeral=false),Nacos会采用主动的健康检测,定时向实例发送请求,根据响应来判断实例健康状态。

入口在ServiceManager类中的registerInstance方法:

创建空服务时:

public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {// 如果服务不存在,创建新的服务createServiceIfAbsent(namespaceId, serviceName, local, null);
}

创建服务流程:

public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)throws NacosException {// 尝试获取服务Service service = getService(namespaceId, serviceName);if (service == null) {// 发现服务不存在,开始创建新服务Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);service = new Service();service.setName(serviceName);service.setNamespaceId(namespaceId);service.setGroupName(NamingUtils.getGroupName(serviceName));// now validate the service. if failed, exception will be thrownservice.setLastModifiedMillis(System.currentTimeMillis());service.recalculateChecksum();if (cluster != null) {cluster.setService(service);service.getClusterMap().put(cluster.getName(), cluster);}service.validate();// ** 写入注册表并初始化 **putServiceAndInit(service);if (!local) {addOrReplaceService(service);}}
}

关键在putServiceAndInit(service)方法中:

private void putServiceAndInit(Service service) throws NacosException {// 将服务写入注册表putService(service);service = getService(service.getNamespaceId(), service.getName());// 完成服务的初始化service.init();consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}

进入初始化逻辑:service.init(),这个会进入Service类中:

/*** Init service.*/
public void init() {// 开启临时实例的心跳监测任务HealthCheckReactor.scheduleCheck(clientBeatCheckTask);// 遍历注册表中的集群for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {entry.getValue().setService(this);// 完成集群初识化entry.getValue().init();}
}

这里集群的初始化entry.getValue().init();会进入Cluster类型的init()方法:

/*** Init cluster.*/
public void init() {if (inited) {return;}// 创建健康检测的任务checkTask = new HealthCheckTask(this);// 这里会开启对 非临时实例的 定时健康检测HealthCheckReactor.scheduleCheck(checkTask);inited = true;
}

这里的HealthCheckReactor.scheduleCheck(checkTask);会开启定时任务,对非临时实例做健康检测。检测逻辑定义在HealthCheckTask这个类中,是一个Runnable,其中的run方法:

public void run() {try {if (distroMapper.responsible(cluster.getService().getName()) && switchDomain.isHealthCheckEnabled(cluster.getService().getName())) {// 开始健康检测healthCheckProcessor.process(this);// 记录日志 。。。}} catch (Throwable e) {// 记录日志 。。。} finally {if (!cancelled) {// 结束后,再次进行任务调度,一定延迟后执行HealthCheckReactor.scheduleCheck(this);// 。。。}}
}

健康检测逻辑定义在healthCheckProcessor.process(this);方法中,在HealthCheckProcessor接口中,这个接口也有很多实现,默认是TcpSuperSenseProcessor

进入TcpSuperSenseProcessor的process方法:

@Override
public void process(HealthCheckTask task) {// 获取所有 非临时实例的 集合List<Instance> ips = task.getCluster().allIPs(false);if (CollectionUtils.isEmpty(ips)) {return;}for (Instance ip : ips) {// 封装健康检测信息到 BeatBeat beat = new Beat(ip, task);// 放入一个阻塞队列中taskQueue.add(beat);MetricsMonitor.getTcpHealthCheckMonitor().incrementAndGet();}
}

可以看到,所有的健康检测任务都被放入一个阻塞队列,而不是立即执行了。这里又采用了异步执行的策略,可以看到Nacos中大量这样的设计。

TcpSuperSenseProcessor本身就是一个Runnable,在它的构造函数中会把自己放入线程池中去执行,其run方法如下:

public void run() {while (true) {try {// 处理任务processTask();// ...} catch (Throwable e) {SRV_LOG.error("[HEALTH-CHECK] error while processing NIO task", e);}}
}

通过processTask来处理健康检测的任务:

private void processTask() throws Exception {// 将任务封装为一个 TaskProcessor,并放入集合Collection<Callable<Void>> tasks = new LinkedList<>();do {Beat beat = taskQueue.poll(CONNECT_TIMEOUT_MS / 2, TimeUnit.MILLISECONDS);if (beat == null) {return;}tasks.add(new TaskProcessor(beat));} while (taskQueue.size() > 0 && tasks.size() < NIO_THREAD_COUNT * 64);// 批量处理集合中的任务for (Future<?> f : GlobalExecutor.invokeAllTcpSuperSenseTask(tasks)) {f.get();}
}

任务被封装到了TaskProcessor中去执行了,TaskProcessor是一个Callable,其中的call方法:

@Override
public Void call() {// 获取检测任务已经等待的时长long waited = System.currentTimeMillis() - beat.getStartTime();if (waited > MAX_WAIT_TIME_MILLISECONDS) {Loggers.SRV_LOG.warn("beat task waited too long: " + waited + "ms");}SocketChannel channel = null;try {// 获取实例信息Instance instance = beat.getIp();// 通过NIO建立TCP连接channel = SocketChannel.open();channel.configureBlocking(false);// only by setting this can we make the socket close event asynchronouschannel.socket().setSoLinger(false, -1);channel.socket().setReuseAddress(true);channel.socket().setKeepAlive(true);channel.socket().setTcpNoDelay(true);Cluster cluster = beat.getTask().getCluster();int port = cluster.isUseIPPort4Check() ? instance.getPort() : cluster.getDefCkport();channel.connect(new InetSocketAddress(instance.getIp(), port));// 注册连接、读取事件SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);key.attach(beat);keyMap.put(beat.toString(), new BeatKey(key));beat.setStartTime(System.currentTimeMillis());GlobalExecutor.scheduleTcpSuperSenseTask(new TimeOutTask(key), CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);} catch (Exception e) {beat.finishCheck(false, false, switchDomain.getTcpHealthParams().getMax(),"tcp:error:" + e.getMessage());if (channel != null) {try {channel.close();} catch (Exception ignore) {}}}return null;
}

Nacos的健康检测有两种模式:

  • 临时实例:

    • 采用客户端心跳检测模式,心跳周期5秒

    • 心跳间隔超过15秒则标记为不健康

    • 心跳间隔超过30秒则从服务列表删除

  • 永久实例:

    • 采用服务端主动健康检测方式

    • 周期为2000 + 5000毫秒内的随机数

    • 检测异常只会标记为不健康,不会删除

那么为什么Nacos有临时和永久两种实例呢?

以淘宝为例,双十一大促期间,流量会比平常高出很多,此时服务肯定需要增加更多实例来应对高并发,而这些实例在双十一之后就无需继续使用了,采用临时实例比较合适。而对于服务的一些常备实例,则使用永久实例更合适。

与eureka相比,Nacos与Eureka在临时实例上都是基于心跳模式实现,差别不大,主要是心跳周期不同,eureka是30秒,Nacos是5秒。

另外,Nacos支持永久实例,而Eureka不支持,Eureka只提供了心跳模式的健康监测,而没有主动检测功能。

Nacos源码主动健康检测相关推荐

  1. Nacos源码心跳异常检测

    在服务注册时,一定会创建一个Service对象,而Service中有一个init方法,会在注册时被调用: public void init() {// 开启心跳检测的任务HealthCheckReac ...

  2. Nacos源码系列—关于服务注册的那些事

    点赞再看,养成习惯,微信搜索[牧小农]关注我获取更多资讯,风里雨里,小农等你,很高兴能够成为你的朋友. 项目源码地址:公众号回复 nacos,即可免费获取源码 简介 首先我们在看Nacos源码之前,要 ...

  3. Nacos源码系列——第三章(全网最经典的Nacos集群源码主线剖析)

    上两个章节讲述了Nacos在单机模式下的服务注册,发现等源码剖析过程,实战当中 其实单机是远远不够的,那么Nacos是如何在集群模式下是如何保证节点状态同步,以及服 务变动,新增数据同步的过程的!   ...

  4. Nacos源码系列——第一章(Nacos核心源码主线剖析上)

    在讲具体的源码之前,我有几点想说明下,很多开发可能觉得源码不重要,甚至觉得互联网 的知识,目前够用就可以,也不需要多么精通.的确,在大多数的公司中,你能用你的知识 解决问题就可以,不一定非要涉及到源码 ...

  5. 如何访问集群中指定的服务器,【Nacos源码之配置管理 六】集群模式下服务器之间是如何互相感知的...

    前言 我们用Nacos当配置中心的时候,上一篇文章中 [Nacos源码之配置管理 五]为什么把配置文件Dump到磁盘中 知道了,所有的配置文件都会Dump到服务器的本地磁盘中, 那么集群模式下: 服务 ...

  6. (Nacos源码解析五)Nacos服务事件变动源码解析

    Nacos源码解析系列目录 Nacos 源码编译运行 (Nacos源码解析一)Nacos 注册实例源码解析 (Nacos源码解析二)Nacos 服务发现源码解析 (Nacos源码解析三)Nacos 心 ...

  7. Nacos源码阅读开篇之下载源码

    文章目录 Nacos源码阅读开篇 看源码的方法 nacos服务注册与发现源码剖析 nacos核心功能点 nacos服务端原理 nacos 客户端原理 下载Nacos源码 配置单机启动 Nacos源码阅 ...

  8. 全网最火的Nacos源码构建,你找不到第二个有我仔细的!!

    本章开始带大家构建Nacos源码,后面我会开始分析Nacos源码的细节,结合本人在工作之余用到的Nacos点点滴滴,如果想和我一起学Nacos,就好好看这篇吧,废话不多说,先告诉你们Nacos怎么通过 ...

  9. 下载Nacos源码并运行

    要研究Nacos源码自然不能用打包好的Nacos服务端jar包来运行,需要下载源码自己编译来运行. 下载Nacos源码 Nacos的GitHub地址:GitHub - alibaba/nacos: a ...

最新文章

  1. 数据结构c语言版第一章答案,《c语言数据结构》第一章概论自测题答案
  2. hdu 5385 The path
  3. BootStrap2学习日记8---表单
  4. 浮点型数据2字节_C语言进阶之路:数据类型 - 整型、字符型和浮点型的扩展!...
  5. cacti及其相关插件的安装
  6. cordova 更改app版本_【ios马甲包cps联运】App上架难 马甲包不知道该怎么做?
  7. Go 空结构体的 3 种使用场景
  8. (5)verilog语言编写呼吸灯
  9. 【目标检测】单阶段算法--YOLOv2详解
  10. 易语言html实现报表打印,易语言报表统计功能例程可打印
  11. 实例源码--Android捕鱼达人经典游戏
  12. 后端Web开发框架(Java)
  13. GP数列 三角形斜边 小码哥的生日 完全平方数
  14. 支付宝客户端拉起支付
  15. model.compile
  16. 2022CCPC预选赛J Roulette
  17. 变分(Calculus of variations)的概念及运算规则(二)
  18. 【实例间对比】ICE: Inter-instance Contrastive Encoding for Unsupervised Person Re-identification论文笔记
  19. 微信H5端网页授权流程(在H5中的openid获取,网页绑定微信)
  20. kafka消费者如何才能从头开始消费某个topic的全量

热门文章

  1. gethostbyname()函数说明
  2. android中使用startActivityForResult回传数据
  3. kong 网关日志格式修改
  4. MongoDB- 简单操作命令
  5. Win10 Bash/WSL调试Linux环境下的.NET Core应用程序
  6. PLSQL的UTL_FILE使用例子
  7. 《JavaScript高级程序设计》阅读笔记(五):ECMAScript中的运算符(一)
  8. MVC3 Razor学习资料汇总(ScottGu的博客截至2011-02-15动态)
  9. 如何创建并发布Google Earth KML 地标文件?
  10. 基于WebSocket协议实现Broker