Nacos的动态配置源码解析
文章目录
- 1. 如何使用
- 2. 原理详解
- 2.1 采用延迟线程池定时执行"监听"文件是否有修改
- 2.2 通过长轮询的方式获得修改过的文件及其内容
- 2.3 拿到配置后通过applicationContext更新到项目内存中
- 3. 总结
Nacos简介
基于 nacos源码版本:
nacos-client-1.2.0.jar
spring-cloud-alibaba-starters.2.2.1.RELEASE
1. 如何使用
通常获取配置文件的方式
@Value
@ConfigurationProperties(Prefix)
如果是在运行时要动态更新的话,
第一种方式要在bean上加@RefreshScope
第二种方式是自动支持的。
2. 原理详解
2.1 采用延迟线程池定时执行"监听"文件是否有修改
在项目的日志中,会发现一直在定时打印get changedGroupKeys[]
, 其实这就是在定时刷新配置
当有配置被改动时, 这个[]
就会包含数据了, 借助IDEA的全局搜索功能直接搜索这个字符串就能找到这段代码, 如下:
ClientWorker.java
class LongPollingRunnable implements Runnable {private int taskId;public LongPollingRunnable(int taskId) {this.taskId = taskId;}@Overridepublic void run() {List<CacheData> cacheDatas = new ArrayList<CacheData>();List<String> inInitializingCacheList = new ArrayList<String>();try {// check failover configfor (CacheData cacheData : cacheMap.get().values()) {if (cacheData.getTaskId() == taskId) {cacheDatas.add(cacheData);try {checkLocalConfig(cacheData);if (cacheData.isUseLocalConfigInfo()) {cacheData.checkListenerMd5();}} catch (Exception e) {LOGGER.error("get local config info error", e);}}}// check server configList<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
// 省略剩下代码 .....}}
}
从这里就能看出, 先是对配置做了一些检查, 然后就打印结果, 而且这个是在run方法里, 说明这里肯定是开了线程在跑的, 找到调用LongPollingRunnable
这个类的地方
发现在同一个类中, 发现是在线程池的execute中执行的, 而且这里是在for循环里, 看一下任务, 就会联想到多个配置文件的情况, 是同时监听的
public void checkConfigInfo() {// 分任务int listenerSize = cacheMap.get().size();// 向上取整为批数int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());if (longingTaskCount > currentLongingTaskCount) {for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {// 要判断任务是否在执行 这块需要好好想想。 任务列表现在是无序的。变化过程可能有问题executorService.execute(new LongPollingRunnable(i));}currentLongingTaskCount = longingTaskCount;}}
先看一下谁调用了checkConfigInfo()
, 会发现是在构造函数中执行的, 代码如下:
@SuppressWarnings("PMD.ThreadPoolCreationRule")public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {this.agent = agent;this.configFilterChainManager = configFilterChainManager;// Initialize the timeout parameterinit(properties);// 初始化定时线程池, 只有一个核心线程, executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setName("com.alibaba.nacos.client.Worker." + agent.getName());t.setDaemon(true);return t;}});// 初始化 用来执行LongPollingRunnable的线程池executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());t.setDaemon(true);return t;}});// 执行 延迟线程池executor.scheduleWithFixedDelay(new Runnable() {@Overridepublic void run() {try {// 检查配置信息(是否更新)checkConfigInfo();} catch (Throwable e) {LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);}}}, 1L, 10L, TimeUnit.MILLISECONDS);}
所以从构造函数中得知,
用一个 只有一个线程的定时线程池周期性的执行配置判断任务, 每10ms 执行一次,
然后这个线程中, 再用一个定时线程池 执行去判断配置是否有更新(也就是LongPollingRunnable
的run()
)
我们从get changedGroupKeys[]
作为切入口, 知道了它是怎么出来的, 它的上游是怎么处理的, 接下来, 具体看一下, 如何判断配置是否有更新的
2.2 通过长轮询的方式获得修改过的文件及其内容
从run()
方法继续看, 跟进com.alibaba.nacos.client.config.impl.ClientWorker#checkUpdateDataIds
/*** 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。*/List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws IOException {// 构造参数- 通过配置dataId/group/tenant等数据来指定文件StringBuilder sb = new StringBuilder();for (CacheData cacheData : cacheDatas) {if (!cacheData.isUseLocalConfigInfo()) {sb.append(cacheData.dataId).append(WORD_SEPARATOR);sb.append(cacheData.group).append(WORD_SEPARATOR);if (StringUtils.isBlank(cacheData.tenant)) {sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);} else {sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);}if (cacheData.isInitializing()) {// cacheData 首次出现在cacheMap中&首次check更新inInitializingCacheList.add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));}}}boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();// 核心方法- 检查更新文件return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);}
这里做了一些参数构造(用来发请求的)
继续进入 com.alibaba.nacos.client.config.impl.ClientWorker#checkUpdateConfigStr
/*** 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。*/List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException {List<String> params = new ArrayList<String>(2);params.add(Constants.PROBE_MODIFY_REQUEST);params.add(probeUpdateString);List<String> headers = new ArrayList<String>(2);headers.add("Long-Pulling-Timeout");// 设置长轮询的过期时间, 默认30秒headers.add("" + timeout);// told server do not hang me up if new initializing cacheData added inif (isInitializingCacheList) {headers.add("Long-Pulling-Timeout-No-Hangup");headers.add("true");}if (StringUtils.isBlank(probeUpdateString)) {return Collections.emptyList();}try {// In order to prevent the server from handling the delay of the client's long task,// increase the client's read timeout to avoid this problem.long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);// 长轮询请求HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,agent.getEncode(), readTimeoutMs);if (HttpURLConnection.HTTP_OK == result.code) {setHealthServer(true);// 解析返参return parseUpdateDataIdResponse(result.content);} else {setHealthServer(false);LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.code);}} catch (IOException e) {setHealthServer(false);LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);throw e;}return Collections.emptyList();}
就会发现它是发了一个请求过去, 然后通过parseUpdateDataIdResponse(result.content)
方法解析出返参里面的 dataId/group/tenant等数据
这个请求中设置了一些长轮询的参数,表示这是一个长轮询的请求
长轮询: 客户端发起Long Polling,此时如果服务端没有相关数据,会hold住请求,直到服务端有相关数据,或者等待一定时间超时才会返回。返回后,客户端又会立即再次发起下一次Long Polling。
继续看成功后做了什么解析, com.alibaba.nacos.client.config.impl.ClientWorker#parseUpdateDataIdResponse
/*** 从HTTP响应拿到变化的groupKey。保证不返回NULL。*/private List<String> parseUpdateDataIdResponse(String response) {if (StringUtils.isBlank(response)) {return Collections.emptyList();}try {response = URLDecoder.decode(response, "UTF-8");} catch (Exception e) {LOGGER.error("[" + agent.getName() + "] [polling-resp] decode modifiedDataIdsString error", e);}List<String> updateList = new LinkedList<String>();for (String dataIdAndGroup : response.split(LINE_SEPARATOR)) {if (!StringUtils.isBlank(dataIdAndGroup)) {String[] keyArr = dataIdAndGroup.split(WORD_SEPARATOR);String dataId = keyArr[0];String group = keyArr[1];if (keyArr.length == 2) {updateList.add(GroupKey.getKey(dataId, group));LOGGER.info("[{}] [polling-resp] config changed. dataId={}, group={}", agent.getName(), dataId, group);} else if (keyArr.length == 3) {String tenant = keyArr[2];updateList.add(GroupKey.getKeyTenant(dataId, group, tenant));LOGGER.info("[{}] [polling-resp] config changed. dataId={}, group={}, tenant={}", agent.getName(),dataId, group, tenant);} else {LOGGER.error("[{}] [polling-resp] invalid dataIdAndGroup error {}", agent.getName(), dataIdAndGroup);}}}return updateList;}
从这里看出, 它只解析了 dataId / group / tenant
三个值, 没有我们的具体配置信息, 那我们往回找, 看到底在哪处理的, 如此, 又回到run()
方法,我们接着看
@Overridepublic void run() {// ......省略代码// check server configList<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);LOGGER.info("get changedGroupKeys:" + changedGroupKeys);// 开始处理发送改变的配置文件for (String groupKey : changedGroupKeys) {String[] key = GroupKey.parseKey(groupKey);String dataId = key[0];String group = key[1];String tenant = null;if (key.length == 3) {tenant = key[2];}try {// 获得具体配置String[] ct = getServerConfig(dataId, group, tenant, 3000L);CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));// 把内容直接写到cacheMap中cache.setContent(ct[0]);if (null != ct[1]) {cache.setType(ct[1]);}LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",agent.getName(), dataId, group, tenant, cache.getMd5(),ContentUtils.truncateContent(ct[0]), ct[1]);} catch (NacosException ioe) {String message = String.format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",agent.getName(), dataId, group, tenant);LOGGER.error(message, ioe);}}for (CacheData cacheData : cacheDatas) {if (!cacheData.isInitializing() || inInitializingCacheList.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {// 检查md5cacheData.checkListenerMd5();cacheData.setInitializing(false);}}
// ....省略代码
把 dataId / group / tenant
三个取出来, 循环去获取具体配置com.alibaba.nacos.client.config.impl.ClientWorker#getServerConfig
public String[] getServerConfig(String dataId, String group, String tenant, long readTimeout) throws NacosException {String[] ct = new String[2];if (StringUtils.isBlank(group)) {group = Constants.DEFAULT_GROUP;}HttpResult result = null;try {List<String> params = null;if (StringUtils.isBlank(tenant)) {params = new ArrayList<String>(Arrays.asList("dataId", dataId, "group", group));} else {params = new ArrayList<String>(Arrays.asList("dataId", dataId, "group", group, "tenant", tenant));}// 通过get请求,获得具体配置result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);} catch (IOException e) {String message = String.format("[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s", agent.getName(),dataId, group, tenant);LOGGER.error(message, e);throw new NacosException(NacosException.SERVER_ERROR, e);}switch (result.code) {case HttpURLConnection.HTTP_OK:// 先放到本地文件中LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content);// 将请求返参放入ct数组中ct[0] = result.content;if (result.headers.containsKey(CONFIG_TYPE)) {ct[1] = result.headers.get(CONFIG_TYPE).get(0);} else {ct[1] = ConfigType.TEXT.getType();}return ct;case HttpURLConnection.HTTP_NOT_FOUND:// 省略剩下代码......}
至此, 清楚了它是如何拿到具体配置的了, 它通过(一次post请求)长轮询的方式和服务端建立连接, 获得dataId/group等数据, 再通过这些参数发起get请求获得具体的配置文件内容,并写到本地缓存中使用
因此: Nacos 客户端会循环请求服务端变更的数据,并且超时时间设置为30s,当配置发生变化时,请求的响应会立即返回,否则会一直等到 29.5s+ 之后再返回响应
这里只是写到内存, 我们的配置更新后, 是能在spring中拿到的, 那是怎么写到spring中的呢
2.3 拿到配置后通过applicationContext更新到项目内存中
它取到这些配置后,是如何写到项目的内存中并使其生效的呢?
try {String[] ct = getServerConfig(dataId, group, tenant, 3000L);CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));cache.setContent(ct[0]);if (null != ct[1]) {cache.setType(ct[1]);}LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",agent.getName(), dataId, group, tenant, cache.getMd5(),ContentUtils.truncateContent(ct[0]), ct[1]);} catch (NacosException ioe) {String message = String.format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",agent.getName(), dataId, group, tenant);LOGGER.error(message, ioe);}}for (CacheData cacheData : cacheDatas) {if (!cacheData.isInitializing() || inInitializingCacheList.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {// 检查md5cacheData.checkListenerMd5();cacheData.setInitializing(false);}}
// 省略剩下代码.....
}
在取到具体配置后,遍历cacheDatas数据,并检查md5, 跟进去看一下, 它开始出现监听器了
void checkListenerMd5() {for (ManagerListenerWrap wrap : listeners) {if (!md5.equals(wrap.lastCallMd5)) {safeNotifyListener(dataId, group, content, type, md5, wrap);}}}private void safeNotifyListener(final String dataId, final String group, final String content, final String type,final String md5, final ManagerListenerWrap listenerWrap) {final Listener listener = listenerWrap.listener;Runnable job = new Runnable() {@Overridepublic void run() {ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();ClassLoader appClassLoader = listener.getClass().getClassLoader();try {if (listener instanceof AbstractSharedListener) {AbstractSharedListener adapter = (AbstractSharedListener) listener;adapter.fillContext(dataId, group);LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);}// 执行回调之前先将线程classloader设置为具体webapp的classloader,以免回调方法中调用spi接口是出现异常或错用(多应用部署才会有该问题)。Thread.currentThread().setContextClassLoader(appClassLoader);ConfigResponse cr = new ConfigResponse();cr.setDataId(dataId);cr.setGroup(group);cr.setContent(content);configFilterChainManager.doFilter(null, cr);String contentTmp = cr.getContent();// 处理配置信息listener.receiveConfigInfo(contentTmp);// compare lastContent and contentif (listener instanceof AbstractConfigChangeListener) {Map data = ConfigChangeHandler.getInstance().parseChangeData(listenerWrap.lastContent, content, type);ConfigChangeEvent event = new ConfigChangeEvent(data);((AbstractConfigChangeListener)listener).receiveConfigChange(event);listenerWrap.lastContent = content;}listenerWrap.lastCallMd5 = md5;LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5,listener);} catch (NacosException de) {LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}", name,dataId, group, md5, listener, de.getErrCode(), de.getErrMsg());} catch (Throwable t) {LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId, group,md5, listener, t.getCause());} finally {Thread.currentThread().setContextClassLoader(myClassLoader);}}};final long startNotify = System.currentTimeMillis();try {if (null != listener.getExecutor()) {listener.getExecutor().execute(job);} else {job.run();}} catch (Throwable t) {LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId, group,md5, listener, t.getCause());}final long finishNotify = System.currentTimeMillis();LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",name, (finishNotify - startNotify), dataId, group, md5, listener);}
这么长的代码,核心就是处理了那个runable, 其中调用了listener.receiveConfigInfo(contentTmp)
方法处理的监听器,它是一个抽象类, 找到它的实现类
com.alibaba.cloud.nacos.refresh.NacosContextRefresher
private void registerNacosListener(final String groupKey, final String dataKey) {String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);Listener listener = listenerMap.computeIfAbsent(key,lst -> new AbstractSharedListener() {@Overridepublic void innerReceive(String dataId, String group,String configInfo) {refreshCountIncrement();nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);// todo feature: support single refresh for listening// 通过applicationContext的事件去更新配置applicationContext.publishEvent(new RefreshEvent(this, null, "Refresh Nacos config"));if (log.isDebugEnabled()) {log.debug(String.format("Refresh Nacos config group=%s,dataId=%s,configInfo=%s",group, dataId, configInfo));}}});try {configService.addListener(dataKey, groupKey, listener);}catch (NacosException e) {log.warn(String.format("register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey,groupKey), e);}}
至此, 清楚了获得的配置是如何生效的, 它将获得发生修改过的文件, 如果md5不一样了, 则执行监听器,通过applicationContext 更新配置到项目内存中
明明已经知道了哪些文件被修改了,为啥还有比对md5, 因为可能是没有修改具体内容,只是点了编辑并保存
md5用的是java的digest和位移,md5可能存在冲突, 那怎么解决冲突问题的? (虚心求教,请知晓的大佬点拨一二)
3. 总结
- 启动一个10ms执行一次单线程的定时线程池A, 来进行检查配置是否有更新
- 并再启用一个定时线程池B来并发执行多个文件修改的场景
- 在B线程池中,使用30s的长轮询机制主动向服务端(Nacos)查询哪些文件发生了变化
- 然后拿到这些变化的文件id等信息, 再次请求服务端(Nacos)拿到具体的配置内容,并写到内存中
- 经过检查md5后, 将这些配置内容通过spring的监听机制写到spring中
参考文章:
Long Polling长轮询详解 - 简书 (jianshu.com)
NACOS动态配置 - barryzhou - 博客园 (cnblogs.com)
spring boot 配置文件动态更新原理 以Nacos为例 - 二奎 - 博客园 (cnblogs.com)
Nacos的动态配置源码解析相关推荐
- 【kafka】Kafka中的动态配置源码分析
1.概述 2.源码分析 Broker启动加载动态配置 KafkaServer.startup 启动加载动态配置总流程 2.1 动态配置初始化 config.dynamicConfig.initiali ...
- (Nacos源码解析五)Nacos服务事件变动源码解析
Nacos源码解析系列目录 Nacos 源码编译运行 (Nacos源码解析一)Nacos 注册实例源码解析 (Nacos源码解析二)Nacos 服务发现源码解析 (Nacos源码解析三)Nacos 心 ...
- 8145v5 参数_SpringBoot外化配置源码解析:外化配置简介、参数处理|value|spring|调用|参数值
SpringBoot外化配置源码解析 在前面章节我们讲解了 Spring Boot 的运作核心原理及启动过程中进行的一系列核心操作. 从本章开始,我们将针对在实践过程中应用的不同知识点的源代码进行解读 ...
- Spring Boot Profile使用详解及配置源码解析
点击上方蓝色"程序猿DD",选择"设为星标" 回复"资源"获取独家整理的学习资料! 作者 | 二师兄 来源 | 程序新视界 在实践的过程中我 ...
- springboot(二)自动化配置源码解析
@EnableAutoConfiguration 是开启自动配置的注解,在创建的 SpringBoot 项目中并不能直接看到此注解,它是由组合注解@SpringBootApplication 引入的. ...
- SpringBoot-自动配置-源码解析,Java高级程序员面试笔记宝典
我们挨个分析. []( )@SpringBootConfiguration 点进去我们发现,它就是一个Configuration @Configuration@Indexedpublic @inter ...
- [源码解析] PyTorch 分布式(2) ----- DataParallel(上)
[源码解析] PyTorch 分布式(2) ----- DataParallel(上) 文章目录 [源码解析] PyTorch 分布式(2) ----- DataParallel(上) 0x00 摘要 ...
- SpringBoot入门-源码解析(雷神)
一.Spring Boot入门 视频学习资料(雷神): https://www.bilibili.com/video/BV19K4y1L7MT?p=1 github: https://github.c ...
- [源码解析] TensorFlow 分布式之 ClusterCoordinator
[源码解析] TensorFlow 分布式之 ClusterCoordinator 文章目录 [源码解析] TensorFlow 分布式之 ClusterCoordinator 1. 思路 1.1 使 ...
最新文章
- springboot 多线程_SpringBoot异步调用@Async
- Spring.net使用说明
- linux opencv
- %@page contentType=text/html;charset=gbk%与meta http-equiv=Content-Type content=text/html; ch...
- HD 2602 Bone Collector (0-1背包)
- 在Windows 2000下优化Oracle9i性能[转]
- Binary String Reconstruction CodeForces - 1352F(思维+构造)
- [置顶] Android开发者官方网站文档 - 国内踏得网镜像
- 无法生成部件汇总表_RFID在汽车零部件企业仓储物流中的应用
- 不与最大数相同的数字之和(信息学奥赛一本通-T1113)
- fatal error: dynlink_nvcuvid.h: No such file or directory
- 【路径规划】基于matlab GUI蚁群算法求解机器人栅格地图最短路径规划问题【含Matlab源码 927期】
- 【读书笔记《Android游戏编程之从零开始》】17.游戏开发基础(游戏适屏的简述和作用、让游戏主角动起来)
- 通达oa系统怎么转移到服务器,通达OA升级心通达OA操作步骤规范
- 木子-前端-方法标签属性小记(普通jsp/html篇)2020-11-24
- 精致生活品味相伴,Barsetto百胜图BAC025B胶囊咖啡机测评
- iphone主屏幕动态壁纸_iPhoneXLivePhoto动态壁纸
- 停止抱怨英语_停止抱怨
- 数字化转型导师坚鹏:数字化时代企业管理变革与创新营销
- 【工业大数据】张洁教授现场剖析制造业大数据制造的思考与实践