文章目录

  • 1. 如何使用
  • 2. 原理详解
    • 2.1 采用延迟线程池定时执行"监听"文件是否有修改
    • 2.2 通过长轮询的方式获得修改过的文件及其内容
    • 2.3 拿到配置后通过applicationContext更新到项目内存中
  • 3. 总结

Nacos简介

基于 nacos源码版本:
nacos-client-1.2.0.jar
spring-cloud-alibaba-starters.2.2.1.RELEASE

1. 如何使用

通常获取配置文件的方式

  1. @Value

  2. @ConfigurationProperties(Prefix)

如果是在运行时要动态更新的话,

第一种方式要在bean上加@RefreshScope

第二种方式是自动支持的。

2. 原理详解

2.1 采用延迟线程池定时执行"监听"文件是否有修改


在项目的日志中,会发现一直在定时打印get changedGroupKeys[], 其实这就是在定时刷新配置
当有配置被改动时, 这个[] 就会包含数据了, 借助IDEA的全局搜索功能直接搜索这个字符串就能找到这段代码, 如下:

ClientWorker.java

class LongPollingRunnable implements Runnable {private int taskId;public LongPollingRunnable(int taskId) {this.taskId = taskId;}@Overridepublic void run() {List<CacheData> cacheDatas = new ArrayList<CacheData>();List<String> inInitializingCacheList = new ArrayList<String>();try {// check failover configfor (CacheData cacheData : cacheMap.get().values()) {if (cacheData.getTaskId() == taskId) {cacheDatas.add(cacheData);try {checkLocalConfig(cacheData);if (cacheData.isUseLocalConfigInfo()) {cacheData.checkListenerMd5();}} catch (Exception e) {LOGGER.error("get local config info error", e);}}}// check server configList<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
//              省略剩下代码  .....}}
}

从这里就能看出, 先是对配置做了一些检查, 然后就打印结果, 而且这个是在run方法里, 说明这里肯定是开了线程在跑的, 找到调用LongPollingRunnable这个类的地方

发现在同一个类中, 发现是在线程池的execute中执行的, 而且这里是在for循环里, 看一下任务, 就会联想到多个配置文件的情况, 是同时监听的

public void checkConfigInfo() {// 分任务int listenerSize = cacheMap.get().size();// 向上取整为批数int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());if (longingTaskCount > currentLongingTaskCount) {for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {// 要判断任务是否在执行 这块需要好好想想。 任务列表现在是无序的。变化过程可能有问题executorService.execute(new LongPollingRunnable(i));}currentLongingTaskCount = longingTaskCount;}}

先看一下谁调用了checkConfigInfo(), 会发现是在构造函数中执行的, 代码如下:

@SuppressWarnings("PMD.ThreadPoolCreationRule")public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {this.agent = agent;this.configFilterChainManager = configFilterChainManager;// Initialize the timeout parameterinit(properties);// 初始化定时线程池, 只有一个核心线程, executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setName("com.alibaba.nacos.client.Worker." + agent.getName());t.setDaemon(true);return t;}});// 初始化 用来执行LongPollingRunnable的线程池executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());t.setDaemon(true);return t;}});// 执行 延迟线程池executor.scheduleWithFixedDelay(new Runnable() {@Overridepublic void run() {try {// 检查配置信息(是否更新)checkConfigInfo();} catch (Throwable e) {LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);}}}, 1L, 10L, TimeUnit.MILLISECONDS);}

所以从构造函数中得知,
用一个 只有一个线程的定时线程池周期性的执行配置判断任务, 每10ms 执行一次,
然后这个线程中, 再用一个定时线程池 执行去判断配置是否有更新(也就是LongPollingRunnablerun())

我们从get changedGroupKeys[] 作为切入口, 知道了它是怎么出来的, 它的上游是怎么处理的, 接下来, 具体看一下, 如何判断配置是否有更新的

2.2 通过长轮询的方式获得修改过的文件及其内容

run()方法继续看, 跟进com.alibaba.nacos.client.config.impl.ClientWorker#checkUpdateDataIds

/*** 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。*/List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws IOException {// 构造参数- 通过配置dataId/group/tenant等数据来指定文件StringBuilder sb = new StringBuilder();for (CacheData cacheData : cacheDatas) {if (!cacheData.isUseLocalConfigInfo()) {sb.append(cacheData.dataId).append(WORD_SEPARATOR);sb.append(cacheData.group).append(WORD_SEPARATOR);if (StringUtils.isBlank(cacheData.tenant)) {sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);} else {sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);}if (cacheData.isInitializing()) {// cacheData 首次出现在cacheMap中&首次check更新inInitializingCacheList.add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));}}}boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();// 核心方法- 检查更新文件return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);}

这里做了一些参数构造(用来发请求的)
继续进入 com.alibaba.nacos.client.config.impl.ClientWorker#checkUpdateConfigStr

/*** 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。*/List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException {List<String> params = new ArrayList<String>(2);params.add(Constants.PROBE_MODIFY_REQUEST);params.add(probeUpdateString);List<String> headers = new ArrayList<String>(2);headers.add("Long-Pulling-Timeout");// 设置长轮询的过期时间, 默认30秒headers.add("" + timeout);// told server do not hang me up if new initializing cacheData added inif (isInitializingCacheList) {headers.add("Long-Pulling-Timeout-No-Hangup");headers.add("true");}if (StringUtils.isBlank(probeUpdateString)) {return Collections.emptyList();}try {// In order to prevent the server from handling the delay of the client's long task,// increase the client's read timeout to avoid this problem.long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);// 长轮询请求HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,agent.getEncode(), readTimeoutMs);if (HttpURLConnection.HTTP_OK == result.code) {setHealthServer(true);// 解析返参return parseUpdateDataIdResponse(result.content);} else {setHealthServer(false);LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.code);}} catch (IOException e) {setHealthServer(false);LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);throw e;}return Collections.emptyList();}

就会发现它是发了一个请求过去, 然后通过parseUpdateDataIdResponse(result.content) 方法解析出返参里面的 dataId/group/tenant等数据

这个请求中设置了一些长轮询的参数,表示这是一个长轮询的请求

长轮询: 客户端发起Long Polling,此时如果服务端没有相关数据,会hold住请求,直到服务端有相关数据,或者等待一定时间超时才会返回。返回后,客户端又会立即再次发起下一次Long Polling。

继续看成功后做了什么解析, com.alibaba.nacos.client.config.impl.ClientWorker#parseUpdateDataIdResponse

/*** 从HTTP响应拿到变化的groupKey。保证不返回NULL。*/private List<String> parseUpdateDataIdResponse(String response) {if (StringUtils.isBlank(response)) {return Collections.emptyList();}try {response = URLDecoder.decode(response, "UTF-8");} catch (Exception e) {LOGGER.error("[" + agent.getName() + "] [polling-resp] decode modifiedDataIdsString error", e);}List<String> updateList = new LinkedList<String>();for (String dataIdAndGroup : response.split(LINE_SEPARATOR)) {if (!StringUtils.isBlank(dataIdAndGroup)) {String[] keyArr = dataIdAndGroup.split(WORD_SEPARATOR);String dataId = keyArr[0];String group = keyArr[1];if (keyArr.length == 2) {updateList.add(GroupKey.getKey(dataId, group));LOGGER.info("[{}] [polling-resp] config changed. dataId={}, group={}", agent.getName(), dataId, group);} else if (keyArr.length == 3) {String tenant = keyArr[2];updateList.add(GroupKey.getKeyTenant(dataId, group, tenant));LOGGER.info("[{}] [polling-resp] config changed. dataId={}, group={}, tenant={}", agent.getName(),dataId, group, tenant);} else {LOGGER.error("[{}] [polling-resp] invalid dataIdAndGroup error {}", agent.getName(), dataIdAndGroup);}}}return updateList;}

从这里看出, 它只解析了 dataId / group / tenant 三个值, 没有我们的具体配置信息, 那我们往回找, 看到底在哪处理的, 如此, 又回到run()方法,我们接着看

 @Overridepublic void run() {// ......省略代码// check server configList<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);LOGGER.info("get changedGroupKeys:" + changedGroupKeys);// 开始处理发送改变的配置文件for (String groupKey : changedGroupKeys) {String[] key = GroupKey.parseKey(groupKey);String dataId = key[0];String group = key[1];String tenant = null;if (key.length == 3) {tenant = key[2];}try {// 获得具体配置String[] ct = getServerConfig(dataId, group, tenant, 3000L);CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));// 把内容直接写到cacheMap中cache.setContent(ct[0]);if (null != ct[1]) {cache.setType(ct[1]);}LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",agent.getName(), dataId, group, tenant, cache.getMd5(),ContentUtils.truncateContent(ct[0]), ct[1]);} catch (NacosException ioe) {String message = String.format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",agent.getName(), dataId, group, tenant);LOGGER.error(message, ioe);}}for (CacheData cacheData : cacheDatas) {if (!cacheData.isInitializing() || inInitializingCacheList.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {// 检查md5cacheData.checkListenerMd5();cacheData.setInitializing(false);}}
// ....省略代码

dataId / group / tenant 三个取出来, 循环去获取具体配置com.alibaba.nacos.client.config.impl.ClientWorker#getServerConfig

 public String[] getServerConfig(String dataId, String group, String tenant, long readTimeout)  throws NacosException {String[] ct = new String[2];if (StringUtils.isBlank(group)) {group = Constants.DEFAULT_GROUP;}HttpResult result = null;try {List<String> params = null;if (StringUtils.isBlank(tenant)) {params = new ArrayList<String>(Arrays.asList("dataId", dataId, "group", group));} else {params = new ArrayList<String>(Arrays.asList("dataId", dataId, "group", group, "tenant", tenant));}// 通过get请求,获得具体配置result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);} catch (IOException e) {String message = String.format("[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s", agent.getName(),dataId, group, tenant);LOGGER.error(message, e);throw new NacosException(NacosException.SERVER_ERROR, e);}switch (result.code) {case HttpURLConnection.HTTP_OK:// 先放到本地文件中LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content);// 将请求返参放入ct数组中ct[0] = result.content;if (result.headers.containsKey(CONFIG_TYPE)) {ct[1] = result.headers.get(CONFIG_TYPE).get(0);} else {ct[1] = ConfigType.TEXT.getType();}return ct;case HttpURLConnection.HTTP_NOT_FOUND://  省略剩下代码......}

至此, 清楚了它是如何拿到具体配置的了, 它通过(一次post请求)长轮询的方式和服务端建立连接, 获得dataId/group等数据, 再通过这些参数发起get请求获得具体的配置文件内容,并写到本地缓存中使用

因此: Nacos 客户端会循环请求服务端变更的数据,并且超时时间设置为30s,当配置发生变化时,请求的响应会立即返回,否则会一直等到 29.5s+ 之后再返回响应

这里只是写到内存, 我们的配置更新后, 是能在spring中拿到的, 那是怎么写到spring中的呢

2.3 拿到配置后通过applicationContext更新到项目内存中

它取到这些配置后,是如何写到项目的内存中并使其生效的呢?

 try {String[] ct = getServerConfig(dataId, group, tenant, 3000L);CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));cache.setContent(ct[0]);if (null != ct[1]) {cache.setType(ct[1]);}LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",agent.getName(), dataId, group, tenant, cache.getMd5(),ContentUtils.truncateContent(ct[0]), ct[1]);} catch (NacosException ioe) {String message = String.format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",agent.getName(), dataId, group, tenant);LOGGER.error(message, ioe);}}for (CacheData cacheData : cacheDatas) {if (!cacheData.isInitializing() || inInitializingCacheList.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {// 检查md5cacheData.checkListenerMd5();cacheData.setInitializing(false);}}
// 省略剩下代码.....
}

在取到具体配置后,遍历cacheDatas数据,并检查md5, 跟进去看一下, 它开始出现监听器了

void checkListenerMd5() {for (ManagerListenerWrap wrap : listeners) {if (!md5.equals(wrap.lastCallMd5)) {safeNotifyListener(dataId, group, content, type, md5, wrap);}}}private void safeNotifyListener(final String dataId, final String group, final String content, final String type,final String md5, final ManagerListenerWrap listenerWrap) {final Listener listener = listenerWrap.listener;Runnable job = new Runnable() {@Overridepublic void run() {ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();ClassLoader appClassLoader = listener.getClass().getClassLoader();try {if (listener instanceof AbstractSharedListener) {AbstractSharedListener adapter = (AbstractSharedListener) listener;adapter.fillContext(dataId, group);LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);}// 执行回调之前先将线程classloader设置为具体webapp的classloader,以免回调方法中调用spi接口是出现异常或错用(多应用部署才会有该问题)。Thread.currentThread().setContextClassLoader(appClassLoader);ConfigResponse cr = new ConfigResponse();cr.setDataId(dataId);cr.setGroup(group);cr.setContent(content);configFilterChainManager.doFilter(null, cr);String contentTmp = cr.getContent();// 处理配置信息listener.receiveConfigInfo(contentTmp);// compare lastContent and contentif (listener instanceof AbstractConfigChangeListener) {Map data = ConfigChangeHandler.getInstance().parseChangeData(listenerWrap.lastContent, content, type);ConfigChangeEvent event = new ConfigChangeEvent(data);((AbstractConfigChangeListener)listener).receiveConfigChange(event);listenerWrap.lastContent = content;}listenerWrap.lastCallMd5 = md5;LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5,listener);} catch (NacosException de) {LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}", name,dataId, group, md5, listener, de.getErrCode(), de.getErrMsg());} catch (Throwable t) {LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId, group,md5, listener, t.getCause());} finally {Thread.currentThread().setContextClassLoader(myClassLoader);}}};final long startNotify = System.currentTimeMillis();try {if (null != listener.getExecutor()) {listener.getExecutor().execute(job);} else {job.run();}} catch (Throwable t) {LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId, group,md5, listener, t.getCause());}final long finishNotify = System.currentTimeMillis();LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",name, (finishNotify - startNotify), dataId, group, md5, listener);}

这么长的代码,核心就是处理了那个runable, 其中调用了listener.receiveConfigInfo(contentTmp) 方法处理的监听器,它是一个抽象类, 找到它的实现类

com.alibaba.cloud.nacos.refresh.NacosContextRefresher

private void registerNacosListener(final String groupKey, final String dataKey) {String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);Listener listener = listenerMap.computeIfAbsent(key,lst -> new AbstractSharedListener() {@Overridepublic void innerReceive(String dataId, String group,String configInfo) {refreshCountIncrement();nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);// todo feature: support single refresh for listening// 通过applicationContext的事件去更新配置applicationContext.publishEvent(new RefreshEvent(this, null, "Refresh Nacos config"));if (log.isDebugEnabled()) {log.debug(String.format("Refresh Nacos config group=%s,dataId=%s,configInfo=%s",group, dataId, configInfo));}}});try {configService.addListener(dataKey, groupKey, listener);}catch (NacosException e) {log.warn(String.format("register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey,groupKey), e);}}

至此, 清楚了获得的配置是如何生效的, 它将获得发生修改过的文件, 如果md5不一样了, 则执行监听器,通过applicationContext 更新配置到项目内存中

明明已经知道了哪些文件被修改了,为啥还有比对md5, 因为可能是没有修改具体内容,只是点了编辑并保存

md5用的是java的digest和位移,md5可能存在冲突, 那怎么解决冲突问题的? (虚心求教,请知晓的大佬点拨一二)

3. 总结

  1. 启动一个10ms执行一次单线程的定时线程池A, 来进行检查配置是否有更新
  2. 并再启用一个定时线程池B来并发执行多个文件修改的场景
  3. 在B线程池中,使用30s的长轮询机制主动向服务端(Nacos)查询哪些文件发生了变化
  4. 然后拿到这些变化的文件id等信息, 再次请求服务端(Nacos)拿到具体的配置内容,并写到内存中
  5. 经过检查md5后, 将这些配置内容通过spring的监听机制写到spring中

参考文章:

Long Polling长轮询详解 - 简书 (jianshu.com)

NACOS动态配置 - barryzhou - 博客园 (cnblogs.com)

spring boot 配置文件动态更新原理 以Nacos为例 - 二奎 - 博客园 (cnblogs.com)

Nacos的动态配置源码解析相关推荐

  1. 【kafka】Kafka中的动态配置源码分析

    1.概述 2.源码分析 Broker启动加载动态配置 KafkaServer.startup 启动加载动态配置总流程 2.1 动态配置初始化 config.dynamicConfig.initiali ...

  2. (Nacos源码解析五)Nacos服务事件变动源码解析

    Nacos源码解析系列目录 Nacos 源码编译运行 (Nacos源码解析一)Nacos 注册实例源码解析 (Nacos源码解析二)Nacos 服务发现源码解析 (Nacos源码解析三)Nacos 心 ...

  3. 8145v5 参数_SpringBoot外化配置源码解析:外化配置简介、参数处理|value|spring|调用|参数值

    SpringBoot外化配置源码解析 在前面章节我们讲解了 Spring Boot 的运作核心原理及启动过程中进行的一系列核心操作. 从本章开始,我们将针对在实践过程中应用的不同知识点的源代码进行解读 ...

  4. Spring Boot Profile使用详解及配置源码解析

    点击上方蓝色"程序猿DD",选择"设为星标" 回复"资源"获取独家整理的学习资料! 作者 | 二师兄 来源 | 程序新视界 在实践的过程中我 ...

  5. springboot(二)自动化配置源码解析

    @EnableAutoConfiguration 是开启自动配置的注解,在创建的 SpringBoot 项目中并不能直接看到此注解,它是由组合注解@SpringBootApplication 引入的. ...

  6. SpringBoot-自动配置-源码解析,Java高级程序员面试笔记宝典

    我们挨个分析. []( )@SpringBootConfiguration 点进去我们发现,它就是一个Configuration @Configuration@Indexedpublic @inter ...

  7. [源码解析] PyTorch 分布式(2) ----- DataParallel(上)

    [源码解析] PyTorch 分布式(2) ----- DataParallel(上) 文章目录 [源码解析] PyTorch 分布式(2) ----- DataParallel(上) 0x00 摘要 ...

  8. SpringBoot入门-源码解析(雷神)

    一.Spring Boot入门 视频学习资料(雷神): https://www.bilibili.com/video/BV19K4y1L7MT?p=1 github: https://github.c ...

  9. [源码解析] TensorFlow 分布式之 ClusterCoordinator

    [源码解析] TensorFlow 分布式之 ClusterCoordinator 文章目录 [源码解析] TensorFlow 分布式之 ClusterCoordinator 1. 思路 1.1 使 ...

最新文章

  1. springboot 多线程_SpringBoot异步调用@Async
  2. Spring.net使用说明
  3. linux opencv
  4. %@page contentType=text/html;charset=gbk%与meta http-equiv=Content-Type content=text/html; ch...
  5. HD 2602 Bone Collector (0-1背包)
  6. 在Windows 2000下优化Oracle9i性能[转]
  7. Binary String Reconstruction CodeForces - 1352F(思维+构造)
  8. [置顶] Android开发者官方网站文档 - 国内踏得网镜像
  9. 无法生成部件汇总表_RFID在汽车零部件企业仓储物流中的应用
  10. 不与最大数相同的数字之和(信息学奥赛一本通-T1113)
  11. fatal error: dynlink_nvcuvid.h: No such file or directory
  12. 【路径规划】基于matlab GUI蚁群算法求解机器人栅格地图最短路径规划问题【含Matlab源码 927期】
  13. 【读书笔记《Android游戏编程之从零开始》】17.游戏开发基础(游戏适屏的简述和作用、让游戏主角动起来)
  14. 通达oa系统怎么转移到服务器,通达OA升级心通达OA操作步骤规范
  15. 木子-前端-方法标签属性小记(普通jsp/html篇)2020-11-24
  16. 精致生活品味相伴,Barsetto百胜图BAC025B胶囊咖啡机测评
  17. iphone主屏幕动态壁纸_iPhoneXLivePhoto动态壁纸
  18. 停止抱怨英语_停止抱怨
  19. 数字化转型导师坚鹏:数字化时代企业管理变革与创新营销
  20. 【工业大数据】张洁教授现场剖析制造业大数据制造的思考与实践

热门文章

  1. ext-gwt分页实现送给正在学习gxt的朋友们
  2. 蓝桥杯第3届省赛(单片机)_自动售水机
  3. Google Adsense收入计算以及提高的5种方式
  4. 19_ue4蓝图通讯与自定义事件触发加速
  5. Css选择器+字体排版
  6. Java使用swing实现调色板
  7. 十分钟了解“微服务”
  8. python 排序算法:插入排序
  9. Invoking the Magic
  10. 如何调试windows图片查看器